Debezium 기반 임베디드 CDC 구현 및 활용

기술 개요와 핵심 장점

Debezium의 임베디드 CDC(Change Data Capture)는 Kafka 클러스터 없이도 데이터베이스 변경 이벤트를 실시간으로 캡처할 수 있는 경량 솔루션입니다. 애플리케이션 내부에 직접 엔진을 포함시켜 사용하는 방식으로 다음과 같은 주요 이점을 제공합니다:

  • 저지연 처리: 밀리초 단위로 DB 변경 사항을 감지하여 실시간성이 중요한 시스템에 적합합니다.
  • 리소스 절약: Kafka 인프라 구성 없이도 작동하므로 자원 제한 환경에서 유리합니다.
  • 유연한 통합: 애플리케이션과 밀접하게 결합되어 복잡한 외부 시스템 의존성을 줄입니다.

아키텍처 구성 요소

임베디드 모드는 모듈식 설계로 이루어져 있으며 주요 구성은 아래와 같습니다:

  1. 커넥터 계층

    MySQL, PostgreSQL, SQL Server 등 다양한 RDBMS 지원하며 각각의 로그 분석 메커니즘을 통해 변경 내용을 추출합니다.

    • MySQL 커넥터: mysql-binlog-connector-java 라이브러리를 이용해 바이너리 로그(binlog)를 파싱합니다.
    • PostgreSQL 커넥터: wal2json 같은 논리 디코딩 플러그인을 사용하여 WAL(Write-Ahead Log)에서 변경 사항을 읽어옵니다.
  2. 이벤트 처리 파이프라인

    변경 이벤트는 다음 단계를 거쳐 처리됩니다:

    • 오프셋 관리: 마지막 처리 지점을 기록하여 장애 발생 시에도 중단된 위치부터 재개할 수 있습니다.
    • SMT(Message Transformation): 필드 마스킹, 토픽 라우팅, 형식 변환 등의 작업을 수행할 수 있습니다.
    • 사용자 정의 핸들러: 이벤트 수신 후 캐시 갱신이나 비즈니스 로직 실행과 같은 커스텀 동작을 정의할 수 있습니다.
  3. 스레드 모델

    성능 최적화를 위해 두 가지 스레드 풀을 활용합니다:

    • 폴링 스레드 풀: 데이터베이스 접속 및 이벤트 조회를 담당합니다.
    • 처리 스레드 풀: 이벤트 변환 및 콜백 함수 호출을 병렬로 수행하여 처리량을 높입니다.

실제 적용 예시: MySQL → Redis 동기화

다음 예제에서는 임베디드 Debezium을 활용해 MySQL의 주문 테이블 변경 사항을 Redis에 반영하는 방법을 설명합니다.

1. 의존성 설정(Maven)

<dependencies>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-embedded</artifactId>
        <version>1.9.7.Final</version>
    </dependency>
    <dependency>
        <groupId>io.debezium</groupId>
        <artifactId>debezium-connector-mysql</artifactId>
        <version>1.9.7.Final</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>8.0.28</version>
    </dependency>
    <dependency>
        <groupId>redis.clients</groupId>
        <artifactId>jedis</artifactId>
        <version>4.3.1</version>
    </dependency>
</dependencies>

2. 엔진 초기화 코드

import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import org.apache.kafka.connect.source.SourceRecord;
import redis.clients.jedis.Jedis;

import java.util.Properties;

