ActiveMQ request/response

官方文章参考: how-should-i-implement-request-response-with-jms

在生产过程中,我们有可能需要消费者消费完后,把消息发给对应的生产者。
这种请求-响应的最佳方法是在启动时为每个生产者创建一个临时队列和使用者,将每个消息上的JMSReplyTo属性设置为临时队列,然后在每个消息上使用correlationID将请求消息与响应消息关联起来。这避免了为每个请求创建和关闭使用者的开销(这是昂贵的)。这也意味着你可以共享同一个生产者。
消费者消费完消息后,通过JMSReplyTo属性和correlationID将消息返回给生产者。
流程图如下:
image.png

生产者

生产者重要的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
// 创建一个临时的队列
Queue temporaryQueue = session.createTemporaryQueue();
// 创建一个使用者
consumer = session.createConsumer(temporaryQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("response1 receive:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
message.setJMSCorrelationID(UUID.randomUUID().toString());
message.setJMSReplyTo(temporaryQueue);

整体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageProducer producer = null;
MessageConsumer consumer = null;
Message message;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.request.reply");
// 创建一个生产者
producer = session.createProducer(destination);
// 创建一个消息
message = session.createTextMessage("this is test.request.reply1");

// 创建一个临时的队列
Queue temporaryQueue = session.createTemporaryQueue();
// 创建一个使用者
consumer = session.createConsumer(temporaryQueue);
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("response1 receive:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
}
});
message.setJMSCorrelationID(UUID.randomUUID().toString());
message.setJMSReplyTo(temporaryQueue);
// 发送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {

}
}

消费者

消费者主要的代码如下:

1
2
3
4
5
6
7
8
9
10
11
public void onMessage(Message message) {
try {
System.out.println("consumer receive:" + ((TextMessage) message).getText());
TextMessage response = finalSession.createTextMessage();
response.setJMSCorrelationID(message.getJMSCorrelationID());
response.setText(((TextMessage) message).getText());
finalProducer.send(message.getJMSReplyTo(), response);
} catch (JMSException e) {
e.printStackTrace();
}
}

整体代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageConsumer consumer = null;
Message message;
boolean useTransaction = false;
MessageProducer producer = null;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.request.reply");
// 创建一个消费者
consumer = session.createConsumer(destination);
producer = session.createProducer(null);
// 接收一个消息
Session finalSession = session;
MessageProducer finalProducer = producer;
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("consumer receive:" + ((TextMessage) message).getText());
TextMessage response = finalSession.createTextMessage();
response.setJMSCorrelationID(message.getJMSCorrelationID());
response.setText(((TextMessage) message).getText());
finalProducer.send(message.getJMSReplyTo(), response);
} catch (JMSException e) {
e.printStackTrace();
}
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {

}
}