Spring Boot 기반 Kafka 메시지 처리 구현

Kafka 통합을 위한 Spring Boot 설정

Spring Boot 애플리케이션에서 Apache Kafka와의 통신을 구현하기 위해선 적절한 의존성 추가와 구성이 필요하다. 본 문서는 인증 보안(SASL/SCRAM)이 적용된 Kafka 클러스터와의 연결부터, 메시지 송수신 로직 구현까지를 다룬다.

의존성 설정 (Maven)

Kafka 통신을 위해 spring-kafka 모듈을 프로젝트에 포함시킨다.

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.8.5</version>
</dependency>

프로듀서 측 설정 (application.yml)

Kafka 프로듀서는 아래와 같은 보안 및 성능 관련 파라미터를 포함하여 구성된다.

spring:
  kafka:
    bootstrap-servers: 17*.***.***.***:19091,17*.***.***.***:19091,17*.***.***.***:19091
    properties:
      sasl:
        mechanism: SCRAM-SHA-256
        jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="***" password="***";
      security.protocol: SASL_PLAINTEXT
    producer:
      retries: 3
      acks: all
      batch-size: 512000
      properties:
        linger.ms: 1
      buffer-memory: 33554432
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
      compression-type: lz4
      properties:
        enable.idempotence: false

메시지 전송 유틸리티 클래스

전역적으로 사용할 수 있는 Kafka 전송 헬퍼 클래스를 정의한다. 스프링 컨텍스트에서 KafkaTemplate을 주입받아 비동기 전송과 콜백 처리를 수행한다.

package com.example.util;

import cn.hutool.extra.spring.SpringUtil;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.util.concurrent.FailureCallback;
import org.springframework.util.concurrent.SuccessCallback;

@Slf4j
public class KafkaMessageSender {

    private static final KafkaTemplate<String, String> template = SpringUtil.getBean(KafkaTemplate.class);

    private static final SuccessCallback onSuccess = result -> log.info("메시지 전송 성공: {}", result);

    private static final FailureCallback onFailure = ex -> log.error("메시지 전송 실패: ", ex);

    public static <T> void sendMessage(String topic, T payload) {
        sendMessage(topic, payload, onSuccess, onFailure);
    }

    public static <T> void sendMessage(String topic, T payload, SuccessCallback success, FailureCallback failure) {
        String jsonPayload = JSONObject.toJSONString(payload);
        template.send(topic, jsonPayload).addCallback(success, failure);
    }
}

샘플 데이터 생성 및 전송 서비스

테스트용 데이터를 생성하고 Kafka 토픽으로 전송하는 서비스 예제이다.

@Service
@Slf4j
public class DataProducerService {

    @Value("${test.maxrows}")
    private int maxRows;

    private static final String[] MOBILE_PREFIXES = {
        "134", "135", "136", "137", "138", "139",
        "150", "151", "152", "157", "158", "159",
        "180", "181", "182", "183", "184", "185",
        "174", "192", "178"
    };

    public void publishTestData() {
        Random random = new Random();
        for (int i = 0; i < maxRows; i++) {
            DataRecord record = new DataRecord();
            record.setTimestamp(new Date());
            record.setPhoneNumber(generatePhoneNumber(random));
            record.setValue1(random.nextFloat());
            record.setValue2(random.nextDouble());

            MessageEnvelope envelope = new MessageEnvelope();
            envelope.setData(JSONObject.toJSONString(record));
            envelope.setRequestId(java.util.UUID.randomUUID().toString());
            envelope.setSendTime(new Date());

            KafkaMessageSender.sendMessage("user-activity-test", envelope);
        }
    }

    private String generatePhoneNumber(Random rand) {
        StringBuilder sb = new StringBuilder(MOBILE_PREFIXES[rand.nextInt(MOBILE_PREFIXES.length)]);
        for (int i = 0; i < 8; i++) {
            sb.append(rand.nextInt(10));
        }
        return sb.toString();
    }
}

컨슈머 측 yml 설정

