五、Zookeeper源码解析之集群模式下的选举过程

选举主流程

上一篇文章中我们简要的介绍了集群的启动过程,主要有两步涉及的群主选举

1
2
3
4
5
6
7
public synchronized void start() {
loadDataBase();
startServerCnxnFactory();
adminServer.start();
startLeaderElection(); // 开启主节点选举
super.start(); // 本线程启动
}

再深入代码发现FastLeaderElection里面也是多个Thread类,所以这两个步骤很多过程是并行执行的。我们先看第一步中,串行部分:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public synchronized void startLeaderElection() {
try {
if (getPeerState() == ServerState.LOOKING) {
currentVote = new Vote(myid, getLastLoggedZxid(), getCurrentEpoch()); // 第一启动,创建一个选票。
}
} catch (IOException e) {
....// 省略
}
// 创建选举算法,其实应该是开启选举(多个线程配合构成了一个后台选举流程)
this.electionAlg = createElectionAlgorithm(electionType);
}
protected Election createElectionAlgorithm(int electionAlgorithm) {
Election le = null;
//TODO: use a factory rather than a switch
// 从上面看,目前只有一种该选举算法,其他的都是待开发状态的。
switch (electionAlgorithm) {
case 1:
throw new UnsupportedOperationException("Election Algorithm 1 is not supported.");
case 2:
throw new UnsupportedOperationException("Election Algorithm 2 is not supported.");
case 3:
QuorumCnxManager qcm = createCnxnManager(); // 专门用于选举通信的管理器
QuorumCnxManager oldQcm = qcmRef.getAndSet(qcm); // 使用原子类,保证线程安全,并获取之前管理器
if (oldQcm != null) { // 如果有旧的,则清除之前的信息
LOG.warn("Clobbering already-set QuorumCnxManager (restarting leader election?)");
oldQcm.halt();
}
QuorumCnxManager.Listener listener = qcm.listener;
if (listener != null) {
listener.start(); // 监听线程
FastLeaderElection fle = new FastLeaderElection(this, qcm);
fle.start(); // 选举的主要组件,启动后,主线程也同步往下走
le = fle; // 设置选举算法为fle FastLeaderElection
} else {
LOG.error("Null listener when initializing cnx manager");
}
break;
default:
assert false;
}
return le;
}
  1. 创建群主选举的通信管理器,并开启选举通信,listener.start()

  2. 启动选举也就是FastLeaderElection.start()

  3. 也就是最开始的QuorumPeer.start();

选举流程简要.PNG

选举的通信管理我们单独作为了一个章节来讲,有兴趣的可以看源码解析三、选举中的通信.我们这里只讲FastLeaderElection启动过程、QuorumPeer.run方法

FastLeaderElection

主要属性如下:

1
2
3
4
5
6
7
8
9
10
QuorumCnxManager manager;// 通信管理器
private SyncedLearnerTracker leadingVoteSet; // 投票
LinkedBlockingQueue<ToSend> sendqueue; // 待发送的消息队列,也就是自己的选票
LinkedBlockingQueue<Notification> recvqueue; //收到的消息,也就是收到的投票信息
QuorumPeer self; // 自引用
Messenger messenger; // 消息管理器
AtomicLong logicalclock = new AtomicLong();
long proposedLeader; // 投票的sid
long proposedZxid; // 投票的zxid
long proposedEpoch; // 投票的epoch

starter 初始化,票据和自己的zxid都为-1 ,初始化发送和接收队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
private void starter(QuorumPeer self, QuorumCnxManager manager) {
this.self = self;
proposedLeader = -1;
proposedZxid = -1;
sendqueue = new LinkedBlockingQueue<ToSend>();
recvqueue = new LinkedBlockingQueue<Notification>();
this.messenger = new Messenger(manager);
}
/**
* 启动messger线程,其实就是启动发送和接收线程,我们稍后详解。
*/
public void start() {
this.messenger.start();
}

Messenger

