DB

Kafka Connect Transformer란?

ipxy 2025. 4. 4. 14:33
728x90

🎯 Kafka Connect Transformer란?

Kafka Connect SMT (Single Message Transformation)

Kafka Connect에서 Kafka 메시지MongoDB로 보내기 전에 변형/필터/가공하는 기능입니다.

즉,

  • Kafka에 들어온 원본 메시지
  • MongoDB에 저장하기 좋게 가공할 수 있어요.

예시:

  • 필드 추가/삭제
  • JSON 구조 변경
  • 특정 필드를 키로 설정
  • 값 변환 (대문자/소문자, 타입 변경)
  • 토픽 이름에 따라 다른 컬렉션으로 분기

🛠 Transformer 사용하는 방법

Connector 등록할 때 transforms 설정을 추가합니다.

{
  "transforms": "변환이름",
  "transforms.변환이름.type": "org.apache.kafka.connect.transforms.변환클래스",
  "transforms.변환이름.추가옵션": "..."
}
  • 변환이름 : 내가 붙이는 이름
  • 변환클래스 : 사용할 변환 종류
  • 추가옵션 : 변환별 세부 옵션

✨ 대표 Transformer 종류

 

Transformer  설명  사용 예시
InsertField 특정 필드 추가 레코드에 timestamp 필드 추가
ReplaceField 필드 이름 바꾸기/필드 제외 민감한 필드 제거
MaskField 필드 값 마스킹 개인정보 숨기기
SetSchemaMetadata 스키마 변경 스키마 없는 데이터 강제 스키마 적용
ValueToKey value의 일부 필드를 Kafka message key로 복사 id를 key로

📦 MongoDB Sink에 가장 많이 쓰는 조합 예시

예시1. Kafka 메시지에 timestamp 필드 추가해서 MongoDB에 저장하기

{
  "transforms": "InsertTS",
  "transforms.InsertTS.type": "org.apache.kafka.connect.transforms.InsertField$Value",
  "transforms.InsertTS.timestamp.field": "created_at"
}

Kafka 원본:

{ "name": "Alice" }

MongoDB에 저장될 데이터:

{ 
  "name": "Alice",
  "created_at": "2025-04-04T10:30:00.123Z"
}

예시2. 특정 필드 삭제 (예: 개인정보 삭제)

{
  "transforms": "HideSensitive",
  "transforms.HideSensitive.type": "org.apache.kafka.connect.transforms.ReplaceField$Value",
  "transforms.HideSensitive.blacklist": "password,ssn"
}

Kafka 원본:

{
  "username": "alice",
  "password": "1234",
  "ssn": "123-45-6789"
}

MongoDB 저장 결과:

{
  "username": "alice"
}

예시3. id 값을 Kafka message key로 이동시키기

{
  "transforms": "ExtractId",
  "transforms.ExtractId.type": "org.apache.kafka.connect.transforms.ValueToKey",
  "transforms.ExtractId.fields": "id"
}
  • MongoDB Sink는 key 값을 문서 _id로 매핑할 수도 있습니다.
  • document.id.strategy=ProvidedInKeyStrategy 설정 필요.

이걸 조합하면 MongoDB에서 id 중복 없이 예쁘게 저장 가능!


📋 한눈에 요약

 

할 일 Transformer 설정
필드 추가 InsertField
필드 이름 변경/삭제 ReplaceField
필드 마스킹 MaskField
특정 필드를 Key로 만들기 ValueToKey
데이터 스키마 강제 설정 SetSchemaMetadata

✨ 추가로 응용하면…

  • 토픽별로 컬렉션을 다르게 분기 (RegexRouter Transformer)
  • Kafka 메시지 안에 nested field만 빼서 저장 (ExtractField)
  • 날짜 형식을 고정 (예: yyyy-MM-dd로)

완전 다양한 변형이 가능합니다! 🛠️


📢 주의사항

  • Transformer 순서 중요합니다.
    여러 개 쓸 때 transforms에 순서대로 나열해야 해요.
{
  "transforms": "InsertTS,HideSensitive,ExtractId",
  "transforms.InsertTS.type": "...",
  "transforms.HideSensitive.type": "...",
  "transforms.ExtractId.type": "..."
}
  • value.converter.schemas.enable=false 설정 시 일부 Transformer 제한이 있을 수 있습니다.

 

728x90