티스토리 뷰

728x90
Kafka → SMT (레코드 JSON을 외부 API로 POST) → (응답 활용 or 무시) → Kafka/외부 시스템

🛠 Java 커스텀 SMT 예제 (POST 방식, JSON 전송)

package com.example.connect.transforms;

import com.fasterxml.jackson.databind.ObjectMapper;
import okhttp3.*;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.errors.ConnectException;
import java.io.IOException;
import java.util.Map;

public class PostJsonToApiSMT<R extends ConnectRecord<R>> implements Transformation<R> {

    private OkHttpClient httpClient;
    private String apiUrl;
    private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
    private ObjectMapper objectMapper;

    @Override
    public void configure(Map<String, ?> configs) {
        this.apiUrl = (String) configs.get("api.url");
        this.httpClient = new OkHttpClient();
        this.objectMapper = new ObjectMapper();
    }

    @Override
    public R apply(R record) {
        if (record.value() == null) return record;

        try {
            // Struct → JSON 문자열 변환
            String jsonPayload = objectMapper.writeValueAsString(record.value());

            // POST 요청 생성
            RequestBody body = RequestBody.create(jsonPayload, JSON);
            Request request = new Request.Builder()
                    .url(apiUrl)
                    .post(body)
                    .build();

            // 외부 API 호출
            try (Response response = httpClient.newCall(request).execute()) {
                if (!response.isSuccessful()) {
                    throw new ConnectException("Unexpected API response: " + response);
                }
                // ✅ 응답을 활용하고 싶으면 여기서 response.body().string() 사용 가능
            }

            return record; // 레코드는 원본 그대로 전달

        } catch (IOException e) {
            throw new ConnectException("Failed to call external API", e);
        }
    }

    @Override
    public void close() {
        httpClient.dispatcher().executorService().shutdown();
    }

    @Override
    public ConfigDef config() {
        return new ConfigDef()
                .define("api.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "POST할 외부 API URL");
    }
}

📋 설명

ObjectMapper Kafka 레코드의 value를 JSON 문자열로 변환
OkHttpClient 외부 HTTP API에 POST 요청을 보내는 데 사용
RequestBody 변환한 JSON 문자열을 요청 본문에 실어 전송
Response 처리 성공(2xx) 응답만 통과, 실패하면 예외 던짐
record 변환 후 레코드는 원본 그대로 Kafka로 진행
(선택) 응답 결과를 이용해 메시지를 변경할 수도 있음 (추가 예제 가능)

🔥 Connector 설정 (YAML 예시)

transforms: postToApi
transforms.postToApi.type: com.example.connect.transforms.PostJsonToApiSMT
transforms.postToApi.api.url: "https://your.external.api/endpoint"
  • transforms.postToApi.api.url : 외부 서버의 엔드포인트 URL
  • transforms.postToApi.type : 커스텀 SMT 클래스명

✅ "Kafka Connect 커스텀 SMT로 외부 API 호출 + MongoDB Sink Connector" 같이 사용

 

[ Kafka Topic ]
     ↓
[ Kafka Connect Worker ]
     ↓
[ MongoDB Sink Connector ]
     ↓ (중간에)
[ SMT: External API POST 호출 (변환) ]
     ↓
[ MongoDB에 저장 (원본 or 변환된 데이터) ]

📦 흐름

단계  설명
1 Kafka 토픽에서 MongoDB Sink Connector가 메시지를 Consume
2 Sink Connector에 설정된 transforms(SMT) 단계에서 커스텀 SMT가 동작
3 SMT 안에서 Kafka 메시지를 외부 API로 POST 전송
4 응답을 받아 메시지를 수정하거나, 아니면 그냥 통과
5 최종 메시지가 MongoDB에 저장

✔️ SMT는 MongoDB Sink Connector의 'transforms' 안에 설정됩니다.


🔥 Connector 설정 예시

name: mongodb-sink-with-api-call
config:
  connector.class: com.mongodb.kafka.connect.MongoSinkConnector
  tasks.max: 1
  topics: your-topic
  connection.uri: "mongodb://your-mongo-url"
  database: your_database
  collection: your_collection
  key.converter: org.apache.kafka.connect.storage.StringConverter
  value.converter: org.apache.kafka.connect.json.JsonConverter
  value.converter.schemas.enable: false

  # 👇 SMT 설정
  transforms: callExternalApi
  transforms.callExternalApi.type: com.example.connect.transforms.PostJsonToApiSMT
  transforms.callExternalApi.api.url: "https://your.external.api/endpoint"

포인트:

  • transforms.callExternalApi.type 에 아까 만든 커스텀 SMT 등록
  • transforms.callExternalApi.api.url 에 API 주소 설정

🧠 정리

구분 설명
커넥터 MongoDB Sink Connector 사용 (com.mongodb.kafka.connect.MongoSinkConnector)
SMT 외부 API 호출용 커스텀 SMT 등록
순서 메시지 → SMT 변환 → MongoDB 저장
특징 외부 API 호출 성공/실패에 따라 레코드 가공 가능

🚨 주의사항

항목 설명
SMT 실패 처리 API 호출 실패 시 SMT가 예외를 던지면 전체 Sink Task가 죽을 수 있음 → 오류 처리 중요
타임아웃 설정 OkHttpClient timeout 짧게 설정해서 Worker가 멈추지 않게
성능 외부 API 호출이 느리면 MongoDB 저장 속도도 같이 느려짐 (대량 처리시 병목 주의)
동시성 제한 필요하면 OkHttp를 비동기로 만들거나, 큐를 두고 별도 스레드 처리 가능
Retry 전략 API 호출 실패 시 재시도 기능 넣고, 최악의 경우 실패 레코드는 Dead Letter Queue로

 

728x90
250x250
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함