主要工作的类还是message,这是一个多线程实现的消息管理器

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
protected class Messenger {
WorkerSender ws; // 发送消息
WorkerReceiver wr; // 接受消息
Messenger(QuorumCnxManager manager) {
this.ws = new WorkerSender(manager);
this.wsThread = new Thread(this.ws, "WorkerSender[myid=" + self.getId() + "]");
this.wsThread.setDaemon(true);
this.wr = new WorkerReceiver(manager);
this.wrThread = new Thread(this.wr, "WorkerReceiver[myid=" + self.getId() + "]");
this.wrThread.setDaemon(true);
}
void start() {
this.wsThread.start();
this.wrThread.start();
}
}

WorkerReceiver

WorkerReceiver主要功能都在run方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
public void run() {
Message response;
while (!stop) {
// Sleeps on receive
try {
// 从消息通信管理器中的recvQueue队列拿数据,也就是其他节点发过来的消息。可以看选举通信那一章节。
response = manager.pollRecvQueue(3000, TimeUnit.MILLISECONDS);
if (response == null) {
continue;
}
Notification n = new Notification();
int rstate = response.buffer.getInt();
long rleader = response.buffer.getLong(); // 省略部分内容
int version = 0x0;
QuorumVerifier rqv = null;
if (!validVoter(response.sid)) {
sendqueue.offer(notmsg); // 如果sid不合法返回一个消息
} else {
QuorumPeer.ServerState ackstate = QuorumPeer.ServerState.LOOKING;
switch (rstate) {/*设置state,如LOOKING*/}
// 组装n Notification 省略部分内容
n.leader = rleader;
// 如果还在LOOKING的状态,则将这条选举信息方队消息队列recvqueue
if (self.getPeerState() == QuorumPeer.ServerState.LOOKING) {
recvqueue.offer(n);
// 如果发现对端发来消息。epoch比当前节点小,则发送我们的投票过去。
// 尝试让它更新投票(此时与getCurrentVote是不同的,还没有这个值)
if ((ackstate == QuorumPeer.ServerState.LOOKING)&& (n.electionEpoch < logicalclock.get())) {
Vote v = getVote();
QuorumVerifier qv = self.getQuorumVerifier();
ToSend notmsg = new ToSend(getVote());
sendqueue.offer(notmsg);
}
} else {
// 如果本节点应不再是LOOKING状态,且对端状态是LOOKING,则发送我们的投票信息
Vote current = self.getCurrentVote();
if (ackstate == QuorumPeer.ServerState.LOOKING) {
// 如果自身是Leader角色,判断在设定的时间段内(即initLimit*tickTime)
//是否有半数以上的Follower与Leader建立连接,假如没有半数以上的Follower
//与Leader建立连接的话Leader便会退出领导,并重新开始集群Leader的选举
if (self.leader != null) {
if (leadingVoteSet != null) {
self.leader.setLeadingVoteSet(leadingVoteSet);
leadingVoteSet = null;
}
self.leader.reportLookingSid(response.sid);
}

QuorumVerifier qv = self.getQuorumVerifier();
// 将拿到的消息放到消息队列sendqueue
ToSend notmsg = new ToSend(current);
sendqueue.offer(notmsg);
}
}
}
} catch (InterruptedException e) {
LOG.warn("Interrupted Exception while waiting for new message", e);
}
}
LOG.info("WorkerReceiver is down");
}

过程总结如下:

  1. 从选举通信消息管理器中拿到消息
  2. 组装成投票信息
  3. 验证消息的sid是否合法,不合法则返回给对方一个消息,将其放入sendqueue
  4. 判断自身的状态,如果是LOOKING,则将这条消息放到接受队列recvqueue
  5. 如果对端也是LOOKING状态,且对方的Epoch比我们的旧,则发送我们的投票,尝试让其更新,将其放入sendqueue
  6. 如果本节点应不是LOOKING状态,且对端状态是LOOKING,则发送我们的投票信息,将其放入sendqueue

如果本身是LEADING状态,有可能会触发自身状态的改变,这个留到后面讲

