PySpark에서 Spark 작업 환경을 초기화하고 버전 정보를 확인하는 방법은 다음과 같습니다.
# 필요한 모듈 가져오기
from pyspark import SparkConf, SparkContext
# Spark 설정 객체 생성
config = SparkConf().setMaster("local[*]").setAppName("example_app")
# SparkContext 생성
context = SparkContext(conf=config)
# 현재 PySpark 버전 출력
print(context.version)
# 작업 종료
context.stop()
PySpark 프로그래밍 구조
SparkContext는 PySpark의 모든 기능을 시작하는 핵심 객체입니다. 일반적인 작업 흐름은 아래와 같습니다:
- SparkContext를 통해 데이터 입력
- 입력된 데이터는 RDD로 변환되어 처리
- RDD의 메서드를 사용해 결과 출력
데이터 입력 방식
Spark에 로드되는 모든 데이터는 RDD(Resilient Distributed Dataset) 형식으로 관리됩니다. RDD는 내결함성을 갖춘 분산 데이터 집합으로, 다양한 데이터 소스를 지원합니다.
파이썬 컨테이너를 RDD로 변환하기
SparkContext의 parallelize() 메서드를 사용하면 파이썬의 여러 자료형을 RDD로 변환할 수 있습니다:
- List
- Tuple
- Set
- Dictionary (키만 포함)
- String (문자 단위로 분할)
from pyspark import SparkConf, SparkContext
# Spark 초기화
conf = SparkConf().setMaster("local[*]").setAppName("data_to_rdd")
sc = SparkContext(conf=conf)
# 다양한 파이썬 데이터 구조 정의
sample_list = [10, 20, 30]
sample_tuple = (40, 50, 60)
sample_string = "hello"
sample_dict = {"a": 1, "b": 2}
sample_set = {70, 80, 90}
# 각각을 RDD로 변환
rdd_from_list = sc.parallelize(sample_list)
rdd_from_tuple = sc.parallelize(sample_tuple)
rdd_from_string = sc.parallelize(sample_string)
rdd_from_dict = sc.parallelize(sample_dict.keys())
rdd_from_set = sc.parallelize(sample_set)
# 결과 출력
print(rdd_from_list.collect())
print(rdd_from_tuple.collect())
print(rdd_from_string.collect())
print(rdd_from_dict.collect())
print(rdd_from_set.collect())
sc.stop()
파일로부터 RDD 생성하기
텍스트 파일을 읽어 RDD로 변환할 수도 있습니다. textFile() 메서드를 사용합니다.
from pyspark import SparkConf, SparkContext
# Spark 초기화
conf = SparkConf().setMaster("local[*]").setAppName("file_to_rdd")
sc = SparkContext(conf=conf)
# 파일 경로 지정 및 RDD 생성
file_path = "D:/sample_data.txt"
file_rdd = sc.textFile(file_path)
# 파일 내용 출력
print(file_rdd.collect())
sc.stop()