ActiveMQ 通配符

消费者通过消息选择器来获取自己想要的消息,我们也可以通过另外通配符的方式,来获取自己想要的消息,当然两种还是有区别的。
通配符的方式有三种:

  • “.”:用于分隔路径名字
  • “*”:用于匹配路径的任何名字
  • “>”:用于匹配末尾的名称

示例

消费者1消费a.b.c的消息,消费者2消费a.*.c的消息,所以中间是什么他不在乎,消费者3消费a.开头的消息,所以a.后面是什么他不在乎。生产者往a.b.c和a.b.c发送数据。

生产者代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建消息
sendMsg1(session);
sendMsg2(session);
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

public static void sendMsg1(Session session) throws JMSException {
Destination destination = session.createTopic("a.b.c");
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage("this is a.b.c"));
producer.close();
}

public static void sendMsg2(Session session) throws JMSException {
Destination destination = session.createTopic("a.bb.c");
MessageProducer producer = session.createProducer(destination);
producer.send(session.createTextMessage("this is a.bb.c"));
producer.close();
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageConsumer consumer = null;
Message message;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createTopic("a.b.c");
// 创建一个消费者
consumer = session.createConsumer(destination);
// 接收一个消息
while (null != (message = consumer.receive())) {
System.out.println("consumer1 receive:" + ((TextMessage) message).getText());
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageConsumer consumer = null;
Message message;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createTopic("a.*.c");
consumer = session.createConsumer(destination);
// 接收一个消息
while (null != (message = consumer.receive())) {
System.out.println("consumer2 receive:" + ((TextMessage) message).getText());
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

消费者3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageConsumer consumer = null;
Message message;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createTopic("a.>");
consumer = session.createConsumer(destination);
// 接收一个消息
while (null != (message = consumer.receive())) {
System.out.println("consumer3 receive:" + ((TextMessage) message).getText());
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

运行结果如下:
消费者1只获取1个消息
image.png
消费者2获取2个消息
image.png
消费者3也获取2个消息
image.png