DataFrame와 RDD 상호작용
공식 문서: https://spark.apache.org/docs/2.2.1/sql-programming-guide.html
1. DataFrame와 RDD 상호작용 - 반사 방법 (RDD ⇒ DataFrame)
① 샘플 클래스 생성, ② toDF() 메소드 호출
package com.spark.example
import org.apache.spark.sql.SparkSession
/**
* DataFrame과 RDD의 상호작용 예제
*/
object SparkDataConversion {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SparkDataConversion").master("local[2]").getOrCreate()
// RDD => DataFrame 변환
val rdd = spark.sparkContext.textFile("file:///data/users.txt")
// 암시적 변환 임포트
import spark.implicits._
val userDataFrame = rdd.map(_.split(",")).map(row => UserData(row(0).toInt, row(1), row(2).toInt)).toDF()
userDataFrame.show()
spark.stop()
}
case class UserData(id: Int, name: String, age: Int)
}
요약:
- 반사를 사용하여 특정 데이터 타입을 포함하는 RDD의 메타데이터를 추론합니다. 이 메타데이터는 케이스 클래스입니다
- DataFrame API 또는 SQL 방식으로 프로그래밍할 수 있습니다
2. DataFrame와 RDD 상호작용 - 프로그래밍 방법 (RDD ⇒ DataFrame)
두 번째 프로그래밍 방식은 첫 번째 반사 방식보다 복잡하지만, 컬럼과 그들의 타입이 런타임까지 알려지지 않을 때 DataFrame/Dataset을 구성할 수 있습니다. 케이스 클래스를 미리 정의할 수 없을 때 이 방식을 사용해야 하며, 다음 세 단계를 따라야 합니다: ① Row 객체를 사용하여 RDD 생성 ② StructType을 사용하여 스키마 정의 ③ SparkSession의 createDataFrame 메소드를 사용하여 스키마를 RDD의 Row 객체에 적용
def convertWithSchema(spark: SparkSession): Unit = {
// RDD => DataFrame 변환
val rdd = spark.sparkContext.textFile("file:///data/users.txt")
// 1. Row 객체를 사용하여 RDD 생성
val userRDD = rdd.map(_.split(",")).map(row => Row(row(0).toInt, row(1), row(2).toInt))
// 2. StructType을 사용하여 스키마 정의
val schema = StructType(Array(StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)))
// 3. createDataFrame 메소드로 스키마 적용
val userDataFrame = spark.createDataFrame(userRDD, schema)
userDataFrame.printSchema()
userDataFrame.show()
}
요약:
DataFrame과 RDD 상호작용의 두 가지 방법:
- 반사: 케이스 클래스 사용 - 전제 조건: 필드와 필드 타입을 미리 알아야 함
- 프로그래밍: Row 객체 사용 - 첫 번째 방식이 요구사항을 충족시키지 못할 때 (컬럼을 미리 모를 때)
- 선택 가이드라인: 첫 번째 방식을 우선적으로 고려
3. Dataset 소개
공식 문서에 따르면, Dataset은 분산 데이터셋이며 새로운 인터페이스로 Spark 1.6 버전에서 추가되었습니다. DataFrame이 먼저 나온 후 1.6 버전에서 Dataset이 등장했습니다.
Dataset의 장점:
- 강한 타입 지원
- 람다 표현식 지원
- Spark SQL 실행 엔진의 최적화 제공
- DataFrame의 대부분 기능을 포함
Dataset은 다음과 같은 방법으로 구성할 수 있습니다:
- JVM 객체
- map, flatMap, filter와 같은 함수 표현식
Dataset은 Java와 Scala 언어에서 사용할 수 있지만, Python은 아직 Dataset API를 지원하지 않으므로 Python 개발 시에는 DataFrame API를 사용해야 합니다.
- DataFrame API의 확장이며 Spark의 최신 데이터 추상입니다
- 사용자 친화적인 API 스타일로 타입 안전 검사와 DataFrame의 쿼리 최적화 특성을 모두 제공
- Dataset은 인코더를 지원하므로 힙 외부 데이터에 접근할 때 전체 객체 역직렬화를 피해 효율성을 높일 수 있습니다
- 케이스 클래스를 사용하여 Dataset에서 데이터 구조 정보를 정의하며, 케이스 클래스의 각 속성 이름은 Dataset의 필드 이름에 직접 매핑됩니다
- DataFrame은 Dataset의 특별한 경우로, DataFrame = Dataset[Row]로 볼 수 있습니다. 따라서 as() 메소드를 통해 DataFrame을 Dataset으로 변환할 수 있습니다. Row는 Car, Person과 같은 타입과 유사한 타입으로, 모든 테이블 구조 정보를 Row로 표현합니다
- DataSet은 강한 타입을 지원합니다. 예를 들어 Dataset[Car], Dataset[Person] 등을 가질 수 있습니다
- DataFrame은 필드는 알지만 필드 타입을 모르므로 컴파일 시 타입 오류를 검사할 수 없습니다. 예를 들어 String에 대해 감산 연산을 수행하면 실행 시 오류가 발생합니다. 반면 DataSet은 필드뿐만 아니라 필드 타입까지 알기 때문에 더 엄격한 오류 검사를 제공합니다. 이는 JSON 객체와 클래스 객체 간의 차이와 유사합니다
// 예시: DataFrame => Dataset (샘플 클래스 생성, DataFrame.as[샘플 클래스 이름])
package com.spark.example
import org.apache.spark.sql.SparkSession
/**
* Dataset 조작 예제
*/
object DatasetExample {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("DatasetExample").master("local[2]").getOrCreate()
// CSV 파일 경로
val path = "file:///data/sales.csv"
// 암시적 변환 임포트
import spark.implicits._
// Spark가 CSV 파일을 어떻게 파싱할지 설정
val salesDataFrame = spark.read.option("header","true").option("inferSchema","true").csv(path)
salesDataFrame.show
// salesDataFrame을 Dataset으로 변환
val salesDataset = salesDataFrame.as[SalesRecord]
salesDataset.map(record => record.itemId).show
spark.stop()
}
// CSV 파일의 열명을 복사하여 케이스 클래스 생성
case class SalesRecord(transactionId:Int,customerId:Int,itemId:Int,amountPaid:Double)
}
4. 확장: RDD, DataFrame 및 Dataset의 상호 변환
4.1 Dataset을 RDD로 변환
rdd() 메소드를 호출하면 됩니다.
1) Dataset 생성
scala> val DS = Seq(Person("Andy", 32)).toDS()
DS: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
2) Dataset을 RDD로 변환
scala> DS.rdd
res11: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[15] at rdd at <console>:28
4.2 DataFrame과 Dataset의 상호 변환
1. DataFrame을 Dataset으로 변환
1) DataFrame 생성
scala> val df = spark.read.json("examples/src/main/resources/people.json")
df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
2) 케이스 클래스 정의
scala> case class Person(name: String, age: Long)
defined class Person
3) DataFrame을 Dataset으로 변환
scala> df.as[Person]
res14: org.apache.spark.sql.Dataset[Person] = [age: bigint, name: string]
2. Dataset을 DataFrame으로 변환
1) 케이스 클래스 정의
scala> case class Person(name: String, age: Long)
defined class Person
2) Dataset 생성
scala> val ds = Seq(Person("Andy", 32)).toDS()
ds: org.apache.spark.sql.Dataset[Person] = [name: string, age: bigint]
3) Dataset을 DataFrame으로 변환
scala> val df = ds.toDF
df: org.apache.spark.sql.DataFrame = [name: string, age: bigint]
4) 결과 확인
scala> df.show
+----+---+
|name|age|
+----+---+
|Andy| 32|
+----+---+
5. 데모 메소드 캡슐화
package com.spark.example
import org.apache.spark.sql.types.{IntegerType, StringType, StructField, StructType}
import org.apache.spark.sql.{Row, SparkSession}
/**
* DataFrame과 RDD의 상호작용 데모
*/
object SparkDataConversionDemo {
def main(args: Array[String]): Unit = {
val spark = SparkSession.builder().appName("SparkDataConversionDemo").master("local[2]").getOrCreate()
// 반사 방식으로 변환
// convertWithReflection(spark)
// 프로그래밍 방식으로 변환
// convertWithSchema(spark)
spark.stop()
}
// 프로그래밍 방식으로 변환
def convertWithSchema(spark: SparkSession): Unit = {
// RDD => DataFrame 변환
val rdd = spark.sparkContext.textFile("file:///data/users.txt")
// 1. Row 객체를 사용하여 RDD 생성
val userRDD = rdd.map(_.split(",")).map(row => Row(row(0).toInt, row(1), row(2).toInt))
// 2. StructType을 사용하여 스키마 정의
val schema = StructType(Array(StructField("id", IntegerType, true),
StructField("name", StringType, true),
StructField("age", IntegerType, true)))
// 3. createDataFrame 메소드로 스키마 적용
val userDataFrame = spark.createDataFrame(userRDD, schema)
userDataFrame.printSchema()
userDataFrame.show()
}
// 반사 방식으로 변환
private def convertWithReflection(spark: SparkSession) = {
// RDD => DataFrame 변환
val rdd = spark.sparkContext.textFile("file:///data/users.txt")
// 암시적 변환 임포트
import spark.implicits._
val userDataFrame = rdd.map(_.split(",")).map(row => UserData(row(0).toInt, row(1), row(2).toInt)).toDF()
userDataFrame.show()
// 나이가 20보다 큰 데이터 필터링
userDataFrame.filter(userDataFrame.col("age") > 20).show
// Spark SQL 방식으로 데이터 쿼리
userDataFrame.createOrReplaceTempView("user_data")
spark.sql("show tables").show()
spark.sql("select * from user_data where age > 20").show()
}
case class UserData(id: Int, name: String, age: Int)
}
참고: Spark 대데이터 처리 - DataFrame과 Dataset