ActiveMQ 延迟队列

activemq5.4版本后,可以在xml配置文件中,增加一个broker的schedulerSupport为true的属性,就可以启用延时和定时的消息投递。
阅读更多

ActiveMQ 死信队列

参考官网
当消息已过期或者无法再投递的时候,就会移动到死信队列(Dead Letter Queue)。
在以下情况,消息会重新发送到客户端:

  • 使用事务的时候,调用了session的rollback方法
  • 使用事务的时候,在会话关闭后再commit
  • 使用CLIENT_ACKNOWLEDGE消息确认机制的时候,调用session的recover方法
  • 客户端连接超时
阅读更多

ActiveMQ 虚拟 topics

当我们需要广播的时候,我们采用发布订阅模式。生产者往broker发送1,2,3,4,5,6,消费者1和消费者2都收到1,2,3,4,5,6。
image.png
当我们需要队列的时候,我们采用点对点模式,点对点模式中如果为了加快消息消费,我们就加多个消费者。生产者往broker发送1,2,3,4,5,6,消费者1收到1,3,5,消费者2收到2,4,6。
image.png
现在有个场景是这样的,在发布订阅模式中,由于生产者产生消息的速度超过了消费者1消费的速度,使消息一直积压,于是就想能不能既用发布订阅模式分发消息又用点对点模式加快消息消费呢?
image.png
activemq提供了一个虚拟topics的功能,首先元素的开头必须是VirtualTopic.开头的,消费端的格式是Consumer.<consumer name>.VirtualTopic.<VirtualTopicName>. ,比如生产者的是VirtualTopic.topic,那消费者A是Consumer.a.VirtualTopic.topic,a表示消费端组的名称,名字一样的话,会类似ptp一样消费。

阅读更多

ActiveMQ 通配符

消费者通过消息选择器来获取自己想要的消息,我们也可以通过另外通配符的方式,来获取自己想要的消息,当然两种还是有区别的。
通配符的方式有三种:

  • “.”:用于分隔路径名字
  • “*”:用于匹配路径的任何名字
  • “>”:用于匹配末尾的名称

示例

消费者1消费a.b.c的消息,消费者2消费a.*.c的消息,所以中间是什么他不在乎,消费者3消费a.开头的消息,所以a.后面是什么他不在乎。生产者往a.b.c和a.b.c发送数据。

阅读更多

ActiveMQ 发送多个消息到 Destination

有这样的一个业务场景,生产者发送消息后,一个以点对点的模式接收,一个已发布订阅的模式接收。当然我们可以做,生产者发送两个Destination,一个是点对点模式,一个是发布订阅模式。这样比较麻烦,activemq也提供了他的方法来处理这样的场景:

示例

生产者的主要代码如下:

1
session.createQueue("test.ptp,topic://test.topic");
阅读更多

ActiveMQ 消息持久化

消息存储

ActiveMQ有点对点和发布订阅两种方式,这两种的消息存储还是有稍微一点区别。

点对点

队列的存储比较简单,就是先进先出(FIFO),只有当该消息已被消费和确认可以删除消息存储。如果没有被确认,其他消费者是不能获取消息的。
image.png
看看下面的例子:
生产者发送了10条消息:

阅读更多

ActiveMQ MasterSlave

MasterSlave方式中,master提供服务,slave备份master的数据。当master挂掉后,slave会变成master继续工作。

Shared File System Master Slave

基于共享文件系统,第一个获取文件上的独占锁的broker,就是master,如果这个broker挂了,其他broker获取文件上的独占锁,就从slave变成master。
broker61616、broker61618、broker61619的persistenceAdapter配置如下:

阅读更多

ActiveMQ 动态网络连接

静态网络连接是通过显示的定义网络地址,这无疑给我们的工作量带来了一定的麻烦,现在看看动态的网络连接。
多播的默认格式如下:

1
multicast://ipaddress:port?key=value

例子

broker和broker用的是多播协议:通过IP进行一对多通信网络,生产者使用这个地址作为数据的目的地,而消费者使用它来表达他们对数据关注的来源。
client和broker用的是Discovery协议,它将使用多播来发现可用的broker然后随机选择一个连接到broker。
61616的activemq.xml配置

阅读更多

ActiveMQ 容错

Failover协议

如下图所示,生产者往broker61616发送消息,消费者通过broker61618接收消息。broker61616和broker61618通过networkConnectors连接。
image.png
Consumer的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static final String BROKEURL = "failover:(tcp://0.0.0.0:61618,tcp://0.0.0.0:61616)";
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageConsumer consumer = null;
Message message;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory(BROKEURL);
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.failover");
// 创建一个消费者
consumer = session.createConsumer(destination);
// 接收一个消息
while (null != (message = consumer.receive())) {
System.out.println("consumer receive:" + ((TextMessage) message).getText());
Thread.sleep(1000);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {

}
}
阅读更多