Zookeeper 基础

zookeeper集群配置

准备3台服务器:
n1:192.168.1.101
n2:192.168.1.102
n3:192.168.1.103

安装zookeeper

1
2
3
4
cd /opt
wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz
tar -zxvf zookeeper-3.4.10.tar.gz
mv zookeeper-3.4.10 zookeeper

配置zookeeper

1
2
3
4
5
6
7
cd zookeeper/conf
cp zoo_sample.cfg zoo.cfg
vi zoo.cfg
# 修改zookeeper的数据目录(注意:zoo.cfg不支持行尾注释,注释必须单独成行)
dataDir=/var/zookeeper
# 指定集群节点,格式为 server.编号=主机:集群通信端口:选举端口
server.1=192.168.1.101:2888:3888
server.2=192.168.1.102:2888:3888
server.3=192.168.1.103:2888:3888

创建zookeeper数据目录和myid文件:

1
2
3
4
cd /var
mkdir zookeeper
cd zookeeper
vi myid

在每台服务器的myid文件中写入该服务器在zoo.cfg里对应的编号即可(例如n1写入1,n2写入2,n3写入3)。

启动zookeeper

1
2
cd /opt/zookeeper/bin
./zkServer.sh start

验证状态

1
telnet 192.168.1.101 2181

输入stat命令获取zookeeper状态

1
stat

输出:This ZooKeeper instance is not currently serving requests

再启动第二台,超过1/2的zookeeper集群节点正常工作后,zookeeper就可以提供服务了。

重新telnet后用stat查看n1:

1
2
3
4
5
6
7
8
Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x0
Mode: follower
Node count: 4

查看n2:

1
2
3
4
5
6
7
8
Latency min/avg/max: 0/0/0
Received: 2
Sent: 1
Connections: 1
Outstanding: 0
Zxid: 0x100000000
Mode: leader
Node count: 4

可以看到n1是follower角色,n2是leader角色。

zkCli.sh的使用

1
2
cd /opt/zookeeper/bin
./zkCli.sh -timeout 5000 -server 192.168.1.101:2181

输入h显示客户端可用命令:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
h
ZooKeeper -server host:port cmd args
stat path [watch]
set path data [version]
ls path [watch]
delquota [-n|-b] path
ls2 path [watch]
setAcl path acl
setquota -n|-b val path
history
redo cmdno
printwatches on|off
delete path [version]
sync path
listquota path
rmr path
get path [watch]
create [-s] [-e] path data acl
addauth scheme auth
quit
getAcl path
close
connect host:port

开源客户端ZkClient

引入maven依赖

1
2
3
4
5
6
7
8
9
10
<dependency>
<groupId>org.apache.zookeeper</groupId>
<artifactId>zookeeper</artifactId>
<version>3.4.6</version>
</dependency>
<dependency>
<groupId>com.101tec</groupId>
<artifactId>zkclient</artifactId>
<version>0.5</version>
</dependency>

会话创建

1
2
3
4
5
6
7
8
/**
 * Minimal ZkClient example: opens a session to a ZooKeeper server and reports success.
 */
public class CreateSession {

    /**
     * Creates a {@code ZkClient} for the hard-coded server address and prints a
     * confirmation line once the constructor has returned.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        final String servers = "192.168.1.105:2181";
        final int sessionTimeout = 10000;
        final int connectionTimeout = 10000;

        // The client is given the server address, two timeout values and the serializer
        // used to convert node data to and from Java objects.
        ZkClient client = new ZkClient(servers, sessionTimeout, connectionTimeout,
                new SerializableSerializer());
        System.out.println("conneted ok!");
    }
}

节点创建

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * ZkClient example: creates a persistent node whose data is a {@code User} object.
 */
public class CreateNode {

    /**
     * Connects to the server, creates {@code /jike5} in {@code CreateMode.PERSISTENT}
     * with a {@code User} payload, and prints the path returned by the client.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ZkClient client = new ZkClient("192.168.1.105:2181", 10000, 10000,
                new SerializableSerializer());
        System.out.println("conneted ok!");

        String createdPath = client.create("/jike5", newUser(), CreateMode.PERSISTENT);
        System.out.println("created path:" + createdPath);
    }

    /** Builds the {@code User} (id 1, name "test") stored as the node's data. */
    private static User newUser() {
        User user = new User();
        user.setId(1);
        user.setName("test");
        return user;
    }
}

获取节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
/**
 * ZkClient example: reads the object stored at a node together with the node's {@code Stat}.
 */
public class GetData {

