ActiveMQ 消息可靠性

1
2
Session createSession(boolean transacted, int acknowledgeMode)
throws JMSException;

我们创建会话的时候,会有两个参数,第一个是是否开启事务,第二个是消息的确认机制。

事务

当transacted为true的时候,说明会话开启事务,这个时候,后面的acknowledgeMode这个参数就失效了。
类似于JDBC,开启事务的时候,session也需要调用commit跟broker通讯,生产者通过commit方法把消息提交给broker,消费者通过commit方法和broker确认消息已收到。

示例

生产者开启了事务,当消息发送4条的时候执行commit方法。这边以ptp方式为例,topic过程一样,不在累述。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageProducer producer = null;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.ptp.transaction");
// 创建一个生产者
producer = session.createProducer(destination);
for (int i = 0; i < 6; i++) {
// 创建一个消息
Message message = session.createTextMessage("test.transaction" + i);
// 发送消息
producer.send(message);
if (i == 3) {
session.commit();
}
}
session.rollback();
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

此时在数据库的数据如下:
image.png
由于只提交了前面4个,所以持久化数据库的时候,就只有4条数据。
消费者也开启了事务,只处理3个消息就commit。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageConsumer consumer = null;
Message message;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.ptp.transaction");
// 创建一个消费者
consumer = session.createConsumer(destination);
// 接收一个消息
int num = 0;
while (null != (message = consumer.receive())) {
System.out.println("consumer receive:" + ((TextMessage) message).getText());
if (2 == num++) {
session.commit();
}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

此时数据库的数据如下:
image.png
由于只处理了三个就提交,所以数据库还有一个数据未提交。

确认机制

消息的确认机制有4种:

  • AUTO_ACKNOWLEDGE = 1:自动确认
  • CLIENT_ACKNOWLEDGE = 2:客户端手动确认
  • DUPS_OK_ACKNOWLEDGE = 3:自动批量确认
  • SESSION_TRANSACTED = 0:开启事务,如果事务为false,会抛异常

消息的确认机制,就是跟broker说我消息已经收到了,不要再发了。如果没有确认,broker会重复发送,直至确认,或者累计次数达到阈值(默认6)的时候,就停止发送。
使用CLIENT_ACKNOWLEDGE消息确认机制,客户端手动确认的时候,会调用message.acknowledge()的方法来确认消息,我们看看下面的例子。
生产者发送6条数据:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageProducer producer = null;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.ptp.ack");
// 创建一个生产者
producer = session.createProducer(destination);
for (int i = 0; i < 6; i++) {
// 创建一个消息
Message message = session.createTextMessage("test.ack" + i);
// 发送消息
producer.send(message);
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

数据库的数据如下,已经有6条数据了。
image.png
消费者在消费第三个消息的时候,抛出异常,其他直接调用acknowledge方法。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageConsumer consumer = null;
Message message;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.ptp.ack");
// 创建一个消费者
consumer = session.createConsumer(destination);
// 接收一个消息
int num = 0;
while (null != (message = consumer.receive())) {
String text = ((TextMessage) message).getText();
System.out.println("consumer receive:" + text);
try{
if("test.ack2".equals(text)){
throw new RuntimeException();
}
message.acknowledge();
}catch (Exception e){

}
}
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (consumer != null) {
consumer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

查看数据库,发现数据都被消费了,我们不是让test.ack2这个抛异常不确认吗,为什么还是被消费了?
这是因为消息接收后,会存放在deliveredMessages中,test.ack2没有被确认,就还在deliveredMessages中,当test.ack3调用acknowledge的时候,就会把deliveredMessages都确认了。具体查看ActiveMQMessageConsumer的acknowledge方法:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public void acknowledge() throws JMSException {
clearDeliveredList();
waitForRedeliveries();
synchronized(deliveredMessages) {
// Acknowledge all messages so far.
MessageAck ack = makeAckForAllDeliveredMessages(MessageAck.STANDARD_ACK_TYPE);
if (ack == null) {
return; // no msgs
}

if (session.getTransacted()) {
rollbackOnFailedRecoveryRedelivery();
session.doStartTransaction();
ack.setTransactionId(session.getTransactionContext().getTransactionId());
}

pendingAck = null;
session.sendAck(ack);

// Adjust the counters
deliveredCounter = Math.max(0, deliveredCounter - deliveredMessages.size());
additionalWindowSize = Math.max(0, additionalWindowSize - deliveredMessages.size());

if (!session.getTransacted()) {
deliveredMessages.clear();
}
}
}