DB

📚 MongoDB Source Connector 설정 전체 목록

ipxy 2025. 4. 14. 10:43
728x90

MongoDB Source Connector는 대략 3가지 범주로 설정을 나눌 수 있습니다:

  • 기본 연결 / 필수 항목
  • 고급 옵션 / 스트리밍 관련
  • 오류 처리 / 변환 처리(SMT) / 성능 조정

범주별로 모든 항목을 설명 + 언제 사용하는지까지 구체적으로 적어볼게요.


1. 기본 연결 / 필수 항목

항목 필수 설명

connector.class com.mongodb.kafka.connect.MongoSourceConnector 로 고정
tasks.max 병렬 실행할 태스크 수. 기본 1. 복수 지정시 컬렉션을 샤딩해서 분산 읽기 가능
connection.uri MongoDB 접속 URI (Authentication, ReplicaSet, TLS 모두 설정 가능)
database 관찰할 데이터베이스 이름. 생략 가능 (컬렉션 단위 감시만 할 수도 있음)
collection 감시할 컬렉션 이름. 생략시 데이터베이스 내 모든 변경사항 수집
topic.prefix Kafka Topic 생성 시 붙일 접두사. (Topic 이름 = prefix + collection)

주의: database/collection을 모두 생략하면 MongoDB 서버 전체를 감시하게 됩니다. 운영에서는 보통 명시합니다.


2. 스트리밍/데이터 읽기 관련 고급 설정

 

항목  기본값  설명
output.format.key json Kafka 메시지의 Key 포맷 (json, schema, bson)
output.format.value json Kafka 메시지의 Value 포맷 (json, schema, bson)
change.stream.full.document default 업데이트 이벤트시 전체 문서 포함 여부 (updateLookup)
publish.full.document.only false 변경된 문서 전체만 Kafka로 보내고 operationType 무시할지
pipeline 없음 MongoDB Aggregation Pipeline 적용해서 변경 Stream 필터링 가능
copy.existing false 시작 시 기존 데이터를 Kafka로 적재할지 여부
copy.existing.pipeline 없음 기존 데이터 적재 시 적용할 필터 파이프라인
copy.existing.max.threads 1 기존 데이터 적재시 사용할 쓰레드 수
copy.existing.queue.size 16000 적재 큐 사이즈. 큐가 넘치면 스로틀링 발생
copy.existing.namespace.mapper 기본 매퍼 초기 적재 시 Namespace → Topic 매핑 커스터마이즈 가능
poll.await.time.ms 5000 MongoDB Change Stream 대기 최대 시간 (ms)
cursor.max.await.time.ms 1000 MongoDB Change Stream 커서 대기 최대 시간 (ms)
batch.size 1000 한번에 가져올 문서 수 (Change Stream에서 읽어올 batch 크기)

🔥 중요한 것만 요약

  • change.stream.full.document = updateLookup → update 이벤트시 변경된 필드만 가져오지 않고 전체 문서 가져옴. (일반적으로 추천)
  • copy.existing = true → 과거 데이터까지 가져와서 적재. (초기 마이그레이션에 유용)
  • pipeline → MongoDB 서버에 부하 줄이기 위해, 특정 변경사항만 수집할 때 필수

3. 오류 처리 / 로그 / Dead Letter Queue 설정

 

항목  기본값  설명
errors.tolerance none 에러 발생 시 무시(all) 또는 중단(none)할지
errors.log.enable false 에러 로그 활성화
errors.log.include.messages false 에러 로그에 레코드 전체 포함 여부
errors.deadletterqueue.topic.name 없음 에러가 발생한 레코드를 전송할 Dead Letter Queue Kafka Topic 이름
errors.deadletterqueue.context.headers.enable false DLQ에 Context Header 포함 여부
errors.deadletterqueue.topic.replication.factor 3 DLQ Topic 생성시 Replication Factor

🔥 주요 포인트

  • errors.tolerance=all + DLQ 설정 조합 → 운영환경에서는 필수
  • DLQ를 안 만들면 에러 발생 시 Connector 자체가 중단될 수 있음

4. Kafka 메시지 변환(SMT) 설정

MongoDB Source Connector는 Kafka Connect의 SMT(Single Message Transform) 기능을 지원합니다.

 

항목  설명
transforms 적용할 SMT 리스트 (쉼표로 구분)
transforms.<name>.type SMT 클래스 이름 지정
transforms.<name>.<config> 각 SMT별 상세 옵션

예를 들어, 특정 필드를 제거하는 SMT를 추가하고 싶으면:

"transforms": "dropField",
"transforms.dropField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropField.blacklist": "_id, internalField"

SMT 종류: HoistField, ReplaceField, ExtractField, MaskField, TimestampConverter, ValueToKey 등 가능.


5. 성능/튜닝/기타 설정

 

항목  기본값  설명
heartbeat.interval.ms 3000 Connector Heartbeat 간격 (ms)
max.batch.size 0 (제한 없음) Kafka로 보낼 배치 최대 크기
topic.creation.default.replication.factor 3 새 Topic 생성시 replication factor
topic.creation.default.partitions 6 새 Topic 생성시 파티션 수
internal.key.converter org.apache.kafka.connect.json.JsonConverter 내부 key 변환기
internal.value.converter org.apache.kafka.connect.json.JsonConverter 내부 value 변환기

🖐️ 알아야 할 특이사항 (운영팁)

  • MongoDB 버전: 최소 3.6 이상, 권장 4.4 이상
  • ReplicaSet 필요: Change Stream은 ReplicaSet 또는 Sharded Cluster 환경이어야 작동
  • Kafka Topic 구성: Collection 단위로 자동으로 Topic이 생성됨
  • Kafka Connect: Standalone 모드, Distributed 모드 모두 지원

🔥 실제 운영 추천 Best Practice

상황 추천 설정

신규 데이터 + 과거 데이터 다 Kafka로 옮기기 copy.existing: true, change.stream.full.document: updateLookup
MongoDB 데이터 많은 경우 batch.size, copy.existing.max.threads 조정
에러 무중단 처리 errors.tolerance: all, errors.deadletterqueue.topic.name 설정
특정 조건 데이터만 Kafka로 보내기 pipeline 옵션 활용 (ex: status == 'active' 인 것만)
SMT 변환 필요 transforms 이용 (필드 제거, 키 변경 등)

✨ 전체 요약 그림

MongoDB (ChangeStream)
    ↓
Mongo Source Connector
    ↓  (필터링, 변환, 포맷팅)
Kafka Topic
    ↓
Kafka Consumer (다른 시스템 연계)

 

728x90