티스토리 뷰

728x90

✅  예시

Kafka 메시지를 기반으로 RESTHeart에 다음 작업을 수행:

  • 조회(GET) → 메시지 키로 조건 조회
  • 등록(POST) → 메시지 내용을 새 문서로 등록
  • 수정(PUT/PATCH) → 특정 키의 문서 수정
  • 삭제(DELETE) → 특정 키에 해당하는 문서 삭제

🔧 1. SMT 동작 방식 요약

  • Kafka → Kafka Connect → SMT → RESTHeart (HTTP 요청) → 응답 무시 or 활용
  • SMT는 메시지를 변환 또는 외부 시스템 호출 가능(비동기 or 동기)
  • 요청 실패 시 DLQ로 전달하도록 설계 가능

📦 2. 커스텀 SMT Maven 프로젝트 구조

custom-smt-rest/
├── src/
│   └── main/
│       └── java/
│           └── com/example/smt/
│               └── RestHeartHttpSMT.java
├── pom.xml

📄 3. SMT Java 코드 예시 (등록/수정/삭제/조회)

package com.example.smt;

import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import okhttp3.*;

import java.io.IOException;
import java.util.Map;

public class RestHeartHttpSMT<R extends ConnectRecord<R>> implements Transformation<R> {
    private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
    private String restHeartUrl;
    private String httpMethod;
    private OkHttpClient client;

    @Override
    public void configure(Map<String, ?> configs) {
        SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
        restHeartUrl = config.getString("restheart.url");
        httpMethod = config.getString("restheart.method").toUpperCase();
        client = new OkHttpClient();
    }

    @Override
    public R apply(R record) {
        try {
            RequestBody body = RequestBody.create(record.value().toString(), JSON);
            Request.Builder builder = new Request.Builder().url(restHeartUrl);

            switch (httpMethod) {
                case "POST": builder.post(body); break;
                case "PUT": builder.put(body); break;
                case "PATCH": builder.patch(body); break;
                case "DELETE": builder.delete(); break;
                case "GET": builder.get(); break;
                default: throw new IllegalArgumentException("Invalid method: " + httpMethod);
            }

            Response response = client.newCall(builder.build()).execute();
            if (!response.isSuccessful()) {
                throw new IOException("Unexpected code " + response);
            }

        } catch (Exception e) {
            throw new RuntimeException("RESTHeart call failed", e);
        }

        return record;
    }

    @Override
    public void close() {}

    @Override
    public ConfigDef config() {
        return CONFIG_DEF;
    }

    private static final ConfigDef CONFIG_DEF = new ConfigDef()
        .define("restheart.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "RESTHeart API URL")
        .define("restheart.method", ConfigDef.Type.STRING, "POST", ConfigDef.Importance.MEDIUM, "HTTP Method (GET/POST/PUT/DELETE)");
}

⚙️ 4. Kafka Connector 설정 예시 (예: SinkConnector + 이 SMT 추가)

{
  "name": "kafka-to-restheart",
  "config": {
    "connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
    "topics": "test-topic",
    "tasks.max": "1",
    "file": "/tmp/devnull",  // 실제 저장 안함
    "transforms": "HttpRest",
    "transforms.HttpRest.type": "com.example.smt.RestHeartHttpSMT",
    "restheart.url": "http://restheart:8080/db/collection",  // RESTHeart 주소
    "restheart.method": "POST"
  }
}

📫 RESTHeart 별 동작 예시

작업 설정 값 (restheart.method)  RESTHeart 동작  URL 예시
조회 (GET) "GET" 특정 ID 조회 /db/mycol/123
등록 (POST) "POST" 문서 추가 /db/mycol/
수정 (PUT) "PUT" 전체 문서 덮어쓰기 /db/mycol/123
삭제 (DELETE) "DELETE" 해당 ID 삭제 /db/mycol/123

🛠️ 빌드 및 배포

# pom.xml에서 패키징 후
mvn clean package

# JAR 파일을 Kafka Connect plugin 경로로 이동
cp target/custom-smt-rest-1.0.jar $KAFKA_CONNECT_PLUGIN_PATH/

 

728x90
250x250
공지사항
최근에 올라온 글
최근에 달린 댓글
Total
Today
Yesterday
링크
TAG
more
«   2025/04   »
1 2 3 4 5
6 7 8 9 10 11 12
13 14 15 16 17 18 19
20 21 22 23 24 25 26
27 28 29 30
글 보관함