    /**
     * Connects to the server, reads {@code /jike5} as a {@code User} while filling a
     * {@code Stat} instance, then prints the user followed by the stat.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ZkClient client = new ZkClient("192.168.1.105:2181", 10000, 10000,
                new SerializableSerializer());
        System.out.println("conneted ok!");

        // The Stat instance is handed to readData so the client can populate it.
        Stat nodeStat = new Stat();
        User stored = client.readData("/jike5", nodeStat);

        String rendered = stored.toString();
        System.out.println(rendered);
        System.out.println(nodeStat);
    }
}

获取子节点

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * ZkClient example: lists the children of a node.
 */
public class GetChild {

    /**
     * Connects to the server, fetches the child names of {@code /jike5} and prints
     * the list.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ZkClient client = new ZkClient("192.168.1.105:2181", 10000, 10000,
                new SerializableSerializer());
        System.out.println("conneted ok!");

        List<String> children = client.getChildren("/jike5");
        String rendered = children.toString();
        System.out.println(rendered);
    }
}

检测节点

1
2
3
4
5
6
7
8
9
10
11
12
13
/**
 * ZkClient example: checks whether a node exists.
 */
public class NodeExists {

    /**
     * Connects to the server and prints the boolean returned by
     * {@code exists("/jike5")}.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ZkClient client = new ZkClient("192.168.1.105:2181", 10000, 10000,
                new SerializableSerializer());
        System.out.println("conneted ok!");

        System.out.println(client.exists("/jike5"));
    }
}

节点删除

1
2
3
4
5
6
7
8
9
10
11
12
/**
 * ZkClient example: shows the two delete calls offered by the client.
 */
public class NodeDel {

    /**
     * Connects to the server and calls {@code delete} followed by
     * {@code deleteRecursive} on {@code /jike5}. Both return a boolean, which this
     * example stores but does not print.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ZkClient client = new ZkClient("192.168.1.105:2181", 10000, 10000,
                new SerializableSerializer());
        System.out.println("conneted ok!");

        final String target = "/jike5";

        // Plain delete of the node itself.
        boolean deleted = client.delete(target);
        // Recursive variant, presumably intended for nodes that still have children.
        boolean deletedRecursively = client.deleteRecursive(target);
    }
}

数据修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
/**
 * ZkClient example: overwrites the data stored at a node.
 */
public class WriteData {

    /**
     * Expected-version value that ZooKeeper treats as "match any version".
     *
     * <p>The example previously passed a hard-coded {@code 1}, which only succeeds when
     * the node has been modified exactly once; against a freshly created node
     * (version 0) the write is rejected with a bad-version error. To demonstrate
     * optimistic locking instead, read the node's {@code Stat} first and pass its
     * version here.
     */
    private static final int ANY_VERSION = -1;

    /**
     * Connects to the server and replaces the data of {@code /jike5} with a new
     * {@code User} (id 2, name "test2"), regardless of the node's current version.
     *
     * @param args command-line arguments (unused)
     */
    public static void main(String[] args) {
        ZkClient zc = new ZkClient("192.168.1.105:2181", 10000, 10000,
                new SerializableSerializer());
        System.out.println("conneted ok!");

        User u = new User();
        u.setId(2);
        u.setName("test2");

        zc.writeData("/jike5", u, ANY_VERSION);
    }
}

订阅子节点列表变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * ZkClient example: subscribes to changes of a node's child list.
 */
public class SubscribeChildChanges {

    /** Listener that prints the parent path and the current child list. */
    private static class ZkChildListener implements IZkChildListener {

        /**
         * Prints the notified path followed by its children.
         *
         * @param parentPath    path the subscription was registered on
         * @param currentChilds current child names; ZkClient passes {@code null} when
         *                      the parent node itself has been deleted
         */
        public void handleChildChange(String parentPath, List<String> currentChilds)
                throws Exception {
            System.out.println(parentPath);
            // println(Object) prints "null" for a null list, whereas calling
            // toString() on it would throw when the parent node is deleted.
            System.out.println(currentChilds);
        }
    }

    /**
     * Connects to the server, registers the listener on {@code /jike20} and then
     * sleeps so the process stays alive to receive notifications.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the sleeping main thread is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        ZkClient zc = new ZkClient("192.168.1.105:2181", 10000, 10000,
                new SerializableSerializer());
        System.out.println("conneted ok!");

        // Besides child changes, creation and deletion of the node itself are also notified.
        zc.subscribeChildChanges("/jike20", new ZkChildListener());
        Thread.sleep(Integer.MAX_VALUE);
    }
}

订阅数据内容变化

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
/**
 * ZkClient example: subscribes to data changes and deletion of a node.
 */
public class SubscribeDataChanges {

