Azure Event Hubs에서 이벤트 수신 처리 구현

이전 포스트에서는 Azure Event Hubs로 이벤트 데이터를 전송하는 방법을 코드를 통해 살펴보았습니다. 이번에는 그 반대 과정인, Event Hubs로부터 이벤트를 수신하고 처리하는 애플리케이션을 구성하는 방법을 설명합니다.

1. 콘솔 애플리케이션 생성

Visual Studio에서 새로운 .NET 프레임워크 기반의 콘솔 애플리케이션을 생성합니다. 예: EventHubMessageReceiver.

2. 필요한 NuGet 패키지 설치

아래 두 개의 NuGet 패키지를 프로젝트에 추가합니다.

  • Microsoft.Azure.EventHubs
  • Microsoft.Azure.EventHubs.Processor

3. 이벤트 처리기 클래스 구현

이벤트 수신 및 처리 로직은 IEventProcessor 인터페이스를 구현하여 정의합니다. 다음은 사용자 정의 이벤트 처리기 예제입니다.

using Microsoft.Azure.EventHubs;
using Microsoft.Azure.EventHubs.Processor;
using System;
using System.Collections.Generic;
using System.Text;
using System.Threading.Tasks;

public class CustomEventProcessor : IEventProcessor
{
    public Task OpenAsync(PartitionContext context)
    {
        Console.WriteLine($"수신기 시작됨 - 파티션 ID: {context.PartitionId}");
        return Task.CompletedTask;
    }

    public Task CloseAsync(PartitionContext context, CloseReason reason)
    {
        Console.WriteLine($"수신기 종료됨 - 파티션: {context.PartitionId}, 이유: {reason}");
        return Task.CompletedTask;
    }

    public Task ProcessErrorAsync(PartitionContext context, Exception error)
    {
        Console.WriteLine($"오류 발생 - 파티션 {context.PartitionId}: {error.Message}");
        return Task.CompletedTask;
    }

    public async Task ProcessEventsAsync(PartitionContext context, IEnumerable<EventData> events)
    {
        foreach (var eventData in events)
        {
            string messageBody = Encoding.UTF8.GetString(eventData.Body.ToArray());
            Console.WriteLine($"[수신 완료] 파티션: {context.PartitionId}, 메시지: {messageBody}");
        }

        // 체크포인트를 설정하여 다음 수신 위치 저장
        await context.CheckpointAsync();
    }
}

4. 주 실행 로직 작성

메인 애플리케이션에서는 EventProcessorHost를 사용해 위에서 정의한 처리기를 등록하고, 지속적으로 이벤트를 수신하도록 구성합니다.

먼저, 아래와 같은 연결 정보 상수들을 정의합니다. 실제 값으로 교체해야 합니다.

private const string EventHubConnectionString = "Endpoint=sb://your-namespace.servicebus.windows.net/;SharedAccessKeyName=...";
private const string EventHubName = "your-eventhub-name";
private const string ConsumerGroupName = "$Default";

private const string StorageAccountName = "yourstorageaccount";
private const string StorageAccountKey = "your-storage-account-key";
private const string ContainerName = "eventprocessor"; // Blob 컨테이너 이름

private static readonly string BlobStorageConnectionString = 
    $"DefaultEndpointsProtocol=https;AccountName={StorageAccountName};AccountKey={StorageAccountKey};EndpointSuffix=core.windows.net";

5. 이벤트 수신기 호스트 등록

다음과 같이 비동기 메서드를 통해 이벤트 처리기를 등록하고 수신을 시작합니다.

private static async Task StartReceivingAsync()
{
    var processorHost = new EventProcessorHost(
        EventHubName,
        ConsumerGroupName,
        EventHubConnectionString,
        BlobStorageConnectionString,
        ContainerName);

    Console.WriteLine("이벤트 처리기 등록 중...");

    await processorHost.RegisterEventProcessorAsync<CustomEventProcessor>();

    Console.WriteLine("이벤트 수신 대기 중... 종료하려면 Enter 키를 누르세요.");
    Console.ReadLine();

    Console.WriteLine("처리기 해제 중...");
    await processorHost.UnregisterEventProcessorAsync();
}

6. Main 메서드

애플리케이션 진입점에서 비동기 작업을 동기적으로 실행합니다.

static void Main(string[] args)
{
    StartReceivingAsync().GetAwaiter().GetResult();
}

7. 필수 구성 요소: Azure Storage

EventProcessorHost는 내부적으로 상태 관리를 위해 Azure Blob Storage를 사용합니다. 따라서 반드시 유효한 스토리지 계정과 컨테이너가 사전에 생성되어 있어야 하며, 해당 컨테이너 이름은 코드에 정확히 일치해야 합니다.

또한, Event Hub 인스턴스의 Consumer Groups 설정도 확인하고, 필요 시 별도의 그룹을 생성할 수 있습니다.

이제 애플리케이션을 실행하면, 이전에 전송된 이벤트 메시지들이 실시간으로 출력되는 것을 확인할 수 있습니다.

태그: Azure Event Hubs EventProcessorHost IEventProcessor Blob Storage C#

6월 10일 16:44에 게시됨