스프링 부트와 RabbitMQ 연동: Redis와 ACK 메커니즘을 이용한 안정적인 메시지 처리

메시지의 유효성을 유지하기 위해, 메시지 소비 시 처리가 실패할 경우 소비를 중단하고 비즈니스 로직에 따라 ACK를 반환해야 합니다. 본 프로젝트에서는 Redis와 ACK 메커니즘을 이중 보험 방식으로 적용하여 메시지가 반드시 정확하게 소비되도록 보장합니다.

1. 기본 개념

먼저 Topic 방식을 사용합니다(이전 내용을 모르시면 해당 부분을 참고하시기 바랍니다). 이전 내용에서는 SpringBoot 애너테이션을 사용하여 구현했지만, 제어권이 완전하지 않아 대규모 프로젝트에서는 권장되지 않습니다.

2. 수동 ACK 설정

소스 코드 분석을 통해(여기서는 소스 코드 분석은 생략합니다), 구성 클래스에서 리스너를 정의하고 특정 큐를 수신해야 합니다(AcknowledgeMode.MANUAL는 필수입니다).

/**
 * 메시지 수신 리스너, 이 리스너는 고객 거래 내역 메시지를 수신합니다
 * 소비자 구성에 대한 설명
 * @return
 */
@Bean
public SimpleMessageListenerContainer messageContainer1(ConnectionFactory connectionFactory, TransactionConsumeImpl transactionConsume) {
    SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
    container.setQueues(queueMessage());
    container.setExposeListenerChannel(true);
    container.setMaxConcurrentConsumers(1);
    container.setConcurrentConsumers(1);
    container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 수동 확인 모드 설정
    container.setMessageListener(transactionConsume);
    return container;
}

TransactionConsumeImplChannelAwareMessageListener를 상속해야 하며, 수동 ACK 반환은 채널을 통해 이루어집니다.

@Component
public class TransactionConsumeImpl implements ChannelAwareMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(TransactionConsumeImpl.class);
    private static final Gson gson = new Gson();
    
    @Autowired
    private JedisShardInfo jedisShardInfo;
    
    @Autowired
    private ExecutorService threadPool;
    
    @Autowired
    private BoluomeFlowService boluomeFlowService;
    
    @Override
    public void onMessage(Message message, Channel channel) throws Exception {
        String body = new String(message.getBody(), "utf-8"); // 메시지 변환, JSON 데이터 형식 사용
        threadPool.execute(new Runnable() {   // 다중 스레드 처리
            @Override
            public void run() {
                Jedis jedis = jedisShardInfo.createResource();
                jedis.sadd(TopicRabbitConfig.TRANSACTION_QUEUE, body); // 현재 메시지 유형의 키에 추가하여 메시지 손실 방지
                BoluomeFlow flow = gson.fromJson(body, BoluomeFlow.class);
                String json = gson.toJson(flow);
                
                if (boluomeFlowService.insert(flow)) {  // 추가 성공 시 성공 반환
                    logger.info("고객 거래 내역 1건 추가:{}", json);
                    jedis.srem(TopicRabbitConfig.TRANSACTION_QUEUE, body); // 이미 소비된 메시지 형식의 세트에서 제거
                    try {
                        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 수동 ACK 반환, 이 메시지가 정상적으로 소비되었음을 알림
                    } catch (IOException ie) {
                        logger.error("소비 성공 콜백 시 IO 작업 예외");
                    }
                } else {
                    logger.info("고객 거래 내역 추가 실패 기록:{}", json);
                }
            }
        });
    }
}

3. ACK 관련 메서드 설명

channel.basicAck(message.getMessageProperties().getDeliveryTag(), false); // 메시지 식별자, false는 현재 메시지만 수신 확인, true는 모든 소비자가 받은 메시지 확인
channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // ack가 false를 반환하고 큐로 다시 전송, API에 설명이 명확함
  • channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 메시지 거부
  • true: 다음 소비자에게 전송
  • false: 아무도 수신하지 않고 큐에서 삭제

4. RabbitMQ 고급 설정

import com.rabbitmq.client.Channel;
import config.callback.ConfirmCallBackListener;
import config.callback.ReturnCallBackListener;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.AcknowledgeMode;
import org.springframework.amqp.core.AmqpAdmin;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.annotation.EnableRabbit;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitAdmin;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.ComponentScan;
import org.springframework.context.annotation.Configuration;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import po.Mail;
import rabbitMQ.listener.TransactionConsumerImpl;

import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// RabbitMQ 기본 연결 구성
@Configuration
@ComponentScan(basePackages = {"rabbitMQ.listener","config.callback"})
@EnableRabbit
public class RabbitMQConfiguration {
    
    private static final Logger logger = LoggerFactory.getLogger(RabbitMQConfiguration.class);
    
    @Autowired
    private TransactionConsumerImpl transactionConsumer;
    
    @Autowired
    private ConfirmCallBackListener confirmCallBackListener;
    
