之前我们有讲,在启动集群(也叫仲裁,quorum)模式的时候有两个TCP端口,一个是仲裁通信,一个是群首选举。现在我们跟着源码来看一下。
QuorumCnxManager介绍
- 这个类是专门用于leader选举的连接管理器,对应专门有个端口来执行这个通信。它能保证,每个连接两两之前保持有且仅有一个连接。
- 如果两个服务端同时开启一个连接,这个管理器会采用一个简单的tie-breaking机制来决定删除其中一个。
- 对于每个连接,管理器维护一个消息队列,每次要发送的消息都会被放到队列尾部,当一个连接挂掉后,sender线程会将该消息放回队列。
接下来我们来了解这个类
主要属性:
1 | final QuorumPeer self; |
专门指出一点,zookeeper用的阻塞队列大都是用的自定义的CircularBlockingQueue类,这个类与我们在并发时讲过的其他阻塞队列有两点不同:
- 只实现了部分方法,很多多余的方法都throw new UnsupportedOperationException();
- 在放入元素时,只能使用offer方法,且该offer保证一定返回true,因为在队列满的时候它会删除对一个元素,也就是最开始进来的,有点像线程的拒绝策略
接着我们看看上面几个Thread实现类
Listener
从上面选举流程中我们可以看到,在真正开始选举前我们启动了listener,因此我们可以合理的猜测这就是一个监听其他服务端请求过来的线程。
1 | private final int portBindMaxRetry; //重试次数 |
从代码中我们可以看到真正工作的时里面建立的listenerHandlers,因此我们再来看这个子类
ListenerHandler
1 | private ServerSocket serverSocket; // 注意,这里时一个服务端 |
由于同步和异步的过程类似,我们这里只分析其中过一个,receiveConnection(client)主要过程调用了handleConnection(sock, din);
1 | private void handleConnection(Socket sock, DataInputStream din) throws IOException { |
从以上代码中我们可以总结以下过程:
- 第一次会创建一个ServerSocket,用来监听与其他server端的通信
- 在接受到请求后,会解析数据包,第一个解析是服务端的sid
- 如果sid < self.sid 那么久断开这次请求,并发起一个主动连接。保证了服务器两两之间只有一个连接
- 如果sid > self.sid 则会创建一个SendWorker和RecvWorker来处理这个连接。
- 所有需要发送的消息,以对端sid作为key放入queueSendMap,senderWorkerMap中的每个线程会不断去队列中拿消息发送
- 同待发送的消息一样,我们也猜测,recvQueue这个队列同样是接收所有的请求。我们往下看如何做的。
RecvWorker
属性:
- Socket sock; // 持有tcp连接的客户端,也就是用来接受和发送通信
- final DataInputStream din; // 重复使用的数据包
- final SendWorker sw; 对应的发送线程。
这个类主要在收到请求的时候会创建,每个Zookeeper服务节点都会开启一个群首选举的端口ServerSocket ,每收到一个连接都会开启一个客户端去接收信息,并发送信息。
我们来看它具体做的工作
1 |
|
从上面的代码中可以看出,这个类的主要作用就是不断的接收从socket传来的消息,并放入到recvQueue 队列中
recvQueue 什么时候处理呢?这里有疑惑的同学可以回到选举过程这一篇文章就可以看到再Message类中处理了。
SendWorker
与RecvWorker类似,它也有如下属性
- Socket sock;
- RecvWorker recvWorker; // 对应的接收进程
- DataOutputStream dout;
发送代码这里就不粘贴了,过程类似接收进程,不停的从发送队列中拿到消息然后发到对端。
这里有个不同要特别说明下,就是在没有消息的时候会重发发上次最后一条发送的消息也就是上面提到的lastMessageSent,为什么这么做,官方说明是因为,当我们同时删除和创建新连接时,有可能发送了消息但从未收到(这个时候可能发送队列为空),保留这个最近发送的消息列表,并重新发送最近发送的消息。接收多个副本是无害的(为什么无害?)。