WorkerSender

相对于WorkerReceiver,WorkerSender更简单,就是将需要发送的消息从sendqueue放到manager里的queueSendMap准备发送。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
// 拿到消息然后发送
public void run() {
while (!stop) {
try {
ToSend m = sendqueue.poll(3000, TimeUnit.MILLISECONDS);
if (m == null) {
continue;
}
process(m);
} catch (InterruptedException e) {
break;
}
}
LOG.info("WorkerSender is down");
}
void process(ToSend m) {
ByteBuffer requestBuffer = buildMsg(m.state.ordinal(), m.leader, m.zxid, m.electionEpoch, m.peerEpoch, m.configData);
// 调用的是通信管理器的方法。其实就是放到queueSendMap 对应sid的队列中
manager.toSend(m.sid, requestBuffer);
}

主线程中的群主选举

主要集中在run方法中,由于run方法是在节点运行的所有过程,因此我们这里只列举与群主选举有关的部分:

代码分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Override
public void run() {
updateThreadName();
LOG.debug("Starting quorum peer");
// 开启jmx
try {
// 主循环
while (running) {
switch (getPeerState()) {
case LOOKING: // 此时为群主选举状态
LOG.info("LOOKING");
ServerMetrics.getMetrics().LOOKING_COUNT.add(1);

if (Boolean.getBoolean("readonlymode.enabled")) {
// 只读模式下的server处理
} else {
try {
reconfigFlagClear();
// 初始值是false,后面leader选举失败会变为true
if (shuttingDownLE) {
shuttingDownLE = false;
startLeaderElection();
}
setCurrentVote(makeLEStrategy().lookForLeader()); // 核心选举算法
} catch (Exception e) {
LOG.warn("Unexpected exception", e);
setPeerState(ServerState.LOOKING);
}
}
break;
// 其他状态的处理
}
}
}

于是我们的目光转到setCurrentVote(makeLEStrategy().lookForLeader());
其中makeLEStrategy()是获取选举算法,我们在上一步createElectionAlgorithm已经设置了,也就是FastLeaderElection。
我们选举真正的核心是lookForLeader()方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
/**
* 开启新一轮的选举,只要我们节点从其他状态转换成为LOOKING状态,就会触发该方法,并通知其他节点
*/
public Vote lookForLeader() throws InterruptedException {

self.start_fle = Time.currentElapsedTime();
try {
// 存储当前选举的选票,也就是逻辑时钟为logicalclock(当前时钟)的选票,当前参与者使用recvset来推断是否有大多数参与者投票支持它。
Map<Long, Vote> recvset = new HashMap<Long, Vote>();
//前一轮选票,以及当前群主选票(但是只保存FOLLOWING和LEADING的投票),参与者可以通过它来判断谁是leader
Map<Long, Vote> outofelection = new HashMap<Long, Vote>();
int notTimeout = minNotificationInterval;
synchronized (this) {
logicalclock.incrementAndGet();
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch()); //1. 初始化自己的选票
}
sendNotifications(); // 2. 广播自己的选票
SyncedLearnerTracker voteSet;
// 主循环,直到找到leader
while ((self.getPeerState() == ServerState.LOOKING) && (!stop)) {
// 3.从recvqueue拿到下一个选票
Notification n = recvqueue.poll(notTimeout, TimeUnit.MILLISECONDS);
// 4. 如果没有消息,那么就再广播一次,这是因为有可能其他节点没有收到消息。
if (n == null) {
if (manager.haveDelivered()) { // 只要有为空的,说明发出去,但是没有成功,则再广播一遍。
sendNotifications();
} else {
manager.connectAll(); // 都不为空,说明连接可能创建失败或还没创建好
}
} // 5. 验证选票的正确性,一是验证当前候选人和上一轮候选人是否包括该sid,
//二是验证当前候选人和上一轮候选人是否包括该选票的推举人。
else if (validVoter(n.sid) && validVoter(n.leader)) {
switch (n.state) {
case LOOKING:
// 6. 如果选票的逻辑始终大于当前节点的逻辑时钟,则说明当前节点慢多个轮次。
//则重置当前节点的时钟,并清除之前的选票,然后广播更新后的选票
if (n.electionEpoch > logicalclock.get()) {
logicalclock.set(n.electionEpoch);
recvset.clear();
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, getInitId(), getInitLastLoggedZxid(), getPeerEpoch())) {
updateProposal(n.leader, n.zxid, n.peerEpoch); // 用初始化的各个值来pk选票。
} else {
updateProposal(getInitId(), getInitLastLoggedZxid(), getPeerEpoch());
}
sendNotifications(); // 广播自己的选票
} else if (n.electionEpoch < logicalclock.get()) {
break;
// 相等的时候也要pk
} else if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid, proposedEpoch)) {
updateProposal(n.leader, n.zxid, n.peerEpoch);
sendNotifications();
}
// 7. 将选票放入recvset,注意每个服务器都只有一份,所以重发发没有副作用。
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch));
// 8. 统计选票(针对当前选票),也就是当前别选举的节点的投票。
voteSet = getVoteTracker(recvset, new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch));
// 如果该投票仲裁有效(也就是当前投票数超过一半)
if (voteSet.hasAllQuorums()) {
while ((n = recvqueue.poll(finalizeWait, TimeUnit.MILLISECONDS)) != null) { // 看是否有新的选票进来
if (totalOrderPredicate(n.leader, n.zxid, n.peerEpoch, proposedLeader, proposedZxid,
// 并且选票比当前节点的票据更高,则更新
proposedEpoch)) {
recvqueue.put(n);
break;
}
}
// 如果没有更好的选票,则结束选票,选择当前所选举的Leader为Leader
if (n == null) {
setPeerState(proposedLeader, voteSet);
Vote endVote = new Vote(proposedLeader, proposedZxid, logicalclock.get(), proposedEpoch);
leaveInstance(endVote);
return endVote;
}
}
break;
case OBSERVING:
LOG.debug("Notification from observer: {}", n.sid);
break;
case FOLLOWING:
case LEADING:
// 如果轮次是一样的,则进行统计。
if (n.electionEpoch == logicalclock.get()) {
recvset.put(n.sid, new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(recvset, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
if (voteSet.hasAllQuorums() && checkLeader(recvset, n.leader, n.electionEpoch)) {
setPeerState(n.leader, voteSet);
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
}
// 如果不是同一轮次。或者还没有结束,将此选票放入outofelection
// 这里有点难以理解,不过我们可以看不同的地方。
// outofelection没有加入过LOOKING状态的选票,也就是所有LEADER和FOLLOW的选票,判断条件是这些节点总数的一半,而不包括looking状态的节点。
// 上面的判断针对的是electionEpoch等于目前的轮次情况,而这里解决的问题是不相等的情况下,还能更新logicalclock
// 因为如果其他节点都是follower和leader状态,那么我们永远无法更新logicalclock
outofelection.put(n.sid, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));
voteSet = getVoteTracker(outofelection, new Vote(n.version, n.leader, n.zxid, n.electionEpoch, n.peerEpoch, n.state));

if (voteSet.hasAllQuorums() && checkLeader(outofelection, n.leader, n.electionEpoch)) {
synchronized (this) {
logicalclock.set(n.electionEpoch);
setPeerState(n.leader, voteSet);
}
Vote endVote = new Vote(n.leader, n.zxid, n.electionEpoch, n.peerEpoch);
leaveInstance(endVote);
return endVote;
}
break;
}
} else {
}
}
return null;
} finally {
}
}

