zookeeper(7)源码分析-集群Leader选举FastLeaderElection
更新:HHH   时间:2023-1-7


一、Election接口

选举的父接口为Election,其定义了lookForLeader和shutdown两个方法,lookForLeader表示寻找Leader,shutdown则表示关闭,如关闭服务端之间的连接。

Election接口的实现类

1、AuthFastLeaderElection,同FastLeaderElection算法基本一致,只是在消息中加入了认证信息,在3.4.0版本后已经被弃用。
2、FastLeaderElection,是标准的fast paxos算法的实现,基于TCP协议进行选举。
3、LeaderElection,在3.4.0版本后已经被弃用。

FastLeaderElection分析

1、重要内部类

1、Notification类

Notification表示收到的选举投票信息(其他服务器发来的选举投票信息),其包含了投票中被选举者的服务器sid、zxid、选举周期epoch,选举者的服务器sid,状态,选周期epoch

static public class Notification {
        /*
         * Format version, introduced in 3.4.6
         */

        public final static int CURRENTVERSION = 0x2;
        int version;

        /*
         * Proposed leader 被选举者的服务器id
         */
        long leader;

        /*
         * zxid of the proposed leader 被选举者的事务zxid
         */
        long zxid;

        /*
         * Epoch 选举者的选举周期
         */
        long electionEpoch;

        /*
         * current state of sender 选举者的节点状态
         * 总共有4中
         * LOOKING 寻找leader状态
         * FOLLOWING 跟随者
         * LEADING leader状态
                 *OBSERVING 不参与操作和选举
         */
        QuorumPeer.ServerState state;

        /*
         * Address of sender 选举者的服务器id
         */
        long sid;

        QuorumVerifier qv;
        /*
         * epoch of the proposed leader 被选举者的选举周期
         */
        long peerEpoch;
    }

2、ToSend类

ToSend表示发送给其他服务器的选举投票信息,也包含了被选举者的sid、zxid、选举周期等信息。

static public class ToSend {
        static enum mType {crequest, challenge, notification, ack}

        ToSend(mType type,
                long leader,
                long zxid,
                long electionEpoch,
                ServerState state,
                long sid,
                long peerEpoch,
                byte[] configData) {

            this.leader = leader;
            this.zxid = zxid;
            this.electionEpoch = electionEpoch;
            this.state = state;
            this.sid = sid;
            this.peerEpoch = peerEpoch;
            this.configData = configData;
        }

        /*
         * Proposed leader in the case of notification 被推举的leader的sid
         */
        long leader;

        /*
         * id contains the tag for acks, and zxid for notifications 
         * 被推举的leader的最大事务id
         */
        long zxid;

        /*
         * Epoch 选举者的选举周期
         */
        long electionEpoch;

        /*
         * Current state; 选举者的节点状态
         */
        QuorumPeer.ServerState state;

        /*
         * Address of recipient选举者的服务器sid
         */
        long sid;

        /*
         * Used to send a QuorumVerifier (configuration info)
         */
        byte[] configData = dummyData;

        /*
         * Leader epoch 被选举者的选举周期
         */
        long peerEpoch;
    }

3、Messenger类

3.1、内部类

Messenger包含了WorkerReceiver和WorkerSender两个内部类
1、WorkerReceiver继承了ZooKeeperThread,是选票接收器。
2、其会不断地从QuorumCnxManager中的recvQueue获取其他服务器发来的选举消息,类型是Message

WorkerReceiver(QuorumCnxManager manager) {
                super("WorkerReceiver");
                this.stop = false;
                this.manager = manager;
            }

//从QuorumCnxManager中的recvQueue中获取投票消息
                        response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
                        if(response == null) continue;                      

并将其转换成一个选票消息Notification,然后保存到recvqueue中,在选票接收过程中,如果发现该外部选票的选举轮次小于当前服务器的,那么忽略该外部投票,同时立即发送自己的内部投票,把投票信息组装成ToSend加入到sendqueue队列。

