DB

MongoDB Source Connector 설정 (Kafka Connect)

ipxy 2025. 4. 12. 10:40
728x90

 

MongoDB Source ConnectorMongoDB의 데이터를 Kafka로 실시간 스트리밍하는 역할을 합니다.
주요 개념:

항목  설명
역할 MongoDB → Kafka 로 데이터 복제 (변경 스트림 기반)
방식 MongoDB Change Streams 활용
패턴 CDC(Change Data Capture)
제공 형태 Confluent MongoDB Source Connector 또는 Debezium MongoDB Connector 등 사용 가능

기본 설정 항목 (MongoDB Source Connector)

Kafka Connect용 Connector 설정 JSON(or YAML)을 예시로 보여드리겠습니다.

{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "tasks.max": "1",

    // MongoDB 접속 설정
    "connection.uri": "mongodb://mongo-user:mongo-password@mongo-host:27017/admin",

    // 복제할 데이터베이스 및 컬렉션
    "database": "yourDatabase",
    "collection": "yourCollection",

    // 카프카로 보내질 토픽 이름
    "topic.prefix": "mongo.",

    // 모드 선택 (Change Stream 기반)
    "publish.full.document.only": "true",
    "copy.existing": "true",

    // 시작 시점 설정
    "copy.existing.pipeline": "[{\"$match\": {}}]",
    
    // 커넥터 오류 복구 설정
    "errors.tolerance": "all",
    "errors.log.enable": "true",
    "errors.log.include.messages": "true"
  }
}

주요 설정 설명

설정 항목 설명
connection.uri MongoDB 접속 URI (ID/PW 포함 또는 인증서 기반)
database 복제할 MongoDB 데이터베이스 이름
collection 복제할 MongoDB 컬렉션 이름 (생략하면 전체 데이터베이스)
topic.prefix Kafka로 보낼 토픽 이름 앞에 붙일 접두사
copy.existing Connector 시작 시 기존 데이터를 복제할지 여부
publish.full.document.only 변경 스트림 이벤트 중 전체 문서(fullDocument)만 출력할지 여부
errors.tolerance 에러 발생 시 무시할지 여부 (all이면 무시하고 계속)
errors.log.enable 에러를 로그에 출력할지 여부
errors.log.include.messages 에러 발생 시 구체적인 메시지도 함께 출력할지 여부

 

추가 고급 설정 (선택)

고급 옵션 설명
pipeline MongoDB Change Streams에 사용할 Match/Pipeline 쿼리 (필터링)
copy.existing.pipeline 초기 데이터 복제 시 사용할 Match/Pipeline 쿼리
output.format.key Kafka 메시지의 Key 포맷 지정 (json, bson, schema 등)
output.format.value Kafka 메시지의 Value 포맷 지정
heartbeat.interval.ms Heartbeat 주기 설정 (커넥션 유지를 위해)
post.processor.chain 데이터 후처리 체인 설정 (필터, 변환 등 가능)

 

728x90