Flink SQL 환경 구성
Flink의 Table API와 SQL 기능은 flink-table 모듈을 통해 제공되며, 프로젝트에 다음 의존성을 추가해야 합니다:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table_2.11</artifactId>
<version>1.5.0</version>
</dependency>
또한 스칼라 기반의 배치 또는 스트리밍 처리를 위해 별도의 라이브러리가 필요합니다. 배치 작업을 위한 의존성은 다음과 같습니다:
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.11</artifactId>
<version>1.5.0</version>
</dependency>
실행 환경 생성
Flink의 테이블 쿼리는 배치 및 스트리밍 환경 모두에서 동일한 구조를 따릅니다. 아래는 두 가지 환경에서 TableEnvironment를 생성하는 예시입니다:
// 스트리밍 환경
val streamEnv = StreamExecutionEnvironment.getExecutionEnvironment
val streamTableEnv = TableEnvironment.getTableEnvironment(streamEnv)
// 배치 환경
val batchEnv = ExecutionEnvironment.getExecutionEnvironment
val batchTableEnv = TableEnvironment.getTableEnvironment(batchEnv)
TableEnvironment는 테이블 처리의 중심 역할을 하며, 다음과 같은 기능을 수행합니다:
- 내부 디렉터리에 테이블을 등록
- 외부 데이터 소스를 연결하는 외부 디렉터리 등록
- SQL 쿼리 실행
- 사용자 정의 함수(스칼라, 테이블, 집계 등) 등록
- DataStream 또는 DataSet을 테이블로 변환
- ExecutionEnvironment 또는 StreamExecutionEnvironment 참조 유지
테이블 등록 방식
TableEnvironment는 이름 기반으로 테이블을 관리하며, 입력 테이블과 출력 테이블로 나뉩니다.
입력 테이블 등록
입력 테이블은 쿼리의 데이터 원천으로 사용되며, 다음과 같은 소스에서 등록할 수 있습니다:
- 기존의
Table객체 (테이블 API나 SQL 쿼리 결과) TableSource(파일, 데이터베이스, 메시지 큐 등 외부 시스템 접근)DataStream또는DataSet(기존 데이터 스트림/셋)
출력 테이블 등록 (TableSink)
쿼리 결과를 외부 시스템에 저장하기 위해 TableSink를 등록할 수 있습니다. 예를 들어 파일, 데이터베이스, 메시지 브로커 등에 데이터를 내보낼 수 있습니다.
실제 사용 예시: CSV 파일 처리
다음은 배치 환경에서 CSV 파일을 읽어와 조건 필터링 후 다른 파일로 저장하는 예제입니다:
// 배치 환경 생성
val executionEnv = ExecutionEnvironment.getExecutionEnvironment
// 테이블 환경 초기화
val tableEnv = TableEnvironment.getTableEnvironment(executionEnv)
// CSV 파일 소스 정의
val csvSource = CsvTableSource.builder()
.path("data1.csv")
.field("id", Types.INT)
.field("name", Types.STRING)
.field("age", Types.INT)
.fieldDelimiter(",")
.lineDelimiter("\n")
.ignoreFirstLine()
.ignoreParseErrors()
.build()
// 외부 소스를 테이블로 등록
tableEnv.registerTableSource("tableA", csvSource)
// 테이블 API로 데이터 조회 및 필터링
val filteredTable = tableEnv.scan("tableA")
.select("id, name, age")
.filter("name == 'lisi'")
// 결과를 CSV 파일로 출력
filteredTable.writeToSink(
new CsvTableSink("bbb.csv", ",", 1, FileSystem.WriteMode.OVERWRITE)
)
// 실행
executionEnv.execute()
또한 동일한 작업을 SQL 문을 통해 수행할 수도 있습니다:
val sqlResult = tableEnv.sqlQuery(
"SELECT id, name, age FROM tableA WHERE name = 'lisi' ORDER BY id LIMIT 10"
)
sqlResult.writeToSink(
new CsvTableSink("output_sql.csv", ",", 1, FileSystem.WriteMode.OVERWRITE)
)