배경 및 요구사항
마이크로서비스 환경에서 각 비즈니스 모듈은 독립적인 Kafka 토픽을 소비해야 하는 경우가 많습니다. 모듈이 동적으로 추가되거나 모니터링 대상 토픽이 실시간으로 변경될 수 있으므로, 정적인 @KafkaListener 어노테이션만으로는 유연한 대응과 모듈 간 소비 격리를 달성하기 어렵습니다. 특히 토픽 패턴 매칭만으로는 명확한 모듈별 분리 소비가 불가능합니다. 이를 해결하기 위해 Spring Kafka의 ConcurrentMessageListenerContainer를 프로그래밍 방식으로 동적 생성하고 수명 주기를 관리하는 방안을 구현합니다.
프로젝트 의존성 및 설정
Spring Boot 기반의 프로젝트에서 Spring Kafka를 사용하기 위해 다음 의존성을 추가합니다.
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka-test</artifactId>
<scope>test</scope>
</dependency>
애플리케이션의 application.yml 또는 application.properties에 기본 컨슈머 설정을 정의합니다. 수동 커밋과 즉시 ACK 모드를 사용하여 메시지 처리의 안정성을 확보합니다.
spring.kafka.bootstrap-servers=kafka-node1:9092,kafka-node2:9092,kafka-node3:9092
spring.kafka.consumer.group-id=dynamic-module-group
spring.kafka.consumer.auto-offset-reset=earliest
spring.kafka.consumer.enable-auto-commit=false
spring.kafka.consumer.max-poll-records=100
spring.kafka.listener.ack-mode=manual_immediate
동적 리스너 컨테이너 구현
기존 컨테이너의 상태를 확인하고, 토픽 리스트에 변경 사항이 있을 경우 기존 인스턴스를 안전하게 중지한 뒤 새로운 설정으로 컨테이너를 재생성하는 핵심 로직입니다. 각 모듈의 ID를 키로 사용하여 활성 컨테이너를 관리합니다.
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;
import org.springframework.kafka.listener.ContainerProperties;
import org.springframework.kafka.listener.AcknowledgingMessageListener;
import org.springframework.kafka.core.ConsumerFactory;
import java.util.List;
import java.util.Arrays;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
public class DynamicKafkaConsumerManager {
private final ConsumerFactory<String, String> consumerFactory;
private final Map<String, ConcurrentMessageListenerContainer<String, String>> activeContainers = new ConcurrentHashMap<>();
public DynamicKafkaConsumerManager(ConsumerFactory<String, String> consumerFactory) {
this.consumerFactory = consumerFactory;
}
/**
* 특정 모듈에 대한 동적 토픽 리스너 컨테이너를 생성하고 시작합니다.
*
* @param moduleId 모듈 식별자
* @param moduleName 모듈 표시명
* @param targetTopics 소비할 토픽 목록
* @return 실행 중인 리스너 컨테이너
*/
public ConcurrentMessageListenerContainer<String, String> initializeAndStartContainer(
String moduleId, String moduleName, List<String> targetTopics) {
String[] topicArr = targetTopics.toArray(new String[0]);
boolean requiresRestart = true;
if (activeContainers.containsKey(moduleId)) {
String[] currentTopics = activeContainers.get(moduleId).getContainerProperties().getTopics();
if (Arrays.equals(currentTopics, topicArr)) {
requiresRestart = false;
} else {
log.info("모듈 [{}]의 토픽이 변경되었습니다. 컨슈머를 재시작합니다.", moduleId);
activeContainers.get(moduleId).stop();
}
}
if (!requiresRestart) {
return activeContainers.get(moduleId);
}
ContainerProperties containerProps = new ContainerProperties(topicArr);
int threadCount = Integer.parseInt(System.getProperty("kafka.listener.concurrency", "3"));
containerProps.setMessageListener((AcknowledgingMessageListener<String, String>) (record, acknowledgment) -> {
try {
processIncomingMessage(record);
} finally {
acknowledgment.acknowledge();
}
});
containerProps.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);
ConcurrentMessageListenerContainer<String, String> container =
new ConcurrentMessageListenerContainer<>(consumerFactory, containerProps);
container.setConcurrency(threadCount);
container.setBeanName(moduleName);
container.start();
activeContainers.put(moduleId, container);
return container;
}
private void processIncomingMessage(Object record) {
// 비즈니스 로직 처리
}
}
위 코드는 실행 중인 컨테이너를 ConcurrentHashMap에 캐싱하여 중복 생성을 방지하고, 토픽 변경 감지 시 기존 컨테이너를 안전하게 중지한 후 새로 시작하는 로직을 포함하고 있습니다. 이를 통해 동적으로 추가되는 모듈마다 독립적인 스레드 풀과 오프셋 관리가 가능해져, 특정 모듈의 지연이나 장애가 다른 모듈의 소비에 영향을 미치는 것을 원천적으로 차단할 수 있습니다.