参考官网 。 当消息已过期或者无法再投递的时候,就会移动到死信队列(Dead Letter Queue)。 在以下情况,消息会重新发送到客户端:
使用事务的时候,调用了session的rollback方法
使用事务的时候,在会话关闭后再commit
使用CLIENT_ACKNOWLEDGE消息确认机制的时候,调用session的recover方法
客户端连接超时
RedeliveryPolicy 消息重发的实体类的几个属性如下:
属性
默认值
说明
collisionAvoidanceFactor
0.15
设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。
maximumRedeliveries
6
最大重传次数,达到最大重连次数后抛出异常。为-1时不限制次数,为0时表示不进行重传。
maximumRedeliveryDelay
-1
最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
initialRedeliveryDelay
1000L
初始重发延迟时间
redeliveryDelay
1000L
重发延迟时间,当initialRedeliveryDelay=0时生效(v5.4)
useCollisionAvoidance
false
启用防止冲突功能,因为消息接收时是可以使用多线程并发处理的,应该是为了重发的安全性,避开所有并发线程都在同一个时间点进行消息接收处理。所有线程在同一个时间点处理时会发生什么问题呢?应该没有问题,只是为了平衡broker处理性能,不会有时很忙,有时很空闲。
useExponentialBackOff
false
启用指数倍数递增的方式增加延迟时间。
backOffMultiplier
5
重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。
重发的设置有两种方式,一个是通过代码来设置的:
1 2 3 4 5 RedeliveryPolicy policy = connection .getRedeliveryPolicy(); policy .setInitialRedeliveryDelay(500 );policy .setBackOffMultiplier(2 );policy .setUseExponentialBackOff(true );policy .setMaximumRedeliveries(2 );
一个是通过activemq.xml配置文件来设置的:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 <broker > <destinationPolicy > <policyMap > <policyEntries > <policyEntry queue =">" > <deadLetterStrategy > <individualDeadLetterStrategy queuePrefix ="DLQ." useQueueForQueueMessages ="true" /> </deadLetterStrategy > </policyEntry > </policyEntries > </policyMap > </destinationPolicy > </broker >
消息重发示例 生产者,跟之前的ptp的代码一样
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 public static void main (String[] args) { ConnectionFactory connectionFactory; ActiveMQConnection connection = null ; Session session = null ; Destination destination; MessageProducer producer = null ; Message message; boolean useTransaction = false ; try { connectionFactory = new ActiveMQConnectionFactory (); connection = (ActiveMQConnection)connectionFactory.createConnection(); connection.start(); session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("test.policy" ); producer = session.createProducer(destination); message = session.createTextMessage("this is test.policy" ); producer.send(message); } catch (JMSException e) { e.printStackTrace(); } finally { try { if (producer != null ) { producer.close(); } if (session != null ) { session.close(); } if (connection != null ) { connection.close(); } } catch (JMSException e) { e.printStackTrace(); } } }
消费者,我们这边通过policy.setMaximumRedeliveries(2)设置重发次数为2,自动确认模式为AUTO_ACKNOWLEDGE,在监听中抛异常,使消息不能正确的消费。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 public static void main (String[] args) { ConnectionFactory connectionFactory; ActiveMQConnection connection = null ; Session session = null ; Destination destination; MessageConsumer consumer = null ; boolean useTransaction = false ; try { connectionFactory = new ActiveMQConnectionFactory (); connection = (ActiveMQConnection)connectionFactory.createConnection(); RedeliveryPolicy policy = connection.getRedeliveryPolicy(); policy.setMaximumRedeliveries(2 ); connection.start(); session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE); destination = session.createQueue("test.policy" ); consumer = session.createConsumer(destination); consumer.setMessageListener(new MessageListener () { @Override public void onMessage (Message message) { try { System.out.println("consumer receive:" + ((TextMessage) message).getText()); } catch (JMSException e) { e.printStackTrace(); } throw new RuntimeException (); } }); } catch (JMSException e) { e.printStackTrace(); } finally { } }
运行生产者代码,数据库数据如下: 再运行消费者代码,可以看出,在打印出三次test.policy后,数据库数据如下,队列变成了ActiveMQ.DLQ。之所以打印三次,是因为原先一次,加上重试两次,就打印三次。 admin页面如下: 如果我们要处理死信队列的内容,也很简单,就相当于消费者消费队列名称是ActiveMQ.DLQ。