这篇文章主要介绍“Spark Streaming怎么使用”,在日常操作中,相信很多人在Spark Streaming怎么使用问题上存在疑惑,小编查阅了各式资料,整理出简单好用的操作方法,希望对大家解答”Spark Streaming怎么使用”的疑惑有所帮助!接下来,请跟着小编一起来学习吧!
DStream是逻辑级别的,而RDD是物理级别的。DStream是随着时间的流动内部将集合封装RDD。对DStream的操作,转过来对其内部的RDD操作。
纵轴为空间维度:代表的是RDD的依赖关系构成的具体的处理逻辑的步骤,是用DStream来表示的。
横轴为时间维度:按照特定的时间间隔不断地生成job对象,并在集群上运行。
随着时间的推移,基于DStream Graph 不断生成RDD Graph ,也即DAG的方式生成job,并通过Job Scheduler的线程池的方式提交给spark cluster不断的执行。
由上可知,RDD 与 DStream的关系如下
RDD是物理级别的,而 DStream 是逻辑级别的
DStream是RDD的封装类,是RDD进一步的抽象
DStream 是RDD的模板。DStream要依赖RDD进行具体的数据计算
注意:纵轴维度需要RDD,DAG的生成模板,需要TimeLine的job控制器
横轴维度(时间维度)包含batch interval,窗口长度,窗口滑动时间等。
3,Spark Streaming源码解析
StreamingContext方法中调用JobScheduler的start方法
JobGenerator的start方法中,调用startFirstTime方法,来开启定时生成Job的定时器
startFirstTime方法,首先调用DStreamGraph的start方法,然后再调用RecurringTimer的start方法。
timer对象为一个定时器,根据batchInterval时间间隔定期向EventLoop发送GenerateJobs的消息。
接收到GenerateJobs消息后,会回调generateJobs方法。
generateJobs方法再调用DStreamGraph的generateJobs方法生成Job
DStreamGraph的generateJobs方法
DStreamGraph的实例化是在StreamingContext中的
DStreamGraph类中保存了输入流和输出流信息
回到JobGenerator的start方法中receiverTracker.start()
其中ReceiverTrackerEndpoint对象为一个消息循环体
launchReceivers方法中发送StartAllReceivers消息
接收到StartAllReceivers消息后,进行如下处理
StartReceiverFunc方法如下,实例化Receiver监控者,开启并等待退出
supervisor的start方法中调用startReceiver方法
我们以socketTextStream为例,其启动的是SocketReceiver,内部开启一个线程,来接收数据。
内部调用supervisor的pushSingle方法,将数据聚集后存放在内存中
supervisor的pushSingle方法如下,将数据放入到defaultBlockGenerator中,defaultBlockGenerator为BlockGenerator,保存Socket接收到的数据
BlockGenerator对象中有一个定时器,来更新当前的Buffer
BlockGenerator对象中有一个线程,来从阻塞队列中取出数据
调用ReceiverSupervisorImpl类中的继承BlockGeneratorListener的匿名类中的onPushBlock方法。
receivedBlockHandler对象如下
这里我们讲解BlockManagerBasedBlockHandler的方式
trackerEndpoint如下
其实是发送给ReceiverTrackerEndpoint类,
InputInfoTracker类的reportInfo方法只是对数据进行记录统计
其generateJob方法是被DStreamGraph调用
DStreamGraph的generateJobs方法是被JobGenerator类的generateJobs方法调用。
JobGenerator类中有一个定时器,batchInterval发送GenerateJobs消息
总结:
1,当调用StreamingContext的start方法时,启动了JobScheduler
2,当JobScheduler启动后会先后启动ReceiverTracker和JobGenerator
3,ReceiverTracker启动后会创建ReceiverTrackerEndpoint这个消息循环体,来接收运行在Executor上的Receiver发送过来的消息
4,ReceiverTracker在启动时会给自己发送StartAllReceivers消息,自己接收到消息后,向Spark提交startReceiverFunc的Job
5,startReceiverFunc方法中在Executor上启动Receiver,并实例化ReceiverSupervisorImpl对象,来监控Receiver的运行
6,ReceiverSupervisorImpl对象会调用Receiver的onStart方法,我们以SocketReceiver为例,启动一个线程,连接Server,读取网络数据先调用ReceiverSupervisorImpl的pushSingle方法,
保存在BlockGenerator对象中,该对象内部有个定时器,放到阻塞队列blocksForPushing,等待内部线程取出数据放到BlockManager中,并发AddBlock消息给ReceiverTrackerEndpoint。
ReceiverTrackerEndpoint为ReceiverTracker的内部类,在接收到addBlock消息后将streamId对应的数据阻塞队列streamIdToUnallocatedBlockQueues中
7,JobGenerator启动后会启动以batchInterval时间间隔发送GenerateJobs消息的定时器
8,接收到GenerateJobs消息会先后触发ReceiverTracker的allocateBlocksToBatch方法和DStreamGraph的generateJobs方法
9,ReceiverTracker的allocateBlocksToBatch方法会调用getReceivedBlockQueue方法从阻塞队列streamIdToUnallocatedBlockQueues中根据streamId获取数据
10,DStreamGraph的generateJobs方法,继而调用变量名为outputStreams的DStream集合的generateJob方法
11,继而调用DStream的getOrCompute来调用具体的DStream的compute方法,我们以ReceiverInputDStream为例,compute方法是从ReceiverTracker中获取数据
到此,关于“Spark Streaming怎么使用”的学习就结束了,希望能够解决大家的疑惑。理论与实践的搭配能更好的帮助大家学习,快去试试吧!若想继续学习更多相关知识,请继续关注天达云网站,小编会继续努力为大家带来更多实用的文章!