本篇内容介绍了“DataSourceV2流处理方法是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!
SparkSession结构化流处理最后其实是通过DataSet的writeStream触发执行的。这点与传统的spark sql方式是不一样的。writeStream会找到StreamingQueryManager的startQuery方法,然后一步步到MicroBatchExecution和ContinuousExecution。
核心点:MicroBatchExecution和ContinuousExecution里面会对StreamingRelationV2进行转换,转换成StreamingDataSourceV2Relation。而MicroBatchExecution和ContinuousExecution只有在StreamingQueryManager的createQuery方法中才会被使用到。那么这个StreamingQueryManager的createQuery方法会在哪里被使用到呢?跟踪代码会发现是DataStreamWriter中调用StreamingQueryManager的startQuery方法进而调用到createQuery方法的。
而DataStreamWriter是Dataset的writeStream创建的。
【以上说的是写入流的过程】。
关键类:BaseSessionStateBuilder,里面有analyzer的定义。
protected def analyzer: Analyzer = new Analyzer(catalog, v2SessionCatalog, conf) {
override val extendedResolutionRules: Seq[Rule[LogicalPlan]] =
new FindDataSourceTable(session) +:
new ResolveSQLOnFile(session) +:
new FallBackFileSourceV2(session) +:
DataSourceResolution(conf, this.catalogManager) +:
customResolutionRules
override val postHocResolutionRules: Seq[Rule[LogicalPlan]] =
new DetectAmbiguousSelfJoin(conf) +:
PreprocessTableCreation(session) +:
PreprocessTableInsertion(conf) +:
DataSourceAnalysis(conf) +:
customPostHocResolutionRules
override val extendedCheckRules: Seq[LogicalPlan => Unit] =
PreWriteCheck +:
PreReadCheck +:
HiveOnlyCheck +:
TableCapabilityCheck +:
customCheckRules
}
这里没有特别需要关注的,先忽略。
DataSourceV2是指spark中V2版本的结构化流处理引擎框架。这里说的逻辑计划就是StreamingDataSourceV2Relation,对应的物理计划分成两类:MicroBatchScanExec和ContinuousScanExec,两者的应用场景从取名上就可以分辨出来,一个是微批处理模式;另一个则是连续流模式。
我们先从物理计划开始解析。
这两个物理计划基于同一个父类:DataSourceV2ScanExecBase,先看看父类的代码:
关键代码:
override def doExecute(): RDD[InternalRow] = {
val numOutputRows = longMetric("numOutputRows")
inputRDD.map { r =>
numOutputRows += 1
r
}
}
子类需要重写inputRDD。
StreamExecution
两种重要的checkpoint属性:
val offsetLog = new OffsetSeqLog(sparkSession, checkpointFile("offsets"))
val commitLog = new CommitLog(sparkSession, checkpointFile("commits"))
offsetLog是当前读取到哪个offset了,commitLog是当前处理到哪个Offset了。这两个Log非常重要,合在一起保证了Exactly-once语义。
MicroBatchScanExec
好了,先看看MicroBatchScanExec是怎么重写inputRDD的。
override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start, end)
override lazy val readerFactory: PartitionReaderFactory = stream.createReaderFactory()
override lazy val inputRDD: RDD[InternalRow] = {
new DataSourceRDD(sparkContext, partitions, readerFactory, supportsColumnar)
}
有三个地方,第一个是重写Seq[InputPartition],调用stream的planInputPartitions方法,注意下这里的stream类型是MicroBatchStream;第二个是重写readerFactory,获得读取器工厂类;第三个重写是inputRDD,创建DataSourceRDD作为inputRDD,而前两步重写的Seq[InputPartition]和readerFactory作为DataSourceRDD的构造参数。
这里首先大概看下DataSourceRDD的功能是什么。
DataSourceRDD这个类的代码很短,很容易看清楚。最重要的就是compute方法,先给出全部代码:
override def compute(split: Partition, context: TaskContext): Iterator[InternalRow] = {
val inputPartition = castPartition(split).inputPartition
val reader: PartitionReader[_] = if (columnarReads) {
partitionReaderFactory.createColumnarReader(inputPartition)
} else {
partitionReaderFactory.createReader(inputPartition)
}
context.addTaskCompletionListener[Unit](_ => reader.close())
val iter = new Iterator[Any] {
private[this] var valuePrepared = false
override def hasNext: Boolean = {
if (!valuePrepared) {
valuePrepared = reader.next()
}
valuePrepared
}
override def next(): Any = {
if (!hasNext) {
throw new java.util.NoSuchElementException("End of stream")
}
valuePrepared = false
reader.get()
}
}
// TODO: SPARK-25083 remove the type erasure hack in data source scan
new InterruptibleIterator(context, iter.asInstanceOf[Iterator[InternalRow]])
}
先根据读取器工厂类创建一个PartitionReader,然后调用PartitionReader的get方法获取数据。就是这么简单了!
ContinuousScanExec
最后再看下ContinuousScanExec的定义。
override lazy val partitions: Seq[InputPartition] = stream.planInputPartitions(start)
override lazy val readerFactory: ContinuousPartitionReaderFactory = {
stream.createContinuousReaderFactory()
}
override lazy val inputRDD: RDD[InternalRow] = {
EpochCoordinatorRef.get(
sparkContext.getLocalProperty(ContinuousExecution.EPOCH_COORDINATOR_ID_KEY),
sparkContext.env)
.askSync[Unit](SetReaderPartitions(partitions.size))
new ContinuousDataSourceRDD(
sparkContext,
sqlContext.conf.continuousStreamingExecutorQueueSize,
sqlContext.conf.continuousStreamingExecutorPollIntervalMs,
partitions,
schema,
readerFactory.asInstanceOf[ContinuousPartitionReaderFactory])
}
和微批处理模式MicroBatchScanExec类似,也有三个地方重写,第一个是重写Seq[InputPartition],调用stream的planInputPartitions方法,注意下这里的stream类型是ContinuousStream;第二个是重写readerFactory,获得读取器工厂类ContinuousPartitionReaderFactory;第三个重写是inputRDD,创建ContinuousDataSourceRDD作为inputRDD,而前两步重写的Seq[InputPartition]和readerFactory作为ContinuousDataSourceRDD的构造参数。
这里首先大概看下ContinuousDataSourceRDD的功能是什么。
ContinuousDataSourceRDD的代码和DataSourceRDD的基本差不多,直接看源码吧,这里就不细说了,也没啥好细说的,显得啰里啰唆。
对于Kafka来说,ContinuousDataSourceRDD和DataSourceRDD其实最终是一样的
“DataSourceV2流处理方法是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注天达云网站,小编将为大家输出更多高质量的实用文章!