zookeeper集群配置
准备3台服务器:
n1:192.168.1.101
n2:192.168.1.102
n3:192.168.1.103
安装zookeeper
1 2 3 4
| cd /opt wget http://mirror.bit.edu.cn/apache/zookeeper/stable/zookeeper-3.4.10.tar.gz tar -zxvf zookeeper-3.4.10.tar.gz mv zookeeper-3.4.10 zookeeper
|
配置zookeeper
1 2 3 4 5 6 7
| cd zookeeper/conf cp zoo_sample.cfg zoo.cfg vi zoo.cfg dataDir=/var/zookeeper server.1=192.168.1.101:2888:3888 server.2=192.168.1.102:2888:3888 server.3=192.168.1.103:2888:3888
|
创建zookeeper数据目录和myid文件:
1 2 3 4
| cd /var mkdir zookeeper cd zookeeper vi myid
|
myid里键入各自在配置文件中的服务器编号即可。
启动zookeeper
1 2
| cd /opt/zookeeper/conf ./zkServer.sh start
|
验证状态
1
| telnet 192.168.1.101 2181
|
stat获取zookeepr状态
输出:This ZooKeeper instance is not currently serving requests
再启动第二台,超过1/2的zookeeper集群节点正常工作后,zookeeper就可以提供服务了。
重新telnet后用stat查看n1:
1 2 3 4 5 6 7 8
| Latency min/avg/max: 0/0/0 Received: 2 Sent: 1 Connections: 1 Outstanding: 0 Zxid: 0x0 Mode: follower Node count: 4
|
查看n2:
1 2 3 4 5 6 7 8
| Latency min/avg/max: 0/0/0 Received: 2 Sent: 1 Connections: 1 Outstanding: 0 Zxid: 0x100000000 Mode: leader Node count: 4
|
可以看到n1是follower角色,n2是leader角色。
zkCli.sh的使用
1 2
| cd /opt/zookeeper/bin ./zkCli.sh -timeout 5000 -server 192.168.1.101:2181
|
输入h显示客户端可用命令:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| h ZooKeeper -server host:port cmd args stat path [watch] set path data [version] ls path [watch] delquota [-n|-b] path ls2 path [watch] setAcl path acl setquota -n|-b val path history redo cmdno printwatches on|off delete path [version] sync path listquota path rmr path get path [watch] create [-s] [-e] path data acl addauth scheme auth quit getAcl path close connect host:port
|
开源客户端ZkClient
引入maven依赖
1 2 3 4 5 6 7 8 9 10
| <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <version>3.4.6</version> </dependency> <dependency> <groupId>com.101tec</groupId> <artifactId>zkclient</artifactId> <version>0.5</version> </dependency>
|
会话创建
1 2 3 4 5 6 7 8
| public class CreateSession {
public static void main(String[] args) { ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer()); System.out.println("conneted ok!"); } }
|
节点创建
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class CreateNode {
public static void main(String[] args) { ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer()); System.out.println("conneted ok!"); User u = new User(); u.setId(1); u.setName("test"); String path = zc.create("/jike5", u, CreateMode.PERSISTENT); System.out.println("created path:"+path); } }
|
获取节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class GetData {
public static void main(String[] args) { ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer()); System.out.println("conneted ok!"); Stat stat = new Stat(); User u = zc.readData("/jike5",stat); System.out.println(u.toString()); System.out.println(stat); } }
|
获取子节点
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class GetChild {
public static void main(String[] args) { ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer()); System.out.println("conneted ok!"); List<String> cList = zc.getChildren("/jike5"); System.out.println(cList.toString()); } }
|
检测节点
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class NodeExists {
public static void main(String[] args) { ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer()); System.out.println("conneted ok!"); boolean e = zc.exists("/jike5"); System.out.println(e); } }
|
节点删除
1 2 3 4 5 6 7 8 9 10 11 12
| public class NodeDel {
public static void main(String[] args) { ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer()); System.out.println("conneted ok!"); boolean e1 = zc.delete("/jike5"); boolean e2 = zc.deleteRecursive("/jike5"); } }
|
数据修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| public class WriteData {
public static void main(String[] args) { ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer()); System.out.println("conneted ok!"); User u = new User(); u.setId(2); u.setName("test2"); zc.writeData("/jike5", u, 1); } }
|
订阅子节点列表变化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class SubscribeChildChanges { private static class ZkChildListener implements IZkChildListener{
public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception {
System.out.println(parentPath); System.out.println(currentChilds.toString()); } }
public static void main(String[] args) throws InterruptedException { ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new SerializableSerializer()); System.out.println("conneted ok!");
zc.subscribeChildChanges("/jike20", new ZkChildListener()); Thread.sleep(Integer.MAX_VALUE);
} }
|
订阅数据内容变化
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public class SubscribeDataChanges { private static class ZkDataListener implements IZkDataListener{
public void handleDataChange(String dataPath, Object data) throws Exception { System.out.println(dataPath+":"+data.toString()); }
public void handleDataDeleted(String dataPath) throws Exception { System.out.println(dataPath); }
}
public static void main(String[] args) throws InterruptedException { ZkClient zc = new ZkClient("192.168.1.105:2181",10000,10000,new BytesPushThroughSerializer()); System.out.println("conneted ok!"); zc.subscribeDataChanges("/jike20", new ZkDataListener()); Thread.sleep(Integer.MAX_VALUE);
} }
|
开源客户端Curator
引入maven依赖
1 2 3 4 5 6 7 8 9 10 11
| <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.8.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.8.0</version> </dependency>
|
创建会话
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public class CreateSession {
public static void main(String[] args) throws InterruptedException { RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); Thread.sleep(Integer.MAX_VALUE); } }
|
创建节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| public class CreateNode {
public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); String path = client.create() .creatingParentsIfNeeded() .withMode(CreateMode.EPHEMERAL) .forPath("/jike/1","123".getBytes()); System.out.println(path); Thread.sleep(Integer.MAX_VALUE); } }
|
节点删除
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class DelNode {
public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); client.delete().guaranteed().deletingChildrenIfNeeded().withVersion(-1).forPath("/jike20");
Thread.sleep(Integer.MAX_VALUE); } }
|
获取子节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class GetChildren {
public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); List<String> cList = client.getChildren().forPath("/jike20"); System.out.println(cList.toString()); } }
|
获取节点内容
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
| public class GetData {
public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); Stat stat = new Stat(); byte[] ret = client.getData().storingStatIn(stat).forPath("/jike"); System.out.println(new String(ret)); System.out.println(stat); } }
|
节点修改
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class UpdateData {
public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); Stat stat = new Stat(); client.getData().storingStatIn(stat).forPath("/jike"); client.setData().withVersion(stat.getVersion()).forPath("/jike", "123".getBytes()); } }
|
检测节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| public class checkexists {
public static void main(String[] args) throws Exception { ExecutorService es = Executors.newFixedThreadPool(5); RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000);
CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); client.checkExists().inBackground(new BackgroundCallback() { public void processResult(CuratorFramework arg0, CuratorEvent arg1) throws Exception { Stat stat = arg1.getStat(); System.out.println(stat); System.out.println(arg1.getContext()); } },"123",es).forPath("/jike"); Thread.sleep(Integer.MAX_VALUE); } }
|
节点监听
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| public class NodeListener {
public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); final NodeCache cache = new NodeCache(client,"/jike"); cache.start(); cache.getListenable().addListener(new NodeCacheListener() { public void nodeChanged() throws Exception { byte[] ret = cache.getCurrentData().getData(); System.out.println("new data:"+new String(ret)); } }); Thread.sleep(Integer.MAX_VALUE); } }
|
监听子节点
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public class NodeChildrenListener {
public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); final PathChildrenCache cache = new PathChildrenCache(client,"/jike",true); cache.start(); cache.getListenable().addListener(new PathChildrenCacheListener() { public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { switch (event.getType()) { case CHILD_ADDED: System.out.println("CHILD_ADDED:"+event.getData()); break; case CHILD_UPDATED: System.out.println("CHILD_UPDATED:"+event.getData()); break; case CHILD_REMOVED: System.out.println("CHILD_REMOVED:"+event.getData()); break; default: break; } } }); Thread.sleep(Integer.MAX_VALUE); } }
|
指定权限
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36
| public class CreateNodeAuth {
public static void main(String[] args) throws Exception { RetryPolicy retryPolicy = new RetryUntilElapsed(5000, 1000); CuratorFramework client = CuratorFrameworkFactory .builder() .connectString("192.168.1.105:2181") .sessionTimeoutMs(5000) .connectionTimeoutMs(5000) .retryPolicy(retryPolicy) .build(); client.start(); ACL aclIp = new ACL(Perms.READ,new Id("ip","192.168.1.105")); ACL aclDigest = new ACL(Perms.READ|Perms.WRITE,new Id("digest",DigestAuthenticationProvider.generateDigest("jike:123456"))); ArrayList<ACL> acls = new ArrayList<ACL>(); acls.add(aclDigest); acls.add(aclIp); String path = client.create() .creatingParentsIfNeeded() .withMode(CreateMode.PERSISTENT) .withACL(acls) .forPath("/jike/3","123".getBytes()); System.out.println(path); Thread.sleep(Integer.MAX_VALUE); } }
|