ActiveMQ 死信队列

参考官网
当消息已过期或者无法再投递的时候,就会移动到死信队列(Dead Letter Queue)。
在以下情况,消息会重新发送到客户端:

  • 使用事务的时候,调用了session的rollback方法
  • 使用事务的时候,在会话关闭后再commit
  • 使用CLIENT_ACKNOWLEDGE消息确认机制的时候,调用session的recover方法
  • 客户端连接超时

RedeliveryPolicy

消息重发的实体类的几个属性如下:

属性 默认值 说明
collisionAvoidanceFactor 0.15 设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。
maximumRedeliveries 6 最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。
maximumRedeliveryDelay -1 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
initialRedeliveryDelay 1000L 初始重发延迟时间
redeliveryDelay 1000L 重发延迟时间,当initialRedeliveryDelay=0时生效(v5.4)
useCollisionAvoidance false 启用防止冲突功能,因为消息接收时是可以使用多线程并发处理的,应该是为了重发的安全性,避开所有并发线程都在同一个时间点进行消息接收处理。所有线程在同一个时间点处理时会发生什么问题呢?应该没有问题,只是为了平衡broker处理性能,不会有时很忙,有时很空闲。
useExponentialBackOff false 启用指数倍数递增的方式增加延迟时间。
backOffMultiplier 5 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。

重发的设置有两种方式,一个是通过代码来设置的:

1
2
3
4
5
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
policy.setInitialRedeliveryDelay(500);
policy.setBackOffMultiplier(2);
policy.setUseExponentialBackOff(true);
policy.setMaximumRedeliveries(2);

一个是通过activemq.xml配置文件来设置的:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
<broker>

<destinationPolicy>
<policyMap>
<policyEntries>
<!-- Set the following policy on all queues using the '>' wildcard -->
<policyEntry queue=">">
<deadLetterStrategy>
<!--
Use the prefix 'DLQ.' for the destination name, and make
the DLQ a queue rather than a topic
-->
<individualDeadLetterStrategy queuePrefix="DLQ." useQueueForQueueMessages="true"/>
</deadLetterStrategy>
</policyEntry>
</policyEntries>
</policyMap>
</destinationPolicy>

</broker>

消息重发示例

生产者,跟之前的ptp的代码一样

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
public static void main(String[] args) {
ConnectionFactory connectionFactory;
ActiveMQConnection connection = null;
Session session = null;
Destination destination;
MessageProducer producer = null;
Message message;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = (ActiveMQConnection)connectionFactory.createConnection();

// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.policy");
// 创建一个生产者
producer = session.createProducer(destination);
// 创建一个消息
message = session.createTextMessage("this is test.policy");
// 发送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

消费者,我们这边通过policy.setMaximumRedeliveries(2)设置重发次数为2,自动确认模式为AUTO_ACKNOWLEDGE,在监听中抛异常,使消息不能正确的消费。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
public static void main(String[] args) {
ConnectionFactory connectionFactory;
ActiveMQConnection connection = null;
Session session = null;
Destination destination;
MessageConsumer consumer = null;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = (ActiveMQConnection)connectionFactory.createConnection();
RedeliveryPolicy policy = connection.getRedeliveryPolicy();
// 最大重试次数
policy.setMaximumRedeliveries(2);
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.policy");
// 创建一个消费者
consumer = session.createConsumer(destination);
// 接收一个消息
consumer.setMessageListener(new MessageListener() {
@Override
public void onMessage(Message message) {
try {
System.out.println("consumer receive:" + ((TextMessage) message).getText());
} catch (JMSException e) {
e.printStackTrace();
}
throw new RuntimeException();
}
});
} catch (JMSException e) {
e.printStackTrace();
} finally {

}
}

运行生产者代码,数据库数据如下:
image.png
再运行消费者代码,可以看出,在打印出三次test.policy后,数据库数据如下,队列变成了ActiveMQ.DLQ。之所以打印三次,是因为原先一次,加上重试两次,就打印三次。
image.png
admin页面如下:
image.png
如果我们要处理死信队列的内容,也很简单,就相当于消费者消费队列名称是ActiveMQ.DLQ。