RDD 성능 최적화 및 실용적인 문제 해결 전략

성능 튜닝을 위한 핵심 기법

6.1 파티션 수 조정을 통한 병렬 처리 최적화

파티션 수는 Spark 애플리케이션의 병렬성과 리소스 사용 효율에 직접적인 영향을 미친다. 너무 적은 파티션은 클러스터 자원 활용도를 낮추고, 지나치게 많은 파티션은 스케줄링 오버헤드를 증가시킨다.

# HDFS 파일 로딩 시 초기 파티션 수 설정
log_rdd = sc.textFile("hdfs:///data/logs/access.log", minPartitions=20)

# 변환 후 파티션 수 조절: 병렬성을 높이기 위해 재분배
processed_rdd = cleaned_rdd.repartition(40)

# 출력 단계에서 파티션 병합: 과도한 소규모 출력 파일 방지
aggregated_result.coalesce(5).saveAsTextFile("/output/result")

6.2 Shuffle 최소화를 위한 고급 맵 연산 활용

Shuffle은 네트워크 I/O와 디스크 접근을 동반하므로 성능 저하의 주요 원인이다. mapPartitions를 사용하면 각 파티션 내에서 로컬 집계를 수행함으로써 키-값 쌍의 전송량을 줄일 수 있다.

def local_word_tally(iterator):
    counts = {}
    for sentence in iterator:
        for token in sentence.strip().split():
            if token:
                counts[token] = counts.get(token, 0) + 1
    return [(k, v) for k, v in counts.items()]

# 전체 단어 카운트 프로세스
token_rdd = raw_text_rdd.map(lambda line: line.lower())
count_rdd = token_rdd.mapPartitions(local_word_tally)
final_counts = count_rdd.reduceByKey(lambda a, b: a + b)

6.3 스토리지 전략 기반의 지속성 관리

반복적으로 사용되는 RDD는 메모리 또는 디스크에 캐싱하여 계산 비용을 절감할 수 있다. 특히 반복 루프 또는 여러 액션에서 동일한 데이터를 참조할 때 유용하다.

# 메모리 우선, 필요 시 디스크 백업 저장
frequent_rdd.persist(storageLevel=StorageLevel.MEMORY_AND_DISK_SER)

# 작업 완료 후 캐시 해제로 메모리 확보
try:
    result = frequent_rdd.collect()
finally:
    frequent_rdd.unpersist()

일상적인 실패 사례와 그 대응 방안

7.1 드라이버 메모리 부족 (OOM)

collect()와 같은 액션은 모든 데이터를 드라이버 노드로 전송하므로 대용량 데이터셋에서 OutOfMemoryError를 유발할 수 있다.

  • 해결책: take(n)으로 상위 n개 항목만 추출
  • 드라이버 힙 크기 증대: --driver-memory 8g
  • 스트리밍 방식의 배치 처리 설계

7.2 불균형한 데이터 분포 (Data Skew)

특정 키에 데이터가 집중되어 일부 태스크만 오랜 시간 실행되는 현상. 이로 인해 전체 잡의 진행이 지연된다.

# Skew 완화를 위한 솔트 기법(Salting)
import random

# Step 1: 키에 무작위 접미사를 추가하여 분산
salted = raw_pairs.map(lambda kv: ((kv[0], random.randint(0, 4)), kv[1]))

# Step 2: 첫 번째 집계 (부분 집계)
partial = salted.reduceByKey(lambda a, b: a + b)

# Step 3: 솔트 제거 후 최종 집계
desalted = partial.map(lambda x: (x[0][0], x[1]))
final_skew_fixed = desalted.reduceByKey(lambda a, b: a + b)

7.3 다수의 소규모 출력 파일 생성

과도한 파티션이 saveAsTextFile과 같은 출력 작업에서 수백 개의 tiny file을 생성할 수 있으며, 이는 HDFS와 후속 처리에 부담을 준다.

# 출력 전 파티션 수를 의미 있게 조정
result_rdd.coalesce(3).saveAsTextFile("/final/report")

# 또는 목적에 맞는 repartition
balanced_output = result_rdd.repartition(8)

응용 사례: 문서별 TF-IDF 계산

8.1 문서 단위 단어 빈도 산출

각 문서에서 등장하는 단어의 빈도를 식별하기 위해 wholeTextFiles를 활용한다.

def compute_per_doc_frequency(sc, source_dir):
    # (경로, 전체 내용) 형식의 RDD 생성
    docs = sc.wholeTextFiles(source_dir)
    
    # 문서별 단어 발생 횟수 카운팅
    term_freq = docs.flatMap(lambda file: 
        [((file[0], word), 1) for word in file[1].split()]
    ).reduceByKey(lambda a, b: a + b)
    
    return term_freq

8.2 TF-IDF 값 도출

Term Frequency-Inverse Document Frequency는 단어의 중요도를 평가하는 지표로, 일반적인 단어보다 드문 단어에 더 높은 가중치를 부여한다.

import math

def calculate_tfidf_scores(sc, data_path):
    # 문서별 TF 계산
    tf_rdd = compute_per_doc_frequency(sc, data_path)
    
    # DF: 각 단어가 몇 개의 문서에 등장했는가?
    doc_occurrence = tf_rdd.map(lambda x: (x[0][1], 1)).reduceByKey(add)
    
    # 전체 문서 수
    total_documents = sc.wholeTextFiles(data_path).count()
    
    # TF × log(N/DF): TF-IDF 공식 적용
    tfidf_rdd = tf_rdd.join(doc_occurrence) \
                      .mapValues(lambda values: 
                          values[0] * math.log(total_documents / values[1])
                      )
    
    return tfidf_rdd.map(lambda x: (x[0][0], x[0][1], x[1]))  # (doc, word, score)

태그: Spark RDD 성능최적화 데이터스킬 Tuning

6월 8일 00:46에 게시됨