MapReduce 아키텍처의 핵심 원리
MapReduce는 대규모 데이터 처리를 위한 분산 컴퓨팅 모델로, 데이터를 분할 → 처리 → 집계의 세 단계로 나누어 처리한다. 주요 단계는 다음과 같다:
- Map 단계: 입력 데이터를 키-값 쌍으로 변환하고, 각 요소에 대해 독립적인 연산 수행.
- Shuffle & Sort 단계: 동일한 키를 가진 값들을 그룹화하여 리듀서로 전달.
- Reduce 단계: 그룹화된 데이터를 기반으로 최종 결과 생성.
이 과정에서 데이터 이동량 최소화와 스케일링 가능한 파티셔닝 전략이 성능 결정 요소다.
Spark 기반 분산 처리 개발 팁
Spark는 RDD(Resilient Distributed Dataset) 기반의 추상화를 통해 복잡한 분산 로직을 간결하게 표현할 수 있다. 핵심은 데이터 흐름 중심 설계이다.
예시: 사용자-아이템 상호작용 데이터에서 두 아이템 간 유사도 계산
# 잘못된 접근: 동적으로 모든 사용자 리스트를 비교
def calculate_similarity_bad(item1, item2):
users = user_item.filter(lambda x: x[0] in [item1, item2])
intersection = set(users.map(lambda x: x[1]).collect()) # 메모리 오버 위험
return len(intersection) / (len(users) - len(intersection))
보편적인 방법은 집합 연산을 공식화하는 것이다:
- 교집합: 각 사용자가 소유한 아이템 쌍을 생성해 집계
- 합집합: 두 아이템의 사용자 수 합 - 교집합
# 효율적인 구현 예시
item_user_pairs = user_item.map(lambda x: ((x[1], x[0]), 1)) \
.reduceByKey(lambda a, b: a + b)
# 교집합 계산
intersection_count = item_user_pairs.filter(lambda k, v: k[0] == item1 and k[1] == item2).first()[1]
성능 최적화 전략
- 불필요한 단계 제거
reduceByKey를 과도하게 사용하면 스테이지 수 증가 →combineByKey또는aggregateByKey활용cache()나persist()로 자주 재사용되는 RDD 저장
- 파티션 조정
- 데이터 크기와 노드 리소스에 맞춰
repartition(n)또는coalesce(n, shuffle=False)적용 - 작은 파일 수 출력을 위해
coalesce사용 → 쓰기 작업 병렬도 감소
- 데이터 크기와 노드 리소스에 맞춰
- 병렬성 확장
yield를 활용한 제네레이터 +repartition조합으로 실행 병렬도 획기적 향상 (최대 40배 이상 성능 향상 가능)
YARN 환경에서의 모니터링 및 디버깅
spark-submit 명령어 예시:
/usr/lib/spark/bin/spark-submit \
--master yarn \
--deploy-mode client \
--name "dodng-xxx" \
--queue default \
--num-executors 6 \
--driver-memory 4g \
--executor-memory 4g \
--executor-cores 4 \
xxx.py $(date +%Y-%m-%d -d -39day) $(date +%Y-%m-%d -d -3day) $(date +%Y%m%d -d -0day)
실행 상태 확인 방법:
- 노드 주소를 기반으로 직접
ssh접속하여 컨테이너 상태 확인 - 예:
ssh ng9a81a2a-core-instance-hz5sqddm-2.novalocal - 메모리 할당 분석: 드라이버 1개 + 실행자 6개 → 4GB × 7 = 28GB 예상, 실제 12~16GB 사용 → 추가 메모리 요구 고려 필요
Spark vs Flink: 선택 기준
- 성능: Flink의 스트리밍 처리 능력이 Spark보다 높지만, 배치 처리에서는 차이 미미
- 생태계: Spark의 커뮤니티 규모, 문서, 도구 생태계가 더 발달됨
- 사용 목적: 실시간 처리 필요 시 Flink, 배치 중심 시 Spark 선호
주의사항: 전역 변수 사용 시 주의
전역 변수에 collect()로 가져온 데이터를 저장하는 방식은 비효율적일 수 있음. 예:
item_freq = user_item.map(lambda x: (x[1][0], 1)) \
.reduceByKey(lambda a, b: a + b) \
.collect()
global g_item_freq_d # 전역 변수에 저장
→ 이 경우 메모리 부하 및 네트워크 오버헤드 발생. 대안으로 broadcast() 사용 권장.