ToSend notmsg = new ToSend(ToSend.mType.notification,
                                            v.getId(),
                                            v.getZxid(),
                                            logicalclock.get(),
                                            self.getPeerState(),
                                            response.sid,
                                            v.getPeerEpoch(),
                                            qv.toString().getBytes());
                                    sendqueue.offer(notmsg);

3、WorkerSender也继承了ZooKeeperThread,为选票发送器,其会不断地从sendqueue中获取待发送的选票

ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);

并将其传递到底层QuorumCnxManager中,其过程是将FastLeaderElection的ToSend转化为QuorumCnxManager的Message。

void process(ToSend m) {
                ByteBuffer requestBuffer = buildMsg(m.state.ordinal(),
                                                    m.leader,
                                                    m.zxid,
                                                    m.electionEpoch,
                                                    m.peerEpoch,
                                                    m.configData);

                manager.toSend(m.sid, requestBuffer);

            }
3.2、Messenger构造函数
 Messenger(QuorumCnxManager manager) {
                        //创建WorkerSender
            this.ws = new WorkerSender(manager);
                        // 新创建线程
            this.wsThread = new Thread(this.ws,
                    "WorkerSender[myid=" + self.getId() + "]");
                         // 设置为守护线程
            this.wsThread.setDaemon(true);
                         // 创建WorkerReceiver
            this.wr = new WorkerReceiver(manager);
                        // 新创建线程
            this.wrThread = new Thread(this.wr,
                    "WorkerReceiver[myid=" + self.getId() + "]");
                         // 设置为守护线程             
            this.wrThread.setDaemon(true);
        }

2、FastLeaderElection类属性

    // 完成Leader选举之后需要等待时长
    final static int finalizeWait = 200;
    // 两个连续通知检查之间的最大时长
    final static int maxNotificationInterval = 60000;       
        // 管理服务器之间的连接
    QuorumCnxManager manager;
        // 选票发送队列,用于保存待发送的选票
    LinkedBlockingQueue<ToSend> sendqueue;

    // 选票接收队列,用于保存接收到的外部投票
    LinkedBlockingQueue<Notification> recvqueue;
        //投票者
        QuorumPeer self;
    Messenger messenger;
        //逻辑始终,当前选举周期
    AtomicLong logicalclock = new AtomicLong(); /* Election instance */
        //被选举者服务器sid
    long proposedLeader;
        //被选举者服务器zxid
    long proposedZxid;
        //被选举者服务器选举周期
    long proposedEpoch;

FastLeaderElection核心方法

1、发送选票

其会遍历所有的参与者投票集合,然后将自己的选票信息发送至上述所有的投票者集合,其并非同步发送,而是将ToSend消息放置于sendqueue中,之后由WorkerSender进行发送

private void sendNotifications() {
        for (long sid : self.getCurrentAndNextConfigVoters()) {
            QuorumVerifier qv = self.getQuorumVerifier();
                         // 构造发送消息
            ToSend notmsg = new ToSend(ToSend.mType.notification,
                    proposedLeader,
                    proposedZxid,
                    logicalclock.get(),
                    QuorumPeer.ServerState.LOOKING,
                    sid,
                    proposedEpoch, qv.toString().getBytes());
            if(LOG.isDebugEnabled()){
                LOG.debug("Sending Notification: " + proposedLeader + " (n.leader), 0x"  +
                      Long.toHexString(proposedZxid) + " (n.zxid), 0x" + Long.toHexString(logicalclock.get())  +
                      " (n.round), " + sid + " (recipient), " + self.getId() +
                      " (myid), 0x" + Long.toHexString(proposedEpoch) + " (n.peerEpoch)");
            }
                        // 将发送消息放置于队列
            sendqueue.offer(notmsg);
        }
    }

2、totalOrderPredicate函数

