六、Zookeeper源码解析之选举中的通信

QuorumCnxManager介绍

  • 这个类是专门用于leader选举的连接管理器,对应专门有个端口来执行这个通信。它能保证,每个连接两两之前保持有且仅有一个连接。
  • 如果两个服务端同时开启一个连接,这个管理器会采用一个简单的tie-breaking机制来决定删除其中一个。
  • 对于每个连接,管理器维护一个消息队列,每次要发送的消息都会被放到队列尾部,当一个连接挂掉后,sender线程会将该消息放回队列。

接下来我们来了解这个类

主要属性:

1
2
3
4
5
6
7
8
9
10
11
12
final QuorumPeer self;
// 线程池
private ThreadPoolExecutor connectionExecutor;
// 发送线程,对端的sid作为key,value为线程SendWorker
final ConcurrentHashMap<Long, SendWorker> senderWorkerMap;
// 发送队列,对端的sid作为key,value是一个消息队列先进先出,有界队列
final ConcurrentHashMap<Long, BlockingQueue<ByteBuffer>> queueSendMap;
// 最后发送的消息
final ConcurrentHashMap<Long, ByteBuffer> lastMessageSent;
// 接收消息队列,因为都是同一个线程处理,无需针对不同客户端设置。
public final BlockingQueue<Message> recvQueue;
public final Listener listener;

专门指出一点,zookeeper用的阻塞队列大都是用的自定义的CircularBlockingQueue类,这个类与我们在并发时讲过的其他阻塞队列有两点不同:

  1. 只实现了部分方法,很多多余的方法都throw new UnsupportedOperationException();
  2. 在放入元素时,只能使用offer方法,且该offer保证一定返回true,因为在队列满的时候它会删除对一个元素,也就是最开始进来的,有点像线程的拒绝策略

接着我们看看上面几个Thread实现类

Listener

从上面选举流程中我们可以看到,在真正开始选举前我们启动了listener,因此我们可以合理的猜测这就是一个监听其他服务端请求过来的线程。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
private final int portBindMaxRetry; //重试次数
private List<ListenerHandler> listenerHandlers; // 监听处理列表
// 线程执行流程
public void run() {
if (!shutdown) {
LOG.debug("Listener thread started, myId: {}", self.getId());
Set<InetSocketAddress> addresses;

if (self.getQuorumListenOnAllIPs()) { // 通配符端口,一般我们只配置一个端口
addresses = self.getElectionAddress().getWildcardAddresses();
} else {
addresses = self.getElectionAddress().getAllAddresses();
}

CountDownLatch latch = new CountDownLatch(addresses.size());
listenerHandlers = addresses.stream().map(address ->
new ListenerHandler(address, self.shouldUsePortUnification(), self.isSslQuorum(), latch))
.collect(Collectors.toList());// 根据端口配置不同的处理器,

ExecutorService executor = Executors.newFixedThreadPool(addresses.size());
listenerHandlers.forEach(executor::submit);// 提交到线程池中
// 等待完成,也就时等待停止信号,需要每个处理器都不在工作了
try {
latch.await();
} catch (InterruptedException ie) {
LOG.error("Interrupted while sleeping. Ignoring exception", ie);
} finally {
// Clean up for shutdown.
for (ListenerHandler handler : listenerHandlers) {
try {
handler.close(); // 关闭所有的监听
} catch (IOException ie) {
// Don't log an error for shutdown.
LOG.debug("Error closing server socket", ie);
}
}
}
}
}

从代码中我们可以看到真正工作的时里面建立的listenerHandlers,因此我们再来看这个子类

ListenerHandler

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
private ServerSocket serverSocket; // 注意,这里时一个服务端
private InetSocketAddress address;
@Override
public void run() {
try {
acceptConnections(); // 核心代码
} catch (Exception e) {
} finally {
latch.countDown();
}
}

private void acceptConnections() {
int numRetries = 0;
Socket client = null; // 客户端

while ((!shutdown) && (portBindMaxRetry == 0 || numRetries < portBindMaxRetry)) {
try {
serverSocket = createNewServerSocket(); // 真正的服务端,使用tcp协议
LOG.info("{} is accepting connections now, my election bind port: {}", QuorumCnxManager.this.mySid, address.toString());
while (!shutdown) {
try {
client = serverSocket.accept(); // 接受数据
setSockOpts(client); // 设置socket选项
LOG.info("Received connection request from {}", client.getRemoteSocketAddress());
// 如果配置了sasl 鉴权方式,那么会异步处理请求,因为sasl鉴权服务器需要一定时间完成
if (quorumSaslAuthEnabled) {
receiveConnectionAsync(client);// 异步处理
} else {
receiveConnection(client); // 同步处理
}
numRetries = 0;
} catch (SocketTimeoutException e) {
LOG.warn("The socket is listening for the election accepted "
+ "and it timed out unexpectedly, but will retry."
+ "see ZOOKEEPER-2836");
}
}
} catch (IOException e) {
// ... 省略其他错误处理
closeSocket(client);
}
}
}
public void receiveConnection(final Socket sock) {
DataInputStream din = null;
try {
din = new DataInputStream(new BufferedInputStream(sock.getInputStream()));
LOG.debug("Sync handling of connection request received from: {}", sock.getRemoteSocketAddress());
handleConnection(sock, din);
} catch (IOException e) {
LOG.error("Exception handling connection, addr: {}, closing server connection", sock.getRemoteSocketAddress());
LOG.debug("Exception details: ", e);
closeSocket(sock);
}
}

