考虑一个需要7*24小时对外提供服务的系统:它不能有单点故障,于是我们使用集群,采用Master+Slave模式。集群中有一台主机和多台备机,由主机对外提供服务,备机监听主机状态;一旦主机宕机,备机必须迅速接管主机,继续对外提供服务。在这个过程中,从备机中选出一台作为新主机的过程,就是Master选举。
架构图
左边是ZooKeeper集群,右边是3台工作服务器。工作服务器启动时,会去ZooKeeper的Servers节点下创建临时节点,并把基本信息写入临时节点,这个过程叫做服务注册。系统中的其他服务可以通过获取Servers节点的子节点列表,来了解当前系统中哪些服务器可用,该过程叫做服务发现。接着这些服务器会尝试创建Master临时节点,谁创建成功谁就是Master,其他的两台就作为Slave。所有的Work Server必须关注Master节点的删除事件,通过监听该事件来了解Master服务器是否宕机(创建临时节点的服务器一旦宕机,它所创建的临时节点即会自动删除)。一旦Master服务器宕机,必须开始新一轮的Master选举。
流程图
核心类图
WorkServer对应架构图的WorkServer,是主工作类;
RunningData用来描述WorkServer的基本信息;
LeaderSelectorZkClient作为调度器来启动和停止WorkServer;
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230
|
/**
 * Serializable value holder carrying a numeric id ({@code cid}) and a
 * display name for one server.
 *
 * <p>Both properties are plain mutable bean properties and start out as
 * {@code null} until a setter is called.
 */
public class RunningData implements Serializable {

    private static final long serialVersionUID = 4260577459043203630L;

    /** Numeric identifier; {@code null} until assigned. */
    private Long cid;

    /** Human-readable name; {@code null} until assigned. */
    private String name;

    /**
     * @return the identifier, or {@code null} if none has been set
     */
    public Long getCid() {
        return this.cid;
    }

    /**
     * @param cid the identifier to store
     */
    public void setCid(Long cid) {
        this.cid = cid;
    }

    /**
     * @return the name, or {@code null} if none has been set
     */
    public String getName() {
        return this.name;
    }

    /**
     * @param name the name to store
     */
    public void setName(String name) {
        this.name = name;
    }
}
public class WorkServer {
private volatile boolean running = false;
private ZkClient zkClient; private static final String MASTER_PATH = "/master"; private IZkDataListener dataListener; private RunningData serverData; private RunningData masterData; private ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); private int delayTime = 5;
public WorkServer(RunningData rd) { this.serverData = rd; this.dataListener = new IZkDataListener() {
public void handleDataDeleted(String dataPath) throws Exception {
if (masterData != null && masterData.getName().equals(serverData.getName())){ takeMaster(); } else { delayExector.schedule(new Runnable(){ public void run(){ takeMaster(); } }, delayTime, TimeUnit.SECONDS); }
}
public void handleDataChange(String dataPath, Object data) throws Exception {
} }; }
public ZkClient getZkClient() { return zkClient; }
public void setZkClient(ZkClient zkClient) { this.zkClient = zkClient; }
public void start() throws Exception { if (running) { throw new Exception("server has startup..."); } running = true; zkClient.subscribeDataChanges(MASTER_PATH, dataListener); takeMaster();
}
public void stop() throws Exception { if (!running) { throw new Exception("server has stoped"); } running = false; delayExector.shutdown(); zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener); releaseMaster();
}
private void takeMaster() { if (!running) return;
try { zkClient.create(MASTER_PATH, serverData, CreateMode.EPHEMERAL); masterData = serverData; System.out.println(serverData.getName()+" is master");
delayExector.schedule(new Runnable() { public void run() { if (checkMaster()){ releaseMaster(); } } }, 5, TimeUnit.SECONDS); } catch (ZkNodeExistsException e) { RunningData runningData = zkClient.readData(MASTER_PATH, true); if (runningData == null) { takeMaster(); } else { masterData = runningData; } } catch (Exception e) { }
}
private void releaseMaster() { if (checkMaster()) { zkClient.delete(MASTER_PATH); } }
private boolean checkMaster() { try { RunningData eventData = zkClient.readData(MASTER_PATH); masterData = eventData; if (masterData.getName().equals(serverData.getName())) { return true; } return false; } catch (ZkNoNodeException e) { return false; } catch (ZkInterruptedException e) { return checkMaster(); } catch (ZkException e) { return false; } }
}
/**
 * Driver that starts {@code CLIENT_QTY} {@link WorkServer} instances, each
 * with its own {@link ZkClient}, waits for a line on standard input and then
 * stops every server and closes every client.
 */
public class LeaderSelectorZkClient {

    /** Number of work servers (and ZooKeeper clients) to start. */
    private static final int CLIENT_QTY = 10;

    /** ZooKeeper connect string used for every client. */
    private static final String ZOOKEEPER_SERVER = "192.168.1.105:2181";

    /**
     * Starts the servers, blocks until a line is read from standard input,
     * then shuts everything down.
     *
     * @param args unused
     * @throws Exception if creating a client, starting a server or reading
     *                   standard input fails
     */
    public static void main(String[] args) throws Exception {
        List<ZkClient> clients = new ArrayList<ZkClient>();
        List<WorkServer> workServers = new ArrayList<WorkServer>();

        try {
            int index = 0;
            while (index < CLIENT_QTY) {
                // Register the client first so it is closed even if a later step fails.
                ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());
                clients.add(client);

                RunningData runningData = new RunningData();
                runningData.setCid(Long.valueOf(index));
                runningData.setName("Client #" + index);

                WorkServer workServer = new WorkServer(runningData);
                workServer.setZkClient(client);
                // Register before start() so a failed start is still stopped below.
                workServers.add(workServer);
                workServer.start();

                ++index;
            }

            System.out.println("敲回车键退出!\n");
            BufferedReader stdin = new BufferedReader(new InputStreamReader(System.in));
            stdin.readLine();
        } finally {
            shutdownAll(workServers, clients);
        }
    }

    /**
     * Stops every server, then closes every client. A failure on one element
     * is printed and does not prevent the remaining ones from being handled.
     */
    private static void shutdownAll(List<WorkServer> workServers, List<ZkClient> clients) {
        System.out.println("Shutting down...");

        for (WorkServer server : workServers) {
            try {
                server.stop();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }

        for (ZkClient zk : clients) {
            try {
                zk.close();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}
|