RocketMQ 메시지 플랫폼에서 Producer는 메시지 생성 및 전송을 담당하는 핵심 구성 요소다. 이번 글에서는 Producer의 생성부터 실제 운영에 필요한 내부 메커니즘까지 단계별로 살펴본다.
Producer 기본 구조
Producer를 사용하려면 먼저 DefaultMQProducer 인스턴스를 생성하고 필수 설정을 완료한 후 start() 메서드를 호출해야 한다. 가장 간단한 사용 예시는 다음과 같다.
DefaultMQProducer sender = new DefaultMQProducer("order_service_group");
sender.setNamesrvAddr("localhost:9876");
sender.start();
Message payload = new Message("order_topic", "create", "order-1001".getBytes());
SendResult result = sender.send(payload);
여기서 start() 호출이 실제로 어떤 일을 벌이는지가 핵심이다. 표면적으로는 단순한 초기화처럼 보이지만, 내부적으로는 다수의 백그라운드 서비스가 가동된다.
초기화 단계별 파헤치기
1단계: 서비스 상태 관리
Producer는 상태 머신 패턴으로 동작하며, 다음 네 가지 상태를 가진다.
| 상태 | 의미 |
|---|---|
CREATE_JUST | 객체만 생성된 초기 상태 |
RUNNING | 정상 운영 중 |
START_FAILED | 시작 과정에서 예외 발생 |
SHUTDOWN_ALREADY | 종료 완료 |
상태 전이는 단방향으로 설계되어 있어, 한번 실패한 인스턴스를 재사용하지 못하도록 방어적으로 구현되어 있다.
2단계: 클라이언트 인스턴스 획득
각 Producer는 독립적으로 네트워크 연결을 관리하지 않는다. 대신 MQClientManager라는 싱글턴 관리자를 통해 MQClientInstance를 공유받는다.
// 클라이언트 식별자 생성 규칙
String clientId = clientIP + "@" + instanceName;
// 인스턴스는 전역 캐시에서 관리
ConcurrentMap<String, MQClientInstance> instanceCache;
식별자에 IP와 나노타임 기반의 고유 문자열을 조합하는 이유는, 동일 JVM 내에서 여러 Producer/Consumer가 각각 독립적인 통신 채널을 유지해야 하기 때문이다.
3단계: 생산자 등록
획득한 클라이언트 인스턴스 내부에는 생산자 테이블이 존재한다.
// MQClientInstance 내부
private final ConcurrentMap<String, MQProducerInner> producerRegistry;
여기서 키는 생산자 그룹명이며, 동일 그룹 내에서 인스턴스를 식별하는 근거가 된다. 등록 성공 여부는 boolean 값으로 반환되며, 실패 시 예외가 발생한다.
4단계: 코어 서비스 기동
이 단계에서 실제 네트워크 통신과 관련된 서비스들이 순차적으로 시작된다.
원격 통신 계층 (mQClientAPIImpl)
Netty 기반의 비동기 통신 채널을 초기화한다. 이 객체는 이후 모든 NameServer, Broker와의 상호작용을 담당한다.
재균형 서비스 (rebalanceService)
Consumer 측에서 주로 활용되나, Producer 측에서도 메시큐 할당 정보 변경 시 관련 로직을 처리한다.
스케줄러 클러스터
여러 개의 단일 스레드 스케줄러가 동시에 가동되며, 각각 고유한 역할을 수행한다.
| 스케줄러 | 초기 지연 | 실행 주기 | 담당 업무 |
|---|---|---|---|
| NameServer 주소 갱신 | 10초 | 2분 | 동적 NameServer 목록 동기화 |
| 토픽 라우트 갱신 | 10밀리초 | 30초 | 토픽-브로커 매핑 정보 갱신 |
| 브로커 헬스체크 | 1초 | 30초 | 비활성 브로커 제거 및 하트비트 전송 |
| 소비 오프셋 영속화 | 10초 | 5초 | 컨슈머 진행 상황 디스크 기록 |
| 스레드 풀 동적 조정 | 1분 | 1분 | 처리량 변화에 따른 워커 스레드 수 조정 |
로컬 캐시 메커니즘
Producer의 핵심 설계 철학 중 하나는 외부 의존성 최소화다. NameServer가 일시적으로 접근 불가능해도 메시지 전송이 가능하도록, 모든 메타데이터는 로컬에 복제본을 유지한다.
토픽 라우트 테이블
// 토픽별 브로커 분산 정보
private final ConcurrentMap<String, TopicRouteData> routeCache;
TopicRouteData는 특정 토픽의 메시지가 어느 브로커의 어느 큐에 분산될지를 결정하는 핵심 데이터다. 이 정보는 주기적으로 NameServer에서 가져와 갱신하며, Producer의 메시지 라우팅 전략에 직접 영향을 준다.
토픽 발행 정보 테이블
// 발행 가능 여부 및 큐 선택 전략 포함
private final ConcurrentMap<String, TopicPublishInfo> publishInfoCache;
TopicPublishInfo는 TopicRouteData를 기반으로 생성되며, 실제 메시지 전송 시 큐 선택 알고리즘에 활용된다. 캐시된 정보가 없을 경우에는 NameServer로 즉시 조회를 시도한다.
브로커 주소 테이블
// 브로커 클러스터 내 개별 노드 주소
private final ConcurrentMap<String, HashMap<Long, String>> brokerEndpointRegistry;
마스터/슬레이브 구분을 포함한 브로컈별 물리 주소를 관리한다. 이 테이블은 토픽 라우트 갱신 시 함께 업데이트되며, 장애 발생 시 빠른 failover를 지원한다.
토픽 엔드포인트 매핑
// 토픽 → (메시지큐 → 브로커명)
private final ConcurrentMap<String, ConcurrentMap<MessageQueue, String>> endpointMappings;
메시지 큐 수준의 세부 매핑 정보를 유지하여, 메시지 전송 대상을 정밀하게 결정할 수 있게 한다.
장애 감지 체계
Producer는 브로컈의 상태를 지속적으로 모니터링하며, 이를 위해 MQFaultStrategy를 활용한다.
FaultItem 관리
class FaultItem {
String brokerName;
long faultStartTimestamp; // 장애 발생 시점
long faultDurationNanos; // 예상 복구 소요 시간
volatile boolean available; // 현재 가용 여부
}
각 브로컈별 FaultItem은 3초 간격으로 검증되며, 응답 지연이 임계치를 초과하면 해당 브로컈를 일시적으로 제외한다. 이 정보는 메시지 전송 시 선택 우선순위에 반영되어, 자연스럽게 정상 브로컈로 트래픽이 우회된다.
비동기 요청 관리
RPC 스타일의 요청-응답 패턴을 지원하기 위해, Producer는 미래의 응답을 추상화한 RequestResponseFuture를 관리한다.
// 비동기 응답 대기 테이블
private final ConcurrentHashMap<String, RequestResponseFuture> pendingRequests;
키는 요청별 UUID이며, 1초 간격으로 만료 여부를 검사한다. 타임아웃 발생 시 등록된 콜백이 호출되거나 예외가 전파된다. 이 메커니즘은 동기 호출의 내부 구현에도 사용되어, 외부적으로는 블로킹 API처럼 보이지만 내부적으로는 비동기 처리된다.
메시지 궤적 추적
선택적으로 활성화할 수 있는 TraceDispatcher는 메시지의 전체 생명주기를 기록한다. start() 호출 시 별도의 추적 채널이 초기화되며, 이를 통해 메시지가 어느 브로컈의 어느 큐를 거쳐 최종 Consumer에게 전달되었는지를 추적할 수 있다.
Producer 초기화 흐름 정리
- 생산자 그룹명 검증 및 네임스페이스 적용
- 클라이언트 인스턴스 획득 또는 재사용
- 생산자 인스턴스를 클라이언트에 등록
- 원격 통신 채널 초기화
- 백그라운드 스케줄러 그룹 기동
- 토픽 메타데이터 초기 로드
- 브로컈 장애 감지 서비스 활성화
- 초기 하트비트 전송으로 연결 검증
- 비동기 요청 스캐너 가동
- 메시지 궤적 추적 채널 준비 (옵션)
이 모든 과정이 완료되어야 serviceState가 RUNNING으로 전환되며, 이후부터 send() 호출이 정상 처리된다. 각 단계에서 예외가 발생하면 상태는 START_FAILED로 고정되고, 해당 인스턴스는 재사용할 수 없게 된다.