如何对FastLeaderElection进行源码解析
更新:HHH   时间:2023-1-7


这篇文章将为大家详细讲解有关如何对FastLeaderElection进行源码解析,文章内容质量较高,因此小编分享给大家做个参考,希望大家阅读完这篇文章后对相关知识有一定的了解。

zookeeper作为常用的分布式协调器、注册中心,在初始化启动、leader宕机、或者与leader心跳超时等情况下,为保证集群可用性会进行重新选主。选主的默认算法是FastLeaderElection,本文会对FastLeaderElection进行解析。

选主入口

 在QuorumPeer.run()方法中,进行主循环while(running)。如果当前状态为LOOKING,则进入重新选主的方法lookForLeader。

//主循环
while (running){
    ...

    switch (getPeerState()) {
        case LOOKING:
            ...
            //调用makeLEStrategy获取选主策略,默认为FastLeaderElection
            //进入lookForLeader方法,开启重新选主。
            //把lookForLeader返回的结果调用setCurrentVote缓存起来。
            setCurrentVote(makeLEStrategy().lookForLeader());
            ...
            break;
        case OBSERVING:
            ...
            break;
        case FOLLOWING:
            ...
            break;
        case LEADING:
            break;
    }
}

 选主流程

进入FastLeaderElection#lookForLeader后,代码大致逻辑如下

1.leader选举周期版本号+1,发起第一次投票,默认选自己。

2.开启循环,接收其他zk节点发来的投票通知

    2.1接收不到其他节点的投票时,指数级延长接收等待时间并重新执行步骤2的循环

    2.2接收到其他节点投票后,判断投票通知来源节点的状态

        2.2.1投票来源节点为LOOKING状态,比较投票通知和本地投票信息缓存。比较投票周期版本号、决议zxid、zk节点的序号,判断哪个更有效。

            2.2.1.2比较后重新计算本机当前的投票信息,包括推选的leader id、推选leader的决议zxid、leader选举周期版本号,并发送给集群其他节点

            2.2.1.3判断本机最新的投票信息,在本机接收到的所有投票通知缓存中的数量,判断是否超过集群总数的一半

                2.2.1.3.1未超过集群总数的一半,重新执行步骤2的循环

                2.2.1.3.2超过集群总数的一半,尝试循环拉取投票队列中剩余的其他票。

                    2.2.1.3.2.1如果拉取到了其他投票,过滤掉无效的票,重新执行步骤2的循环。

                    2.2.1.3.2.2如果没有拉取到其他选票,设置当前节点的状态,LEADING、 FOLLOWING或者OBSERVING,清除缓存,返回最终leader的结果。

        2.2.2投票来源节点为OBSERVING,此状态的节点不能投票,丢弃此票,继续执行步骤2的循环。

        2.2.3投票来源节点为FOLLOWING,说明在集群内部已经选出主节点了,此时放弃自己的投票,查询outofelection中的其他节点的投票缓存,直接确定leader。设置当前节点状态为FOLLOWING或者OBSERVING,返回选票结果。

        2.2.4投票来源节点为LEADING,说明在集群内部已经选出主节点了,此时放弃自己的投票,查询outofelection中的其他节点的投票缓存,直接确定leader。设置当前节点状态为FOLLOWING或者OBSERVING,返回选票结果。

    /**
     * 开始新一轮leader选举。每当我们的服务节点状态变更为LOOKING时,调用此方法,然后向所有其他对等方发送通知
     */
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }

        self.start_fle = Time.currentElapsedTime();
        try {
            /**
             * 当前领导人选举的选票存储在recvset这个map中。换句话说,选票Vote在recvset中
             * 必须满足条件v.electionEpoch == logicalclock(判定这个选票,是否属于这次选举周期)。
             * 当前选举参与节点使用recvset来推断是否大多选举参与节点投了赞成票。
             */
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();


            /**
             * 历次leader选举的票数以及本次leader选举的票数均会存储在outofelection中。
             * 注意,处于LOOKING状态的通知不会存储在outofelection中。
             *
             * 如果当前节点参与选举时,集群其他节点已经选举出了leader,则根据这个map中的数据直接跟随集群leader
             */
            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = minNotificationInterval;

            synchronized (this) {
                //选举的逻辑周期+1,表示发起新一届选举
                logicalclock.incrementAndGet();
                //更新投票信息,第一个参数表示选谁(默认选自己),第二个参数表示当前本地日志中最新的决议zxid,第三个参数表示新的leader周期的epoch编号
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info(
                "New election. My id = {}, proposed zxid=0x{}",
                self.getId(),
                Long.toHexString(proposedZxid));
            //发送投票信息,广播给其他节点
            sendNotifications();

            SyncedLearnerTracker voteSet = null;

            /*
             * 在循环中,我们和其他zk节点交换通知,直到找到一个leader
             */

            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {

                //从接收队列中,拉取一个其他节点的投票通知,拉取超时时间为200ms
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);


                //200ms内没有拉取到新的投票通知,则发送更多投票通知。
                //拉取到了则否则处理新投票通知。
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        manager.connectAll();
                    }

                    /*
                     * 指数级延长等待时间
                     */
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                    self.getQuorumVerifier().revalidateVoteset(voteSet, notTimeout != minNotificationInterval);

                    if (self.getQuorumVerifier() instanceof QuorumOracleMaj && voteSet != null && voteSet.hasAllQuorums() && notTimeout != minNotificationInterval) {
                        setPeerState(proposedLeader, voteSet);
                        Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                        leaveInstance(endVote);
                        return endVote;
                    }

                    LOG.info("Notification time out: {} ms", notTimeout);

                } else if (validVoter(n.sid) && validVoter(n.leader)) {

                    //判断这个投票的来源节点的状态
                    switch (n.state) {
                        //处于选举中的的状态
                    case LOOKING:

                        //当前日志中最新的zxid为-1,属于异常情况,直接退出
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }

                        //投票通知中的zxid为-1,也属于异常情况,直接退出
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                            break;
                        }
                        //判断投票通知中的leader选举周期版本号,是否大于本地选举周期中的版本号
                        if (n.electionEpoch > logicalclock.get()) {
                            //本地leader选举周期版本号不是最新的,更新成更新的leader选举周期版本号
                            logicalclock.set(n.electionEpoch);
                            //清除空旧的选票map
                            recvset.clear();
                            //调用核心方法totalOrderPredicate
                            // 投票通知中的信息和本地初始化信息对比,判断投票是否有效
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                //把接收到的投票通知中的信息,更新到本地投票信息中
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                //还是和之前一样,使用本地的投票信息
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            //广播发送新的投票通知
                            sendNotifications();
                        } else if (n.electionEpoch < logicalclock.get()) {
                            //接收的投票通知中的leader选举周期版本号比本地的旧,则丢弃这个投票通知
                                LOG.debug(
                                    "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                    Long.toHexString(n.electionEpoch),
                                    Long.toHexString(logicalclock.get()));
                            break;
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            //还是调用totalOrderPredicate方法
                            // 用投票通知中的信息,和本地当前缓存中最新的投票信息进行比对,如果投票中的比较新,则进入这里
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        LOG.debug(
                            "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                            n.sid,
                            n.leader,
                            Long.toHexString(n.zxid),
                            Long.toHexString(n.electionEpoch));

                        //在选票map中存储选票
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        //proposedLeader、proposedZxid、proposedEpoch三个参数,表示当前zk服务根据已经接收到的投票,确定出的本机投选的leader信息
                        //调用getVoteTracker方法,对本机投选的leader的选票进行统计
                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
                        //本机投选的leader是否获取了半数以上的票
                        if (voteSet.hasAllQuorums()) {

                            // 拉取投票接收队列中的剩余选票
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                //判断选票是否更有效
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    //如果有更有效的选票,则放回队列
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /**
                             * 没有拉取到新的投票通知了,那么表示所有服务都不改票了
                             */
                            if (n == null) {
                                //根据投票结果,设置当前节点的状态,LEADING、 FOLLOWING或者OBSERVING
                                setPeerState(proposedLeader, voteSet);
                                //创建最终选举结果的选票对象
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                //清空接收队列
                                leaveInstance(endVote);
                                //返回最终选举结果
                                return endVote;
                            }
                        }
                        break;
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;

                       
                    case FOLLOWING:
                        //收到的投票通知中,投票来源的机器是FOLLOWING,说明在集群内部已经选出主节点了
                        //此时放弃自己的投票,查询outofelection中的投票,直接确定leader
                        Vote resultFN = receivedFollowingNotification(recvset, outofelection, voteSet, n);
                        if (resultFN == null) {
                            break;
                        } else {
                            return resultFN;
                        }
                    case LEADING:
                        //收到的投票通知中,投票来源的机器是LEADING,说明在集群内部已经选出主节点了
                        //此时放弃自己的投票,查询outofelection中的投票,直接确定leader
                        Vote resultLN = receivedLeadingNotification(recvset, outofelection, voteSet, n);
                        if (resultLN == null) {
                            break;
                        } else {
                            return resultLN;
                        }
                    default:
                        LOG.warn("Notification state unrecognized: {} (n.state), {}(n.sid)", n.state, n.sid);
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
        }
    }

