.NET Core에서 Canal을 이용한 MySQL 데이터 동기화 구현

MySQL Binlog 설정 및 준비

Canal은 MySQL의 binlog를 기반으로 데이터 변경 사항을 실시간으로 감지하고 처리하는 오픈소스 컴포넌트입니다. 이를 사용하기 위해선 먼저 MySQL 서버에서 binlog 기능이 활성화되어 있어야 하며, 포맷은 ROW 모드로 설정되어야 합니다.

MySQL 설정 파일(my.cnf 또는 mysqld.cnf)에 다음 내용을 추가합니다:

[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server_id=1

설정 후 MySQL을 재시작하고, 아래 쿼리를 통해 정상 적용 여부를 확인할 수 있습니다:

SHOW VARIABLES LIKE '%log_bin%';
SHOW VARIABLES LIKE '%binlog_format%';
SHOW VARIABLES LIKE '%server_id%';
SHOW MASTER STATUS;

특히 log_bin 값이 ON, binlog_formatROW인지 반드시 확인해야 합니다.

계정 권한 및 접속 문제 해결

Canal은 MySQL에 별도의 클라이언트처럼 접속하여 binlog를 읽어오므로, 외부 접속 가능한 계정이 필요합니다. 초기 설정 시 canal 전용 계정으로 접근이 실패했다면, 다음과 같은 원인이 있을 수 있습니다:

  • 계정의 원격 접속 권한 부족 (GRANT 미설정)
  • 방화벽 또는 Docker 네트워크 설정 문제
  • MySQL 버전과 Canal 호환성 이슈

테스트 단계에서는 root 계정을 사용해 연결 테스트를 수행하는 것이 빠른 문제 진단에 도움이 됩니다. 이후 보안 강화를 위해 dedicated user를 생성하고 권한을 제한합니다:

CREATE USER 'canal'@'%' IDENTIFIED BY 'canal_password';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;

Canal 서버 구성

최신 버전의 Canal (예: v1.1.7 이상)을 GitHub 릴리스 페이지에서 다운로드하여 압축 해제합니다. 두 가지 주요 설정 파일을 수정해야 합니다.

1. conf/canal.properties

전역 설정으로, 서버 포트와 운영 모드를 지정합니다:

canal.port = 11111
canal.serverMode = tcp
canal.destinations = example
canal.auto.scan = true

2. conf/example/instance.properties

특정 MySQL 인스턴스에 대한 연결 정보를 포함합니다:

canal.instance.mysql.slaveId = 1234
canal.instance.master.address = 192.168.0.192:3306
canal.instance.dbUsername = root
canal.instance.dbPassword = 123456
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex = shopproject\\.Orders

여기서 filter.regex는 동기화 대상 테이블을 정규 표현식으로 지정하며, 위 예제는 shopproject.Orders 테이블의 변화만 추적합니다.

.NET Core 클라이언트 개발

NuGet에서 CanalSharp 패키지를 설치하여 Canal 서버와 통신합니다:

<PackageReference Include="CanalSharp" Version="1.2.0" />

다음은 메시지를 수신하고 처리하는 핵심 코드입니다:

using CanalSharp.Connections;
using CanalSharp.Protocol;
using Microsoft.Extensions.Logging;
using System.Collections.Generic;
using System.Threading.Tasks;

public class CanalDataSyncService
{
    private readonly ILogger<CanalDataSyncService> _logger;

    public CanalDataSyncService(ILogger<CanalDataSyncService> logger)
    {
        _logger = logger;
    }

    public async Task StartListeningAsync()
    {
        var options = new SimpleCanalOptions("127.0.0.1", 11111, "example");
        var connection = new SimpleCanalConnection(options, _logger);

        await connection.ConnectAsync();
        await connection.SubscribeAsync("shopproject\\.Orders");

        while (true)
        {
            try
            {
                var message = await connection.GetWithoutAckAsync(1024);
                if (message.Id == 0 || message.Entries.Count == 0)
                {
                    await Task.Delay(100);
                    continue;
                }

                ProcessEntries(message.Entries);
                await connection.AckAsync(message.Id);
            }
            catch (System.Exception ex)
            {
                _logger.LogError(ex, "메시지 처리 중 오류 발생");
                await Task.Delay(5000);
            }
        }
    }

    private void ProcessEntries(IList<Entry> entries)
    {
        foreach (var entry in entries)
        {
            if (entry.EntryType != EntryType.Rowdata) continue;

            var rowChange = RowChange.Parser.ParseFrom(entry.StoreValue);
            var tableName = entry.Header.TableName;

            foreach (var rowData in rowChange.RowDatas)
            {
                switch (rowChange.EventType)
                {
                    case EventType.Insert:
                        HandleInsert(rowData.AfterColumns, tableName);
                        break;
                    case EventType.Delete:
                        HandleDelete(rowData.BeforeColumns, tableName);
                        break;
                    case EventType.Update:
                        HandleUpdate(rowData.BeforeColumns, rowData.AfterColumns, tableName);
                        break;
                }
            }
        }
    }

    private void HandleInsert(IEnumerable<Column> columns, string table)
    {
        foreach (var col in columns)
        {
            _logger.LogInformation($"[INSERT] {table}.{col.Name} = {col.Value}");
        }
    }

    private void HandleDelete(IEnumerable<Column> columns, string table)
    {
        foreach (var col in columns)
        {
            _logger.LogInformation($"[DELETE] {table}.{col.Name} = {col.Value}");
        }
    }

    private void HandleUpdate(IEnumerable<Column> beforeCols, IEnumerable<Column> afterCols, string table)
    {
        var updated = new Dictionary<string, (string Old, string New)>();

        var beforeDict = beforeCols.ToDictionary(c => c.Name, c => c.Value);
        var afterDict = afterCols.ToDictionary(c => c.Name, c => c.Value);

        foreach (var kvp in afterDict)
        {
            if (!beforeDict.TryGetValue(kvp.Key, out var oldValue) || oldValue != kvp.Value)
            {
                updated[kvp.Key] = (oldValue, kvp.Value);
            }
        }

        foreach (var (col, vals) in updated)
        {
            _logger.LogInformation($"[UPDATE] {table}.{col}: '{vals.Old}' → '{vals.New}'");
        }
    }
}

서비스 등록 및 실행

ASP.NET Core 애플리케이션의 Startup.cs 또는 Program.cs에서 백그라운드 서비스로 등록합니다:

services.AddHostedService<CanalDataSyncService>();

또는 Startup 시점에 직접 실행:

app.ApplicationServices.GetService<CanalDataSyncService>()?.StartListeningAsync();

문제 해결 팁

  • SocketTimeoutException: 네트워크 지연 또는 MySQL/CANAL 간 핑 타임아웃 문제일 수 있음. canal.instance.network.soTimeout 값을 조정하거나 네트워크 상태 점검.
  • Authentication Error: 계정 권한 부족 또는 암호화 방식 불일치. MySQL 8+에서는 caching_sha2_password 대신 mysql_native_password 사용 고려.
  • Version Compatibility: 최신 MySQL에서는 최신 Canal 버전 사용 권장. v1.1.5 이하 버전은 신규 환경과 호환되지 않을 수 있음.

실제 운영 환경에서는 Kafka나 RocketMQ와 연동하여 메시지 큐 기반의 안정적인 전달 구조를 설계하는 것이 바람직합니다.

태그: Canal .NET Core MySQL binlog 데이터 동기화

7월 5일 17:52에 게시됨