RocketMQ 安装
安装配置 jdk8
1. 上传jdk压缩文件
将文件jdk-8u212-linux-x64.tar.gz
上传到 /root 目录
2. 解压缩
执行解压命令
1 | 将jdk解压到 /usr/local/ 目录 |
3. 配置环境变量
修改 /etc/profile 配置文件, 配置环境变量
1 | vim /etc/profile |
修改完后, 让环境变量立即生效
1 | source /etc/profile |
4. 验证
1 | java -version |
安装 RocketMQ
1. 下载 rocketmq 二进制文件
1 | wget https://mirror.bit.edu.cn/apache/rocketmq/4.7.0/rocketmq-all-4.7.0-bin-release.zip |
2. 解压缩 rocketmq
将 rocketmq 解压到 /usr/local/ 目录
1 | unzip rocketmq-all-4.7.0-bin-release.zip -d /usr/local/ |
3. 配置环境变量 ROCKETMQ_HOME 和 PATH
为了后续操作方便可以配置环境变量,之后在任意位置都可以执行rocketmq的操作命令。
1 | vim /etc/profile |
修改完后, 让环境变量立即生效
1 | source /etc/profile |
4. 减小 rocketmq 使用的内存
rocketmq需要启动两个服务: name server
和 broker
, name server
默认配置JVM使用的内存是4g, broker
默认配置JVM使用的内存是8g.
开发环境中如果内存不足, 服务可能会无法启动, 可以通过降低两个服务的内存, 使服务可以正常启动, 也可以节省内存.
修改 name server
内存改为 256m
1 | cd /usr/local/rocketmq/ |
修改 broker
内存改为 256m
1 | 编辑 bin/runbroker.sh |
5. 启动 rocketmq
先启动 name server
1 | 进入 rocketmq 目录 |
再启动 broker
1 | 启动 broker, 连接name server: localhost:9876 |
6. 关闭防火墙
rocketmq的通信会用到多个端口, 为了方便测试我们关闭防火墙
1 | 关闭防火墙 |
测试
运行测试, 启动生产者发送消息, 启动消费者接收消息
1 | 通过环境变量, 告诉客户端程序name server的地址 |
RocketMQ 的关闭命令
关闭 broker
1 | mqshutdown broker |
关闭 nameserver
1 | mqshutdown namesrv |
管理界面
在开源项目 rocketmq-externals
中提供了rocketmq 的管理界面: 地址为: https://github.com/apache/rocketmq-externals
github 在国内访问缓慢, 也可以使用码云的镜像项目, 地址为: https://gitee.com/mirrors/RocketMQ-Externals
1. 克隆项目
1 | cd /usr/local/rocketmq/ |
2. maven打包管理界面项目
如果没有安装 maven, 请先执行 maven 安装命令
1 | yum install -y maven |
打包管理界面项目 rocketmq-console
.
打包过程中会下载各种依赖,比较缓慢,请耐心等待
1 | 进入管理界面项目的文件夹 |
3. 运行启动管理界面
打包的 jar 文件在 target 目录, 进入目录执行jar文件
1 | 进入 target 目录 |
访问管理界面:
RocketMQ双主双从同步复制集群方案
部署环境
作为测试环境,我们使用两台虚拟机来部署双主双从环境,具体结构如下:
整个集群由两个 name server 实例和四个 broker 实例组成
name server:
- 两台服务器分别启动两个name server
broker A 主从:
- 服务器1部署 broker A 主服务
- 服务器2部署 broker A 从服务
broker B 主从:
- 服务器2部署 broker B 主服务
- 服务器1部署 broker B 从服务
安装 Rocketmq
在两台虚拟机上安装 Rocketmq。或在一台虚拟机上装好后进行克隆。
建文件夹
在一台服务器上启动两个 broker 实例,需要为不同实例设置单独的数据存储目录。
为了方便起见,我们在两台服务器上都创建这四个实例所需要的的目录。
1 | mkdir /usr/local/rocketmq/store/ |
配置
在 rocketmq/conf
目录下提供了四种集群方案的配置样例
- 2m-2s-async:双主双从异步复制
- 2m-2s-sync:双主双从同步复制
- 2m-noslave:双主
- dledger: raft主从切换
这里我们选择双主双从同步复制方案。
- broker-a,a主服务器配置
在服务器1修改样例配置文件:rocketmq/conf/2m-2s-sync/broker-a.properties
在样例配置文件中,添加三项配置:
listenPort
:我们在一台服务器上要运行两个broker实例,所以两个实例的端口要有所区分。这里broker-a主服务器的端口使用默认的10911。storePathRootDir
:数据存储目录storePathCommitLog
:提交日志存储目录
1 | brokerClusterName=DefaultCluster |
- broker-a slave,a从服务器配置
在服务器2修改样例配置文件:rocketmq/conf/2m-2s-sync/broker-a-s.properties
在样例配置文件中,添加三项配置:
listenPort
:我们在一台服务器上要运行两个broker实例,所以两个实例的端口要有所区分。这里broker-a slave从服务器的端口使用11911。storePathRootDir
:数据存储目录storePathCommitLog
:提交日志存储目录
1 | brokerClusterName=DefaultCluster |
- broker-b,b主服务器配置
在服务器2修改样例配置文件:rocketmq/conf/2m-2s-sync/broker-b.properties
在样例配置文件中,添加三项配置:
listenPort
:我们在一台服务器上要运行两个broker实例,所以两个实例的端口要有所区分。这里broker-b主服务器的端口使用默认的10911。storePathRootDir
:数据存储目录storePathCommitLog
:提交日志存储目录
1 | brokerClusterName=DefaultCluster |
- broker-b slave,b从服务器配置
在服务器1修改样例配置文件:rocketmq/conf/2m-2s-sync/broker-b-s.properties
在样例配置文件中,添加三项配置:
listenPort
:我们在一台服务器上要运行两个broker实例,所以两个实例的端口要有所区分。这里broker-b slave从服务器的端口使用11911。storePathRootDir
:数据存储目录storePathCommitLog
:提交日志存储目录
1 | brokerClusterName=DefaultCluster |
配置要点说明
- 四台服务器的集群名
brokerClusterName
相同。集群名称相同的服务器共同组成服务集群 。 - 从服务器通过名字与主服务器关联在一起,
brokerName
与主服务器相同。 brokerId
为0是主服务器。从服务器的值是非零值,例如如果有四个从服务器,他们的brokerId
应该是 1,2,3,4。brokerRole
的值为SYNC_MASTER
是同步复制的主服务器。如果是ASYNC_MASTER
则为异步复制的主服务器。
- 同步复制:消息复制到从服务器后才向生产者发回反馈信息。
- 异步复制:消息发到主服务器就向生产者发回反馈信息,之后再向从服务器复制。
启动
- 启动两个 name server
在两台服务器上启动两个 name server,它们不用做任何集群的配置,都是作为独立服务运行,它们之间也不会进行数据复制。
所有broker服务启动后,要同时连接这两个 name server,向两个 name server 进行注册。
在两台服务器上都启动 name server:
1 | nohup sh mqnamesrv & |
- 启动 broker a 的主从两台服务器
在服务器1上启动 broker a 主服务器:
参数说明:
- -n参数:指定name server地址列表,多个地址用分号分隔
- -c参数:指定配置文件,使用指定的配置文件启动 broker
1 | nohup sh mqbroker |
在服务器2上启动 broker a 从服务器:
1 | nohup sh mqbroker |
- 启动 broker b 的主从两台服务器
在服务器2上启动 broker b 主服务器:
1 | nohup sh mqbroker |
在服务器1上启动 broker b 从服务器:
1 | nohup sh mqbroker |
检查启动的服务
在两台服务器上分别查看java进程,确认两台服务器上是否各启动了三个java进程,分别运行name server和两个broker。
1 | 查看 java 进程 |
启动管理界面
1 | 进入 rocketmq-console 项目打包文件目录 |
查看集群状态:
RocketMQ基本原理
Topic 基本原理
在Rocketmq集群中新建 Topic1
在管理界面中新建主题Topic1
,为了方便观察测试效果,这里把写队列和读队列的数量都设置成3。
这样,在 broker-a 和 broker-b 上都创建了 Topic1 主题,并各创建了3写3读队列,共6写6读,如下图所示:
你也可以修改Topic1分别配置 broker-a 和 borker-b 上的队列数量。
perm 参数的含义
perm
参数是设置队列的读写权限,下面表格列出了可配置的值及其含义:
取值 | 含义 |
---|---|
6 | 同时开启读写 |
4 | 禁写 |
2 | 禁读 |
Topic 收发消息原理
生产者将消息发送到 Topic1 的其中一个写队列,消费者从对应的一个读队列接收消息。
生产者的负载均衡
生产者以轮询的方式向所有写队列发送消息,这些队列可能会分布在多个broker实例上。
消费者的负载均衡
一个 group 中的多个消费者,可以以负载均衡的方式来接收消息。
读取队列
被均匀分配给这些消费者,它们从指定的队列来接收消息。队列的分配可以采用不同的策略,这里简略介绍以下三种策略:
AllocateMessageQueueAveragely 平均分配
这是默认策略,它是这样分配队列的:
AllocateMessageQueueAveragelyByCircle 环形分配
如果使用环形分配,在消费者的代码中需要设置分配策略,代码如下:
1 | consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueAveragelyByCircle()); |
这种分配策略的逻辑很简单,所有0号队列分给0号消费者,所有1号队列分给1号消费者,以此类推。
AllocateMessageQueueConsistentHash 一致性哈希
如果使用一致性哈希算法进行分配,在消费者的代码中需要设置分配策略,代码如下:
1 | consumer.setAllocateMessageQueueStrategy(new AllocateMessageQueueConsistentHash()); |
这种算法依靠一致性哈希算法,看当前消费者可以落到哪个虚拟节点,该虚拟节点对应哪个队列。
问题
思考一下,如果写队列比读队列多会怎样?反之会怎样?
NameServer 基本原理
NameServer 是 rocketmq 自己开发的一个轻型注册中心,他的作用相当于是 zk、eureka等。
rocketmq 为什么不使用 zk 呢?实际上 rocketmq 的早期版本使用的就是 zookeeper。
而 rocketmq 的架构设计决定了只需要一个轻量级的元数据服务器就足够了。杀鸡焉用牛刀?小区里,搞个货架就行了,建个仓库,又占地方,维护成本又高。
甚至,NameServer 都不需要有一个集群的管理者。以至于,NameServer 看起来都不像一个集群。事实上,NameServer 本质上来看,也不是一个集群。因为它的各个节点是独立的,不相关的。每个 NameServer 都是独立和 Producer、Consumer打交道。
基本认识
- NameServer主要用于存储Topic,Broker关系信息,功能简单,稳定性高。
- 各个NameServer节点之间不相关,不需要通信,单台宕机不影响其它节点。
- NameServer集群整体宕机不影响已建立关系的Concumer,Producer,Broker。
Broker、Producer、Consumer 与NameServer的通信
- 每个Borker和所有NameServer保持长连接,心跳间隔为30秒。每次心跳时还会携带当前的Topic信息。当某个Broker两分钟之内没有心跳,则认为该Broker下线,并调整内存中与该Broker相关的Topic信息。
- Consumer 从 NameServer 获得 Topic 的路由信息,与对应的 Broker 建立长连接。间隔30秒发送心跳至Broker。Broker检查若发现某 Consumer 两分钟内无心跳则认为该Consumer下线,并通知该Consumer所有的消费者集群中的其他实例,触发该消费者集群重新负载均衡。
- Producer 与消费者一样,也是从 NameServer 获得 Topic 的路由信息,与对应的 Broker 建立长连接,30秒发送一次心跳。Broker 也会认为两分钟内没有心跳的 Producer 下线。