Zookeeper 分布式队列

在传统的单进程编程中,我们使用队列来存储一些数据结构,用来在多线程之间共享或传递数据。

分布式环境下,我们同样需要一个类似单进程队列的组件,用来实现跨进程、跨主机、跨网络的数据共享和数据传递,这就是我们的分布式队列。

zookeeper可以通过顺序节点实现分布式队列。

架构图

clipboard.png

图中左侧代表zookeeper集群,右侧代表消费者和生产者。
生产者通过在queue节点下创建顺序节点来存放数据,消费者通过读取顺序节点来消费数据。

流程图

offer核心算法流程

clipboard.png

poll核心算法流程

clipboard.png

代码实现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
/**
* 简单分布式队列
*/
public class DistributedSimpleQueue<T> {

protected final ZkClient zkClient;
// queue节点
protected final String root;
// 顺序节点前缀
protected static final String Node_NAME = "n_";


public DistributedSimpleQueue(ZkClient zkClient, String root) {
this.zkClient = zkClient;
this.root = root;
}

// 判断队列大小
public int size() {
return zkClient.getChildren(root).size();
}

// 判断队列是否为空
public boolean isEmpty() {
return zkClient.getChildren(root).size() == 0;
}

// 向队列提供数据
public boolean offer(T element) throws Exception{

// 创建顺序节点
String nodeFullPath = root .concat( "/" ).concat( Node_NAME );
try {
zkClient.createPersistentSequential(nodeFullPath , element);
}catch (ZkNoNodeException e) {
zkClient.createPersistent(root);
offer(element);
} catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}
return true;
}


// 从队列取数据
public T poll() throws Exception {

try {

// 获取所有顺序节点
List<String> list = zkClient.getChildren(root);
if (list.size() == 0) {
return null;
}

// 排序
Collections.sort(list, new Comparator<String>() {
public int compare(String lhs, String rhs) {
return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME));
}
});

// 循环每个顺序节点名
for ( String nodeName : list ){

// 构造出顺序节点的完整路径
String nodeFullPath = root.concat("/").concat(nodeName);
try {
// 读取顺序节点的内容
T node = (T) zkClient.readData(nodeFullPath);
// 删除顺序节点
zkClient.delete(nodeFullPath);
return node;
} catch (ZkNoNodeException e) {
// ignore 由其他客户端把这个顺序节点消费掉了
}
}

return null;

} catch (Exception e) {
throw ExceptionUtil.convertToRuntimeException(e);
}

}

private String getNodeNumber(String str, String nodeName) {
int index = str.lastIndexOf(nodeName);
if (index >= 0) {
index += Node_NAME.length();
return index <= str.length() ? str.substring(index) : "";
}
return str;

}

}
public class User implements Serializable {

String name;
String id;

public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getId() {
return id;
}
public void setId(String id) {
this.id = id;
}

}
public class TestDistributedSimpleQueue {

public static void main(String[] args) {


ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
DistributedSimpleQueue<User> queue = new DistributedSimpleQueue<User>(zkClient,"/Queue");

User user1 = new User();
user1.setId("1");
user1.setName("xiao wang");

User user2 = new User();
user2.setId("2");
user2.setName("xiao wang");

try {
queue.offer(user1);
queue.offer(user2);
User u1 = (User) queue.poll();
User u2 = (User) queue.poll();

if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){
System.out.println("Success!");
}

} catch (Exception e) {
e.printStackTrace();
}

}

}

上面实现了一个简单分布式队列,在此基础上,我们再扩展一个阻塞分布式队列。代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
/**
* 阻塞分布式队列
* 扩展自简单分布式队列,在拿不到队列数据时,进行阻塞直到拿到数据
*/
public class DistributedBlockingQueue<T> extends DistributedSimpleQueue<T>{


public DistributedBlockingQueue(ZkClient zkClient, String root) {
super(zkClient, root);

}


@Override
public T poll() throws Exception {

while (true){ // 结束在latch上的等待后,再来一次

final CountDownLatch latch = new CountDownLatch(1);
final IZkChildListener childListener = new IZkChildListener() {
public void handleChildChange(String parentPath, List<String> currentChilds)
throws Exception {
latch.countDown(); // 队列有变化,结束latch上的等待
}
};
zkClient.subscribeChildChanges(root, childListener);
try{
T node = super.poll(); // 获取队列数据
if ( node != null ){
return node;
} else {
latch.await(); // 拿不到队列数据,则在latch上await
}
} finally {
zkClient.unsubscribeChildChanges(root, childListener);
}

}
}

}
public class TestDistributedBlockingQueue {

public static void main(String[] args) {


ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1);
int delayTime = 5;

ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer());
final DistributedBlockingQueue<User> queue = new DistributedBlockingQueue<User>(zkClient,"/Queue");

final User user1 = new User();
user1.setId("1");
user1.setName("xiao wang");

final User user2 = new User();
user2.setId("2");
user2.setName("xiao wang");

try {

delayExector.schedule(new Runnable() {

public void run() {
try {
queue.offer(user1);
queue.offer(user2);
} catch (Exception e) {
e.printStackTrace();
}

}
}, delayTime , TimeUnit.SECONDS);

System.out.println("ready poll!");
User u1 = (User) queue.poll();
User u2 = (User) queue.poll();

if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){
System.out.println("Success!");
}

} catch (Exception e) {
e.printStackTrace();
} finally{
delayExector.shutdown();
try {
delayExector.awaitTermination(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
}

}

}

}