public class MysqlToRedisSync {

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("name", "mysql-source");
        props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "3306");
        props.setProperty("database.user", "debezium_user");
        props.setProperty("database.password", "secure_password");
        props.setProperty("database.server.id", "184054");
        props.setProperty("database.server.name", "my_app_server");
        props.setProperty("database.include.list", "order_db");
        props.setProperty("table.include.list", "order_db.orders");
        props.setProperty("database.history",
                "io.debezium.relational.history.FileDatabaseHistory");
        props.setProperty("database.history.file.filename", "/tmp/history_file.dat");

        DebeziumEngine<ChangeEvent<String, String>> engine =
                DebeziumEngine.create(Json.class)
                        .using(props)
                        .notifying(event -> {
                            SourceRecord record = event.record();
                            String tableName = (String) record.sourceOffset().get("table");
                            if ("orders".equals(tableName)) {
                                String keyJson = (String) record.key();
                                String valueJson = (String) record.value();
                                syncToCache(keyJson, valueJson);
                            }
                        })
                        .build();

        engine.run();
    }

    private static void syncToCache(String keyData, String valueData) {
        try (Jedis jedis = new Jedis("localhost", 6379)) {
            jedis.set("order:" + keyData.hashCode(), valueData);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}

3. MySQL 설정

# /etc/mysql/conf.d/debezium.cnf
[mysqld]
log-bin=mysql-bin
binlog-format=row
server-id=1
binlog-do-db=order_db

4. 동작 결과 확인

  • 새로운 주문 생성: INSERT INTO orders (id, status) VALUES (1, 'CREATED') 실행 시 Redis에 order:1 키가 생성되며 값은 {"id":1,"status":"CREATED"}.
  • 주문 상태 수정: UPDATE orders SET status='SHIPPED' WHERE id=1 수행 시 해당 키의 값이 갱신됨.

중요 설정 옵션

옵션명 설명
database.server.name Kafka 토픽 이름에 사용되는 서버 식별자 (임베디드 모드에서는 내부 용도).
table.include.list 감시 대상 테이블 목록 (정규 표현식 지원 가능).
snapshot.mode 초기 스냅샷 방식 (initial=전체 동기화, never=증분만).
tombstones.on.delete 삭제 이벤트에 대해 tombstone 레코드 생성 여부 (기본값 false).
record.processing.threads 이벤트 처리 스레드 수 (기본값 CPU 코어 수).

적용 범위 및 고려사항

  • 추천 활용 사례:
    • MySQL → Redis 캐시 자동 갱신
    • 마이크로서비스 간 데이터 연동 (예: 주문 변경 알림 발송)
    • 모든 데이터 변경 기록 보관 (감사 로깅)
  • 주의사항:
    • 트랜잭션 일관성: 다중 테이블 간 트랜잭션 원자성 보장 불가능하므로 비즈니스 레벨에서 처리 필요.
    • 성능 조정: 고부하 환경에서는 max.batch.size, 스레드 풀 크기 등을 조절해야 함.
    • 오류 처리: ErrorHandler 인터페이스 구현을 통해 연결 실패 등의 예외 상황 대응 필요.

런타임 중 테이블 모니터링 동적 변경

Debezium 임베디드 모드 자체적으로는 실행 중에 감지할 테이블 목록을 즉시 수정하는 기능을 제공하지 않습니다. 그러나 특정 전략을 통해 일부 유연성을 확보할 수 있습니다.

I. 구성 제약 조건

엔진 시작 시점에 아래 설정들이 고정되며 이후 변경 불가:

  • table.include.list: 감지 대상 테이블 지정 (schema.table 형식).
  • database.include.list: 감지 대상 데이터베이스 지정.

II. 동적 모니터링 구현 방법

방법 1: 엔진 재시작

  1. 새로운 테이블 목록으로 Properties 객체 업데이트.
  2. 기존 engine.close() 호출하여 리소스 해제.
  3. 새로운 설정으로 다시 엔진 생성 및 실행.
// 초기 설정
Properties configProps = new Properties();
configProps.setProperty("table.include.list", "db1.tableA,db1.tableB");
DebeziumEngine<ChangeEvent<String, String>> debeziumEngine = DebeziumEngine.create(Json.class)
    .using(configProps)
    .notifying(this::processEvent)
    .build();

// 테이블 추가 후 재시작
configProps.setProperty("table.include.list", "db1.tableA,db1.tableC");
debeziumEngine.close();
debeziumEngine = DebeziumEngine.create(Json.class)
    .using(configProps)
    .notifying(this::processEvent)
    .build();
debeziumEngine.run();

방법 2: 외부 도구와 연동

  • 메타데이터 탐색: information_schema.tables 같은 시스템 테이블을 주기적으로 조회하여 신규 테이블 탐지.
  • 자동 구성 갱신: 새로운 테이블 발견 시 관련 설정 파일 또는 외부 설정 저장소를 업데이트하고 엔진 재시작 트리거.

방법 3: 정규표현식 활용

특정 패턴에 맞는 모든 테이블을 한 번에 감시하도록 설정:

configProps.setProperty("table.include.list", "db1\\..*"); // db1 스키마 하위 모든 테이블

⚠️ 이 경우 불필요한 테이블도 포함될 수 있으므로 비즈니스 로직에서 필터링 필요.

III. 고려사항

  1. 재시작 오버헤드: 빈번한 엔진 재시작은 성능 저하 우려 있으므로 변경 빈도에 따라 적절히 선택.
  2. 오프셋 유지: 재시작 시 오프셋 정보(offset.storage)가 유실되지 않도록 안정적인 저장소 사용 권장.
  3. 트랜잭션 정합성: 새로 추가된 테이블에 대한 이벤트 누락 방지를 위해 적절한 검증 필요.

IV. 적용 가이드라인

  • 낮은 변경 빈도: 매일/매주 간격으로 테이블 목록 변경 시 재시작 방식 충분히 활용 가능.
  • 높은 변경 빈도: 자동화된 설정 갱신 및 재시작 메커니즘 구축 필요.
  • 복잡한 요구사항: Kafka Connect 모드를 고려하고 REST API를 통해 동적으로 커넥터 설정 관리 가능.

태그: Debezium CDC Embedded Engine MySQL Redis

6월 27일 20:01에 게시됨