zookeeper的watch(原生API)

github: https://github.com/zhaikaishun/zookeeper_tutorial


Zookeeper的watcher事件

zookeeper有watch事件,是一次性触发的,当watch监视的数据发生变化时,通知设置了该watch的client,即watcher。
同样,其watcher是监听数据发生了某些变化,那就一定会有对应的事件类型,和状态类型。

事件类型(znode节点相关的)

  • EventType.NodeCreated
  • EventType.NodeDataChanged
  • EventType.NodeChildrenChanged
  • EventType.NodeDeleted
    状态类型:(是跟客户端实例相关的,简单的说就是客户端和服务器端连接状态相关的)
  • KeeperState.Disconnected
  • KeeperState.SyncConnected
  • KeeperState.AuthFailed 认证失败
  • KeeperState.Expired 过期

watcher和watch

简单的说,一个节点上的某个程序监控某个节点,那么这个节点上的这个程序就是一个watcher,而监听的这个事件(动作),就是一个watch。watch事件,是一次性触发的,只能监听一次,第二次对此节点的修改就监听不到了,如果想一直监听,大概有两种方案,一种是在出发事件后执行方法的时候有个watch的参数再设置为true,一种是这个时候再创建一个watch,这种是有点麻烦。
看一个例子吧:
最好是去github上下载下来自己运行一下。

  • 设置watcher, 我这里每次create的时候,都设置了一下watcher,例如下面代码中 this.zk.exists(path, ifsetTrue);
  • 需要实现implements Watcher接口以及重写实现方法process,我这里监听比较简单。也就不多说了。
  • 反正是要注意,如果只设置一次监听,那么监听完之后,第二次就监听不到了。若需要多次监听,那么最好是再监听一次
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    158
    159
    160
    161
    162
    163
    164
    165
    166
    167
    168
    169
    170
    171
    172
    173
    174
    175
    176
    177
    178
    179
    180
    181
    182
    183
    184
    185
    186
    187
    188
    189
    190
    191
    192
    193
    194
    195
    196
    197
    198
    199
    200
    201
    202
    203
    204
    205
    206
    207
    208
    209
    210
    211
    212
    213
    214
    215
    216
    217
    218
    219
    220
    221
    222
    223
    224
    225
    226
    227
    228
    229
    230
    231
    232
    233
    234
    235
    236
    237
    238
    239
    240
    241
    242
    243
    244
    245
    246
    247
    248
    249
    250
    251
    252
    253
    254
    255
    256
    257
    258
    259
    260
    261
    262
    263
    264
    265
    266
    267
    268
    269
    270
    271
    272
    273
    274
    275
    276
    277
    278
    279
    280
    281
    282
    283
    284
    285
    286
    287
    288
    289
    290
    291
    292
    293
    294
    295
    296
    297
    298
    299
    300
    301
    302
    303
    304
    305
    306
    /**
    * Zookeeper Wathcher
    * 本类就是一个Watcher类(实现了org.apache.zookeeper.Watcher类)
    * @author(alienware)
    * @since 2015-6-14
    */
    public class ZooKeeperWatcher implements Watcher {
    /** 定义原子变量 */
    AtomicInteger seq = new AtomicInteger();
    /** 定义session失效时间 */
    private static final int SESSION_TIMEOUT = 10000;
    /** zookeeper服务器地址 */
    private static final String CONNECTION_ADDR = "192.168.1.31:2181";
    /** zk父路径设置 */
    private static final String PARENT_PATH = "/testWatch";
    /** zk子路径设置 */
    private static final String CHILDREN_PATH = "/testWatch/children";
    /** 进入标识 */
    private static final String LOG_PREFIX_OF_MAIN = "【Main】";
    /** zk变量 */
    private ZooKeeper zk = null;
    /** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */
    private CountDownLatch connectedSemaphore = new CountDownLatch(1);
    /**
    * 创建ZK连接
    * @param connectAddr ZK服务器地址列表
    * @param sessionTimeout Session超时时间
    */
    public void createConnection(String connectAddr, int sessionTimeout) {
    this.releaseConnection();
    try {
    zk = new ZooKeeper(connectAddr, sessionTimeout, this);
    System.out.println(LOG_PREFIX_OF_MAIN + "开始连接ZK服务器");
    connectedSemaphore.await();
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    /**
    * 关闭ZK连接
    */
    public void releaseConnection() {
    if (this.zk != null) {
    try {
    this.zk.close();
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    }
    }
    /**
    * 创建节点
    * @param path 节点路径
    * @param data 数据内容
    * @return
    */
    public boolean createPath(String path, String data,boolean ifsetTrue) {
    try {
    //设置监控(由于zookeeper的监控都是一次性的所以 每次必须设置监控)
    this.zk.exists(path, ifsetTrue);
    System.out.println(LOG_PREFIX_OF_MAIN + "节点创建成功, Path: " +
    this.zk.create( /**路径*/
    path,
    /**数据*/
    data.getBytes(),
    /**所有可见*/
    Ids.OPEN_ACL_UNSAFE,
    /**永久存储*/
    CreateMode.PERSISTENT ) +
    ", content: " + data);
    } catch (Exception e) {
    e.printStackTrace();
    return false;
    }
    return true;
    }
    /**
    * 读取指定节点数据内容
    * @param path 节点路径
    * @return
    */
    public String readData(String path, boolean needWatch) {
    try {
    return new String(this.zk.getData(path, needWatch, null));
    } catch (Exception e) {
    e.printStackTrace();
    return "";
    }
    }
    /**
    * 更新指定节点数据内容
    * @param path 节点路径
    * @param data 数据内容
    * @return
    */
    public boolean writeData(String path, String data) {
    try {
    System.out.println(LOG_PREFIX_OF_MAIN + "更新数据成功,path:" + path + ", stat: " +
    this.zk.setData(path, data.getBytes(), -1));
    } catch (Exception e) {
    e.printStackTrace();
    }
    return false;
    }
    /**
    * 删除指定节点
    *
    * @param path
    * 节点path
    */
    public void deleteNode(String path) {
    try {
    this.zk.delete(path, -1);
    System.out.println(LOG_PREFIX_OF_MAIN + "删除节点成功,path:" + path);
    } catch (Exception e) {
    e.printStackTrace();
    }
    }
    /**
    * 判断指定节点是否存在
    * @param path 节点路径
    */
    public Stat exists(String path, boolean needWatch) {
    try {
    return this.zk.exists(path, needWatch);
    } catch (Exception e) {
    e.printStackTrace();
    return null;
    }
    }
    /**
    * 获取子节点
    * @param path 节点路径
    */
    private List<String> getChildren(String path, boolean needWatch) {
    try {
    return this.zk.getChildren(path, needWatch);
    } catch (Exception e) {
    e.printStackTrace();
    return null;
    }
    }
    /**
    * 删除所有节点
    */
    public void deleteAllTestPath() {
    if(this.exists(CHILDREN_PATH, false) != null){
    this.deleteNode(CHILDREN_PATH);
    }
    if(this.exists(PARENT_PATH, false) != null){
    this.deleteNode(PARENT_PATH);
    }
    }
    /**
    * 收到来自Server的Watcher通知后的处理。
    */
    @Override
    public void process(WatchedEvent event) {
    System.out.println("进入 process 。。。。。event = " + event);
    try {
    Thread.sleep(200);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    if (event == null) {
    return;
    }
    // 连接状态
    KeeperState keeperState = event.getState();
    // 事件类型
    EventType eventType = event.getType();
    // 受影响的path
    String path = event.getPath();
    String logPrefix = "【Watcher-" + this.seq.incrementAndGet() + "】";
    System.out.println(logPrefix + "收到Watcher通知");
    System.out.println(logPrefix + "连接状态:\t" + keeperState.toString());
    System.out.println(logPrefix + "事件类型:\t" + eventType.toString());
    if (KeeperState.SyncConnected == keeperState) {
    // 成功连接上ZK服务器
    if (EventType.None == eventType) {
    System.out.println(logPrefix + "成功连接上ZK服务器");
    connectedSemaphore.countDown();
    }
    //创建节点
    else if (EventType.NodeCreated == eventType) {
    System.out.println(logPrefix + "节点创建");
    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    this.exists(path, true);
    }
    //更新节点
    else if (EventType.NodeDataChanged == eventType) {
    System.out.println(logPrefix + "节点数据更新");
    System.out.println("我看看走不走这里........");
    try {
    Thread.sleep(100);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(logPrefix + "数据内容: " + this.readData(PARENT_PATH, true));
    }
    //更新子节点
    else if (EventType.NodeChildrenChanged == eventType) {
    System.out.println(logPrefix + "子节点变更");
    try {
    Thread.sleep(3000);
    } catch (InterruptedException e) {
    e.printStackTrace();
    }
    System.out.println(logPrefix + "子节点列表:" + this.getChildren(PARENT_PATH, true));
    }
    //删除节点
    else if (EventType.NodeDeleted == eventType) {
    System.out.println(logPrefix + "节点 " + path + " 被删除");
    }
    else ;
    }
    else if (KeeperState.Disconnected == keeperState) {
    System.out.println(logPrefix + "与ZK服务器断开连接");
    }
    else if (KeeperState.AuthFailed == keeperState) {
    System.out.println(logPrefix + "权限检查失败");
    }
    else if (KeeperState.Expired == keeperState) {
    System.out.println(logPrefix + "会话失效");
    }
    else ;
    System.out.println("--------------------------------------------");
    }
    /**
    * <B>方法名称:</B>测试zookeeper监控<BR>
    * <B>概要说明:</B>主要测试watch功能<BR>
    * @param args
    * @throws Exception
    */
    public static void main(String[] args) throws Exception {
    //建立watcher
    ZooKeeperWatcher zkWatch = new ZooKeeperWatcher();
    //创建连接
    zkWatch.createConnection(CONNECTION_ADDR, SESSION_TIMEOUT);
    //System.out.println(zkWatch.zk.toString());
    Thread.sleep(1000);
    // 清理节点
    zkWatch.deleteAllTestPath();
    if (zkWatch.createPath(PARENT_PATH, System.currentTimeMillis() + "",true)) {
    Thread.sleep(1000);
    // 读取数据
    System.out.println("---------------------- read parent ----------------------------");
    //zkWatch.readData(PARENT_PATH, true);
    // 读取子节点
    System.out.println("---------------------- read children path ----------------------------");
    zkWatch.getChildren(PARENT_PATH, true);
    // 更新数据
    zkWatch.writeData(PARENT_PATH, System.currentTimeMillis() + "");
    Thread.sleep(1000);
    // 创建子节点
    zkWatch.createPath(CHILDREN_PATH, System.currentTimeMillis() + "",true);
    Thread.sleep(1000);
    zkWatch.writeData(CHILDREN_PATH, System.currentTimeMillis() + "");
    }
    Thread.sleep(50000);
    // 清理节点
    zkWatch.deleteAllTestPath();
    Thread.sleep(1000);
    zkWatch.releaseConnection();
    }
    }

输出就在这里了,想具体了解的话,自己敲一下,然后覆盖一下代码。一个一个功能的执行,查看功能即可

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
【Main】开始连接ZK服务器
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:None path:null
【Watcher-1】收到Watcher通知
【Watcher-1】连接状态: SyncConnected
【Watcher-1】事件类型: None
【Watcher-1】成功连接上ZK服务器
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatch
【Main】节点创建成功, Path: /testWatch, content: 1508077399368
【Watcher-2】收到Watcher通知
【Watcher-2】连接状态: SyncConnected
【Watcher-2】事件类型: NodeCreated
【Watcher-2】节点创建
--------------------------------------------
---------------------- read parent ----------------------------
---------------------- read children path ----------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatch
【Main】更新数据成功,path:/testWatch, stat: 30064771078,30064771079,1508071193606,1508071194762,1,0,0,0,13,0,30064771078
【Watcher-3】收到Watcher通知
【Watcher-3】连接状态: SyncConnected
【Watcher-3】事件类型: NodeDataChanged
【Watcher-3】节点数据更新
我看看走不走这里........
【Watcher-3】数据内容: 1508077400538
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeCreated path:/testWatch/children
【Main】节点创建成功, Path: /testWatch/children, content: 1508077401581
【Watcher-4】收到Watcher通知
【Watcher-4】连接状态: SyncConnected
【Watcher-4】事件类型: NodeCreated
【Watcher-4】节点创建
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatch
【Watcher-5】收到Watcher通知
【Watcher-5】连接状态: SyncConnected
【Watcher-5】事件类型: NodeChildrenChanged
【Watcher-5】子节点变更
【Main】更新数据成功,path:/testWatch/children, stat: 30064771080,30064771081,1508071195777,1508071196784,1,0,0,0,13,0,30064771080
【Watcher-5】子节点列表:[children]
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeDataChanged path:/testWatch/children
【Watcher-6】收到Watcher通知
【Watcher-6】连接状态: SyncConnected
【Watcher-6】事件类型: NodeDataChanged
【Watcher-6】节点数据更新
我看看走不走这里........
【Watcher-6】数据内容: 1508077400538
--------------------------------------------
进入 process 。。。。。event = WatchedEvent state:SyncConnected type:NodeChildrenChanged path:/testWatch
【Main】删除节点成功,path:/testWatch/children
【Main】删除节点成功,path:/testWatch
【Watcher-7】收到Watcher通知
【Watcher-7】连接状态: SyncConnected
【Watcher-7】事件类型: NodeChildrenChanged
【Watcher-7】子节点变更
```
## 实际应用一个场景
我们希望zookeeper对分布式系统的配置文件进行管理,也就是说多个服务器进行watcher,zookeeper节点发送变化,则我们实时更新配置文件。我们要完成多个应用服务器注册watcher,然后实时观察数据的变化,然后反馈给媒体服务器变更的数据信息,观察zookeeper节点
下面是一个例子: 代码在bjsxt.zookeeper.cluster中
本例子模拟多台服务器同时监控一个节点。然后另外一个程序进行管理,所监控的这几台机器得到节点变更的通知。
本例中,Client1和Client2相当于两台服务器,共同watch一个节点。Test相当于管理者,用来管理这两个客户端的配置。
代码如下,具体的需要自己下载下来进行调试
Client1
```java
public class Client1 {
public static void main(String[] args) throws Exception{
ZKWatcher myWatcher = new ZKWatcher();
Thread.sleep(100000000);
}
}
```
Client2

public class Client2 {

public static void main(String[] args) throws Exception{

    ZKWatcher myWatcher = new ZKWatcher();
    Thread.sleep(100000000);
}

}

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
ZKWatcher
```java
public class ZKWatcher implements Watcher {
/** zk变量 */
private ZooKeeper zk = null;
/** 父节点path */
static final String PARENT_PATH = "/super";
/** 信号量设置,用于等待zookeeper连接建立之后 通知阻塞程序继续向下执行 */
private CountDownLatch connectedSemaphore = new CountDownLatch(1);
private List<String> cowaList = new CopyOnWriteArrayList<String>();
/** zookeeper服务器地址 */
public static final String CONNECTION_ADDR = "192.168.1.31:2181,192.168.1.32:2181,192.168.1.33:2181";
/** 定义session失效时间 */
public static final int SESSION_TIMEOUT = 30000;
public ZKWatcher() throws Exception{
zk = new ZooKeeper(CONNECTION_ADDR, SESSION_TIMEOUT, this);
System.out.println("开始连接ZK服务器");
connectedSemaphore.await();
}
@Override
public void process(WatchedEvent event) {
// 连接状态
KeeperState keeperState = event.getState();
// 事件类型
EventType eventType = event.getType();
// 受影响的path
String path = event.getPath();
System.out.println("受影响的path : " + path);
if (KeeperState.SyncConnected == keeperState) {
// 成功连接上ZK服务器
if (EventType.None == eventType) {
System.out.println("成功连接上ZK服务器");
connectedSemaphore.countDown();
try {
if(this.zk.exists(PARENT_PATH, false) == null){
this.zk.create(PARENT_PATH, "root".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
List<String> paths = this.zk.getChildren(PARENT_PATH, true);
for (String p : paths) {
System.out.println(p);
this.zk.exists(PARENT_PATH + "/" + p, true);
}
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//创建节点
else if (EventType.NodeCreated == eventType) {
System.out.println("节点创建");
try {
this.zk.exists(path, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//更新节点
else if (EventType.NodeDataChanged == eventType) {
System.out.println("节点数据更新");
try {
//update nodes call function
this.zk.exists(path, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//更新子节点
else if (EventType.NodeChildrenChanged == eventType) {
System.out.println("子节点 ... 变更");
try {
List<String> paths = this.zk.getChildren(path, true);
if(paths.size() >= cowaList.size()){
paths.removeAll(cowaList);
for(String p : paths){
this.zk.exists(path + "/" + p, true);
//this.zk.getChildren(path + "/" + p, true);
System.out.println("这个是新增的子节点 : " + path + "/" + p);
//add new nodes call function
}
cowaList.addAll(paths);
} else {
cowaList = paths;
}
System.out.println("cowaList: " + cowaList.toString());
System.out.println("paths: " + paths.toString());
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
//删除节点
else if (EventType.NodeDeleted == eventType) {
System.out.println("节点 " + path + " 被删除");
try {
//delete nodes call function
this.zk.exists(path, true);
} catch (KeeperException | InterruptedException e) {
e.printStackTrace();
}
}
else ;
}
else if (KeeperState.Disconnected == keeperState) {
System.out.println("与ZK服务器断开连接");
}
else if (KeeperState.AuthFailed == keeperState) {
System.out.println("权限检查失败");
}
else if (KeeperState.Expired == keeperState) {
System.out.println("会话失效");
}
else ;
System.out.println("--------------------------------------------");
}
}

Test

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
public class Test {
/** zookeeper地址 */
static final String CONNECT_ADDR = "192.168.1.31:2181,192.168.1.32:2181,192.168.1.33:2181";
/** session超时时间 */
static final int SESSION_OUTTIME = 2000;//ms
/** 信号量,阻塞程序执行,用于等待zookeeper连接成功,发送成功信号 */
static final CountDownLatch connectedSemaphore = new CountDownLatch(1);
public static void main(String[] args) throws Exception{
ZooKeeper zk = new ZooKeeper(CONNECT_ADDR, SESSION_OUTTIME, new Watcher(){
@Override
public void process(WatchedEvent event) {
//获取事件的状态
KeeperState keeperState = event.getState();
EventType eventType = event.getType();
//如果是建立连接
if(KeeperState.SyncConnected == keeperState){
if(EventType.None == eventType){
//如果建立连接成功,则发送信号量,让后续阻塞程序向下执行
connectedSemaphore.countDown();
System.out.println("zk 建立连接");
}
}
}
});
//进行阻塞
connectedSemaphore.await();
// //创建子节点
// zk.create("/super/c1", "c1".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//创建子节点
// zk.create("/super/c2", "c2".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//创建子节点
String result = zk.create("/super/c3", "c3".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
System.out.println(result);
//创建子节点
// zk.create("/super/c4", "c4".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
// zk.create("/super/c4/c44", "c44".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
//获取节点信息
// byte[] data = zk.getData("/testRoot", false, null);
// System.out.println(new String(data));
// System.out.println(zk.getChildren("/testRoot", false));
//修改节点的值
// zk.setData("/super/c1", "modify c1".getBytes(), -1);
// zk.setData("/super/c2", "modify c2".getBytes(), -1);
// byte[] data = zk.getData("/super/c2", false, null);
// System.out.println(new String(data));
// //判断节点是否存在
// System.out.println(zk.exists("/super/c3", false));
// //删除节点
// zk.delete("/super/c3", -1);
zk.close();
}
}

先启动Client1,client1打印

1
2
3
开始连接ZK服务器
受影响的path : null
成功连接上ZK服务器

再启动Client2,

1
2
3
4
开始连接ZK服务器
受影响的path : null
成功连接上ZK服务器
--------------------------------------------

再启动Test类,此时Client1和Client2打印的内容是

1
2
3
4
5
6
7
8
9
开始连接ZK服务器
受影响的path : null
成功连接上ZK服务器
--------------------------------------------
受影响的path : /super
子节点 ... 变更
这个是新增的子节点 : /super/c3
cowaList: [c3]
paths: [c3]

Test打印的内容是

1
2
zk 建立连接
/super/c3

更多测试,需要自己来运行并且思考结果。这里只起抛砖引玉的作用