    /** Listener that prints every data change and deletion it is told about. */
    private static class ZkDataListener implements IZkDataListener {

        /**
         * Prints the path and a readable form of the new data.
         *
         * @param dataPath path whose data changed
         * @param data     new node data; may be {@code null}
         */
        public void handleDataChange(String dataPath, Object data) throws Exception {
            System.out.println(dataPath + ":" + render(data));
        }

        /**
         * Prints the path of the deleted node.
         *
         * @param dataPath path that was deleted
         */
        public void handleDataDeleted(String dataPath) throws Exception {
            System.out.println(dataPath);
        }

        /**
         * Converts the notified payload to text.
         *
         * <p>The client in {@code main} uses {@code BytesPushThroughSerializer}, so the
         * payload arrives as a raw {@code byte[]}; calling {@code toString()} on it would
         * print only the array identity and would throw on {@code null}. Bytes are
         * decoded with the platform default charset, matching the
         * {@code getBytes()} / {@code new String(bytes)} usage elsewhere in these examples.
         */
        private static String render(Object data) {
            if (data instanceof byte[]) {
                return new String((byte[]) data);
            }
            return String.valueOf(data);
        }
    }

    /**
     * Connects to the server, registers the listener on {@code /jike20} and then
     * sleeps so the process stays alive to receive notifications.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the sleeping main thread is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        ZkClient zc = new ZkClient("192.168.1.105:2181", 10000, 10000,
                new BytesPushThroughSerializer());
        System.out.println("conneted ok!");

        zc.subscribeDataChanges("/jike20", new ZkDataListener());
        Thread.sleep(Integer.MAX_VALUE);
    }
}

开源客户端Curator

引入maven依赖

1
2
3
4
5
6
7
8
9
10
11
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
<version>2.8.0</version>
</dependency>

<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-recipes</artifactId>
<version>2.8.0</version>
</dependency>

创建会话

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
 * Curator example: builds a client with the fluent builder, starts it and keeps the
 * process alive.
 */
public class CreateSession {

    /**
     * Starts the client and then sleeps so the process does not exit.
     *
     * @param args command-line arguments (unused)
     * @throws InterruptedException if the sleeping main thread is interrupted
     */
    public static void main(String[] args) throws InterruptedException {
        CuratorFramework client = newClient();
        client.start();

        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * Builds (without starting) a client for the hard-coded server address.
     *
     * <p>Alternatives shown by the original example but not used:
     * retry policies {@code new ExponentialBackoffRetry(1000, 3)} and
     * {@code new RetryNTimes(5, 1000)}, and the shorthand factory call
     * {@code CuratorFrameworkFactory.newClient("192.168.1.105:2181", 5000, 5000, retryPolicy)}.
     */
    private static CuratorFramework newClient() {
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        return CuratorFrameworkFactory.builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
    }
}

创建节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
/**
 * Curator example: creates an ephemeral node, creating missing parents on the way.
 */
public class CreateNode {

    /**
     * Starts a client, creates {@code /jike/1} in {@code CreateMode.EPHEMERAL} with
     * the bytes of {@code "123"}, prints the returned path and then sleeps.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the create call fails or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        CuratorFramework client = newClient();
        client.start();

        byte[] payload = "123".getBytes();
        String createdPath = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.EPHEMERAL)
                .forPath("/jike/1", payload);
        System.out.println(createdPath);

        // Presumably keeps the session open so the ephemeral node stays visible.
        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * Builds (without starting) a client for the hard-coded server address.
     *
     * <p>Alternatives shown by the original example but not used:
     * retry policies {@code new ExponentialBackoffRetry(1000, 3)} and
     * {@code new RetryNTimes(5, 1000)}, and the shorthand factory call
     * {@code CuratorFrameworkFactory.newClient("192.168.1.105:2181", 5000, 5000, retryPolicy)}.
     */
    private static CuratorFramework newClient() {
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        return CuratorFrameworkFactory.builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
    }
}

节点删除

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Curator example: deletes a node together with its children.
 */
public class DelNode {

    /**
     * Starts a client, issues a guaranteed, children-included delete of
     * {@code /jike20} with version {@code -1}, and then sleeps.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the delete call fails or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        CuratorFramework client = newClient();
        client.start();

        client.delete()
                .guaranteed()
                .deletingChildrenIfNeeded()
                .withVersion(-1)
                .forPath("/jike20");

        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * Builds (without starting) a client for the hard-coded server address.
     *
     * <p>Alternatives shown by the original example but not used:
     * retry policies {@code new ExponentialBackoffRetry(1000, 3)} and
     * {@code new RetryNTimes(5, 1000)}, and the shorthand factory call
     * {@code CuratorFrameworkFactory.newClient("192.168.1.105:2181", 5000, 5000, retryPolicy)}.
     */
    private static CuratorFramework newClient() {
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        return CuratorFrameworkFactory.builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
    }
}

获取子节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
/**
 * Curator example: lists the children of a node.
 */
public class GetChildren {

