消费者通过消息选择器来获取自己想要的消息,我们也可以通过另外通配符的方式,来获取自己想要的消息,当然两种还是有区别的。
通配符的方式有三种:
- “.”:用于分隔路径名字
- “*”:用于匹配路径的任何名字
- “>”:用于匹配末尾的名称
示例
消费者1消费a.b.c的消息,消费者2消费a.*.c的消息,所以中间是什么他不在乎,消费者3消费a.开头的消息,所以a.后面是什么他不在乎。生产者往a.b.c和a.b.c发送数据。
生产者代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; boolean useTransaction = false; try { connectionFactory = new ActiveMQConnectionFactory(); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); sendMsg1(session); sendMsg2(session); } catch (JMSException e) { e.printStackTrace(); } finally { try { if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
public static void sendMsg1(Session session) throws JMSException { Destination destination = session.createTopic("a.b.c"); MessageProducer producer = session.createProducer(destination); producer.send(session.createTextMessage("this is a.b.c")); producer.close(); }
public static void sendMsg2(Session session) throws JMSException { Destination destination = session.createTopic("a.bb.c"); MessageProducer producer = session.createProducer(destination); producer.send(session.createTextMessage("this is a.bb.c")); producer.close(); }
|
消费者1
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
| public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; Message message; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createTopic("a.b.c"); // 创建一个消费者 consumer = session.createConsumer(destination); // 接收一个消息 while (null != (message = consumer.receive())) { System.out.println("consumer1 receive:" + ((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
|
消费者2
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; Message message; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createTopic("a.*.c"); consumer = session.createConsumer(destination); // 接收一个消息 while (null != (message = consumer.receive())) { System.out.println("consumer2 receive:" + ((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
|
消费者3
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
| public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; Message message; boolean useTransaction = false; try { // 创建一个ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(); // 创建一个Connection connection = connectionFactory.createConnection(); // 启动消息传递的连接 connection.start(); // 创建一个session session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); // 创建一个destination,把消息发送到test.queue destination = session.createTopic("a.>"); consumer = session.createConsumer(destination); // 接收一个消息 while (null != (message = consumer.receive())) { System.out.println("consumer3 receive:" + ((TextMessage) message).getText()); } } catch (JMSException e) { e.printStackTrace(); } finally { try { if (consumer != null) { consumer.close(); } if (session != null) { session.close(); } if (connection != null) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
|
运行结果如下:
消费者1只获取1个消息
消费者2获取2个消息
消费者3也获取2个消息