消息队列MQ

消息队列MQ

zy123
2025-05-28 /  0 评论 /  0 点赞 /  1 阅读 /  4058 字
最近更新于 06-07

消息队列MQ

初识MQ

同步调用

image-20250527173401081

同步调用有3个问题:

  • 拓展性差,每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动
  • 性能下降,每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和
  • 级联失败,当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。

异步调用

image-20250527175753038

技术选型

image-20250527190824767

RabbitMQ

部署

mq:  #消息队列
    image: rabbitmq:3.8-management
    container_name: mq
    restart: unless-stopped
    hostname: mq
    environment:
      TZ: "Asia/Shanghai"
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: "admin"
    ports:
      - "15672:15672"
      - "5672:5672"
    volumes:
      - mq-plugins:/plugins
      # 持久化数据卷,保存用户/队列/交换机等元数据
      - ./mq-data:/var/lib/rabbitmq
    networks:
      - hmall-net
volumes:
  mq-plugins:

http://localhost:15672/ 访问控制台

架构图

image-20250527200935901

  • publisher:生产者,发送消息的一方
  • consumer:消费者,消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。不存储
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue(每个项目+环境有各自的vhost)

一个队列最多指定给一个消费者!

Spring AMQP

快速开始

交换机和队列都是直接在控制台创建,消息的发送和接收在Java应用中实现!

简单案例:直接向队列发送消息,不经过交换机

image-20250528120304174

引入依赖

<!--AMQP依赖,包含RabbitMQ-->
 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置MQ地址,在publisherconsumer服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: localhost # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

消息发送:

然后在publisher服务中编写测试类SpringAmqpTest,并利用**RabbitTemplate**实现消息发送:

@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

消息接收

@Component
public class SpringRabbitListener {
     // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

然后启动启动类,它能自动从队列中取出消息。取出后队列中就没消息了!

交换机

1)fanout:广播给每个绑定的队列

image-20250528133703660

image-20250528132709273

发送消息:

convertAndSend如果2个参数,第一个表示队列名,第二个表示消息;如果3个参数,第一个表示交换机,第二个表示RoutingKey,第三个表示消息。

@Test
public void testFanoutExchange() {
     // 交换机名称
     String exchangeName = "hmall.fanout";
     // 消息
     String message = "hello, everyone!";
     rabbitTemplate.convertAndSend(exchangeName, "", message);
}

2)Direct交换机

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

注意,RoutingKey不等于队列名称

image-20250528141029943

3)Topic交换机

Topic类型的ExchangeDirect相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型Exchange可以让队列在绑定BindingKey 的时候使用通配符

BindingKey一般都是有一个或多个单词组成,多个单词之间以.分割

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

基于注解声明交换机、队列

以往我们都在 RabbitMQ 管理控制台手动创建队列和交换机,开发人员还得把所有配置整理一遍交给运维,既繁琐又容易出错。更好的做法是在应用启动时自动检测所需的队列和交换机,若不存在则直接创建。

基于注解方式来声明

type 默认交换机类型为ExchangeTypes.DIRECT

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

检查队列

  • 如果 RabbitMQ 中已经有名为 direct.queue1 的队列,就不会重复创建;
  • 如果不存在,RabbitAdmin 会自动帮你创建一个。

检查交换机

  • 同理,会查看有没有名为 hmall.direct、类型为 direct 的交换机,若不存在就新建。

检查绑定

  • 最后再去声明绑定关系:把 direct.queue1 绑定到 hmall.direct,并且 routing-key 为 "red""blue"
  • 如果已有相同的绑定(队列、交换机、路由键都一致),也不会再重复创建。

消息转换器

使用JSON方式来做序列化和反序列化,替换掉默认方式。

更小或可压缩的消息体、易读、易调试

1)引入依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

2)配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

MQ高级

我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

发送者的可靠性

发送者重试

修改发送者模块的application.yaml文件,添加下面的内容:

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数
  • 阻塞重试,一般不建议开启。

发送者确认

image-20250529170017117

  • 当消息投递到MQ,但是路由失败(没有队列绑定交换机、或者你routingKey设置错误等)时,通过Publisher Return返回异常信息,同时返回ack的确认信息,代表投递成功
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功
  • 持久消息投递到了MQ,并且入队完成持久化,返回ACK ,告知投递成功
  • 其它情况都会返回NACK,告知投递失败

1.在发送者模块的application.yaml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启publisher confirm机制,并设置confirm类型
    publisher-returns: true # 开启publisher return机制
  • none:关闭confirm机制
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

2.每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。我们在publisher模块定义一个配置类:

@Slf4j
@RequiredArgsConstructor
@Configuration
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init(){
        rabbitTemplate.setReturnsCallback(new RabbitTemplate.ReturnsCallback() {
            @Override
            public void returnedMessage(ReturnedMessage returned) {
                log.error("触发return callback,");
                log.debug("exchange: {}", returned.getExchange());
                log.debug("routingKey: {}", returned.getRoutingKey());
                log.debug("message: {}", returned.getMessage());
                log.debug("replyCode: {}", returned.getReplyCode());
                log.debug("replyText: {}", returned.getReplyText());
            }
        });
    }
}

3.定义ConfirmCallback

由于每个消息发送时的处理逻辑不一定相同,因此ConfirmCallback需要在每次发消息时定义。具体来说,是在调用RabbitTemplate中的convertAndSend方法时,多传递一个参数:

