티스토리 뷰

728x90

이 문서는 Kafka Connect를 사용하여 Kafka 토픽의 데이터를 MongoDB로 전송하는 싱크 커넥터(Sink Connector) 설정을 종합적으로 정리합니다. 기본 설정부터 고급 설정까지 모든 주요 옵션을 다룹니다.

1. curl 명령어 (커넥터 생성)

curl -X POST http://localhost:8083/connectors -H "Content-Type: application/json" -d '{...}'
Use code with caution.Bash
  • curl -X POST: HTTP POST 요청으로 커넥터를 생성.
  • http://localhost:8083/connectors: Kafka Connect REST API 엔드포인트 (기본 포트: 8083).
  • -H "Content-Type: application/json": 요청 본문이 JSON 형식임을 명시.
  • -d '{...}': JSON 형식의 커넥터 설정.

2. 커넥터 설정 (config)

{
  "name": "mongo-sink-connector",  // 커넥터의 고유한 이름
  "config": {
    // -- 필수 설정 --
    "connector.class": "com.mongodb.kafka.connect.MongoSinkConnector", // 커넥터 클래스
    "tasks.max": "1",           // 최대 작업(task) 수
    "topics": "my_kafka_topic", // Kafka 토픽 이름 (","로 여러 개 지정 가능)
    "connection.uri": "mongodb://localhost:27017", // MongoDB 연결 URI
    "database": "mydatabase",     // MongoDB 데이터베이스 이름
    "collection": "mycollection",   // MongoDB 컬렉션 이름
    "key.converter": "org.apache.kafka.connect.storage.StringConverter", // 키 변환기
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",  // 값 변환기
    "value.converter.schemas.enable": "false", // 값에 스키마 정보 포함 여부 (JsonConverter 사용 시)

    // -- 데이터 변환 (Transforms) --
    "transforms": "AddPrefix,RenameField", // 적용할 변환 목록
    "transforms.AddPrefix.type": "org.apache.kafka.connect.transforms.RegexRouter", // 변환 유형
    "transforms.AddPrefix.regex": "(.*)",       // 정규 표현식 (RegexRouter 예시)
    "transforms.AddPrefix.replacement": "prefix_$1", // 치환 문자열 (RegexRouter 예시)
    "transforms.RenameField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value", // 필드 이름 변경 변환
    "transforms.RenameField.renames": "old_field_name:new_field_name", // 이전 필드 이름:새 필드 이름

    // -- 데이터 필터링 (Predicates) --
    "predicates": "TopicMatch", // 적용할 조건 목록
    "predicates.TopicMatch.type": "org.apache.kafka.connect.predicates.TopicNameMatches", // 조건 유형
    "predicates.TopicMatch.pattern": "my_kafka_topic", // 토픽 이름 패턴

    // -- 쓰기 동작 (Write Behavior) --
    "document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.BsonOidStrategy", // _id 생성 전략
    "max.batch.size": "0",        // 한 번에 쓸 최대 레코드 수 (0: 제한 없음)
    "rate.limiting.timeout": "0",   // 쓰기 요청 대기 시간(초) (rate limiting 사용 시)
    "rate.limiting.every.n": "0",   // rate limiting 적용 빈도 (0: 사용 안 함)
    "bulk.write.ordered": "true", // 순서가 지정된 bulk write 사용 여부

    // -- 오류 처리 (Error Handling) --
    "errors.tolerance": "none",    // 오류 처리 ('none': 즉시 실패, 'all': 무시)
    "errors.log.enable": "false",   // 오류 로그 기록 여부
    "errors.log.include.messages": "false", // 로그에 메시지 내용 포함 여부
    "errors.deadletterqueue.topic.name": "", // 데드 레터 큐(DLQ) 토픽 이름
    "errors.deadletterqueue.context.headers.enable": "false", // DLQ 메시지에 헤더 추가 여부

    // -- 연결 및 인증 (Connection and Authentication) --
    // "connection.uri" (위 필수 설정에 포함)
    "mongodb.ssl.enabled": "false",              // SSL/TLS 사용 여부
    "mongodb.ssl.truststore.location": "",      // truststore 경로 (SSL 사용 시)
    "mongodb.ssl.truststore.password": "",      // truststore 비밀번호 (SSL 사용 시)
    "mongodb.ssl.keystore.location": "",       // keystore 경로 (SSL 양방향 인증 시)
    "mongodb.ssl.keystore.password": "",       // keystore 비밀번호 (SSL 양방향 인증 시)
    "mongodb.ssl.invalid.host.name.allowed": "false", // 호스트 이름 불일치 허용 여부 (SSL)

    // -- 기타 설정 --
    "topic.namespace.map": "",          // 토픽-데이터베이스/컬렉션 매핑
    "field.renamer.mapping": "",        // 필드 이름 매핑
    "field.exclude.list": "",          // 제외할 필드 목록
    "topic.override.<topicName>.<propertyName>": "" // 특정 토픽에 대한 설정 재정의
  }
}

