KAFKA中rebalance是什么
更新:HHH   时间:2023-1-7


本篇内容介绍了“KAFKA中rebalance是什么”的有关知识,在实际案例的操作过程中,不少人都会遇到这样的困境,接下来就让小编带领大家学习一下如何处理这些情况吧!希望大家仔细阅读,能够学有所成!


一、写在前面

让我们从头到尾梳理一下rebalance。


二、什么是rebalance?

中文直译,就是重平衡。
是什么去重平衡呢?消费组内的消费者成员去重平衡。(消费组的概念如果不清楚各位先自行百度,后续我写到消费模块的时候才会提到这些概念)
为什么需要重平衡呢?因为消费组内成员的故障转移和动态分区分配。

翻译一下:
消费组内成员的故障转移:当一个消费组内有三个消费者A,B,C,分别消费分区:a,b,c

A -> a
B -> b
C -> c

此时如果A消费者出了点问题,那么就意味着a分区没有消费者进行消费了,那这肯定不行,那么就通过rebalance去将a分区分配给其他还存活着的消费者客户端,rebalance后可能得到的消费策略:

A -> a (GG)
B -> b,a
C -> c

这就是消费组内成员的故障转移,就是某个消费者客户端出问题之后把它原本消费的分区通过REBALNACE分配给其他存活的消费者客户端。

动态分区分配:当某个topic的分区数变化,对于消费组而言可消费的分区数变化了,因此就需要rebalance去重新进行动态分区分配,举个栗子,原本某topic只有3个分区,我现在扩成了10个分区,那么不就意味着多了7个分区没有消费者消费吗?这显然是不行的,因此就需要rebalance过程去进行分区分配,让现有的消费者去把这10个分区全部消费到。


三、rebalance是怎么触发的?

这个其实在上面一小节已经提到的差不多了,在这个小节再做一点补充和总结。
触发条件:

  1. 消费组内成员变化:下线/上线/故障被踢出。

  2. 消费的分区数变化:topic被删了,topic分区数增加了。

  3. coordinator节点出问题了:因为消费组的元数据信息都是在coordinator节点的,因此coordinator节点出问题也会触发rebalance去找一个新的coordinator节点。怎么找呢?显然就是走一遍FIND_COORDINATOR请求嘛,然后找到负载最低的那个节点问一下,我的新的coordinator在哪儿呀?然后得到答案之后让消费者客户端去连新的coordinator节点。


四、rebalance的宏观过程

整个rebalance的过程,是一个状态机流转的过程,整体过程示意图如下:图源:https://www.cnblogs.com/huxi2b/p/6815797.html
其实上面这个状态机流转过程在明白原理的情况下,已经非常清晰了,但是如果没看过源码的,依旧不知道为什么是这么流转的,什么情况下状态是Empty呢,什么状态下是Stable呢?什么时候Empty状态会转换为PreparingRebalance状态呢?
下面我就根据请求顺序来看下整个状态的流转过程:
让我们来回答上个小节后面提出的几个比较细节的问题:

这些请求都带有哪些关键数据?
在FIND_COORDINATOR请求的时候,会带上自己的group.id值,这个值是用来计算它的coordinator到底在哪儿的,对应的计算方法就是:coordinatorId=groupId.hash % 50 这个算出来是个数字,代表着具体的分区,哪个topic的分区呢?显然是__consumer_offsets了。
在JOIN_GROUP请求的时候,是没带什么关键参数的,但是在响应的时候会挑选一个客户端作为leader,然后在响应中告诉它被选为了leader并且把消费组元数据信息发给它,然后让该客户端去进行分区分配。
在SYNC_GROUP请求的时候,leader就会带上它根据具体的策略已经分配好的分区分配方案,服务端收到后就更新到元数据里面去,然后其余的consumer客户端只要一发送SYNC请求过来就告诉它要消费哪些分区,然后让它自己去消费就ok了。

到底是哪个阶段导致rebalance过程会劣化到几分钟?
我图中特意将JOIN阶段标位红色,就是让这个阶段显得显眼一些,没错就是这个阶段会导致rebalance整个过程耗时劣化到几分钟。
具体的原因就是JOIN阶段会等待原先组内存活的成员发送JOIN_GROUP请求过来,如果原先组内的成员因为业务处理一直没有发送JOIN_GROUP请求过来,服务端就会一直等待,直到超时。这个超时时间就是max.poll.interval.ms的值,默认是5分钟,因此这种情况下rebalance的耗时就会劣化到5分钟,导致所有消费者都无法进行正常消费,影响非常大。

为什么要分为这么多阶段?
这个主要是设计上的考虑,整个过程设计的还是非常优雅的,第一次连上的情况下需要三次请求,正常运行的consumer去进行rebalance只需要两次请求,因为它原先就知道自己的coordinator在哪儿,因此就不需要FIND_COORDINATOR请求了,除非是它的coordinator宕机了。

