ActiveMQ 基本概念

ActiveMQ是一种开源的基于JMS(Java Message Servie)规范的一种消息中间件的实现,提供了高可用、高性能、可伸缩性等。

为什么要用消息中间件

在分布式系统设计架构中,系统之间的偶尔是非常重要的,消息中间件可以用来解耦。RPC框架也可以用来解耦,但两个有不一样的地方:
RPC框架:
image.png
如上图所示,在RPC框架中,One应用通过网络直接调用Two应用,这就要保证Two应用是可用的,如果Two应用是不可用的,那这个调用就失败了。
消息中间件:
image.png
如上图所示,One应用把消息推送给消息中间件,Two应用再从消息中间件接收消息,在这个过程中,One应用和Two应用,是可以不知道对方的状态(比如是否不可用,用哪些语言),甚至不关心发送消息或者处理消息的是谁,这种情况下,两个应用的耦合度就不会那么高了。

阅读更多

RabbitMQ request/response

通过设置消息的属性来的。

消息的属性:

阅读更多

RabbitMQ 死信队列

rabbitmq也有死信队列,以下几种情况会有把消息投递到死信队列:

  • 消息被拒绝,且requeue设置为false。
  • 消息过期(队列过期并不会把消息投递给死信队列)
  • 由于超过了队列的消息最大数被抛弃

消息投递给死信队列的时候,也会经过交换器,这个交换器称之为死信交换器,但是他依然是一个正常的交换器。
要设置队列的死信交换,在声明队列时需要指定可选的x-dead-letter-exchange参数。主要是下面的代码:

属性值 描述
Delivery mode 是否持久化,1为不持久化,2为持久化
Type 应用程序特定的消息类型
Headers 用户自定义的其他属性
Content type
1
2
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
阅读更多

Java基础知识

基础概念与常识

Java 语言有哪些特点?

  1. 简单易学;
  2. 面向对象(封装,继承,多态);
  3. 平台无关性( Java 虚拟机实现平台无关性);
  4. 支持多线程( C++ (C++11,2011年引入多线程库)语言没有内置的多线程机制,因此必须调用操作系统的多线程功能来进行多线程程序设计,而 Java 语言却提供了多线程支持);
阅读更多

RabbitMQ 延迟队列

rabbitmq的延迟队列,我们可以通过死信交换器来实现。
生产者发送消息,定义2秒后消息过期,消息就会进入死信交换器,最后到死信队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 定义队列的名称
public final static String QUEUE_NAME = "queue.scheduler";
// 定义交换器的名称
public final static String EXCHANGE_NAME = "exchange.scheduler";
// 定义路由的名称
public final static String ROUTE_NAME = "route.scheduler";
// 定义死信队列的名称
public final static String DLX_QUEUE_NAME = "scheduler.queue.name";
// 定义死信交换器的名称
public final static String DLX_EXCHANGE_NAME = "scheduler.exchange.name";

public static void main(String[] args) throws IOException, TimeoutException {
// 声明一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 创建一个与rabbitmq服务器的连接
// 创建一个Channel
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 定义交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null);
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
arguments.put("x-message-ttl", 2000);
// 定义队列
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
// 绑定队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_NAME);
// 定义死信交换器
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null);
// 定义死信队列
channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null);
// 绑定死信队列
channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, ROUTE_NAME);
// 发送消息
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME, true, null, df.format(new Date()).getBytes());
}
}
阅读更多

RabbitMQ 发送方的可靠性

在消费端,当确认或者拒绝了消息后,rabbitmq才会把消息从消息里删除掉,在发送端,会有以下问题:

  • 发送给不存在的交换器
  • 发送给路由不到的队列
  • 网络故障导致中途丢失

事务

确保消息不丢失的唯一方法是使用事务,将每个消息或一组消息发布、提交的信道设置为事务性的。
在rabbitmq中,加事务也比较简单,就是调用txSelect()开启事务,调用txCommit()提交事务,调用txRollback()回滚事务。下面的例子中,如果有一条信息异常,则整个都不能发送。

阅读更多

RabbitMQ 备用交换器

当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换器来实现。
大概原理如下,如下图所示,消息发送给交换器,交换器发现没有可路由的队列,于是消息发给备用交换器,备用交换器再发给队列2,由队列2的消费者来处理消息。
image.png

示例

交换器的定义,需要一个参数,可以通过参数的方式,来指定备用交换器。参数的key是
alternate-exchange,value是交换器的名称。通常备用交换器的类型是fanout
生产者中,定义一个交换器和备用交换器,此时并没有响应路由的队列。

阅读更多

RabbitMQ 交换器

在Rabbitmq中,消息发送给交换器,交换器根据一定的规则把消息发给队列,broker再把消息发送给消费者,或者发送至主动从队列拉去消息。前面几张讲了队列的相关东西,这篇看看交换器是如何把消息发送给队列的。
hello-world-example-routing.webp.jpg

交换器

交换器接收消息并将其路由到零个或多个队列,它支持四种交换类型:DirectFanoutTopicHeaders。还还声明了一些属性,其中最重要的是:交换器的名称、交换器类型、是否持久化、是否自动删除、参数。
是否持久化,决定了rabbitmq重启后,交换器是否存在。是否自动删除,决定了当最后一个队列被解除绑定时,交换器是否被删除。

阅读更多

RabbitMQ 消息拒绝

当消费者收到消息后,需要对消息进行确认,队列才会把这个消息删除。如果消息处理中发生了异常需要拒绝消息怎么办呢?在这个章节中,我们看到了没确认消息时,如果断开了和rabbitmq的连接,消息会回到待发送那边,等待其他消费者,虽然我们可以通过关闭连接来拒绝消息,但是频繁的频繁的建立连接、关闭连接,会增加rabbitmq的负担。rabbitmq提供了另外一种优雅的方式来拒绝消息,方法如下:

1
void basicReject(long deliveryTag, boolean requeue) throws IOException
阅读更多

RabbitMQ 消息预取

消息预取,避免了rabbitmq一直往消费端发送数据,导致消费端出现无限制的缓冲区问题。消息预取定义了信道上或者消费者允许的最大未确认的消息数量。一旦未确认数达到了设置的值,RabbitMQ将停止传递更多消息,除非至少有一条未完成的消息得到确认。
使用消息预取的时候,会调用chanel的basicQos方法,prefetchCount是未确认的消息数,global默认值为false,是限制消费者未确认的消息数,设置为true的时候,是限制信道上的未确认消息数。

1
void basicQos(int prefetchCount, boolean global) throws IOException;
阅读更多