Zookeeper 数据发布订阅

多个订阅者对象同时监听同一主题对象,主题对象状态变化时通知所有订阅者对象更新自身状态。发布方和订阅方独立封装、独立改变,当一个对象的改变需要同时改变其他对象,并且它不知道有多少个对象需要改变时,可以使用发布订阅模式。

在分布式系统中的顶级应用有配置管理和服务发现。

配置管理:指集群中的机器拥有某些配置,并且这些配置信息需要动态地改变,那么我们就可以使用发布订阅模式把配置做统一的管理,让这些机器订阅配置信息的改变,但是配置改变时这些机器得到通知并更新自己的配置。

服务发现:指对集群中的服务上下线做统一管理,每个工作服务器都可以作为数据的发布方,向集群注册自己的基本信息,而让某些监控服务器作为订阅方,订阅工作服务器的基本信息。当工作服务器的基本信息改变时,如服务上下线、服务器的角色或服务范围变更,那么监控服务器可以得到通知并响应这些变化。

架构图

clipboard.png

左边代表Zookeeper集群,右侧代表服务器集群。其中前3个为工作服务器Work Server,绿色为管理服务器Manage Server,最下面的是控制服务器Control Server。

config节点,用于配置管理。Manage Server通过config节点下发配置信息,Work Server可以通过订阅config节点的改变来更新自己的配置。

Servers节点,用于服务发现,每个Work Server在启动时都会在Servers节点下创建一个临时节点,Manage Server充当monitor,通过监听Servers节点的子节点列表的变化来更新自己内存中工作服务器列表的信息。

通过Control Server由Command节点作为中介,向Manage Server发送控制指令。Control Server向command节点写入命令信息,Manage Server订阅command节点的数据改变来监听并执行命令。

流程图

Manage Server程序主体流程

clipboard.png

核心类图

clipboard.png

WorkServer对应架构图的Work Server;
ManageServer对应架构图的Manage Server;
ServerConfig用于记录Work Server的配置信息;
ServerData用于记录Work Server的基本信息;
SubscribeZkClient作为示例程序入口服务站启动Work Server和Manage Server

