ActiveMQ 容错

Failover协议

如下图所示,生产者往broker61616发送消息,消费者通过broker61618接收消息。broker61616和broker61618通过networkConnectors连接。
image.png
Consumer的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static final String BROKEURL = "failover:(tcp://0.0.0.0:61618,tcp://0.0.0.0:61616)";
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageConsumer consumer = null;
Message message;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory(BROKEURL);
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.failover");
// 创建一个消费者
consumer = session.createConsumer(destination);
// 接收一个消息
while (null != (message = consumer.receive())) {
System.out.println("consumer receive:" + ((TextMessage) message).getText());
Thread.sleep(1000);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {

}
}

Producer代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
private static final String BROKEURL = "failover:(tcp://0.0.0.0:61616,tcp://0.0.0.0:61618)";

public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageProducer producer = null;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory(BROKEURL);
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.failover");
// 创建一个生产者
producer = session.createProducer(destination);
for (int i = 0; i < 10; i++) {
// 创建一个消息
Message message = session.createTextMessage("this is test.failover:" + i);
// 发送消息
producer.send(message);
System.out.println(i);
}

} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

启动两个ActiveMQ,61616和61618。producer往61616发送消息,consumer从61618接收数据。消息发送完后,把61618给关闭了,此时,consumer会报如下错误:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
 INFO | Successfully connected to tcp://0.0.0.0:61618
consumer receive:this is test.failover:0
consumer receive:this is test.failover:1
consumer receive:this is test.failover:2
WARN | Transport (tcp://0.0.0.0:61618) failed , attempting to automatically reconnect: {}
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:210)
at java.net.SocketInputStream.read(SocketInputStream.java:141)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.fill(TcpBufferedInputStream.java:50)
at org.apache.activemq.transport.tcp.TcpTransport$2.fill(TcpTransport.java:634)
at org.apache.activemq.transport.tcp.TcpBufferedInputStream.read(TcpBufferedInputStream.java:59)
at org.apache.activemq.transport.tcp.TcpTransport$2.read(TcpTransport.java:619)
at java.io.DataInputStream.readInt(DataInputStream.java:387)
at org.apache.activemq.openwire.OpenWireFormat.unmarshal(OpenWireFormat.java:268)
at org.apache.activemq.transport.tcp.TcpTransport.readCommand(TcpTransport.java:240)
at org.apache.activemq.transport.tcp.TcpTransport.doRun(TcpTransport.java:232)
at org.apache.activemq.transport.tcp.TcpTransport.run(TcpTransport.java:215)
at java.lang.Thread.run(Thread.java:748)
INFO | Successfully reconnected to tcp://0.0.0.0:61616

61618连接不了,最后一句可以看到重新连到61616,完成故障转移。此时重启61618,61618的几个消息被继续消费:

1
2
3
4
5
6
7
consumer receive:this is test.failover:3
consumer receive:this is test.failover:4
consumer receive:this is test.failover:5
consumer receive:this is test.failover:6
consumer receive:this is test.failover:7
consumer receive:this is test.failover:8
consumer receive:this is test.failover:9

本来想看看destinationPolicy的replayWhenNoConsumers属性,让消息从broker61618回流到broker61616,这边没设置,已经回流。我的版本是apache-activemq-5.15.12。

配置值

属性 默认值 描述
initialReconnectDelay 10 在第一次尝试重新连接之前等待的时间长度(毫秒)
maxReconnectDelay 30000 最长重连时间间隔(毫秒)
useExponentialBackOff true 重连时间间隔是否以指数形式增长
backOffMultiplier 2.0 递增倍数
maxReconnectAttempts -1 自5.6版本开始,-1为默认值,代表不限重试次数,0标识从不重试(只尝试连接一次,并不重连),5.6以前的版本,0为默认值,代表不重试,如果设置大于0的数,则代表最大重试次数
startupMaxReconnectAttempts 0 初始化时的最大重试次数
randomize true 使用随机连接,以达到负载均衡的目的
backup false 提前初始化一个未使用的链接,以便进行快速的失败转移
trackMessages false 设置是否缓存(故障发生时)尚未传送完成的消息,当broker一旦重新连接成功,便将这些缓存中的消息刷新到新连接的代理中,使得消息可以在broker切换前后顺利传送
maxCacheSize 127*1024bytes 当trackMessage启动时,缓存的最大字节
updateURISupported true 设定是否可以动态修改broker uri(自5.4版本开始)