image-20250529180404355

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象
@Test
void testPublisherConfirm() {
    // 1.创建CorrelationData
    CorrelationData cd = new CorrelationData();
    // 2.给Future添加ConfirmCallback
    cd.getFuture().addCallback(new ListenableFutureCallback<CorrelationData.Confirm>() {
        @Override
        public void onFailure(Throwable ex) {
            // 2.1.Future发生异常时的处理逻辑,基本不会触发
            log.error("send message fail", ex);
        }
        @Override
        public void onSuccess(CorrelationData.Confirm result) {
            // 2.2.Future接收到回执的处理逻辑,参数中的result就是回执内容
            if(result.isAck()){ // result.isAck(),boolean类型,true代表ack回执,false 代表 nack回执
                log.debug("发送消息成功,收到 ack!");
            }else{ // result.getReason(),String类型,返回nack时的异常描述
                log.error("发送消息失败,收到 nack, reason : {}", result.getReason());
            }
        }
    });
    // 3.发送消息
    rabbitTemplate.convertAndSend("hmall.direct", "q", "hello", cd);
}

端到端投递保障

  • ConfirmCallback 只告诉你:消息“到”了 RabbitMQ 服务器吗?(ACK:到;NACK:没到)
  • ReturnsCallback 只告诉你:到达服务器的消息,能“进”队列吗?(能进就不回;进不了就退)

两者都成功,才能确认:“这条消息真的安全地进了队列,等着消费者去拿。”

  • 开启生产者确认比较消耗MQ性能,一般不建议开启。

MQ的可靠性

数据持久化

为了保证数据的可靠性,必须配置数据持久化(从内存保存到磁盘上),包括:

  • 交换机持久化(选Durable)
  • 队列持久化(选Durable)
  • 消息持久化(选Persistent)

控制台方式:

image-20250530154302987

image-20250530154321546

这样重启后还能恢复。

代码方式,默认都是持久化的,不用变动。

消费者可靠性

消费者确认机制

当消费者处理消息结束后,向RabbitMQ返回自己的处理状态,MQ做出相应反应...

image-20250530160814244

上述的NACK状态时,MQ会不断向消费者重投消息,直至被正确处理!!!

在消费者方,通过下面的配置可以修改消费者收到消息后的ACK处理方式:

none,收到消息就直接返回ack;

manual,手动实现ack;

auto,自动档,业务异常返回nack, 消息处理异常返回reject,其他ack (默认也是这个模式)

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

消费者重试

  • 类似发送者的重试机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
  • 重试达到最大次数后,、会返回reject,消息会被丢弃

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态;false有状态。如果业务中包含事务,这里改为false

Stateless(无状态重试):所有的重试都在同一个事务上下文里进行,直到重试次数用尽后才会把异常抛回容器并最终回滚或提交事务。

Stateful(有状态重试):每次重试都会被当成一次独立的消息交付,Spring 会为每次尝试开启新的事务,失败时立即回滚并重新投递,下次重试又是一个干净的事务环境。这对事务性操作(@Transactional)来说,能保证“第 N 次重试失败就回滚第 N 次的事务”,避免把所有尝试都裹在一笔大事务里

失败处理策略

前面默认的达到最大重试次数后,消息会被丢弃,对于消息可靠性要求较高的业务场景下,显然不太合适了。

因此Spring允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

比较优雅的一种处理方案是RepublishMessageRecoverer,失败后将消息投递到一个指定的,专门存放异常消息的交换机->队列,后续由人工集中处理。

业务幂等性

在程序开发中,幂等则是指同一个业务,执行一次或多次对业务状态的影响是一致的。如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行:MQ消息的重复投递、页面卡顿时频繁刷新导致表单重复提交、服务间调用的重试

法一:唯一ID

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

法一存在业务侵入,因为mq的消息ID与业务无关,现在却多了一张专门记录 ID 的表或结构

法二:业务判断,基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。

image-20250603151010579

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

延迟消息

对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。

例如,订单支付超时时间为30分钟,则我们应该在用户下单后的第30分钟检查订单支付状态,如果发现未支付,应该立刻取消订单,释放库存。

image-20250603154136058

延迟消息插件

1.下载

GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

2.上传插件,由于之前docker部署MQ挂载了数据卷

docker volume ls   #查看所有数据卷

docker volume inspect hmall_all_mq-plugins  #获取数据卷的目录

#"Mountpoint": "/var/lib/docker/volumes/hmall_all_mq-plugins/_data"

我们上传插件到该目录下。

3.安装插件

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

image-20250603163744968

声明延迟交换机

额外指定参数 delayed = "true"

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

发送延迟消息

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

超时订单问题

image-20250603171922542

三种可能性:1.如果用户支付成功,但是消息通知失败(未传递给订单服务),那么会导致数据不一致情况,这时延迟消息到达后,它会先看本地订单状态,发现处于'待支付'状态,此时不确定是否是真的未支付,还是消息通知失败,需要再openfeign调用支付服务,查询支付流水状态,发现支付成功,那么就更新本地订单状态为'已支付'。

2.如果用户支付成功且消息通知成功,那么订单服务会更新订单状态为'已支付',延迟消息到达时查询本地订单状态,确实'已支付',直接return。

3.用户确实到时间了扔未支付,此时本地订单状态和远程的支付流水状态都是'待支付',此时取消订单、恢复库存。

© 版权声明
THE END
喜欢就支持一下吧
点赞 0 分享 收藏
评论 抢沙发
取消