Hive on Spark에서의 공유 상태 관리

Spark SQL에는 다음과 같은 클래스가 존재합니다:
org.apache.spark.sql.internal.SharedState
이 클래스는 주로 다음 기능들을 담당합니다:
  1. 메타데이터 경로 관리 (warehousePath)
  2. 쿼리 결과 캐시 관리 (cacheManager)
  3. 실행 상태 및 메트릭 모니터링 (statusStore)
  4. 외부 카탈로그 관리 (externalCatalog)
  5. 전역 임시 뷰 관리 (globalTempViewManager)

1. 메타데이터 경로 관리 (warehousePath)

스파크 SQL의 메타데이터 경로를 관리하는 부분입니다. 일반적으로 Hive를 기본 메타데이터 저장소로 사용하며, 이를 위해 "hive-site.xml" 파일을 로드하고 해당 설정에서 경로를 가져옵니다. 예를 들어, Hive의 메타데이터 디렉토리를 아래와 같이 지정할 수 있습니다:
hive.metastore.warehouse.dir
만약 Hive를 사용하지 않는 경우, 기본값으로 스파크 SQL의 내장 메타데이터 경로를 사용합니다. 이 과정은 아래 코드에서 보여집니다:

val warehousePath: String = {
  val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
  if (configFile != null) {
    sparkContext.hadoopConfiguration.addResource(configFile)
  }

  val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
  if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
    sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
  } else {
    val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
    sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
  }
  sparkWarehouseDir
}

2. 쿼리 결과 캐시 관리 (CacheManager)

쿼리 결과를 캐싱하여 재사용할 수 있도록 관리합니다. 이를 통해 이미 실행된 쿼리의 결과를 다시 계산하지 않고 사용할 수 있습니다. 예제 코드는 다음과 같습니다:

val cacheManager: CacheManager = new CacheManager

3. 실행 상태 및 메트릭 모니터링 (statusStore)

애플리케이션의 SQL 상태와 메트릭 정보를 추적합니다. 이를 위해 별도의 리스너를 사용하며, 동시성 문제를 방지하기 위해 CopyOnWriteArrayList와 같은 스레드 세이프 컨테이너를 활용합니다. 구현 예시:

val statusStore: SQLAppStatusStore = {
  val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
  val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
  sparkContext.listenerBus.addToStatusQueue(listener)
  new SQLAppStatusStore(kvStore, Some(listener))
}

4. 외부 카탈로그 관리 (externalCatalog)

외부 시스템과 상호작용하는 카탈로그를 정의합니다. 기본 데이터베이스가 존재하지 않으면 자동으로 생성하며, 카탈로그 이벤트를 리스너 버스에 전달합니다. 코드 예시:

lazy val externalCatalog: ExternalCatalog = {
  val catalogImpl = SharedState.reflect[ExternalCatalog, SparkConf, Configuration](
    SharedState.externalCatalogClassName(sparkContext.conf),
    sparkContext.conf,
    sparkContext.hadoopConfiguration)

  val defaultDbDefinition = CatalogDatabase(
    SessionCatalog.DEFAULT_DATABASE,
    "default database",
    CatalogUtils.stringToURI(warehousePath),
    Map())

  if (!catalogImpl.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
    catalogImpl.createDatabase(defaultDbDefinition, ignoreIfExists = true)
  }

  catalogImpl.addListener(new ExternalCatalogEventListener {
    override def onEvent(event: ExternalCatalogEvent): Unit = {
      sparkContext.listenerBus.post(event)
    }
  })

  catalogImpl
}

5. 전역 임시 뷰 관리 (globalTempViewManager)

스파크 실행 중 발생하는 임시 데이터베이스가 외부 카탈로그에 충돌하지 않도록 관리합니다. GLOBAL_TEMP_DATABASE가 외부 카탈로그에 존재하면 에러를 발생시킵니다. 구현 예제:

lazy val globalTempViewManager: GlobalTempViewManager = {
  val tempDBName = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
  if (externalCatalog.databaseExists(tempDBName)) {
    throw new SparkException(
      s"$tempDBName is a reserved database name. Please rename your existing database or set a different value for ${GLOBAL_TEMP_DATABASE.key}.")
  }
  new GlobalTempViewManager(tempDBName)
}

태그: SparkSQL HiveIntegration SharedState

5월 24일 11:33에 게시됨