这篇文章给大家分享的是有关Spark结构化流处理机制之容错机制的示例分析的内容。小编觉得挺实用的,因此分享给大家做个参考,一起跟随小编过来看看吧。
容错机制
端到端的有且仅有一次保证,是结构化流设计的关键目标之一.
结构化流设计了 Structured Streaming sources,sinks等等,来跟踪确切的处理进度,并让其重启或重运行来处理任何故障
streaming source是类似kafka的偏移量(offsets)来跟踪流的读取位置.执行引擎使用检查点(checkpoint)和预写日志(write ahead logs)来记录每个执行其的偏移范围值
streaming sinks 是设计用来保证处理的幂等性
这样,依靠可回放的数据源(streaming source)和处理幂等(streaming sinks),结构流来做到任何故障下的端到端的有且仅有一次保证
val lines = spark.readStream
.format("socket")
.option("host", "localhost")
.option("port", 9999)
.load()
// Split the lines into words
val words = lines.as[String].flatMap(_.split(" "))
// Generate running word count
val wordCounts = words.groupBy("value").count()
其中,spark是SparkSession,lines是DataFrame,DataFrame就是Dataset[Row]。
DataSet
看看Dataset的触发因子的代码实现,比如foreach操作:
def foreach(f: T => Unit): Unit = withNewRDDExecutionId {
rdd.foreach(f)
}
private def withNewRDDExecutionId[U](body: => U): U = {
SQLExecution.withNewExecutionId(sparkSession, rddQueryExecution) {
rddQueryExecution.executedPlan.foreach { plan =>
plan.resetMetrics()
}
body
}
}
接着看:
def withNewExecutionId[T](
sparkSession: SparkSession,
queryExecution: QueryExecution,
name: Option[String] = None)(body: => T): T = {
val sc = sparkSession.sparkContext
val oldExecutionId = sc.getLocalProperty(EXECUTION_ID_KEY)
val executionId = SQLExecution.nextExecutionId
sc.setLocalProperty(EXECUTION_ID_KEY, executionId.toString)
executionIdToQueryExecution.put(executionId, queryExecution)
try {
withSQLConfPropagated(sparkSession) {
try {
body
} catch {
} finally {
}
}
} finally {
executionIdToQueryExecution.remove(executionId)
sc.setLocalProperty(EXECUTION_ID_KEY, oldExecutionId)
}
}
执行的真正代码就是 queryExecution: QueryExecution。
@transient private lazy val rddQueryExecution: QueryExecution = {
val deserialized = CatalystSerde.deserialize[T](logicalPlan)
sparkSession.sessionState.executePlan(deserialized)
}
看到了看到了,是sessionState.executePlan执行logicalPlan而得到了QueryExecution
这里的sessionState.executePlan其实就是创建了一个QueryExecution对象。然后执行QueryExecution的executedPlan方法得到SparkPlan这个物理计划。怎么生成的呢?
lazy val sparkPlan: SparkPlan = tracker.measurePhase(QueryPlanningTracker.PLANNING) {
SparkSession.setActiveSession(sparkSession)
planner.plan(ReturnAnswer(optimizedPlan.clone())).next()
}
通过planner.plan方法生成。
planner是SparkPlanner。在BaseSessionStateBuilder类中定义。
protected def planner: SparkPlanner = {
new SparkPlanner(session.sparkContext, conf, experimentalMethods) {
override def extraPlanningStrategies: Seq[Strategy] =
super.extraPlanningStrategies ++ customPlanningStrategies
}
}
SparkPlanner类
SparkPlanner对LogicalPlan执行各种策略,返回对应的SparkPlan。比如对于流应用来说,有这样的策略:DataSourceV2Strategy。
典型的几个逻辑计划到物理计划的映射关系如下:
StreamingDataSourceV2Relation-》ContinuousScanExec
StreamingDataSourceV2Relation-》MicroBatchScanExec
前一种对应与Offset没有endOffset的情况,后一种对应于有endOffset的情况。前一种是没有结束的连续流,后一种是有区间的微批处理流。
前一种的时延可以达到1ms,后一种的时延只能达到100ms。
【代码】:
case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isDefined =>
val microBatchStream = r.stream.asInstanceOf[MicroBatchStream]
val scanExec = MicroBatchScanExec(
r.output, r.scan, microBatchStream, r.startOffset.get, r.endOffset.get)
val withProjection = if (scanExec.supportsColumnar) {
scanExec
} else {
// Add a Project here to make sure we produce unsafe rows.
ProjectExec(r.output, scanExec)
}
withProjection :: Nil
case r: StreamingDataSourceV2Relation if r.startOffset.isDefined && r.endOffset.isEmpty =>
val continuousStream = r.stream.asInstanceOf[ContinuousStream]
val scanExec = ContinuousScanExec(r.output, r.scan, continuousStream, r.startOffset.get)
val withProjection = if (scanExec.supportsColumnar) {
scanExec
} else {
// Add a Project here to make sure we produce unsafe rows.
ProjectExec(r.output, scanExec)
}
withProjection :: Nil
感谢各位的阅读!关于“Spark结构化流处理机制之容错机制的示例分析”这篇文章就分享到这里了,希望以上内容可以对大家有一定的帮助,让大家可以学到更多知识,如果觉得文章不错,可以把它分享出去让更多的人看到吧!