ActiveMQ 消息

消息类型及属性

ActiveMQ 支持多种消息类型,包括 TextMessage、StreamMessage、ObjectMessage、MapMessage、BytesMessage、BlobMessage。消息属性支持的类型包括 boolean、byte、double、float、int、long、object、short、string。下面看看在代码中如何发送和接收这些消息。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
/**
 * Demo producer: opens a connection through a no-argument
 * {@code ActiveMQConnectionFactory} and sends, to the queue
 * {@code test.message}, one property-only message followed by one message of
 * each body type (text, stream, object, map, bytes).
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        connection.start();
        // Non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // All demo messages go to the queue "test.message".
        Destination destination = session.createQueue("test.message");
        MessageProducer producer = session.createProducer(destination);
        // Message carrying only a property.
        sendProperty(session, producer);
        // One message per body type.
        sendTextMessage(session, producer);
        sendStreamMessage(session, producer);
        sendObjectMessage(session, producer);
        sendMapMessage(session, producer);
        sendBytesMessage(session, producer);
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Closing the connection also closes its sessions and producers, so a
        // single close here cannot be skipped by a failure while closing a
        // producer or session first.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * Sends a {@code BytesMessage} whose body is the text {@code "hello"}
 * encoded as UTF-8.
 *
 * @param session  session used to create the message
 * @param producer producer used to send it
 * @throws JMSException if creating, populating or sending the message fails
 */
private static void sendBytesMessage(Session session, MessageProducer producer) throws JMSException {
    BytesMessage bytesMessage = session.createBytesMessage();
    // Encode with an explicit charset: the no-argument getBytes() uses the
    // platform default, which may differ between producer and consumer hosts.
    bytesMessage.writeBytes("hello".getBytes(java.nio.charset.StandardCharsets.UTF_8));
    producer.send(bytesMessage);
}

/**
 * Sends a {@code MapMessage} holding a single string entry, key
 * {@code "name"}.
 *
 * @param session  session used to create the message
 * @param producer producer used to send it
 * @throws JMSException if creating, populating or sending the message fails
 */
private static void sendMapMessage(Session session, MessageProducer producer) throws JMSException {
    final MapMessage payload = session.createMapMessage();
    payload.setString("name", "张三");
    producer.send(payload);
}

/**
 * Sends an {@code ObjectMessage} wrapping a {@code Person} instance.
 *
 * @param session  session used to create the message
 * @param producer producer used to send it
 * @throws JMSException if creating or sending the message fails
 */
private static void sendObjectMessage(Session session, MessageProducer producer) throws JMSException {
    // Person is declared outside this snippet; createObjectMessage requires
    // its argument to be Serializable.
    final Person person = new Person("张三", 18);
    final ObjectMessage wrapper = session.createObjectMessage(person);
    producer.send(wrapper);
}

/**
 * Sends a {@code StreamMessage} containing one string value.
 *
 * @param session  session used to create the message
 * @param producer producer used to send it
 * @throws JMSException if creating, populating or sending the message fails
 */
private static void sendStreamMessage(Session session, MessageProducer producer) throws JMSException {
    final StreamMessage stream = session.createStreamMessage();
    stream.writeString("test.stream");
    producer.send(stream);
}

/**
 * Sends a body-less message that carries a single string property
 * {@code "name"}.
 *
 * <p>JMS offers property setters for boolean, byte, double, float, int, long,
 * object, short and string values; only the string variant is shown here.
 *
 * @param session  session used to create the message
 * @param producer producer used to send it
 * @throws JMSException if creating, populating or sending the message fails
 */
public static void sendProperty(Session session, MessageProducer producer) throws JMSException {
    // Plain message: no body, properties only.
    final Message carrier = session.createMessage();
    // Attach the property.
    carrier.setStringProperty("name", "张三");
    // Hand it to the producer.
    producer.send(carrier);
}

/**
 * Sends a {@code TextMessage} with the body {@code "this is test.message"}.
 *
 * @param session  session used to create the message
 * @param producer producer used to send it
 * @throws JMSException if creating or sending the message fails
 */
public static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
    // Create the message with its text already set, then send it.
    final TextMessage greeting = session.createTextMessage("this is test.message");
    producer.send(greeting);
}