각 설정 항목 설명

  • name: 커넥터의 고유한 이름.
  • connector.class: 사용할 커넥터 클래스 (MongoDB Sink Connector).
  • tasks.max: 커넥터가 실행할 최대 작업 수.
  • topics: 데이터를 읽어올 Kafka 토픽 (","로 구분하여 여러 개 지정 가능).
  • connection.uri: MongoDB 연결 문자열 (인증 정보 포함 가능).
  • database: 데이터를 저장할 MongoDB 데이터베이스.
  • collection: 데이터를 저장할 MongoDB 컬렉션.
  • key.converter: Kafka 메시지 키 변환기.
  • value.converter: Kafka 메시지 값 변환기.
  • value.converter.schemas.enable: 값 변환기가 JsonConverter일 때 스키마 정보 포함 여부.
  • transforms: 적용할 데이터 변환 목록.
  • transforms.<name>.type: 변환 유형 (클래스 이름).
  • transforms.<name>.<property>: 변환 속성.
  • predicates: 적용할 데이터 필터링 조건 목록.
  • predicates.<name>.type: 조건 유형 (클래스 이름).
  • predicates.<name>.<property>: 조건 속성.
  • document.id.strategy: MongoDB 문서 _id 생성 전략.
  • max.batch.size: 한 번에 MongoDB에 쓸 최대 레코드 수.
  • rate.limiting.timeout: 쓰기 요청 대기 시간(초) (rate limiting 사용 시).
  • rate.limiting.every.n: rate limiting 적용 빈도.
  • bulk.write.ordered: 순서가 지정된 bulk write 사용 여부.
  • errors.tolerance: 오류 처리 방식 (none 또는 all).
  • errors.log.enable: 오류 로그 기록 여부.
  • errors.log.include.messages: 로그에 Kafka 메시지 내용 포함 여부.
  • errors.deadletterqueue.topic.name: 데드 레터 큐 토픽 이름.
  • errors.deadletterqueue.context.headers.enable: DLQ 메시지에 컨텍스트 헤더를 추가할지 여부
  • mongodb.ssl.enabled: SSL/TLS 연결 사용 여부.
  • mongodb.ssl.truststore.location: truststore 경로 (SSL 사용 시).
  • mongodb.ssl.truststore.password: truststore 비밀번호 (SSL 사용 시).
  • mongodb.ssl.keystore.location: keystore 경로 (SSL 양방향 인증 시).
  • mongodb.ssl.keystore.password: keystore 비밀번호 (SSL 양방향 인증 시).
  • mongodb.ssl.invalid.host.name.allowed: SSL 서버 인증서 호스트 이름 불일치 허용
  • topic.namespace.map: Kafka 토픽과 MongoDB 데이터베이스/컬렉션 매핑 규칙.
  • field.renamer.mapping: Kafka 필드와 MongoDB 필드 이름 매핑.
  • field.exclude.list: MongoDB 문서에서 제외할 필드 목록.
  • topic.override.<topicName>.<propertyName>: 특정 Kafka 토픽에 대한 설정 재정의.
728x90
250x250
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함