기타

Kafka Connect 기반 Base64 이미지 자동 저장 설계

ipxy 2025. 4. 11. 13:05
728x90

 

📄 Kafka Connect 기반 Base64 이미지 자동 저장 설계

1. 개요

Kafka 프로듀서로 전송되는 Base64 인코딩된 이미지 데이터
Kafka Connect 환경에서 자동 디코딩하여,
파일명 기준으로 지정된 폴더에 저장하는 기능을 구현한다.

목표:

  • Kafka 메시지 수신 → Base64 디코딩 → 파일명으로 폴더 저장
  • 장애 발생 시에도 운영 중단 없이 계속 처리

2. 아키텍처 흐름

[Kafka Producer] 
   ↓ (topic: images-topic)
[Kafka Connect - Custom SinkConnector] 
   ↓ (Base64 디코딩 + 파일 저장)
[Filesystem 저장 (예: /tmp/decoded_images)]
  • Producer: filename, image 필드 포함한 JSON 메시지 전송
  • Kafka Connect: 커스텀 SinkConnector(Base64FileSinkConnector) 사용
  • 파일 저장 경로: /tmp/decoded_images/ (커넥터 설정 가능)

3. Kafka 메시지 포맷 (예시)

{
  "filename": "photo1.jpg",
  "image": "iVBORw0KGgoAAAANSUhEUgAAAAUA..."
}
  • filename: 저장할 파일 이름
  • image: Base64로 인코딩된 이미지 데이터

4. 설계 상세

4.1 커스텀 SinkConnector 설계 (Base64FileSinkConnector)

항목 설명

Connector Class com.example.Base64FileSinkConnector
Task Class com.example.Base64FileSinkTask
주요 기능 ① Kafka 메시지 수신② Base64 디코딩③ 지정 폴더에 파일 저장
디렉토리 자동 생성 시작 시 output.directory 존재 여부 확인 후 생성
장애 허용 개별 레코드 처리 실패 시 무시 (errors.tolerance=all)

4.2 주요 Java 코드 구조

  • Base64FileSinkConnector.java
public class Base64FileSinkConnector extends SinkConnector {
    // start, stop, taskClass, config 설정
}
  • Base64FileSinkTask.java
public class Base64FileSinkTask extends SinkTask {
    public void put(Collection<SinkRecord> records) {
        for (SinkRecord record : records) {
            // filename과 image 필드 추출
            // base64 디코딩 후 파일로 저장
        }
    }
}

파일 저장 로직 요약:

  1. filename과 image 필드 추출
  2. Base64 디코딩
  3. output.directory/filename 으로 저장
  4. 에러 발생 시 레코드 스킵하고 다음 진행

5. KafkaConnector 리소스(YAML)

apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
  name: base64-file-sink-connector
  labels:
    strimzi.io/cluster: my-connect-cluster
spec:
  class: com.example.Base64FileSinkConnector
  tasksMax: 1
  config:
    topics: images-topic
    output.directory: /tmp/decoded_images
    key.converter: org.apache.kafka.connect.storage.StringConverter
    value.converter: org.apache.kafka.connect.json.JsonConverter
    value.converter.schemas.enable: false
    errors.tolerance: all

 

설정  항목 설명
topics 수신할 Kafka 토픽 (예: images-topic)
output.directory 저장할 파일 시스템 경로
errors.tolerance 오류 허용 여부 (all 설정 시 레코드 오류 무시)

6. 배포 및 운영 절차

6.1 빌드

  1. Maven 또는 Gradle로 커넥터 코드를 빌드하여 .jar 파일 생성
  2. Kafka Connect 플러그인 디렉토리에 jar 복사
    • 예: /opt/kafka/connect-plugins/base64-file-sink/base64-file-sink-connector.jar
  3. Kafka Connect 워커 재시작 또는 REST API 플러그인 리로드

6.2 운영

  • Connector가 정상 기동되면 지정된 폴더에 파일 자동 저장
  • 오류 발생 시, DLQ(Dead Letter Queue) 설정하여 문제 메시지 별도 관리 가능

7. 고려사항 및 확장 계획

구분 고려사항
성능 대용량 이미지 처리 시 병렬 작업 고려 (multi-task)
파일 포맷 검증 특정 파일 포맷만 저장하도록 MIME Type 검사 기능 추가 가능
보안 output.directory 접근 제어 권한 설정 필요
모니터링 저장 실패 건수, 성공 건수 지표 수집 가능 (JMX/Prometheus)
고급 기능 파일 저장 성공 시 별도 Kafka 토픽에 알림 발송 기능 추가 가능

 

728x90