消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
/**
 * Demo consumer: registers a {@code MessageListener} on the queue
 * {@code test.message} and prints the {@code "name"} property (when present)
 * and the body of every message delivered to it.
 *
 * <p>Messages are handled asynchronously by the listener, so this method
 * returns right after registering it. On success the connection is
 * intentionally left open so that delivery can continue; it is closed only
 * when setup fails.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        // NOTE(review): trusting all packages lets ObjectMessage deserialize
        // arbitrary classes. Acceptable for a local demo only; restrict the
        // trusted packages when messages can come from untrusted senders.
        connectionFactory.setTrustAllPackages(true);
        connection = connectionFactory.createConnection();
        connection.start();
        // Non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Consume from the queue "test.message".
        Destination destination = session.createQueue("test.message");
        MessageConsumer consumer = session.createConsumer(destination);
        consumer.setMessageListener(new MessageListener() {
            @Override
            public void onMessage(Message message) {
                try {
                    if (message.propertyExists("name")) {
                        System.out.println("property receive:" + message.getStringProperty("name"));
                    }
                    if (message instanceof TextMessage) {
                        System.out.println("TextMessage receive:" + ((TextMessage) message).getText());
                    }
                    if (message instanceof StreamMessage) {
                        System.out.println("StreamMessage receive:" + ((StreamMessage) message).readString());
                    }
                    if (message instanceof ObjectMessage) {
                        ObjectMessage objectMessage = (ObjectMessage) message;
                        Person person = (Person) objectMessage.getObject();
                        System.out.println("ObjectMessage receive:" + person);
                    }
                    if (message instanceof MapMessage) {
                        System.out.println("MapMessage receive:" + ((MapMessage) message).getString("name"));
                    }
                    if (message instanceof BytesMessage) {
                        BytesMessage bytesMessage = (BytesMessage) message;
                        // Size the buffer from the body length: a fixed 1024-byte
                        // buffer truncates longer bodies, and readBytes returns -1
                        // for an empty body, which is not a valid String length.
                        byte[] body = new byte[(int) bytesMessage.getBodyLength()];
                        bytesMessage.readBytes(body);
                        // Decode with an explicit charset instead of the platform default.
                        System.out.println("BytesMessage receive:"
                                + new String(body, java.nio.charset.StandardCharsets.UTF_8));
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        });
    } catch (JMSException e) {
        e.printStackTrace();
        // Setup failed before the listener was registered, so nothing will
        // ever be delivered; release the connection instead of leaking it.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException closeFailure) {
                closeFailure.printStackTrace();
            }
        }
    }
}

客户端收到消息结果如下:

1
2
3
4
5
6
property receive:张三
TextMessage receive:this is test.message
StreamMessage receive:test.stream
ObjectMessage receive:Person{name='张三', age=18}
MapMessage receive:张三
BytesMessage receive:hello

消息选择器

消息选择器(Message Selector)可以根据消息头和消息属性对消息进行过滤,使消费者只接收自己感兴趣的消息。我们看看下面的例子:

生产者发送张三18岁、李四20岁、王五22岁三条信息。消费者1接收名字是张三的信息,消费者2接收年龄大于18岁的信息,消费者3接收名字是李四或王五、且年龄大于20岁的信息。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
/**
 * Selector demo producer: publishes three person messages (see
 * {@code sendPerson1}, {@code sendPerson2}, {@code sendPerson3}) to the topic
 * {@code test.selector}.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        connection.start();
        // Non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Publish to the topic "test.selector".
        // NOTE: subscribers created with a plain createConsumer are non-durable,
        // so they presumably need to be running before this producer is started.
        Destination destination = session.createTopic("test.selector");
        MessageProducer producer = session.createProducer(destination);
        // One message per person, each tagged with "name" and "age" properties.
        sendPerson1(session, producer);
        sendPerson2(session, producer);
        sendPerson3(session, producer);
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Closing the connection also closes its sessions and producers, so a
        // single close here cannot be skipped by a failure while closing a
        // producer or session first.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

/**
 * Sends the text message for the first person, tagged with the properties
 * {@code name} (string) and {@code age} (int, 18) that selectors can filter on.
 *
 * @param session  session used to create the message
 * @param producer producer used to send it
 * @throws JMSException if creating, populating or sending the message fails
 */
public static void sendPerson1(Session session, MessageProducer producer) throws JMSException {
    final TextMessage personMessage = session.createTextMessage("张三的个人信息");
    personMessage.setIntProperty("age", 18);
    personMessage.setStringProperty("name", "张三");
    producer.send(personMessage);
}

/**
 * Sends the text message for the second person, tagged with the properties
 * {@code name} (string) and {@code age} (int, 20) that selectors can filter on.
 *
 * @param session  session used to create the message
 * @param producer producer used to send it
 * @throws JMSException if creating, populating or sending the message fails
 */
