See the official website for reference.
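In concurrent programming, DelayQueue handles delayed-message scenarios inside a single process. In a distributed system, we can get the same effect with the delayed delivery feature of a message queue.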
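To make the DelayQueue mentioned above concrete, here is a minimal sketch using only the JDK. The names DelayedMessage and DelayQueueDemo are hypothetical and chosen for illustration; the delays of 100 ms and 300 ms are arbitrary.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical message type for illustration; not part of the JDK or ActiveMQ.
final class DelayedMessage implements Delayed {
    final String body;
    private final long readyAtNanos; // point in time at which the message becomes available

    DelayedMessage(String body, long delayMillis) {
        this.body = body;
        this.readyAtNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        // Remaining delay; zero or negative means the element may be taken.
        return unit.convert(readyAtNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        // The queue keeps the element with the shortest remaining delay at its head.
        return Long.compare(getDelay(TimeUnit.NANOSECONDS), other.getDelay(TimeUnit.NANOSECONDS));
    }
}

final class DelayQueueDemo {
    // Enqueues two messages out of order and returns their bodies in the order take() releases them.
    static List<String> drainInDelayOrder() {
        DelayQueue<DelayedMessage> queue = new DelayQueue<>();
        queue.put(new DelayedMessage("second", 300));
        queue.put(new DelayedMessage("first", 100));

        List<String> received = new ArrayList<>();
        try {
            for (int i = 0; i < 2; i++) {
                received.add(queue.take().body); // blocks until the head's delay has expired
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for a delayed message", e);
        }
        return received;
    }

    public static void main(String[] args) {
        System.out.println(drainInDelayOrder());
    }
}
```

The message with the shorter delay is released first regardless of insertion order. This only works inside one JVM, which is why a broker-side scheduler is needed once producers and consumers run in different processes.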
Since ActiveMQ 5.4, delayed and scheduled message delivery can be enabled by setting the schedulerSupport attribute of the broker element to true in the XML configuration file.
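As a rough illustration, the broker element in the configuration file (usually conf/activemq.xml) might look like the fragment below. Only schedulerSupport="true" comes from the text above; the brokerName and dataDirectory values are assumed from a typical default configuration and may differ in your installation.

```xml
<!-- Fragment only: the rest of the broker configuration is omitted. -->
<broker xmlns="http://activemq.apache.org/schema/core"
        brokerName="localhost"
        dataDirectory="${activemq.data}"
        schedulerSupport="true">
    <!-- existing child elements (transportConnectors, persistenceAdapter, and so on) stay unchanged -->
</broker>
```

The broker has to be restarted for the change to take effect.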
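| Property | Type | Description |
| --- | --- | --- |
| AMQ_SCHEDULED_DELAY | long | Delay before the message is delivered, in milliseconds |
| AMQ_SCHEDULED_PERIOD | long | Interval between repeated deliveries, in milliseconds |
| AMQ_SCHEDULED_REPEAT | int | Number of times to repeat the scheduled delivery |
| AMQ_SCHEDULED_CRON | String | Cron expression that defines the delivery schedule |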
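Example

Send a message after a 10-second delay and deliver it 5 times in total (the first delivery plus 4 repeats), with 3 seconds between deliveries.

The key producer code is shown below. The scheduling options are set as properties on the message: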

    long delay = 10 * 1000;
    long period = 3 * 1000;
    int repeat = 4;
    message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
    message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
    message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);

The complete producer code is as follows:

    import java.text.SimpleDateFormat;
    import java.util.Date;

    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageProducer;
    import javax.jms.Session;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.ScheduledMessage;

    // The class name is illustrative; the original listing shows only main().
    public class SchedulerProducer {

        public static void main(String[] args) {
            ConnectionFactory connectionFactory;
            ActiveMQConnection connection = null;
            Session session = null;
            Destination destination;
            MessageProducer producer = null;
            Message message;
            boolean useTransaction = false;
            try {
                // The no-argument factory connects to the default broker URL.
                connectionFactory = new ActiveMQConnectionFactory();
                connection = (ActiveMQConnection) connectionFactory.createConnection();
                connection.start();
                session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("test.scheduler");
                producer = session.createProducer(destination);

                // The message body is the send time, so the consumer can compare it with the receive time.
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                message = session.createTextMessage(df.format(new Date()));

                long delay = 10 * 1000;  // wait 10 seconds before the first delivery
                long period = 3 * 1000;  // 3 seconds between deliveries
                int repeat = 4;          // 4 repeats after the first delivery, 5 deliveries in total
                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_DELAY, delay);
                message.setLongProperty(ScheduledMessage.AMQ_SCHEDULED_PERIOD, period);
                message.setIntProperty(ScheduledMessage.AMQ_SCHEDULED_REPEAT, repeat);
                producer.send(message);
            } catch (JMSException e) {
                e.printStackTrace();
            } finally {
                // Release resources in reverse order of creation.
                try {
                    if (producer != null) {
                        producer.close();
                    }
                    if (session != null) {
                        session.close();
                    }
                    if (connection != null) {
                        connection.close();
                    }
                } catch (JMSException e) {
                    e.printStackTrace();
                }
            }
        }
    }

The consumer code is as follows:
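
    import java.text.SimpleDateFormat;
    import java.util.Date;

    import javax.jms.ConnectionFactory;
    import javax.jms.Destination;
    import javax.jms.JMSException;
    import javax.jms.Message;
    import javax.jms.MessageConsumer;
    import javax.jms.MessageListener;
    import javax.jms.Session;
    import javax.jms.TextMessage;

    import org.apache.activemq.ActiveMQConnection;
    import org.apache.activemq.ActiveMQConnectionFactory;
    import org.apache.activemq.RedeliveryPolicy;

    // The class name is illustrative; the original listing shows only main().
    public class SchedulerConsumer {

        public static void main(String[] args) {
            ConnectionFactory connectionFactory;
            ActiveMQConnection connection = null;
            Session session = null;
            Destination destination;
            MessageConsumer consumer = null;
            boolean useTransaction = false;
            try {
                connectionFactory = new ActiveMQConnectionFactory();
                connection = (ActiveMQConnection) connectionFactory.createConnection();

                // Limit redelivery of failed messages to 2 attempts.
                RedeliveryPolicy policy = connection.getRedeliveryPolicy();
                policy.setMaximumRedeliveries(2);

                connection.start();
                session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
                destination = session.createQueue("test.scheduler");
                consumer = session.createConsumer(destination);
                consumer.setMessageListener(new MessageListener() {
                    @Override
                    public void onMessage(Message message) {
                        try {
                            // Print the receive time next to the send time carried in the message body.
                            SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                            System.out.println("consumer receive:" + df.format(new Date())
                                    + "---" + ((TextMessage) message).getText());
                        } catch (JMSException e) {
                            e.printStackTrace();
                        }
                    }
                });
            } catch (JMSException e) {
                e.printStackTrace();
            }
            // The connection is deliberately left open so the listener keeps receiving messages.
        }
    }
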
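Run the consumer first, then the producer. The console output shows that the first message is received 10 seconds after it was sent, and each subsequent message arrives 3 seconds after the previous one.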
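The timing described above can be worked out without a broker. The following sketch only models the behavior reported in this article (first delivery after the delay, then one more delivery per repeat, each a period apart); it is not the broker's scheduling code, and ScheduleSketch and deliveryOffsetsMillis are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper for illustration; not part of ActiveMQ.
final class ScheduleSketch {
    // Returns the expected delivery times as offsets in milliseconds from the send time.
    static List<Long> deliveryOffsetsMillis(long delay, long period, int repeat) {
        List<Long> offsets = new ArrayList<>();
        // i = 0 is the first delivery; i = 1..repeat are the repeated deliveries.
        for (int i = 0; i <= repeat; i++) {
            offsets.add(delay + i * period);
        }
        return offsets;
    }

    public static void main(String[] args) {
        // Same values as the producer above: 10 s delay, 3 s period, 4 repeats.
        System.out.println(deliveryOffsetsMillis(10_000L, 3_000L, 4));
        // prints [10000, 13000, 16000, 19000, 22000]
    }
}
```

With repeat set to 4 the list has 5 entries, which matches the 5 deliveries in the example.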