官方文章参考: how-should-i-implement-request-response-with-jms
在生产过程中,我们有可能需要消费者消费完后,把消息发给对应的生产者。
这种请求-响应的最佳方法是在启动时为每个生产者创建一个临时队列和使用者,将每个消息上的JMSReplyTo属性设置为临时队列,然后在每个消息上使用correlationID将请求消息与响应消息关联起来。这避免了为每个请求创建和关闭使用者的开销(这是昂贵的)。这也意味着你可以共享同一个生产者。
消费者消费完消息后,通过JMSReplyTo属性和correlationID将消息返回给生产者。
流程图如下:
生产者
生产者重要的代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| Queue temporaryQueue = session.createTemporaryQueue();
consumer = session.createConsumer(temporaryQueue); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println("response1 receive:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }); message.setJMSCorrelationID(UUID.randomUUID().toString()); message.setJMSReplyTo(temporaryQueue);
|
整体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49
| public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageProducer producer = null; MessageConsumer consumer = null; Message message; boolean useTransaction = false; try { connectionFactory = new ActiveMQConnectionFactory(); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("test.request.reply"); producer = session.createProducer(destination); message = session.createTextMessage("this is test.request.reply1");
Queue temporaryQueue = session.createTemporaryQueue(); consumer = session.createConsumer(temporaryQueue); consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println("response1 receive:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } } }); message.setJMSCorrelationID(UUID.randomUUID().toString()); message.setJMSReplyTo(temporaryQueue); producer.send(message); } catch (JMSException e) { e.printStackTrace(); } finally {
} }
|
消费者
消费者主要的代码如下:
1 2 3 4 5 6 7 8 9 10 11
| public void onMessage(Message message) { try { System.out.println("consumer receive:" + ((TextMessage) message).getText()); TextMessage response = finalSession.createTextMessage(); response.setJMSCorrelationID(message.getJMSCorrelationID()); response.setText(((TextMessage) message).getText()); finalProducer.send(message.getJMSReplyTo(), response); } catch (JMSException e) { e.printStackTrace(); } }
|
整体代码如下:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| public static void main(String[] args) { ConnectionFactory connectionFactory; Connection connection = null; Session session = null; Destination destination; MessageConsumer consumer = null; Message message; boolean useTransaction = false; MessageProducer producer = null; try { connectionFactory = new ActiveMQConnectionFactory(); connection = connectionFactory.createConnection(); connection.start(); session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("test.request.reply"); consumer = session.createConsumer(destination); producer = session.createProducer(null); Session finalSession = session; MessageProducer finalProducer = producer; consumer.setMessageListener(new MessageListener() { @Override public void onMessage(Message message) { try { System.out.println("consumer receive:" + ((TextMessage) message).getText()); TextMessage response = finalSession.createTextMessage(); response.setJMSCorrelationID(message.getJMSCorrelationID()); response.setText(((TextMessage) message).getText()); finalProducer.send(message.getJMSReplyTo(), response); } catch (JMSException e) { e.printStackTrace(); } } }); } catch (JMSException e) { e.printStackTrace(); } finally {
} }
|