DB
MongoDB Source Connector의 필터링 / 변환 / 포맷팅 완전 정리
ipxy
2025. 4. 14. 10:41
728x90
MongoDB에서 Kafka로 보내기 전에, 3단계 과정을 거칠 수 있습니다.
- 필터링 (Filter)
MongoDB Change Stream 이벤트 중 필요한 것만 선택 (→ MongoDB 서버단 처리) - 변환 (Transform)
Kafka로 보내기 전에 메시지 구조를 바꾸거나 수정 (→ Kafka Connect SMT) - 포맷팅 (Formatting)
Kafka 메시지의 Key/Value 직렬화 형태 지정 (→ JSON, BSON 등)
이 3단계를 조합하면 훨씬 깔끔하고, 목적에 맞는 데이터 스트림을 만들 수 있어요.
✋ 1. 필터링 (Filter) - MongoDB 단에서 필요한 데이터만 선택
방법
- MongoDB Source Connector의 pipeline 옵션 사용
- MongoDB의 Aggregation Pipeline 문법으로 이벤트를 걸러냅니다.
사용 예시 1: 특정 컬렉션 이벤트만 가져오기
"pipeline": "[{\"$match\": {\"ns.coll\": \"orders\"}}]"
➔ orders 컬렉션의 데이터 변경 이벤트만 Kafka로 보냄.
사용 예시 2: status가 "active"인 문서만 가져오기
"pipeline": "[{\"$match\": {\"fullDocument.status\": \"active\"}}]"
➔ 변경된 문서의 status 필드가 "active"일 때만 Kafka로 보냄.
사용 예시 3: 오퍼레이션 타입별 필터링
"pipeline": "[{\"$match\": {\"operationType\": {\"$in\": [\"insert\", \"update\"]}}}]"
➔ insert나 update 이벤트만 Kafka로 보냄. delete 이벤트는 무시.
$match는 MongoDB Aggregation Pipeline 연산자입니다. operationType 은 MongoDB Change Stream 이벤트 타입 필드입니다. "insert" : 문서 삽입 "update" : 문서 일부 수정 "replace" : 문서 전체 대체 "delete" : 문서 삭제 "invalidate" : 컬렉션/DB 삭제로 인한 스트림 종료
✅ 핵심 요약
- $match 를 이용해 "필터"하는 방식
- MongoDB Change Stream에서 서버단에서 미리 필터링 → Kafka로 오는 데이터 자체를 줄임
- 서버부하, 네트워크 부하, Kafka 부하까지 줄일 수 있어 "필수 기능"입니다
✋ 2. 변환 (Transform) - Kafka Connect SMT (Single Message Transform)
Kafka로 보내기 전에 메시지를 수정/가공/재구성하는 과정입니다.
MongoDB Source Connector는 Kafka Connect의 SMT 기능을 그대로 쓸 수 있습니다.
대표적인 변환(SMT) 종류
변환 | 종류 | 설명 SMT 클래스 |
필드 제거 | 특정 필드 삭제 | ReplaceField |
필드 이름 변경 | 필드 이름 바꿔서 보내기 | ReplaceField |
필드 일부 추출 | 특정 필드만 남기기 | ExtractField |
타임스탬프 변환 | 타임스탬프 포맷 변경 | TimestampConverter |
Key 변경 | 특정 필드를 Kafka 메시지 Key로 지정 | ValueToKey |
필드 추가 | 새로운 필드 추가 | InsertField |
SMT 설정 예시 1: 특정 필드 삭제
"transforms": "dropField",
"transforms.dropField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropField.blacklist": "_id, internalField"
➔ Kafka 메시지에서 _id, internalField 필드를 삭제하고 보내줍니다.
SMT 설정 예시 2: 특정 필드만 추출
"transforms": "extract",
"transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extract.field": "fullDocument"
➔ MongoDB Change Stream 이벤트 중 fullDocument 부분만 Kafka 메시지로 보냅니다.
(기본 이벤트 메타데이터는 제외)
SMT 설정 예시 3: Kafka 메시지 Key를 특정 필드로 변경
"transforms": "makeKey",
"transforms.makeKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.makeKey.fields": "orderId"
➔ MongoDB 문서의 orderId 필드를 Kafka 메시지 Key로 사용합니다.
✅ 핵심 요약
- SMT를 이용해 Kafka Topic을 깔끔하게 정리할 수 있다.
- 필요 없는 필드는 제거하고, 필요한 필드만 남길 수 있다.
- Kafka 메시지 Key를 제대로 설정하면 Consumer side Join 같은 것도 쉽게 가능해진다.
✋ 3. 포맷팅 (Formatting) - Kafka 메시지 직렬화 방식 지정
MongoDB Source Connector는 Kafka로 보낼 때, Key와 Value의 포맷을 지정할 수 있습니다.
옵션 | 설명 |
output.format.key | Kafka 메시지 Key 포맷 (default: json) |
output.format.value | Kafka 메시지 Value 포맷 (default: json) |
가능한 포맷:
- json (보통 가장 많이 사용)
- schema
- bson
가장 많이 쓰는 기본 설정
"output.format.key": "json",
"output.format.value": "json"
JSON 포맷으로 Kafka Topic에 들어오게 됩니다.
BSON 포맷 설정 예시 (특수한 경우)
"output.format.value": "bson"
- BSON은 MongoDB 네이티브 포맷
- 일반적으로 Kafka Consumer에서 BSON 처리가 어려우므로 JSON 권장합니다.
✨ 전체 흐름 그림
MongoDB Change Stream
↓ (Pipeline 필터링)
필터링 된 Change Event
↓ (SMT 변환 처리)
변환 된 Kafka 메시지
↓ (JSON 포맷 직렬화)
Kafka Topic으로 전송
🛠️ 실전 Tip
상황 추천 방법
MongoDB 변경 중 insert만 가져오고 싶다 | pipeline으로 operationType == "insert"만 필터 |
Kafka 메시지에서 필요 없는 내부 필드를 삭제하고 싶다 | ReplaceField SMT로 필드 삭제 |
Kafka Key를 MongoDB의 특정 필드로 쓰고 싶다 | ValueToKey SMT 적용 |
Kafka 메시지를 읽는 다른 시스템이 JSON을 기대한다 | output.format.value: json 지정 |
🧩 요약 정리
단계 기능 어디서 설정?
필터링 | MongoDB에서 필요한 데이터만 | pipeline 옵션 |
변환 | Kafka로 보내기 전 메시지 수정 | transforms (SMT) |
포맷팅 | Kafka 메시지 직렬화 포맷 지정 | output.format.key / output.format.value |
728x90