DB
Kafka Connect MongoDB Sink Connector 설정 총정리
ipxy
2025. 3. 18. 14:51
728x90
이 문서는 Kafka Connect를 사용하여 Kafka 토픽의 데이터를 MongoDB로 전송하는 싱크 커넥터(Sink Connector) 설정을 종합적으로 정리합니다. 기본 설정부터 고급 설정까지 모든 주요 옵션을 다룹니다.
1. curl 명령어 (커넥터 생성)
curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{...}'
Use code with caution.Bash
- curl -X POST: HTTP POST 요청으로 커넥터를 생성.
- http://localhost:8083/connectors: Kafka Connect REST API 엔드포인트 (기본 포트: 8083).
- -H "Content-Type: application/json": 요청 본문이 JSON 형식임을 명시.
- -d '{...}': JSON 형식의 커넥터 설정.
2. 커넥터 설정 (config)
{
"name": "mongo-sink-connector", // 커넥터의 고유한 이름
"config": {
// -- 필수 설정 --
"connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", // 커넥터 클래스
"tasks.max": "1", // 최대 작업(task) 수
"topics": "my_kafka_topic", // Kafka 토픽 이름 (","로 여러 개 지정 가능)
"connection.uri": "mongodb://localhost:27017", // MongoDB 연결 URI
"database": "mydatabase", // MongoDB 데이터베이스 이름
"collection": "mycollection", // MongoDB 컬렉션 이름
"key.converter": "org.apache.kafka.connect.storage.StringConverter", // 키 변환기
"value.converter": "org.apache.kafka.connect.json.JsonConverter", // 값 변환기
"value.converter.schemas.enable": "false", // 값에 스키마 정보 포함 여부 (JsonConverter 사용 시)
// -- 데이터 변환 (Transforms) --
"transforms": "AddPrefix,RenameField", // 적용할 변환 목록
"transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter", // 변환 유형
"transforms.AddPrefix.regex": "(.*)", // 정규 표현식 (RegexRouter 예시)
"transforms.AddPrefix.replacement": "prefix_$1", // 치환 문자열 (RegexRouter 예시)
"transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", // 필드 이름 변경 변환
"transforms.RenameField.renames": "old_field_name:new_field_name", // 이전 필드 이름:새 필드 이름
// -- 데이터 필터링 (Predicates) --
"predicates": "TopicMatch", // 적용할 조건 목록
"predicates.TopicMatch.type": "org.apache.kafka.connect.predicates.TopicNameMatches", // 조건 유형
"predicates.TopicMatch.pattern": "my_kafka_topic", // 토픽 이름 패턴
// -- 쓰기 동작 (Write Behavior) --
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy", // _id 생성 전략
"max.batch.size": "0", // 한 번에 쓸 최대 레코드 수 (0: 제한 없음)
"rate.limiting.timeout": "0", // 쓰기 요청 대기 시간(초) (rate limiting 사용 시)
"rate.limiting.every.n": "0", // rate limiting 적용 빈도 (0: 사용 안 함)
"bulk.write.ordered": "true", // 순서가 지정된 bulk write 사용 여부
// -- 오류 처리 (Error Handling) --
"errors.tolerance": "none", // 오류 처리 ('none': 즉시 실패, 'all': 무시)
"errors.log.enable": "false", // 오류 로그 기록 여부
"errors.log.include.messages": "false", // 로그에 메시지 내용 포함 여부
"errors.deadletterqueue.topic.name": "", // 데드 레터 큐(DLQ) 토픽 이름
"errors.deadletterqueue.context.headers.enable": "false", // DLQ 메시지에 헤더 추가 여부
// -- 연결 및 인증 (Connection and Authentication) --
// "connection.uri" (위 필수 설정에 포함)
"mongodb.ssl.enabled": "false", // SSL/TLS 사용 여부
"mongodb.ssl.truststore.location": "", // truststore 경로 (SSL 사용 시)
"mongodb.ssl.truststore.password": "", // truststore 비밀번호 (SSL 사용 시)
"mongodb.ssl.keystore.location": "", // keystore 경로 (SSL 양방향 인증 시)
"mongodb.ssl.keystore.password": "", // keystore 비밀번호 (SSL 양방향 인증 시)
"mongodb.ssl.invalid.host.name.allowed": "false", // 호스트 이름 불일치 허용 여부 (SSL)
// -- 기타 설정 --
"topic.namespace.map": "", // 토픽-데이터베이스/컬렉션 매핑
"field.renamer.mapping": "", // 필드 이름 매핑
"field.exclude.list": "", // 제외할 필드 목록
"topic.override.<topicName>.<propertyName>": "" // 특정 토픽에 대한 설정 재정의
}
}
각 설정 항목 설명
- name: 커넥터의 고유한 이름.
- connector.class: 사용할 커넥터 클래스 (MongoDB Sink Connector).
- tasks.max: 커넥터가 실행할 최대 작업 수.
- topics: 데이터를 읽어올 Kafka 토픽 (","로 구분하여 여러 개 지정 가능).
- connection.uri: MongoDB 연결 문자열 (인증 정보 포함 가능).
- database: 데이터를 저장할 MongoDB 데이터베이스.
- collection: 데이터를 저장할 MongoDB 컬렉션.
- key.converter: Kafka 메시지 키 변환기.
- value.converter: Kafka 메시지 값 변환기.
- value.converter.schemas.enable: 값 변환기가 JsonConverter일 때 스키마 정보 포함 여부.
- transforms: 적용할 데이터 변환 목록.
- transforms.<name>.type: 변환 유형 (클래스 이름).
- transforms.<name>.<property>: 변환 속성.
- predicates: 적용할 데이터 필터링 조건 목록.
- predicates.<name>.type: 조건 유형 (클래스 이름).
- predicates.<name>.<property>: 조건 속성.
- document.id.strategy: MongoDB 문서 _id 생성 전략.
- max.batch.size: 한 번에 MongoDB에 쓸 최대 레코드 수.
- rate.limiting.timeout: 쓰기 요청 대기 시간(초) (rate limiting 사용 시).
- rate.limiting.every.n: rate limiting 적용 빈도.
- bulk.write.ordered: 순서가 지정된 bulk write 사용 여부.
- errors.tolerance: 오류 처리 방식 (none 또는 all).
- errors.log.enable: 오류 로그 기록 여부.
- errors.log.include.messages: 로그에 Kafka 메시지 내용 포함 여부.
- errors.deadletterqueue.topic.name: 데드 레터 큐 토픽 이름.
- errors.deadletterqueue.context.headers.enable: DLQ 메시지에 컨텍스트 헤더를 추가할지 여부
- mongodb.ssl.enabled: SSL/TLS 연결 사용 여부.
- mongodb.ssl.truststore.location: truststore 경로 (SSL 사용 시).
- mongodb.ssl.truststore.password: truststore 비밀번호 (SSL 사용 시).
- mongodb.ssl.keystore.location: keystore 경로 (SSL 양방향 인증 시).
- mongodb.ssl.keystore.password: keystore 비밀번호 (SSL 양방향 인증 시).
- mongodb.ssl.invalid.host.name.allowed: SSL 서버 인증서 호스트 이름 불일치 허용
- topic.namespace.map: Kafka 토픽과 MongoDB 데이터베이스/컬렉션 매핑 규칙.
- field.renamer.mapping: Kafka 필드와 MongoDB 필드 이름 매핑.
- field.exclude.list: MongoDB 문서에서 제외할 필드 목록.
- topic.override.<topicName>.<propertyName>: 특정 Kafka 토픽에 대한 설정 재정의.
728x90