    @Autowired
    private ReturnCallBackListener returnCallBackListener;
    
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("mq.xxx.cn");
        connectionFactory.setUsername("admin");
        connectionFactory.setPassword("admin");
        connectionFactory.setPort(5672);
        return connectionFactory;
    }
    
    @Bean
    public AmqpAdmin amqpAdmin() {
        return new RabbitAdmin(connectionFactory());
    }
    
    @Bean
    public RabbitTemplate rabbitTemplate() {
        RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory());
        
        // 메시지 큐 사용 시 고려해야 할 문제는 생산자 메시지 전송 실패와 소비자 메시지 처리 실패입니다.
        // 생산자 메시지 전송 성공 시 메시지 전송 성공 확인, 실패 시 메시지 전송 실패 정보 반환 후 처리
        // 소비자 메시지 처리 성공 시 메시지 큐에서 자동 삭제, 실패 시 메시지를 다시 큐로 반환하여 대기
        
        // 메시지 확인 리스너
        rabbitTemplate.setConfirmCallback(confirmCallBackListener);
        // 메시지 전송 실패 반환 리스너
        rabbitTemplate.setReturnCallback(returnCallBackListener);
        // mandatory를 true로 설정해야 return callback이 작동합니다
        rabbitTemplate.setMandatory(true);
        
        return rabbitTemplate;
    }
    
    @Bean
    public Queue paymentQueue() {
        // 두 번째 매개변수 durable=true는 지속성을 의미합니다
        Queue queue = new Queue("paymentQueue", true);
        return queue;
    }
    
    /**
     * 스레드 풀 초기화 - 다중 스레드로 소비 작업 실행
     */
    @Bean
    public ExecutorService threadPool(){
        return Executors.newFixedThreadPool(20);
    }
    
    /**
     * 소비자 측에서는 SimpleRabbitListenerContainerFactory만 생성하면 됩니다.
     * 이것이 RabbitListenerContainer를 생성하는 데 도움이 되며,
     * 그런 다음 @RabbitListener를 사용하여 수신자가 정보를 받았을 때 처리할 메서드를 지정할 수 있습니다.
     */
    @Bean
    public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory() {
        SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
        factory.setConnectionFactory(connectionFactory());
        factory.setConcurrentConsumers(3);
        factory.setMaxConcurrentConsumers(10);
        factory.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return factory;
    }
    
    // 소비자 구성: @RabbitListener와 @Bean SimpleMessageListenerContainer 방식
    
    /**
     * 소비자 구성에 대한 설명
     * 방법 1: 각 Queue에 대해 하나의 SimpleMessageListenerContainer 지정
     * MessageListener implements ChannelAwareMessageListener 인터페이스 구현
     * onMessage 메서드 직접 구현
     * 
     * 역할: 지정된 Queue(paymentQueue)의 고객 거래 내역 메시지를 수신하는 리스너
     * @return
     */
    @Bean
    public SimpleMessageListenerContainer simpleMessageListenerContainer(ConnectionFactory connectionFactory) {
        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        container.setQueues(paymentQueue());
        container.setExposeListenerChannel(true);
        container.setMaxConcurrentConsumers(1);
        container.setConcurrentConsumers(1);
        // 수동 확인 모드 설정
        container.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        container.setMessageListener(transactionConsumer);
        return container;
    }
    
    // 방법 2: @RabbitListener 사용
    @RabbitListener(queues = "paymentQueue") 
    public void processPayment(@Payload Mail mail, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag, Channel channel) throws Exception{
        logger.info("rabbit receiver message:{}", mail);
        try {
            channel.basicAck(deliveryTag, false);
        } catch (Exception e) {
            logger.error("process message error: {}", e);
        }
    }
}

5. 메시지 수신 구현 클래스

import com.alibaba.fastjson.JSONObject;
import com.rabbitmq.client.Channel;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.core.ChannelAwareMessageListener;
import org.springframework.amqp.support.converter.SimpleMessageConverter;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import po.Mail;

import java.io.IOException;
import java.util.concurrent.ExecutorService;

@Service("transactionConsumerImpl")
public class PaymentConsumerImpl implements ChannelAwareMessageListener {
    private static final Logger logger = LoggerFactory.getLogger(PaymentConsumerImpl.class);
    
    private SimpleMessageConverter converter = new SimpleMessageConverter();
    
    @Autowired
    private ExecutorService threadPool;
    
    // 메시지 처리 성공 후 ack 확인 전송 또는 실패 후 nack 전송하여 메시지 재전송
    public void onMessage(final Message message, final Channel channel) throws Exception {
        final String body = new String(message.getBody(), "utf-8"); // 메시지 변환, JSON 데이터 형식 사용
        Object msg = converter.fromMessage(message);
        
        // 메일 처리 로직
        System.out.println(JSONObject.toJSONString(msg));
        
        try {
            // 수동 ACK 반환, 이 메시지가 정상적으로 소비되었음을 알림
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e1) {
            e1.printStackTrace();
            System.out.println("메시지가 반복적으로 처리 실패, 다시 수신 거부... 실패");
        }
    }
}

6. 메시지 거부 및 재큐 메커니즘

메시지 처리 중 예외가 발생할 경우, 메시지가 이미 재전송된(redelivered) 상태인지 확인하여 적절한 조치를 취할 수 있습니다:

try {
    // 메시지 처리 로직
    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (Exception e) {
    e.printStackTrace();
    
    if (message.getMessageProperties().getRedelivered()) {
        System.out.println("메시지가 반복적으로 처리 실패, 다시 수신 거부...");
        try {
            channel.basicReject(message.getMessageProperties().getDeliveryTag(), true); // 메시지 거부
        } catch (IOException e1) {
            e1.printStackTrace();
            System.out.println("메시지가 반복적으로 처리 실패, 다시 수신 거부... 실패");
        }
    } else {
        System.out.println("메시지가 곧 다시 큐로 반환될 것입니다...");
        try {
            channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true); // requeue: 메시지를 다시 큐로 보낼지 여부
        } catch (IOException e1) {
            e1.printStackTrace();
            System.out.println("requeue: 메시지를 다시 큐로 보낼지 여부... 실패");
        }
    }
}

이러한 이중 보험 방식(Redis와 ACK 메커니즘)을 통해 메시지 손실을 방지하고 시스템의 안정성을 높일 수 있습니다.

태그: RabbitMQ SpringBoot Redis MessageQueue ACK

6월 22일 02:27에 게시됨