Disruptor 프레임워크를 이용한 생산자-소비자 패턴 구현

 `Disruptor`는 LMAX사가 개발한 고성능 메모리 메시지 큐로 단일 스레드 처리 능력이 600만 주문/초에 달합니다. 본 글에서는 이 프레임워크를 사용하여 생산자-소비자 패턴을 구현해 보겠습니다.

1. Maven 의존성 설정

   <!-- https://mvnrepository.com/artifact/com.lmax/disruptor -->
        <dependency>
            <groupId>com.lmax</groupId>
            <artifactId>disruptor</artifactId>
            <version>3.4.2</version>
        </dependency>

2. 메시지 이벤트 클래스

package com.example.mq.system;

import com.lmax.disruptor.EventFactory;
import com.lmax.disruptor.EventProcessor;

/**
 * 메시지 이벤트 객체
 */
public class DataMessage {

    private String payload;

    public String getPayload() {
        return payload;
    }

    public void setPayload(String payload) {
        this.payload = payload;
    }
}


3. 메시지 이벤트 핸들러

package com.example.mq.system;

import com.lmax.disruptor.EventHandler;
import com.lmax.disruptor.WorkHandler;

/**
 * 메시지 처리 핸들러
 */
public class MessageProcessor implements EventHandler<DataMessage>, WorkHandler<DataMessage> {

    private String processorName;

    public MessageProcessor(String name) {
        this.processorName = name;
    }


    @Override
    public void onEvent(DataMessage event, long sequence, boolean endOfBatch) throws Exception {
        System.out.println(processorName+"-----처리 시작-----"+sequence);
        Thread.sleep(1000*10);
        System.out.println("스레드 이름:  "+Thread.currentThread().getName());
        System.out.println(event.getPayload()+" 처리 완료 시퀀스: "+sequence);
    }

    @Override
    public void onEvent(DataMessage event) throws Exception {
        System.out.println(processorName+"-----처리 시작-----");
        Thread.sleep(1000*10);
        System.out.println("스레드 이름:  "+Thread.currentThread().getName());
        System.out.println(event.getPayload());
        System.out.println(processorName+"-----처리 완료-----");


    }
}


이 메시지 핸들러는 두 개의 인터페이스를 구현합니다. EventHandler 인터페이스는 메시지가 모든 소비자에게 전달되는 통합 소비 방식을 구현하며, WorkHandler 인터페이스는 메시지가 하나의 소비자만 처리하는 그룹 소비 방식을 구현합니다. 여러 소비자가 순환 방식으로 메시지를 처리합니다.

4. Disruptor 설정

package com.example.mq.system;

import com.lmax.disruptor.*;
import com.lmax.disruptor.dsl.Disruptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Disruptor 환경 설정
 */
@Configuration
public class DisruptorConfig {



    @Bean
    public RingBuffer<DataMessage> ringBuffer(){

        NamedThreadFactory threadFactory = new NamedThreadFactory("DataMessage-",true);
        EventFactory<DataMessage> eventFactory = new EventFactory<DataMessage>() {
            @Override
            public DataMessage newInstance() {
                return new DataMessage();
            }
        };


        Disruptor<DataMessage> disruptor = new Disruptor<>(eventFactory,1024, threadFactory);


//두 개의 소비자 정의
        MessageProcessor p1 = new MessageProcessor("p1");
        MessageProcessor p2 = new MessageProcessor("p2");

        //disruptor.handleEventsWith(p1,p2); //통합 소비: 모든 소비자가 모든 메시지를 처리

        disruptor.handleEventsWithWorkerPool(p1,p2);//그룹 소비: 각 메시지는 하나의 소비자만 처리, 여러 소비자가 순환 처리

        //disruptor.handleEventsWith(p1).then(p2);   //순차 소비: 1,3이 먼저 병렬 처리, 그 후 2가 처리

        disruptor.start();//다중 소비자 설정, 각 소비자는 별도 스레드로 처리
        return disruptor.getRingBuffer();
    }
}


5. 메시지 발행 클래스

package com.example.mq.system;

import com.lmax.disruptor.EventTranslatorOneArg;
import com.lmax.disruptor.RingBuffer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 메시지 발행 클래스
 */
@Component
public class MessagePublisher {






    public static void publish(String message){

       /**
         * 발행 성공 여부를 반환, 실패 시 이 값을 바탕으로 비즈니스 로직 처리 가능
         */
        boolean success = ringBuffer.tryPublishEvent(TRANSLATOR, message);
    }






    private static final EventTranslatorOneArg<DataMessage,String> TRANSLATOR =  new EventTranslatorOneArg<DataMessage,String>() {


        @Override
        public void translateTo(DataMessage event, long sequence, String arg0) {
            event.setPayload(arg0);
        }
    };

    private static RingBuffer<DataMessage> ringBuffer;

    @Autowired
    public  void setRingBuffer(RingBuffer<DataMessage> ringBuffer) {
        MessagePublisher.ringBuffer = ringBuffer;
    }
}


6. 테스트

현재 테스트는 그룹 모드를 사용하며, 하나의 메시지는 하나의 소비자만 처리하고 각 소비자는 별도 스레드로 동작하는 것을 확인할 수 있습니다.

7. 원리 설명

Disruptor 프레임워크의 핵심 원리는 RingBuffer(환형 버퍼) 배열 구조입니다. 이 구조를 통해 고성능 메시지 처리가 가능하며, 이에 대한 자세한 내용은 다음 기회에 더 깊게 다루도록 하겠습니다.

태그: Disruptor 생산자-소비자 패턴 Java 동시성 프로그래밍 메시지 큐 RingBuffer

6월 14일 19:55에 게시됨