RocketMQ Producer 초기화와 내부 동작 메커니즘 분석

RocketMQ 메시지 플랫폼에서 Producer는 메시지 생성 및 전송을 담당하는 핵심 구성 요소다. 이번 글에서는 Producer의 생성부터 실제 운영에 필요한 내부 메커니즘까지 단계별로 살펴본다.

Producer 기본 구조

Producer를 사용하려면 먼저 DefaultMQProducer 인스턴스를 생성하고 필수 설정을 완료한 후 start() 메서드를 호출해야 한다. 가장 간단한 사용 예시는 다음과 같다.

DefaultMQProducer sender = new DefaultMQProducer("order_service_group");
sender.setNamesrvAddr("localhost:9876");
sender.start();

Message payload = new Message("order_topic", "create", "order-1001".getBytes());
SendResult result = sender.send(payload);

여기서 start() 호출이 실제로 어떤 일을 벌이는지가 핵심이다. 표면적으로는 단순한 초기화처럼 보이지만, 내부적으로는 다수의 백그라운드 서비스가 가동된다.

초기화 단계별 파헤치기

1단계: 서비스 상태 관리

Producer는 상태 머신 패턴으로 동작하며, 다음 네 가지 상태를 가진다.

상태의미
CREATE_JUST객체만 생성된 초기 상태
RUNNING정상 운영 중
START_FAILED시작 과정에서 예외 발생
SHUTDOWN_ALREADY종료 완료

상태 전이는 단방향으로 설계되어 있어, 한번 실패한 인스턴스를 재사용하지 못하도록 방어적으로 구현되어 있다.

2단계: 클라이언트 인스턴스 획득

각 Producer는 독립적으로 네트워크 연결을 관리하지 않는다. 대신 MQClientManager라는 싱글턴 관리자를 통해 MQClientInstance를 공유받는다.

// 클라이언트 식별자 생성 규칙
String clientId = clientIP + "@" + instanceName;

// 인스턴스는 전역 캐시에서 관리
ConcurrentMap<String, MQClientInstance> instanceCache;

식별자에 IP와 나노타임 기반의 고유 문자열을 조합하는 이유는, 동일 JVM 내에서 여러 Producer/Consumer가 각각 독립적인 통신 채널을 유지해야 하기 때문이다.

3단계: 생산자 등록

획득한 클라이언트 인스턴스 내부에는 생산자 테이블이 존재한다.

// MQClientInstance 내부
private final ConcurrentMap<String, MQProducerInner> producerRegistry;

여기서 키는 생산자 그룹명이며, 동일 그룹 내에서 인스턴스를 식별하는 근거가 된다. 등록 성공 여부는 boolean 값으로 반환되며, 실패 시 예외가 발생한다.

4단계: 코어 서비스 기동

이 단계에서 실제 네트워크 통신과 관련된 서비스들이 순차적으로 시작된다.

원격 통신 계층 (mQClientAPIImpl)

Netty 기반의 비동기 통신 채널을 초기화한다. 이 객체는 이후 모든 NameServer, Broker와의 상호작용을 담당한다.

재균형 서비스 (rebalanceService)

Consumer 측에서 주로 활용되나, Producer 측에서도 메시큐 할당 정보 변경 시 관련 로직을 처리한다.

스케줄러 클러스터

여러 개의 단일 스레드 스케줄러가 동시에 가동되며, 각각 고유한 역할을 수행한다.

스케줄러초기 지연실행 주기담당 업무
NameServer 주소 갱신10초2분동적 NameServer 목록 동기화
토픽 라우트 갱신10밀리초30초토픽-브로커 매핑 정보 갱신
브로커 헬스체크1초30초비활성 브로커 제거 및 하트비트 전송
소비 오프셋 영속화10초5초컨슈머 진행 상황 디스크 기록
스레드 풀 동적 조정1분1분처리량 변화에 따른 워커 스레드 수 조정

로컬 캐시 메커니즘

Producer의 핵심 설계 철학 중 하나는 외부 의존성 최소화다. NameServer가 일시적으로 접근 불가능해도 메시지 전송이 가능하도록, 모든 메타데이터는 로컬에 복제본을 유지한다.

토픽 라우트 테이블

// 토픽별 브로커 분산 정보
private final ConcurrentMap<String, TopicRouteData> routeCache;