메시지 소비자는 수동 커밋 모드로 동작하며, 배치 처리와 오류 핸들링을 고려해 구성한다.

spring:
  kafka:
    consumer:
      enable-auto-commit: false
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      max-poll-records: 1000
      properties:
        session.timeout.ms: 120000
        request.timeout.ms: 600000
    listener:
      ack-mode: manual_immediate
      concurrency: 3
      type: batch
      missing-topics-fatal: false
    properties:
      sasl:
        mechanism: SCRAM-SHA-256
        jaas.config: org.apache.kafka.common.security.scram.ScramLoginModule required username="***" password="***";
      security.protocol: SASL_PLAINTEXT

고급 컨슈머 팩토리 설정

배치 및 단일 레코드 오류 시 Redis로 장애 메시지를 저장하는 커스텀 에러 핸들러를 정의한다.

@Component
@Slf4j
public class KafkaConsumerConfiguration {

    @Autowired
    private ConcurrentKafkaListenerContainerFactory<Object, Object> containerFactory;

    @Autowired
    private RedisClient redisClient; // 커스텀 Redis 접근 컴포넌트

    @Bean("customListenerContainerFactory")
    public ConcurrentKafkaListenerContainerFactory<Object, Object> customContainerFactory() {
        containerFactory.getContainerProperties().setPollTimeout(3000);

        // 단일 레코드 오류 처리
        containerFactory.setErrorHandler((exception, record, consumer) -> {
            log.error("단건 수신 오류: {}", record.topic(), exception);
            try {
                MessageEnvelope msg = JSONUtil.toBean(record.value().toString(), MessageEnvelope.class);
                redisClient.saveToHash("failed-messages", msg.getRequestId(), record.value().toString());
            } catch (Exception e) {
                log.warn("Redis 저장 실패", e);
            }
        });

        // 배치 오류 처리
        containerFactory.setBatchErrorHandler((exception, records) -> {
            log.error("배치 수신 중 오류 발생", exception);
            records.forEach(record -> {
                try {
                    MessageEnvelope msg = JSONUtil.toBean(record.value().toString(), MessageEnvelope.class);
                    redisClient.saveToHash("failed-messages", msg.getRequestId(), record.value().toString());
                } catch (Exception e) {
                    log.warn("개별 레코드 처리 실패", e);
                }
            });
        });

        return containerFactory;
    }
}

메시지 수신 및 처리 서비스

Kafka 리스너를 통해 메시지를 수신하고, 파이프라인 방식으로 Redis에 일괄 저장한다.

@Service
@Slf4j
public class MessageConsumerService {

    @Resource(name = "secondaryRedisTemplate")
    private RedisTemplate<String, Object> redisTemplate;

    @Autowired
    private ExecutorService processingPool;

    @KafkaListener(
        topics = "user-activity-test",
        groupId = "test-consumer-group",
        containerFactory = "customListenerContainerFactory",
        concurrency = "16"
    )
    public void processMessages(List<ConsumerRecord<String, String>> records, Acknowledgment ack) {
        log.info("{}개의 메시지 수신", records.size());
        long start = System.currentTimeMillis();

        try {
            List<MessageEnvelope> envelopes = records.stream()
                .map(record -> JSONUtil.toBean(record.value(), MessageEnvelope.class))
                .toList();

            // Redis 파이프라이닝을 통한 고속 저장
            redisTemplate.executePipelined((RedisCallback<Object>) connection -> {
                Random rand = new Random();
                for (MessageEnvelope envelope : envelopes) {
                    String key = "event:" + rand.nextInt(1000);
                    byte[] val = envelope.getData().getBytes();
                    connection.setEx(key.getBytes(), 120, val);
                }
                return null;
            });

        } catch (Exception e) {
            log.error("메시지 처리 중 오류", e);
        } finally {
            log.info("처리 완료, 소요 시간: {}ms", System.currentTimeMillis() - start);
            ack.acknowledge(); // 수동 커밋
        }
    }
}

태그: spring-boot kafka spring-kafka scram-authentication redis-pipelining

5월 27일 03:28에 게시됨