核心投票有效性抉择逻辑

 选LEADER主流程中,多次调用FastLeaderElection#totalOrderPredicate进行了投票有效性的比较,该逻辑是选主有效性抉择的核心逻辑。优先比较leader选举周期版本号epoch,相等时比较决议zxid,都相等时直接使用leader机器id大的。

 /**
     * 检查接收到的投票通知,是否为有效投票
     * @param newId 新投票中的leader节点id
     * @param newZxid 新投票中的决议id
     * @param newEpoch 新投票中的leader选举周期版本号
     * @param curId 当前本机投票中的leader节点id
     * @param curZxid 当前本机投票中的决议id
     * @param curEpoch 当前本机投票中的leader选举周期版本号
     * @return 新投票更有效时返回true,否则false
     */
    protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug(
            "id: {}, proposed id: {}, zxid: 0x{}, proposed zxid: 0x{}",
            newId,
            curId,
            Long.toHexString(newZxid),
            Long.toHexString(curZxid));

        if (self.getQuorumVerifier().getWeight(newId) == 0) {
            return false;
        }

        /*
         *如果以下三种情况之一成立,则返回true:
         *1-投票中的leader选举周期版本号大于本地的
         *2-leader选举版本号相同,但投票中的决议号zxid更高
         *3-leader选举版本号和决议号zxid都相同,则比较zk机器本身的机器编号,判断投票中的机器编号是否大于当前机器编号
         */
        return ((newEpoch > curEpoch)
                || ((newEpoch == curEpoch)
                    && ((newZxid > curZxid)
                        || ((newZxid == curZxid)
                            && (newId > curId)))));
    }

关于如何对FastLeaderElection进行源码解析就分享到这里了,希望以上内容可以对大家有一定的帮助,可以学到更多知识。如果觉得文章不错,可以把它分享出去让更多的人看到。

返回编程语言教程...