Zookeeper Leader Election
In earlier posts we covered the ZAB protocol and Zookeeper's basic concepts. Having come this far, we might as well briefly walk through the remaining topics: Leader election, distributed locks, and the herd effect and split-brain. That should be more than enough ammunition for a campus-recruiting interview. Today we start with Zookeeper's Leader election flow and the FastLeaderElection algorithm involved in it.
Before We Start
ZAB is the protocol that guarantees data consistency across a Zookeeper cluster, and it includes an election phase; FastLeaderElection is one of the algorithms Zookeeper uses to elect a Leader. Keep these two concepts straight, or they are easy to conflate.
Leader Election
Leader election happens at two key moments:
- when the Zookeeper cluster starts up
- when the Leader crashes and the cluster enters crash recovery
Some basics you should know up front:
1. Requirements on the elected Leader:
the elected leader must hold the highest zxid
the elected leader must be approved by more than half of the nodes (e.g., at least 3 of 5 servers)
2. Built-in election algorithm implementations:
LeaderElection
FastLeaderElection (the default)
AuthFastLeaderElection
3. Election states:
LOOKING: electing; searching for a Leader
FOLLOWING: follower; syncs with the Leader and participates in voting
OBSERVING: observer; syncs with the Leader but does not participate in voting
LEADING: leader
4. Terms:
server id --- myid (also written sid below; a required config item in cluster mode)
transaction id --- the largest zxid stored on the server
logical clock --- a counter of the voting rounds initiated
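As a concrete illustration of myid and cluster mode (the hostnames and paths below are made up), a three-node ensemble uses an identical zoo.cfg on every server plus a per-server myid file:

# zoo.cfg -- the same on all three servers; hosts/paths are illustrative
tickTime=2000
initLimit=10
syncLimit=5
dataDir=/var/lib/zookeeper
clientPort=2181
# server.<myid>=<host>:<quorum-port>:<election-port>
server.1=zk1.example.com:2888:3888
server.2=zk2.example.com:2888:3888
server.3=zk3.example.com:2888:3888

Each server writes its own id into dataDir/myid, e.g. echo 2 > /var/lib/zookeeper/myid on server 2. The election messages discussed in this post travel over the second port (3888).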
Election Flow
Zookeeper clusters are conventionally deployed with an odd number of machines; what actually prevents split-brain is the majority quorum (more on that later). Suppose we have three servers; here is how they elect a Leader.
- Each server casts a vote. In the initial state, Server1 and Server2 both vote for themselves as Leader. Each vote contains the proposed server's myid and ZXID, written as (myid, ZXID): Server1's vote is (1, 0), Server2's is (2, 0). Each server then sends its vote to all other machines in the cluster.
PS: not sure what "voting for yourself" means, or what a vote's data structure looks like? Hang on; we'll read the source shortly!!!
- Receive votes from the other servers. When a server receives a vote, it first checks its validity: is it from the current round, and does it come from a server in the LOOKING state?
- Process the votes. For every incoming vote, a server compares it against its own by the following rules:
Compare ZXIDs first: the server with the larger ZXID (transaction id) takes priority as Leader. If the ZXIDs are equal, compare the myids: the server with the larger myid (server id) becomes the Leader.
For Server1, whose vote is (1, 0), the vote received from Server2 is (2, 0). It first compares the ZXIDs; both are 0. It then compares the myids; Server2's is larger, so Server1 updates its vote to (2, 0) and votes again. Server2 need not change its vote; it simply re-sends its previous vote to all machines in the cluster.
PS: what does "updates its vote to (2, 0)" mean? It means the vote this server sends out next is updated to (2, 0); that ballot becomes its new working proposal.
- Tally the votes. After each exchange, every server counts the votes it has seen and checks whether more than half of the machines have accepted the same vote. Here both Server1 and Server2 observe that two machines in the cluster have accepted (2, 0), so a Leader is considered elected.
- Change server state. Once the Leader is determined, every server updates its own state: a Follower switches to FOLLOWING, the Leader switches to LEADING.
In Short
1. Every server instance starts by casting a vote proposing itself as leader.
2. When a server receives a vote, it compares the proposer's latest transaction id with its own: if the proposer's is larger, it votes for the proposer; if smaller, it keeps its own vote; if equal, it compares server ids and votes for the larger one.
3. The proposer then looks at the voting feedback (including its own vote): if one proposal holds more than half of the cluster's votes, that server becomes leader; if no majority is reached and no leader has emerged, another round of voting begins.
The Leader Election Algorithm
Now that the flow is clear, let's look at the implementation details in the Zookeeper source. Building on the algorithm description you can find everywhere online, I'll walk through the tricky parts once more. The rough flow:
First vote. Whichever event triggered the election, all machines in the cluster are trying to elect a Leader, i.e., they are in the LOOKING state. A LOOKING machine sends a message, called a vote, to all other machines. The vote contains an SID (the server's unique identifier) and a ZXID (transaction id), and is written as (SID, ZXID).
Suppose the ensemble consists of 5 machines with SIDs 1, 2, 3, 4, 5 and ZXIDs 9, 9, 9, 8, 8, and the machine with SID 2 is the current Leader. At some moment machines 1 and 2 fail, so the cluster begins a Leader election. In the first round every machine proposes itself, so machines 3, 4 and 5 vote (3, 9), (4, 8) and (5, 8) respectively.
At this point the five machines hold the following votes:
Server 1: (1, 9) assumed failed ×
Server 2: (2, 9) assumed failed ×
Server 3: (3, 9)
Server 4: (4, 8)
Server 5: (5, 8)
Changing votes. After sending out its vote, each machine receives the other machines' votes and applies a fixed rule to decide whether to change its own vote; this rule is the core of the entire Leader election algorithm. Terms:
vote_sid: the SID of the Leader proposed by the received vote.
vote_zxid: the ZXID of the Leader proposed by the received vote.
self_sid: the current server's own SID.
self_zxid: the current server's own ZXID.
Processing a received vote always comes down to comparing (vote_sid, vote_zxid) against (self_sid, self_zxid):
Rule 1: if vote_zxid is greater than self_zxid, adopt the received vote and send it out again. (The received transaction id is newer than our own.)
Rule 2: if vote_zxid is less than self_zxid, keep our own vote unchanged. (The received transaction id is older than our own.)
Rule 3: if vote_zxid equals self_zxid, compare the SIDs; if vote_sid is greater than self_sid, adopt the received vote and send it out again. (Equal transaction ids: the larger server id wins.)
Rule 4: if vote_zxid equals self_zxid and vote_sid is less than self_sid, keep our own vote unchanged.
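In the source, these four rules (plus an epoch comparison we will meet later) are condensed into FastLeaderElection's totalOrderPredicate. Simplified, its core logic is roughly the following sketch (the real method additionally consults the quorum verifier and rejects votes for zero-weight servers):

// Simplified sketch of FastLeaderElection#totalOrderPredicate:
// true when the received vote (new*) should replace our current proposal (cur*).
boolean totalOrderPredicate(long newId, long newZxid, long newEpoch,
                            long curId, long curZxid, long curEpoch) {
    // compare epoch first, then zxid, then server id
    return (newEpoch > curEpoch)
        || (newEpoch == curEpoch
            && (newZxid > curZxid
                || (newZxid == curZxid && newId > curId)));
}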
(The original post illustrates this flow with a diagram, omitted here.)
Determining the Leader. After the second round of voting, every machine again receives the other machines' votes and tallies them. If one machine's proposal appears in more than half of the votes, the SID in that proposal is the Leader; here Server3 becomes the Leader.
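To make the convergence concrete, here is a small self-contained Java toy (not Zookeeper code; class and method names are invented for illustration) that replays the example above with servers 3, 4 and 5 alive:

import java.util.HashMap;
import java.util.Map;

// Toy replay of the 5-machine example: servers 3, 4, 5 alive with zxids 9, 8, 8.
public class ElectionToy {

    record Vote(long sid, long zxid) {}

    // The comparison rule: larger zxid wins; on a tie, larger sid wins.
    static Vote better(Vote a, Vote b) {
        if (a.zxid() != b.zxid()) return a.zxid() > b.zxid() ? a : b;
        return a.sid() > b.sid() ? a : b;
    }

    public static void main(String[] args) {
        long[] sids = {3, 4, 5};
        long[] zxids = {9, 8, 8};
        int ensembleSize = 5; // quorum is therefore 3

        // Each live server initially proposes itself.
        Map<Long, Vote> proposal = new HashMap<>();
        for (int i = 0; i < sids.length; i++) {
            proposal.put(sids[i], new Vote(sids[i], zxids[i]));
        }

        // Exchange votes until no server changes its proposal (simulates rebroadcast rounds).
        boolean changed = true;
        while (changed) {
            changed = false;
            for (long me : proposal.keySet()) {
                for (long other : proposal.keySet()) {
                    Vote winner = better(proposal.get(me), proposal.get(other));
                    if (!winner.equals(proposal.get(me))) {
                        proposal.put(me, winner); // adopt the stronger ballot
                        changed = true;
                    }
                }
            }
        }

        // Tally: does one proposal reach a majority of the full ensemble?
        Map<Vote, Integer> tally = new HashMap<>();
        proposal.values().forEach(v -> tally.merge(v, 1, Integer::sum));
        tally.forEach((v, n) -> {
            if (n > ensembleSize / 2) {
                System.out.println("Leader elected: server " + v.sid() + " with " + n + " votes");
            }
        });
    }
}

Running it prints "Leader elected: server 3 with 3 votes", matching the walkthrough above.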
Election Flow in the Source
Talk is cheap. Now that the basic election flow is clear, let's dig into the source; the source hides no secrets!
Pull the source with git clone; the official repository is https://github.com/apache/zookeeper.
The Vote Data Structure
First let's clear up the earlier question: what exactly is the structure of a vote (a ballot)?
public class Vote {
...
private final int version; // version number
private final long id; // SID of the proposed Leader
private final long zxid; // transaction id (zxid) of the proposed Leader
private final long electionEpoch; // logical clock: identifies the election round, incremented each round
private final long peerEpoch; // epoch of the proposed Leader
private final ServerState state; // current state of this server
...
}
// server states
public enum ServerState {
LOOKING,
FOLLOWING,
LEADING,
OBSERVING
}
Now that we know what we are voting with, let's trace the overall flow of the algorithm.
Source Entry Point
Under zookeeper\zookeeper-server\src\main\java\org\apache\zookeeper\server\quorum:
Non-essential code is elided below; if you want to dig deeper, follow this analysis against the full source.
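For orientation, the classes in this package that the walkthrough touches:

QuorumPeerMain.java --- process entry point
QuorumPeer.java --- the peer's main thread and state machine (LOOKING/FOLLOWING/LEADING/OBSERVING)
FastLeaderElection.java --- the default election algorithm, including lookForLeader()
QuorumCnxManager.java --- TCP connection manager for election traffic
Vote.java --- the ballot structure shown above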
QuorumPeerMain.java
/**
* To start the replicated server specify the configuration file name on
* the command line.
* @param args path to the configfile
*/
public static void main(String[] args) {
QuorumPeerMain main = new QuorumPeerMain();
try {
main.initializeAndRun(args); // entry point
} catch (IllegalArgumentException e) {
...
}
LOG.info("Exiting normally");
ServiceUtils.requestSystemExit(ExitCode.EXECUTION_FINISHED.getValue());
}
protected void initializeAndRun(String[] args) throws ConfigException, IOException, AdminServerException {
QuorumPeerConfig config = new QuorumPeerConfig();
if (args.length == 1) {
config.parse(args[0]);
}
// Start and schedule the purge task
DatadirCleanupManager purgeMgr = new DatadirCleanupManager(
config.getDataDir(),
config.getDataLogDir(),
config.getSnapRetainCount(),
config.getPurgeInterval());
purgeMgr.start();
//standalone mode or cluster mode?
if (args.length == 1 && config.isDistributed()) {
//cluster mode
runFromConfig(config);
} else {
LOG.warn("Either no config or no quorum defined in config, running in standalone mode");
// there is only server in the quorum -- run as standalone
ZooKeeperServerMain.main(args);
}
}
public void runFromConfig(QuorumPeerConfig config) throws IOException, AdminServerException {
try {
ManagedUtil.registerLog4jMBeans();
} catch (JMException e) {
LOG.warn("Unable to register log4j JMX control", e);
}
LOG.info("Starting quorum peer");
MetricsProvider metricsProvider;
try {
metricsProvider = MetricsProviderBootstrap.startMetricsProvider(
config.getMetricsProviderClassName(),
config.getMetricsProviderConfiguration());
} catch (MetricsProviderLifeCycleException error) {
throw new IOException("Cannot boot MetricsProvider " + config.getMetricsProviderClassName(), error);
}
try {
ServerMetrics.metricsProviderInitialized(metricsProvider);
ServerCnxnFactory cnxnFactory = null;
ServerCnxnFactory secureCnxnFactory = null;
//the server that handles client reads/writes, on the client port (2181)
if (config.getClientPortAddress() != null) {
cnxnFactory = ServerCnxnFactory.createFactory();
cnxnFactory.configure(config.getClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), false);
}
if (config.getSecureClientPortAddress() != null) {
secureCnxnFactory = ServerCnxnFactory.createFactory();
secureCnxnFactory.configure(config.getSecureClientPortAddress(), config.getMaxClientCnxns(), config.getClientPortListenBacklog(), true);
}
...
//start the main thread
quorumPeer.start();
ZKAuditProvider.addZKStartStopAuditLog();
quorumPeer.join();
} catch (InterruptedException e) {
// warn, but generally this is ok
LOG.warn("Quorum Peer interrupted", e);
}
...
}
This calls QuorumPeer's start method:
@Override
public synchronized void start() {
if (!getView().containsKey(myid)) {
throw new RuntimeException("My id " + myid + " not in the peer list");
}
//loadDataBase restores data from local snapshot/log files and obtains the latest zxid
loadDataBase();
startServerCnxnFactory();
try {
adminServer.start();
} catch (AdminServerException e) {
LOG.warn("Problem starting AdminServer", e);
System.out.println(e);
}
//initialize the election
startLeaderElection();
startJvmPauseMonitor();
super.start();
}
...
public synchronized void startLeaderElection() {
try {
//if this node is in LOOKING state, vote for itself
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch());
}
} catch (IOException e) {
RuntimeException re = new RuntimeException(e.getMessage());
re.setStackTrace(e.getStackTrace());
throw re;
}
//choose the election algorithm per configuration (settable in zoo.cfg); the default is fast leader election
this.electionAlg = createElectionAlgorithm(electionType);
}
...
@SuppressWarnings("deprecation")
protected Election createElectionAlgorithm(int electionAlgorithm) {
Election le = null;
//TODO: use a factory rather than a switch
switch (electionAlgorithm) {
case 1:
throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
case 2:
throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
case 3:
//the network IO manager for leader election (handles the low-level send/receive message queues)
QuorumCnxManager qcm = createCnxnManager();
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm);
if (oldQcm != null) {
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
QuorumCnxManager.Listener listener = qcm.listener;
if (listener != null) {
//start the bound listener thread; it waits for other machines in the cluster to connect
listener.start();
//the TCP-based election algorithm: FastLeaderElection
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start();
le = fle;
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
// FastLeaderElection fle = new FastLeaderElection(this, qcm); above invokes the constructor below
public FastLeaderElection(QuorumPeer self, QuorumCnxManager manager) {
this.stop = false;
this.manager = manager;
starter(self, manager);
}
// self-explanatory: initialize the proposal fields, the send/receive queues and the Messenger
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
}
// the constructor builds the Messenger; calling FastLeaderElection's start method then
// starts instances of WorkerSender and WorkerReceiver -- the message sender and receiver threads
public void start() {
this.messenger.start();
}
/**
* Constructor of class Messenger.
*
* @param manager Connection manager
*/
Messenger(QuorumCnxManager manager) {
this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
this.wsThread.setDaemon(true);
this.wr = new WorkerReceiver(manager);
this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread.setDaemon(true);
}
/**
* Starts instances of WorkerSender and WorkerReceiver
*/
void start() {
this.wsThread.start();
this.wrThread.start();
}
...
// once the above has completed, QuorumPeer's run method is invoked
@Override
public void run() {
...
try {
/*
* Main loop
*/
while (running) {
//check the current node state
switch (getPeerState()) {
case LOOKING:
//if LOOKING, enter the election flow
LOG.info("LOOKING");
ServerMetrics.getMetrics().LOOKING_COUNT.add(1);
if (Boolean.getBoolean("readonlymode.enabled")) {
LOG.info("Attempting to start ReadOnlyZooKeeperServer");
// Create read-only server but don't start it immediately
final ReadOnlyZooKeeperServer roZk = new ReadOnlyZooKeeperServer(logFactory, this, this.zkDb);
// Instead of starting roZk immediately, wait some grace
// period before we decide we're partitioned.
//
// Thread is used here because otherwise it would require
// changes in each of election strategy classes which is
// unnecessary code coupling.
Thread roZkMgr = new Thread() {
public void run() {
try {
// lower-bound grace period to 2 secs
sleep(Math.max(2000, tickTime));
if (ServerState.LOOKING.equals(getPeerState())) {
roZk.startup();
}
} catch (InterruptedException e) {
LOG.info("Interrupted while attempting to start ReadOnlyZooKeeperServer, not started");
} catch (Exception e) {
LOG.error("FAILED to start ReadOnlyZooKeeperServer", e);
}
}
};
try {
roZkMgr.start();
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
//strategy pattern: the configured election algorithm decides how to look for a leader
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
} finally {
// If the thread is in the grace period, interrupt
// to come out of waiting.
roZkMgr.interrupt();
roZk.shutdown();
}
} else {
try {
reconfigFlagClear();
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
//same as above: the strategy pattern picks the election algorithm
setCurrentVote(makeLEStrategy().lookForLeader());
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
case OBSERVING:
...
break;
case FOLLOWING:
...
break;
case LEADING:
...
break;
}
}
} finally {
...
}
}
Running the Core Election Algorithm
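Before reading lookForLeader, here is the call chain we have traced so far (a compact recap of the code above):

QuorumPeerMain.main()
  -> initializeAndRun() -> runFromConfig()
    -> QuorumPeer.start()
      -> loadDataBase()            // recover data, obtain the latest zxid
      -> startLeaderElection()     // initial vote for self
        -> createElectionAlgorithm(3) -> FastLeaderElection.start()
      -> QuorumPeer.run()          // the state-machine loop
        -> (LOOKING) setCurrentVote(makeLEStrategy().lookForLeader())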
// entry point, from earlier: setCurrentVote(makeLEStrategy().lookForLeader());
public Vote lookForLeader() throws InterruptedException {
try {
self.jmxLeaderElectionBean = new LeaderElectionBean();
MBeanRegistry.getInstance().register(self.jmxLeaderElectionBean, self.jmxLocalPeerBean);
} catch (Exception e) {
LOG.warn("Failed to register with JMX", e);
self.jmxLeaderElectionBean = null;
}
self.start_fle = Time.currentElapsedTime();
try {
/*
* The votes from the current leader election are stored in recvset. In other words, a vote v is in recvset
* if v.electionEpoch == logicalclock. The current participant uses recvset to deduce on whether a majority
* of participants has voted for it.
*/
//votes received in the current election round
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
/*
* The votes from previous leader elections, as well as the votes from the current leader election are
* stored in outofelection. Note that notifications in a LOOKING state are not stored in outofelection.
* Only FOLLOWING or LEADING notifications are stored in outofelection. The current participant could use
* outofelection to learn which participant is the leader if it arrives late (i.e., higher logicalclock than
* the electionEpoch of the received notifications) in a leader election.
*/
//votes from outside the LOOKING exchange (FOLLOWING/LEADING notifications; see the javadoc above)
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = minNotificationInterval;
synchronized (this) {
//bump the logical clock: atomic increment
logicalclock.incrementAndGet();
//update our proposal with our own id, latest zxid and epoch
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
LOG.info(
"New election. My id = {}, proposed zxid=0x{}",
self.getId(),
Long.toHexString(proposedZxid));
//broadcast the vote, including to ourselves
sendNotifications();
SyncedLearnerTracker voteSet;
/*
* Loop in which we exchange notifications until we find a leader
*/
//loop until a leader is elected
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
/*
* Remove next notification from queue, times out after 2 times
* the termination time
*/
//take a notification from the receiving IO thread; our own vote is handled here too
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
/*
* Sends more notifications if haven't received enough.
* Otherwise processes new notification.
*/
//queue empty: if our messages were delivered, send another batch; keep going until a leader is elected
if (n == null) {
if (manager.haveDelivered()) {
sendNotifications();
} else {
//messages not delivered yet; other servers may not be up, so try to reconnect
manager.connectAll();
}
/*
* Exponential backoff
*/
//extend the timeout (exponential backoff)
int tmpTimeOut = notTimeout * 2;
notTimeout = Math.min(tmpTimeOut, maxNotificationInterval);
LOG.info("Notification time out: {}", notTimeout);
//received a vote: check that the sender and the proposed leader belong to this cluster
} else if (validVoter(n.sid) && validVoter(n.leader)) {
/*
* Only proceed if the vote comes from a replica in the current or next
* voting view for a replica in the current or next voting view.
*/
//branch on the state of the node that sent the message
switch (n.state) {
case LOOKING:
if (getInitLastLoggedZxid() == -1) {
LOG.debug("Ignoring notification as our zxid is -1");
break;
}
if (n.zxid == -1) {
LOG.debug("Ignoring notification from member with -1 zxid {}", n.sid);
break;
}
// If notification > current, replace and send messages out
//the sender's election epoch is greater than our logicalclock: a new election round has begun
if (n.electionEpoch > logicalclock.get()) {
//update our local logicalclock
logicalclock.set(n.electionEpoch);
//clear the received-vote set; those votes belong to the old round
recvset.clear();
//check whether the received vote wins: compare epoch, then zxid, then myid
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
//it wins: adopt the received vote as our own ballot
updateProposal(n.leader, n.zxid, n.peerEpoch);
} else {
//otherwise keep proposing ourselves
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
//broadcast again so other nodes see our current ballot
sendNotifications();
//the received message's epoch is older than ours: ignore it
} else if (n.electionEpoch < logicalclock.get()) {
LOG.debug(
"Notification election epoch is smaller than logicalclock. n.electionEpoch = 0x{}, logicalclock=0x{}",
Long.toHexString(n.electionEpoch),
Long.toHexString(logicalclock.get()));
break;
//same epoch: compare zxid then myid; if the received vote wins, update our ballot and broadcast
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
LOG.debug(
"Adding vote: from={}, proposed leader={}, proposed zxid=0x{}, proposed election epoch=0x{}",
n.sid,
n.leader,
Long.toHexString(n.zxid),
Long.toHexString(n.electionEpoch));
// don't care about the version if it's in LOOKING state
//add to the local vote set, used to decide whether the election can terminate
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
//has the election converged? the default rule: more than half of the servers agree
if (voteSet.hasAllQuorums()) {
// Verify if there is any change in the proposed leader
//wait for further notifications until timeout, in case a better proposal arrives
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) {
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
recvqueue.put(n);
break;
}
}
/*
* This predicate is true once we don't read any new
* relevant message from the reception queue
*/
//no better proposal arrived: the leader is decided
if (n == null) {
//update our state
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
//OBSERVING: observers take no part in the vote
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
//notifications from these two states must still be considered in the election
case FOLLOWING:
case LEADING:
/*
* Consider all notifications from the same epoch
* together.
*/
//same election epoch?
if (n.electionEpoch == logicalclock.get()) {
//if so, add it to the local vote set
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
//check whether the election is over; if so, verify that the claimed leader is valid
if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
//update our own state and return the final vote
setPeerState(n.leader, voteSet);
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
/*
* Before joining an established ensemble, verify that
* a majority are following the same leader.
*
* Note that the outofelection map also stores votes from the current leader election.
* See ZOOKEEPER-1732 for more information.
*/
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized (this) {
logicalclock.set(n.electionEpoch);
setPeerState(n.leader, voteSet);
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
default:
LOG.warn("Notification state unrecoginized: {} (n.state), {}(n.sid)", n.state, n.sid);
break;
}
} else {
if (!validVoter(n.leader)) {
LOG.warn("Ignoring notification for non-cluster member sid {} from sid {}", n.leader, n.sid);
}
if (!validVoter(n.sid)) {
LOG.warn("Ignoring notification for sid {} from non-quorum member sid {}", n.leader, n.sid);
}
}
}
return null;
} finally {
try {
if (self.jmxLeaderElectionBean != null) {
MBeanRegistry.getInstance().unregister(self.jmxLeaderElectionBean);
}
} catch (Exception e) {
LOG.warn("Failed to unregister with JMX", e);
}
self.jmxLeaderElectionBean = null;
LOG.debug("Number of connection processing threads: {}", manager.getConnectionThreadCount());
}
}
/**
* Send notifications to all peers upon a change in our vote
* (i.e., broadcast our current ballot)
*/
private void sendNotifications() {
//loop over all voters
for (long sid : self.getCurrentAndNextConfigVoters()) {
QuorumVerifier qv = self.getQuorumVerifier();
//the message payload
ToSend notmsg = new ToSend(
ToSend.mType.notification,
proposedLeader,
logicalclock.get(),
QuorumPeer.ServerState.LOOKING,
sid,
proposedEpoch,
qv.toString().getBytes());
LOG.debug(
"Sending Notification: {} (n.leader), 0x{} (n.zxid), 0x{} (n.round), {} (recipient),"
+ " {} (myid), 0x{} (n.peerEpoch) ",
proposedLeader,
Long.toHexString(proposedZxid),
Long.toHexString(logicalclock.get()),
sid,
self.getId(),
Long.toHexString(proposedEpoch));
//enqueue; the WorkerSender thread consumes this queue
sendqueue.offer(notmsg);
}
}
A diagram that circulates widely online describes the flow of data during the election quite clearly (the image is not reproduced here). At its center is the network IO manager, which maintains the sending and receiving worker threads; the election algorithm produces vote messages into the send queue and consumes them from the receive queue, and the core ballot comparison updates or discards proposals according to the rules above until a Leader is elected.
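Schematically, the two-queue pattern looks like the following minimal sketch (simplified and not the real classes: in the actual source, WorkerSender and WorkerReceiver live inside FastLeaderElection.Messenger and delegate the wire-level work to QuorumCnxManager):

import java.util.concurrent.LinkedBlockingQueue;

// Minimal sketch of the Messenger's two-queue pattern.
public class MessengerSketch {

    record ToSend(long sid, String payload) {}       // outgoing vote, addressed to sid
    record Notification(long sid, String payload) {} // incoming vote, sent by sid

    final LinkedBlockingQueue<ToSend> sendqueue = new LinkedBlockingQueue<>();
    final LinkedBlockingQueue<Notification> recvqueue = new LinkedBlockingQueue<>();

    void start() {
        // WorkerSender: drains sendqueue and pushes each message onto the wire.
        Thread sender = new Thread(() -> {
            try {
                while (true) {
                    ToSend m = sendqueue.take();
                    System.out.println("send to " + m.sid() + ": " + m.payload());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        }, "WorkerSender");
        sender.setDaemon(true);
        sender.start();
        // A symmetric WorkerReceiver thread reads from the network and offers
        // Notifications into recvqueue, which lookForLeader polls.
    }
}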
Summary
To understand the Leader election flow clearly, a few important concepts and terms must be clear:
- The concept of the transaction id (zxid)
- The order in which Zxid and Sid are compared, and the comparison rules
- What it means to update one's ballot and broadcast it
OK! That's all for Zookeeper's Leader election for now. Later posts will cover ZK-based distributed locks and the concepts of the herd effect and split-brain, and, time permitting, the several modes Zookeeper uses for data synchronization!
Author: 爱唠嗑的阿磊
Link: https://juejin.im/post/6883483460686594061
Source: Juejin (掘金)
The copyright belongs to the author. For commercial reprints, please contact the author for authorization; for non-commercial reprints, please credit the source.