TopicRouteData는 특정 토픽의 메시지가 어느 브로커의 어느 큐에 분산될지를 결정하는 핵심 데이터다. 이 정보는 주기적으로 NameServer에서 가져와 갱신하며, Producer의 메시지 라우팅 전략에 직접 영향을 준다.

토픽 발행 정보 테이블

// 발행 가능 여부 및 큐 선택 전략 포함
private final ConcurrentMap<String, TopicPublishInfo> publishInfoCache;

TopicPublishInfoTopicRouteData를 기반으로 생성되며, 실제 메시지 전송 시 큐 선택 알고리즘에 활용된다. 캐시된 정보가 없을 경우에는 NameServer로 즉시 조회를 시도한다.

브로커 주소 테이블

// 브로커 클러스터 내 개별 노드 주소
private final ConcurrentMap<String, HashMap<Long, String>> brokerEndpointRegistry;

마스터/슬레이브 구분을 포함한 브로컈별 물리 주소를 관리한다. 이 테이블은 토픽 라우트 갱신 시 함께 업데이트되며, 장애 발생 시 빠른 failover를 지원한다.

토픽 엔드포인트 매핑

// 토픽 → (메시지큐 → 브로커명)
private final ConcurrentMap<String, ConcurrentMap<MessageQueue, String>> endpointMappings;

메시지 큐 수준의 세부 매핑 정보를 유지하여, 메시지 전송 대상을 정밀하게 결정할 수 있게 한다.

장애 감지 체계

Producer는 브로컈의 상태를 지속적으로 모니터링하며, 이를 위해 MQFaultStrategy를 활용한다.

FaultItem 관리

class FaultItem {
    String brokerName;
    long faultStartTimestamp;    // 장애 발생 시점
    long faultDurationNanos;     // 예상 복구 소요 시간
    volatile boolean available;  // 현재 가용 여부
}

각 브로컈별 FaultItem은 3초 간격으로 검증되며, 응답 지연이 임계치를 초과하면 해당 브로컈를 일시적으로 제외한다. 이 정보는 메시지 전송 시 선택 우선순위에 반영되어, 자연스럽게 정상 브로컈로 트래픽이 우회된다.

비동기 요청 관리

RPC 스타일의 요청-응답 패턴을 지원하기 위해, Producer는 미래의 응답을 추상화한 RequestResponseFuture를 관리한다.

// 비동기 응답 대기 테이블
private final ConcurrentHashMap<String, RequestResponseFuture> pendingRequests;

키는 요청별 UUID이며, 1초 간격으로 만료 여부를 검사한다. 타임아웃 발생 시 등록된 콜백이 호출되거나 예외가 전파된다. 이 메커니즘은 동기 호출의 내부 구현에도 사용되어, 외부적으로는 블로킹 API처럼 보이지만 내부적으로는 비동기 처리된다.

메시지 궤적 추적

선택적으로 활성화할 수 있는 TraceDispatcher는 메시지의 전체 생명주기를 기록한다. start() 호출 시 별도의 추적 채널이 초기화되며, 이를 통해 메시지가 어느 브로컈의 어느 큐를 거쳐 최종 Consumer에게 전달되었는지를 추적할 수 있다.

Producer 초기화 흐름 정리

  1. 생산자 그룹명 검증 및 네임스페이스 적용
  2. 클라이언트 인스턴스 획득 또는 재사용
  3. 생산자 인스턴스를 클라이언트에 등록
  4. 원격 통신 채널 초기화
  5. 백그라운드 스케줄러 그룹 기동
  6. 토픽 메타데이터 초기 로드
  7. 브로컈 장애 감지 서비스 활성화
  8. 초기 하트비트 전송으로 연결 검증
  9. 비동기 요청 스캐너 가동
  10. 메시지 궤적 추적 채널 준비 (옵션)

이 모든 과정이 완료되어야 serviceStateRUNNING으로 전환되며, 이후부터 send() 호출이 정상 처리된다. 각 단계에서 예외가 발생하면 상태는 START_FAILED로 고정되고, 해당 인스턴스는 재사용할 수 없게 된다.

태그: RocketMQ Message Queue Producer Netty Distributed Systems

5월 27일 10:49에 게시됨