Zookeeper Leader Election

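In the previous articles we covered the ZAB protocol and Zookeeper's basic concepts. Since we've come this far, I figured I might as well briefly cover the remaining topics too: Leader election, distributed locks, and the herd effect and split brain. That should be more than enough for you to "build rockets" in campus-recruitment interviews.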

Today we start with Zookeeper's Leader election process and the FastLeaderElection algorithm it uses.

Before we start

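ZAB is the protocol that keeps data consistent across a Zookeeper ensemble, and it includes an election phase; FastLeaderElection is one of the algorithms Zookeeper uses to elect a Leader. Keep these two concepts apart, or they are easily confused.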

Leader Election

Leader election happens at two key moments:

  • When the Zookeeper cluster starts up
  • When the Leader crashes and the cluster enters crash recovery

Some basic concepts to know in advance:

1. Requirements for the elected Leader:

The elected leader must hold the highest zxid
The elected leader must be approved by a majority of nodes

2. Built-in election algorithms

LeaderElection
FastLeaderElection (the default; in the version whose source appears below, the only one still supported)
AuthFastLeaderElection

3. Server states

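LOOKING: election state, looking for a Leader
FOLLOWING: follower state; syncs with the leader and votes
OBSERVING: observer state; syncs with the leader but does not vote
LEADING: leader state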

4. Terminology

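Server id---myid (called sid later in this article; this setting is mandatory in cluster mode)
Transaction id---the largest zxid stored on the server
Logical clock---a counter of the voting rounds that have been started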
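For reference, the transaction id is a zxid: a 64-bit value whose high 32 bits hold the epoch of the Leader that issued the transaction and whose low 32 bits hold a counter within that epoch. ZooKeeper has its own helper for this (ZxidUtils); the class below is a hypothetical stand-alone sketch of the same bit layout, not that class, to show why a larger zxid means newer data.

```java
// Hypothetical sketch of the zxid layout: high 32 bits = epoch, low 32 bits = counter.
final class ZxidSketch {
    private ZxidSketch() {}

    // Combine an epoch and a per-epoch counter into one 64-bit zxid.
    static long makeZxid(long epoch, long counter) {
        return (epoch << 32L) | (counter & 0xffffffffL);
    }

    // Upper 32 bits: the epoch of the Leader that proposed the transaction.
    static long epochOf(long zxid) {
        return zxid >> 32L;
    }

    // Lower 32 bits: the transaction counter within that epoch.
    static long counterOf(long zxid) {
        return zxid & 0xffffffffL;
    }

    public static void main(String[] args) {
        long older = makeZxid(1, 500); // epoch 1, 500th transaction
        long newer = makeZxid(2, 1);   // first transaction of epoch 2
        System.out.println(Long.toHexString(older));                 // 1000001f4
        System.out.println(newer > older);                           // true
        System.out.println(epochOf(newer) + " " + counterOf(newer)); // 2 1
    }
}
```

Because the epoch sits in the high bits, any transaction from a newer Leader compares larger than every transaction from an older one.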

Election Process

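Zookeeper ensembles are recommended to have an odd number of machines (this relates to avoiding split brain, covered later), so let's assume three servers, of which Server1 and Server2 have started while Server3 has not yet. Here is how they elect a Leader.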

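  • Each server casts a vote. Since this is the initial state, Server1 and Server2 each vote for themselves as Leader. A vote contains the myid and ZXID of the server being proposed, written (myid, ZXID), so Server1's vote is (1, 0) and Server2's is (2, 0). Each server then sends its vote to the other machines in the cluster.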

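PS: Not sure what "voting for yourself" means (i.e., what a vote looks like as a data structure)? Don't worry, we'll walk through the source code later!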

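  • Receive votes from the other servers. When a server receives a vote, it first checks that the vote is valid, for example whether it belongs to the current round and whether it comes from a server in the LOOKING state.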

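  • Process the votes. For each vote received, the server compares it with its own vote using the following rules:

     Compare ZXIDs first. The server with the larger ZXID (transaction ID) is preferred as Leader.
     
     If the ZXIDs are equal, compare myids. The server with the larger myid (server ID) becomes Leader.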

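For Server1, its own vote is (1, 0) and it receives Server2's vote (2, 0). It first compares the ZXIDs, which are both 0, and then the myids; Server2's myid is larger, so Server1 updates its vote to (2, 0) and votes again. Server2 does not need to change its vote; it simply resends its previous vote to every machine in the cluster.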

PS: What does "updates its vote to (2, 0)" mean?

It means the server changes the vote it will send next to (2, 0) and uses that as its new vote.
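The two comparison rules can be written as a comparator. This is a minimal sketch: SimpleVote and better are hypothetical names, and the real code also compares epochs, which the simplified rules above leave out.

```java
import java.util.Comparator;

// Simplified vote: (myid, zxid). Hypothetical class for illustration only.
final class SimpleVote {
    final long myid;
    final long zxid;

    SimpleVote(long myid, long zxid) {
        this.myid = myid;
        this.zxid = zxid;
    }

    // Larger zxid wins; on a tie, larger myid wins.
    static final Comparator<SimpleVote> PRIORITY =
        Comparator.comparingLong((SimpleVote v) -> v.zxid)
                  .thenComparingLong(v -> v.myid);

    // Returns whichever of the two votes should be kept.
    static SimpleVote better(SimpleVote a, SimpleVote b) {
        return PRIORITY.compare(a, b) >= 0 ? a : b;
    }

    @Override
    public String toString() {
        return "(" + myid + ", " + zxid + ")";
    }

    public static void main(String[] args) {
        SimpleVote server1 = new SimpleVote(1, 0);
        SimpleVote server2 = new SimpleVote(2, 0);
        // Same zxid, so myid decides: Server1 switches to (2, 0).
        System.out.println(better(server1, server2)); // (2, 0)
    }
}
```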

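  • Count the votes. After every round, each server tallies the votes it holds and checks whether more than half of the machines have accepted the same vote. Server1 and Server2 each find that two machines have accepted (2, 0), and two out of three is a majority, so the Leader is considered elected.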

  • Change server state. Once the Leader is determined, every server updates its state: a Follower switches to FOLLOWING and the Leader switches to LEADING.
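To make the walkthrough concrete, here is a hypothetical single-threaded simulation (not ZooKeeper code): the running servers repeatedly adopt any better (zxid, myid) vote they see, and a candidate wins once it has a strict majority of the whole configured cluster. Networking, rounds and timeouts are deliberately left out.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical, single-threaded simulation of the startup example:
// three configured servers, only some of them running.
final class StartupElectionDemo {

    // Returns the elected myid, or -1 if no majority can be reached.
    static long elect(long[] myids, long[] zxids, int clusterSize) {
        int n = myids.length;
        long[] proposedId = myids.clone();   // everyone first votes for itself
        long[] proposedZxid = zxids.clone();

        while (true) {
            // Each server looks at every server's current vote
            // and adopts it if it wins on (zxid, myid).
            boolean changed = false;
            for (int self = 0; self < n; self++) {
                for (int other = 0; other < n; other++) {
                    boolean better = proposedZxid[other] > proposedZxid[self]
                        || (proposedZxid[other] == proposedZxid[self]
                            && proposedId[other] > proposedId[self]);
                    if (better) {
                        proposedId[self] = proposedId[other];
                        proposedZxid[self] = proposedZxid[other];
                        changed = true;
                    }
                }
            }
            // Tally: how many servers currently back each candidate?
            Map<Long, Integer> tally = new HashMap<>();
            for (int i = 0; i < n; i++) {
                tally.merge(proposedId[i], 1, Integer::sum);
            }
            for (Map.Entry<Long, Integer> e : tally.entrySet()) {
                if (e.getValue() > clusterSize / 2) {
                    return e.getKey(); // strict majority of the whole cluster
                }
            }
            if (!changed) {
                return -1; // votes are stable but no candidate has a majority
            }
        }
    }

    public static void main(String[] args) {
        // Server1 votes (1, 0), Server2 votes (2, 0); Server3 is not up yet.
        long leader = elect(new long[] {1, 2}, new long[] {0, 0}, 3);
        System.out.println("Leader: Server" + leader); // Leader: Server2
    }
}
```

With only Server1 running, the same call returns -1: one vote out of three is not a majority, so no Leader can be elected until a second server joins.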


In short

1. Every server instance starts by voting for itself as leader.

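2. When another instance receives such a vote, it checks whether the sender's transaction id is larger than its own latest transaction id: if larger, it votes for the sender; if smaller, it does not; if equal, it compares server IDs and votes for the sender if the sender's ID is larger.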

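3. Once the sender has collected the responses, it checks whether its vote count (including its own vote) exceeds half of the cluster. If so, it becomes leader; if not, and no leader has been chosen yet, it starts another round of voting.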
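Step 3 boils down to a strict-majority test over the whole cluster. The helper below is a hypothetical sketch; as far as I know, ZooKeeper's default majority verifier (QuorumMaj) applies the same "more than half" comparison.

```java
// Hypothetical helper mirroring step 3: a candidate wins only with a strict
// majority of the whole cluster, counting its own vote.
final class QuorumCheck {
    static boolean hasMajority(int votesIncludingSelf, int clusterSize) {
        return votesIncludingSelf > clusterSize / 2; // integer division
    }

    public static void main(String[] args) {
        System.out.println(hasMajority(2, 3)); // true
        System.out.println(hasMajority(2, 4)); // false
        System.out.println(hasMajority(3, 5)); // true
    }
}
```

With 4 servers a candidate needs 3 votes, so the cluster survives only one failure, the same as with 3 servers; this is one reason odd cluster sizes are recommended.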

Leader Election Algorithm

Now that we understand the election process, let's look at how the algorithm is implemented in the Zookeeper source.

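Starting from the algorithm description you can find all over the web, I'll walk through the tricky points once more. The overall flow is as follows: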

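First vote. Whichever of the two situations triggered the election, every machine in the cluster is trying to elect a Leader, i.e., it is in the LOOKING state. A LOOKING machine sends a message, called a vote, to every other machine. A vote contains the SID (the server's unique identifier) and the ZXID (transaction ID) and is written (SID, ZXID).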

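Suppose Zookeeper consists of five machines with SIDs 1, 2, 3, 4, 5 and ZXIDs 9, 9, 9, 8, 8, and that the machine with SID 2 is currently the Leader. At some point the machines with SIDs 1 and 2 fail, so the cluster starts a Leader election. In the first vote each machine votes for itself, so the machines with SIDs 3, 4 and 5 vote (3, 9), (4, 8) and (5, 8) respectively.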

At this point the five machines hold the following votes:
Server 1: (1,9) failed ×
Server 2: (2,9) failed ×
Server 3: (3,9)
Server 4: (4,8)
Server 5: (5,8)

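Change votes. After sending its vote, each machine also receives the votes of the others. It processes them according to a set of rules to decide whether to change its own vote; these rules are the core of the whole Leader election algorithm. The terms are: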

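vote_sid: the SID of the server proposed as Leader in the received vote.

vote_zxid: the ZXID of the server proposed as Leader in the received vote.

self_sid: the current server's own SID.

self_zxid: the current server's own ZXID.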

Processing each received vote is a comparison of (vote_sid, vote_zxid) against (self_sid, self_zxid).

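Rule 1: If vote_zxid is greater than self_zxid, accept the received vote and send it out again. (The received transaction id is larger than our own.)

Rule 2: If vote_zxid is less than self_zxid, keep our own vote unchanged. (The received transaction id is smaller than our own.)

Rule 3: If vote_zxid equals self_zxid, compare the SIDs; if vote_sid is greater than self_sid, accept the received vote and send it out again. (Equal transaction ids: compare server IDs.)

Rule 4: If vote_zxid equals self_zxid and vote_sid is less than self_sid, keep our own vote unchanged.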
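The four rules reduce to a single decision function. The sketch below is hypothetical and, like the rules, compares against the server's own (SID, ZXID); the real implementation compares against its current proposal and also checks the epoch.

```java
// Hypothetical sketch of the four vote-change rules; names follow the
// article's terms (vote_sid, vote_zxid, self_sid, self_zxid).
final class VoteRules {

    /** Returns true if the received vote should replace our own (rules 1 and 3). */
    static boolean adoptReceived(long voteSid, long voteZxid, long selfSid, long selfZxid) {
        if (voteZxid > selfZxid) {
            return true;           // rule 1: newer data wins
        }
        if (voteZxid < selfZxid) {
            return false;          // rule 2: keep our vote
        }
        return voteSid > selfSid;  // rule 3 (true) or rule 4 (false)
    }

    public static void main(String[] args) {
        // Server4 (self = (4, 8)) receives Server3's vote (3, 9): rule 1, switch.
        System.out.println(adoptReceived(3, 9, 4, 8)); // true
        // Server3 (self = (3, 9)) receives (5, 8): rule 2, keep.
        System.out.println(adoptReceived(5, 8, 3, 9)); // false
        // Server4 (self = (4, 8)) receives (5, 8): equal zxid, larger sid, rule 3.
        System.out.println(adoptReceived(5, 8, 4, 8)); // true
    }
}
```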

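The detailed flow is shown in the figure. Applied to our example: Server3 receives (4, 8) and (5, 8), and by rule 2 keeps (3, 9); Servers 4 and 5 each receive (3, 9), and by rule 1 change their vote to (3, 9) and send it out again.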

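Determine the Leader. After the second round of voting, every machine again receives the others' votes and starts counting. If a machine has received the same vote from more than half of the machines (here at least 3 of 5), the server whose SID is in that vote is the Leader. In this example, Server3 becomes the Leader.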
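The five-server example can be replayed in code. This hypothetical sketch (not ZooKeeper code) applies the rules to the round-1 votes of the surviving servers 3, 4 and 5, then tallies the round-2 votes against a cluster size of 5.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical replay of the five-server example: servers 1 and 2 are down,
// servers 3, 4, 5 have zxids 9, 8, 8.
final class FiveServerReplay {

    // Returns the elected SID, or -1 if no vote reaches a strict majority.
    static long replay(long[] sids, long[] zxids, int clusterSize) {
        int n = sids.length;
        long[] voteSid = sids.clone();   // round 1: everyone votes for itself
        long[] voteZxid = zxids.clone();

        // Each server compares every round-1 vote against its current choice.
        long[] nextSid = voteSid.clone();
        long[] nextZxid = voteZxid.clone();
        for (int self = 0; self < n; self++) {
            for (int other = 0; other < n; other++) {
                boolean adopt = voteZxid[other] > nextZxid[self]
                    || (voteZxid[other] == nextZxid[self] && voteSid[other] > nextSid[self]);
                if (adopt) {
                    nextSid[self] = voteSid[other];
                    nextZxid[self] = voteZxid[other];
                }
            }
        }

        // Round 2: every server rebroadcasts its (possibly changed) vote; tally.
        Map<Long, Integer> tally = new HashMap<>();
        for (long s : nextSid) {
            tally.merge(s, 1, Integer::sum);
        }
        for (Map.Entry<Long, Integer> e : tally.entrySet()) {
            if (e.getValue() > clusterSize / 2) {
                return e.getKey();
            }
        }
        return -1;
    }

    public static void main(String[] args) {
        long leader = replay(new long[] {3, 4, 5}, new long[] {9, 8, 8}, 5);
        System.out.println("Leader: Server" + leader); // Leader: Server3
    }
}
```

If only two of the five servers were alive, both would still settle on the same vote, but 2 is not more than half of 5, so replay returns -1.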

Election Source Code

Talk is cheap. Now that we understand the basic election flow, let's dig into the source code. The source hides no secrets!

Cloning the source from my mirror may be faster: Zookeeper source, git clone

Vote Data Structure

First, let's settle the earlier question: what exactly does a vote look like?

public class Vote {
    ...
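    private final int version;// version

    private final long id;// SID of the proposed Leader

    private final long zxid;// transaction ID of the proposed Leader

    private final long electionEpoch;// logical clock: tells whether votes belong to the same election round; incremented each round

    private final long peerEpoch;// epoch of the proposed Leader

    private final ServerState state;// state of the current server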
    ...
}
// 服务器状态
public enum ServerState {
        LOOKING,
        FOLLOWING,
        LEADING,
        OBSERVING
}

Now that we know what a vote is, let's trace the whole algorithm.

Source Entry Point

Under zookeeper\zookeeper-server\src\main\java\org\apache\zookeeper\server\quorum:

I've left out the non-essential code; if you want to dig deeper, follow my walkthrough and read the full source.

QuorumPeerMain.java

/**
 * To start the replicated server specify the configuration file name on
 * the command line.
 * @param args path to the configfile
 */
public static void main(String[] args) {
    QuorumPeerMain main = new QuorumPeerMain();
    try {
        main.initializeAndRun(args);// entry point
    } catch (IllegalArgumentException e) {
      ...
    }
    LOG.info("Exiting normally");
    ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
}

protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
    QuorumPeerConfig config = new QuorumPeerConfig();
    if (args.length == 1) {
        config.parse(args[0]);
    }

    // Start and schedule the the purge task
    DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
        config.getDataDir(),
        config.getDataLogDir(),
        config.getSnapRetainCount(),
        config.getPurgeInterval());
    purgeMgr.start();

    // standalone mode or cluster mode?
    if (args.length == 1 && config.isDistributed()) {
        // cluster mode
        runFromConfig(config);
    } else {
        LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
        // there is only server in the quorum -- run as standalone
        ZooKeeperServerMain.main(args);
    }
}

public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
    try {
        ManagedUtil.registerLog4jMBeans();
    } catch (JMException e) {
        LOG.warn("Unable to register log4j JMX control", e);
    }

    LOG.info("Starting quorum peer");
    MetricsProvider metricsProvider;
    try {
        metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
            config.getMetricsProviderClassName(),
            config.getMetricsProviderConfiguration());
    } catch (MetricsProviderLifeCycleException error) {
        throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
    }
    try {
        ServerMetrics.metricsProviderInitialized(metricsProvider);
        ServerCnxnFactory cnxnFactory = null;
        ServerCnxnFactory secureCnxnFactory = null;

        // server that serves client reads and writes, i.e. port 2181
        if (config.getClientPortAddress() != null) {
            cnxnFactory = ServerCnxnFactory.createFactory();
            cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
        }

        if (config.getSecureClientPortAddress() != null) {
            secureCnxnFactory = ServerCnxnFactory.createFactory();
            secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
        }
        ...
        // start the main thread
        quorumPeer.start();
        ZKAuditProvider.addZKStartStopAuditLog();
        quorumPeer.join();
    } catch (InterruptedException e) {
        // warn, but generally this is ok
        LOG.warn("Quorum Peer interrupted", e);
    }
    ...
}

This calls QuorumPeer's start method:

@Override
public synchronized void start() {
    if (!getView().containsKey(myid)) {
        throw new RuntimeException("My id " + myid + " not in the peer list");
    }
    // loadDataBase mainly restores data from local files and obtains the latest zxid
    loadDataBase();
    startServerCnxnFactory();
    try {
        adminServer.start();
    } catch (AdminServerException e) {
        LOG.warn("Problem starting AdminServer", e);
        System.out.println(e);
    }
    // initialize leader election
    startLeaderElection();
    startJvmPauseMonitor();
    super.start();
}
...
public synchronized void startLeaderElection() {
    try {
        // if this node is LOOKING, vote for itself
        if (getPeerState() == ServerState.LOOKING) {
            currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
        }
    } catch (IOException e) {
        RuntimeException re = new RuntimeException(e.getMessage());
        re.setStackTrace(e.getStackTrace());
        throw re;
    }

    // pick the election algorithm from the config; it can be set in zoo.cfg (electionAlg) and defaults to fast election
    this.electionAlg = createElectionAlgorithm(electionType);
}
...
@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
    Election le = null;

    //TODO: use a factory rather than a switch
    switch (electionAlgorithm) {
    case 1:
        throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
    case 2:
        throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
    case 3:
        // network I/O for leader election (sends and receives the queued messages at the low level)
        QuorumCnxManager qcm = createCnxnManager();
        QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
        if (oldQcm != null) {
            LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
            oldQcm.halt();
        }
        QuorumCnxManager.Listener listener = qcm.listener;
        if (listener != null) {
            // start the listener thread that waits for other machines in the cluster to connect
            listener.start();
            // TCP-based election algorithm: FastLeaderElection
            FastLeaderElection fle = new FastLeaderElection(this, qcm);
            fle.start();
            le = fle;
        } else {
            LOG.error("Null listener when initializing cnx manager");
        }
        break;
    default:
        assert false;
    }
    return le;
}

// new FastLeaderElection(this, qcm) above calls this constructor
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
   this.stop = false;
   this.manager = manager;
   starter(self, manager);
}
// self-explanatory
private void starter(QuorumPeer self, QuorumCnxManager manager) {
   this.self = self;
   proposedLeader = -1;
   proposedZxid = -1;

   sendqueue = new LinkedBlockingQueue<ToSend>();
   recvqueue = new LinkedBlockingQueue<Notification>();
   this.messenger = new Messenger(manager);
}

// FastLeaderElection.start() starts the Messenger built in starter()
// Starts instances of WorkerSender and WorkerReceiver (the sender and receiver threads)
public void start() {
    this.messenger.start();
}
/**
 * Constructor of class Messenger.
 *
 * @param manager   Connection manager
 */
Messenger(QuorumCnxManager manager) {
    this.ws = new WorkerSender(manager);
    this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
    this.wsThread.setDaemon(true);

    this.wr = new WorkerReceiver(manager);

    this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
    this.wrThread.setDaemon(true);
}
/**
 * Starts instances of WorkerSender and WorkerReceiver
 */
void start() {
    this.wsThread.start();
    this.wrThread.start();
}
...
// After all this, QuorumPeer's run method executes (super.start() above starts the QuorumPeer thread)
@Override
public void run() {
  ...
    try {
        /*
         * Main loop
         */
        while (running) {
            // check this node's state
            switch (getPeerState()) {
            case LOOKING:
                // LOOKING: enter the election
                LOG.info("LOOKING");
                ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

                if (Boolean.getBoolean("readonlymode.enabled")) {
                    LOG.info("Attempting to start ReadOnlyZooKeeperServer");

                    // Create read-only server but don't start it immediately
                    final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);

                    // Instead of starting roZk immediately, wait some grace
                    // period before we decide we're partitioned.
                    //
                    // Thread is used here because otherwise it would require
                    // changes in each of election strategy classes which is
                    // unnecessary code coupling.
                    Thread roZkMgr = new Thread() {
                        public void run() {
                            try {
                                // lower-bound grace period to 2 secs
                                sleep(Math.max(2000, tickTime));
                                if (ServerState.LOOKING.equals(getPeerState())) {
                                    roZk.startup();
                                }
                            } catch (InterruptedException e) {
                                LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
                            } catch (Exception e) {
                                LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
                            }
                        }
                    };
                    try {
                        roZkMgr.start();
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // the strategy pattern decides which election algorithm runs the election
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    } finally {
                        // If the thread is in the the grace period, interrupt
                        // to come out of waiting.
                        roZkMgr.interrupt();
                        roZk.shutdown();
                    }
                } else {
                    try {
                        reconfigFlagClear();
                        if (shuttingDownLE) {
                            shuttingDownLE = false;
                            startLeaderElection();
                        }
                        // the strategy pattern decides which election algorithm runs the election
                        setCurrentVote(makeLEStrategy().lookForLeader());
                    } catch (Exception e) {
                        LOG.warn("Unexpected exception", e);
                        setPeerState(ServerState.LOOKING);
                    }
                }
                break;
            case OBSERVING:
                ...
                break;
            case FOLLOWING:
                ...
                break;
            case LEADING:
                ...
                break;
            }
        }
    } finally {
       ...
    }
}
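The Messenger threads and the two queues built in starter() form a producer/consumer pipeline: sendNotifications() offers messages to sendqueue, WorkerSender drains it, and WorkerReceiver feeds recvqueue, which lookForLeader() polls. The hypothetical sketch below reproduces only that hand-off, with two local LinkedBlockingQueues and one daemon thread standing in for the network and both workers; it is not ZooKeeper's code.

```java
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of the queue hand-off used by the election code.
final class QueueHandoffDemo {
    static String roundTrip(String msg) throws InterruptedException {
        LinkedBlockingQueue<String> sendqueue = new LinkedBlockingQueue<>();
        LinkedBlockingQueue<String> recvqueue = new LinkedBlockingQueue<>();

        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    String m = sendqueue.take(); // like WorkerSender consuming sendqueue
                    recvqueue.put(m);            // stands in for the network + WorkerReceiver
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // exit quietly on shutdown
            }
        }, "sender");
        sender.setDaemon(true);
        sender.start();

        sendqueue.offer(msg);                              // like sendNotifications()
        String got = recvqueue.poll(1, TimeUnit.SECONDS);  // like the poll in lookForLeader()
        sender.interrupt();
        return got;
    }

    public static void main(String[] args) throws InterruptedException {
        System.out.println(roundTrip("vote(3, 9)")); // vote(3, 9)
    }
}
```

Decoupling through queues lets the election loop block on poll with a timeout instead of on sockets, which is what makes the resend and backoff logic in lookForLeader() possible.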

Running the Core Election Algorithm

// Called from above: setCurrentVote(makeLEStrategy().lookForLeader());
    public Vote lookForLeader() throws InterruptedException {
        try {
            self.jmxLeaderElectionBean = new LeaderElectionBean();
            MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
        } catch (Exception e) {
            LOG.warn("Failed to register with JMX", e);
            self.jmxLeaderElectionBean = null;
        }

        self.start_fle = Time.currentElapsedTime();
        try {
            /*
             * The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
             * if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority
             * of participants has voted for it.
             */
            // votes received in the current election round
            Map<Long, Vote> recvset = new HashMap<Long, Vote>();

            /*
             * The votes from previous leader elections, as well as the votes from the current leader election are
             * stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
             * Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
             * outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
             * the electionEpoch of the received notifications) in a leader election.
             */
            // votes from servers already FOLLOWING or LEADING (election results)
            Map<Long, Vote> outofelection = new HashMap<Long, Vote>();

            int notTimeout = minNotificationInterval;

            synchronized (this) {
                // increment the logical clock (atomic +1)
                logicalclock.incrementAndGet();
                // propose ourselves: our own sid, last logged zxid and epoch
                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
            }

            LOG.info(
                "New election. My id = {}, proposed zxid=0x{}",
                self.getId(),
                Long.toHexString(proposedZxid));
            // send our vote to every voter, including ourselves (broadcast)
            sendNotifications();

            SyncedLearnerTracker voteSet;

            /*
             * Loop in which we exchange notifications until we find a leader
             */

            // loop until a leader is elected
            while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
                /*
                 * Remove next notification from queue, times out after 2 times
                 * the termination time
                 */
                // take the next vote from the receiver thread; our own vote is processed here too
                Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);

                /*
                 * Sends more notifications if haven't received enough.
                 * Otherwise processes new notification.
                 */
                // nothing arrived before the timeout: resend (or reconnect) until a leader is elected
                if (n == null) {
                    if (manager.haveDelivered()) {
                        sendNotifications();
                    } else {
                        // messages not delivered yet, maybe other servers are not up; try to reconnect
                        manager.connectAll();
                    }

                    /*
                     * Exponential backoff
                     */
                    // increase the timeout (exponential backoff)
                    int tmpTimeOut = notTimeout * 2;
                    notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
                    LOG.info("Notification time out: {}", notTimeout);
                    // a vote arrived: check that the sender and the proposed leader belong to this cluster
                } else if (validVoter(n.sid) && validVoter(n.leader)) {
                    /*
                     * Only proceed if the vote comes from a replica in the current or next
                     * voting view for a replica in the current or next voting view.
                     */
                    // dispatch on the sender's state
                    switch (n.state) {
                    case LOOKING:
                        if (getInitLastLoggedZxid() == -1) {
                            LOG.debug("Ignoring notification as our zxid is -1");
                            break;
                        }
                        if (n.zxid == -1) {
                            LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
                            break;
                        }
                        // If notification > current, replace and send messages out
                        // if the sender's electionEpoch is larger than our logicalclock, a newer election round has started
                        if (n.electionEpoch > logicalclock.get()) {
                            // catch up our logicalclock
                            logicalclock.set(n.electionEpoch);
                            // discard the votes collected in the old round
                            recvset.clear();
                            // check whether the received vote wins, comparing epoch, zxid, then myid
                            if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
                                // it wins: adopt the sender's proposal
                                updateProposal(n.leader, n.zxid, n.peerEpoch);
                            } else {
                                // otherwise keep proposing ourselves
                                updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
                            }
                            // broadcast again so the other nodes see our current vote
                            sendNotifications();
                            // if the received electionEpoch is smaller than ours, ignore the message
                        } else if (n.electionEpoch < logicalclock.get()) {
                            LOG.debug(
                                "Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
                                Long.toHexString(n.electionEpoch),
                                Long.toHexString(logicalclock.get()));
                            break;
                            // same round: compare (epoch, zxid, myid) with our proposal; if the received vote wins, adopt it and broadcast
                        } else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                            updateProposal(n.leader, n.zxid, n.peerEpoch);
                            sendNotifications();
                        }

                        LOG.debug(
                            "Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
                            n.sid,
                            n.leader,
                            Long.toHexString(n.zxid),
                            Long.toHexString(n.electionEpoch));

                        // don't care about the version if it's in LOOKING state
                        // add to our vote set; used to decide whether the election is over
                        recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));

                        voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));

                        // is the election over? by default, more than half of the servers must agree
                        if (voteSet.hasAllQuorums()) {

                            // Verify if there is any change in the proposed leader
                            // keep reading notifications for finalizeWait ms; a better vote is put back and the election continues
                            while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
                                if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
                                    recvqueue.put(n);
                                    break;
                                }
                            }

                            /*
                             * This predicate is true once we don't read any new
                             * relevant message from the reception queue
                             */
                            // no better vote arrived: the leader is decided
                            if (n == null) {
                                // change our state
                                setPeerState(proposedLeader, voteSet);
                                Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }
                        break;
                        // OBSERVING servers do not vote
                    case OBSERVING:
                        LOG.debug("Notification from observer: {}", n.sid);
                        break;
                        // these servers already follow or lead; their votes may reveal an existing leader
                    case FOLLOWING:
                    case LEADING:
                        /*
                         * Consider all notifications from the same epoch
                         * together.
                         */
                        // same election round?
                        if (n.electionEpoch == logicalclock.get()) {
                            // if so, add it to our vote set
                            recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                            // if the election is over, confirm the leader is valid
                            if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
                                // change our state and return the result
                                setPeerState(n.leader, voteSet);
                                Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                                leaveInstance(endVote);
                                return endVote;
                            }
                        }

                        /*
                         * Before joining an established ensemble, verify that
                         * a majority are following the same leader.
                         *
                         * Note that the outofelection map also stores votes from the current leader election.
                         * See ZOOKEEPER-1732 for more information.
                         */
                        outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
                        voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

                        if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
                            synchronized (this) {
                                logicalclock.set(n.electionEpoch);
                                setPeerState(n.leader, voteSet);
                            }
                            Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
                            leaveInstance(endVote);
                            return endVote;
                        }
                        break;
                    default:
                        LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
                        break;
                    }
                } else {
                    if (!validVoter(n.leader)) {
                        LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
                    }
                    if (!validVoter(n.sid)) {
                        LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
                    }
                }
            }
            return null;
        } finally {
            try {
                if (self.jmxLeaderElectionBean != null) {
                    MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
                }
            } catch (Exception e) {
                LOG.warn("Failed to unregister with JMX", e);
            }
            self.jmxLeaderElectionBean = null;
            LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
        }
    }


    /**
     * Send notifications to all peers upon a change in our vote
     * (i.e. broadcast our current vote)
     */
    private void sendNotifications() {
        // send to every voter
        for (long sid : self.getCurrentAndNextConfigVoters()) {
            QuorumVerifier qv = self.getQuorumVerifier();
            // the notification message
            ToSend notmsg = new ToSend(
                ToSend.mType.notification,
                proposedLeader,
                proposedZxid,
                logicalclock.get(),
                QuorumPeer.ServerState.LOOKING,
                sid,
                proposedEpoch,
                qv.toString().getBytes());

            LOG.debug(
                "Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
                    + " {} (myid), 0x{} (n.peerEpoch) ",
                proposedLeader,
                Long.toHexString(proposedZxid),
                Long.toHexString(logicalclock.get()),
                sid,
                self.getId(),
                Long.toHexString(proposedEpoch));

            // add to the send queue, which the WorkerSender consumes
            sendqueue.offer(notmsg);
        }
    }
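The timeout handling in lookForLeader() is a capped exponential backoff: each empty poll doubles notTimeout, up to maxNotificationInterval. The sketch below isolates that arithmetic in a hypothetical class; the 200 ms start and 60,000 ms cap are assumed defaults, so check them against your version.

```java
// Hypothetical sketch of the poll-timeout backoff in lookForLeader():
// every empty poll doubles notTimeout, capped at the maximum.
final class NotificationBackoff {
    static int nextTimeout(int notTimeout, int maxNotificationInterval) {
        int tmpTimeOut = notTimeout * 2;
        return Math.min(tmpTimeOut, maxNotificationInterval);
    }

    public static void main(String[] args) {
        int notTimeout = 200;   // assumed minNotificationInterval (ms)
        final int max = 60000;  // assumed maxNotificationInterval (ms)
        StringBuilder sb = new StringBuilder();
        for (int i = 0; i < 10; i++) {
            sb.append(notTimeout).append(' ');
            notTimeout = nextTimeout(notTimeout, max);
        }
        // 200 400 800 1600 3200 6400 12800 25600 51200 60000
        System.out.println(sb.toString().trim());
    }
}
```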

The following figure, taken from the web, shows quite clearly how data flows during the election.

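It involves a network I/O manager (QuorumCnxManager) that maintains the sending and receiving threads, while the election algorithm consumes and produces vote messages through queues. Finally, the core vote comparison is performed: votes are updated or discarded according to fixed rules until a Leader is elected.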
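The core vote comparison in lookForLeader() is totalOrderPredicate, which compares the epoch, then the zxid, then the sid. Below is a simplified stand-alone version with a hypothetical class name; the original method also checks the candidate's quorum weight first, which is left out here.

```java
// Simplified, stand-alone version of the comparison FastLeaderElection
// performs in totalOrderPredicate (quorum-weight check omitted).
final class VotePk {
    static boolean newVoteWins(long newId, long newZxid, long newEpoch,
                               long curId, long curZxid, long curEpoch) {
        if (newEpoch != curEpoch) {
            return newEpoch > curEpoch; // 1. higher peerEpoch wins
        }
        if (newZxid != curZxid) {
            return newZxid > curZxid;   // 2. then higher zxid
        }
        return newId > curId;           // 3. then higher sid
    }

    public static void main(String[] args) {
        System.out.println(newVoteWins(1, 100, 2, 3, 900, 1)); // true: newer epoch
        System.out.println(newVoteWins(5, 8, 1, 3, 9, 1));     // false: lower zxid
        System.out.println(newVoteWins(5, 8, 1, 4, 8, 1));     // true: same zxid, larger sid
    }
}
```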

Summary

To understand the Leader election process clearly, you need to be clear on a few key concepts and terms:

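  • What the transaction ID (zxid) is
  • The order in which zxid and sid are compared, and the comparison rules
  • What it means to update your vote and broadcast it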

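OK! That's it for Zookeeper's Leader election for now. Next time I'll cover distributed locks implemented with ZK and the concepts of the herd effect and split brain, and if time allows, the several modes ZK uses for data synchronization.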


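Author: 爱唠嗑的阿磊
Link: https://juejin.im/post/6883483460686594061
Source: Juejin
Copyright belongs to the author. For commercial reprints, please contact the author for permission; for non-commercial reprints, please credit the source.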