过程总结

看完代码,我们再来总结一下流程

  1. 初始化自己的选票
  2. 广播自己的选票
  3. 从recvqueue拿到下一个选票
  4. 如果消息为空,那么就再广播一次,这是因为有可能其他节点没有收到消息。
  5. 验证选票的正确性,一是验证当前候选人和上一轮候选人是否包括该sid,二是验证当前候选人和上一轮候选人是否包括该选票的推举人。
  6. 对选票做处理:
    • LOOKING状态的选票
      • 如果选票的逻辑始终大于当前节点的逻辑时钟,则说明当前节点慢多个轮次。则重置当前节点的时钟,并清除之前的选票,然后广播更新后的选票
      • 将选票放入recvset,注意每个服务器都只有一份,所以重复发没有副作用。
      • 统计选票(针对当前选票),也就是当前别选举的节点的投票。如果该投票仲裁有效(也就是当前投票数超过一半)看是否有新的选票进来,如果没有更好的选票,则结束选票,选择当前所选举的Leader
    • FELLOW或者LEADER的选票
      • 如果轮次是一样的,进行统计所有的选票,如果成功,则返回。
      • 再对不在LOOKING状态的选票进行一次统计,并设置新的轮次。这个针对新节点加入已经稳定后的集群。
    • 没有结束则继续获取选票。