public static void sendPerson2(Session session, MessageProducer producer) throws JMSException {
    final TextMessage personMessage = session.createTextMessage("李四的个人信息");
    personMessage.setIntProperty("age", 20);
    personMessage.setStringProperty("name", "李四");
    producer.send(personMessage);
}

/**
 * Sends the text message for the third person, tagged with the properties
 * {@code name} (string) and {@code age} (int, 22) that selectors can filter on.
 *
 * @param session  session used to create the message
 * @param producer producer used to send it
 * @throws JMSException if creating, populating or sending the message fails
 */
public static void sendPerson3(Session session, MessageProducer producer) throws JMSException {
    final TextMessage personMessage = session.createTextMessage("王五的个人信息");
    personMessage.setIntProperty("age", 22);
    personMessage.setStringProperty("name", "王五");
    producer.send(personMessage);
}

消费者1

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Selector demo, consumer 1: subscribes to the topic {@code test.selector}
 * with the selector {@code name='张三'} and prints the text of every message
 * it receives.
 *
 * <p>The receive loop blocks waiting for messages, so this method does not
 * return during normal operation.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        connection.start();
        // Non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Subscribe to the topic "test.selector".
        Destination destination = session.createTopic("test.selector");
        // Only messages whose "name" property equals 张三 are delivered.
        String selector = "name='张三'";
        MessageConsumer consumer = session.createConsumer(destination, selector);
        Message message;
        // receive() blocks until a message arrives; null means the consumer was closed.
        while ((message = consumer.receive()) != null) {
            if (message instanceof TextMessage) {
                System.out.println("consumer1 receive:" + ((TextMessage) message).getText());
            } else {
                // Skip unexpected message types rather than failing on the cast.
                System.out.println("consumer1 ignored non-text message:" + message);
            }
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Closing the connection also closes its sessions and consumers, so a
        // single close here cannot be skipped by a failure while closing a
        // consumer or session first.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者2

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Selector demo, consumer 2: subscribes to the topic {@code test.selector}
 * with the selector {@code age>18} and prints the text of every message it
 * receives.
 *
 * <p>The receive loop blocks waiting for messages, so this method does not
 * return during normal operation.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        connection.start();
        // Non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Subscribe to the topic "test.selector".
        Destination destination = session.createTopic("test.selector");
        // Only messages whose "age" property is greater than 18 are delivered.
        String selector = "age>18";
        MessageConsumer consumer = session.createConsumer(destination, selector);
        Message message;
        // receive() blocks until a message arrives; null means the consumer was closed.
        while ((message = consumer.receive()) != null) {
            if (message instanceof TextMessage) {
                System.out.println("consumer2 receive:" + ((TextMessage) message).getText());
            } else {
                // Skip unexpected message types rather than failing on the cast.
                System.out.println("consumer2 ignored non-text message:" + message);
            }
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Closing the connection also closes its sessions and consumers, so a
        // single close here cannot be skipped by a failure while closing a
        // consumer or session first.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

消费者3

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
/**
 * Selector demo, consumer 3: subscribes to the topic {@code test.selector}
 * with the selector {@code name in ('李四','王五') and age>20} and prints the
 * text of every message it receives.
 *
 * <p>The receive loop blocks waiting for messages, so this method does not
 * return during normal operation.
 *
 * @param args command-line arguments (unused)
 */
public static void main(String[] args) {
    Connection connection = null;
    try {
        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory();
        connection = connectionFactory.createConnection();
        connection.start();
        // Non-transacted session with automatic acknowledgement.
        Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
        // Subscribe to the topic "test.selector".
        Destination destination = session.createTopic("test.selector");
        // Both conditions must hold: name is 李四 or 王五, and age is above 20.
        String selector = "name in ('李四','王五') and age>20";
        MessageConsumer consumer = session.createConsumer(destination, selector);
        Message message;
        // receive() blocks until a message arrives; null means the consumer was closed.
        while ((message = consumer.receive()) != null) {
            if (message instanceof TextMessage) {
                System.out.println("consumer3 receive:" + ((TextMessage) message).getText());
            } else {
                // Skip unexpected message types rather than failing on the cast.
                System.out.println("consumer3 ignored non-text message:" + message);
            }
        }
    } catch (JMSException e) {
        e.printStackTrace();
    } finally {
        // Closing the connection also closes its sessions and consumers, so a
        // single close here cannot be skipped by a failure while closing a
        // consumer or session first.
        if (connection != null) {
            try {
                connection.close();
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }
}

选择器的语法与 SQL 的 WHERE 条件表达式类似(基于 SQL-92 条件表达式语法的子集)。