如何解密Spark Streaming
更新:HHH   时间:2023-1-7


这篇文章主要介绍“如何解密Spark Streaming”,在日常操作中,相信很多人在如何解密Spark Streaming问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”如何解密Spark Streaming”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!

1,解密Spark Streaming Job架构和运行机制

 先通过运行在线单词统计这个例子,观察Spark Streaming在控制台上输出的日志信息。

以下代码为在9999端口监听客户端连接请求,然后不断向客户端发送单词。

先启动SocketServer,然后在启动SparkStreaming在线统计单词的程序,代码如下

运行过程总结如下

1,StreamingContext启动后会ReceiverTracker,根据创建时指定的batchDuration时间,启动RecurringTimer定时器,间隔Interval发送JobGenerator消息,会启动JobGenerator和JobScheduler和BlockGenerator。

2,ReceiverTracker接收到Receiver(Stream 0)的注册消息,然后RecevierSupervisorImpl启动Receiver来接收数据。

3,SocketServer连接到localhost:9999开始接收数据,将接收到的数据通过BlockGenerator存放到BlockManager中。

4,JobScheduler接收到定期发送的JobGenerator消息后,提交一个Job,DStreamGraph从ReceiverTracker中获取数据生成RDD,DAGScheduler调度Job的执行,让TaskSchedulerImpl向Executor发送TaskSet,让Executor执行。

5,Task运行完后将结果发送给Driver,DAGScheduler和JbScheduler打印Job完成和耗时信息,最后在控制台输出单词统计结果。

可以看到随着时间的流逝会有不断的Job生成并且运行,那么,Spark Streaming中Job是如何生成的?

在StreamingContext调用start方法的内部其实是会启动JobScheduler的start方法,进行消息循环,在JobScheduler的start内部会构造JobGenerator和ReceiverTracker,并且调用JobGenerator和ReceiverTracker的start方法

    1,JobGenerator启动后不断的根据batchDuration生成一个个的Job

    2,ReceiverTracker启动后首先在Spark集群中启动Receiver(其实在Executor中先启动ReceiverSupervisor)在Receiver接收到数据后会通过ReceiverSupervisor将数据存储到Executor的BlockManager中,并且把数据的Metadata信息发送给Driver的ReceiverTracker,在ReceiverTracker内部通过ReceivedBlockTracker来管理接收到的元数据信息

每个BatchInterval会产生一个具体的Job,其实这里的Job不是SparkCore中的Job,它只是基于DStreamGraph而生成的RDD的DAG而已,从Java角度讲,相等于Runnable接口实例,此时要向运行Job需要提交给JobScheduler,在JobScheduler中通过线程池中单独的线程

来提交Job到集群运行(其实是在线程中基于RDD的Action触发真正的作业的运行)

为什么使用线程池?

1,作业不断生成,所以为了提升效率,我们需要线程池。这和Executor中通过线程池执行Task有异曲同工之妙

2,有可能设置了Job的FAIR公平调度的方式,这个时候也需要多线程的支持

2,解密Spark Streaming容错架构和运行机制

容错分为Driver级别的容错和Executor级别的容错。

在Executor级别的容错具体为接收数据的安全性和任务执行的安全性。在接收数据安全性方面,一种方式是Spark Streaming接收到数据默认为MEMORY_AND_DISK_2的方式,在两台机器的内存中,如果一台机器上的Executor挂了,立即切换到另一台机器上的Executor,这种方式一般情况下非常可靠且没有切换时间。另外一种方式是WAL(Write Ahead Log),在数据到来时先通过WAL机制将数据进行日志记录,如果有问题则从日志记录中恢复,然后再把数据存到Executor中,再进行其他副本的复制,这种方式对性能有影响。在生产环境中一般使用Kafka存储,Spark Streaming接收到数据丢失时可以从Kafka中回放。在任务执行的安全性方面,靠RDD的容错。

在Driver级别的容错具体为DAG生成的模板,即DStreamGraph,RecevierTracker中存储的元数据信息和JobScheduler中存储的Job进行的进度情况等信息,只要通过checkpoint就可以了,每个Job生成之前进行checkpoint,在Job生成之后再进行checkpoint,如果出错的话就从checkpoint中恢复。

到此,关于“如何解密Spark Streaming”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注天达云网站,小编会继续努力为大家带来更多实用的文章!

返回云计算教程...