Kudu는 다양한 DML 유형의 작업을 지원하며, 이러한 작업 중 일부는 Spark on Kudu 통합에서 사용할 수 있습니다.
포함되는 작업:
- INSERT - DataFrame의 행을 Kudu 테이블에 삽입합니다. INSERT는 API에서 완전히 지원되지만 Spark에서는 사용을 권장하지 않습니다. Spark 태스크가 재실행될 가능성이 있어 이미 삽입된 행을 다시 삽입해야 할 수 있으므로 위험합니다. 이 경우 이미 존재하는 행에 대해 INSERT가 허용되지 않으므로 실패가 발생합니다. 따라서 아래 설명된 INSERT_IGNORE 사용을 권장합니다.
- INSERT_IGNORE - DataFrame의 행을 Kudu 테이블에 삽입합니다. 행이 이미 존재하면 삽입 작업을 무시합니다.
- DELETE - Kudu 테이블에서 DataFrame의 행을 삭제합니다.
- UPSERT - Kudu 테이블에 행이 존재하면 업데이트하고, 그렇지 않으면 삽입합니다.
- UPDATE - Kudu 테이블의 기존 행을 업데이트합니다.
INSERT 작업 예제
import org.apache.kudu.spark.kudu.KuduContext
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
import org.apache.kudu.spark.kudu._
case class Person(name: String, age: Int, city: String)
object InsertExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("KuduInsert")
.setMaster("local[2]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val sqlContext = SparkSession.builder().config(conf).getOrCreate().sqlContext
val masterAddrs = "kudu-master-01:7051,kudu-master-02:7051,kudu-master-03:7051"
val kuduCtx = new KuduContext(masterAddrs, sc)
val tableName = "sample_kudu_table"
val tableOptions = Map(
"kudu.table" -> tableName,
"kudu.master" -> masterAddrs
)
import sqlContext.implicits._
val people = Seq(
Person("alice", 29, "seoul"),
Person("bob", 22, "busan")
)
val peopleRDD = sc.parallelize(people)
val peopleDF = peopleRDD.toDF()
kuduCtx.insertRows(peopleDF, tableName)
sqlContext.read.options(tableOptions).kudu.show()
}
}
DELETE 작업 예제
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
case class Person(name: String, age: Int, city: String)
object DeleteExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("KuduDelete")
.setMaster("local[2]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val sqlContext = SparkSession.builder().config(conf).getOrCreate().sqlContext
val masterAddrs = "kudu-master-01:7051,kudu-master-02:7051,kudu-master-03:7051"
val kuduCtx = new KuduContext(masterAddrs, sc)
val tableName = "sample_kudu_table"
val tableOptions = Map(
"kudu.table" -> tableName,
"kudu.master" -> masterAddrs
)
import sqlContext.implicits._
// 삭제할 키 데이터를 선별하기 위해 DataFrame 생성
val toDelete = sc.parallelize(Seq(Person("alice", 29, "seoul"))).toDF()
toDelete.createOrReplaceTempView("delete_view")
val deleteKeys = sqlContext.sql("SELECT name FROM delete_view WHERE age > 25")
kuduCtx.deleteRows(deleteKeys, tableName)
sqlContext.read.options(tableOptions).kudu.show()
}
}
UPSERT 작업 예제
UPSERT는 테이블에 행이 존재하면 업데이트하고, 존재하지 않으면 새로 삽입하는 방식입니다.
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
case class Person(name: String, age: Int, city: String)
object UpsertExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("KuduUpsert")
.setMaster("local[2]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val sqlContext = SparkSession.builder().config(conf).getOrCreate().sqlContext
val masterAddrs = "kudu-master-01:7051,kudu-master-02:7051,kudu-master-03:7051"
val kuduCtx = new KuduContext(masterAddrs, sc)
val tableName = "sample_kudu_table"
val tableOptions = Map(
"kudu.table" -> tableName,
"kudu.master" -> masterAddrs
)
import sqlContext.implicits._
val updatedPeople = Seq(
Person("alice", 30, "seoul"), // 기존 행 업데이트 (나이 변경)
Person("charlie", 35, "incheon") // 새 행 삽입
)
val updatedRDD = sc.parallelize(updatedPeople)
val updatedDF = updatedRDD.toDF()
kuduCtx.upsertRows(updatedDF, tableName)
sqlContext.read.options(tableOptions).kudu.show()
}
}
UPDATE 작업 예제
UPDATE는 기존 테이블의 특정 행을 수정할 때 사용합니다.
import org.apache.kudu.spark.kudu._
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.sql.SparkSession
case class Person(name: String, age: Int, city: String)
object UpdateExample {
def main(args: Array[String]): Unit = {
val conf = new SparkConf()
.setAppName("KuduUpdate")
.setMaster("local[2]")
.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
val sc = new SparkContext(conf)
val sqlContext = SparkSession.builder().config(conf).getOrCreate().sqlContext
val masterAddrs = "kudu-master-01:7051,kudu-master-02:7051,kudu-master-03:7051"
val kuduCtx = new KuduContext(masterAddrs, sc)
val tableName = "sample_kudu_table"
val tableOptions = Map(
"kudu.table" -> tableName,
"kudu.master" -> masterAddrs
)
import sqlContext.implicits._
// 업데이트할 데이터: 키는 그대로, 변경할 컬럼 값만 다르게 설정
val modifications = Seq(Person("alice", 30, "busan"))
val modRDD = sc.parallelize(modifications)
val modDF = modRDD.toDF()
kuduCtx.updateRows(modDF, tableName)
sqlContext.read.options(tableOptions).kudu.show()
}
}