Programming

APISIX Lua 플러그인: RESTHeart 데이터 Kafka 및 외부 API 전송 튜토리얼

ipxy 2025. 4. 16. 07:45
728x90

개요

Apache APISIX는 Lua로 작성된 플러그인을 통해 요청 처리를 확장할 수 있는 API 게이트웨이입니다. 이 튜토리얼에서는 APISIX Lua 플러그인을 사용하여 RESTHeart를 통해 MongoDB 데이터를 조회하고, 그 결과를 Kafka 또는 외부 REST API로 전달하는 방법을 다룹니다. RESTHeart는 MongoDB에 대한 REST 인터페이스로, HTTP 요청으로 MongoDB 데이터를 손쉽게 조회할 수 있습니다.

우리는 플러그인에서 RESTHeart API를 호출하여 JSON 형식의 데이터를 얻은 뒤, 이 데이터의 일부 필드를 추출하여 (1) Kafka 브로커로 전송하거나 (2) 외부 HTTP 서비스에 POST/PUT으로 보낼 것입니다. 각 방식마다 Lua 코드 예제와 함께 필요한 설정, 보안 고려 사항, 오류 처리 전략 등을 설명합니다.

플러그인 구조 및 동작 흐름

APISIX의 Lua 플러그인은 일반적으로 init.lua에서 불러들여지며, 플러그인 파일 내에 플러그인 이름, 설정 스키마, 그리고 핸들러 함수들(예: access, log 등)이 정의됩니다. 본 예제에서는 요청 단계에서 동작하도록 access 함수에 로직을 구현하겠습니다. access 단계에서 RESTHeart에 데이터를 요청하고, 받은 응답을 가공하여 Kafka나 외부 API로 전송합니다.

  • 설정 스키마(schema): 플러그인 설정으로 RESTHeart의 URL, Kafka 브로커 정보, 토픽명, 외부 API의 URL 등을 지정할 수 있습니다. 이러한 값들은 conf 객체로 전달되어 플러그인 코드에서 사용됩니다.
  • Lua 라이브러리: APISIX에 내장된 OpenResty 환경에서는 HTTP 요청을 위한 lua-resty-http 모듈과 Kafka 클라이언트를 위한 lua-resty-kafka 모듈을 사용할 수 있습니다​이들 라이브러리를 require로 불러와 HTTP 통신과 Kafka 메시지 전송을 구현합니다.
  • 실행 흐름: 플러그인이 실행되면, (a) RESTHeart에 HTTP 요청하여 데이터를 가져오고 → (b) 응답 JSON에서 필요한 필드 추출 → (c) 추출한 데이터를 Kafka 또는 외부 API로 전송 → (d) 성공/실패 여부를 로그로 기록하는 순서로 동작합니다.

RESTHeart를 통한 MongoDB 데이터 조회

