RabbitMQ 概述

一、Rabbit 概述

RabbitMQ 是一个开源的消息代理和队列服务器,用来通过普通协议在完全不同的应用中间共享数据,RabbitMQ 是使用 Erlang 语言来编写的,并且 RabbitMQ 是基于 AMQP 协议的。

特点:

  • 开源、性能优秀

    Erlang 语言最初用在交换机的架构模式,这样使得 RabbitMQ 在 Broker 之间进行数据交互的性能时非常优秀的。Erlang 的优点:Erlang 有着和原生 Socket 一样的延迟。

阅读更多

RabbitMQ request/response

通过设置消息的属性来的。

消息的属性:

阅读更多

RabbitMQ 死信队列

rabbitmq也有死信队列,以下几种情况会有把消息投递到死信队列:

  • 消息被拒绝,且requeue设置为false。
  • 消息过期(队列过期并不会把消息投递给死信队列)
  • 由于超过了队列的消息最大数被抛弃

消息投递给死信队列的时候,也会经过交换器,这个交换器称之为死信交换器,但是他依然是一个正常的交换器。
要设置队列的死信交换,在声明队列时需要指定可选的x-dead-letter-exchange参数。主要是下面的代码:

属性值 描述
Delivery mode 是否持久化,1为不持久化,2为持久化
Type 应用程序特定的消息类型
Headers 用户自定义的其他属性
Content type
1
2
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-dead-letter-exchange", "some.exchange.name");
阅读更多

RabbitMQ 延迟队列

rabbitmq的延迟队列,我们可以通过死信交换器来实现。
生产者发送消息,定义2秒后消息过期,消息就会进入死信交换器,最后到死信队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
// 定义队列的名称
public final static String QUEUE_NAME = "queue.scheduler";
// 定义交换器的名称
public final static String EXCHANGE_NAME = "exchange.scheduler";
// 定义路由的名称
public final static String ROUTE_NAME = "route.scheduler";
// 定义死信队列的名称
public final static String DLX_QUEUE_NAME = "scheduler.queue.name";
// 定义死信交换器的名称
public final static String DLX_EXCHANGE_NAME = "scheduler.exchange.name";

public static void main(String[] args) throws IOException, TimeoutException {
// 声明一个连接工厂
ConnectionFactory factory = new ConnectionFactory();
// 创建一个与rabbitmq服务器的连接
// 创建一个Channel
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {
// 定义交换器
channel.exchangeDeclare(EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null);
Map<String, Object> arguments = new HashMap<String, Object>();
arguments.put("x-dead-letter-exchange", DLX_EXCHANGE_NAME);
arguments.put("x-message-ttl", 2000);
// 定义队列
channel.queueDeclare(QUEUE_NAME, false, false, false, arguments);
// 绑定队列
channel.queueBind(QUEUE_NAME, EXCHANGE_NAME, ROUTE_NAME);
// 定义死信交换器
channel.exchangeDeclare(DLX_EXCHANGE_NAME, BuiltinExchangeType.DIRECT, false, false, null);
// 定义死信队列
channel.queueDeclare(DLX_QUEUE_NAME, false, false, false, null);
// 绑定死信队列
channel.queueBind(DLX_QUEUE_NAME, DLX_EXCHANGE_NAME, ROUTE_NAME);
// 发送消息
SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
channel.basicPublish(EXCHANGE_NAME, ROUTE_NAME, true, null, df.format(new Date()).getBytes());
}
}
阅读更多

RabbitMQ 发送方的可靠性

在消费端,当确认或者拒绝了消息后,rabbitmq才会把消息从消息里删除掉,在发送端,会有以下问题:

  • 发送给不存在的交换器
  • 发送给路由不到的队列
  • 网络故障导致中途丢失

事务

确保消息不丢失的唯一方法是使用事务,将每个消息或一组消息发布、提交的信道设置为事务性的。
在rabbitmq中,加事务也比较简单,就是调用txSelect()开启事务,调用txCommit()提交事务,调用txRollback()回滚事务。下面的例子中,如果有一条信息异常,则整个都不能发送。

阅读更多

RabbitMQ 备用交换器

当消息经过交换器准备路由给队列的时候,发现没有对应的队列可以投递信息,在rabbitmq中会默认丢弃消息,如果我们想要监测哪些消息被投递到没有对应的队列,我们可以用备用交换器来实现。
大概原理如下,如下图所示,消息发送给交换器,交换器发现没有可路由的队列,于是消息发给备用交换器,备用交换器再发给队列2,由队列2的消费者来处理消息。
image.png

示例

交换器的定义,需要一个参数,可以通过参数的方式,来指定备用交换器。参数的key是
alternate-exchange,value是交换器的名称。通常备用交换器的类型是fanout
生产者中,定义一个交换器和备用交换器,此时并没有响应路由的队列。

阅读更多

RabbitMQ 交换器

在Rabbitmq中,消息发送给交换器,交换器根据一定的规则把消息发给队列,broker再把消息发送给消费者,或者发送至主动从队列拉去消息。前面几张讲了队列的相关东西,这篇看看交换器是如何把消息发送给队列的。
hello-world-example-routing.webp.jpg

交换器

交换器接收消息并将其路由到零个或多个队列,它支持四种交换类型:DirectFanoutTopicHeaders。还还声明了一些属性,其中最重要的是:交换器的名称、交换器类型、是否持久化、是否自动删除、参数。
是否持久化,决定了rabbitmq重启后,交换器是否存在。是否自动删除,决定了当最后一个队列被解除绑定时,交换器是否被删除。

阅读更多

RabbitMQ 消息拒绝

当消费者收到消息后,需要对消息进行确认,队列才会把这个消息删除。如果消息处理中发生了异常需要拒绝消息怎么办呢?在这个章节中,我们看到了没确认消息时,如果断开了和rabbitmq的连接,消息会回到待发送那边,等待其他消费者,虽然我们可以通过关闭连接来拒绝消息,但是频繁的频繁的建立连接、关闭连接,会增加rabbitmq的负担。rabbitmq提供了另外一种优雅的方式来拒绝消息,方法如下:

1
void basicReject(long deliveryTag, boolean requeue) throws IOException
阅读更多

RabbitMQ 消息预取

消息预取,避免了rabbitmq一直往消费端发送数据,导致消费端出现无限制的缓冲区问题。消息预取定义了信道上或者消费者允许的最大未确认的消息数量。一旦未确认数达到了设置的值,RabbitMQ将停止传递更多消息,除非至少有一条未完成的消息得到确认。
使用消息预取的时候,会调用chanel的basicQos方法,prefetchCount是未确认的消息数,global默认值为false,是限制消费者未确认的消息数,设置为true的时候,是限制信道上的未确认消息数。

1
void basicQos(int prefetchCount, boolean global) throws IOException;
阅读更多

RabbitMQ 消息确认

接收消息的时候,有两个方式,一个是consume,一个是get,这两个方法都有一个autoAck的参数。当我们设置为true的时候,说明消费者会通过AMQP显示的向rabbitmq发送一个确认,rabbitmq自动视其确认了消息,然后把消息从队列中删除。下面用consume的方式做些例子来理解autoAck的参数设置。

1
2
3
String basicConsume(String queue, boolean autoAck, DeliverCallback deliverCallback, CancelCallback cancelCallback) throws IOException;

GetResponse basicGet(String queue, boolean autoAck) throws IOException;
阅读更多