Apache Spark에서 RDD, DataFrame, Dataset 변환 방법

DataFrame와 RDD 상호작용

공식 문서: https://spark.apache.org/docs/2.2.1/sql-programming-guide.html

1. DataFrame와 RDD 상호작용 - 반사 방법 (RDD ⇒ DataFrame)

① 샘플 클래스 생성, ② toDF() 메소드 호출

package com.spark.example
import org.apache.spark.sql.SparkSession
/**
* DataFrame과 RDD의 상호작용 예제
*/
object SparkDataConversion {
def main(args: Array[String]): Unit = {

val spark = SparkSession.builder().appName("SparkDataConversion").master("local[2]").getOrCreate()

// RDD => DataFrame 변환
val rdd = spark.sparkContext.textFile("file:///data/users.txt")

// 암시적 변환 임포트
import spark.implicits._
val userDataFrame = rdd.map(_.split(",")).map(row => UserData(row(0).toInt, row(1), row(2).toInt)).toDF()

userDataFrame.show()

spark.stop()
}
case class UserData(id: Int, name: String, age: Int)
}

요약:

  • 반사를 사용하여 특정 데이터 타입을 포함하는 RDD의 메타데이터를 추론합니다. 이 메타데이터는 케이스 클래스입니다
  • DataFrame API 또는 SQL 방식으로 프로그래밍할 수 있습니다

2. DataFrame와 RDD 상호작용 - 프로그래밍 방법 (RDD ⇒ DataFrame)

두 번째 프로그래밍 방식은 첫 번째 반사 방식보다 복잡하지만, 컬럼과 그들의 타입이 런타임까지 알려지지 않을 때 DataFrame/Dataset을 구성할 수 있습니다. 케이스 클래스를 미리 정의할 수 없을 때 이 방식을 사용해야 하며, 다음 세 단계를 따라야 합니다: ① Row 객체를 사용하여 RDD 생성 ② StructType을 사용하여 스키마 정의 ③ SparkSession의 createDataFrame 메소드를 사용하여 스키마를 RDD의 Row 객체에 적용

def convertWithSchema(spark: SparkSession): Unit = {
// RDD => DataFrame 변환
val rdd = spark.sparkContext.textFile("file:///data/users.txt")

// 1. Row 객체를 사용하여 RDD 생성
val userRDD = rdd.map(_.split(",")).map(row => Row(row(0).toInt, row(1), row(2).toInt))

// 2. StructType을 사용하여 스키마 정의
val schema = StructType(Array(StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)))

// 3. createDataFrame 메소드로 스키마 적용
val userDataFrame = spark.createDataFrame(userRDD, schema)
userDataFrame.printSchema()
userDataFrame.show()
}

요약:

DataFrame과 RDD 상호작용의 두 가지 방법:

  1. 반사: 케이스 클래스 사용 - 전제 조건: 필드와 필드 타입을 미리 알아야 함
  2. 프로그래밍: Row 객체 사용 - 첫 번째 방식이 요구사항을 충족시키지 못할 때 (컬럼을 미리 모를 때)
  3. 선택 가이드라인: 첫 번째 방식을 우선적으로 고려

3. Dataset 소개

공식 문서에 따르면, Dataset은 분산 데이터셋이며 새로운 인터페이스로 Spark 1.6 버전에서 추가되었습니다. DataFrame이 먼저 나온 후 1.6 버전에서 Dataset이 등장했습니다.

Dataset의 장점:

  • 강한 타입 지원
  • 람다 표현식 지원
  • Spark SQL 실행 엔진의 최적화 제공
  • DataFrame의 대부분 기능을 포함

Dataset은 다음과 같은 방법으로 구성할 수 있습니다:

  • JVM 객체
  • map, flatMap, filter와 같은 함수 표현식

Dataset은 Java와 Scala 언어에서 사용할 수 있지만, Python은 아직 Dataset API를 지원하지 않으므로 Python 개발 시에는 DataFrame API를 사용해야 합니다.

  1. DataFrame API의 확장이며 Spark의 최신 데이터 추상입니다
  2. 사용자 친화적인 API 스타일로 타입 안전 검사와 DataFrame의 쿼리 최적화 특성을 모두 제공
  3. Dataset은 인코더를 지원하므로 힙 외부 데이터에 접근할 때 전체 객체 역직렬화를 피해 효율성을 높일 수 있습니다
  4. 케이스 클래스를 사용하여 Dataset에서 데이터 구조 정보를 정의하며, 케이스 클래스의 각 속성 이름은 Dataset의 필드 이름에 직접 매핑됩니다
  5. DataFrame은 Dataset의 특별한 경우로, DataFrame = Dataset[Row]로 볼 수 있습니다. 따라서 as() 메소드를 통해 DataFrame을 Dataset으로 변환할 수 있습니다. Row는 Car, Person과 같은 타입과 유사한 타입으로, 모든 테이블 구조 정보를 Row로 표현합니다
  6. DataSet은 강한 타입을 지원합니다. 예를 들어 Dataset[Car], Dataset[Person] 등을 가질 수 있습니다
  7. DataFrame은 필드는 알지만 필드 타입을 모르므로 컴파일 시 타입 오류를 검사할 수 없습니다. 예를 들어 String에 대해 감산 연산을 수행하면 실행 시 오류가 발생합니다. 반면 DataSet은 필드뿐만 아니라 필드 타입까지 알기 때문에 더 엄격한 오류 검사를 제공합니다. 이는 JSON 객체와 클래스 객체 간의 차이와 유사합니다
