三、利用Zookeeper实现java主从模式

基本操作函数

连接服务器

Zookeeper的所有操作都基于会话,因为我们也从连接服务器开始。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Slf4j
public class CommonClient implements Watcher {
ZooKeeper zk;
String hostPort="localhost:2181";
void startZK() throws IOException {
zk = new ZooKeeper(hostPort, 15000, this); // 连接到字符串
}
@Override
public void process(WatchedEvent event) {
log.info("{} 状态{}",this+event.toString());
}
}
public static void main(String[] args) throws IOException, InterruptedException {
CommonClient commonClient = new CommonClient();
commonClient.startZK();
while (true){
Thread.sleep(1000);
}
}
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)

三个必要参数:

  • connectString:连接串,也就是host:port 列表,表示服务器地址,连接成功后悔返回一个sessionId
  • sessionTimeout:超时时间(至少是tickTime的两倍,至多是20倍),服务器会返回一个它的
  • watcher:默认监听器,有两个作用,一个是作为会话的监听对象(之前在介绍api的时候我们连接到服务器显示的watch字段),另一个是作为后面其他操作的默认监听器。

如上的代码,是一个简单的示例,连接到server端,并继承Watche,将自己作为watche注册。运行代码:

1
2
11:43:06.451|INFO |onConnected|org.apache.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, session id = 0x100052db5000015, negotiated timeout = 15000
11:43:06.453|INFO |process|com.huawei.cq.CommonClient - com.huawei.cq.CommonClient@551bdc27 状态 WatchedEvent state:SyncConnected type:None path:null

上面的输出可以看出,连接成功,并触发了监听

创建节点

一般Zookeeper方法中有同步和异步两种方式,下面我们用示例来演示,不过通常我们使用异步的方式来处理。

我们接着上面的代码继续往下,新增一个Node类,这个是为我们主从模式作铺垫。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
* Node 类
**/
public class Node extends CommonClient {
Random random = new Random();
// 唯一标识id
String serverId = Integer.toString(random.nextInt());
static boolean isLeader = false;

// 同步创建master节点
void runForMaster() throws InterruptedException, KeeperException, IOException {
startZK(); // 连接
String s = zk.create("/master", serverId.getBytes(), OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL); // 创建节点
log.info(s);
}
// 同步创建master节点
void runForMasterSync() throws InterruptedException, KeeperException, IOException {
startZK(); // 连接
zk.create("/master", serverId.getBytes(), OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL,new MasterCreateCallBack(),null); // 创建节点
}
}
/**
* MasterCreateCallBack类,回调
**/
public class MasterCreateCallBack implements StringCallback {
@Override
public void processResult(int rc, String path, Object ctx, String name) {
log.info("{}创建结果{}",name,Code.get(rc));
}
}
// 同步
public String create(
final String path,
byte[] data,
List<ACL> acl,
CreateMode createMode) // 返回创建的路径
public void create(
final String path,
byte[] data,
List<ACL> acl,
CreateMode createMode,
StringCallback cb,
Object ctx)
`

参数:

  • path:节点路径
  • data:节点绑定的数据
  • acl: 权限列表,可以参考[ZooKeeper ACL权限控制]https://blog.csdn.net/liuxiao723846/article/details/79391650)
  • createMode: 节点的类型,比如持久节点还是临时节点
  • cb: 回调,这个是一个创建的模式,创建完成后会触发回调函数
  • ctx: 环境变量,也就是回调函数中所需要的。

由上可以看出,同步和异步没有太大的不同,主要是一些参数的不同,异步有点麻烦的是要实现回调,但它的性能和可操作性远胜同步的。
在main函数中分别加入这两个方法启动:

1
2
3
4
5
6
## 同步方法
11:45:02.187|INFO |onConnected|org.apache.zookeeper.ClientCnxn - Session establishment complete on server localhost/0:0:0:0:0:0:0:1:2181, session id = 0x100052db5000018, negotiated timeout = 15000
11:45:02.188|INFO |process|com.huawei.cq.CommonClient - com.huawei.cq.Node@1f1c7bf6 状态 WatchedEvent state:SyncConnected type:None path:null
## 异步方法
11:43:06.462|INFO |process|com.huawei.cq.CommonClient - com.huawei.cq.Node@1f1c7bf6 状态WatchedEvent state:SyncConnected type:None path:null
11:43:06.467|INFO |processResult|com.huawei.cq.MasterCreateCallBack - /master创建结果OK

我们发现/master节点可以创建两次,也就是说明确实是临时节点,不过如果两次启动够快的话,会出现nodeExists的状态。

监听器watches

所有读操作都能增加一个监听器,getData()、getChildre()、exists(). 使用方法如:

1
2
3
public Stat exists(final String path, Watcher watcher) ;  // 使用心得监听器
public Stat exists(String path, boolean watch);// 是否使用默认监听器
public void exists(final String path, Watcher watcher, StatCallback cb, Object ctx) // 异步方法

监听器是在服务端维护的,所以就算客户端失去连接了,也可以选择在重新连接上后再次注册。
在定义监听器时,需要考虑三个关键点:

  • 只触发一次:默认watch,数据更改后,一个监视事件将发送给客户端,但在此修改后,不会发送,除非你重新再设置。
  • 异步发送:客户端可能因为网络原因导致不会理解获得监听器返回的数据,不过Zookeeper保证在收到监听数据之前,看不到任何改节点的变换,也就是胡搜Zookeeper保证了顺序一致性。
  • 不同的watch返回的数据不一样: getData()和exists()返回节点的信息,但getChildren()返回的是子节点列表。

触发情况

事件 exists getData getChildren
创建create ture true false
删除delete true true true
修改set true true flase
子节点事件 false false true
  • 示例一:我们接着使用上面的代码来演示,在我们创建/master 之前,我们使用CommonClient来监听/master节点,我们在创建完成后,再进行删除。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    public class TestWatch implements Watcher {
    @Override
    public void process(WatchedEvent event) {
    log.info("监听结果:{}",event);
    }
    }
    // CommonClient 类中添加
    void masterExist() throws KeeperException, InterruptedException {
    zk.exists("/master",new TestWatch());
    }
    // main 函数中添加
    CommonClient commonClient = new CommonClient();
    commonClient.startZK();
    Node node=new Node();
    commonClient.masterExist();
    node.runForMaster();
    Thread.sleep(1000);
    log.info("--------------------------------");
    node.stopZK();
    while (true){
    Thread.sleep(1000);
    }

测试结果:

1
2
3
4
5
6
7
11:47:44.014|INFO |process|com.huawei.cq.CommonClient - com.huawei.cq.Node@1f1c7bf6 状态 WatchedEvent state:SyncConnected type:None path:null
11:47:44.016|INFO |runForMaster|com.huawei.cq.Node - /master
11:47:44.017|INFO |process|com.huawei.cq.TestWatch - 监听结果:WatchedEvent state:SyncConnected type:NodeCreated path:/master
11:47:45.017|INFO |main|com.huawei.cq.Application - --------------------------------
11:47:45.119|INFO |process|com.huawei.cq.CommonClient - com.huawei.cq.Node@1f1c7bf6 状态 WatchedEvent state:Closed type:None path:null
11:47:45.120|INFO |close|org.apache.zookeeper.ZooKeeper - Session: 0x100052db500001a closed
11:47:45.120|INFO |run|org.apache.zookeeper.ClientCnxn - EventThread shut down for session: 0x100052db500001a

由上可以看出,在创建的时候触发了一次监听,但在删除的时候不会再触发。

哪有没有永久触发呢,肯定是有的,3.6.0版本之后增加了永久监听器,后面会讲。