티스토리 뷰
728x90
MongoDB Source Connector는 MongoDB의 데이터를 Kafka로 실시간 스트리밍하는 역할을 합니다.
주요 개념:
항목 | 설명 |
역할 | MongoDB → Kafka 로 데이터 복제 (변경 스트림 기반) |
방식 | MongoDB Change Streams 활용 |
패턴 | CDC(Change Data Capture) |
제공 형태 | Confluent MongoDB Source Connector 또는 Debezium MongoDB Connector 등 사용 가능 |
기본 설정 항목 (MongoDB Source Connector)
Kafka Connect용 Connector 설정 JSON(or YAML)을 예시로 보여드리겠습니다.
{
"name": "mongo-source-connector",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max": "1",
// MongoDB 접속 설정
"connection.uri": "mongodb://mongo-user:mongo-password@mongo-host:27017/admin",
// 복제할 데이터베이스 및 컬렉션
"database": "yourDatabase",
"collection": "yourCollection",
// 카프카로 보내질 토픽 이름
"topic.prefix": "mongo.",
// 모드 선택 (Change Stream 기반)
"publish.full.document.only": "true",
"copy.existing": "true",
// 시작 시점 설정
"copy.existing.pipeline": "[{\"$match\": {}}]",
// 커넥터 오류 복구 설정
"errors.tolerance": "all",
"errors.log.enable": "true",
"errors.log.include.messages": "true"
}
}
주요 설정 설명
설정 항목 | 설명 |
connection.uri | MongoDB 접속 URI (ID/PW 포함 또는 인증서 기반) |
database | 복제할 MongoDB 데이터베이스 이름 |
collection | 복제할 MongoDB 컬렉션 이름 (생략하면 전체 데이터베이스) |
topic.prefix | Kafka로 보낼 토픽 이름 앞에 붙일 접두사 |
copy.existing | Connector 시작 시 기존 데이터를 복제할지 여부 |
publish.full.document.only | 변경 스트림 이벤트 중 전체 문서(fullDocument)만 출력할지 여부 |
errors.tolerance | 에러 발생 시 무시할지 여부 (all이면 무시하고 계속) |
errors.log.enable | 에러를 로그에 출력할지 여부 |
errors.log.include.messages | 에러 발생 시 구체적인 메시지도 함께 출력할지 여부 |
추가 고급 설정 (선택)
고급 옵션 | 설명 |
pipeline | MongoDB Change Streams에 사용할 Match/Pipeline 쿼리 (필터링) |
copy.existing.pipeline | 초기 데이터 복제 시 사용할 Match/Pipeline 쿼리 |
output.format.key | Kafka 메시지의 Key 포맷 지정 (json, bson, schema 등) |
output.format.value | Kafka 메시지의 Value 포맷 지정 |
heartbeat.interval.ms | Heartbeat 주기 설정 (커넥션 유지를 위해) |
post.processor.chain | 데이터 후처리 체인 설정 (필터, 변환 등 가능) |
728x90
'DB' 카테고리의 다른 글
📚 MongoDB Source Connector 설정 전체 목록 (0) | 2025.04.14 |
---|---|
MongoDB Source Connector의 필터링 / 변환 / 포맷팅 완전 정리 (0) | 2025.04.14 |
MongoDB Change Streams + Kafka (MongoDB의 데이터 변경사항을 Kafka 메시지로 실시간 전송) (0) | 2025.04.12 |
Kafka Connect Transformer란? (0) | 2025.04.04 |
Kafka Connect MongoDB Sink Connector 등록/수정/삭제/조회 (0) | 2025.04.04 |