    /**
     * Starts a client, fetches the child names of {@code /jike20} and prints the list.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the getChildren call fails
     */
    public static void main(String[] args) throws Exception {
        CuratorFramework client = newClient();
        client.start();

        List<String> children = client.getChildren().forPath("/jike20");
        String rendered = children.toString();
        System.out.println(rendered);
    }

    /**
     * Builds (without starting) a client for the hard-coded server address.
     *
     * <p>Alternatives shown by the original example but not used:
     * retry policies {@code new ExponentialBackoffRetry(1000, 3)} and
     * {@code new RetryNTimes(5, 1000)}, and the shorthand factory call
     * {@code CuratorFrameworkFactory.newClient("192.168.1.105:2181", 5000, 5000, retryPolicy)}.
     */
    private static CuratorFramework newClient() {
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        return CuratorFrameworkFactory.builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
    }
}

获取节点内容

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
 * Curator example: reads a node's data together with its {@code Stat}.
 */
public class GetData {

    /**
     * Starts a client, reads the bytes of {@code /jike} while filling a {@code Stat}
     * instance, then prints the bytes as a string followed by the stat.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the getData call fails
     */
    public static void main(String[] args) throws Exception {
        CuratorFramework client = newClient();
        client.start();

        // The Stat instance is handed to the call so Curator can populate it.
        Stat nodeStat = new Stat();
        byte[] content = client.getData()
                .storingStatIn(nodeStat)
                .forPath("/jike");

        String text = new String(content);
        System.out.println(text);
        System.out.println(nodeStat);
    }

    /**
     * Builds (without starting) a client for the hard-coded server address.
     *
     * <p>Alternatives shown by the original example but not used:
     * retry policies {@code new ExponentialBackoffRetry(1000, 3)} and
     * {@code new RetryNTimes(5, 1000)}, and the shorthand factory call
     * {@code CuratorFrameworkFactory.newClient("192.168.1.105:2181", 5000, 5000, retryPolicy)}.
     */
    private static CuratorFramework newClient() {
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        return CuratorFrameworkFactory.builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
    }
}

节点修改

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
 * Curator example: updates a node's data using the version read just before.
 */
public class UpdateData {

    /**
     * Starts a client, reads {@code /jike} to obtain its current {@code Stat}, then
     * writes the bytes of {@code "123"} using that stat's version.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the read or the write fails
     */
    public static void main(String[] args) throws Exception {
        CuratorFramework client = newClient();
        client.start();

        final String target = "/jike";

        // Read only to capture the node's current version; the returned bytes are ignored.
        Stat current = new Stat();
        client.getData().storingStatIn(current).forPath(target);

        int expectedVersion = current.getVersion();
        client.setData()
                .withVersion(expectedVersion)
                .forPath(target, "123".getBytes());
    }

    /**
     * Builds (without starting) a client for the hard-coded server address.
     *
     * <p>Alternatives shown by the original example but not used:
     * retry policies {@code new ExponentialBackoffRetry(1000, 3)} and
     * {@code new RetryNTimes(5, 1000)}, and the shorthand factory call
     * {@code CuratorFrameworkFactory.newClient("192.168.1.105:2181", 5000, 5000, retryPolicy)}.
     */
    private static CuratorFramework newClient() {
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        return CuratorFrameworkFactory.builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
    }
}

检测节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
/**
 * Curator example: checks for a node's existence asynchronously, delivering the
 * result to a callback.
 */
public class checkexists {

    /** Callback that prints the stat and the context object carried by the event. */
    private static final class ResultPrinter implements BackgroundCallback {

        public void processResult(CuratorFramework framework, CuratorEvent event)
                throws Exception {
            System.out.println(event.getStat());
            System.out.println(event.getContext());
        }
    }

