DB
Kafka Connect MongoDB Sink Connector 등록/수정/삭제/조회
ipxy
2025. 4. 4. 14:31
728x90
등록 (Insert) | ✅ 가능 | 기본 동작 |
수정 (Update) | ✅ 가능 | 특정 설정 + 메시지 포맷 필요 |
삭제 (Delete) | ✅ 가능 | 특정 설정 + Tombstone 메시지 |
조회 (Select) | ❌ 불가 | Sink는 쓰기 전용(읽기 불가) |
1. 등록 (Insert)
- 아무 설정도 추가 안 해도 Kafka → MongoDB로 "Insert" 됩니다.
- Kafka에 JSON 데이터가 오면 → MongoDB 컬렉션에 새 문서로 들어갑니다.
Kafka 메시지 예시:
{
"id": "123",
"name": "Alice",
"email": "alice@example.com"
}
MongoDB 결과:
{
"_id": ObjectId("..."),
"id": "123",
"name": "Alice",
"email": "alice@example.com"
}
2. 수정 (Update)
MongoDB에서 기존 문서를 "수정"하고 싶을 때는 설정이 필요합니다.
필수 설정:
- writemodel.strategy를 ReplaceOneBusinessKeyStrategy 또는 UpdateOneBusinessKeyStrategy 로 설정해야 합니다.
{
"writemodel.strategy": "com.mongodb.kafka.connect.sink.writemodel.strategy.ReplaceOneBusinessKeyStrategy",
"document.id.strategy": "com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.overrides": "your-topic:com.mongodb.kafka.connect.sink.processor.id.strategy.PartialValueStrategy",
"document.id.strategy.partial.value.projection.list": "id",
"document.id.strategy.partial.value.projection.type": "AllowList"
}
- 여기서 "id" 같은 필드를 기준으로 MongoDB 문서를 찾아서 덮어씁니다.
Kafka 메시지 예시 (Update):
{
"id": "123",
"email": "alice_updated@example.com"
}
MongoDB 결과:
{
"_id": ObjectId("..."),
"id": "123",
"name": "Alice", // name은 없어질 수 있음 (Replace 방식 주의!)
"email": "alice_updated@example.com"
}
💡 주의: Replace 방식은 전체 문서를 "덮어쓰기" 때문에 필요한 필드를 모두 포함해야 해요.
Update만 하고 싶으면 별도 Update Strategy 써야 합니다.
3. 삭제 (Delete)
Kafka → MongoDB 문서를 "삭제"도 가능합니다.
방법은 Tombstone 메시지를 보내는 겁니다.
Tombstone 메시지란?
- Kafka 메시지에서 value를 null로 보내는 것
- key 값만 남기고 value가 null → 이걸 MongoDB Sink가 감지해서 삭제합니다.
Kafka Tombstone 예시:
# Key: {"id": "123"}
# Value: null
필수 설정 추가:
{
"delete.on.null.values": "true"
}
이걸 켜야만, value=null일 때 MongoDB에서 삭제됩니다.
4. 조회 (Select)
**조회(Select)**는 Kafka Connect Sink로는 절대 할 수 없습니다. ❌
- Sink Connector는 Kafka → MongoDB로 쓰기 전용입니다.
- MongoDB 데이터를 읽어오려면 Source Connector(반대 방향)를 써야 합니다.
728x90