티스토리 뷰
728x90
🎯 Kafka Connect Transformer란?
Kafka Connect SMT (Single Message Transformation)
Kafka Connect에서 Kafka 메시지를 MongoDB로 보내기 전에 변형/필터/가공하는 기능입니다.
즉,
- Kafka에 들어온 원본 메시지를
- MongoDB에 저장하기 좋게 가공할 수 있어요.
예시:
- 필드 추가/삭제
- JSON 구조 변경
- 특정 필드를 키로 설정
- 값 변환 (대문자/소문자, 타입 변경)
- 토픽 이름에 따라 다른 컬렉션으로 분기
🛠 Transformer 사용하는 방법
Connector 등록할 때 transforms 설정을 추가합니다.
{
"transforms": "변환이름",
"transforms.변환이름.type": "org.apache.kafka.connect.transforms.변환클래스",
"transforms.변환이름.추가옵션": "..."
}
- 변환이름 : 내가 붙이는 이름
- 변환클래스 : 사용할 변환 종류
- 추가옵션 : 변환별 세부 옵션
✨ 대표 Transformer 종류
Transformer | 설명 | 사용 예시 |
InsertField | 특정 필드 추가 | 레코드에 timestamp 필드 추가 |
ReplaceField | 필드 이름 바꾸기/필드 제외 | 민감한 필드 제거 |
MaskField | 필드 값 마스킹 | 개인정보 숨기기 |
SetSchemaMetadata | 스키마 변경 | 스키마 없는 데이터 강제 스키마 적용 |
ValueToKey | value의 일부 필드를 Kafka message key로 복사 | id를 key로 |
📦 MongoDB Sink에 가장 많이 쓰는 조합 예시
예시1. Kafka 메시지에 timestamp 필드 추가해서 MongoDB에 저장하기
{
"transforms": "InsertTS",
"transforms.InsertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
"transforms.InsertTS.timestamp.field": "created_at"
}
Kafka 원본:
{ "name": "Alice" }
MongoDB에 저장될 데이터:
{
"name": "Alice",
"created_at": "2025-04-04T10:30:00.123Z"
}
예시2. 특정 필드 삭제 (예: 개인정보 삭제)
{
"transforms": "HideSensitive",
"transforms.HideSensitive.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
"transforms.HideSensitive.blacklist": "password,ssn"
}
Kafka 원본:
{
"username": "alice",
"password": "1234",
"ssn": "123-45-6789"
}
MongoDB 저장 결과:
{
"username": "alice"
}
예시3. id 값을 Kafka message key로 이동시키기
{
"transforms": "ExtractId",
"transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ValueToKey",
"transforms.ExtractId.fields": "id"
}
- MongoDB Sink는 key 값을 문서 _id로 매핑할 수도 있습니다.
- document.id.strategy=ProvidedInKeyStrategy 설정 필요.
이걸 조합하면 MongoDB에서 id 중복 없이 예쁘게 저장 가능!
📋 한눈에 요약
할 일 | Transformer 설정 |
필드 추가 | InsertField |
필드 이름 변경/삭제 | ReplaceField |
필드 마스킹 | MaskField |
특정 필드를 Key로 만들기 | ValueToKey |
데이터 스키마 강제 설정 | SetSchemaMetadata |
✨ 추가로 응용하면…
- 토픽별로 컬렉션을 다르게 분기 (RegexRouter Transformer)
- Kafka 메시지 안에 nested field만 빼서 저장 (ExtractField)
- 날짜 형식을 고정 (예: yyyy-MM-dd로)
완전 다양한 변형이 가능합니다! 🛠️
📢 주의사항
- Transformer 순서 중요합니다.
여러 개 쓸 때 transforms에 순서대로 나열해야 해요.
{
"transforms": "InsertTS,HideSensitive,ExtractId",
"transforms.InsertTS.type": "...",
"transforms.HideSensitive.type": "...",
"transforms.ExtractId.type": "..."
}
- value.converter.schemas.enable=false 설정 시 일부 Transformer 제한이 있을 수 있습니다.
728x90
'DB' 카테고리의 다른 글
MongoDB Source Connector 설정 (Kafka Connect) (0) | 2025.04.12 |
---|---|
MongoDB Change Streams + Kafka (MongoDB의 데이터 변경사항을 Kafka 메시지로 실시간 전송) (0) | 2025.04.12 |
Kafka Connect MongoDB Sink Connector 등록/수정/삭제/조회 (0) | 2025.04.04 |
MongoDB Sink Connector 설정에 유용한 주요 옵션들 (0) | 2025.04.03 |
MongoDB를 PostgreSQL + FerretDB로 마이그레이션 (0) | 2025.03.26 |