티스토리 뷰
Programming
Kafka Connect의 SMT(Single Message Transform)를 활용하여 RESTHeart에 HTTP 요청(조회/등록/수정/삭제)
ipxy 2025. 4. 15. 16:54728x90
✅ 예시
Kafka 메시지를 기반으로 RESTHeart에 다음 작업을 수행:
- 조회(GET) → 메시지 키로 조건 조회
- 등록(POST) → 메시지 내용을 새 문서로 등록
- 수정(PUT/PATCH) → 특정 키의 문서 수정
- 삭제(DELETE) → 특정 키에 해당하는 문서 삭제
🔧 1. SMT 동작 방식 요약
- Kafka → Kafka Connect → SMT → RESTHeart (HTTP 요청) → 응답 무시 or 활용
- SMT는 메시지를 변환 또는 외부 시스템 호출 가능(비동기 or 동기)
- 요청 실패 시 DLQ로 전달하도록 설계 가능
📦 2. 커스텀 SMT Maven 프로젝트 구조
custom-smt-rest/
├── src/
│ └── main/
│ └── java/
│ └── com/example/smt/
│ └── RestHeartHttpSMT.java
├── pom.xml
📄 3. SMT Java 코드 예시 (등록/수정/삭제/조회)
package com.example.smt;
import org.apache.kafka.connect.connector.ConnectRecord;
import org.apache.kafka.connect.transforms.Transformation;
import org.apache.kafka.connect.transforms.util.SimpleConfig;
import okhttp3.*;
import java.io.IOException;
import java.util.Map;
public class RestHeartHttpSMT<R extends ConnectRecord<R>> implements Transformation<R> {
private static final MediaType JSON = MediaType.parse("application/json; charset=utf-8");
private String restHeartUrl;
private String httpMethod;
private OkHttpClient client;
@Override
public void configure(Map<String, ?> configs) {
SimpleConfig config = new SimpleConfig(CONFIG_DEF, configs);
restHeartUrl = config.getString("restheart.url");
httpMethod = config.getString("restheart.method").toUpperCase();
client = new OkHttpClient();
}
@Override
public R apply(R record) {
try {
RequestBody body = RequestBody.create(record.value().toString(), JSON);
Request.Builder builder = new Request.Builder().url(restHeartUrl);
switch (httpMethod) {
case "POST": builder.post(body); break;
case "PUT": builder.put(body); break;
case "PATCH": builder.patch(body); break;
case "DELETE": builder.delete(); break;
case "GET": builder.get(); break;
default: throw new IllegalArgumentException("Invalid method: " + httpMethod);
}
Response response = client.newCall(builder.build()).execute();
if (!response.isSuccessful()) {
throw new IOException("Unexpected code " + response);
}
} catch (Exception e) {
throw new RuntimeException("RESTHeart call failed", e);
}
return record;
}
@Override
public void close() {}
@Override
public ConfigDef config() {
return CONFIG_DEF;
}
private static final ConfigDef CONFIG_DEF = new ConfigDef()
.define("restheart.url", ConfigDef.Type.STRING, ConfigDef.Importance.HIGH, "RESTHeart API URL")
.define("restheart.method", ConfigDef.Type.STRING, "POST", ConfigDef.Importance.MEDIUM, "HTTP Method (GET/POST/PUT/DELETE)");
}
⚙️ 4. Kafka Connector 설정 예시 (예: SinkConnector + 이 SMT 추가)
{
"name": "kafka-to-restheart",
"config": {
"connector.class": "org.apache.kafka.connect.file.FileStreamSinkConnector",
"topics": "test-topic",
"tasks.max": "1",
"file": "/tmp/devnull", // 실제 저장 안함
"transforms": "HttpRest",
"transforms.HttpRest.type": "com.example.smt.RestHeartHttpSMT",
"restheart.url": "http://restheart:8080/db/collection", // RESTHeart 주소
"restheart.method": "POST"
}
}
📫 RESTHeart 별 동작 예시
작업 | 설정 값 (restheart.method) | RESTHeart 동작 | URL 예시 |
조회 (GET) | "GET" | 특정 ID 조회 | /db/mycol/123 |
등록 (POST) | "POST" | 문서 추가 | /db/mycol/ |
수정 (PUT) | "PUT" | 전체 문서 덮어쓰기 | /db/mycol/123 |
삭제 (DELETE) | "DELETE" | 해당 ID 삭제 | /db/mycol/123 |
🛠️ 빌드 및 배포
# pom.xml에서 패키징 후
mvn clean package
# JAR 파일을 Kafka Connect plugin 경로로 이동
cp target/custom-smt-rest-1.0.jar $KAFKA_CONNECT_PLUGIN_PATH/
728x90
'Programming' 카테고리의 다른 글
Path Variable의 개념과 대규모 시스템에서의 영향 (0) | 2025.04.17 |
---|---|
APISIX Lua 플러그인: RESTHeart 데이터 Kafka 및 외부 API 전송 튜토리얼 (0) | 2025.04.16 |
Spring Boot로 RESTHeart CRUD (Create, Read, Update, Delete) (0) | 2025.04.14 |
Lua로 APISIX 흐름 가로채기 구현 팁 (0) | 2025.04.12 |
C#에서 UUID (0) | 2025.04.01 |