성능 튜닝을 위한 핵심 기법
6.1 파티션 수 조정을 통한 병렬 처리 최적화
파티션 수는 Spark 애플리케이션의 병렬성과 리소스 사용 효율에 직접적인 영향을 미친다. 너무 적은 파티션은 클러스터 자원 활용도를 낮추고, 지나치게 많은 파티션은 스케줄링 오버헤드를 증가시킨다.
# HDFS 파일 로딩 시 초기 파티션 수 설정
log_rdd = sc.textFile("hdfs:///data/logs/access.log", minPartitions=20)
# 변환 후 파티션 수 조절: 병렬성을 높이기 위해 재분배
processed_rdd = cleaned_rdd.repartition(40)
# 출력 단계에서 파티션 병합: 과도한 소규모 출력 파일 방지
aggregated_result.coalesce(5).saveAsTextFile("/output/result")
6.2 Shuffle 최소화를 위한 고급 맵 연산 활용
Shuffle은 네트워크 I/O와 디스크 접근을 동반하므로 성능 저하의 주요 원인이다. mapPartitions를 사용하면 각 파티션 내에서 로컬 집계를 수행함으로써 키-값 쌍의 전송량을 줄일 수 있다.
def local_word_tally(iterator):
counts = {}
for sentence in iterator:
for token in sentence.strip().split():
if token:
counts[token] = counts.get(token, 0) + 1
return [(k, v) for k, v in counts.items()]
# 전체 단어 카운트 프로세스
token_rdd = raw_text_rdd.map(lambda line: line.lower())
count_rdd = token_rdd.mapPartitions(local_word_tally)
final_counts = count_rdd.reduceByKey(lambda a, b: a + b)
6.3 스토리지 전략 기반의 지속성 관리
반복적으로 사용되는 RDD는 메모리 또는 디스크에 캐싱하여 계산 비용을 절감할 수 있다. 특히 반복 루프 또는 여러 액션에서 동일한 데이터를 참조할 때 유용하다.
# 메모리 우선, 필요 시 디스크 백업 저장
frequent_rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_SER)
# 작업 완료 후 캐시 해제로 메모리 확보
try:
result = frequent_rdd.collect()
finally:
frequent_rdd.unpersist()
일상적인 실패 사례와 그 대응 방안
7.1 드라이버 메모리 부족 (OOM)
collect()와 같은 액션은 모든 데이터를 드라이버 노드로 전송하므로 대용량 데이터셋에서 OutOfMemoryError를 유발할 수 있다.
- 해결책:
take(n)으로 상위 n개 항목만 추출 - 드라이버 힙 크기 증대:
--driver-memory 8g - 스트리밍 방식의 배치 처리 설계
7.2 불균형한 데이터 분포 (Data Skew)
특정 키에 데이터가 집중되어 일부 태스크만 오랜 시간 실행되는 현상. 이로 인해 전체 잡의 진행이 지연된다.
# Skew 완화를 위한 솔트 기법(Salting)
import random
# Step 1: 키에 무작위 접미사를 추가하여 분산
salted = raw_pairs.map(lambda kv: ((kv[0], random.randint(0, 4)), kv[1]))
# Step 2: 첫 번째 집계 (부분 집계)
partial = salted.reduceByKey(lambda a, b: a + b)
# Step 3: 솔트 제거 후 최종 집계
desalted = partial.map(lambda x: (x[0][0], x[1]))
final_skew_fixed = desalted.reduceByKey(lambda a, b: a + b)
7.3 다수의 소규모 출력 파일 생성
과도한 파티션이 saveAsTextFile과 같은 출력 작업에서 수백 개의 tiny file을 생성할 수 있으며, 이는 HDFS와 후속 처리에 부담을 준다.
# 출력 전 파티션 수를 의미 있게 조정
result_rdd.coalesce(3).saveAsTextFile("/final/report")
# 또는 목적에 맞는 repartition
balanced_output = result_rdd.repartition(8)
응용 사례: 문서별 TF-IDF 계산
8.1 문서 단위 단어 빈도 산출
각 문서에서 등장하는 단어의 빈도를 식별하기 위해 wholeTextFiles를 활용한다.
def compute_per_doc_frequency(sc, source_dir):
# (경로, 전체 내용) 형식의 RDD 생성
docs = sc.wholeTextFiles(source_dir)
# 문서별 단어 발생 횟수 카운팅
term_freq = docs.flatMap(lambda file:
[((file[0], word), 1) for word in file[1].split()]
).reduceByKey(lambda a, b: a + b)
return term_freq
8.2 TF-IDF 값 도출
Term Frequency-Inverse Document Frequency는 단어의 중요도를 평가하는 지표로, 일반적인 단어보다 드문 단어에 더 높은 가중치를 부여한다.
import math
def calculate_tfidf_scores(sc, data_path):
# 문서별 TF 계산
tf_rdd = compute_per_doc_frequency(sc, data_path)
# DF: 각 단어가 몇 개의 문서에 등장했는가?
doc_occurrence = tf_rdd.map(lambda x: (x[0][1], 1)).reduceByKey(add)
# 전체 문서 수
total_documents = sc.wholeTextFiles(data_path).count()
# TF × log(N/DF): TF-IDF 공식 적용
tfidf_rdd = tf_rdd.join(doc_occurrence) \
.mapValues(lambda values:
values[0] * math.log(total_documents / values[1])
)
return tfidf_rdd.map(lambda x: (x[0][0], x[0][1], x[1])) # (doc, word, score)