DB
MongoDB Change Streams + Kafka (MongoDB의 데이터 변경사항을 Kafka 메시지로 실시간 전송)
ipxy
2025. 4. 12. 10:37
728x90
전체 구조
MongoDB (Change Streams)
↓
Kafka Connect (MongoDB Source Connector)
↓
Kafka Topic
↓
(필요하면) Kafka Consumer → 다른 시스템
🔥 핵심
항목 | 설명 |
MongoDB | 데이터 변경(CRUD) 발생 |
Change Streams | MongoDB의 변경 이벤트를 실시간 감지 |
Kafka Connect | MongoDB Source Connector를 통해 Kafka로 변경 이벤트를 전송 |
Kafka Broker | 이벤트를 Topic에 저장 |
Kafka Consumer | 소비하여 다른 서비스/API/DB로 전파 가능 |
🛠 세팅 방법 정리
1. MongoDB 준비
- 버전: MongoDB 4.0 이상 필요
- Replica Set 또는 Sharded Cluster 구성 필수
➔ 단일 노드라도 --replSet 옵션 주고 ReplicaSet 모드로 구동해야 함!
mongod --replSet "rs0" --bind_ip_all
초기화
mongo
rs.initiate()
2. Kafka Connect MongoDB Source Connector 설치
- Kafka Connect 클러스터에 MongoDB Source Connector 플러그인을 설치합니다.
confluent-hub install mongodb/kafka-connect-mongodb:latest
또는 직접 JAR 파일 다운받아서 넣을 수도 있습니다 (/plugins 경로에).
3. MongoDB Source Connector 설정 (예시)
Kafka Connect에 등록할 Connector JSON 설정입니다.
{
"name": "mongo-source-connector",
"config": {
"connector.class": "com.mongodb.kafka.connect.MongoSourceConnector",
"tasks.max": "1",
"connection.uri": "mongodb://username:password@mongo1:27017,mongo2:27017,mongo3:27017/?replicaSet=rs0",
"database": "your_database",
"collection": "your_collection",
"output.format.value": "json",
"output.schema.infer.value": "true",
"topic.prefix": "mongo.",
"change.stream.full.document": "updateLookup"
}
}
주요 옵션 설명
옵션 | 설명 |
connection.uri | MongoDB 연결 정보 (ReplicaSet 모드) |
database | 감시할 DB 이름 |
collection | 감시할 컬렉션 이름 (지정하지 않으면 전체) |
topic.prefix | Kafka로 보낼 때 토픽 이름 접두어 설정 |
change.stream.full.document | 변경된 전체 문서를 가져올지 여부 (updateLookup 추천) |
변경이 발생하면 Kafka에 mongo.{database}.{collection} 같은 토픽으로 발행됩니다.
4. Kafka로 전송되는 메시지 예시
MongoDB에서 예를 들어 users 컬렉션에 변경이 생기면 Kafka로 이런 JSON 메시지가 날아옵니다:
{
"_id": {
"_data": "825F3B...00"
},
"operationType": "insert",
"fullDocument": {
"_id": "12345",
"name": "홍길동",
"email": "gildong@example.com"
},
"ns": {
"db": "your_database",
"coll": "users"
},
"documentKey": {
"_id": "12345"
}
}
- operationType: insert, update, delete 구분 가능
- fullDocument: 변경된 실제 문서
⚡ 주의사항
- MongoDB는 반드시 ReplicaSet이어야 Change Streams 작동함.
- 네트워크 레이턴시가 크면 Kafka에 밀려서 쌓일 수 있으니 tasks.max, batch.size 같은 최적화 필요.
- 토픽 Naming 규칙 주의 (prefix를 제대로 설계해야 나중에 Consumer가 구분 가능).
- updateLookup 옵션 안 주면 update 시 필드 일부만 올 수 있음 ➔ 대부분 실전에서는 updateLookup 켜는 게 좋음.
728x90