Spark에서 Kudu DML 작업 처리하기

Kudu는 다양한 DML 유형의 작업을 지원하며, 이러한 작업 중 일부는 Spark on Kudu 통합에서 사용할 수 있습니다.

포함되는 작업:

  • INSERT - DataFrame의 행을 Kudu 테이블에 삽입합니다. INSERT는 API에서 완전히 지원되지만 Spark에서는 사용을 권장하지 않습니다. Spark 태스크가 재실행될 가능성이 있어 이미 삽입된 행을 다시 삽입해야 할 수 있으므로 위험합니다. 이 경우 이미 존재하는 행에 대해 INSERT가 허용되지 않으므로 실패가 발생합니다. 따라서 아래 설명된 INSERT_IGNORE 사용을 권장합니다.
  • INSERT_IGNORE - DataFrame의 행을 Kudu 테이블에 삽입합니다. 행이 이미 존재하면 삽입 작업을 무시합니다.
  • DELETE - Kudu 테이블에서 DataFrame의 행을 삭제합니다.
  • UPSERT - Kudu 테이블에 행이 존재하면 업데이트하고, 그렇지 않으면 삽입합니다.
  • UPDATE - Kudu 테이블의 기존 행을 업데이트합니다.

INSERT 작업 예제


import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.kudu.spark.kudu._

case class Person(name: String, age: Int, city: String)

object InsertExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("KuduInsert")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val sqlContext = SparkSession.builder().config(conf).getOrCreate().sqlContext

    val masterAddrs = "kudu-master-01:7051,kudu-master-02:7051,kudu-master-03:7051"
    val kuduCtx = new KuduContext(masterAddrs, sc)

    val tableName = "sample_kudu_table"
    val tableOptions = Map(
      "kudu.table"  -> tableName,
      "kudu.master" -> masterAddrs
    )

    import sqlContext.implicits._

    val people = Seq(
      Person("alice", 29, "seoul"),
      Person("bob", 22, "busan")
    )
    val peopleRDD = sc.parallelize(people)
    val peopleDF = peopleRDD.toDF()

    kuduCtx.insertRows(peopleDF, tableName)

    sqlContext.read.options(tableOptions).kudu.show()
  }
}

DELETE 작업 예제


import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int, city: String)

object DeleteExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("KuduDelete")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val sqlContext = SparkSession.builder().config(conf).getOrCreate().sqlContext

    val masterAddrs = "kudu-master-01:7051,kudu-master-02:7051,kudu-master-03:7051"
    val kuduCtx = new KuduContext(masterAddrs, sc)

    val tableName = "sample_kudu_table"
    val tableOptions = Map(
      "kudu.table"  -> tableName,
      "kudu.master" -> masterAddrs
    )

    import sqlContext.implicits._

    // 삭제할 키 데이터를 선별하기 위해 DataFrame 생성
    val toDelete = sc.parallelize(Seq(Person("alice", 29, "seoul"))).toDF()
    toDelete.createOrReplaceTempView("delete_view")

    val deleteKeys = sqlContext.sql("SELECT name FROM delete_view WHERE age > 25")
    kuduCtx.deleteRows(deleteKeys, tableName)

    sqlContext.read.options(tableOptions).kudu.show()
  }
}

UPSERT 작업 예제

UPSERT는 테이블에 행이 존재하면 업데이트하고, 존재하지 않으면 새로 삽입하는 방식입니다.


import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int, city: String)

object UpsertExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("KuduUpsert")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val sqlContext = SparkSession.builder().config(conf).getOrCreate().sqlContext

    val masterAddrs = "kudu-master-01:7051,kudu-master-02:7051,kudu-master-03:7051"
    val kuduCtx = new KuduContext(masterAddrs, sc)

    val tableName = "sample_kudu_table"
    val tableOptions = Map(
      "kudu.table"  -> tableName,
      "kudu.master" -> masterAddrs
    )

    import sqlContext.implicits._

    val updatedPeople = Seq(
      Person("alice", 30, "seoul"),  // 기존 행 업데이트 (나이 변경)
      Person("charlie", 35, "incheon")  // 새 행 삽입
    )
    val updatedRDD = sc.parallelize(updatedPeople)
    val updatedDF = updatedRDD.toDF()

    kuduCtx.upsertRows(updatedDF, tableName)

    sqlContext.read.options(tableOptions).kudu.show()
  }
}

UPDATE 작업 예제

UPDATE는 기존 테이블의 특정 행을 수정할 때 사용합니다.


import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession

case class Person(name: String, age: Int, city: String)

object UpdateExample {
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("KuduUpdate")
      .setMaster("local[2]")
      .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    val sc = new SparkContext(conf)
    val sqlContext = SparkSession.builder().config(conf).getOrCreate().sqlContext

    val masterAddrs = "kudu-master-01:7051,kudu-master-02:7051,kudu-master-03:7051"
    val kuduCtx = new KuduContext(masterAddrs, sc)

    val tableName = "sample_kudu_table"
    val tableOptions = Map(
      "kudu.table"  -> tableName,
      "kudu.master" -> masterAddrs
    )

    import sqlContext.implicits._

    // 업데이트할 데이터: 키는 그대로, 변경할 컬럼 값만 다르게 설정
    val modifications = Seq(Person("alice", 30, "busan"))
    val modRDD = sc.parallelize(modifications)
    val modDF = modRDD.toDF()

    kuduCtx.updateRows(modDF, tableName)

    sqlContext.read.options(tableOptions).kudu.show()
  }
}

태그: Kudu Spark DataFrame DML INSERT

6월 24일 23:24에 게시됨