RabbitMQ 消息拒绝

当消费者收到消息后,需要对消息进行确认,队列才会把这个消息删除。如果消息处理中发生了异常需要拒绝消息怎么办呢?在这个章节中,我们看到了没确认消息时,如果断开了和rabbitmq的连接,消息会回到待发送那边,等待其他消费者,虽然我们可以通过关闭连接来拒绝消息,但是频繁的频繁的建立连接、关闭连接,会增加rabbitmq的负担。rabbitmq提供了另外一种优雅的方式来拒绝消息,方法如下:

1
void basicReject(long deliveryTag, boolean requeue) throws IOException

第一个参数deliveryTag,消息确认中提过,传递标识。第二个参数requeue,为true的话,消息会重新发给下一个消费者,为false的话,就不发给消费者,相当于说,消息我确认了。

重新投递

重新投递,就是requeue为true的情况。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
public static void main(String[] args) throws IOException, TimeoutException {
// 声明一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 创建一个与rabbitmq服务器的连接
Connection connection = factory.newConnection();
// 创建一个Channel
Channel channel = connection.createChannel();
// 通过Channel定义队列
channel.queueDeclare("reject", false, false, false, null);
// 异步回调处理
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("reject message '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
channel.basicReject(delivery.getEnvelope().getDeliveryTag(),true);
};
// 接收消息
channel.basicConsume("reject", false, deliverCallback1, consumerTag -> {
});
}

我们通过web控制台发送一条消息,在控制台打印如下,由于当前消费者把消息拒绝了,所以rabbitmq重新投递,又发给这个消费者,消费者又拒绝,所以一直打印,相对于死循环了。从deliveryTag可以看出,这条消息每次重新投递,就会递增。
image.png
消费者端口连接,web控制有一条消息
image.png

不重新投递

不重新投递,就是requeue为false的情况。把上面的代码requeue设置为false再运行一次。
从控制台可以看出,就打印了一次。
image.png
而web控制台,消息数是0
image.png

批量拒绝

rabbitmq提供了批量确认,也提供了批量拒绝。方法如下,deliveryTag是传递标识,multiple是否批量确认,requeue是否重新投递。

1
2
void basicNack(long deliveryTag, boolean multiple, boolean requeue)
throws IOException;

当multiple为true的时候,开启批量确认,我们看看下面的例子。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
public static void main(String[] args) throws IOException, TimeoutException {
// 声明一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 创建一个与rabbitmq服务器的连接
Connection connection = factory.newConnection();
// 创建一个Channel
Channel channel = connection.createChannel();
// 通过Channel定义队列
channel.queueDeclare("reject", false, false, false, null);
// 异步回调处理
DeliverCallback deliverCallback1 = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println("reject message '" + message + "'" + delivery.getEnvelope().getDeliveryTag());
if (delivery.getEnvelope().getDeliveryTag() % 3 == 0) {
channel.basicNack(delivery.getEnvelope().getDeliveryTag(), true, false);
}
};
// 接收消息
channel.basicConsume("reject", false, deliverCallback1, consumerTag -> {
});
}

先发送两条消息
image.png
web控制台如下:
image.png
再发送一条消息
image.png
web控制台
image.png
虽然我们只拒绝了第三条,还是把所有的都拒绝了。
如果把multiple设置为false,可以看到,仅仅拒绝了第三条消息,这边不做演示。