ActiveMQ 静态网络连接

当系统需要高性能以及扩展性的时候,单个broker已经不满足我们的需求了,我们可以把多个broker连接起来,来达到我们需要的效果。ActiveMQ支持双向的网络连接通道,如下图所示
image.png
多个broker之间的连接,有两种方式,一个是静态网络连接,一个是动态网络连接。

静态网络连接

ActiveMQ提供了多种协议,配置transportConnectors的时候,根据不同的协议配置uri,比如:

阅读更多

ActiveMQ 消息

消息类型及属性

消息的几种类型,包括TextMessage、StreamMessage、ObjectMessage、MapMessage、BytesMessage、BlobMessage。属性包括boolean、byte、double、float、int、long、object、short、string。下面看看在代码中是怎么发送和接收的。

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
MessageProducer producer = null;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
Destination destination = session.createQueue("test.message");
// 创建一个生产者
producer = session.createProducer(destination);
// 属性设置
sendProperty(session, producer);
// 各种消息发送
sendTextMessage(session, producer);
sendStreamMessage(session, producer);
sendObjectMessage(session, producer);
sendMapMessage(session, producer);
sendBytesMessage(session, producer);
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}

private static void sendBytesMessage(Session session, MessageProducer producer) throws JMSException {
BytesMessage bytesMessage = session.createBytesMessage();
bytesMessage.writeBytes("hello".getBytes());
producer.send(bytesMessage);
}

private static void sendMapMessage(Session session, MessageProducer producer) throws JMSException {
MapMessage mapMessage = session.createMapMessage();
mapMessage.setString("name", "张三");
producer.send(mapMessage);
}

private static void sendObjectMessage(Session session, MessageProducer producer) throws JMSException {
ObjectMessage objectMessage = session.createObjectMessage(new Person("张三", 18));
producer.send(objectMessage);
}

private static void sendStreamMessage(Session session, MessageProducer producer) throws JMSException {
StreamMessage message = session.createStreamMessage();
message.writeString("test.stream");
producer.send(message);
}

public static void sendProperty(Session session, MessageProducer producer) throws JMSException {
// JMS提供了boolean、byte、double、float、int、long、object、short、string的属性设置类型
// 这边只演示string
// 创建一个消息
Message message = session.createMessage();
// 设置熟悉
message.setStringProperty("name", "张三");
// 发送消息
producer.send(message);
}

public static void sendTextMessage(Session session, MessageProducer producer) throws JMSException {
// 创建一个消息
TextMessage textMessage = session.createTextMessage();
textMessage.setText("this is test.message");
// 发送消息
producer.send(textMessage);
}
阅读更多

ActiveMQ request/response

官方文章参考: how-should-i-implement-request-response-with-jms

在生产过程中,我们有可能需要消费者消费完后,把消息发给对应的生产者。
这种请求-响应的最佳方法是在启动时为每个生产者创建一个临时队列和使用者,将每个消息上的JMSReplyTo属性设置为临时队列,然后在每个消息上使用correlationID将请求消息与响应消息关联起来。这避免了为每个请求创建和关闭使用者的开销(这是昂贵的)。这也意味着你可以共享同一个生产者。
消费者消费完消息后,通过JMSReplyTo属性和correlationID将消息返回给生产者。
流程图如下:
image.png

阅读更多

ActiveMQ 点对点模式和发布订阅模式

点对点

生产者

步骤如下:

  1. 创建一个JMS的连接工厂
  2. 从这个连接工厂获取一个JMS连接
  3. 开始JMS连接
  4. 从这个连接创建一个JMS的session
  5. 通过session创建一个Destination
  6. 通过session创建一个生产者
  7. 发送消息

代码如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
public static void main(String[] args) {
ConnectionFactory connectionFactory;
Connection connection = null;
Session session = null;
Destination destination;
MessageProducer producer = null;
Message message;
boolean useTransaction = false;
try {
// 创建一个ConnectionFactory
connectionFactory = new ActiveMQConnectionFactory();
// 创建一个Connection
connection = connectionFactory.createConnection();
// 启动消息传递的连接
connection.start();
// 创建一个session,消息不持久化以及自动确认提交
session = connection.createSession(useTransaction, Session.AUTO_ACKNOWLEDGE);
// 创建一个destination,把消息发送到test.queue
destination = session.createQueue("test.queue");
// 创建一个生产者
producer = session.createProducer(destination);
// 创建一个消息
message = session.createTextMessage("this is test.queue");
// 发送消息
producer.send(message);
} catch (JMSException e) {
e.printStackTrace();
} finally {
try {
if (producer != null) {
producer.close();
}
if (session != null) {
session.close();
}
if (connection != null) {
connection.close();
}
} catch (JMSException e) {
e.printStackTrace();
}
}
}
阅读更多

ActiveMQ 基本概念

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,提供了高可用、高性能、可伸缩性等。

为什么要用消息中间件

在分布式系统设计架构中,系统之间的偶尔是非常重要的,消息中间件可以用来解耦。RPC框架也可以用来解耦,但两个有不一样的地方:
RPC框架:
image.png
如上图所示,在RPC框架中,One应用通过网络直接调用Two应用,这就要保证Two应用是可用的,如果Two应用是不可用的,那这个调用就失败了。
消息中间件:
image.png
如上图所示,One应用把消息推送给消息中间件,Two应用再从消息中间件接收消息,在这个过程中,One应用和Two应用,是可以不知道对方的状态(比如是否不可用,用哪些语言),甚至不关心发送消息或者处理消息的是谁,这种情况下,两个应用的耦合度就不会那么高了。

阅读更多