PostgreSQL 데이터 변경 구독 및 소비 위치 관리 기술

PostgreSQL 논리적 디코딩을 통한 데이터 변경 구독 구현 가이드

PostgreSQL의 논리적 디코딩 기능을 활용해 데이터 변경 이벤트를 구독하고 소비 위치를 관리하는 방법을 정리합니다. wal2json 또는 pgoutput 플러그인을 사용해 MySQL binlog와 유사한 기능을 구현할 수 있습니다.

1. PostgreSQL 설정 준비

1.1 커널 파라미터 조정


# 논리적 디코딩 활성화
wal_level = logical

# 복제 스레드 최대 수 설정
max_wal_senders = 8

# WAL 파일 보존 용량 설정
wal_keep_size = 512MB

1.2 복제 슬롯 생성


-- wal2json 플러그인 사용 시
SELECT * FROM pg_create_logical_replication_slot('replicationSlotName', 'wal2json');

-- pgoutput 플러그인 사용 시
SELECT * FROM pg_create_logical_replication_slot('replicationSlotName', 'pgoutput');

2. Java 구현 전략

방안 1: JDBC + 논리적 디코딩


import org.postgresql.PGConnection;
import org.postgresql.replication.PGReplicationStream;

public class PostgresCDCConsumer {

    private static final String DB_URL = "jdbc:postgresql://localhost:5432/dbname";
    private static final String SLOT_NAME = "replicationSlotName";
    private static final String POSITION_FILE = "position.txt";

    public static void main(String[] args) {
        try (Connection conn = DriverManager.getConnection(DB_URL, "user", "password")) {
            PGConnection pgConn = conn.unwrap(PGConnection.class);
            
            // 이전 위치 조회
            String lastLsn = getLastLsnFromStorage();
            
            // 복제 스트림 생성
            PGReplicationStream stream = pgConn
                .getReplicationAPI()
                .replicationStream()
                .logical()
                .withSlotName(SLOT_NAME)
                .withStartPosition(lastLsn != null ? 
                    LogSequenceNumber.valueOf(lastLsn) : 
                    LogSequenceNumber.valueOf("0/0"))
                .start();

            // 소비 스레드 실행
            new Thread(() -> {
                try {
                    while (true) {
                        ByteBuffer buffer = stream.read(1, TimeUnit.SECONDS);
                        if (buffer == null) continue;
                        
                        // 이벤트 처리
                        processChange(new String(buffer.array(), offset));
                        saveLsnToStorage(stream.getLastReceiveLSN().toString());
                        stream.setAppliedLSN(stream.getLastReceiveLSN());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    stream.close();
                }
            }).start();
        } catch (SQLException e) {
            e.printStackTrace();
        }
    }

    private static void processChange(String jsonMessage) {
        // JSON 파싱 및 이벤트 처리 로직
    }
    
    private static String getLastLsnFromStorage() { ... }
    private static void saveLsnToStorage(String lsn) { ... }
}

방안 2: Debezium 엔진 활용


import io.debezium.engine.DebeziumEngine;
import io.debezium.engine.format.Json;

public class DebeziumPostgresCDCExample {

    private static final String POSITION_FILE = "debezium_position.txt";

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("connector.class", "io.debezium.connector.postgresql.PostgresConnector");
        props.setProperty("offset.storage", "io.debezium.storage.file.offset.FileOffsetBackingStore");
        props.setProperty("offset.storage.file.filename", POSITION_FILE);
        
        props.setProperty("database.hostname", "localhost");
        props.setProperty("database.port", "5432");
        props.setProperty("database.user", "user");
        props.setProperty("database.password", "password");
        props.setProperty("database.dbname", "dbname");
        props.setProperty("database.server.name", "postgres_server");
        
        props.setProperty("plugin.name", "pgoutput");
        props.setProperty("slot.name", "replicationSlotName");
        props.setProperty("publication.name", "my_publication");
        
        try (DebeziumEngine> engine = DebeziumEngine.create(Json.class)
                .using(props)
                .notifying(record -> processChange(record))
                .build()) {
                
            ExecutorService executor = Executors.newSingleThreadExecutor();
            executor.execute(engine);
            
            Runtime.getRuntime().addShutdownHook(new Thread(engine::close));
            executor.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    private static void processChange(ChangeEvent record) {
        // 이벤트 처리 로직
    }
}

3. 소비 위치 관리 전략

3.1 파일 기반 저장소


private static void savePositionToFile(String slotName, String lsn) {
    try (BufferedWriter writer = Files.newBufferedWriter(Paths.get("position.txt"))) {
        writer.write(slotName + ":" + lsn);
    } catch (IOException e) {
        System.err.println("Error saving position: " + e.getMessage());
    }
}

private static String loadPositionFromFile() {
    try (BufferedReader reader = Files.newBufferedReader(Paths.get("position.txt"))) {
        return reader.readLine();
    } catch (IOException e) {
        return null;
    }
}

3.2 데이터베이스 기반 저장소


private static void savePositionToDB(String slotName, String lsn) {
    try (Connection conn = DriverManager.getConnection(DB_URL, "user", "password")) {
        PreparedStatement stmt = conn.prepareStatement(
            "INSERT INTO replication_positions (slot_name, lsn, update_time) " +
            "VALUES (?, ?, NOW()) " +
            "ON CONFLICT(slot_name) DO UPDATE SET lsn = EXCLUDED.lsn, update_time = NOW()");
        stmt.setString(1, slotName);
        stmt.setString(2, lsn);
        stmt.executeUpdate();
    } catch (SQLException e) {
        System.err.println("Database error: " + e.getMessage());
    }
}

4. 운영 최적화 팁

  • 연결 중단 시 자동 재연결 메커니즘 구현
  • 메시지 처리 실패 시 재처리 큐 관리
  • 배치 처리를 통한 성능 향상
  • WAL 파일 보존 시간 조정
  • 복제 슬롯 존재 여부 검증 로직 추가
  • 다중 스레드 처리 시 순서 보장

5. 문제 해결 가이드

  • 복제 슬롯 없음: 슬롯 생성 확인
  • WAL 파일 누수: wal_keep_size 조정
  • 데이터 누적: 소비자 스레드 수 증가
  • 버전 호환성: PostgreSQL 버전과 플러그인 호환성 확인

태그: PostgreSQL Debezium CDC Logical Decoding Data Streaming

5월 28일 00:39에 게시됨