Spark SQL에는 다음과 같은 클래스가 존재합니다:
이 클래스는 주로 다음 기능들을 담당합니다:
만약 Hive를 사용하지 않는 경우, 기본값으로 스파크 SQL의 내장 메타데이터 경로를 사용합니다. 이 과정은 아래 코드에서 보여집니다:
org.apache.spark.sql.internal.SharedState
- 메타데이터 경로 관리 (warehousePath)
- 쿼리 결과 캐시 관리 (cacheManager)
- 실행 상태 및 메트릭 모니터링 (statusStore)
- 외부 카탈로그 관리 (externalCatalog)
- 전역 임시 뷰 관리 (globalTempViewManager)
1. 메타데이터 경로 관리 (warehousePath)
스파크 SQL의 메타데이터 경로를 관리하는 부분입니다. 일반적으로 Hive를 기본 메타데이터 저장소로 사용하며, 이를 위해 "hive-site.xml" 파일을 로드하고 해당 설정에서 경로를 가져옵니다. 예를 들어, Hive의 메타데이터 디렉토리를 아래와 같이 지정할 수 있습니다:
hive.metastore.warehouse.dir
val warehousePath: String = {
val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
if (configFile != null) {
sparkContext.hadoopConfiguration.addResource(configFile)
}
val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
} else {
val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
}
sparkWarehouseDir
}
2. 쿼리 결과 캐시 관리 (CacheManager)
쿼리 결과를 캐싱하여 재사용할 수 있도록 관리합니다. 이를 통해 이미 실행된 쿼리의 결과를 다시 계산하지 않고 사용할 수 있습니다. 예제 코드는 다음과 같습니다:
val cacheManager: CacheManager = new CacheManager
3. 실행 상태 및 메트릭 모니터링 (statusStore)
애플리케이션의 SQL 상태와 메트릭 정보를 추적합니다. 이를 위해 별도의 리스너를 사용하며, 동시성 문제를 방지하기 위해 CopyOnWriteArrayList와 같은 스레드 세이프 컨테이너를 활용합니다. 구현 예시:
val statusStore: SQLAppStatusStore = {
val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
sparkContext.listenerBus.addToStatusQueue(listener)
new SQLAppStatusStore(kvStore, Some(listener))
}
4. 외부 카탈로그 관리 (externalCatalog)
외부 시스템과 상호작용하는 카탈로그를 정의합니다. 기본 데이터베이스가 존재하지 않으면 자동으로 생성하며, 카탈로그 이벤트를 리스너 버스에 전달합니다. 코드 예시:
lazy val externalCatalog: ExternalCatalog = {
val catalogImpl = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
SharedState.externalCatalogClassName(sparkContext.conf),
sparkContext.conf,
sparkContext.hadoopConfiguration)
val defaultDbDefinition = CatalogDatabase(
SessionCatalog.DEFAULT_DATABASE,
"default database",
CatalogUtils.stringToURI(warehousePath),
Map())
if (!catalogImpl.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
catalogImpl.createDatabase(defaultDbDefinition, ignoreIfExists = true)
}
catalogImpl.addListener(new ExternalCatalogEventListener {
override def onEvent(event: ExternalCatalogEvent): Unit = {
sparkContext.listenerBus.post(event)
}
})
catalogImpl
}
5. 전역 임시 뷰 관리 (globalTempViewManager)
스파크 실행 중 발생하는 임시 데이터베이스가 외부 카탈로그에 충돌하지 않도록 관리합니다. GLOBAL_TEMP_DATABASE가 외부 카탈로그에 존재하면 에러를 발생시킵니다. 구현 예제:
lazy val globalTempViewManager: GlobalTempViewManager = {
val tempDBName = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
if (externalCatalog.databaseExists(tempDBName)) {
throw new SparkException(
s"$tempDBName is a reserved database name. Please rename your existing database or set a different value for ${GLOBAL_TEMP_DATABASE.key}.")
}
new GlobalTempViewManager(tempDBName)
}