티스토리 뷰
728x90
📄 Kafka Connect 기반 Base64 이미지 자동 저장 설계
1. 개요
Kafka 프로듀서로 전송되는 Base64 인코딩된 이미지 데이터를
Kafka Connect 환경에서 자동 디코딩하여,
파일명 기준으로 지정된 폴더에 저장하는 기능을 구현한다.
✅ 목표:
- Kafka 메시지 수신 → Base64 디코딩 → 파일명으로 폴더 저장
- 장애 발생 시에도 운영 중단 없이 계속 처리
2. 아키텍처 흐름
[Kafka Producer]
↓ (topic: images-topic)
[Kafka Connect - Custom SinkConnector]
↓ (Base64 디코딩 + 파일 저장)
[Filesystem 저장 (예: /tmp/decoded_images)]
- Producer: filename, image 필드 포함한 JSON 메시지 전송
- Kafka Connect: 커스텀 SinkConnector(Base64FileSinkConnector) 사용
- 파일 저장 경로: /tmp/decoded_images/ (커넥터 설정 가능)
3. Kafka 메시지 포맷 (예시)
{
"filename": "photo1.jpg",
"image": "iVBORw0KGgoAAAANSUhEUgAAAAUA..."
}
- filename: 저장할 파일 이름
- image: Base64로 인코딩된 이미지 데이터
4. 설계 상세
4.1 커스텀 SinkConnector 설계 (Base64FileSinkConnector)
항목 설명
Connector Class | com.example.Base64FileSinkConnector |
Task Class | com.example.Base64FileSinkTask |
주요 기능 | ① Kafka 메시지 수신② Base64 디코딩③ 지정 폴더에 파일 저장 |
디렉토리 자동 생성 | 시작 시 output.directory 존재 여부 확인 후 생성 |
장애 허용 | 개별 레코드 처리 실패 시 무시 (errors.tolerance=all) |
4.2 주요 Java 코드 구조
- Base64FileSinkConnector.java
public class Base64FileSinkConnector extends SinkConnector {
// start, stop, taskClass, config 설정
}
- Base64FileSinkTask.java
public class Base64FileSinkTask extends SinkTask {
public void put(Collection<SinkRecord> records) {
for (SinkRecord record : records) {
// filename과 image 필드 추출
// base64 디코딩 후 파일로 저장
}
}
}
파일 저장 로직 요약:
- filename과 image 필드 추출
- Base64 디코딩
- output.directory/filename 으로 저장
- 에러 발생 시 레코드 스킵하고 다음 진행
5. KafkaConnector 리소스(YAML)
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: base64-file-sink-connector
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: com.example.Base64FileSinkConnector
tasksMax: 1
config:
topics: images-topic
output.directory: /tmp/decoded_images
key.converter: org.apache.kafka.connect.storage.StringConverter
value.converter: org.apache.kafka.connect.json.JsonConverter
value.converter.schemas.enable: false
errors.tolerance: all
설정 | 항목 설명 |
topics | 수신할 Kafka 토픽 (예: images-topic) |
output.directory | 저장할 파일 시스템 경로 |
errors.tolerance | 오류 허용 여부 (all 설정 시 레코드 오류 무시) |
6. 배포 및 운영 절차
6.1 빌드
- Maven 또는 Gradle로 커넥터 코드를 빌드하여 .jar 파일 생성
- Kafka Connect 플러그인 디렉토리에 jar 복사
- 예: /opt/kafka/connect-plugins/base64-file-sink/base64-file-sink-connector.jar
- Kafka Connect 워커 재시작 또는 REST API 플러그인 리로드
6.2 운영
- Connector가 정상 기동되면 지정된 폴더에 파일 자동 저장
- 오류 발생 시, DLQ(Dead Letter Queue) 설정하여 문제 메시지 별도 관리 가능
7. 고려사항 및 확장 계획
구분 | 고려사항 |
성능 | 대용량 이미지 처리 시 병렬 작업 고려 (multi-task) |
파일 포맷 검증 | 특정 파일 포맷만 저장하도록 MIME Type 검사 기능 추가 가능 |
보안 | output.directory 접근 제어 권한 설정 필요 |
모니터링 | 저장 실패 건수, 성공 건수 지표 수집 가능 (JMX/Prometheus) |
고급 기능 | 파일 저장 성공 시 별도 Kafka 토픽에 알림 발송 기능 추가 가능 |
728x90
'기타' 카테고리의 다른 글
APISIX에서 HTTP 요청 흐름 가로채기 (0) | 2025.04.12 |
---|---|
Kafka Connect 커스텀 SMT - JSON 데이터 외부 API POST 호출 + MongoDB Sink Connector (0) | 2025.04.12 |
curl 주요 옵션 정리 (0) | 2025.04.03 |
Apache APISIX 로 API Gateway(Spring Cloud Gateway) 대체 (0) | 2025.03.31 |
RabbitMQ → Kafka 직접 치환 (0) | 2025.03.31 |