由于同步和异步的过程类似,我们这里只分析其中过一个,receiveConnection(client)主要过程调用了handleConnection(sock, din);

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
private void handleConnection(Socket sock, DataInputStream din) throws IOException {
Long sid = null, protocolVersion = null;
MultipleAddresses electionAddr = null;
try {
protocolVersion = din.readLong();
if (protocolVersion >= 0) { // this is a server id and not a protocol version
sid = protocolVersion;
} else {
try {
InitialMessage init = InitialMessage.parse(protocolVersion, din); // 如果不是sid,那么就任务这是一个协议版本,并尝试解析
sid = init.sid; // 还是要拿sid
if (!init.electionAddr.isEmpty()) {
electionAddr = new MultipleAddresses(init.electionAddr,
Duration.ofMillis(self.getMultiAddressReachabilityCheckTimeoutMs()));
}
LOG.debug("Initial message parsed by {}: {}", self.getId(), init.toString());
} catch (InitialMessage.InitialMessageException ex) {
LOG.error("Initial message parsing error!", ex);
closeSocket(sock);
return;
}
}

if (sid == QuorumPeer.OBSERVER_ID) {
/*
* Choose identifier at random. We need a value to identify
* the connection.
*/
sid = observerCounter.getAndDecrement();
LOG.info("Setting arbitrary identifier to observer: {}", sid);
}
} catch (IOException e) {
LOG.warn("Exception reading or writing challenge", e);
closeSocket(sock);
return;
}

// do authenticating learner
authServer.authenticate(sock, din);
//这就是上面上面说过的tie-breaking机制,如果sid比自己小,则关闭该连接,也就是保证只有比自己sid大的才能发送信息过来。
if (sid < self.getId()) {

SendWorker sw = senderWorkerMap.get(sid);
if (sw != null) {
sw.finish();
}
LOG.debug("Create new connection to server: {}", sid);
closeSocket(sock);
if (electionAddr != null) {
connectOne(sid, electionAddr); // 这里开启一个新连接,也就是保证我去主动连比该sid小的
} else {
connectOne(sid);
}

} else if (sid == self.getId()) {
// we saw this case in ZOOKEEPER-2164
LOG.warn("We got a connection request from a server with our own ID. "
+ "This should be either a configuration error, or a bug.");
} else { // 如果sid比本身的sid大,则开启一个线程去接收数据。
SendWorker sw = new SendWorker(sock, sid); // 接收线程
RecvWorker rw = new RecvWorker(sock, din, sid, sw); // 发送线程
sw.setRecv(rw);

SendWorker vsw = senderWorkerMap.get(sid);
if (vsw != null) {
vsw.finish(); // 清空上一轮的信息
}
senderWorkerMap.put(sid, sw);
queueSendMap.putIfAbsent(sid, new CircularBlockingQueue<>(SEND_CAPACITY));

sw.start();
rw.start();
}
}

从以上代码中我们可以总结以下过程:

  1. 第一次会创建一个ServerSocket,用来监听与其他server端的通信
  2. 在接受到请求后,会解析数据包,第一个解析是服务端的sid
  3. 如果sid < self.sid 那么久断开这次请求,并发起一个主动连接。保证了服务器两两之间只有一个连接
  4. 如果sid > self.sid 则会创建一个SendWorker和RecvWorker来处理这个连接。
  5. 所有需要发送的消息,以对端sid作为key放入queueSendMap,senderWorkerMap中的每个线程会不断去队列中拿消息发送
  6. 同待发送的消息一样,我们也猜测,recvQueue这个队列同样是接收所有的请求。我们往下看如何做的。

RecvWorker

属性:

  • Socket sock; // 持有tcp连接的客户端,也就是用来接受和发送通信
  • final DataInputStream din; // 重复使用的数据包
  • final SendWorker sw; 对应的发送线程。

这个类主要在收到请求的时候会创建,每个Zookeeper服务节点都会开启一个群首选举的端口ServerSocket ,每收到一个连接都会开启一个客户端去接收信息,并发送信息。
我们来看它具体做的工作

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
 @Override
public void run() {
threadCnt.incrementAndGet();
try {
LOG.debug("RecvWorker thread towards {} started. myId: {}", sid, QuorumCnxManager.this.mySid);
while (running && !shutdown && sock != null) {
int length = din.readInt(); // 第一个数据是数据包的长度
if (length <= 0 || length > PACKETMAXSIZE) {
throw new IOException("Received packet with invalid packet: " + length);
}
final byte[] msgArray = new byte[length];
din.readFully(msgArray, 0, length);
addToRecvQueue(new Message(ByteBuffer.wrap(msgArray), sid));// 拿到消息并放入recv队列,注意这是外部类方法
}
} catch (Exception e) {
.....
} finally {
LOG.warn("Interrupting SendWorker thread from RecvWorker. sid: {}. myId: {}", sid, QuorumCnxManager.this.mySid);
sw.finish(); // 可以看出这是个长连接,只有在非正常状态下才会关闭
closeSocket(sock);
}
}

从上面的代码中可以看出,这个类的主要作用就是不断的接收从socket传来的消息,并放入到recvQueue 队列中

recvQueue 什么时候处理呢?这里有疑惑的同学可以回到选举过程这一篇文章就可以看到再Message类中处理了。

SendWorker

与RecvWorker类似,它也有如下属性

  • Socket sock;
  • RecvWorker recvWorker; // 对应的接收进程
  • DataOutputStream dout;

发送代码这里就不粘贴了,过程类似接收进程,不停的从发送队列中拿到消息然后发到对端。

这里有个不同要特别说明下,就是在没有消息的时候会重发发上次最后一条发送的消息也就是上面提到的lastMessageSent,为什么这么做,官方说明是因为,当我们同时删除和创建新连接时,有可能发送了消息但从未收到(这个时候可能发送队列为空),保留这个最近发送的消息列表,并重新发送最近发送的消息。接收多个副本是无害的(为什么无害?)。