回答完这些问题,是不是对整个rebalance过程理解加深一些了呢?其实还有很多细节没有涉及到,例如consumer客户端什么时候会进入rebalance状态?服务端是如何等待原先消费组内的成员发送JOIN_GROUP请求的呢?这些问题就只能一步步看源码了。

FIND_COORDINATOR请求的源码我就不打写了,很简单大家可以自己翻一下,就是带了个group.id上去,上面都提到了。


六、JOIN阶段源码分析

从这段函数我们知道,如果加入一个新的消费组,服务端收到第一个JOIN请求的时候会创建group,这个group的初始状态为Empty

      // 如果group都还不存在,就有了memberId,则认为是非法请求,直接拒绝。
      groupManager.getGroup(groupId) match {
        case None =>
          // 这里group都还不存在的情况下,memberId自然是空的
          if (memberId != JoinGroupRequest.UNKNOWN_MEMBER_ID) {
            responseCallback(joinError(memberId, Errors.UNKNOWN_MEMBER_ID))
          } else {
            // 初始状态是EMPTY
            val group = groupManager.addGroup(new GroupMetadata(groupId, initialState = Empty))
            // 执行具体的加组操作
            doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
          }

        case Some(group) =>
          doJoinGroup(group, memberId, clientId, clientHost, rebalanceTimeoutMs, sessionTimeoutMs, protocolType, protocols, responseCallback)
      }

