저는 현재 작업 플랫폼 프로젝트에서 Spark 1.6.1 버전을 사용하고 있었으며, 주로 Python으로 개발했습니다.
현재 Spark 1.6.1 버전에서 2.4.6 버전으로 직접 업그레이드하는 작업을 진행했으며, 이 과정에서 여러 문제가 발생하여 이를 기록합니다:
- 문법 호환성 문제
데이터 플랫폼 작업은 일별 작업과 시간별 작업으로 나뉘며, 처리된 데이터를 Hive 테이블(분구)에 씁니다.
예를 들어:
1.6 버전에서 사용한 최종 적용 문법은:
source.write.format("orc").partitionBy(%s).insertInto("%s.%s", True)
2.4로 업그레이드 후 다음과 같은 오류가 발생했습니다:
insertInto() can't be used together with partitionBy()
Spark 2.0 이후부터 insertInto 자체가 삽입할 테이블이 분구되어 있다고 간주하기 때문에 partitionBy를 사용할 필요가 없게 되었습니다.
하지만 저희 테이블은 분구 삽입이 필요했습니다. 예를 들어:
CREATE EXTERNAL TABLE `ad.adwise_ad_order`(
`sdate` int COMMENT '날짜',
`order_id` string COMMENT '광고 주문 ID',
`req_num` bigint COMMENT '광고 요청량',
`imp_filter_pv` bigint COMMENT '광고 노출 필터 PV',
`click_filter_pv` bigint COMMENT '광고 클릭 필터 PV',
`imp_num` bigint COMMENT '광고 노출량',
`vis_req_num` bigint COMMENT '광고 가시 요청량',
`vis_imp_num` bigint COMMENT '광고 가시 노출량',
`vis_display_num` bigint COMMENT '광고 가시 노출 횟수',
`click_num` bigint COMMENT '광고 클릭량',
`lands_num` bigint COMMENT '광고 리드 수',
`req_uv` bigint COMMENT '광고 요청 UV',
`imp_uv` bigint COMMENT '광고 노출 UV',
`imp_login_uv` bigint COMMENT '광고 노출 회원 수',
`vis_req_uv` bigint COMMENT '광고 가시 요청 UV',
`vis_imp_uv` bigint COMMENT '광고 가시 노출 UV',
`vis_imp_login_uv` bigint COMMENT '광고 가시 노출 회원 수',
`lands_uv` bigint COMMENT '광고 리드 UV',
`click_uv` bigint COMMENT '광고 클릭 UV',
`lands_login_uv` bigint COMMENT '광고 리드 회원 수')
PARTITIONED BY (
`dt` string)
ROW FORMAT SERDE
'org.apache.hadoop.hive.ql.io.orc.OrcSerde'
WITH SERDEPROPERTIES (
'colelction.delim'=',',
'field.delim'='\t',
'line.delim'='\n',
'mapkey.delim'=':',
'serialization.format'='\t')
STORED AS INPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcInputFormat'
OUTPUTFORMAT
'org.apache.hadoop.hive.ql.io.orc.OrcOutputFormat'
LOCATION
'viewfs://AutoLfCluster/team/ad/pre/adwise_ad_order'
TBLPROPERTIES (
'transient_lastDdlTime'='1512551282')
View Code### 1.1、saveAsTable로 인한 모든 분구 덮어쓰기
API를 확인한 결과 partitionBy와 saveAsTable은 조합할 수 있음을 발견하고, 코드를 다음과 같이 변경했습니다:
df.write.mode("override").partitionBy(['dt', 'hour']).saveAsTable("XXXXX")
임시 테이블로 테스트하지 않았기 때문에 문제없이 실행된다고 생각했지만, 다음 날 테이블의 분구가 덮어쓰기된 것을 발견했습니다.
ps. 여기서 덮어쓰기된 분구 파일을 복원하는 스크립트를 제공합니다:
#! /bin/bash
# 역순으로 날짜 순회
# 전달된 순회 시작 시간과 종료 시간
startdate="$1"
enddate="$2"
echo 'startdate: '$startdate
echo 'enddate: '$enddate
echo "-----------------------------------"
# 1-300 시퀀스, 300번 순회, 종료 시간 제한으로 실제로는 300번 순회하지 않음
for i in `seq 1 300`; do
# 시작 시간이 종료 시간보다 작으면 스크립트 종료
if [[ $startdate -lt $enddate ]]; then
break
fi
echo $startdate
# hiveSQL 스크립트 실행, 저는 날짜별로 hiveSQL을 실행해야 했으므로 이 부분은 무시해도 됩니다
hive -e "alter table ad.clues_unable_distribute add partition(dt='$startdate');"
# 실행할 때마다 시작 날짜를 하루 빼기, 순서대로 하려면 아래 -1을 +1로 변경하고 시작/종료 시간도 교체
startdate=$(date -d "$startdate -1 day" +%Y%m%d)
done
View Code### 1.2、saveAsTable으로 인한 메타데이터 불일치 문제
위에서 테이블 분구를 덮어쓰고 스크립트로 분구를 복원한 후, 코드를 롤백하고 1.6 버전으로 되돌렸습니다. 그런데 업무 담당자 중 한 명이 tableA에 column1이라는 필드를 추가했습니다.
업무 담당자가 작성한 SQL은 Hive 클라이언트에서 완벽하게 실행되었기 때문에 코드를 제출하고 배포했습니다...
그런데 해당 작업이 계속해서 오류를 발생시켰고, 오류 내용은 새로 추가된 column1이 tableA 테이블에서 해당 열을 찾을 수 없다는 것이었습니다.
그리고 Zeppelin에서 업무 SQL을 실행해도 여전히 column1 열을 찾을 수 없었습니다. 이는 Spark 메타데이터와 Hive 메타데이터가 불일치함을 의미합니다!
Jira에서 데이터 삽입이 완료된 후(insertInto) 테이블을 새로 고쳐야 한다고 제안했습니다.
API는 다음과 같습니다:
spark.catalog.refreshTable()
다음과 같은 순서로 배치했습니다:
1 df.write.option("nullValue", "").mode("overwrite").insertInto("%s.%s",True)
2 spark.catalog.refreshTable()
하지만 작동하지 않았습니다.
다음을 시도해 보았습니다:
1 df.write.option("nullValue", "").mode("overwrite").insertInto("%s.%s",True)
2 spark.sql("alter table xxx add if not exists partition(a=b,c=d)")
이 모든 방법이 통하지 않았습니다. 결국 saveAsTable이 이전 데이터를 삭제하고 덮어썼기 때문이라는 것을 발견했습니다. 이로 인해 해당 테이블이 특별하게 되어 Hive 메타 정보를 동기화할 수 없었습니다.
따라서 테이블을 삭제하고 외부 테이블을 다시 생성한 후 분구를 다시 연결했습니다. 이렇게 하면 새로 추가된 필드가 나타납니다. 참고: 위 문제는 saveAsTable이 테이블을 덮어쓰기 때문에 발생했습니다.
1.3、InsertInto(db.table , false)로 인한 데이터 왜곡
위 방법이 통하지 않자, 삽입 문장을 다음과 같이 변경했습니다:
InsertInto(db.table , false)
두 번째 인자가 false인 경우 덮어쓰지 않는다는 의미입니다. 그런데 데이터 분구가 덮어쓰기되지 않지만 모든 데이터가 하나의 분구로 이동하는 문제가 발생했습니다. 추가 모드이기 때문에 데이터 왜곡이 발생했습니다.
1.4、최종 해결 방법
최종적으로 해결된 방법의 코드를 직접 공유합니다. 핵심은 다음과 같습니다:
spark.sql.sources.partitionOverwriteMode
최종 삽입 코드:
#!/usr/bin/python
# -*- coding:utf-8 -*-
# describe: local csv to hive orc
# create:%s
import os
os.environ['SPARK_HOME']="/data/sysdir/servers/spark-2.4.6-bin-2.7.2-scala2.11"
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql import HiveContext
import sys
reload(sys)
sys.setdefaultencoding( "utf-8" )
spark = SparkSession.builder.appName("%s").config("spark.sql.sources.partitionOverwriteMode","dynamic").enableHiveSupport().getOrCreate()
hiveContext = HiveContext(spark)
hiveContext.sql("SET hive.exec.dynamic.partition = true")
hiveContext.sql("SET hive.exec.dynamic.partition.mode = nonstrict")
sink = hiveContext.table("%s.%s")
df = hiveContext.read.format("com.autohome.databricks.spark.csv")%s.option("treatEmptyValuesAsNulls", "true").option("header","false").option('delimiter', '\\t').load("file://%s", schema=sink.schema).repartition(1)
# 분구 값 추가 이전에는 비어있음
for i in zip([%s],[%s]):
df = df.withColumn(i[0], when(df[i[0]] != i[1], i[1]).otherwise(i[1]))
df.write.option("nullValue", "").mode("overwrite").insertInto("%s.%s",True)
spark.sparkContext.stop()
''' % (date, db+'.'+table+'.'+date,db, table,nullDealingStr,datafilepath,partitions, pvalues, db, table)
insertInto와 saveAsTable의 차이점
여기서 두 API의 차이점을 간단히 설명합니다:
먼저 API 소스 코드 소개를 살펴보겠습니다:
1):insertInto
insertInto의 공식 선언
위 선언에는 다음과 같은 2가지 정보가 포함되어 있습니다:
1、spark-sql 데이터를 삽입할 때 DataFrame을 사용하는데, 이 DataFrame의 schema는 대상 테이블(삽입할 테이블)의 schema 정보와 일치해야 합니다
2、insertInto와 saveAsTable은 다릅니다. insertInto는 위치 기반으로 데이터를 삽입합니다
위 두 선언은 혼란스러워 보입니다. 하지만 한 가지 기억하면 이해할 수 있습니다:
insertInto를 사용하는 전제 조건은 해당 테이블이 존재하는 것입니다. 이 테이블을 기반으로 삽입합니다
saveAsTable은 해당 테이블의 존재 여부에 의존하지 않습니다. saveAsTable은 데이터를 쓸 때 필드 이름을 기반으로 일치시켜 삽입합니다
2、spark2.0 이후 pyspark로 스크립트 제출 문제
많은 작업이 pyspark를 사용하여 제출됩니다. 하지만 2.0 이후부터 Spark는 pyspark로 스크립트를 제출하는 것을 지원하지 않으므로, pyspark를 spark-submit으로 통일해야 합니다.
3、phoenix 버전 호환성 문제
재컴파일하면 해결됩니다. 검색하면 많은 정보가 나옵니다.
4、csv 파일 Map 지원 문제
해결 링크: https://www.cnblogs.com/niutao/p/13674489.html
git 코드 주소: https://github.com/niutaofan/pareCSV.git
5、CSV 처리 시 이중 따옴표"로 인해 형식을 인식할 수 없는 문제
"csv 파일 Map 지원 문제" 해결 방식과 동일합니다. 발생하는 문제는 Spark가 csv 파일을 처리하는 코드, 주로 commons-csv 코드입니다.
사용자 정의 데이터 소스를 통해 Spark가 csv 파일을 스캔할 때 다음과 같은 코드가 사용됩니다:
tokenRdd를 사용하여 각 행 데이터를 스캔하여 RDD를 생성하며, 문제는 이 메소드에서 발생합니다:
위 parseCSV는 각 csv 행 데이터를 파싱한 후 RDD를 반환합니다
특수 문자를 파싱할 수 없는 오류는 이 메소드에서 발생합니다:
311번 코드 CSVParser.parse(line, csvFormat).getRecords를 보면, line은 각 행 데이터입니다.
저는 csv 데이터에 일부 불법 문자가 포함된 문자열과 유사한 오류가 발생했습니다:
따라서 이 한 줄 코드를 간단히 수정하면 됩니다. 예를 들어:
이렇게 하면 csv 각 행의 불법 이중 따옴표 문제를 해결할 수 있습니다!