티스토리 뷰
728x90
✅ MongoDB Sink Connector 설정에 유용한 주요 옵션들
키 | 설명 |
connection.uri | 필수. MongoDB 접속 주소 (mongodb://user:pass@host:port/?authSource=...) |
topics | Kafka에서 수신할 토픽 이름 (쉼표로 여러 개 가능) |
database | MongoDB 내 사용할 데이터베이스 이름 |
collection | MongoDB 내 컬렉션 이름 |
tasks.max | 병렬 태스크 수 (성능 확장용) |
writemodel.strategy | ReplaceOneDefaultStrategy, InsertOneDefaultStrategy 등 쓰기 전략 |
document.id.strategy | 문서의 _id 생성 방식 (예: BsonOidStrategy, ProvidedInKeyStrategy, FullKeyStrategy) |
document.id.strategy.overwrite.existing | 기존 문서를 덮어쓸지 여부 (true/false) |
transforms | Kafka 메시지 가공용 SMT(Single Message Transform) 이름 |
transforms.X.type | SMT 타입 (HoistField, ExtractField, ValueToKey, 등) |
errors.tolerance | 오류 허용 수준 (none, all) |
errors.deadletterqueue.topic.name | 에러 메시지를 보낼 DLQ 토픽 이름 |
errors.deadletterqueue.context.headers.enable | DLQ 메시지에 원본 메타데이터 포함 여부 |
value.converter | Kafka 메시지 value 처리 방식 (JsonConverter, AvroConverter, 등) |
value.converter.schemas.enable | 스키마 포함 여부 (false 권장) |
key.converter | Kafka 메시지 key 처리 방식 |
key.converter.schemas.enable | key에도 스키마 붙일지 여부 |
delete.on.null.values | 메시지 값이 null일 때 Mongo 문서 삭제 여부 (true 시 tombstone 처리됨) |
max.batch.size | MongoDB에 한번에 쓸 레코드 수 |
max.buffer.size | 버퍼에 보관할 레코드 수 (flush 전) |
rate.limiting.timeout | 쓰기 실패 시 재시도 시간 제한 (ms) |
rate.limiting.every.n | 지정 수만큼 문서마다 지연 (백프레셔 대응) |
✅ SMT(Single Message Transform) 관련 예시
MongoDB Sink에서 자주 쓰는 SMT 설정 예시도 함께 알려드릴게요:
메시지의 payload 필드만 MongoDB에 넣고 싶을 때:
"transforms": "HoistField",
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
"transforms.HoistField.field": "payload"
메시지의 특정 필드(예: id)를 Mongo 문서 _id로 쓰고 싶을 때:
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list": "id",
"document.id.strategy.partial.value.projection.type": "AllowList"
✅ 샘플: 전체 예시
{
// MongoDB Kafka Sink Connector 클래스 지정
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector",
// 기존 문서가 있을 경우 덮어쓰기 설정
"document.id.strategy.overwrite.existing": "true",
// MongoDB에 저장할 때 ReplaceOne 방식 사용 (기존 문서를 교체)
"writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneDefaultStrategy",
// JSON payload를 루트로 감싸는 HoistField 변환기 설정
"transforms.HoistField.field": "payload",
// 최대 태스크 수 지정 (1개만 사용하여 단일 처리)
"tasks.max": "1",
// Kafka에서 수신할 토픽 이름
"topics": "user-tp",
// 사용할 Kafka Connect 변환(transform) 체인 설정
"transforms": "HoistField",
// HoistField 변환기 타입 지정 (value 기준으로 작동)
"transforms.HoistField.type": "org.apache.kafka.connect.transforms.HoistField$Value",
// MongoDB에 저장할 컬렉션 이름
"collection": "user",
// DLQ(Dead Letter Queue) 전송 시 헤더 정보 포함 여부
"errors.deadletterqueue.context.headers.enable": "true",
// Kafka 메시지 key에 스키마 포함 여부 (false로 설정하여 스키마 생략)
"key.converter.schemas.enable": "false",
// MongoDB에 저장할 DB 이름
"database": "portal",
// 문서 ID 전략: MongoDB ObjectId 사용
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy",
// DLQ 토픽 이름 설정 (에러 발생 시 메시지 저장 위치)
"errors.deadletterqueue.topic.name": "dlq-users",
// MongoDB 접속 URI (보안을 위해 값은 빈 문자열로 비워둠 — 실제 운영 시 연결 정보 입력 필요)
"connection.uri": "",
// Kafka 메시지 value에 스키마 포함 여부 (false로 설정하여 스키마 생략)
"value.converter.schemas.enable": "false",
// 이 커넥터 인스턴스의 이름 (Kafka Connect 내부 식별자)
"name": "mongodb-sink-user",
// 오류 허용 수준: 모든 오류 허용 (DLQ로 전송)
"errors.tolerance": "all",
// DLQ 토픽의 복제 팩터 (브로커 수가 1개인 경우 1로 설정)
"errors.deadletterqueue.topic.replication.factor": "1",
// Kafka 메시지 value 변환기 (JSON 사용)
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
// Kafka 메시지 key 변환기 (JSON 사용)
"key.converter": "org.apache.kafka.connect.json.JsonConverter"
}
728x90
'DB' 카테고리의 다른 글
Kafka Connect Transformer란? (0) | 2025.04.04 |
---|---|
Kafka Connect MongoDB Sink Connector 등록/수정/삭제/조회 (0) | 2025.04.04 |
MongoDB를 PostgreSQL + FerretDB로 마이그레이션 (0) | 2025.03.26 |
Debezium을 사용한 Microsoft SQL Server 변경 데이터 캡처(CDC) 설정 가이드 (0) | 2025.03.18 |
Debezium을 SQL Server Express에서 사용할 수 있는 대안 방법 (0) | 2025.03.18 |