让我们进入doJoinGroup函数,看下里面的核心逻辑:

          case Empty | Stable =>
            // 初始状态是EMPTY,添加member并且执行rebalance
            if (memberId == JoinGroupRequest.UNKNOWN_MEMBER_ID) {
              // if the member id is unknown, register the member to the group
              addMemberAndRebalance(rebalanceTimeoutMs, sessionTimeoutMs, clientId, clientHost, protocolType, protocols, group, responseCallback)
            } else {
			// ...
              } else {
			  //...
              }
  private def addMemberAndRebalance(rebalanceTimeoutMs: Int,
                                    sessionTimeoutMs: Int,
                                    clientId: String,
                                    clientHost: String,
                                    protocolType: String,
                                    protocols: List[(String, Array[Byte])],
                                    group: GroupMetadata,
                                    callback: JoinCallback) = {
    // 根据clientID初始化memberID
    val memberId = clientId + "-">
  def add(member: MemberMetadata) {
    if (members.isEmpty)
      this.protocolType = Some(member.protocolType)

    assert(groupId == member.groupId)
    assert(this.protocolType.orNull == member.protocolType)
    assert(supportsProtocols(member.protocols))
    // coordinator选举leader很简单,就第一个发送join_group请求的那个member
    if (leaderId.isEmpty)
      leaderId = Some(member.memberId)
    members.put(member.memberId, member)
  }

上面的代码翻译一下很简单,就是新来了一个member,封装一下,添加到这个group中,需要说一下的就是当组状态是Empty的情况下,谁先连上谁就是leader。紧接着就准备rebalance:

  private def maybePrepareRebalance(group: GroupMetadata) {
    group.inLock {
      if (group.canRebalance)
        prepareRebalance(group)
    }
  }
  // 这里是传入PreparingRebalance状态,然后获取到一个SET
  // 翻译一下:就是只有这个SET(Stable, CompletingRebalance, Empty)里面的状态,才能开启rebalance
  def canRebalance = GroupMetadata.validPreviousStates(PreparingRebalance).contains(state)

  private val validPreviousStates: Map[GroupState, Set[GroupState]] =
    Map(Dead -> Set(Stable, PreparingRebalance, CompletingRebalance, Empty, Dead),
      CompletingRebalance -> Set(PreparingRebalance),
      Stable -> Set(CompletingRebalance),
      PreparingRebalance -> Set(Stable, CompletingRebalance, Empty),
      Empty -> Set(PreparingRebalance))
  private def prepareRebalance(group: GroupMetadata) {
    // if any members are awaiting sync, cancel their request and have them rejoin
    if (group.is(CompletingRebalance))
      resetAndPropagateAssignmentError(group, Errors.REBALANCE_IN_PROGRESS)

    val delayedRebalance = if (group.is(Empty))
      new InitialDelayedJoin(this,
        joinPurgatory,
        group,
        groupConfig.groupInitialRebalanceDelayMs,// 默认3000ms,即3s
        groupConfig.groupInitialRebalanceDelayMs,
        max(group.rebalanceTimeoutMs - groupConfig.groupInitialRebalanceDelayMs, 0))
    else
      new DelayedJoin(this, group, group.rebalanceTimeoutMs)// 这里这个超时时间是客户端的poll间隔,默认5分钟
    // 状态机转换:EMPTY -> PreparingRebalance
    group.transitionTo(PreparingRebalance)
    // rebalance开始标志日志
    info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " +
      s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")
    // 加入时间轮
    val groupKey = GroupKey(group.groupId)
    joinPurgatory.tryCompleteElseWatch(delayedRebalance, Seq(groupKey))
  }

上面这段代码有两个关键点,一个是判断当前能否进入rebalance过程,可以看到只有(Stable, CompletingRebalance, Empty)里面的状态,才能开启rebalance,而最开始来到第一个member的时候,组的状态是Empty显然是能进来的,但是近来之后就给转换为了PreparingRebalance状态,那么后续的member发送JOIN请求过来之后就进不来了,就只能设置个回调后一直等。
那么要等到什么时候呢?第二段代码写的很清楚就是等待延时任务超时,这个延时任务创建是根据当前状态来判断的,如果是Empty就创建一个InitialDelayedJoin延时任务,超时时间是3s;如果不是Empty就创建一个DelayedJoin,超时时间默认是5min。看,源码出真知,这就是JOIN阶段等待member的代码实现。
这里需要补充一下,为什么Empty的状态下要等待3s呢?这其实是一个优化,主要就是优化多消费者同时连入的情况。举个栗子,10个消费者都能在3s内启动然后练上,如果你等着3s时间那么一次rebalance过程就搞定了,如果你不等,那么就意味着来一个就又要开启一次rebalance,一共要进行10次rebalance,这个耗时就比较长了。具体的细节可以查看:https://www.cnblogs.com/huxi2b/p/6815797.html
另外就是,为什么状态不是Empty的时候就延时5分钟呢?这个其实上面就回答了,要等待原来消费组内在线的消费者发送JOIN请求,这个也是rebalance过程耗时劣化的主要原因。

接下来我们看看这两个延时任务,在超时的时候分别都会做些啥,首先是InitialDelayedJoin:

/**
  * Delayed rebalance operation that is added to the purgatory when a group is transitioning from
  * Empty to PreparingRebalance
  *
  * When onComplete is triggered we check if any new members have been added and if there is still time remaining
  * before the rebalance timeout. If both are true we then schedule a further delay. Otherwise we complete the
  * rebalance.
  */
private[group] class InitialDelayedJoin(coordinator: GroupCoordinator,
                                        purgatory: DelayedOperationPurgatory[DelayedJoin],
                                        group: GroupMetadata,
                                        configuredRebalanceDelay: Int,
                                        delayMs: Int,
                                        remainingMs: Int) extends DelayedJoin(coordinator, group, delayMs) {

  // 这里写死是false,是为了在tryComplete的时候不被完成
  override def tryComplete(): Boolean = false

  override def onComplete(): Unit = {
    // 延时任务处理
    group.inLock  {
      // newMemberAdded是后面有新的member加进来就会是true
      // remainingMs第一次创建该延时任务的时候就是3s。
      // 所以这个条件在第一次的时候都是成立的
      if (group.newMemberAdded && remainingMs != 0) {
        group.newMemberAdded = false
        val delay = min(configuredRebalanceDelay, remainingMs)
        // 最新计算的remaining恒等于0,其实本质上就是3-3=0,
        // 所以哪怕这里是新创建了一个InitialDelayedJoin,这个任务的超时时间就是下一刻
        // 这么写的目的其实就是相当于去完成这个延时任务
        val remaining = max(remainingMs - delayMs, 0)
        purgatory.tryCompleteElseWatch(new InitialDelayedJoin(coordinator,
          purgatory,
          group,
          configuredRebalanceDelay,
          delay,
          remaining
        ), Seq(GroupKey(group.groupId)))
      } else
        // 如果没有新的member加入,直接调用父类的函数
        // 完成JOIN阶段
        super.onComplete()
    }
  }
}

大意我都写在注释里面了,其实就是等待3s,然后完了之后调用父类的函数完成整个JOIN阶段,不过不联系上下文去看,还是挺费劲的,对了看这个需要对时间轮源码有了解,正好我前面有写,大家如果有什么不清楚的可以去看下。
接着看下DelayedJoin超时后会干嘛:

/**
 * Delayed rebalance operations that are added to the purgatory when group is preparing for rebalance
 *
 * Whenever a join-group request is received, check if all known group members have requested
 * to re-join the group; if yes, complete this operation to proceed rebalance.
 *
 * When the operation has expired, any known members that have not requested to re-join
 * the group are marked as failed, and complete this operation to proceed rebalance with
 * the rest of the group.
 */
private[group] class DelayedJoin(coordinator: GroupCoordinator,
                                 group: GroupMetadata,
                                 rebalanceTimeout: Long) extends DelayedOperation(rebalanceTimeout, Some(group.lock)) {

  override def tryComplete(): Boolean = coordinator.tryCompleteJoin(group, forceComplete _)
  override def onExpiration() = coordinator.onExpireJoin()
  override def onComplete() = coordinator.onCompleteJoin(group)
}

  // 超时之后啥也没干,哈哈,因为确实不用做啥,置空就好了
  // 核心是onComplete函数和tryComplete函数
  def onExpireJoin() {
    // TODO: add metrics for restabilize timeouts
  }
  def tryCompleteJoin(group: GroupMetadata, forceComplete: () => Boolean) = {
    group.inLock {
      if (group.notYetRejoinedMembers.isEmpty)
        forceComplete()
      else false
    }
  }
  def notYetRejoinedMembers = members.values.filter(_.awaitingJoinCallback == null).toList
  
  def forceComplete(): Boolean = {
    if (completed.compareAndSet(false, true)) {
      // cancel the timeout timer
      cancel()
      onComplete()
      true
    } else {
      false
    }
  }
  def onCompleteJoin(group: GroupMetadata) {
    group.inLock {
      // remove any members who haven't joined the group yet
      // 如果组内成员依旧没能连上,那么就删除它,接收当前JOIN阶段
      group.notYetRejoinedMembers.foreach { failedMember =>
        group.remove(failedMember.memberId)
        // TODO: cut the socket connection to the client
      }

      if (!group.is(Dead)) {
        // 状态机流转 : preparingRebalancing -> CompletingRebalance
        group.initNextGeneration()
        if (group.is(Empty)) {
          info(s"Group ${group.groupId} with generation ${group.generationId} is now empty " +
            s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

          groupManager.storeGroup(group, Map.empty, error => {
            if (error != Errors.NONE) {
              // we failed to write the empty group metadata. If the broker fails before another rebalance,
              // the previous generation written to the log will become active again (and most likely timeout).
              // This should be safe since there are no active members in an empty generation, so we just warn.
              warn(s"Failed to write empty metadata for group ${group.groupId}: ${error.message}")
            }
          })
        } else {
          // JOIN阶段标志结束日志
          info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
            s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

          // trigger the awaiting join group response callback for all the members after rebalancing
          for (member <- group.allMemberMetadata) {
            assert(member.awaitingJoinCallback != null)
            val joinResult = JoinGroupResult(
              // 如果是leader 就返回member列表及其元数据信息
              members = if (group.isLeader(member.memberId)) {
                group.currentMemberMetadata
              } else {
                Map.empty
              },
              memberId = member.memberId,
              generationId = group.generationId,
              subProtocol = group.protocolOrNull,
              leaderId = group.leaderOrNull,
              error = Errors.NONE)

            member.awaitingJoinCallback(joinResult)
            member.awaitingJoinCallback = null
            completeAndScheduleNextHeartbeatExpiration(group, member)
          }
        }
      }
    }
  }

上面这一串代码有几个要点,首先,这个任务超时的时候是啥也不干的,为什么呢?这里要了解时间轮的机制,代码也在上面,当一个任务超时的时候,时间轮强制执行对应任务的onComplete函数,然后执行onExpiration函数,其实onExpiration函数对于这个延时任务来说是没有意义的,并不需要做什么,打日志都懒得打。
第二点就是这个任务onComplete什么时候会被调用呢?难道就只能等待5分钟超时才能被调用吗?那不是每一次rebalance都必须要等待5分钟?当然不可能啦,这里就需要先看下tryComplete函数的内容,发现这个内容会去检查还没连上的member,如果发现到期了,就强制完成。那么我们看下这tryComplete是在哪儿被调用的?这里需要插入一点之前没贴全的代码,在doJoinGroup函数中的而最后一段:

if (group.is(PreparingRebalance))
      joinPurgatory.checkAndComplete(GroupKey(group.groupId))

这段代码非常关键,当当前状态是PreparingRebalance的时候,会尝试去完成当前的延时任务,最终调用的代码:

  private[server] def maybeTryComplete(): Boolean = {
    var retry = false
    var done = false
    do {
      if (lock.tryLock()) {
        try {
          tryCompletePending.set(false)
          done = tryComplete()
        } finally {
          lock.unlock()
        }
        // While we were holding the lock, another thread may have invoked `maybeTryComplete` and set
        // `tryCompletePending`. In this case we should retry.
        retry = tryCompletePending.get()
      } else {
        // Another thread is holding the lock. If `tryCompletePending` is already set and this thread failed to
        // acquire the lock, then the thread that is holding the lock is guaranteed to see the flag and retry.
        // Otherwise, we should set the flag and retry on this thread since the thread holding the lock may have
        // released the lock and returned by the time the flag is set.
        retry = !tryCompletePending.getAndSet(true)
      }
    } while (!isCompleted && retry)
    done
  }

就是上面的tryComplete函数,最终会调用到DelayedJoin中的tryComplete函数,什么意思呢?已经很明显了,每来一个JOIN请求的时候,如果处于PreparingRebalance阶段,都会去检查一下group中原来的成员是否已经到齐了,到齐了就立刻结束JOIN阶段往后走。看到这儿,回头看下InitialDelayedJoin这个延时任务的tryComplete为什么就默认实现了个false呢?也明白了,就是初始化延时任务的时候不让你尝试完成,我就等3s,不需要你们来触发我提前完成。

以上,我们就看完了整个服务端的JOIN请求处理过程,其实主要核心就是这两个延时任务,如果不联系上下文,不了解时间轮机制,看起来确实费劲。接下来就看下SYNC阶段是如何处理的。


七、SYNC阶段源码分析

直接看下面的核心源码逻辑:

  private def doSyncGroup(group: GroupMetadata,
                          generationId: Int,
                          memberId: String,
                          groupAssignment: Map[String, Array[Byte]],
                          responseCallback: SyncCallback) {
    group.inLock {
      if (!group.has(memberId)) {
        responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)
      } else if (generationId != group.generationId) {
        responseCallback(Array.empty, Errors.ILLEGAL_GENERATION)
      } else {
        group.currentState match {
          case Empty | Dead =>
            responseCallback(Array.empty, Errors.UNKNOWN_MEMBER_ID)

          case PreparingRebalance =>
            responseCallback(Array.empty, Errors.REBALANCE_IN_PROGRESS)
          // 只有group处于compeletingRebalance状态下才会被处理
          // 其余状态都是错误的状态
          case CompletingRebalance =>
            // 给当前member设置回调,之后就啥也不干,也不返回
            // 等到leader的分区方案就绪后,才会被返回。
            group.get(memberId).awaitingSyncCallback = responseCallback

            // if this is the leader, then we can attempt to persist state and transition to stable
            // 只有收到leader的SYNC才会被处理,并进行状态机流转
            if (group.isLeader(memberId)) {
              info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")

              // fill any missing members with an empty assignment
              val missing = group.allMembers -- groupAssignment.keySet
              val assignment = groupAssignment ++ missing.map(_ -> Array.empty[Byte]).toMap

              groupManager.storeGroup(group, assignment, (error: Errors) => {
                group.inLock {
                  // another member may have joined the group while we were awaiting this callback,
                  // so we must ensure we are still in the CompletingRebalance state and the same generation
                  // when it gets invoked. if we have transitioned to another state, then do nothing
                  if (group.is(CompletingRebalance) && generationId == group.generationId) {
                    if (error != Errors.NONE) {
                      resetAndPropagateAssignmentError(group, error)
                      maybePrepareRebalance(group)
                    } else {
                      setAndPropagateAssignment(group, assignment)
                      // 状态机流转:CompletingRebalance -> Stable
                      group.transitionTo(Stable)
                    }
                  }
                }
              })
            }
          // 如果已经处于stable状态,说明leader已经把分区分配方案传上来了
          // 那么直接从group的元数据里面返回对应的方案就好了
          case Stable =>
            // if the group is stable, we just return the current assignment
            val memberMetadata = group.get(memberId)
            responseCallback(memberMetadata.assignment, Errors.NONE)
            // 开启心跳检测
            completeAndScheduleNextHeartbeatExpiration(group, group.get(memberId))
        }
      }
    }
  }

我们可能对上面的代码案处理会有一个疑问,为什么只有leader的SYNC请求才会被处理呢?要是其他consumer比leader早上来了难道就卡这儿不管了?不像JOIN阶段那样加入个时间轮设置个最大超时时间?这要是leader一直不发送SNYC请求,那不就所有成员都这儿干等着,无限等待了?
我们一个个来回答,首先,我们看上面的代码,每个请求过来第一件事是先设置回调,然后才去卡住等着,直到leader把分区分配方案通过SYNC请求带上来。
第二个问题,如果其他consumer比leader早到了就这么干等着吗?是的,没错,代码就是这么写的。
第三个问题,为什么不设置个最大超时时间啥的?我们可以看下客户端的代码,一旦开启rebalance之后,就只会进行相关请求的收发,意味着leader在收到JOIN阶段的返回后,中间不会有任何业务代码的影响,直接就是分配完分区然后发送SYNC请求;这就意味着leader的JOIN响应和SYNC请求之间理论上是不存在阻塞的,因此就可以不用设置超时,就不用加入时间轮了。
第四个问题,leader一直不发送SYNC请求就干等着?是的,代码就是这么写的。不过你想想,哪些情况能让leader一直不发送SYNC请求?我能想到的就是GC/leader宕机了,无论是哪种情况都会因为心跳线程出了问题被服务端检测到,因此在对应的心跳任务超时后重新开启下一轮的rebalance。哪怕是GC很长时间之后恢复了继续发SYNC请求过来,也会因为generation不匹配而得到错误返回开启下一轮rebalance。
最后再看下leader到了之后会具体做啥:

  private def setAndPropagateAssignment(group: GroupMetadata, assignment: Map[String, Array[Byte]]) {
    assert(group.is(CompletingRebalance))
    // 给每个member的分配方案赋值
    group.allMemberMetadata.foreach(member => member.assignment = assignment(member.memberId))
    // 在整个group中传播这个分配方案
    propagateAssignment(group, Errors.NONE)
  }
  
  private def propagateAssignment(group: GroupMetadata, error: Errors) {
    // 遍历
    // 如果是follower比leader先到SYNC请求
    // 那么就只会设置个callback,就啥都不干了,也不会返回
    // 直到leader带着分配方案来了以后,把状态更改为stable之后,才会遍历
    // 看看有哪些member已经发送了请求过来,设置了callback,然后一次性给他们返回回去对应的分区方案
    // 所以这个名称叫做【传播分配方案】
    // 真是绝妙
    for (member <- group.allMemberMetadata) {
      if (member.awaitingSyncCallback != null) {
        // 通过回调告诉member对应的分配方案
        member.awaitingSyncCallback(member.assignment, error)
        member.awaitingSyncCallback = null

        // reset the session timeout for members after propagating the member's assignment.
        // This is because if any member's session expired while we were still awaiting either
        // the leader sync group or the storage callback, its expiration will be ignored and no
        // future heartbeat expectations will not be scheduled.
        completeAndScheduleNextHeartbeatExpiration(group, member)
      }
    }
  }

看,最开始设置的回调,在收到leader请求时候,起了作用;会被挨个遍历后响应具体的分区分配方案,另外就是kafka里面的命名都很准确。

SYNC阶段简单说起来就是等待leader把分区分配方案传上来,如果member先到就设置个回调先等着,如果leader先到,就直接把分区分配方案存到group的元数据中,然后状态修改为Stable,后续其他member来的SYNC请求就直接从group的元数据取分区分配方案,然后自己消费去了。


八、线上如何排查rebalance问题?
看完理论,让我们来看下线上问题怎么排查rebalance问题。 rebalance有哪些问题呢?我们来整理一下:

  • 为什么会rebalance呢?是什么引起的?能定位到是哪个客户端嘛?

  • rebalance耗时了多久?为什么会劣化? 常见的就上面两个问题,我们按个来回答。

首先,为什么会rebalance,这个就三种情况,分区信息变化、客户端变化、coordinator变化
一般线上常见的就是客户端变化,那么客户端有哪些可能的变化呢?——新增成员,减少成员

新增成员怎么看呢?很简单嘛,找到coordinator,然后去kafka-request.log里面搜:cat kafka-request.log |grep -i find | grep -i ${group.id} 不过一般FIND_COORDINATOR请求的处理时间都小于10ms,所以只能打开debug日志才能看到。一般这种让客户自己看,对应的时间点是不是有启动kafka-consumer就行了,其实也不常见,这种情况。毕竟很少有人频繁开启关闭消费者,就算是有也是不好的业务使用方式。

减少成员呢?又分为两种:心跳超时,poll间隔超过配置 心跳超时的标识日志:

  def onExpireHeartbeat(group: GroupMetadata, member: MemberMetadata, heartbeatDeadline: Long) {
    group.inLock {
      if (!shouldKeepMemberAlive(member, heartbeatDeadline)) {
        // 标识日志
        info(s"Member ${member.memberId} in group ${group.groupId} has failed, removing it from the group")
        removeMemberAndUpdateGroup(group, member)
      }
    }
  }

很遗憾poll间隔超时,在1.1.0版本的info级别下并没有可查找的日志,检测poll时间间隔超时的是对应客户端的心跳线程,在检测到超过配置后就会主动leaveGroup从而触发rebalance,而这个请求在服务端依旧没有info级别的请求,因此,要判断是poll间隔超时引起的rebalance,就只能看下有没有上面心跳超时的日志,如果没有可能就是因为这个原因造成的。目前大多数的rebalance都是因为这个原因造成的,而且这个原因引发的rebalance同时还可能伴随着很长的rebalance耗时。
来看下服务端是如何做poll间隔超时的呢?

} else if (heartbeat.pollTimeoutExpired(now)) {
// the poll timeout has expired, which means that the foreground thread has stalled
// in between calls to poll(), so we explicitly leave the group.
    maybeLeaveGroup();
}

