KAFKA的ISR的伸缩过程是什么
更新:HHH   时间:2023-1-7


本篇内容介绍了“KAFKA的ISR的伸缩过程是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!

一、前言

知识点总结


二、概述

我们了解ISR列表是不断伸缩的,在副本失效后及时踢出ISR列表,在副本赶上进度之后重新将副本加入到ISR列表中,后面我们就会按照这个思路来看下其中细节。


三、什么是失效副本?

功能失效:节点宕机,在该节点上的副本都属于功能失效副本。
同步失效:follower副本所在的broker因为带宽或者负载等因素无法及时完成同步,导致被踢出ISR。


四、ISR伸缩控制参数了解吗?

在0.9x版本之前,有一个控制参数:replica.lag.max.messages 默认值为4000,表示如果follower的消息个数落后leader个数4000,那么就会被踢出ISR列表;
我们可以想一下这种直接指定条数的方式是否合理呢?显然是不合理的,原因入下:
高吞吐的场景:瞬间就几万条消息,可能follower就滞后个几秒钟就被判定为失效从而被踢出,可能导致ISR列表频繁的变动,以及元数据的频繁更新。
低吞吐的场景:可能一天就几条消息,那可能follower都滞后好几天了依旧存在于ISR中,那ISR不就失去意义了吗?

因此0.9x版本开始,移除了该参数,取而代之的参数是replica.lag.time.max.ms 该参数默认值是10000ms,也就是10s。
也就是说如果follower在10s都没能追上leader的LEO,就会被认定为失效,从而踢出IS列表。


五、ISR是如何将失效副本剔除的?

我们知道了ISR是如何判定失效副本后,再来看下,到底是怎么把这个失效的副本踢出去的呢?
1、每个broker在启动的时候都会启动两个定时任务:

  • isr-expiration:定时检查当前broker上的eader对应的副本失效信息,也就是看当前Leader的ISR列表中是否存在失效副本,默认执行周期为replica.lag.time.max.ms / 2 = 5s

  • isr-change-propagation:定时检查内存isrChangeSet中是否有新的变更数据,固定执行周期为2.5s

2、判断副本失效:
isr-expiration任务会根据当前时间now,减去某follower的 lastCaughtUpTimeMs,如果大于replica.lag.time.max.ms值,则说明失效。
lastCaughtUpTimeMs这个值,在follower的LEO与leader的LEO相等时(Leader中维护了follower的LEO信息),被更新。
也就是说,只有当follower完全追上了Leader才更新,而不是每Fetch一次就更新。

关于为什么不是每次Fetch的时候就更新该值呢?
我们试想一下,如果leader的写入速率远大于follower的同步速率,可能leader已经写了10w条数据了,follower由于网络/负载为原因还在慢悠悠的同步,但是因为Fetch请求是正常发送的,就每次都更新lastCaughtUpTimeMs值,从而认为该follower是有效的,那这不就导致leader和follower之间在这种场景下存在巨大的数据差异了嘛?从而影响数据的可靠性。

3、这个ISR变化的信息如何传递呢?

  1. 由leader所在的broker的isr-expiration定时任务,去检查失效副本和更新zk的/state节点数据,同时写入isrChangeSet

  2. isr-change-propagation去检查isrChangeSet是否有新增数据,如果有,则往zk中的/isr_change_notification节点下创建子节点。

  3. 而Controller对这个节点有一个Watcher,如果发现新增了子节点,那么Controller就会重新从zk中获取到最新的元数据,然后通知所有Broker更新元数据。

从上述过程中,我们还可以知道,实际上这个变更的数据会在内存中停留一段时间,假如这个时候我们对应的broker宕机了,那么不就是改了zk却没有让其他broker更新元数据吗?
其实不是,因为这种情况下,broker宕机会触发controller在zk下的brokers/ids下对应的节点被删除,因此Controller也会让其他的broker更新元数据,所以无论如何都会更新。

最后我们来总结一下整个ISR剔除的过程
每个leader在启动的时候都会启动两个定时检查任务,每隔一段时间检查是否存在失效副本。
如果某个follower的lastCaughtUpTimeMs > 10s那么就会被判定为失效副本
如果定时任务扫描到存在失效副本时,就会往zk的/state节点下更新最新的ISR列表数据,同时将变更数据写入到内存中的isrChangeSet中。
然后另外一个传播任务会定时检查isrChangeSet是否存在需要变更的任务,如果感知到就往zk的/isr_change_notification节点下创建子节点。
最终由Controller感知到节点的变化,然后从zk中获取最新的元数据,然后通知所有的Broker更新元数据,完成整个ISR列表的数据更新。


六、追赶上的副本是如何重新加入ISR中的?

在看完第五小节之后,第六小节就会显得非常简单,无非是需要知道什么时候一个副本会重新判定为同步副本呢? 那就是:当前失效follower的LEO等于leaderHW的时候,即被判断可以重新加入ISR。

那么随之而来的一个问题就是在哪儿去判断followerLEO == leaderHW的呢?
这里和上面的剔除ISR成员不一样,并不是由定时任务去检测的,而是在处理完Fetch请求的时候,如果判断Fetch请求是follower发过来的的(replicaId >= 0),那么就会去看下当前这个follower的LEO是多少(其实就是Fetch请求带过来的),是不是赶上了当前的leaderHW,如果是的那么就执行扩张ISR操作。
扩张ISR操作流程就和上面流程一样了,先写zk下的/state数据,然后写isrChangeSet,最后由Controller感知到数据变化,更新集群元数据。
我们所需要记住的主要差别点在于,ISR列表的扩张是在Fetch请求的时候去判断和执行的。


七、整个伸缩过程总结

最后,我们用图示来加深一点印象。
1、失效副本(图源:《深入理解kafka》)

2、踢出ISR列表:


八、性能优化

我们由上可知,ISR的伸缩是需要涉及到zk和Controller以及各个Broker的元数据更新的,因此如果太过频繁会造成性能问题。
所以kafka在在判断ISR伸缩之前,还会判断两个条件,以此来降低频率:

  • 上次ISR集合发生变化距离现在已经超过5s。

  • 上一次写入zk的时候,距离现在已经超过60s。

如果一个副本刚追上Leader加入到ISR,但是因为短时间内没有追上LEO,5s之后又被检查到是失效副本,不是又要被踢出去,要更新元数据,这样就太频繁了。 因此就有了上面两个限制,就起码给了多60s的让新加入的follower去追上Leader的LEO。

“KAFKA的ISR的伸缩过程是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注天达云网站,小编将为大家输出更多高质量的实用文章!

返回云计算教程...