def load(paths: String*): DataFrame = {
  if (source.toLowerCase(Locale.ROOT) == DDLUtils.HIVE_PROVIDER) {
    throw new AnalysisException("Hive data source can only be used with tables, you can not " +
      "read files of Hive data source directly.")
  }

  val cls = DataSource.lookupDataSource(source, sparkSession.sessionState.conf)
  if (classOf[DataSourceV2].isAssignableFrom(cls)) {
    val ds = cls.newInstance().asInstanceOf[DataSourceV2]
    if (ds.isInstanceOf[ReadSupport]) {
      val sessionOptions = DataSourceV2Utils.extractSessionConfigs(
        ds = ds, conf = sparkSession.sessionState.conf)
      val pathsOption = {
        val objectMapper = new ObjectMapper()
        DataSourceOptions.PATHS_KEY -> objectMapper.writeValueAsString(paths.toArray)
      }
      Dataset.ofRows(sparkSession, DataSourceV2Relation.create(
        ds, sessionOptions ++ extraOptions.toMap + pathsOption,
        userSpecifiedSchema = userSpecifiedSchema))
    } else {
      loadV1Source(paths: _*)
    }
  } else {
    loadV1Source(paths: _*)
  }
}
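For context, the method above is the end of the user-facing reader chain: format() sets `source`, option() calls accumulate `extraOptions`, and load() takes the DataSourceV2 branch only when the class returned by lookupDataSource implements ReadSupport, otherwise it falls back to loadV1Source(). A minimal usage sketch (the format, option, and path here are assumptions for illustration, not from the source):

val df = spark.read
  .format("json")               // becomes `source`, resolved later by lookupDataSource
  .option("multiLine", "true")  // collected into extraOptions
  .load("/tmp/events")          // the load(paths: String*) shown above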
def resolveRelation(checkFilesExist: Boolean = true): BaseRelation = {
  // An instance of the data source implementation class is created each time
  val relation = (providingClass.newInstance(), userSpecifiedSchema) match {
    // TODO: Throw when too much is given.
    case (dataSource: SchemaRelationProvider, Some(schema)) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions, schema)
    case (dataSource: RelationProvider, None) =>
      dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
    case (_: SchemaRelationProvider, None) =>
      throw new AnalysisException(s"A schema needs to be specified when using $className.")
    case (dataSource: RelationProvider, Some(schema)) =>
      val baseRelation =
        dataSource.createRelation(sparkSession.sqlContext, caseInsensitiveOptions)
      if (baseRelation.schema != schema) {
        throw new AnalysisException(s"$className does not allow user-specified schemas.")
      }
      baseRelation
    // We are reading from the results of a streaming query. Load files from the metadata log
    // instead of listing them using HDFS APIs.
    case (format: FileFormat, _)
        if FileStreamSink.hasMetadata(
          caseInsensitiveOptions.get("path").toSeq ++ paths,
          sparkSession.sessionState.newHadoopConf()) =>
      ... ...
      HadoopFsRelation(
        fileCatalog,
        partitionSchema = fileCatalog.partitionSchema,
        dataSchema = dataSchema,
        bucketSpec = None,
        format,
        caseInsensitiveOptions)(sparkSession)
    // This is a non-streaming file based datasource.
    case (format: FileFormat, _) =>
      val globbedPaths =
        checkAndGlobPathIfNecessary(checkEmptyGlobPath = true, checkFilesExist = checkFilesExist)
      val useCatalogFileIndex = sparkSession.sqlContext.conf.manageFilesourcePartitions &&
        catalogTable.isDefined && catalogTable.get.tracksPartitionsInCatalog &&
        catalogTable.get.partitionColumnNames.nonEmpty
      val (fileCatalog, dataSchema, partitionSchema) = if (useCatalogFileIndex) {
        val defaultTableSize = sparkSession.sessionState.conf.defaultSizeInBytes
        val index = new CatalogFileIndex(
          sparkSession,
          catalogTable.get,
          catalogTable.get.stats.map(_.sizeInBytes.toLong).getOrElse(defaultTableSize))
        (index, catalogTable.get.dataSchema, catalogTable.get.partitionSchema)
      } else {
        val index = createInMemoryFileIndex(globbedPaths)
        val (resultDataSchema, resultPartitionSchema) =
          getOrInferFileFormatSchema(format, Some(index))
        (index, resultDataSchema, resultPartitionSchema)
      }
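The provider match at the top of resolveRelation (the RelationProvider / SchemaRelationProvider cases) is the hook that third-party V1 sources plug into. A minimal sketch of a hypothetical provider, showing which branch it would take (the class name and option key are assumptions, not Spark code):

import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.sql.sources.{BaseRelation, RelationProvider, TableScan}
import org.apache.spark.sql.types.{StringType, StructField, StructType}

// Hypothetical provider: resolveRelation would match it via
// `case (dataSource: RelationProvider, None)`; supplying a user schema as well
// would instead hit the schema-equality check and its AnalysisException.
class EchoRelationProvider extends RelationProvider {
  override def createRelation(
      ctx: SQLContext,
      parameters: Map[String, String]): BaseRelation = {
    new BaseRelation with TableScan {
      override def sqlContext: SQLContext = ctx
      override def schema: StructType =
        StructType(StructField("value", StringType) :: Nil)
      // Single-row scan that echoes the (assumed) "value" option back as data
      override def buildScan(): RDD[Row] =
        ctx.sparkContext.parallelize(Seq(Row(parameters.getOrElse("value", ""))))
    }
  }
}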
def lookupDataSource(provider: String, conf: SQLConf): Class[_] = {
  val provider1 = backwardCompatibilityMap.getOrElse(provider, provider) match {
    case name if name.equalsIgnoreCase("orc") &&
        conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "native" =>
      classOf[OrcFileFormat].getCanonicalName
    case name if name.equalsIgnoreCase("orc") &&
        conf.getConf(SQLConf.ORC_IMPLEMENTATION) == "hive" =>
      "org.apache.spark.sql.hive.orc.OrcFileFormat"
    case "com.databricks.spark.avro" if conf.replaceDatabricksSparkAvroEnabled =>
      "org.apache.spark.sql.avro.AvroFileFormat"
    case name => name
  }
  val provider2 = s"$provider1.DefaultSource"
  val loader = Utils.getContextOrSparkClassLoader
  // Load all classes registered as implementations of the DataSourceRegister interface
  val serviceLoader = ServiceLoader.load(classOf[DataSourceRegister], loader)
  try {
    // First, look among the registered DataSourceRegister implementations for one
    // whose alias (short name) matches the requested provider1
    serviceLoader.asScala.filter(_.shortName().equalsIgnoreCase(provider1)).toList match {
      // The requested provider matches none of the registered aliases
      case Nil =>
        try {
          // Load the classes referred to by provider1 and provider2 directly
          Try(loader.loadClass(provider1)).orElse(Try(loader.loadClass(provider2))) match {
            case Success(dataSource) =>
              // Found the data source using fully qualified path
              dataSource
            case Failure(error) =>
              ... ...
          }
        } catch {
          ... ...
        }
      case head :: Nil =>
        // Exactly one registered alias matches
        head.getClass
      case sources =>
        // Multiple registered aliases match: if exactly one of the matching classes starts
        // with "org.apache.spark", use that internal Spark class.
        val sourceNames = sources.map(_.getClass.getName)
        val internalSources = sources.filter(_.getClass.getName.startsWith("org.apache.spark"))
        if (internalSources.size == 1) {
          logWarning(s"Multiple sources found for $provider1 (${sourceNames.mkString(", ")}), " +
            s"defaulting to the internal datasource (${internalSources.head.getClass.getName}).")
          internalSources.head.getClass
        } else {
          throw new AnalysisException(s"Multiple sources found for $provider1 " +
            s"(${sourceNames.mkString(", ")}), please specify the fully qualified class name.")
        }
    }
  } catch {
    ... ...
  }
}
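The ServiceLoader lookup above is how short names such as "parquet" or "csv" resolve without a fully qualified class name. A hedged sketch of registering an alias for the hypothetical EchoRelationProvider from the earlier example (class name, alias, and package are assumptions):

import org.apache.spark.sql.sources.DataSourceRegister

// Hypothetical alias registration, building on the EchoRelationProvider sketch above.
// With the services file below on the classpath, lookupDataSource("echo") finds this
// class by its short name; otherwise the fully qualified class name must be used.
class EchoSourceRegister extends EchoRelationProvider with DataSourceRegister {
  override def shortName(): String = "echo"
}

// Registration file on the classpath:
//   META-INF/services/org.apache.spark.sql.sources.DataSourceRegister
// containing one implementation class name per line, e.g.:
//   com.example.EchoSourceRegister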