DB
📚 MongoDB Source Connector 설정 전체 목록
ipxy
2025. 4. 14. 10:43
728x90
MongoDB Source Connector는 대략 3가지 범주로 설정을 나눌 수 있습니다:
- 기본 연결 / 필수 항목
- 고급 옵션 / 스트리밍 관련
- 오류 처리 / 변환 처리(SMT) / 성능 조정
범주별로 모든 항목을 설명 + 언제 사용하는지까지 구체적으로 적어볼게요.
1. 기본 연결 / 필수 항목
항목 필수 설명
connector.class | ✅ | com.mongodb.kafka.connect.MongoSourceConnector 로 고정 |
tasks.max | ✅ | 병렬 실행할 태스크 수. 기본 1. 복수 지정시 컬렉션을 샤딩해서 분산 읽기 가능 |
connection.uri | ✅ | MongoDB 접속 URI (Authentication, ReplicaSet, TLS 모두 설정 가능) |
database | ⭕ | 관찰할 데이터베이스 이름. 생략 가능 (컬렉션 단위 감시만 할 수도 있음) |
collection | ⭕ | 감시할 컬렉션 이름. 생략시 데이터베이스 내 모든 변경사항 수집 |
topic.prefix | ✅ | Kafka Topic 생성 시 붙일 접두사. (Topic 이름 = prefix + collection) |
주의: database/collection을 모두 생략하면 MongoDB 서버 전체를 감시하게 됩니다. 운영에서는 보통 명시합니다.
2. 스트리밍/데이터 읽기 관련 고급 설정
항목 | 기본값 | 설명 |
output.format.key | json | Kafka 메시지의 Key 포맷 (json, schema, bson) |
output.format.value | json | Kafka 메시지의 Value 포맷 (json, schema, bson) |
change.stream.full.document | default | 업데이트 이벤트시 전체 문서 포함 여부 (updateLookup) |
publish.full.document.only | false | 변경된 문서 전체만 Kafka로 보내고 operationType 무시할지 |
pipeline | 없음 | MongoDB Aggregation Pipeline 적용해서 변경 Stream 필터링 가능 |
copy.existing | false | 시작 시 기존 데이터를 Kafka로 적재할지 여부 |
copy.existing.pipeline | 없음 | 기존 데이터 적재 시 적용할 필터 파이프라인 |
copy.existing.max.threads | 1 | 기존 데이터 적재시 사용할 쓰레드 수 |
copy.existing.queue.size | 16000 | 적재 큐 사이즈. 큐가 넘치면 스로틀링 발생 |
copy.existing.namespace.mapper | 기본 매퍼 | 초기 적재 시 Namespace → Topic 매핑 커스터마이즈 가능 |
poll.await.time.ms | 5000 | MongoDB Change Stream 대기 최대 시간 (ms) |
cursor.max.await.time.ms | 1000 | MongoDB Change Stream 커서 대기 최대 시간 (ms) |
batch.size | 1000 | 한번에 가져올 문서 수 (Change Stream에서 읽어올 batch 크기) |
🔥 중요한 것만 요약
- change.stream.full.document = updateLookup → update 이벤트시 변경된 필드만 가져오지 않고 전체 문서 가져옴. (일반적으로 추천)
- copy.existing = true → 과거 데이터까지 가져와서 적재. (초기 마이그레이션에 유용)
- pipeline → MongoDB 서버에 부하 줄이기 위해, 특정 변경사항만 수집할 때 필수
3. 오류 처리 / 로그 / Dead Letter Queue 설정
항목 | 기본값 | 설명 |
errors.tolerance | none | 에러 발생 시 무시(all) 또는 중단(none)할지 |
errors.log.enable | false | 에러 로그 활성화 |
errors.log.include.messages | false | 에러 로그에 레코드 전체 포함 여부 |
errors.deadletterqueue.topic.name | 없음 | 에러가 발생한 레코드를 전송할 Dead Letter Queue Kafka Topic 이름 |
errors.deadletterqueue.context.headers.enable | false | DLQ에 Context Header 포함 여부 |
errors.deadletterqueue.topic.replication.factor | 3 | DLQ Topic 생성시 Replication Factor |
🔥 주요 포인트
- errors.tolerance=all + DLQ 설정 조합 → 운영환경에서는 필수
- DLQ를 안 만들면 에러 발생 시 Connector 자체가 중단될 수 있음
4. Kafka 메시지 변환(SMT) 설정
MongoDB Source Connector는 Kafka Connect의 SMT(Single Message Transform) 기능을 지원합니다.
항목 | 설명 |
transforms | 적용할 SMT 리스트 (쉼표로 구분) |
transforms.<name>.type | SMT 클래스 이름 지정 |
transforms.<name>.<config> | 각 SMT별 상세 옵션 |
예를 들어, 특정 필드를 제거하는 SMT를 추가하고 싶으면:
"transforms": "dropField",
"transforms.dropField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropField.blacklist": "_id, internalField"
SMT 종류: HoistField, ReplaceField, ExtractField, MaskField, TimestampConverter, ValueToKey 등 가능.
5. 성능/튜닝/기타 설정
항목 | 기본값 | 설명 |
heartbeat.interval.ms | 3000 | Connector Heartbeat 간격 (ms) |
max.batch.size | 0 (제한 없음) | Kafka로 보낼 배치 최대 크기 |
topic.creation.default.replication.factor | 3 | 새 Topic 생성시 replication factor |
topic.creation.default.partitions | 6 | 새 Topic 생성시 파티션 수 |
internal.key.converter | org.apache.kafka.connect.json.JsonConverter | 내부 key 변환기 |
internal.value.converter | org.apache.kafka.connect.json.JsonConverter | 내부 value 변환기 |
🖐️ 알아야 할 특이사항 (운영팁)
- MongoDB 버전: 최소 3.6 이상, 권장 4.4 이상
- ReplicaSet 필요: Change Stream은 ReplicaSet 또는 Sharded Cluster 환경이어야 작동
- Kafka Topic 구성: Collection 단위로 자동으로 Topic이 생성됨
- Kafka Connect: Standalone 모드, Distributed 모드 모두 지원
🔥 실제 운영 추천 Best Practice
상황 추천 설정
신규 데이터 + 과거 데이터 다 Kafka로 옮기기 | copy.existing: true, change.stream.full.document: updateLookup |
MongoDB 데이터 많은 경우 | batch.size, copy.existing.max.threads 조정 |
에러 무중단 처리 | errors.tolerance: all, errors.deadletterqueue.topic.name 설정 |
특정 조건 데이터만 Kafka로 보내기 | pipeline 옵션 활용 (ex: status == 'active' 인 것만) |
SMT 변환 필요 | transforms 이용 (필드 제거, 키 변경 등) |
✨ 전체 요약 그림
MongoDB (ChangeStream)
↓
Mongo Source Connector
↓ (필터링, 변환, 포맷팅)
Kafka Topic
↓
Kafka Consumer (다른 시스템 연계)
728x90