实现代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
/**
* 配置信息
*/
public class ServerConfig {

private String dbUrl;
private String dbPwd;
private String dbUser;
public String getDbUrl() {
return dbUrl;
}
public void setDbUrl(String dbUrl) {
this.dbUrl = dbUrl;
}
public String getDbPwd() {
return dbPwd;
}
public void setDbPwd(String dbPwd) {
this.dbPwd = dbPwd;
}
public String getDbUser() {
return dbUser;
}
public void setDbUser(String dbUser) {
this.dbUser = dbUser;
}

@Override
public String toString() {
return "ServerConfig [dbUrl=" + dbUrl + ", dbPwd=" + dbPwd
+ ", dbUser=" + dbUser + "]";
}

}
/**
* 服务器基本信息
*/
public class ServerData {

private String address;
private Integer id;
private String name;

public String getAddress() {
return address;
}
public void setAddress(String address) {
this.address = address;
}
public Integer getId() {
return id;
}
public void setId(Integer id) {
this.id = id;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}

@Override
public String toString() {
return "ServerData [address=" + address + ", id=" + id + ", name="
+ name + "]";
}

}
/**
* 代表工作服务器
*/
public class WorkServer {

private ZkClient zkClient;
// ZooKeeper
private String configPath;
// ZooKeeper集群中servers节点的路径
private String serversPath;
// 当前工作服务器的基本信息
private ServerData serverData;
// 当前工作服务器的配置信息
private ServerConfig serverConfig;
private IZkDataListener dataListener;

public WorkServer(String configPath, String serversPath,
ServerData serverData, ZkClient zkClient, ServerConfig initConfig) {
this.zkClient = zkClient;
this.serversPath = serversPath;
this.configPath = configPath;
this.serverConfig = initConfig;
this.serverData = serverData;

this.dataListener = new IZkDataListener() {

public void handleDataDeleted(String dataPath) throws Exception {

}

public void handleDataChange(String dataPath, Object data)
throws Exception {
String retJson = new String((byte[])data);
ServerConfig serverConfigLocal = (ServerConfig)JSON.parseObject(retJson,ServerConfig.class);
updateConfig(serverConfigLocal);
System.out.println("new Work server config is:"+serverConfig.toString());

}
};

}

// 启动服务器
public void start() {
System.out.println("work server start...");
initRunning();
}

// 停止服务器
public void stop() {
System.out.println("work server stop...");
zkClient.unsubscribeDataChanges(configPath, dataListener); // 取消监听config节点
}

// 服务器初始化
private void initRunning() {
registMe(); // 注册自己
zkClient.subscribeDataChanges(configPath, dataListener); // 订阅config节点的改变事件
}

// 启动时向zookeeper注册自己的注册函数
private void registMe() {
String mePath = serversPath.concat("/").concat(serverData.getAddress());

try {
zkClient.createEphemeral(mePath, JSON.toJSONString(serverData)
.getBytes());
} catch (ZkNoNodeException e) {
zkClient.createPersistent(serversPath, true);
registMe();
}
}

// 更新自己的配置信息
private void updateConfig(ServerConfig serverConfig) {
this.serverConfig = serverConfig;
}

}
public class ManageServer {

// zookeeper的servers节点路径
private String serversPath;
// zookeeper的command节点路径
private String commandPath;
// zookeeper的config节点路径
private String configPath;
private ZkClient zkClient;
private ServerConfig config;
// 用于监听servers节点的子节点列表的变化
private IZkChildListener childListener;
// 用于监听command节点数据内容的变化
private IZkDataListener dataListener;
// 工作服务器的列表
private List<String> workServerList;

public ManageServer(String serversPath, String commandPath,
String configPath, ZkClient zkClient, ServerConfig config) {
this.serversPath = serversPath;
this.commandPath = commandPath;
this.zkClient = zkClient;
this.config = config;
this.configPath = configPath;
this.childListener = new IZkChildListener() {

public void handleChildChange(String parentPath,
List<String> currentChilds) throws Exception {
// TODO Auto-generated method stub
workServerList = currentChilds; // 更新内存中工作服务器列表

System.out.println("work server list changed, new list is ");
execList();

}
};
this.dataListener = new IZkDataListener() {

public void handleDataDeleted(String dataPath) throws Exception {
// TODO Auto-generated method stub
// ignore;
}

public void handleDataChange(String dataPath, Object data)
throws Exception {
// TODO Auto-generated method stub
String cmd = new String((byte[]) data);
System.out.println("cmd:"+cmd);
exeCmd(cmd); // 执行命令

}
};

}

private void initRunning() {
zkClient.subscribeDataChanges(commandPath, dataListener);
zkClient.subscribeChildChanges(serversPath, childListener);
}

/*
* 1: list 2: create 3: modify
*/
private void exeCmd(String cmdType) {
if ("list".equals(cmdType)) {
execList();

} else if ("create".equals(cmdType)) {
execCreate();
} else if ("modify".equals(cmdType)) {
execModify();
} else {
System.out.println("error command!" + cmdType);
}

}

// 列出工作服务器列表
private void execList() {
System.out.println(workServerList.toString());
}

// 创建config节点
private void execCreate() {
if (!zkClient.exists(configPath)) {
try {
zkClient.createPersistent(configPath, JSON.toJSONString(config)
.getBytes());
} catch (ZkNodeExistsException e) {
zkClient.writeData(configPath, JSON.toJSONString(config)
.getBytes()); // config节点已经存在,则写入内容就可以了
} catch (ZkNoNodeException e) {
String parentDir = configPath.substring(0,
configPath.lastIndexOf('/'));
zkClient.createPersistent(parentDir, true);
execCreate();
}
}
}

// 修改config节点内容
private void execModify() {
// 我们随意修改config的一个属性就可以了
config.setDbUser(config.getDbUser() + "_modify");

try {
zkClient.writeData(configPath, JSON.toJSONString(config).getBytes());
} catch (ZkNoNodeException e) {
execCreate(); // 写入时config节点还未存在,则创建它
}
}

// 启动工作服务器
public void start() {
initRunning();
}

// 停止工作服务器
public void stop() {
zkClient.unsubscribeChildChanges(serversPath, childListener);
zkClient.unsubscribeDataChanges(commandPath, dataListener);
}

}
/**
* 调度类
*/
public class SubscribeZkClient {

private static final int CLIENT_QTY = 5; // Work Server数量

private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";

private static final String CONFIG_PATH = "/config";
private static final String COMMAND_PATH = "/command";
private static final String SERVERS_PATH = "/servers";

public static void main(String[] args) throws Exception {

List<ZkClient> clients = new ArrayList<ZkClient>();
List<WorkServer> workServers = new ArrayList<WorkServer>();
ManageServer manageServer = null;

try {

// 创建一个默认的配置
ServerConfig initConfig = new ServerConfig();
initConfig.setDbPwd("123456");
initConfig.setDbUrl("jdbc:mysql://localhost:3306/mydb");
initConfig.setDbUser("root");

// 实例化一个Manage Server
ZkClient clientManage = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
manageServer = new ManageServer(SERVERS_PATH, COMMAND_PATH,CONFIG_PATH,clientManage,initConfig);
manageServer.start(); // 启动Manage Server

// 创建指定个数的工作服务器
for ( int i = 0; i < CLIENT_QTY; ++i ) {
ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new BytesPushThroughSerializer());
clients.add(client);
ServerData serverData = new ServerData();
serverData.setId(i);
serverData.setName("WorkServer#"+i);
serverData.setAddress("192.168.1."+i);

WorkServer workServer = new WorkServer(CONFIG_PATH, SERVERS_PATH, serverData, client, initConfig);
workServers.add(workServer);
workServer.start(); // 启动工作服务器

}

System.out.println("敲回车键退出!\n");
new BufferedReader(new InputStreamReader(System.in)).readLine();

} finally {
System.out.println("Shutting down...");

for ( WorkServer workServer : workServers ) {
try {
workServer.stop();
} catch (Exception e) {
e.printStackTrace();
}
}

for ( ZkClient client : clients ) {
try {
client.close();
} catch (Exception e) {
e.printStackTrace();
}

}
}
}

}

我们用zookeeper的命令行客户端向Manage Server下达指令。
执行zkCli命令:

1
create /command list

java控制台输出:

1
2
cmd:list
[192.168.1.1, 192.168.1.0, 192.168.1.3, 192.168.1.2, 192.168.1.4]

执行zkCli命令:

1
set /command create

java控制台输出:

1
2
3
4
5
6
cmd:create
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456, dbUser=root]

执行zkCli命令:

1
set /command modify

java控制台输出:

1
2
3
4
5
6
cmd:modify
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]
new Work server config is: ServerConfig [dbUrl=jdbc:mysql://localhost:3306/mydb, dbPwd=123456_modify, dbUser=root]