먼저 RESTHeart로부터 MongoDB 데이터를 가져오는 방법입니다. RESTHeart는 특정 URL 엔드포인트로 MongoDB 컬렉션이나 문서를 조회할 수 있습니다 (예: GET http://<RESTHeart 주소>/<DB>/<컬렉션>/<문서ID>). APISIX Lua 플러그인에서 lua-resty-http 클라이언트를 사용하여 RESTHeart에 HTTP 요청을 보내고 응답(JSON)을 받아올 수 있습니다.

아래 Lua 코드는 resty.http 모듈을 사용해 RESTHeart API에 GET 요청을 보내는 예시입니다. conf.restheart_url은 플러그인 설정으로 지정된 RESTHeart의 API URL입니다. 예시에서는 응답 본문을 받아 JSON으로 디코딩한 후 필요한 필드를 추출합니다:

-- RESTHeart 데이터 조회 예제 (Lua 플러그인 access 단계)
local http = require "resty.http"
local httpc = http.new()

-- RESTHeart에 GET 요청 보내기
local res, err = httpc:request_uri(conf.restheart_url, {
    method = "GET",
    headers = {
        ["Accept"] = "application/json"
        -- 인증이 필요하다면 여기 Authorization 헤더 등 추가
    }
})
if not res then
    ngx.log(ngx.ERR, "RESTHeart 요청 실패: ", err)
    return  -- 요청 실패 처리 (로그 기록 후 종료)
end

ngx.log(ngx.INFO, "RESTHeart 응답 상태: ", res.status)
local body = res.body  -- 응답 본문 (JSON 형태의 문자열)
-- 로그로 응답 일부 출력 (디버그용)
ngx.log(ngx.DEBUG, "RESTHeart 응답 본문: ", body)

-- JSON 파싱 및 특정 필드 추출
local cjson = require "cjson.safe"  -- 안전한 JSON 디코더 사용
local data = cjson.decode(body)
if not data then
    ngx.log(ngx.ERR, "RESTHeart 응답 JSON 파싱 실패")
    return
end

-- 예시: 응답 JSON에서 필요한 필드만 추출
local extracted = {
    id = data._id,           -- MongoDB 문서 ID
    name = data.name,        -- 예시 필드: name
    status = data.status     -- 예시 필드: status
}
local json_str = cjson.encode(extracted)  -- JSON 문자열로 변환
ngx.log(ngx.INFO, "추출된 데이터: ", json_str)

위 코드에서는 resty.http의 request_uri 함수를 사용하여 간편하게 HTTP GET 요청을 수행합니다​ res.body에는 RESTHeart로부터 받은 JSON 문자열이 들어있으며, 이를 cjson.safe.decode를 통해 Lua 테이블로 변환합니다. 그런 다음 원하는 필드 (id, name, status 등)만 골라 새로운 테이블을 만들고 cjson.encode로 JSON 형식 문자열로 준비합니다.

참고: RESTHeart 응답의 구조에 맞게 필드명을 변경해야 합니다. 예를 들어 MongoDB 문서의 필드명이 다르다면 그 키를 사용하면 됩니다. 또한, RESTHeart에서 쿼리 파라미터를 사용해 특정 조건의 문서를 조회하거나, _fields와 같은 매개변수로 필요한 필드만 선택해서 응답받을 수도 있습니다.

Kafka로 전송 (Kafka Producer)

이제 추출된 JSON 데이터를 Kafka로 보내는 방법입니다. OpenResty 기반의 APISIX에서는 lua-resty-kafka 라이브러리를 통해 Kafka 프로듀서 기능을 직접 구현할 수 있습니다​ APISIX의 기본 Kafka 로거(plugin)도 내부적으로 이 라이브러리를 사용하여 비동기적으로 로그를 전송합니다. 여기서는 플러그인 코드 내에서 직접 프로듀서를 생성하고 메시지를 보내는 방법을 예제로 보여줍니다.

Kafka로 데이터를 보내기 위해서는 브로커 정보(호스트/포트 목록)와 토픽명이 필요합니다. 필요에 따라 Kafka 클러스터의 보안 설정도 고려해야 합니다 (예: SASL 인증, TLS 암호화 등). 아래 Lua 코드 예제는 Kafka 브로커 두 개로 구성된 클러스터에 JSON 메시지를 전송하는 방법을 보여줍니다:

-- Kafka 브로커 설정
local producer = require "resty.kafka.producer"
local broker_list = {
    { host = "kafka1.mydomain.com", port = 9092 },
    { host = "kafka2.mydomain.com", port = 9092 }
}
-- (선택) SASL 인증이 필요한 경우 설정 예제:
-- broker_list[1].sasl_config = {
--     mechanism = "PLAIN",
--     user = "USERNAME",
--     password = "PASSWORD"
-- }
-- broker_list[2].sasl_config = broker_list[1].sasl_config  -- 동일한 인증 사용

local kafka_topic = "my_topic"  -- 전송할 토픽 이름

-- Kafka 프로듀서 생성 (비동기 모드 권장)
local p = producer:new(broker_list, { producer_type = "async" })
-- Kafka로 메시지 전송 (키 없이 전송; 필요하면 key 값 지정 가능)
local ok, err = p:send(kafka_topic, nil, json_str)
if not ok then
    ngx.log(ngx.ERR, "Kafka 전송 실패: ", err)
else
    ngx.log(ngx.INFO, "Kafka 토픽 '", kafka_topic, "'으로 메시지 전송 성공")
end

위 코드에서 producer:new를 호출할 때 producer_type = "async"로 설정하여 비동기 전송을 사용했습니다 비동기 모드에서는 메시지가 백그라운드로 전송되며 APISIX 워커(worker) 내에 프로듀서 객체가 재사용됩니다. 이는 동기 방식보다 지연을 줄이고 성능을 높여줍니다. p:send(topic, key, message)를 호출하면 지정한 토픽으로 메시지를 보냅니다. 예제에서는 key를 nil로 하여 파티션 키 없이 전송하지만, 필요하면 특정 키를 지정하여 Kafka 파티션을 제어할 수 있습니다.

필요한 설정 및 보안 고려 사항:

  • 브로커 목록: 최소 하나 이상의 Kafka 브로커 { host, port } 정보를 설정해야 합니다. 다중 브로커를 넣으면 장애 조치(failover)에 유리합니다.
  • SASL 인증: Kafka 클러스터가 인증을 요구한다면, sasl_config 옵션을 사용해 메커니즘 및 자격증명을 설정해야 합니다​. 예시는 PLAIN 방식이며, SCRAM-SHA-256/512 등도 지원됩니다 (이 경우 Lua용 추가 라이브러리 필요).
  • SSL/TLS: 브로커가 SSL 포트를 사용하는 경우 OpenResty가 SSL을 지원하도록 컴파일되어 있어야 하며, ssl 설정을 활성화하거나 kafka_ssl 모듈을 사용해야 합니다. APISIX 환경에서는 적절히 구성되어 있다면 SSL 브로커에도 연결 가능합니다.
  • 토픽 및 메시지 형식: 토픽 이름은 Kafka에 미리 생성되어 있어야 합니다. 메시지는 문자열로 전송되며, 여기서는 JSON 문자열을 전송하므로 소비자 측에서 JSON 파싱하여 사용할 수 있습니다. (예: {"id":"...","name":"...","status":"..."} 형식)
  • 전송 확인: 기본적으로 required_acks=1로 리더 브로커의 수신만 확인하지만, 중요 데이터의 경우 required_acks=-1 (모든 ISR 브로커 ack) 설정을 고려하세요​ 이 값은 producer:new의 옵션으로 설정할 수 있습니다.
  • 오류 처리: p:send의 반환값을 통해 전송 성공 여부를 판단합니다. 실패 시 (ok가 nil이거나 false) ngx.log(ngx.ERR, ...)으로 에러를 기록하고 필요하면 재시도를 할 수 있습니다. 비동기 모드에서는 라이브러리가 자체 재시도 및 배치 처리를 일부 해주지만, 브로커 불가 등으로 실패가 지속된다면 로그에 남겨두고 별도 조치가 필요합니다.

외부 REST API로 POST/PUT 전송

Kafka 대신 REST API로 데이터를 전송하려는 경우, Lua에서 HTTP POST/PUT 요청을 수행하여 외부 서비스의 엔드포인트로 JSON 데이터를 보낼 수 있습니다. APISIX Lua 플러그인에서 이를 구현하려면 앞서 사용한 lua-resty-http 모듈을 다시 활용하면 됩니다. resty.http의 request_uri 함수로 HTTP 메서드, 요청 URL, 헤더, 바디를 지정하여 호출할 수 있습니다

다음은 추출한 JSON 데이터를 외부 API에 POST로 전송하는 예시 Lua 코드입니다. conf.external_api_url은 플러그인 설정으로 주입된 대상 API URL이고, 필요에 따라 인증 헤더(예: Bearer 토큰)를 포함하고 있습니다:

-- 외부 REST API에 데이터 전달 예제
local target_url = conf.external_api_url  -- 예: "https://api.example.com/data"
local httpc2 = http.new()
local res2, err2 = httpc2:request_uri(target_url, {
    method = "POST",             -- 또는 "PUT"으로 변경 가능
    body = json_str,             -- RESTHeart에서 받은 일부 필드 JSON
    headers = {
        ["Content-Type"] = "application/json",
        ["Authorization"] = "Bearer xxxxxx"  -- 인증 필요 시 토큰/키 사용
    }
})
if not res2 then
    ngx.log(ngx.ERR, "외부 API 호출 실패: ", err2)
elseif res2.status >= 300 then
    ngx.log(ngx.ERR, "외부 API 응답 오류 상태: ", res2.status, " - 응답본문: ", res2.body)
else
    ngx.log(ngx.INFO, "외부 API 전송 성공, 응답 상태: ", res2.status)
end

위 코드에서는 request_uri로 POST 요청을 보내고, Content-Type: application/json 헤더를 설정하여 JSON 본문임을 알립니다. 또한 예시로 Authorization 헤더에 Bearer 토큰을 넣어 인증을 처리했습니다. 응답 객체 res2의 status를 확인하여 2xx(정상)인지 판단하고, 3xx 이상이면 오류로 간주하여 로그를 남깁니다. res2.body를 통해 외부 API의 응답 본문을 확인하거나 필요하다면 추가 처리도 가능합니다.

실패 시 재시도 및 로깅 전략:
외부 API 호출이 실패할 경우 (네트워크 장애, 5xx 서버 오류 등), 다음과 같은 전략을 고려할 수 있습니다:

  • 재시도: 즉시 실패로 처리하지 않고, 일정 횟수(n회) 재시도를 구현할 수 있습니다. 간단히 for 루프로 1~2회 추가 시도하거나, ngx.timer.at을 활용하여 지연 재시도를 비동기로 수행하는 방법이 있습니다. 단, 재시도 사이에 ngx.sleep(초) 등을 써서 약간의 딜레이를 줄 것을 권장합니다 (너무 빠른 반복 요청 방지). 재시도 횟수를 제한하여 무한 루프에 빠지지 않도록 주의합니다.
  • 백업 로깅: 최종적으로도 실패하면 해당 데이터를 나중에 재처리할 수 있게 로그 파일이나 별도 저장소(예: 데이터베이스, 메시지 큐의 DLQ)로 기록해두는 방법이 있습니다. APISIX 자체 로그로 남겨두면 운영자가 확인하여 수동으로 대응할 수 있고, 필요 시 이러한 실패 로그를 수집/모니터링하는 것도 고려해야 합니다.
  • 오류 응답 처리: 이 플러그인이 APISIX 요청 흐름에서 필수적인 작업이라면, 외부 전송 실패 시 클라이언트에 에러를 반환할지 여부를 결정해야 합니다. 예를 들어, 클라이언트에게 502 Bad Gateway 등으로 알릴 수도 있고, 아니면 플러그인은 백그라운드 처리용이고 클라이언트 응답은 정상 처리할 수도 있습니다. 사용 사례에 맞게 결정합니다.

마무리 및 활용 구성

위에서 설명한 두 가지 방식은 동시에 활용할 수도 있습니다. 하나의 Lua 플러그인에서 RESTHeart의 응답을 받아 Kafka외부 API둘 다 전송하도록 구현할 수 있습니다 (각각의 코드 블록을 순차적으로 실행). APISIX 플러그인을 경량화하기 위해 필요하지 않은 경우 한 가지만 활성화할 수도 있으며, 플러그인 설정에 플래그를 두어 Kafka 전송 여부나 외부 API 전송 여부를 제어하면 유연하게 동작시킬 수 있습니다.

실제 APISIX에 이 플러그인을 적용하려면, 플러그인 Lua 파일을 작성한 뒤 APISIX 설정(config.yaml 등)에 해당 플러그인을 등록하고 사용하고자 하는 라우트(route) 또는 서비스에 플러그인 설정을 추가해야 합니다. 예를 들어, Route 설정에서:

"plugins": {
    "my-data-forward-plugin": {
        "restheart_url": "http://restheart:8080/mydb/mycollection/1234",
        "kafka_brokers": [
            { "host": "kafka1.mydomain.com", "port": 9092 }
        ],
        "kafka_topic": "my_topic",
        "external_api_url": "https://api.example.com/data",
        "enable_kafka": true,
        "enable_http": true
    }
}

와 같이 설정하고 플러그인을 활성화하면, 해당 라우트로 들어오는 요청에 대해 플러그인이 동작하여 RESTHeart에서 데이터를 가져오고 설정에 따라 Kafka/HTTP로 전송하게 됩니다.

이 튜토리얼의 예제 코드는 이해를 돕기 위한 간략화된 것입니다. 실제 환경에서는 에러 처리, 보안 설정, 성능 고려(비동기 처리) 등을 꼼꼼히 적용해야 합니다. 하지만 여기 제시된 코드 조각들을 통해 APISIX Lua 플러그인에서 RESTHeart 연동 및 외부 시스템(Kafka, REST API)과의 데이터를 주고받는 방법의 전반적인 구현 방안을 익힐 수 있을 것입니다.

728x90