通过设置消息的属性来的。
消息的属性:
属性值 |
描述 |
Delivery mode |
是否持久化,1为不持久化,2为持久化 |
Type |
应用程序特定的消息类型 |
Headers |
用户自定义的其他属性 |
Content type |
内容类型。比如application/json |
Content encoding |
内容编码,比如“gzip” |
Message ID |
消息ID |
Correlation ID |
用于request/response |
Reply To |
携带响应队列名称 |
Expiration |
消息过期时间 |
Timestamp |
消息的产生时间 |
User ID |
用于验证发布消息的用户身份 |
App ID |
应用程序的名称 |
request/response示例
大体步骤如下:
- 客户端发携带两个参数,replyTo和correlationId。replyTo是具有exclusive属性的队列,用于处理消费者返回的数据。correlationId是为每个请求设置一个唯一的值。
- 把请求发送给rpc队列
- 服务端处理完数据,通过replyTo的队列把消息发给客户端。
- 客户端通过correlationId消息,请求中的值相匹配,进行处理。
客户端
客户端发送从0到9的数字给服务端,服务端返回数字的平方,客户端获取到输出:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| public final static String requestQueueName = "rpc_queue";
public static void main(String[] args) throws IOException, TimeoutException, InterruptedException { for (int i = 0; i < 10; i++) { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); final String corrId = UUID.randomUUID().toString(); String replyQueueName = channel.queueDeclare().getQueue(); AMQP.BasicProperties props = new AMQP.BasicProperties .Builder() .correlationId(corrId) .replyTo(replyQueueName) .build(); String message = i + ""; channel.basicPublish("", requestQueueName, props, message.getBytes("UTF-8")); channel.basicConsume(replyQueueName, true, (consumerTag, delivery) -> { if (delivery.getProperties().getCorrelationId().equals(corrId)) { System.out.println("send " + message + " and receive message:" + new String(delivery.getBody(), "UTF-8")); } }, consumerTag -> { }); } TimeUnit.SECONDS.sleep(10); }
|
服务端
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare(RPCClient.requestQueueName, false, false, false, null); DeliverCallback deliverCallback = (consumerTag, delivery) -> { AMQP.BasicProperties replyProps = new AMQP.BasicProperties .Builder() .correlationId(delivery.getProperties().getCorrelationId()) .build(); String message = new String(delivery.getBody(), "UTF-8"); System.out.println("RPCServer Received '" + message); String response = "" + Integer.valueOf(message) * Integer.valueOf(message); channel.basicPublish("", delivery.getProperties().getReplyTo(), replyProps, response.getBytes("UTF-8")); channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false); }; channel.basicConsume(RPCClient.requestQueueName, false, deliverCallback, consumerTag -> { }); }
|
客户端运行结果如下:
服务端运行结果如下: