Spark SQL에서 MySQL로 데이터 쓰기 시 UPDATE 작업을 지원하는 방법

Spark SQL이 MySQL과 통합될 때 기본적으로 지원하는 모드는 Append, Overwrite, ErrorIfExists, Ignore가 있습니다. 이 문서에서는 Spark SQL이 MySQL로 데이터를 쓰면서 UPDATE 작업을 지원하도록 설정하는 방법을 설명합니다.

  1. 배경 정보

Spark는 데이터 소스와의 상호작용을 위한 모드를 지정하는 데 사용되는 열거형 클래스를 제공합니다. 원본 코드 분석 결과 Spark는 UPDATE 작업을 기본적으로 지원하지 않습니다.

  1. Spark SQL에서 UPDATE 작업을 지원하도록 설정하는 방법

일반적으로 Spark SQL을 통해 MySQL로 데이터를 쓸 때 다음 API를 사용합니다:

dataframe.write
   .format("sql.execution.customDatasource.jdbc")
   .option("jdbc.driver", "com.mysql.jdbc.Driver")
   .option("jdbc.url", "jdbc:mysql://localhost:3306/test?user=root&password=&useUnicode=true&characterEncoding=gbk&autoReconnect=true&failOverReadOnly=false")
   .option("jdbc.db", "test")
   .save()

하위레벨에서 Spark는 JDBC 양식 JdbcDialect를 통해 전달되는 데이터를 다음 SQL 문으로 번역합니다:

INSERT INTO student (columns_1, columns_2, ...) VALUES (?, ?, ...)

이 SQL 문은 명백히 INSERT 문이며 우리가 기대하는 UPDATE 작업과는 다릅니다. 예를 들어:

UPDATE table_name SET field1=new-value1, field2=new-value2

MySQL만이 지원하는 다음 SQL 문을 사용할 수 있습니다:

INSERT INTO student (columns_1, columns_2) VALUES ('값1', '값2') ON DUPLICATE KEY UPDATE columns_1 = '값1 업데이트', columns_2 = '값2 업데이트';

위의 SQL 문은 데이터가 없으면 삽입하고 데이터가 있으면 업데이트를 수행합니다. 따라서 우리의 주요 관심사는 Spark SQL 내부에서 JdbcDialect를 통해 다음 SQL 문을 생성하도록 설정하는 것입니다:

INSERT INTO 테이블명 (columns_1, columns_2) VALUES ('값1', '값2') ON DUPLICATE KEY UPDATE columns_1 = '값1 업데이트', columns_2 = '값2 업데이트';
  1. 원본 코드 개조 전 전체 코드 설계 및 실행 흐름 이해

dataframe.write를 호출하면 DataFrameWriter 객체가 반환됩니다. 이 객체는 Spark SQL이 외부 데이터 소스에 쓰기 위한 입구 역할을 합니다. save() 메서드가 호출되면 데이터가 실제로 쓰기 시작합니다.

save() 원본 코드의 세부 내용은 다음과 같습니다:

def save(): Unit = {
   val dataSource = DataSource(
     df.sparkSession,
     className = source,
     partitionColumns = partitioningColumns.getOrElse(Nil),
     bucketSpec = getBucketSpec,
     options = extraOptions.toMap
   )
   dataSource.write(mode, df)
 }

dataSource.write(mode, df)의 세부 내용은 다음과 같습니다:

// mode: 삽입 방식, df: 삽입할 데이터
dataSource.write(mode, df)

데이터 소스가 데이터베이스로 지정된 경우 아래와 같이 dataSource.createRelation을 통해 로직이 실행됩니다.

  1. 원본 코드 개조

Spark SQL이 MySQL로 데이터를 쓰는 작업은 DefaultSource 클래스로 들어갑니다. 이 클래스에서 UPDATE 작업을 지원하도록 설정해야 합니다.

UPDATE 작업을 지원하려면 다음 조건을 체크해야 합니다:

if (saveMode == CustomSaveMode.Update) {
   // UPDATE용 SQL 문 생성
   s"INSERT INTO $table ($columns) VALUES ($placeholders) ON DUPLICATE KEY UPDATE $duplicateSetting"
} else {
   // 기본 INSERT 문 생성
   s"INSERT INTO $table ($columns) VALUES ($placeholders)"
}

위의 로직을 통해 맞춤형 SQL 문이 생성됩니다. 그러나 Spark에서.prepareStatement를 통해 SQL 문을 실행할 때 다음과 같은 문제가 발생할 수 있습니다:

INSERT INTO TABLE (컬럼1, 컬럼2, 컬럼3) VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE 컬럼1=?, 컬럼2=?, 컬럼3=?

위의 SQL 문은 6개의 플레이스홀더를 가지고 있지만, 데이터가 3개일 때는 3개의 세터가 생성됩니다. 이는 데이터베이스 드라이버가 예외를 발생시킬 수 있습니다.

이를 해결하기 위해 다음과 같이 세터를 조정해야 합니다:

val isUpdate = saveMode == CustomSaveMode.Update
val setters: Array[JDBCValueSetter] = isUpdate match {
   case true => {
     val setters = rddSchema.fields.map(_.dataType).map(makeSetter(conn, dialect, _)).toArray
     Array.fill(2)(setters).flatten
   }
   case _ => {
     rddSchema.fields.map(_.dataType).map(makeSetter(conn, dialect, _)).toArray
   }
}
  1. JDBCValueSetter의 조정

데이터 유형에 따른 세터를 조정합니다:

private type JDBCValueSetter = (PreparedStatement, Row, Int, Int) => Unit
private def makeSetter(
   conn: Connection,
   dialect: JdbcDialect,
   dataType: DataType): JDBCValueSetter = dataType match {
   case IntegerType => {
     (stmt: PreparedStatement, row: Row, pos: Int, cursor:Int) => {
       stmt.setInt(pos + 1, row.getInt(pos - cursor))
     }
   }
   // 그외 데이터 유형은 유사하게 조정
}
  1. 완전한 코드

GitHub 저장소 참조

태그: Spark JDBC MySQL UPDATE 데이터베이스

5월 28일 23:38에 게시됨