该函数将接收的投票与自身投票进行PK,查看是否消息中包含的服务器id是否更优,其按照epoch、zxid、id的优先级进行PK。

protected boolean totalOrderPredicate(long newId, long newZxid, long newEpoch, long curId, long curZxid, long curEpoch) {
        LOG.debug("id: " + newId + ", proposed id: " + curId + ", zxid: 0x" +
                Long.toHexString(newZxid) + ", proposed zxid: 0x" + Long.toHexString(curZxid));
        if(self.getQuorumVerifier().getWeight(newId) == 0){
            return false;
        }

        /*
         * We return true if one of the following three cases hold:
         * 1- New epoch is higher
         * 2- New epoch is the same as current epoch, but new zxid is higher
         * 3- New epoch is the same as current epoch, new zxid is the same
         *  as current zxid, but server id is higher.
         */
        // 1. 判断消息里的epoch是不是比当前的大,如果大则消息中id对应的服务器就是leader
        // 2. 如果epoch相等则判断zxid,如果消息里的zxid大,则消息中id对应的服务器就是leader
        // 3. 如果前面两个都相等那就比较服务器id,如果大,则其就是leader
        return ((newEpoch > curEpoch) ||
                ((newEpoch == curEpoch) &&
                ((newZxid > curZxid) || ((newZxid == curZxid) && (newId > curId)))));
    }

3、termPredicate方法

该函数用于判断Leader选举是否结束,即是否有一半以上的服务器选出了相同的Leader,其过程是将收到的选票与当前选票进行对比,选票相同的放入同一个集合,之后判断选票相同的集合是否超过了半数。

protected boolean termPredicate(Map<Long, Vote> votes, Vote vote) {
        SyncedLearnerTracker voteSet = new SyncedLearnerTracker();
        voteSet.addQuorumVerifier(self.getQuorumVerifier());
        if (self.getLastSeenQuorumVerifier() != null
                && self.getLastSeenQuorumVerifier().getVersion() > self
                        .getQuorumVerifier().getVersion()) {
            voteSet.addQuorumVerifier(self.getLastSeenQuorumVerifier());
        }

        /*
         * First make the views consistent. Sometimes peers will have different
         * zxids for a server depending on timing.
         */
        for (Map.Entry<Long, Vote> entry : votes.entrySet()) {
            if (vote.equals(entry.getValue())) {
                voteSet.addAck(entry.getKey());
            }
        }

        return voteSet.hasAllQuorums();
    }

4、lookForLeader函数

1、该函数用于开始新一轮的Leader选举,其首先会将逻辑时钟自增,然后更新本服务器的选票信息(初始化选票),之后将选票信息放入sendqueue等待发送给其他服务器

2、每台服务器会不断地从recvqueue队列中获取外部选票,处理外部选票。

Notification n = recvqueue.poll(notTimeout,
                        TimeUnit.MILLISECONDS);

3、判断选举轮次,选票PK,更新选票

if(totalOrderPredicate(n.leader, n.zxid, n.peerEpoch,
                                    getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                updateProposal(getInitId(),
                                        getInitLastLoggedZxid(),
                                        getPeerEpoch());
                            }
                            sendNotifications();

4、归档选票,统计选票,返回最后的选票

/*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            if (n == null) {
                                 //设置leading状态,否则设置为flowing
                                self.setPeerState((proposedLeader == self.getId()) ?
                                        ServerState.LEADING: learningState());
                                 //最终选票              
                                Vote endVote = new Vote(proposedLeader,
                                        proposedZxid, logicalclock.get(), 
                                        proposedEpoch);
                                // 清空recvqueue队列的选票             
                                leaveInstance(endVote);
                                return endVote;
                            }

总结

FastLeaderElection的算法,其是ZooKeeper的核心部分,比较复杂,梳理了一下大概的流程,好多细节没有展开。

返回编程语言教程...