public boolean sessionTimeoutExpired(long now) {
    return now - Math.max(lastSessionReset, lastHeartbeatReceive) > sessionTimeout;
}

    public synchronized void maybeLeaveGroup() {
        if (!coordinatorUnknown() && state != MemberState.UNJOINED && generation != Generation.NO_GENERATION) {
            // this is a minimal effort attempt to leave the group. we do not
            // attempt any resending if the request fails or times out.
            log.debug("Sending LeaveGroup request to coordinator {}", coordinator);
            LeaveGroupRequest.Builder request =
                    new LeaveGroupRequest.Builder(groupId, generation.memberId);
            client.send(coordinator, request)
                    .compose(new LeaveGroupResponseHandler());
            client.pollNoWakeup();
        }

        resetGeneration();
    }

总结一下,怎么定位rebalance的问题,就是找标志日志,然后排除法,实在不行了就打开debug日志

接着看第二个问题,rebalance一次的时间耗费了多久?为什么会劣化到几分钟? 因为整个rebalance过程是线性的过程,就是状态按照请求顺序流转,因此呢找到对应的标志日志就好啦。 开启的标志日志:

// rebalance开始标志日志
info(s"Preparing to rebalance group ${group.groupId} with old generation ${group.generationId} " +
 s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

结束的两种标识日志:这两种结束日志都行,因为都差不多代表着rebalance过程完成,原因在上面已经讲的很清楚了。

 // JOIN阶段标志结束日志
 info(s"Stabilized group ${group.groupId} generation ${group.generationId} " +
 s"(${Topic.GROUP_METADATA_TOPIC_NAME}-${partitionFor(group.groupId)})")

// SYNC阶段结束日志
info(s"Assignment received from leader for group ${group.groupId} for generation ${group.generationId}")

那么如何统计整个rebalance过程的时间呢? 显而易见,结束时间 - 开始时间呀。

知道是怎么什么原因开启了rebalance之后,该怎么定位业务问题呢? 心跳超时:因为心跳线程是守护线程,一般都是因为客户端的机器负载太高导致心跳现场无法获取到CPU导致的。
poll间隔超过配置:显然嘛,就是poll出来数据之后,进行业务处理的时候太慢了,建议根据业务优化消费逻辑,改成多线程消费或者异步消费。


九、消费者如何感知到rebalance的呢?

这个很简单,我们想一下,与这个group有关的元数据全部都在coordinator那里,哪些请求会与coordinator交互呢?HEARTBEAT/OFFSET_COMMIT嘛,就这俩,那么其实正常的member都是靠这两个请求来感知到自己要去进行rebalance的,我们分别来看下。

首先是HEARTBEAT请求,每次都会带上当前消费组的generation值,也就是纪元值,要是服务端rebalance已经完成了,纪元值+1,那么此时就会发现自己没匹配上,然后紧接着就去设置自己的RejoinNeeded的标识,在下一轮poll 的时候就会去开启rebalance。
如果说是rebalance还没完成,那就更简单了,发现group的状态不是stable,直接就返回对应的错误,然后设置标识,加入到rebalance过程中。
服务端源码:

        case Some(group) =>
          group.inLock {
            group.currentState match {
              case Dead =>
                // if the group is marked as dead, it means some other thread has just removed the group
                // from the coordinator metadata; this is likely that the group has migrated to some other
                // coordinator OR the group is in a transient unstable phase. Let the member retry
                // joining without the specified member id,
                responseCallback(Errors.UNKNOWN_MEMBER_ID)

              case Empty =>
                responseCallback(Errors.UNKNOWN_MEMBER_ID)

              case CompletingRebalance =>
                if (!group.has(memberId))
                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
                else
                  responseCallback(Errors.REBALANCE_IN_PROGRESS)

              case PreparingRebalance =>
                if (!group.has(memberId)) {
                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
                } else if (generationId != group.generationId) {
                  responseCallback(Errors.ILLEGAL_GENERATION)
                } else {
                  val member = group.get(memberId)
                  completeAndScheduleNextHeartbeatExpiration(group, member)
                  responseCallback(Errors.REBALANCE_IN_PROGRESS)
                }

              case Stable =>
                if (!group.has(memberId)) {
                  responseCallback(Errors.UNKNOWN_MEMBER_ID)
                  // 纪元切换
                } else if (generationId != group.generationId) {
                  responseCallback(Errors.ILLEGAL_GENERATION)
                } else {
                  val member = group.get(memberId)
                  // 完成上次的延时,新建新的延时任务
                  completeAndScheduleNextHeartbeatExpiration(group, member)
                  // 回调响应
                  responseCallback(Errors.NONE)
                }

客户端源码:

    private class HeartbeatResponseHandler extends CoordinatorResponseHandler<HeartbeatResponse, Void> {
        @Override
        public void handle(HeartbeatResponse heartbeatResponse, RequestFuture<Void> future) {
            sensors.heartbeatLatency.record(response.requestLatencyMs());
            Errors error = heartbeatResponse.error();
            if (error == Errors.NONE) {
                log.debug("Received successful Heartbeat response");
                future.complete(null);
            } else if (error == Errors.COORDINATOR_NOT_AVAILABLE
                    || error == Errors.NOT_COORDINATOR) {
                log.debug("Attempt to heartbeat since coordinator {} is either not started or not valid.",
                        coordinator());
                markCoordinatorUnknown();
                future.raise(error);
            } else if (error == Errors.REBALANCE_IN_PROGRESS) {
                log.debug("Attempt to heartbeat failed since group is rebalancing");
                requestRejoin();
                future.raise(Errors.REBALANCE_IN_PROGRESS);
            } else if (error == Errors.ILLEGAL_GENERATION) {
                log.debug("Attempt to heartbeat failed since generation {} is not current", generation.generationId);
                resetGeneration();
                future.raise(Errors.ILLEGAL_GENERATION);
            } else if (error == Errors.UNKNOWN_MEMBER_ID) {
                log.debug("Attempt to heartbeat failed for since member id {} is not valid.", generation.memberId);
                resetGeneration();
                future.raise(Errors.UNKNOWN_MEMBER_ID);
            } else if (error == Errors.GROUP_AUTHORIZATION_FAILED) {
                future.raise(new GroupAuthorizationException(groupId));
            } else {
                future.raise(new KafkaException("Unexpected error in heartbeat response: " + error.message()));
            }
        }
    }
	
    protected synchronized void requestRejoin() {
        this.rejoinNeeded = true;
    }

所以我们客户端看到这种异常,就知道怎么回事了,就是我在rebalance的过程中,或者已经完成了,客户端的纪元不对。

    REBALANCE_IN_PROGRESS(27, "The group is rebalancing, so a rejoin is needed.",
        new ApiExceptionBuilder() {
            @Override
            public ApiException build(String message) {
                return new RebalanceInProgressException(message);
            }
        }),

    ILLEGAL_GENERATION(22, "Specified group generation id is not valid.",
        new ApiExceptionBuilder() {
            @Override
            public ApiException build(String message) {
                return new IllegalGenerationException(message);
            }
        }),

我们再看OFFSET_COMMIT请求,其实和HEARTBEAT请求是基本一致的。
服务端:

    group.inLock {
      if (group.is(Dead)) {
        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
      } else if ((generationId < 0 && group.is(Empty)) || (producerId != NO_PRODUCER_ID)) {
        // The group is only using Kafka to store offsets.
        // Also, for transactional offset commits we don't need to validate group membership and the generation.
        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback, producerId, producerEpoch)
      } else if (group.is(CompletingRebalance)) {
        responseCallback(offsetMetadata.mapValues(_ => Errors.REBALANCE_IN_PROGRESS))
      } else if (!group.has(memberId)) {
        responseCallback(offsetMetadata.mapValues(_ => Errors.UNKNOWN_MEMBER_ID))
      } else if (generationId != group.generationId) {
        responseCallback(offsetMetadata.mapValues(_ => Errors.ILLEGAL_GENERATION))
      } else {
        val member = group.get(memberId)
        completeAndScheduleNextHeartbeatExpiration(group, member)
        groupManager.storeOffsets(group, memberId, offsetMetadata, responseCallback)
      }
    }
  }

客户端:

else if (error == Errors.UNKNOWN_MEMBER_ID
                            || error == Errors.ILLEGAL_GENERATION
                            || error == Errors.REBALANCE_IN_PROGRESS) {
                        // need to re-join group
                        resetGeneration();
                        future.raise(new CommitFailedException());
                        return;

    /**
     * Reset the generation and memberId because we have fallen out of the group.
     */
    protected synchronized void resetGeneration() {
        this.generation = Generation.NO_GENERATION;
        this.rejoinNeeded = true;
        this.state = MemberState.UNJOINED;
    }

从源码我们可以看到,客户端在感知rebalance主要通过两个机制,一个是状态,一个是纪元;状态生效于rebalance过程中,纪元生效于rebalance的JOIN阶段结束后。
与coordinator交互的这两个请求都会带上自己的纪元信息,在服务端处理前都会校验一下状态已经纪元信息,一旦不对,就告诉客户端你需要rebalance了。


十、线上如何减小rebalance的影响?

首先明确下,rebalance会有什么影响?引用JVM的术语来说,就是STOP THE WORLD
一旦开启rebalance过程,在消费者进入JOIN阶段后就无法再继续消费,就是整个group的成员全部STW,所以对业务的影响还是很大的。

“KAFKA中rebalance是什么”的内容就介绍到这里了,感谢大家的阅读。如果想了解更多行业相关的知识可以关注天达云网站,小编将为大家输出更多高质量的实用文章!

返回云计算教程...