티스토리 뷰

728x90

MongoDB에서 Kafka로 보내기 전에, 3단계 과정을 거칠 수 있습니다.

  1. 필터링 (Filter)
    MongoDB Change Stream 이벤트 중 필요한 것만 선택 (→ MongoDB 서버단 처리)
  2. 변환 (Transform)
    Kafka로 보내기 전에 메시지 구조를 바꾸거나 수정 (→ Kafka Connect SMT)
  3. 포맷팅 (Formatting)
    Kafka 메시지의 Key/Value 직렬화 형태 지정 (→ JSON, BSON 등)

이 3단계를 조합하면 훨씬 깔끔하고, 목적에 맞는 데이터 스트림을 만들 수 있어요.


✋ 1. 필터링 (Filter) - MongoDB 단에서 필요한 데이터만 선택

방법

  • MongoDB Source Connector의 pipeline 옵션 사용
  • MongoDB의 Aggregation Pipeline 문법으로 이벤트를 걸러냅니다.

사용 예시 1: 특정 컬렉션 이벤트만 가져오기

"pipeline": "[{\"$match\": {\"ns.coll\": \"orders\"}}]"

➔ orders 컬렉션의 데이터 변경 이벤트만 Kafka로 보냄.


사용 예시 2: status가 "active"인 문서만 가져오기

"pipeline": "[{\"$match\": {\"fullDocument.status\": \"active\"}}]"

➔ 변경된 문서의 status 필드가 "active"일 때만 Kafka로 보냄.


사용 예시 3: 오퍼레이션 타입별 필터링

"pipeline": "[{\"$match\": {\"operationType\": {\"$in\": [\"insert\", \"update\"]}}}]"

➔ insert나 update 이벤트만 Kafka로 보냄. delete 이벤트는 무시.

$match는 MongoDB Aggregation Pipeline 연산자입니다.
operationType 은 MongoDB Change Stream 이벤트 타입 필드입니다.
"insert" : 문서 삽입
"update" : 문서 일부 수정
"replace" : 문서 전체 대체
"delete" : 문서 삭제
"invalidate" : 컬렉션/DB 삭제로 인한 스트림 종료

 

 


✅ 핵심 요약

  • $match 를 이용해 "필터"하는 방식
  • MongoDB Change Stream에서 서버단에서 미리 필터링 → Kafka로 오는 데이터 자체를 줄임
  • 서버부하, 네트워크 부하, Kafka 부하까지 줄일 수 있어 "필수 기능"입니다

✋ 2. 변환 (Transform) - Kafka Connect SMT (Single Message Transform)

Kafka로 보내기 전에 메시지를 수정/가공/재구성하는 과정입니다.

MongoDB Source Connector는 Kafka Connect의 SMT 기능을 그대로 쓸 수 있습니다.


대표적인 변환(SMT) 종류

변환  종류  설명 SMT 클래스
필드 제거 특정 필드 삭제 ReplaceField
필드 이름 변경 필드 이름 바꿔서 보내기 ReplaceField
필드 일부 추출 특정 필드만 남기기 ExtractField
타임스탬프 변환 타임스탬프 포맷 변경 TimestampConverter
Key 변경 특정 필드를 Kafka 메시지 Key로 지정 ValueToKey
필드 추가 새로운 필드 추가 InsertField

SMT 설정 예시 1: 특정 필드 삭제

"transforms": "dropField",
"transforms.dropField.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.dropField.blacklist": "_id, internalField"

➔ Kafka 메시지에서 _id, internalField 필드를 삭제하고 보내줍니다.


SMT 설정 예시 2: 특정 필드만 추출

"transforms": "extract",
"transforms.extract.type": "org.apache.kafka.connect.transforms.ExtractField$Value",
"transforms.extract.field": "fullDocument"

➔ MongoDB Change Stream 이벤트 중 fullDocument 부분만 Kafka 메시지로 보냅니다.
(기본 이벤트 메타데이터는 제외)


SMT 설정 예시 3: Kafka 메시지 Key를 특정 필드로 변경

"transforms": "makeKey",
"transforms.makeKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.makeKey.fields": "orderId"

➔ MongoDB 문서의 orderId 필드를 Kafka 메시지 Key로 사용합니다.


✅ 핵심 요약

  • SMT를 이용해 Kafka Topic을 깔끔하게 정리할 수 있다.
  • 필요 없는 필드는 제거하고, 필요한 필드만 남길 수 있다.
  • Kafka 메시지 Key를 제대로 설정하면 Consumer side Join 같은 것도 쉽게 가능해진다.

✋ 3. 포맷팅 (Formatting) - Kafka 메시지 직렬화 방식 지정

MongoDB Source Connector는 Kafka로 보낼 때, Key와 Value의 포맷을 지정할 수 있습니다.


 

옵션  설명
output.format.key Kafka 메시지 Key 포맷 (default: json)
output.format.value Kafka 메시지 Value 포맷 (default: json)

가능한 포맷:

  • json (보통 가장 많이 사용)
  • schema
  • bson

가장 많이 쓰는 기본 설정

"output.format.key": "json",
"output.format.value": "json"

JSON 포맷으로 Kafka Topic에 들어오게 됩니다.


BSON 포맷 설정 예시 (특수한 경우)

"output.format.value": "bson"
  • BSON은 MongoDB 네이티브 포맷
  • 일반적으로 Kafka Consumer에서 BSON 처리가 어려우므로 JSON 권장합니다.

✨ 전체 흐름 그림

MongoDB Change Stream
    ↓  (Pipeline 필터링)
필터링 된 Change Event
    ↓  (SMT 변환 처리)
변환 된 Kafka 메시지
    ↓  (JSON 포맷 직렬화)
Kafka Topic으로 전송

🛠️ 실전 Tip

상황 추천 방법

MongoDB 변경 중 insert만 가져오고 싶다 pipeline으로 operationType == "insert"만 필터
Kafka 메시지에서 필요 없는 내부 필드를 삭제하고 싶다 ReplaceField SMT로 필드 삭제
Kafka Key를 MongoDB의 특정 필드로 쓰고 싶다 ValueToKey SMT 적용
Kafka 메시지를 읽는 다른 시스템이 JSON을 기대한다 output.format.value: json 지정

🧩 요약 정리

단계 기능 어디서 설정?

필터링 MongoDB에서 필요한 데이터만 pipeline 옵션
변환 Kafka로 보내기 전 메시지 수정 transforms (SMT)
포맷팅 Kafka 메시지 직렬화 포맷 지정 output.format.key / output.format.value

 

728x90
250x250
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함