在传统的单进程编程中,我们使用队列来存储一些数据结构,用来在多线程之间共享或传递数据。
分布式环境下,我们同样需要一个类似单进程队列的组件,用来实现跨进程、跨主机、跨网络的数据共享和数据传递,这就是我们的分布式队列。
zookeeper可以通过顺序节点实现分布式队列。
架构图
图中左侧代表zookeeper集群,右侧代表消费者和生产者。
生产者通过在queue节点下创建顺序节点来存放数据,消费者通过读取顺序节点来消费数据。
流程图
offer核心算法流程
poll核心算法流程
代码实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149
|
public class DistributedSimpleQueue<T> {
protected final ZkClient zkClient; protected final String root; protected static final String Node_NAME = "n_";
public DistributedSimpleQueue(ZkClient zkClient, String root) { this.zkClient = zkClient; this.root = root; }
public int size() { return zkClient.getChildren(root).size(); }
public boolean isEmpty() { return zkClient.getChildren(root).size() == 0; }
public boolean offer(T element) throws Exception{
String nodeFullPath = root .concat( "/" ).concat( Node_NAME ); try { zkClient.createPersistentSequential(nodeFullPath , element); }catch (ZkNoNodeException e) { zkClient.createPersistent(root); offer(element); } catch (Exception e) { throw ExceptionUtil.convertToRuntimeException(e); } return true; }
public T poll() throws Exception { try {
List<String> list = zkClient.getChildren(root); if (list.size() == 0) { return null; }
Collections.sort(list, new Comparator<String>() { public int compare(String lhs, String rhs) { return getNodeNumber(lhs, Node_NAME).compareTo(getNodeNumber(rhs, Node_NAME)); } });
for ( String nodeName : list ){
String nodeFullPath = root.concat("/").concat(nodeName); try { T node = (T) zkClient.readData(nodeFullPath); zkClient.delete(nodeFullPath); return node; } catch (ZkNoNodeException e) { } } return null; } catch (Exception e) { throw ExceptionUtil.convertToRuntimeException(e); }
}
private String getNodeNumber(String str, String nodeName) { int index = str.lastIndexOf(nodeName); if (index >= 0) { index += Node_NAME.length(); return index <= str.length() ? str.substring(index) : ""; } return str;
}
} public class User implements Serializable { String name; String id; public String getName() { return name; } public void setName(String name) { this.name = name; } public String getId() { return id; } public void setId(String id) { this.id = id; }
} public class TestDistributedSimpleQueue {
public static void main(String[] args) { ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer()); DistributedSimpleQueue<User> queue = new DistributedSimpleQueue<User>(zkClient,"/Queue"); User user1 = new User(); user1.setId("1"); user1.setName("xiao wang"); User user2 = new User(); user2.setId("2"); user2.setName("xiao wang"); try { queue.offer(user1); queue.offer(user2); User u1 = (User) queue.poll(); User u2 = (User) queue.poll(); if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){ System.out.println("Success!"); } } catch (Exception e) { e.printStackTrace(); } } }
|
上面实现了一个简单分布式队列,在此基础上,我们再扩展一个阻塞分布式队列。代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97
|
public class DistributedBlockingQueue<T> extends DistributedSimpleQueue<T>{ public DistributedBlockingQueue(ZkClient zkClient, String root) { super(zkClient, root);
}
@Override public T poll() throws Exception {
while (true){ final CountDownLatch latch = new CountDownLatch(1); final IZkChildListener childListener = new IZkChildListener() { public void handleChildChange(String parentPath, List<String> currentChilds) throws Exception { latch.countDown(); } }; zkClient.subscribeChildChanges(root, childListener); try{ T node = super.poll(); if ( node != null ){ return node; } else { latch.await(); } } finally { zkClient.unsubscribeChildChanges(root, childListener); } } }
} public class TestDistributedBlockingQueue {
public static void main(String[] args) { ScheduledExecutorService delayExector = Executors.newScheduledThreadPool(1); int delayTime = 5; ZkClient zkClient = new ZkClient("192.168.1.105:2181", 5000, 5000, new SerializableSerializer()); final DistributedBlockingQueue<User> queue = new DistributedBlockingQueue<User>(zkClient,"/Queue"); final User user1 = new User(); user1.setId("1"); user1.setName("xiao wang"); final User user2 = new User(); user2.setId("2"); user2.setName("xiao wang"); try { delayExector.schedule(new Runnable() { public void run() { try { queue.offer(user1); queue.offer(user2); } catch (Exception e) { e.printStackTrace(); } } }, delayTime , TimeUnit.SECONDS); System.out.println("ready poll!"); User u1 = (User) queue.poll(); User u2 = (User) queue.poll(); if (user1.getId().equals(u1.getId()) && user2.getId().equals(u2.getId())){ System.out.println("Success!"); } } catch (Exception e) { e.printStackTrace(); } finally{ delayExector.shutdown(); try { delayExector.awaitTermination(2, TimeUnit.SECONDS); } catch (InterruptedException e) { } } } }
|