Spring Kafka를 활용한 동적 토픽 관리 및 격리된 컨슈머 컨테이너 구현

배경 및 요구사항

마이크로서비스 환경에서 각 비즈니스 모듈은 독립적인 Kafka 토픽을 소비해야 하는 경우가 많습니다. 모듈이 동적으로 추가되거나 모니터링 대상 토픽이 실시간으로 변경될 수 있으므로, 정적인 @KafkaListener 어노테이션만으로는 유연한 대응과 모듈 간 소비 격리를 달성하기 어렵습니다. 특히 토픽 패턴 매칭만으로는 명확한 모듈별 분리 소비가 불가능합니다. 이를 해결하기 위해 Spring Kafka의 ConcurrentMessageListenerContainer를 프로그래밍 방식으로 동적 생성하고 수명 주기를 관리하는 방안을 구현합니다.

프로젝트 의존성 및 설정

Spring Boot 기반의 프로젝트에서 Spring Kafka를 사용하기 위해 다음 의존성을 추가합니다.

<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
  <groupId>org.springframework.kafka</groupId>
  <artifactId>spring-kafka-test</artifactId>
  <scope>test</scope>
</dependency>

애플리케이션의 application.yml 또는 application.properties에 기본 컨슈머 설정을 정의합니다. 수동 커밋과 즉시 ACK 모드를 사용하여 메시지 처리의 안정성을 확보합니다.

spring.kafka.bootstrap-servers=kafka-node1:9092,kafka-node2:9092,kafka-node3:9092
spring.kafka.consumer.group-id=dynamic-module-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=100
spring.kafka.listener.ack-mode=manual_immediate

동적 리스너 컨테이너 구현

기존 컨테이너의 상태를 확인하고, 토픽 리스트에 변경 사항이 있을 경우 기존 인스턴스를 안전하게 중지한 뒤 새로운 설정으로 컨테이너를 재생성하는 핵심 로직입니다. 각 모듈의 ID를 키로 사용하여 활성 컨테이너를 관리합니다.

import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.core.ConsumerFactory;
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class DynamicKafkaConsumerManager {

    private final ConsumerFactory<String, String> consumerFactory;
    private final Map<String, ConcurrentMessageListenerContainer<String, String>> activeContainers = new ConcurrentHashMap<>();
    
    public DynamicKafkaConsumerManager(ConsumerFactory<String, String> consumerFactory) {
        this.consumerFactory = consumerFactory;
    }

    /**
     * 특정 모듈에 대한 동적 토픽 리스너 컨테이너를 생성하고 시작합니다.
     *
     * @param moduleId   모듈 식별자
     * @param moduleName 모듈 표시명
     * @param targetTopics 소비할 토픽 목록
     * @return 실행 중인 리스너 컨테이너
     */
    public ConcurrentMessageListenerContainer<String, String> initializeAndStartContainer(
            String moduleId, String moduleName, List<String> targetTopics) {
        
        String[] topicArr = targetTopics.toArray(new String[0]);
        boolean requiresRestart = true;

        if (activeContainers.containsKey(moduleId)) {
            String[] currentTopics = activeContainers.get(moduleId).getContainerProperties().getTopics();
            if (Arrays.equals(currentTopics, topicArr)) {
                requiresRestart = false;
            } else {
                log.info("모듈 [{}]의 토픽이 변경되었습니다. 컨슈머를 재시작합니다.", moduleId);
                activeContainers.get(moduleId).stop();
            }
        }

        if (!requiresRestart) {
            return activeContainers.get(moduleId);
        }

        ContainerProperties containerProps = new ContainerProperties(topicArr);
        int threadCount = Integer.parseInt(System.getProperty("kafka.listener.concurrency", "3"));

        containerProps.setMessageListener((AcknowledgingMessageListener<String, String>) (record, acknowledgment) -> {
            try {
                processIncomingMessage(record);
            } finally {
                acknowledgment.acknowledge();
            }
        });

        containerProps.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        ConcurrentMessageListenerContainer<String, String> container =
                new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);

        container.setConcurrency(threadCount);
        container.setBeanName(moduleName);
        container.start();

        activeContainers.put(moduleId, container);
        return container;
    }

    private void processIncomingMessage(Object record) {
        // 비즈니스 로직 처리
    }
}

위 코드는 실행 중인 컨테이너를 ConcurrentHashMap에 캐싱하여 중복 생성을 방지하고, 토픽 변경 감지 시 기존 컨테이너를 안전하게 중지한 후 새로 시작하는 로직을 포함하고 있습니다. 이를 통해 동적으로 추가되는 모듈마다 독립적인 스레드 풀과 오프셋 관리가 가능해져, 특정 모듈의 지연이나 장애가 다른 모듈의 소비에 영향을 미치는 것을 원천적으로 차단할 수 있습니다.

태그: SpringKafka ApacheKafka SpringBoot ConcurrentMessageListenerContainer microservices

7월 2일 04:03에 게시됨