기술 개요와 핵심 장점
Debezium의 임베디드 CDC(Change Data Capture)는 Kafka 클러스터 없이도 데이터베이스 변경 이벤트를 실시간으로 캡처할 수 있는 경량 솔루션입니다. 애플리케이션 내부에 직접 엔진을 포함시켜 사용하는 방식으로 다음과 같은 주요 이점을 제공합니다:
- 저지연 처리: 밀리초 단위로 DB 변경 사항을 감지하여 실시간성이 중요한 시스템에 적합합니다.
- 리소스 절약: Kafka 인프라 구성 없이도 작동하므로 자원 제한 환경에서 유리합니다.
- 유연한 통합: 애플리케이션과 밀접하게 결합되어 복잡한 외부 시스템 의존성을 줄입니다.
아키텍처 구성 요소
임베디드 모드는 모듈식 설계로 이루어져 있으며 주요 구성은 아래와 같습니다:
- 커넥터 계층
MySQL, PostgreSQL, SQL Server 등 다양한 RDBMS 지원하며 각각의 로그 분석 메커니즘을 통해 변경 내용을 추출합니다.
- MySQL 커넥터:
mysql-binlog-connector-java라이브러리를 이용해 바이너리 로그(binlog)를 파싱합니다. - PostgreSQL 커넥터:
wal2json같은 논리 디코딩 플러그인을 사용하여 WAL(Write-Ahead Log)에서 변경 사항을 읽어옵니다.
- MySQL 커넥터:
- 이벤트 처리 파이프라인
변경 이벤트는 다음 단계를 거쳐 처리됩니다:
- 오프셋 관리: 마지막 처리 지점을 기록하여 장애 발생 시에도 중단된 위치부터 재개할 수 있습니다.
- SMT(Message Transformation): 필드 마스킹, 토픽 라우팅, 형식 변환 등의 작업을 수행할 수 있습니다.
- 사용자 정의 핸들러: 이벤트 수신 후 캐시 갱신이나 비즈니스 로직 실행과 같은 커스텀 동작을 정의할 수 있습니다.
- 스레드 모델
성능 최적화를 위해 두 가지 스레드 풀을 활용합니다:
- 폴링 스레드 풀: 데이터베이스 접속 및 이벤트 조회를 담당합니다.
- 처리 스레드 풀: 이벤트 변환 및 콜백 함수 호출을 병렬로 수행하여 처리량을 높입니다.
실제 적용 예시: MySQL → Redis 동기화
다음 예제에서는 임베디드 Debezium을 활용해 MySQL의 주문 테이블 변경 사항을 Redis에 반영하는 방법을 설명합니다.
1. 의존성 설정(Maven)
<dependencies>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-embedded</artifactId>
<version>1.9.7.Final</version>
</dependency>
<dependency>
<groupId>io.debezium</groupId>
<artifactId>debezium-connector-mysql</artifactId>
<version>1.9.7.Final</version>
</dependency>
<dependency>
<groupId>mysql</groupId>
<artifactId>mysql-connector-java</artifactId>
<version>8.0.28</version>
</dependency>
<dependency>
<groupId>redis.clients</groupId>
<artifactId>jedis</artifactId>
<version>4.3.1</version>
</dependency>
</dependencies>
2. 엔진 초기화 코드
import io.debezium.config.Configuration;
import io.debezium.embedded.EmbeddedEngine;
import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;
import org.apache.kafka.connect.source.SourceRecord;
import redis.clients.jedis.Jedis;
import java.util.Properties;
public class MysqlToRedisSync {
public static void main(String[] args) {
Properties props = new Properties();
props.setProperty("name", "mysql-source");
props.setProperty("connector.class", "io.debezium.connector.mysql.MySqlConnector");
props.setProperty("database.hostname", "localhost");
props.setProperty("database.port", "3306");
props.setProperty("database.user", "debezium_user");
props.setProperty("database.password", "secure_password");
props.setProperty("database.server.id", "184054");
props.setProperty("database.server.name", "my_app_server");
props.setProperty("database.include.list", "order_db");
props.setProperty("table.include.list", "order_db.orders");
props.setProperty("database.history",
"io.debezium.relational.history.FileDatabaseHistory");
props.setProperty("database.history.file.filename", "/tmp/history_file.dat");
DebeziumEngine<ChangeEvent<String, String>> engine =
DebeziumEngine.create(Json.class)
.using(props)
.notifying(event -> {
SourceRecord record = event.record();
String tableName = (String) record.sourceOffset().get("table");
if ("orders".equals(tableName)) {
String keyJson = (String) record.key();
String valueJson = (String) record.value();
syncToCache(keyJson, valueJson);
}
})
.build();
engine.run();
}
private static void syncToCache(String keyData, String valueData) {
try (Jedis jedis = new Jedis("localhost", 6379)) {
jedis.set("order:" + keyData.hashCode(), valueData);
} catch (Exception ex) {
ex.printStackTrace();
}
}
}
3. MySQL 설정
# /etc/mysql/conf.d/debezium.cnf
[mysqld]
log-bin=mysql-bin
binlog-format=row
server-id=1
binlog-do-db=order_db
4. 동작 결과 확인
- 새로운 주문 생성:
INSERT INTO orders (id, status) VALUES (1, 'CREATED')실행 시 Redis에order:1키가 생성되며 값은{"id":1,"status":"CREATED"}. - 주문 상태 수정:
UPDATE orders SET status='SHIPPED' WHERE id=1수행 시 해당 키의 값이 갱신됨.
중요 설정 옵션
| 옵션명 | 설명 |
|---|---|
database.server.name |
Kafka 토픽 이름에 사용되는 서버 식별자 (임베디드 모드에서는 내부 용도). |
table.include.list |
감시 대상 테이블 목록 (정규 표현식 지원 가능). |
snapshot.mode |
초기 스냅샷 방식 (initial=전체 동기화, never=증분만). |
tombstones.on.delete |
삭제 이벤트에 대해 tombstone 레코드 생성 여부 (기본값 false). |
record.processing.threads |
이벤트 처리 스레드 수 (기본값 CPU 코어 수). |
적용 범위 및 고려사항
- 추천 활용 사례:
- MySQL → Redis 캐시 자동 갱신
- 마이크로서비스 간 데이터 연동 (예: 주문 변경 알림 발송)
- 모든 데이터 변경 기록 보관 (감사 로깅)
- 주의사항:
- 트랜잭션 일관성: 다중 테이블 간 트랜잭션 원자성 보장 불가능하므로 비즈니스 레벨에서 처리 필요.
- 성능 조정: 고부하 환경에서는
max.batch.size, 스레드 풀 크기 등을 조절해야 함. - 오류 처리:
ErrorHandler인터페이스 구현을 통해 연결 실패 등의 예외 상황 대응 필요.
런타임 중 테이블 모니터링 동적 변경
Debezium 임베디드 모드 자체적으로는 실행 중에 감지할 테이블 목록을 즉시 수정하는 기능을 제공하지 않습니다. 그러나 특정 전략을 통해 일부 유연성을 확보할 수 있습니다.
I. 구성 제약 조건
엔진 시작 시점에 아래 설정들이 고정되며 이후 변경 불가:
table.include.list: 감지 대상 테이블 지정 (schema.table형식).database.include.list: 감지 대상 데이터베이스 지정.
II. 동적 모니터링 구현 방법
방법 1: 엔진 재시작
- 새로운 테이블 목록으로
Properties객체 업데이트. - 기존
engine.close()호출하여 리소스 해제. - 새로운 설정으로 다시 엔진 생성 및 실행.
// 초기 설정
Properties configProps = new Properties();
configProps.setProperty("table.include.list", "db1.tableA,db1.tableB");
DebeziumEngine<ChangeEvent<String, String>> debeziumEngine = DebeziumEngine.create(Json.class)
.using(configProps)
.notifying(this::processEvent)
.build();
// 테이블 추가 후 재시작
configProps.setProperty("table.include.list", "db1.tableA,db1.tableC");
debeziumEngine.close();
debeziumEngine = DebeziumEngine.create(Json.class)
.using(configProps)
.notifying(this::processEvent)
.build();
debeziumEngine.run();
방법 2: 외부 도구와 연동
- 메타데이터 탐색:
information_schema.tables같은 시스템 테이블을 주기적으로 조회하여 신규 테이블 탐지. - 자동 구성 갱신: 새로운 테이블 발견 시 관련 설정 파일 또는 외부 설정 저장소를 업데이트하고 엔진 재시작 트리거.
방법 3: 정규표현식 활용
특정 패턴에 맞는 모든 테이블을 한 번에 감시하도록 설정:
configProps.setProperty("table.include.list", "db1\\..*"); // db1 스키마 하위 모든 테이블
⚠️ 이 경우 불필요한 테이블도 포함될 수 있으므로 비즈니스 로직에서 필터링 필요.
III. 고려사항
- 재시작 오버헤드: 빈번한 엔진 재시작은 성능 저하 우려 있으므로 변경 빈도에 따라 적절히 선택.
- 오프셋 유지: 재시작 시 오프셋 정보(
offset.storage)가 유실되지 않도록 안정적인 저장소 사용 권장. - 트랜잭션 정합성: 새로 추가된 테이블에 대한 이벤트 누락 방지를 위해 적절한 검증 필요.
IV. 적용 가이드라인
- 낮은 변경 빈도: 매일/매주 간격으로 테이블 목록 변경 시 재시작 방식 충분히 활용 가능.
- 높은 변경 빈도: 자동화된 설정 갱신 및 재시작 메커니즘 구축 필요.
- 복잡한 요구사항: Kafka Connect 모드를 고려하고 REST API를 통해 동적으로 커넥터 설정 관리 가능.