当消费者收到消息后,需要对消息进行确认,队列才会把这个消息删除。如果消息处理中发生了异常需要拒绝消息怎么办呢?在这个章节中,我们看到了没确认消息时,如果断开了和rabbitmq的连接,消息会回到待发送那边,等待其他消费者,虽然我们可以通过关闭连接来拒绝消息,但是频繁的频繁的建立连接、关闭连接,会增加rabbitmq的负担。rabbitmq提供了另外一种优雅的方式来拒绝消息,方法如下:
1
| void basicReject(long deliveryTag, boolean requeue) throws IOException
|
第一个参数deliveryTag,消息确认中提过,传递标识。第二个参数requeue,为true的话,消息会重新发给下一个消费者,为false的话,就不发给消费者,相当于说,消息我确认了。
重新投递
重新投递,就是requeue为true的情况。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("reject", false, false, false, null); DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("reject message '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true); }; channel.basicConsume("reject", false, deliverCallback1, consumerTag -> { }); }
|
我们通过web控制台发送一条消息,在控制台打印如下,由于当前消费者把消息拒绝了,所以rabbitmq重新投递,又发给这个消费者,消费者又拒绝,所以一直打印,相对于死循环了。从deliveryTag可以看出,这条消息每次重新投递,就会递增。
消费者端口连接,web控制有一条消息
不重新投递
不重新投递,就是requeue为false的情况。把上面的代码requeue设置为false再运行一次。
从控制台可以看出,就打印了一次。
而web控制台,消息数是0
批量拒绝
rabbitmq提供了批量确认,也提供了批量拒绝。方法如下,deliveryTag是传递标识,multiple是否批量确认,requeue是否重新投递。
1 2
| void basicNack(long deliveryTag, boolean multiple, boolean requeue) throws IOException;
|
当multiple为true的时候,开启批量确认,我们看看下面的例子。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("reject", false, false, false, null); DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("reject message '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); if (delivery.getEnvelope().getDeliveryTag() % 3 == 0) { channel.basicNack(delivery.getEnvelope().getDeliveryTag(), true, false); } }; channel.basicConsume("reject", false, deliverCallback1, consumerTag -> { }); }
|
先发送两条消息
web控制台如下:
再发送一条消息
web控制台
虽然我们只拒绝了第三条,还是把所有的都拒绝了。
如果把multiple设置为false,可以看到,仅仅拒绝了第三条消息,这边不做演示。