Spark SQL이 MySQL과 통합될 때 기본적으로 지원하는 모드는 Append, Overwrite, ErrorIfExists, Ignore가 있습니다. 이 문서에서는 Spark SQL이 MySQL로 데이터를 쓰면서 UPDATE 작업을 지원하도록 설정하는 방법을 설명합니다.
- 배경 정보
Spark는 데이터 소스와의 상호작용을 위한 모드를 지정하는 데 사용되는 열거형 클래스를 제공합니다. 원본 코드 분석 결과 Spark는 UPDATE 작업을 기본적으로 지원하지 않습니다.
- Spark SQL에서 UPDATE 작업을 지원하도록 설정하는 방법
일반적으로 Spark SQL을 통해 MySQL로 데이터를 쓸 때 다음 API를 사용합니다:
dataframe.write
.format("sql.execution.customDatasource.jdbc")
.option("jdbc.driver", "com.mysql.jdbc.Driver")
.option("jdbc.url", "jdbc:mysql://localhost:3306/test?user=root&password=&useUnicode=true&characterEncoding=gbk&autoReconnect=true&failOverReadOnly=false")
.option("jdbc.db", "test")
.save()
하위레벨에서 Spark는 JDBC 양식 JdbcDialect를 통해 전달되는 데이터를 다음 SQL 문으로 번역합니다:
INSERT INTO student (columns_1, columns_2, ...) VALUES (?, ?, ...)
이 SQL 문은 명백히 INSERT 문이며 우리가 기대하는 UPDATE 작업과는 다릅니다. 예를 들어:
UPDATE table_name SET field1=new-value1, field2=new-value2
MySQL만이 지원하는 다음 SQL 문을 사용할 수 있습니다:
INSERT INTO student (columns_1, columns_2) VALUES ('값1', '값2') ON DUPLICATE KEY UPDATE columns_1 = '값1 업데이트', columns_2 = '값2 업데이트';
위의 SQL 문은 데이터가 없으면 삽입하고 데이터가 있으면 업데이트를 수행합니다. 따라서 우리의 주요 관심사는 Spark SQL 내부에서 JdbcDialect를 통해 다음 SQL 문을 생성하도록 설정하는 것입니다:
INSERT INTO 테이블명 (columns_1, columns_2) VALUES ('값1', '값2') ON DUPLICATE KEY UPDATE columns_1 = '값1 업데이트', columns_2 = '값2 업데이트';
- 원본 코드 개조 전 전체 코드 설계 및 실행 흐름 이해
dataframe.write를 호출하면 DataFrameWriter 객체가 반환됩니다. 이 객체는 Spark SQL이 외부 데이터 소스에 쓰기 위한 입구 역할을 합니다. save() 메서드가 호출되면 데이터가 실제로 쓰기 시작합니다.
save() 원본 코드의 세부 내용은 다음과 같습니다:
def save(): Unit = {
val dataSource = DataSource(
df.sparkSession,
className = source,
partitionColumns = partitioningColumns.getOrElse(Nil),
bucketSpec = getBucketSpec,
options = extraOptions.toMap
)
dataSource.write(mode, df)
}
dataSource.write(mode, df)의 세부 내용은 다음과 같습니다:
// mode: 삽입 방식, df: 삽입할 데이터
dataSource.write(mode, df)
데이터 소스가 데이터베이스로 지정된 경우 아래와 같이 dataSource.createRelation을 통해 로직이 실행됩니다.
- 원본 코드 개조
Spark SQL이 MySQL로 데이터를 쓰는 작업은 DefaultSource 클래스로 들어갑니다. 이 클래스에서 UPDATE 작업을 지원하도록 설정해야 합니다.
UPDATE 작업을 지원하려면 다음 조건을 체크해야 합니다:
if (saveMode == CustomSaveMode.Update) {
// UPDATE용 SQL 문 생성
s"INSERT INTO $table ($columns) VALUES ($placeholders) ON DUPLICATE KEY UPDATE $duplicateSetting"
} else {
// 기본 INSERT 문 생성
s"INSERT INTO $table ($columns) VALUES ($placeholders)"
}
위의 로직을 통해 맞춤형 SQL 문이 생성됩니다. 그러나 Spark에서.prepareStatement를 통해 SQL 문을 실행할 때 다음과 같은 문제가 발생할 수 있습니다:
INSERT INTO TABLE (컬럼1, 컬럼2, 컬럼3) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE 컬럼1=?, 컬럼2=?, 컬럼3=?
위의 SQL 문은 6개의 플레이스홀더를 가지고 있지만, 데이터가 3개일 때는 3개의 세터가 생성됩니다. 이는 데이터베이스 드라이버가 예외를 발생시킬 수 있습니다.
이를 해결하기 위해 다음과 같이 세터를 조정해야 합니다:
val isUpdate = saveMode == CustomSaveMode.Update
val setters: Array[JDBCValueSetter] = isUpdate match {
case true => {
val setters = rddSchema.fields.map(_.dataType).map(makeSetter(conn, dialect, _)).toArray
Array.fill(2)(setters).flatten
}
case _ => {
rddSchema.fields.map(_.dataType).map(makeSetter(conn, dialect, _)).toArray
}
}
- JDBCValueSetter의 조정
데이터 유형에 따른 세터를 조정합니다:
private type JDBCValueSetter = (PreparedStatement, Row, Int, Int) => Unit
private def makeSetter(
conn: Connection,
dialect: JdbcDialect,
dataType: DataType): JDBCValueSetter = dataType match {
case IntegerType => {
(stmt: PreparedStatement, row: Row, pos: Int, cursor:Int) => {
stmt.setInt(pos + 1, row.getInt(pos - cursor))
}
}
// 그외 데이터 유형은 유사하게 조정
}
- 완전한 코드