기타
Kafka Connect 커스텀 SMT - JSON 데이터 외부 API POST 호출 + MongoDB Sink Connector
ipxy
2025. 4. 12. 10:07
728x90
Kafka → SMT (레코드 JSON을 외부 API로 POST) → (응답 활용 or 무시) → Kafka/외부 시스템
🛠 Java 커스텀 SMT 예제 (POST 방식, JSON 전송)
package com.example.connect.transforms;
import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import java.io.IOException;
import java.util.Map;
public class PostJsonToApiSMT<R extends ConnectRecord<R>> implements Transformation<R> {
private OkHttpClient httpClient;
private String apiUrl;
private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
private ObjectMapper objectMapper;
@Override
public void configure(Map<String, ?> configs) {
this.apiUrl = (String) configs.get("api.url");
this.httpClient = new OkHttpClient();
this.objectMapper = new ObjectMapper();
}
@Override
public R apply(R record) {
if (record.value() == null) return record;
try {
// Struct → JSON 문자열 변환
String jsonPayload = objectMapper.writeValueAsString(record.value());
// POST 요청 생성
RequestBody body = RequestBody.create(jsonPayload, JSON);
Request request = new Request.Builder()
.url(apiUrl)
.post(body)
.build();
// 외부 API 호출
try (Response response = httpClient.newCall(request).execute()) {
if (!response.isSuccessful()) {
throw new ConnectException("Unexpected API response: " + response);
}
// ✅ 응답을 활용하고 싶으면 여기서 response.body().string() 사용 가능
}
return record; // 레코드는 원본 그대로 전달
} catch (IOException e) {
throw new ConnectException("Failed to call external API", e);
}
}
@Override
public void close() {
httpClient.dispatcher().executorService().shutdown();
}
@Override
public ConfigDef config() {
return new ConfigDef()
.define("api.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "POST할 외부 API URL");
}
}
📋 설명
ObjectMapper | Kafka 레코드의 value를 JSON 문자열로 변환 |
OkHttpClient | 외부 HTTP API에 POST 요청을 보내는 데 사용 |
RequestBody | 변환한 JSON 문자열을 요청 본문에 실어 전송 |
Response 처리 | 성공(2xx) 응답만 통과, 실패하면 예외 던짐 |
record | 변환 후 레코드는 원본 그대로 Kafka로 진행 |
(선택) | 응답 결과를 이용해 메시지를 변경할 수도 있음 (추가 예제 가능) |
🔥 Connector 설정 (YAML 예시)
transforms: postToApi
transforms.postToApi.type: com.example.connect.transforms.PostJsonToApiSMT
transforms.postToApi.api.url: "https://your.external.api/endpoint"
- transforms.postToApi.api.url : 외부 서버의 엔드포인트 URL
- transforms.postToApi.type : 커스텀 SMT 클래스명
✅ "Kafka Connect 커스텀 SMT로 외부 API 호출 + MongoDB Sink Connector" 같이 사용
[ Kafka Topic ]
↓
[ Kafka Connect Worker ]
↓
[ MongoDB Sink Connector ]
↓ (중간에)
[ SMT: External API POST 호출 (변환) ]
↓
[ MongoDB에 저장 (원본 or 변환된 데이터) ]
📦 흐름
단계 | 설명 |
1 | Kafka 토픽에서 MongoDB Sink Connector가 메시지를 Consume |
2 | Sink Connector에 설정된 transforms(SMT) 단계에서 커스텀 SMT가 동작 |
3 | SMT 안에서 Kafka 메시지를 외부 API로 POST 전송 |
4 | 응답을 받아 메시지를 수정하거나, 아니면 그냥 통과 |
5 | 최종 메시지가 MongoDB에 저장 |
✔️ SMT는 MongoDB Sink Connector의 'transforms' 안에 설정됩니다.
🔥 Connector 설정 예시
name: mongodb-sink-with-api-call
config:
connector.class: com.mongodb.kafka.connect.MongoSinkConnector
tasks.max: 1
topics: your-topic
connection.uri: "mongodb://your-mongo-url"
database: your_database
collection: your_collection
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
# 👇 SMT 설정
transforms: callExternalApi
transforms.callExternalApi.type: com.example.connect.transforms.PostJsonToApiSMT
transforms.callExternalApi.api.url: "https://your.external.api/endpoint"
포인트:
- transforms.callExternalApi.type 에 아까 만든 커스텀 SMT 등록
- transforms.callExternalApi.api.url 에 API 주소 설정
🧠 정리
구분 | 설명 |
커넥터 | MongoDB Sink Connector 사용 (com.mongodb.kafka.connect.MongoSinkConnector) |
SMT | 외부 API 호출용 커스텀 SMT 등록 |
순서 | 메시지 → SMT 변환 → MongoDB 저장 |
특징 | 외부 API 호출 성공/실패에 따라 레코드 가공 가능 |
🚨 주의사항
항목 | 설명 |
SMT 실패 처리 | API 호출 실패 시 SMT가 예외를 던지면 전체 Sink Task가 죽을 수 있음 → 오류 처리 중요 |
타임아웃 설정 | OkHttpClient timeout 짧게 설정해서 Worker가 멈추지 않게 |
성능 | 외부 API 호출이 느리면 MongoDB 저장 속도도 같이 느려짐 (대량 처리시 병목 주의) |
동시성 제한 | 필요하면 OkHttp를 비동기로 만들거나, 큐를 두고 별도 스레드 처리 가능 |
Retry 전략 | API 호출 실패 시 재시도 기능 넣고, 최악의 경우 실패 레코드는 Dead Letter Queue로 |
728x90