// Resolve the metastore warehouse path
val warehousePath: String = {
  // Load hive-site.xml into the hadoopConfiguration
  val configFile = Utils.getContextOrSparkClassLoader.getResource("hive-site.xml")
  if (configFile != null) {
    logInfo(s"loading hive config file: $configFile")
    sparkContext.hadoopConfiguration.addResource(configFile)
  }
  // hive.metastore.warehouse.dir should only live in hadoopConf
  sparkContext.conf.remove("hive.metastore.warehouse.dir")
  // Set the Hive metastore warehouse path to the one we use
  val hiveWarehouseDir = sparkContext.hadoopConfiguration.get("hive.metastore.warehouse.dir")
  if (hiveWarehouseDir != null && !sparkContext.conf.contains(WAREHOUSE_PATH.key)) {
    // hive.metastore.warehouse.dir is set but spark.sql.warehouse.dir is not:
    // copy the value of hive.metastore.warehouse.dir into spark.sql.warehouse.dir
    sparkContext.conf.set(WAREHOUSE_PATH.key, hiveWarehouseDir)
    logInfo(s"${WAREHOUSE_PATH.key} is not set, but hive.metastore.warehouse.dir " +
      s"is set. Setting ${WAREHOUSE_PATH.key} to the value of " +
      s"hive.metastore.warehouse.dir ('$hiveWarehouseDir').")
    hiveWarehouseDir
  } else {
    // spark.sql.warehouse.dir is set: its value overrides hive.metastore.warehouse.dir
    val sparkWarehouseDir = sparkContext.conf.get(WAREHOUSE_PATH)
    logInfo(s"Setting hive.metastore.warehouse.dir ('$hiveWarehouseDir') to the value of " +
      s"${WAREHOUSE_PATH.key} ('$sparkWarehouseDir').")
    sparkContext.hadoopConfiguration.set("hive.metastore.warehouse.dir", sparkWarehouseDir)
    sparkWarehouseDir
  }
}
logInfo(s"Warehouse path is '$warehousePath'.")
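The net effect of the block above is a simple precedence rule: an explicitly set spark.sql.warehouse.dir always wins over hive.metastore.warehouse.dir from hive-site.xml. A minimal sketch of that behavior from the user side (the /tmp/my-warehouse path is just a placeholder):

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.warehouse.dir", "/tmp/my-warehouse") // hypothetical path
  .getOrCreate()

// Resolves to /tmp/my-warehouse even if hive-site.xml on the classpath
// sets hive.metastore.warehouse.dir to something else
println(spark.conf.get("spark.sql.warehouse.dir"))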
/**
 * Class that caches query results for reuse.
 */
val cacheManager: CacheManager = new CacheManager
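The CacheManager is exercised indirectly through the public Dataset API; a small sketch, reusing the spark session built above:

val df = spark.range(0, 100000).selectExpr("id % 7 AS k", "id AS v")
df.cache()                      // registers the plan with the shared CacheManager (lazy)
df.groupBy("k").count().show()  // first action materializes the cached InMemoryRelation
df.unpersist()                  // removes the entry from the CacheManager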
/**
 * A status store to query SQL status/metrics of this Spark application, based on SQL-specific
 * [[org.apache.spark.scheduler.SparkListenerEvent]]s.
 */
val statusStore: SQLAppStatusStore = {
  val kvStore = sparkContext.statusStore.store.asInstanceOf[ElementTrackingStore]
  val listener = new SQLAppStatusListener(sparkContext.conf, kvStore, live = true)
  sparkContext.listenerBus.addToStatusQueue(listener)
  val statusStore = new SQLAppStatusStore(kvStore, Some(listener))
  sparkContext.ui.foreach(new SQLTab(statusStore, _))
  statusStore
}
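To see the status store at work, it is enough to run any SQL execution with the UI enabled; the listener registered above records the metrics that back the SQL tab (http://localhost:4040/SQL for a local application). A minimal sketch, again reusing the spark session from the first example:

spark.range(0, 1000).selectExpr("sum(id)").show()
// The action above is tracked as one SQL execution; SQLAppStatusListener writes
// its plan and metrics into the ElementTrackingStore that the SQL tab reads.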
  val defaultDbDefinition = CatalogDatabase(
    SessionCatalog.DEFAULT_DATABASE,
    "default database",
    CatalogUtils.stringToURI(warehousePath), // metastore warehouse path
    Map())
  // Check whether the default database exists; if it does not, create it in the
  // external system through the catalog API
  if (!externalCatalog.databaseExists(SessionCatalog.DEFAULT_DATABASE)) {
    // There may be another Spark application creating default database at the same time, here we
    // set `ignoreIfExists = true` to avoid `DatabaseAlreadyExists` exception.
    externalCatalog.createDatabase(defaultDbDefinition, ignoreIfExists = true)
  }
  // Wrap to provide catalog events
  val wrapped = new ExternalCatalogWithListener(externalCatalog)
  // Make sure we propagate external catalog events to the spark listener bus
  wrapped.addListener(new ExternalCatalogEventListener {
    override def onEvent(event: ExternalCatalogEvent): Unit = {
      sparkContext.listenerBus.post(event)
    }
  })
  wrapped
}
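Because the wrapper posts every ExternalCatalogEvent to the listener bus, an ordinary SparkListener can observe DDL activity. A hedged sketch; note that the event classes live in org.apache.spark.sql.catalyst.catalog and are internal API, so they may change between versions:

import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.sql.catalyst.catalog.ExternalCatalogEvent

spark.sparkContext.addSparkListener(new SparkListener {
  override def onOtherEvent(event: SparkListenerEvent): Unit = event match {
    case e: ExternalCatalogEvent => println(s"catalog event: $e") // e.g. a create-table event
    case _ => // not a catalog event, ignore
  }
})

spark.sql("CREATE TABLE IF NOT EXISTS demo_t (id INT) USING parquet")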
/**
 * Manager for global temporary views.
 */
lazy val globalTempViewManager: GlobalTempViewManager = {
  // The system-preserved database (spark.sql.globalTempDatabase, "global_temp" by default)
  // must not already exist in the metastore
  val globalTempDB = sparkContext.conf.get(GLOBAL_TEMP_DATABASE).toLowerCase(Locale.ROOT)
  if (externalCatalog.databaseExists(globalTempDB)) {
    throw new SparkException(
      s"$globalTempDB is a system preserved database, please rename your existing database " +
        "to resolve the name conflict, or set a different value for " +
        s"${GLOBAL_TEMP_DATABASE.key}, and launch your Spark application again.")
  }
  new GlobalTempViewManager(globalTempDB)
}
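From the user's perspective, the GlobalTempViewManager is reached through createGlobalTempView: the view is registered under the system-preserved database (global_temp by default) and is visible to every session of the same application. A short sketch, reusing the spark session from the first example:

val otherSession = spark.newSession()
spark.range(5).createGlobalTempView("shared_ids")
// Must be qualified with the global temp database name
otherSession.sql("SELECT * FROM global_temp.shared_ids").show()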