// 예시: DataFrame => Dataset (샘플 클래스 생성, DataFrame.as[샘플 클래스 이름])

package com.spark.example
import org.apache.spark.sql.SparkSession

/**
* Dataset 조작 예제
*/
object DatasetExample {

def main(args: Array[String]): Unit = {

val spark = SparkSession.builder().appName("DatasetExample").master("local[2]").getOrCreate()

// CSV 파일 경로
val path = "file:///data/sales.csv"

// 암시적 변환 임포트
import spark.implicits._

// Spark가 CSV 파일을 어떻게 파싱할지 설정
val salesDataFrame = spark.read.option("header","true").option("inferSchema","true").csv(path)
salesDataFrame.show

// salesDataFrame을 Dataset으로 변환
val salesDataset = salesDataFrame.as[SalesRecord]
salesDataset.map(record => record.itemId).show

spark.stop()
}

// CSV 파일의 열명을 복사하여 케이스 클래스 생성
case class SalesRecord(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)

}

4. 확장: RDD, DataFrame 및 Dataset의 상호 변환

4.1 Dataset을 RDD로 변환
rdd() 메소드를 호출하면 됩니다.

1) Dataset 생성
scala> val DS = Seq(Person("Andy", 32)).toDS()
DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

2) Dataset을 RDD로 변환
scala> DS.rdd
res11: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[15] at rdd at <console>:28


4.2 DataFrame과 Dataset의 상호 변환
1. DataFrame을 Dataset으로 변환
1) DataFrame 생성
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]

2) 케이스 클래스 정의
scala> case class Person(name: String, age: Long)
defined class Person

3) DataFrame을 Dataset으로 변환
scala> df.as[Person]
res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]


2. Dataset을 DataFrame으로 변환
1) 케이스 클래스 정의
scala> case class Person(name: String, age: Long)
defined class Person

2) Dataset 생성
scala> val ds = Seq(Person("Andy", 32)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]

3) Dataset을 DataFrame으로 변환
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]

4) 결과 확인
scala> df.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+

5. 데모 메소드 캡슐화

package com.spark.example
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
  * DataFrame과 RDD의 상호작용 데모
  */
object SparkDataConversionDemo {
  def main(args: Array[String]): Unit = {

    val spark = SparkSession.builder().appName("SparkDataConversionDemo").master("local[2]").getOrCreate()

    // 반사 방식으로 변환
    // convertWithReflection(spark)

    // 프로그래밍 방식으로 변환
    // convertWithSchema(spark)

    spark.stop()
  }

  // 프로그래밍 방식으로 변환
  def convertWithSchema(spark: SparkSession): Unit = {
    // RDD => DataFrame 변환
    val rdd = spark.sparkContext.textFile("file:///data/users.txt")

    // 1. Row 객체를 사용하여 RDD 생성
    val userRDD = rdd.map(_.split(",")).map(row => Row(row(0).toInt, row(1), row(2).toInt))

    // 2. StructType을 사용하여 스키마 정의
    val schema = StructType(Array(StructField("id", IntegerType, true),
      StructField("name", StringType, true),
      StructField("age", IntegerType, true)))
    
    // 3. createDataFrame 메소드로 스키마 적용
    val userDataFrame = spark.createDataFrame(userRDD, schema)
    userDataFrame.printSchema()
    userDataFrame.show()
  }
  
  // 반사 방식으로 변환
  private def convertWithReflection(spark: SparkSession) = {
    // RDD => DataFrame 변환
    val rdd = spark.sparkContext.textFile("file:///data/users.txt")

    // 암시적 변환 임포트
    import spark.implicits._
    val userDataFrame = rdd.map(_.split(",")).map(row => UserData(row(0).toInt, row(1), row(2).toInt)).toDF()

    userDataFrame.show()

    // 나이가 20보다 큰 데이터 필터링
    userDataFrame.filter(userDataFrame.col("age") > 20).show

    // Spark SQL 방식으로 데이터 쿼리
    userDataFrame.createOrReplaceTempView("user_data")
    spark.sql("show tables").show()
    spark.sql("select * from user_data where age > 20").show()
  }

  case class UserData(id: Int, name: String, age: Int)
}

참고: Spark 대데이터 처리 - DataFrame과 Dataset

태그: Apache Spark RDD DataFrame Dataset Spark SQL

5월 23일 14:21에 게시됨