一、基本概念
1.Message Queue
MQ全称(Message Queue)又名消息队列,是一种异步通讯的中间件。
- 可以将它理解成邮局,发送者将消息传递到邮局,然后由邮局帮我们发送给具体的消息接收者(消费者),具体发送过程与时间我们无需关心,它也不会干扰我进行其它事情。
- 常见的MQ有kafka、activemq、zeromq、rabbitmq 等等,各大MQ的对比和优劣势可以自行Google
2.RabbitMQ
RabbitMQ是一个遵循AMQP协议,由面向高并发的erlanng
语言开发而成,用在实时的对可靠性要求比较高的消息传递上,支持多种语言客户端。支持延迟队列(这是一个非常有用的功能)
….
2.1 基础概念
- Broker:简单来说就是消息队列服务器实体
- Exchange:消息交换机,它指定消息按什么规则,路由到哪个队列
- Queue:消息队列载体,每个消息都会被投入到一个或多个队列
- Binding:绑定,它的作用就是把
exchange
和queue
按照路由规则绑定起来
- Routing Key:路由关键字,
exchange
根据这个关键字进行消息投递
- vhost:虚拟主机,一个
broker
里可以开设多个vhost
,用作不同用户的权限分离
- producer:消息生产者,就是投递消息的程序
- consumer:消息消费者,就是接受消息的程序
- channel:消息通道,在客户端的每个连接里,可建立多个
channel
,每个channel
代表一个会话任务
2.2 常见应用场景
- 邮箱发送:用户注册后投递消息到
rabbitmq
中,由消息的消费方异步的发送邮件,提升系统响应速度
- 流量削峰:一般在秒杀活动中应用广泛,秒杀会因为流量过大,导致应用挂掉,为了解决这个问题,一般在应用前端加入消息队列。用于控制活动人数,将超过此一定阀值的订单直接丢弃。缓解短时间的高流量压垮应用。
- 订单超时:利用
rabbitmq
的延迟队列,可以很简单的实现订单超时的功能,比如用户在下单后30分钟未支付取消订单
- 还有更多应用场景就不一一列举了…..
二、准备RabbitMQ
准备条件:在SpringBoot整合RabbitMq之前需要确保已经正确安装和运行RabbitMQ。
三、SpringBoot 整合 RabbitMQ
Spring Boot 整合 RabbitMQ 是非常容易,只需要两个步骤:
1.创建子模块
这里我们创建一个子模块
1 2
| group = 'com.ray.study' artifact ='spring-boot-08-messaging-rabbitmq'
|
2.引入依赖
2.1 继承父工程依赖
在父工程spring-boot-seeds
的 settings.gradle
加入子工程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
| rootProject.name = 'spring-boot-seeds' include 'spring-boot-01-helloworld' include 'spring-boot-02-restful-test' include 'spring-boot-03-thymeleaf' include 'spring-boot-04-swagger2' include 'spring-boot-05-jpa' include 'spring-boot-05-mybatis' include 'spring-boot-05-tk-mybatis' include 'spring-boot-06-nosql-redis' include 'spring-boot-06-nosql-mongodb' include 'spring-boot-07-cache-concurrentmap' include 'spring-boot-07-cache-ehcache' include 'spring-boot-07-cache-caffeine' include 'spring-boot-07-cache-redis' include 'spring-boot-08-messaging-rabbitmq'
|
这样,子工程spring-boot-08-messaging-rabbitmq
就会自动继承父工程中subprojects
`函数里声明的依赖,主要包含如下依赖:
1 2 3 4 5
| implementation 'org.springframework.boot:spring-boot-starter-web' testImplementation 'org.springframework.boot:spring-boot-starter-test'
compileOnly 'org.projectlombok:lombok' annotationProcessor 'org.projectlombok:lombok'
|
2.2 引入RabbitMQ
依赖
将子模块spring-boot-08-messaging-rabbitmq
的build.gradle
修改为如下内容:
1 2 3 4
| dependencies { implementation 'org.springframework.boot:spring-boot-starter-amqp' }
|
4. 配置RabbitMQ
4.1 修改application.yml
1 2 3 4 5 6 7 8 9 10 11
| spring: rabbitmq: publisher-confirms: true
rabbitmq: queue: msg: spingboot-queue-msg user: spingboot-queue-user
|
关于默认配置和可选配置可参考spring-boot-autoconfigure
jar包中spring-configuration-metadata.json
,这里选取部分默认配置:
1 2 3 4 5
| spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.publisher-confirms= false
|
4.2 配置RabbitMQConfig
这里我们只是简单的注册两条队列
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| package com.ray.study.springboot08messagingrabbitmq.config;
import org.springframework.amqp.core.Queue; import org.springframework.beans.factory.annotation.Value; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitMQConfig {
@Value("${rabbitmq.queue.msg}") private String msgQueueName;
@Value("${rabbitmq.queue.user}") private String userQueueName; @Bean public Queue defaultBookQueue() { return new Queue(msgQueueName, true); }
@Bean public Queue manualBookQueue() { return new Queue(userQueueName, true); } }
|
5.消息生产者
5.1 entity
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
| package com.ray.study.springboot08messagingrabbitmq.entity;
import lombok.AllArgsConstructor; import lombok.Data; import lombok.NoArgsConstructor;
import java.io.Serializable; import java.util.Date;
@Data @NoArgsConstructor @AllArgsConstructor public class User implements Serializable {
private static final long serialVersionUID = -2164058270260403154L;
private Long id;
private String name;
private Integer age;
private Date creationDate;
private Date lastUpdateDate;
}
|
5.2 service
(1)RabbitMsgService
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
| package com.ray.study.springboot08messagingrabbitmq.service;
import com.ray.study.springboot08messagingrabbitmq.entity.User;
public interface RabbitMsgService {
void sendMsg(String msg);
void sendUser(User user);
}
|
(2)RabbitMsgServiceImpl
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
| package com.ray.study.springboot08messagingrabbitmq.service.impl;
import com.ray.study.springboot08messagingrabbitmq.service.RabbitMsgService; import com.ray.study.springboot08messagingrabbitmq.entity.User; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.beans.factory.annotation.Value; import org.springframework.stereotype.Service;
@Slf4j @Service public class RabbitMsgServiceImpl implements RabbitTemplate.ConfirmCallback, RabbitMsgService {
@Value("${rabbitmq.queue.msg}") private String msgRouting;
@Value("${rabbitmq.queue.user}") private String userRouting;
@Autowired private RabbitTemplate rabbitTemplate;
@Override public void sendMsg(String msg) { log.info("发送消息:{}", msg); rabbitTemplate.setConfirmCallback(this); rabbitTemplate.convertAndSend(msgRouting, msg); }
@Override public void sendUser(User user) { log.info("发送用户消息:{}", user); rabbitTemplate.setConfirmCallback(this); rabbitTemplate.convertAndSend(userRouting, user); }
@Override public void confirm(CorrelationData correlationData, boolean ack, String cause) { if(ack){ log.info("消息消费成功"); }else{ log.info("消息消费失败:{}"+cause); } }
}
|
6.消息消费者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| package com.ray.study.springboot08messagingrabbitmq.receiver;
import com.rabbitmq.client.Channel; import com.ray.study.springboot08messagingrabbitmq.entity.User; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class RabbitMsgReceiver {
@RabbitListener(queues = {"${rabbitmq.queue.msg}"}) public void reveiveMsg(String msg){ log.info("收到消息:{}",msg); }
@RabbitListener(queues = {"${rabbitmq.queue.user}"}) public void reveiveUser(User user, Message message, Channel channel){ log.info("收到用户消息:{}",user); }
}
|
7.单元测试
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| package com.ray.study.springboot08messagingrabbitmq.service;
import com.ray.study.springboot08messagingrabbitmq.entity.User; import org.junit.Test; import org.junit.runner.RunWith; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.test.context.SpringBootTest; import org.springframework.test.context.junit4.SpringRunner;
@RunWith(SpringRunner.class) @SpringBootTest public class RabbitMsgServiceTest {
@Autowired RabbitMsgService rabbitMsgService;
@Test public void testRabbit() { rabbitMsgService.sendMsg("冷风如刀,以大地为砧板;万里飞雪,将苍穹作烘炉");
User user = new User(); user.setId(1L); user.setName("阿飞"); user.setName("18");
rabbitMsgService.sendUser(user); }
}
|