티스토리 뷰

728x90

전체 구조

MongoDB (Change Streams)
  ↓
Kafka Connect (MongoDB Source Connector)
  ↓
Kafka Topic
  ↓
(필요하면) Kafka Consumer → 다른 시스템

🔥 핵심 

항목 설명
MongoDB 데이터 변경(CRUD) 발생
Change Streams MongoDB의 변경 이벤트를 실시간 감지
Kafka Connect MongoDB Source Connector를 통해 Kafka로 변경 이벤트를 전송
Kafka Broker 이벤트를 Topic에 저장
Kafka Consumer 소비하여 다른 서비스/API/DB로 전파 가능

🛠 세팅 방법 정리

1. MongoDB 준비

  • 버전: MongoDB 4.0 이상 필요
  • Replica Set 또는 Sharded Cluster 구성 필수
    ➔ 단일 노드라도 --replSet 옵션 주고 ReplicaSet 모드로 구동해야 함!
mongod --replSet "rs0" --bind_ip_all

초기화

mongo
rs.initiate()

2. Kafka Connect MongoDB Source Connector 설치

  • Kafka Connect 클러스터에 MongoDB Source Connector 플러그인을 설치합니다.
confluent-hub install mongodb/kafka-connect-mongodb:latest

또는 직접 JAR 파일 다운받아서 넣을 수도 있습니다 (/plugins 경로에).


3. MongoDB Source Connector 설정 (예시)

Kafka Connect에 등록할 Connector JSON 설정입니다.

{
  "name": "mongo-source-connector",
  "config": {
    "connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
    "tasks.max": "1",
    
    "connection.uri": "mongodb://username:password@mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=rs0",
    
    "database": "your_database",
    "collection": "your_collection",
    
    "output.format.value": "json",
    "output.schema.infer.value": "true",
    
    "topic.prefix": "mongo.",

    "change.stream.full.document": "updateLookup"
  }
}

주요 옵션 설명

옵션 설명
connection.uri MongoDB 연결 정보 (ReplicaSet 모드)
database 감시할 DB 이름
collection 감시할 컬렉션 이름 (지정하지 않으면 전체)
topic.prefix Kafka로 보낼 때 토픽 이름 접두어 설정
change.stream.full.document 변경된 전체 문서를 가져올지 여부 (updateLookup 추천)

변경이 발생하면 Kafka에 mongo.{database}.{collection} 같은 토픽으로 발행됩니다.


4. Kafka로 전송되는 메시지 예시

MongoDB에서 예를 들어 users 컬렉션에 변경이 생기면 Kafka로 이런 JSON 메시지가 날아옵니다:

{
  "_id": {
    "_data": "825F3B...00"
  },
  "operationType": "insert",
  "fullDocument": {
    "_id": "12345",
    "name": "홍길동",
    "email": "gildong@example.com"
  },
  "ns": {
    "db": "your_database",
    "coll": "users"
  },
  "documentKey": {
    "_id": "12345"
  }
}
  • operationType: insert, update, delete 구분 가능
  • fullDocument: 변경된 실제 문서

⚡ 주의사항

  • MongoDB는 반드시 ReplicaSet이어야 Change Streams 작동함.
  • 네트워크 레이턴시가 크면 Kafka에 밀려서 쌓일 수 있으니 tasks.max, batch.size 같은 최적화 필요.
  • 토픽 Naming 규칙 주의 (prefix를 제대로 설계해야 나중에 Consumer가 구분 가능).
  • updateLookup 옵션 안 주면 update 시 필드 일부만 올 수 있음 ➔ 대부분 실전에서는 updateLookup 켜는 게 좋음.
728x90
250x250
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함