DB

MongoDB Sink Connector 설정에 유용한 주요 옵션들

ipxy 2025. 4. 3. 15:46
728x90

✅ MongoDB Sink Connector 설정에 유용한 주요 옵션들

설명
connection.uri 필수. MongoDB 접속 주소 (mongodb://user:pass@host:port/?authSource=...)
topics Kafka에서 수신할 토픽 이름 (쉼표로 여러 개 가능)
database MongoDB 내 사용할 데이터베이스 이름
collection MongoDB 내 컬렉션 이름
tasks.max 병렬 태스크 수 (성능 확장용)
writemodel.strategy ReplaceOneDefaultStrategy, InsertOneDefaultStrategy 등 쓰기 전략
document.id.strategy 문서의 _id 생성 방식 (예: BsonOidStrategy, ProvidedInKeyStrategy, FullKeyStrategy)
document.id.strategy.overwrite.existing 기존 문서를 덮어쓸지 여부 (true/false)
transforms Kafka 메시지 가공용 SMT(Single Message Transform) 이름
transforms.X.type SMT 타입 (HoistField, ExtractField, ValueToKey, 등)
errors.tolerance 오류 허용 수준 (none, all)
errors.deadletterqueue.topic.name 에러 메시지를 보낼 DLQ 토픽 이름
errors.deadletterqueue.context.headers.enable DLQ 메시지에 원본 메타데이터 포함 여부
value.converter Kafka 메시지 value 처리 방식 (JsonConverter, AvroConverter, 등)
value.converter.schemas.enable 스키마 포함 여부 (false 권장)
key.converter Kafka 메시지 key 처리 방식
key.converter.schemas.enable key에도 스키마 붙일지 여부
delete.on.null.values 메시지 값이 null일 때 Mongo 문서 삭제 여부 (true 시 tombstone 처리됨)
max.batch.size MongoDB에 한번에 쓸 레코드 수
max.buffer.size 버퍼에 보관할 레코드 수 (flush 전)
rate.limiting.timeout 쓰기 실패 시 재시도 시간 제한 (ms)
rate.limiting.every.n 지정 수만큼 문서마다 지연 (백프레셔 대응)

✅ SMT(Single Message Transform) 관련 예시

MongoDB Sink에서 자주 쓰는 SMT 설정 예시도 함께 알려드릴게요:

메시지의 payload 필드만 MongoDB에 넣고 싶을 때:

"transforms": "HoistField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "payload"

메시지의 특정 필드(예: id)를 Mongo 문서 _id로 쓰고 싶을 때:

"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list": "id",
"document.id.strategy.partial.value.projection.type": "AllowList"

✅ 샘플: 전체 예시


 

{
  // MongoDB Kafka Sink Connector 클래스 지정
  "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",

  // 기존 문서가 있을 경우 덮어쓰기 설정
  "document.id.strategy.overwrite.existing": "true",

  // MongoDB에 저장할 때 ReplaceOne 방식 사용 (기존 문서를 교체)
  "writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy",

  // JSON payload를 루트로 감싸는 HoistField 변환기 설정
  "transforms.HoistField.field": "payload",

  // 최대 태스크 수 지정 (1개만 사용하여 단일 처리)
  "tasks.max": "1",

  // Kafka에서 수신할 토픽 이름
  "topics": "user-tp",

  // 사용할 Kafka Connect 변환(transform) 체인 설정
  "transforms": "HoistField",

  // HoistField 변환기 타입 지정 (value 기준으로 작동)
  "transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",

  // MongoDB에 저장할 컬렉션 이름
  "collection": "user",

  // DLQ(Dead Letter Queue) 전송 시 헤더 정보 포함 여부
  "errors.deadletterqueue.context.headers.enable": "true",

  // Kafka 메시지 key에 스키마 포함 여부 (false로 설정하여 스키마 생략)
  "key.converter.schemas.enable": "false",

  // MongoDB에 저장할 DB 이름
  "database": "portal",

  // 문서 ID 전략: MongoDB ObjectId 사용
  "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",

  // DLQ 토픽 이름 설정 (에러 발생 시 메시지 저장 위치)
  "errors.deadletterqueue.topic.name": "dlq-users",

  // MongoDB 접속 URI (보안을 위해 값은 빈 문자열로 비워둠 — 실제 운영 시 연결 정보 입력 필요)
  "connection.uri": "",

  // Kafka 메시지 value에 스키마 포함 여부 (false로 설정하여 스키마 생략)
  "value.converter.schemas.enable": "false",

  // 이 커넥터 인스턴스의 이름 (Kafka Connect 내부 식별자)
  "name": "mongodb-sink-user",

  // 오류 허용 수준: 모든 오류 허용 (DLQ로 전송)
  "errors.tolerance": "all",

  // DLQ 토픽의 복제 팩터 (브로커 수가 1개인 경우 1로 설정)
  "errors.deadletterqueue.topic.replication.factor": "1",

  // Kafka 메시지 value 변환기 (JSON 사용)
  "value.converter": "org.apache.kafka.connect.json.JsonConverter",

  // Kafka 메시지 key 변환기 (JSON 사용)
  "key.converter": "org.apache.kafka.connect.json.JsonConverter"
}

 

 

728x90