This article walks through an example analysis of Spark SQL stream processing. Since many readers are still not very familiar with the topic, it is shared here for reference, and hopefully you will come away with a good understanding after reading it.
Spark SQL supports stream processing. A streaming job has a Source and a Sink: the Source defines where the stream comes from, the Sink defines where it goes, and execution of the stream is triggered starting from the Sink.
Dataset's writeStream defines the stream's destination and triggers the actual execution of the stream, so the analysis starts from writeStream:
def writeStream: DataStreamWriter[T] = new DataStreamWriter[T](this)
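For orientation, a minimal user-facing query that exercises this path could look like the sketch below (the built-in rate source and console sink are used purely for illustration, and the option values are arbitrary):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder().appName("streaming-demo").getOrCreate()

// Source: the built-in "rate" source generates rows of (timestamp, value)
val input = spark.readStream
  .format("rate")
  .option("rowsPerSecond", "10")
  .load()

// Sink: writeStream returns a DataStreamWriter; start() is what actually triggers execution
val query = input.writeStream
  .format("console")
  .outputMode("append")
  .trigger(Trigger.ProcessingTime("5 seconds"))
  .start()

query.awaitTermination()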
DataStreamWriter
DataStreamWriter's job is to write the incoming Dataset to external storage such as Kafka, a database, or text files.
The main entry point is the start method, which returns a StreamingQuery object:
def start(): StreamingQuery = {
  if (source == "memory") {
    assertNotPartitioned("memory")

    val (sink, resultDf) = trigger match {
      case _: ContinuousTrigger =>
        val s = new MemorySinkV2()
        val r = Dataset.ofRows(df.sparkSession, new MemoryPlanV2(s, df.schema.toAttributes))
        (s, r)
      case _ =>
        val s = new MemorySink(df.schema, outputMode)
        val r = Dataset.ofRows(df.sparkSession, new MemoryPlan(s))
        (s, r)
    }
    val chkpointLoc = extraOptions.get("checkpointLocation")
    val recoverFromChkpoint = outputMode == OutputMode.Complete()
    val query = df.sparkSession.sessionState.streamingQueryManager.startQuery(
      extraOptions.get("queryName"),
      chkpointLoc,
      df,
      extraOptions.toMap,
      sink,
      outputMode,
      useTempCheckpointLocation = true,
      recoverFromCheckpointLocation = recoverFromChkpoint,
      trigger = trigger)
    resultDf.createOrReplaceTempView(query.name)
    query
  } else if (source == "foreach") {
    assertNotPartitioned("foreach")
    val sink = new ForeachSink[T](foreachWriter)(ds.exprEnc)
    df.sparkSession.sessionState.streamingQueryManager.startQuery(
      extraOptions.get("queryName"),
      extraOptions.get("checkpointLocation"),
      df,
      extraOptions.toMap,
      sink,
      outputMode,
      useTempCheckpointLocation = true,
      trigger = trigger)
  } else {
    val ds = DataSource.lookupDataSource(source, df.sparkSession.sessionState.conf)
    val disabledSources = df.sparkSession.sqlContext.conf.disabledV2StreamingWriters.split(",")
    val sink = ds.newInstance() match {
      case w: StreamWriteSupport if !disabledSources.contains(w.getClass.getCanonicalName) => w
      case _ =>
        val ds = DataSource(
          df.sparkSession,
          className = source,
          options = extraOptions.toMap,
          partitionColumns = normalizedParCols.getOrElse(Nil))
        ds.createSink(outputMode)
    }
    df.sparkSession.sessionState.streamingQueryManager.startQuery(
      extraOptions.get("queryName"),
      extraOptions.get("checkpointLocation"),
      df,
      extraOptions.toMap,
      sink,
      outputMode,
      useTempCheckpointLocation = source == "console",
      recoverFromCheckpointLocation = true,
      trigger = trigger)
  }
}
Here we focus on the last branch: ds is the class of the data source looked up for the given format; if it implements StreamWriteSupport (and is not disabled), a new instance of it serves directly as the sink, otherwise a V1 DataSource is built and createSink(outputMode) supplies the sink. Finally streamingQueryManager.startQuery launches the streaming computation and returns the running StreamingQuery object.
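A typical caller of this branch looks something like the sketch below; queryName and checkpointLocation are stored in extraOptions, which is exactly where startQuery reads them from (the paths are hypothetical, and df is assumed to be a streaming DataFrame built via spark.readStream):

val query = df.writeStream
  .format("parquet")                               // resolved through DataSource.lookupDataSource
  .queryName("demo-query")                         // stored in extraOptions under "queryName"
  .option("path", "/data/out/demo")                // hypothetical output directory
  .option("checkpointLocation", "/data/chk/demo")  // read back via extraOptions.get("checkpointLocation")
  .outputMode("append")
  .start()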
Inside StreamingQueryManager's startQuery method, the main work is delegated to the private createQuery method, which creates a StreamingQueryWrapper object:
private def createQuery(
    userSpecifiedName: Option[String],
    userSpecifiedCheckpointLocation: Option[String],
    df: DataFrame,
    extraOptions: Map[String, String],
    sink: BaseStreamingSink,
    outputMode: OutputMode,
    useTempCheckpointLocation: Boolean,
    recoverFromCheckpointLocation: Boolean,
    trigger: Trigger,
    triggerClock: Clock): StreamingQueryWrapper = {
  var deleteCheckpointOnStop = false
  val checkpointLocation = userSpecifiedCheckpointLocation.map { userSpecified =>
    new Path(userSpecified).toUri.toString
  }.orElse {
    df.sparkSession.sessionState.conf.checkpointLocation.map { location =>
      new Path(location, userSpecifiedName.getOrElse(UUID.randomUUID().toString)).toUri.toString
    }
  }.getOrElse {
    if (useTempCheckpointLocation) {
      // Delete the temp checkpoint when a query is being stopped without errors.
      deleteCheckpointOnStop = true
      Utils.createTempDir(namePrefix = s"temporary").getCanonicalPath
    } else {
      throw new AnalysisException(
        "checkpointLocation must be specified either " +
          """through option("checkpointLocation", ...) or """ +
          s"""SparkSession.conf.set("${SQLConf.CHECKPOINT_LOCATION.key}", ...)""")
    }
  }

  // If offsets have already been created, we trying to resume a query.
  if (!recoverFromCheckpointLocation) {
    val checkpointPath = new Path(checkpointLocation, "offsets")
    val fs = checkpointPath.getFileSystem(df.sparkSession.sessionState.newHadoopConf())
    if (fs.exists(checkpointPath)) {
      throw new AnalysisException(
        s"This query does not support recovering from checkpoint location. " +
          s"Delete $checkpointPath to start over.")
    }
  }

  val analyzedPlan = df.queryExecution.analyzed
  df.queryExecution.assertAnalyzed()

  if (sparkSession.sessionState.conf.isUnsupportedOperationCheckEnabled) {
    UnsupportedOperationChecker.checkForStreaming(analyzedPlan, outputMode)
  }

  if (sparkSession.sessionState.conf.adaptiveExecutionEnabled) {
    logWarning(s"${SQLConf.ADAPTIVE_EXECUTION_ENABLED.key} " +
      "is not supported in streaming DataFrames/Datasets and will be disabled.")
  }

  (sink, trigger) match {
    case (v2Sink: StreamWriteSupport, trigger: ContinuousTrigger) =>
      UnsupportedOperationChecker.checkForContinuous(analyzedPlan, outputMode)
      new StreamingQueryWrapper(new ContinuousExecution(
        sparkSession,
        userSpecifiedName.orNull,
        checkpointLocation,
        analyzedPlan,
        v2Sink,
        trigger,
        triggerClock,
        outputMode,
        extraOptions,
        deleteCheckpointOnStop))
    case _ =>
      new StreamingQueryWrapper(new MicroBatchExecution(
        sparkSession,
        userSpecifiedName.orNull,
        checkpointLocation,
        analyzedPlan,
        sink,
        trigger,
        triggerClock,
        outputMode,
        extraOptions,
        deleteCheckpointOnStop))
  }
}
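Note the order in which the checkpoint location is resolved above: the query's own checkpointLocation option first, then the session-wide default, and finally a temporary directory (only when useTempCheckpointLocation is true). The session-wide default can be set as follows (the path is hypothetical):

// Picked up via df.sparkSession.sessionState.conf.checkpointLocation; each query
// gets a subdirectory named after its queryName (or a random UUID if unnamed).
spark.conf.set("spark.sql.streaming.checkpointLocation", "/data/chk")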
Depending on whether the query runs in continuous-processing mode or micro-batch mode, it creates either a ContinuousExecution or a MicroBatchExecution. Both are subclasses of StreamExecution, the abstract class for stream execution; the class structure of StreamExecution will be analyzed later.
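From the user's side, which of the two gets created is determined mainly by the trigger (together with whether the sink supports the V2 StreamWriteSupport interface): a ContinuousTrigger selects ContinuousExecution, anything else selects MicroBatchExecution. A rough sketch, assuming df comes from a source that supports continuous processing (for example the rate or Kafka source) and using arbitrary intervals:

// Trigger.Continuous produces a ContinuousTrigger, so this query runs as ContinuousExecution
val continuous = df.writeStream
  .format("console")
  .trigger(Trigger.Continuous("1 second"))        // checkpoint interval for continuous processing
  .start()

// Any other trigger (or no trigger at all) runs as MicroBatchExecution
val microBatch = df.writeStream
  .format("console")
  .trigger(Trigger.ProcessingTime("10 seconds"))
  .start()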
The code structure and functionality of ContinuousExecution and MicroBatchExecution are actually quite similar, so let's take ContinuousExecution as the example first.
ContinuousExecution
First of all, a ContinuousExecution never ends on its own; it represents an unbounded stream. When the stream temporarily has no data, ContinuousExecution blocks the thread and waits for new data to arrive, which is controlled through the awaitEpoch method.
The commit method is invoked after each epoch of data has finished processing; it writes the offsets of the completed epoch into the commitLog.
Next, look at logicalPlan. The analyzed plan passed into ContinuousExecution contains relations of type StreamingRelationV2, which are converted into LogicalPlan nodes of type ContinuousExecutionRelation:
analyzedPlan.transform {
  case r @ StreamingRelationV2(
      source: ContinuousReadSupport, _, extraReaderOptions, output, _) =>
    toExecutionRelationMap.getOrElseUpdate(r, {
      ContinuousExecutionRelation(source, extraReaderOptions, output)(sparkSession)
    })
}
There is also the addOffset method, which writes the offsets read for the current epoch into the offsetLog once they have been read, so that on the next recovery the query knows where to start. Together, addOffset and commit are what make exactly-once execution semantics possible.
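The practical effect of the offsetLog plus commitLog pair is that a query restarted with the same checkpointLocation resumes from the last committed offsets instead of reprocessing the stream from scratch. A rough sketch, with hypothetical brokers, topic, and paths:

// As long as checkpointLocation stays the same, the restarted query recovers
// its progress from the offset/commit logs written under that directory.
val resumed = spark.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "broker1:9092")   // hypothetical brokers
  .option("subscribe", "events")                       // hypothetical topic
  .load()
  .writeStream
  .format("parquet")
  .option("path", "/data/out/events")                  // hypothetical output directory
  .option("checkpointLocation", "/data/chk/events")    // same directory as the previous run
  .outputMode("append")
  .start()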
That is all the content of this article on an example analysis of Spark SQL stream processing. Thanks for reading! Hopefully it has given you a reasonable understanding of the topic, and if you would like to learn more, you are welcome to follow the 天达云 industry news channel.