ActiveMQ 消息持久化

消息存储

ActiveMQ有点对点和发布订阅两种方式,这两种的消息存储还是有稍微一点区别。

点对点

队列的存储比较简单,就是先进先出(FIFO),只有当该消息已被消费和确认可以删除消息存储。如果没有被确认,其他消费者是不能获取消息的。
image.png
看看下面的例子:
生产者发送了10条消息:

阅读更多

ActiveMQ MasterSlave

MasterSlave方式中,master提供服务,slave备份master的数据。当master挂掉后,slave会变成master继续工作。

Shared File System Master Slave

基于共享文件系统,第一个获取文件上的独占锁的broker,就是master,如果这个broker挂了,其他broker获取文件上的独占锁,就从slave变成master。
broker61616、broker61618、broker61619的persistenceAdapter配置如下:

阅读更多

ActiveMQ 动态网络连接

静态网络连接是通过显示的定义网络地址,这无疑给我们的工作量带来了一定的麻烦,现在看看动态的网络连接。
多播的默认格式如下:

1
multicast://ipaddress:port?key=value

例子

broker和broker用的是多播协议:通过IP进行一对多通信网络,生产者使用这个地址作为数据的目的地,而消费者使用它来表达他们对数据关注的来源。
client和broker用的是Discovery协议,它将使用多播来发现可用的broker然后随机选择一个连接到broker。
61616的activemq.xml配置

阅读更多

ActiveMQ 容错

Failover协议

如下图所示,生产者往broker61616发送消息,消费者通过broker61618接收消息。broker61616和broker61618通过networkConnectors连接。
image.png
Consumer的代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
private static final String BROKEURL = "failover:(tcp://0.0.0.0:61618,tcp://0.0.0.0:61616)";
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageConsumer consumer = null;
Message message;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory(BROKEURL);
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.failover");
// 创建一个消费者
consumer = session.createConsumer(destination);
// 接收一个消息
while (null != (message = consumer.receive())) {
System.out.println("consumer receive:" + ((TextMessage) message).getText());
Thread.sleep(1000);
}
} catch (JMSException e) {
e.printStackTrace();
} catch (InterruptedException e) {
e.printStackTrace();
} finally {

}
}
阅读更多

ActiveMQ 静态网络连接

当系统需要高性能以及扩展性的时候,单个broker已经不满足我们的需求了,我们可以把多个broker连接起来,来达到我们需要的效果。ActiveMQ支持双向的网络连接通道,如下图所示
image.png
多个broker之间的连接,有两种方式,一个是静态网络连接,一个是动态网络连接。

静态网络连接

ActiveMQ提供了多种协议,配置transportConnectors的时候,根据不同的协议配置uri,比如:

阅读更多

ActiveMQ 消息

消息类型及属性

消息的几种类型,包括TextMessage、StreamMessage、ObjectMessage、MapMessage、BytesMessage、BlobMessage。属性包括boolean、byte、double、float、int、long、object、short、string。下面看看在代码中是怎么发送和接收的。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
MessageProducer producer = null;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
Destination destination = session.createQueue("test.message");
// 创建一个生产者
producer = session.createProducer(destination);
// 属性设置
sendProperty(session, producer);
// 各种消息发送
sendTextMessage(session, producer);
sendStreamMessage(session, producer);
sendObjectMessage(session, producer);
sendMapMessage(session, producer);
sendBytesMessage(session, producer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

private static void sendBytesMessage(Session session, MessageProducer producer) throws JMSException {
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes("hello".getBytes());
producer.send(bytesMessage);
}

private static void sendMapMessage(Session session, MessageProducer producer) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name", "张三");
producer.send(mapMessage);
}

private static void sendObjectMessage(Session session, MessageProducer producer) throws JMSException {
ObjectMessage objectMessage = session.createObjectMessage(new Person("张三", 18));
producer.send(objectMessage);
}

private static void sendStreamMessage(Session session, MessageProducer producer) throws JMSException {
StreamMessage message = session.createStreamMessage();
message.writeString("test.stream");
producer.send(message);
}

public static void sendProperty(Session session, MessageProducer producer) throws JMSException {
// JMS提供了boolean、byte、double、float、int、long、object、short、string的属性设置类型
// 这边只演示string
// 创建一个消息
Message message = session.createMessage();
// 设置熟悉
message.setStringProperty("name", "张三");
// 发送消息
producer.send(message);
}

public static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
// 创建一个消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("this is test.message");
// 发送消息
producer.send(textMessage);
}
阅读更多

ActiveMQ request/response

官方文章参考: how-should-i-implement-request-response-with-jms

在生产过程中,我们有可能需要消费者消费完后,把消息发给对应的生产者。
这种请求-响应的最佳方法是在启动时为每个生产者创建一个临时队列和使用者,将每个消息上的JMSReplyTo属性设置为临时队列,然后在每个消息上使用correlationID将请求消息与响应消息关联起来。这避免了为每个请求创建和关闭使用者的开销(这是昂贵的)。这也意味着你可以共享同一个生产者。
消费者消费完消息后,通过JMSReplyTo属性和correlationID将消息返回给生产者。
流程图如下:
image.png

阅读更多

ActiveMQ 点对点模式和发布订阅模式

点对点

生产者

步骤如下:

  1. 创建一个JMS的连接工厂
  2. 从这个连接工厂获取一个JMS连接
  3. 开始JMS连接
  4. 从这个连接创建一个JMS的session
  5. 通过session创建一个Destination
  6. 通过session创建一个生产者
  7. 发送消息

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageProducer producer = null;
Message message;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session,消息不持久化以及自动确认提交
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.queue");
// 创建一个生产者
producer = session.createProducer(destination);
// 创建一个消息
message = session.createTextMessage("this is test.queue");
// 发送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
阅读更多

ActiveMQ 基本概念

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,提供了高可用、高性能、可伸缩性等。

为什么要用消息中间件

在分布式系统设计架构中,系统之间的偶尔是非常重要的,消息中间件可以用来解耦。RPC框架也可以用来解耦,但两个有不一样的地方:
RPC框架:
image.png
如上图所示,在RPC框架中,One应用通过网络直接调用Two应用,这就要保证Two应用是可用的,如果Two应用是不可用的,那这个调用就失败了。
消息中间件:
image.png
如上图所示,One应用把消息推送给消息中间件,Two应用再从消息中间件接收消息,在这个过程中,One应用和Two应用,是可以不知道对方的状态(比如是否不可用,用哪些语言),甚至不关心发送消息或者处理消息的是谁,这种情况下,两个应用的耦合度就不会那么高了。

阅读更多

RabbitMQ request/response

通过设置消息的属性来的。

消息的属性:

阅读更多

© 2024 Jeremy Tsai  Powered by Hexo & Icarus

© JeremyTsai

属性值 描述
Delivery mode 是否持久化,1为不持久化,2为持久化
Type 应用程序特定的消息类型
Headers 用户自定义的其他属性
Content type