消息预取,避免了rabbitmq一直往消费端发送数据,导致消费端出现无限制的缓冲区问题。消息预取定义了信道上或者消费者允许的最大未确认的消息数量。一旦未确认数达到了设置的值,RabbitMQ将停止传递更多消息,除非至少有一条未完成的消息得到确认。
使用消息预取的时候,会调用chanel的basicQos方法,prefetchCount是未确认的消息数,global默认值为false,是限制消费者未确认的消息数,设置为true的时候,是限制信道上的未确认消息数。
1
| void basicQos(int prefetchCount, boolean global) throws IOException;
|
消费者限制
global设置为false,当每个消费者有2个未确认的消息时,不能再发消息给消费者。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("qos", false, false, false, null); DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback1 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); };
channel.basicQos(2, false); channel.basicConsume("qos", false, deliverCallback1, consumerTag -> { });
}
|
运行后,往队列发送了4条消息,可以看到,未发送(ready)有2个,未确认2个。
控制台确实只收到了两个消息。
如果把注释放开,同时有两个消费者,可以看到,未发送(ready)有0个,未确认4个。
控制台结果如下,两个都消费了两个。
信道限制
把上面两个消费者的global改为true,改为信道限制的方式。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("qos", false, false, false, null); DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback1 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); }; DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback2 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); }; channel.basicQos(2, true); channel.basicConsume("qos", false, deliverCallback1, consumerTag -> { }); channel.basicConsume("qos", false, deliverCallback2, consumerTag -> { }); }
|
可以看到,未发送(ready)有2个,未确认2个。
每个消费者都只消费了一个。因为此时,信道上未确认的消息数是2。
混合模式
即设置了信道限制又设置了消费者限制,那结果是怎么样的呢?
先设置消费端只能有2个未确认的消息,通道只能有3个未确认的消息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| public static void main(String[] args) throws IOException, TimeoutException { ConnectionFactory factory = new ConnectionFactory(); Connection connection = factory.newConnection(); Channel channel = connection.createChannel(); channel.queueDeclare("qos", false, false, false, null); DeliverCallback deliverCallback1 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback1 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); }; DeliverCallback deliverCallback2 = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println("deliverCallback2 Received '" + message + "'" + delivery.getEnvelope().getDeliveryTag()); }; channel.basicQos(2, false); channel.basicQos(3, true); channel.basicConsume("qos", false, deliverCallback1, consumerTag -> { }); channel.basicConsume("qos", false, deliverCallback2, consumerTag -> { }); }
|
运行后控制台如下,打印了三个消息,说明整个信道就只能有三个未确认的消息,第一个消费者有两个未确认的消息后不再接收,由第二个消费者接收。
web控制台信息,确实只有3个未确认的消息,还有1个待发送。
注意,如果换了顺序呢?
把下面的代码
1 2
| channel.basicQos(2, false); channel.basicQos(3, true);
|
换成,先控制信道的未确认的消息是3个,再控制消费者未确认的消息是2个
1 2
| channel.basicQos(3, true); channel.basicQos(2, false);
|
运行后,控制台如下,每个消费者都2个未确认的消息。此时信道的限制不生效了。