[图片上传失败…(image-2d2cd3-1606390914706)]

一些参数介绍:

  • logicalclock(当前节点)=electionEpoch(当前的投票) // 选举的轮次,默认是0,每执行一次选举都会++

  • proposedEpoch(当前的节点)=peerEpoch(当前的投票) // 每次leader选举完成之后,都会选举出一个新的
    peerEpoch,作为该Leader轮次,用来标记事务请求所属的轮次。

  • currentEpoch 如果不是Leader的时候,一直是初始化的Epoch,如果是leader,成为leader之后的最新的轮次

投票pk

  1. 如果当前票据的Leader轮次大于当前节点的Leader轮次,则返回true;
  2. 如果当前票据和当前节点自己的票据Leader轮次相等,则比较事物id zxid,如果当前票据更大,则返回true;
  3. 如果当前票据和当前节点自己的票据Leader轮次和事物zxid都相等,则比较两者的sid 如果当前sid更大,返回true;
  4. 其他情况返回false,也就是不用更新选票。

流程总结

结合另一章节介绍的选举中的通信,我们可以梳理出,选举过程中所有组件的交互关系。一共三个组件QuorumCnxManager、FastLeaderElection、QuorumPeer。
都是作为一个或多个线程来并行处理,从而选出最后的主节点:

选举过程中各组件关系

脑裂问题

之所以没有单独放到一个章节来讲,主要是因为zookeeper如今的版本已基本解决了脑裂的问题。

什么是脑裂

很多集群模式工作的组件比如zookeeper、elasticSearch,他们通常有一个master节点,负责调度或者说负责管理、通信,总之一定有一个中心节点(可以看作是集群的大脑)。在正常工作的情况下,一般没有问题,但如果出现通信异常、程序异常
的情况,就是出现两个master节点,这就是脑裂现象。

如何产生

  1. 集群部分通信断开
    假设我们有六个节点,一个网段有三台,某一刻,两个网段的连接断开:

脑裂现象一

在如上的情况下,网络断开后,另一个网段就会有可能自主的产生一个新的master节点,这样就发生了脑裂
2. master节点断开后又连上
假设我们有四个节点,在某个瞬间,master节点断开通信,或者延迟。那么这个时候其他节点会重新选举出一个新的master节点,等到master节点恢复正常,它会认为它依然是master节点,这个时候集群出现了两个,也就是脑裂。

脑裂现象二

解决方案

在当前版本中,zookeeper已经解决以上两个问题,如果依照上文看懂了zookeeper的源码,那应该就能给出答案。

  • 使用仲裁机制Quorum,也就是master节点产生,必须获得半数以上的节点
    如上网段二是三台机器,那么就完全没有办法选举出master节点,注意这里是半数以上>,而不是半数或者以上>=。
  • 使用epch机制,每次选举都会累加
    在上述情况二中,master断开,其他三个节点重新选举,选票为3超过了一半有效,且epoch+1,等之前的master重新恢复之后,它的epoch已经是旧的了,所以其他三个节点都不会认同,而是会触发其(旧master)更新,这样就防止了脑裂