消息队列MQ
初识MQ
同步调用

同步调用有3个问题:
- 拓展性差,每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动
- 性能下降,每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和
- 级联失败,当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。
异步调用
技术选型
RabbitMQ
部署
mq: #消息队列
image: rabbitmq:3.8-management
container_name: mq
restart: unless-stopped
hostname: mq
environment:
TZ: "Asia/Shanghai"
RABBITMQ_DEFAULT_USER: admin
RABBITMQ_DEFAULT_PASS: "admin"
ports:
- "15672:15672"
- "5672:5672"
volumes:
- mq-plugins:/plugins
# 持久化数据卷,保存用户/队列/交换机等元数据
- ./mq-data:/var/lib/rabbitmq
networks:
- hmall-net
volumes:
mq-plugins:
http://localhost:15672/ 访问控制台
架构图
publisher
:生产者,发送消息的一方consumer
:消费者,消费消息的一方queue
:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理exchange
:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。不存储virtual host
:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue(每个项目+环境有各自的vhost)
一个队列最多指定给一个消费者!
Spring AMQP
快速开始
交换机和队列都是直接在控制台创建,消息的发送和接收在Java应用中实现!
简单案例:直接向队列发送消息,不经过交换机
引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
配置MQ地址,在publisher
和consumer
服务的application.yml
中添加配置:
spring:
rabbitmq:
host: localhost # 你的虚拟机IP
port: 5672 # 端口
virtual-host: /hmall # 虚拟主机
username: hmall # 用户名
password: 123 # 密码
消息发送:
然后在publisher
服务中编写测试类SpringAmqpTest
,并利用**RabbitTemplate
**实现消息发送:
@SpringBootTest
public class SpringAmqpTest {
@Autowired
private RabbitTemplate rabbitTemplate;
@Test
public void testSimpleQueue() {
// 队列名称
String queueName = "simple.queue";
// 消息
String message = "hello, spring amqp!";
// 发送消息
rabbitTemplate.convertAndSend(queueName, message);
}
}
消息接收
@Component
public class SpringRabbitListener {
// 利用RabbitListener来声明要监听的队列信息
// 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
// 可以看到方法体中接收的就是消息体的内容
@RabbitListener(queues = "simple.queue")
public void listenSimpleQueueMessage(String msg) throws InterruptedException {
System.out.println("spring 消费者接收到消息:【" + msg + "】");
}
}
然后启动启动类,它能自动从队列中取出消息。取出后队列中就没消息了!
交换机
1)fanout:广播给每个绑定的队列
发送消息:
convertAndSend
如果2个参数,第一个表示队列名,第二个表示消息;如果3个参数,第一个表示交换机,第二个表示RoutingKey
,第三个表示消息。
@Test
public void testFanoutExchange() {
// 交换机名称
String exchangeName = "hmall.fanout";
// 消息
String message = "hello, everyone!";
rabbitTemplate.convertAndSend(exchangeName, "", message);
}
2)Direct交换机
- 队列与交换机的绑定,不能是任意绑定了,而是要指定一个
RoutingKey
(路由key) - 消息的发送方在 向 Exchange发送消息时,也必须指定消息的
RoutingKey
。 - Exchange不再把消息交给每一个绑定的队列,而是根据消息的
Routing Key
进行判断,只有队列的Routingkey
与消息的Routing key
完全一致,才会接收到消息
注意,RoutingKey不等于队列名称
3)Topic交换机
Topic
类型的Exchange
与Direct
相比,都是可以根据RoutingKey
把消息路由到不同的队列。
只不过Topic
类型Exchange
可以让队列在绑定BindingKey
的时候使用通配符!
BindingKey一般都是有一个或多个单词组成,多个单词之间以.
分割
通配符规则:
#
:匹配一个或多个词*
:匹配不多不少恰好1个词
举例:
item.#
:能够匹配item.spu.insert
或者item.spu
item.*
:只能匹配item.spu
基于注解声明交换机、队列
以往我们都在 RabbitMQ 管理控制台手动创建队列和交换机,开发人员还得把所有配置整理一遍交给运维,既繁琐又容易出错。更好的做法是在应用启动时自动检测所需的队列和交换机,若不存在则直接创建。
基于注解方式来声明
type
默认交换机类型为ExchangeTypes.DIRECT
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}
检查队列
- 如果 RabbitMQ 中已经有名为
direct.queue1
的队列,就不会重复创建; - 如果不存在,
RabbitAdmin
会自动帮你创建一个。
检查交换机
- 同理,会查看有没有名为
hmall.direct
、类型为direct
的交换机,若不存在就新建。
检查绑定
- 最后再去声明绑定关系:把
direct.queue1
绑定到hmall.direct
,并且 routing-key 为"red"
和"blue"
。 - 如果已有相同的绑定(队列、交换机、路由键都一致),也不会再重复创建。
消息转换器
使用JSON方式来做序列化和反序列化,替换掉默认方式。
更小或可压缩的消息体、易读、易调试
1)引入依赖
<dependency>
<groupId>com.fasterxml.jackson.dataformat</groupId>
<artifactId>jackson-dataformat-xml</artifactId>
<version>2.9.10</version>
</dependency>
2)配置消息转换器,在publisher
和consumer
两个服务的启动类中添加一个Bean即可:
@Bean
public MessageConverter messageConverter(){
// 1.定义消息转换器
Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
// 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
jackson2JsonMessageConverter.setCreateMessageIds(true);
return jackson2JsonMessageConverter;
}
MQ高级
我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:
- 确保生产者一定把消息发送到MQ
- 确保MQ不会将消息弄丢
- 确保消费者一定要处理消息
发送者的可靠性
发送者重试
修改发送者模块的application.yaml
文件,添加下面的内容:
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数
- 阻塞重试,一般不建议开启。
发送者确认
- 当消息投递到MQ,但是路由失败(没有队列绑定交换机、或者你routingKey设置错误等)时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
- 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
- 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
- 其它情况都会返回NACK,告知投递失败
1.在发送者模块的application.yaml
中添加配置:
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
publisher-returns: true # 开启publisher return机制
none
:关闭confirm机制simple
:同步阻塞等待MQ的回执correlated
:MQ异步回调返回回执
2.每个RabbitTemplate
只能配置一个ReturnCallback
,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:
@Slf4j
@RequiredArgsConstructor
@Configuration
public class MqConfig {
private final RabbitTemplate rabbitTemplate;
@PostConstruct
public void init(){
rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
@Override
public void returnedMessage(ReturnedMessage returned) {
log.error("触发return callback,");
log.debug("exchange: {}", returned.getExchange());
log.debug("routingKey: {}", returned.getRoutingKey());
log.debug("message: {}", returned.getMessage());
log.debug("replyCode: {}", returned.getReplyCode());
log.debug("replyText: {}", returned.getReplyText());
}
});
}
}
3.定义ConfirmCallback
由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:
这里的CorrelationData中包含两个核心的东西:
id
:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆SettableListenableFuture
:回执结果的Future对象
@Test
void testPublisherConfirm() {
// 1.创建CorrelationData
CorrelationData cd = new CorrelationData();
// 2.给Future添加ConfirmCallback
cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
@Override
public void onFailure(Throwable ex) {
// 2.1.Future发生异常时的处理逻辑,基本不会触发
log.error("send message fail", ex);
}
@Override
public void onSuccess(CorrelationData.Confirm result) {
// 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
log.debug("发送消息成功,收到 ack!");
}else{ // result.getReason(),String类型,返回nack时的异常描述
log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
}
}
});
// 3.发送消息
rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}
端到端投递保障
- ConfirmCallback 只告诉你:消息“到”了 RabbitMQ 服务器吗?(ACK:到;NACK:没到)
- ReturnsCallback 只告诉你:到达服务器的消息,能“进”队列吗?(能进就不回;进不了就退)
两者都成功,才能确认:“这条消息真的安全地进了队列,等着消费者去拿。”
- 开启生产者确认比较消耗MQ性能,一般不建议开启。
MQ的可靠性
数据持久化
为了保证数据的可靠性,必须配置数据持久化(从内存保存到磁盘上),包括:
- 交换机持久化(选Durable)
- 队列持久化(选Durable)
- 消息持久化(选Persistent)
控制台方式:
这样重启后还能恢复。
代码方式,默认都是持久化的,不用变动。
消费者可靠性
消费者确认机制
当消费者处理消息结束后,向RabbitMQ返回自己的处理状态,MQ做出相应反应...
上述的NACK状态时,MQ会不断向消费者重投消息,直至被正确处理!!!
在消费者方,通过下面的配置可以修改消费者收到消息后的ACK处理方式:
none,收到消息就直接返回ack;
manual,手动实现ack;
auto,自动档,业务异常返回nack, 消息处理异常返回reject,其他ack (默认也是这个模式)
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: auto
消费者重试
- 类似发送者的重试机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
- 重试达到最大次数后,、会返回reject,消息会被丢弃
修改consumer服务的application.yml文件,添加内容:
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启消费者失败重试
initial-interval: 1000ms # 初识的失败等待时长为1秒
multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
max-attempts: 3 # 最大重试次数
stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false
Stateless(无状态重试):所有的重试都在同一个事务上下文里进行,直到重试次数用尽后才会把异常抛回容器并最终回滚或提交事务。
Stateful(有状态重试):每次重试都会被当成一次独立的消息交付,Spring 会为每次尝试开启新的事务,失败时立即回滚并重新投递,下次重试又是一个干净的事务环境。这对事务性操作(@Transactional)来说,能保证“第 N 次重试失败就回滚第 N 次的事务”,避免把所有尝试都裹在一笔大事务里
失败处理策略
前面默认的达到最大重试次数后,消息会被丢弃,对于消息可靠性要求较高的业务场景下,显然不太合适了。
因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery
接口来定义的,它有3个不同实现:
RejectAndDontRequeueRecoverer
:重试耗尽后,直接reject
,丢弃消息。默认就是这种方式ImmediateRequeueMessageRecoverer
:重试耗尽后,返回nack
,消息重新入队RepublishMessageRecoverer
:重试耗尽后,将失败消息投递到指定的交换机
比较优雅的一种处理方案是RepublishMessageRecoverer
,失败后将消息投递到一个指定的,专门存放异常消息的交换机->队列,后续由人工集中处理。
业务幂等性
在程序开发中,幂等则是指同一个业务,执行一次或多次对业务状态的影响是一致的。如:
- 根据id删除数据
- 查询数据
- 新增数据
但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:
- 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
- 退款业务。重复退款对商家而言会有经济损失。
所以,我们要尽可能避免业务被重复执行:MQ消息的重复投递、页面卡顿时频繁刷新导致表单重复提交、服务间调用的重试
法一:唯一ID
- 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
- 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
- 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。
法一存在业务侵入,因为mq的消息ID与业务无关,现在却多了一张专门记录 ID 的表或结构
法二:业务判断,基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。
综上,支付服务与交易服务之间的订单状态一致性是如何保证的?
- 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
- 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
- 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。
延迟消息
对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。
例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。
延迟消息插件
1.下载
GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ
2.上传插件,由于之前docker部署MQ挂载了数据卷
docker volume ls #查看所有数据卷
docker volume inspect hmall_all_mq-plugins #获取数据卷的目录
#"Mountpoint": "/var/lib/docker/volumes/hmall_all_mq-plugins/_data"
我们上传插件到该目录下。
3.安装插件
docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange
声明延迟交换机
额外指定参数 delayed = "true"
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "delay.queue", durable = "true"),
exchange = @Exchange(name = "delay.direct", delayed = "true"),
key = "delay"
))
public void listenDelayMessage(String msg){
log.info("接收到delay.queue的延迟消息:{}", msg);
}
发送延迟消息
@Test
void testPublisherDelayMessage() {
// 1.创建消息
String message = "hello, delayed message";
// 2.发送消息,利用消息后置处理器添加消息头
rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 添加延迟消息属性
message.getMessageProperties().setDelay(5000);
return message;
}
});
}
超时订单问题
三种可能性:1.如果用户支付成功,但是消息通知失败(未传递给订单服务),那么会导致数据不一致情况,这时延迟消息到达后,它会先看本地订单状态,发现处于'待支付'状态,此时不确定是否是真的未支付,还是消息通知失败,需要再openfeign调用支付服务,查询支付流水状态,发现支付成功,那么就更新本地订单状态为'已支付'。
2.如果用户支付成功且消息通知成功,那么订单服务会更新订单状态为'已支付',延迟消息到达时查询本地订单状态,确实'已支付',直接return。
3.用户确实到时间了扔未支付,此时本地订单状态和远程的支付流水状态都是'待支付',此时取消订单、恢复库存。