ActiveMQ 消息持久化

消息存储

ActiveMQ有点对点和发布订阅两种方式,这两种的消息存储还是有稍微一点区别。

点对点

队列的存储比较简单,就是先进先出(FIFO),只有当该消息已被消费和确认可以删除消息存储。如果没有被确认,其他消费者是不能获取消息的。
image.png
看看下面的例子:
生产者发送了10条消息:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageProducer producer = null;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.persistence");
// 创建一个生产者
producer = session.createProducer(destination);
// 创建消息
for (int i = 0; i < 10; i++) {
Message message = session.createTextMessage("this is test.persistence" + i);
// 发送消息
producer.send(message);
}

} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

消费者1消费5条,但是暂时没确认

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageConsumer consumer = null;

boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.CLIENT_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.persistence");
// 创建一个消费者
consumer = session.createConsumer(destination);
// 接收消息
for (int i = 0; i < 5; i++) {
Message message = consumer.receive();
TimeUnit.SECONDS.sleep(1);
System.out.println("consumer1 receive:" + ((TextMessage) message).getText());
if (i == 4) {
TimeUnit.SECONDS.sleep(10);
message.acknowledge();
}
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
try {
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

消费者2此时就阻塞在获取消息上面,直到消费者1确认后才接收到消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageConsumer consumer = null;

boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.CLIENT_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.persistence");
// 创建一个消费者
consumer = session.createConsumer(destination);
// 接收消息
for (int i = 0; i < 5; i++) {
Message message = consumer.receive();
System.out.println("consumer1 receive:" + ((TextMessage) message).getText());
if (i == 4) {
message.acknowledge();
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

发布订阅

在发布订阅中,每个消费者都会获取到消息的拷贝,为了节约空间,broker只存储了一份消息,并存储了每个消费者所消费的信息,这样每个消费者虽然有不同的消费进度,最终还是能一次获取到消息。如果消息被所有订阅者消费完了,broker就可以删除这个消息。
image.png

存储方式

ActiveMQ提供了多种存储方式,比如AMQ、KahaDB、JDBC、内存。

AMQ

参考官网
AMQ是早期版本的默认持久化存储方式,基于文件的事务存储,对于消息的存储进行了调优,速度还是非常快的。默认大小32M。当消息被成功使用时,就会被标记为清理或者存档,这个操作将在下个清理时发送。基本配置如下(其他参数详见官网):

1
2
3
4
5
6
7
<broker persistent="true" xmlns="http://activemq.apache.org/schema/core">
...
<persistenceAdapter>
<amqPersistenceAdapter/>
</persistenceAdapter>
...
</broker>

KahaDB

5.4以后默认的持久化存储方式,也是基于文件的,与AMQ不同的是,KahaDB采用了B-Tree存储的布局。拥有高性能和可扩展性等特点。基本配置如下:

1
2
3
4
5
6
7
<broker brokerName="broker" persistent="true" useShutdownHook="false">
...
<persistenceAdapter>
<kahaDB directory="activemq-data" journalMaxFileLength="32mb"/>
</persistenceAdapter>
...
</broker>

JDBC

JDBC有三个表,两个表存储消息,还有一个表当做锁用,保证只能一个代理访问broker。配置如下:
先把默认的kahaDB注释掉,再用下面的替换,注意这个配置是在broker下面。

1
2
3
<persistenceAdapter>
<jdbcPersistenceAdapter dataSource="#mysql-ds"/>
</persistenceAdapter>

然后再增加数据库配置,注意,这个配置是broker外面,跟其他bea同级。

1
2
3
4
5
6
7
8
9
<bean id="mysql-ds" class="org.apache.commons.dbcp.BasicDataSource"
destroy-method="close">
<property name="driverClassName" value="com.mysql.jdbc.Driver"/>
<property name="url" value="jdbc:mysql://localhost/activemq?relaxAutoCommit=true"/>
<property name="username" value="root"/>
<property name="password" value="123456"/>
<property name="maxActive" value="200"/>
<property name="poolPreparedStatements" value="true"/>
</bean>

在启动activemq.bat,之前,先建数据库,并且编码设置为latin1,不然ACTIVEMQ_ACKS表会创建失败导致启动不了。然后根据报错信息一次在lib下加入commons-dbcp-1.4.jar、commons-pool-1.5.4.jar、mysql-connector-java-5.1.36.jar包。如下所示,帮我们建了三个表:
image.png
activemq_acks用于保存持久化订阅信息。
image.png
activemq_lock,用于broker集群时的Master选举。
image.png
activemq_msgs,用于存储消息信息。
image.png

点对点

演示步骤如下:
1、 启动生产者,发送消息
2、 查看表数据如下:
image.png
3、 启动消费者,消息消费后表数据被删除

发布订阅

演示步骤如下:
1、 启动生产者,发送消息。
2、 查看表数据,并没有持久化,所以发布订阅默认不持久化的。
3、 启动消费者,没有消息被消费。
为了让消息可以被消费者消费,我们可以这样做:
消费者代码如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
// 这边的Topic
Topic destination;
TopicSubscriber consumer = null;
Message message;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 设置ClientId
connection.setClientID("ZhangSan");
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createTopic("test.topic.mysql");
// 创建一个消费者,调用createDurableSubscriber方法
consumer = session.createDurableSubscriber(destination,"my");
// 接收一个消息
while (null != (message = consumer.receive())) {
System.out.println("consumer receive:" + ((TextMessage) message).getText());
}
} catch (JMSException e) {
e.printStackTrace();
} finally {

}
}

启动两个消费者者,activemq_acks表数据如下,已经有了两个数据
image.png
activemq的subscribers界面如下,也有两个数据,如果我们消费者下线了,就会到Offline Durable Topic Subscribers列表
image.png
修改消费者的代码后,生产者发送消息的时候,如果消费者在线,就直接消费,如果不在线,上线后还可以继续消费,下图是消费者消费了几次后,LAST_ACKED_ID变成了4。
image.png
发布订阅默认不持久化,所以生产者代码可以这样修改,加下面一句话

1
producer.setDeliveryMode(DeliveryMode.PERSISTENT);

生产者发送消息后,activemq_msgs表数据如下,与点对点不一样的是,这个消息被消费后,不会被删除。
image.png

日志类型的JDBC

由于JDBC的性能相对比较差,所以activemq还提供了日志类型的jdbc,确保了JMS事务的一致性。因为它整合了非常快的信息写入与缓存技术,它可以显着提高性能。配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker brokerName="test-broker" xmlns="http://activemq.apache.org/schema/core">
<persistenceAdapter>
<journaledJDBC dataDirectory="${activemq.base}/data" dataSource="#derby-ds"/>
</persistenceAdapter>
</broker>
<bean id="derby-ds" class="org.apache.derby.jdbc.EmbeddedDataSource">
<property name="databaseName" value="derbydb"/>
<property name="createDatabase" value="create"/>
</bean>
</beans>

虽然性能比jdbc快,但是他不支持master/slave。

内存

把消息存储在内存中,所以没有持久化的功能,因此要保证内存足够大,来缓存消息。

1
2
3
4
5
6
7
8
9
10
<?xml version="1.0" encoding="UTF-8"?>
<beans>
<broker brokerName="test-broker"
persistent="false"
xmlns="http://activemq.apache.org/schema/core">
<transportConnectors>
<transportConnector uri="tcp://localhost:61635"/>
</transportConnectors>
</broker>
</beans>