    /**
     * Starts a client, issues a background {@code checkExists} for {@code /jike}
     * with the context value {@code "123"} and a five-thread pool supplied as the
     * executor, and then sleeps so the process stays alive for the callback.
     *
     * <p>The synchronous alternative is
     * {@code Stat s = client.checkExists().forPath("/jike");}.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the call fails or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        ExecutorService callbackPool = Executors.newFixedThreadPool(5);

        CuratorFramework client = newClient();
        client.start();

        client.checkExists()
                .inBackground(new ResultPrinter(), "123", callbackPool)
                .forPath("/jike");

        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * Builds (without starting) a client for the hard-coded server address.
     *
     * <p>Alternatives shown by the original example but not used:
     * retry policies {@code new ExponentialBackoffRetry(1000, 3)} and
     * {@code new RetryNTimes(5, 1000)}, and the shorthand factory call
     * {@code CuratorFrameworkFactory.newClient("192.168.1.105:2181", 5000, 5000, retryPolicy)}.
     */
    private static CuratorFramework newClient() {
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        return CuratorFrameworkFactory.builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
    }
}

节点监听

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
public class NodeListener {

public static void main(String[] args) throws Exception {

RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);

CuratorFramework client = CuratorFrameworkFactory
.builder()
.connectString("192.168.1.105:2181")
.sessionTimeoutMs(5000)
.connectionTimeoutMs(5000)
.retryPolicy(retryPolicy)
.build();

client.start();

final NodeCache cache = new NodeCache(client,"/jike");
cache.start();
cache.getListenable().addListener(new NodeCacheListener() {

public void nodeChanged() throws Exception {
byte[] ret = cache.getCurrentData().getData();
System.out.println("new data:"+new String(ret));
}
});

Thread.sleep(Integer.MAX_VALUE);

}

}

监听子节点

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
 * Curator example: watches the children of a node with {@code PathChildrenCache}.
 */
public class NodeChildrenListener {

    /** Listener that prints added, updated and removed children; other events are ignored. */
    private static final class ChildEventPrinter implements PathChildrenCacheListener {

        public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
                throws Exception {
            final String label;
            switch (event.getType()) {
                case CHILD_ADDED:
                    label = "CHILD_ADDED";
                    break;
                case CHILD_UPDATED:
                    label = "CHILD_UPDATED";
                    break;
                case CHILD_REMOVED:
                    label = "CHILD_REMOVED";
                    break;
                default:
                    label = null;
                    break;
            }

            if (label != null) {
                System.out.println(label + ":" + event.getData());
            }
        }
    }

    /**
     * Starts a client and a {@code PathChildrenCache} on {@code /jike} (constructed
     * with {@code true} as its third argument), registers the printing listener,
     * and then sleeps so the process stays alive to receive notifications.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if starting the cache fails or the sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        CuratorFramework client = newClient();
        client.start();

        final PathChildrenCache childrenCache = new PathChildrenCache(client, "/jike", true);
        childrenCache.start();
        childrenCache.getListenable().addListener(new ChildEventPrinter());

        Thread.sleep(Integer.MAX_VALUE);
    }

    /** Builds (without starting) a client for the hard-coded server address. */
    private static CuratorFramework newClient() {
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        return CuratorFrameworkFactory.builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
    }
}

指定权限

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
/**
 * Curator example: creates a persistent node with an explicit ACL list.
 */
public class CreateNodeAuth {

    /**
     * Starts a client, creates {@code /jike/3} in {@code CreateMode.PERSISTENT} with
     * the bytes of {@code "123"} and the ACLs from {@link #buildAcls()}, prints the
     * returned path and then sleeps.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if building the ACLs or creating the node fails, or the
     *                   sleep is interrupted
     */
    public static void main(String[] args) throws Exception {
        CuratorFramework client = newClient();
        client.start();

        ArrayList<ACL> acls = buildAcls();

        String createdPath = client.create()
                .creatingParentsIfNeeded()
                .withMode(CreateMode.PERSISTENT)
                .withACL(acls)
                .forPath("/jike/3", "123".getBytes());
        System.out.println(createdPath);

        Thread.sleep(Integer.MAX_VALUE);
    }

    /**
     * Builds the ACL list: a digest entry for {@code jike:123456} with READ and WRITE
     * permissions, followed by an ip entry for {@code 192.168.1.105} with READ
     * permission.
     */
    private static ArrayList<ACL> buildAcls() throws Exception {
        Id digestId = new Id("digest",
                DigestAuthenticationProvider.generateDigest("jike:123456"));
        Id ipId = new Id("ip", "192.168.1.105");

        ArrayList<ACL> acls = new ArrayList<ACL>();
        acls.add(new ACL(Perms.READ | Perms.WRITE, digestId));
        acls.add(new ACL(Perms.READ, ipId));
        return acls;
    }

    /** Builds (without starting) a client for the hard-coded server address. */
    private static CuratorFramework newClient() {
        RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
        return CuratorFrameworkFactory.builder()
                .connectString("192.168.1.105:2181")
                .sessionTimeoutMs(5000)
                .connectionTimeoutMs(5000)
                .retryPolicy(retryPolicy)
                .build();
    }
}