消息队列MQ

zy123
2025-05-28 /  0 评论 /  0 点赞 /  18 阅读 /  6472 字
最近更新于 09-08

消息队列MQ

初识MQ

同步调用

image-20250527173401081

同步调用有3个问题:

  • 拓展性差,每次有新的需求,现有支付逻辑都要跟着变化,代码经常变动
  • 性能下降,每次远程调用,调用者都是阻塞等待状态。最终整个业务的响应时长就是每次远程调用的执行时长之和
  • 级联失败,当交易服务、通知服务出现故障时,整个事务都会回滚,交易失败。

异步调用

image-20250527175753038

技术选型

image-20250527190824767

RabbitMQ

部署

mq:  #消息队列
    image: rabbitmq:3.8-management
    container_name: mq
    restart: unless-stopped
    hostname: mq
    environment:
      TZ: "Asia/Shanghai"
      RABBITMQ_DEFAULT_USER: admin
      RABBITMQ_DEFAULT_PASS: "admin"
    ports:
      - "15672:15672"
      - "5672:5672"
    volumes:
      - mq-plugins:/plugins
      # 持久化数据卷,保存用户/队列/交换机等元数据
      - ./mq-data:/var/lib/rabbitmq
    networks:
      - hmall-net
volumes:
  mq-plugins:

http://localhost:15672/ 访问控制台

架构图

image-20250527200935901

  • publisher:生产者,发送消息的一方
  • consumer:消费者,消费消息的一方
  • queue:队列,存储消息。生产者投递的消息会暂存在消息队列中,等待消费者处理
  • exchange:交换机,负责消息路由。生产者发送的消息由交换机决定投递到哪个队列。不存储
  • virtual host:虚拟主机,起到数据隔离的作用。每个虚拟主机相互独立,有各自的exchange、queue(每个项目+环境有各自的vhost)

一个队列最多指定给一个消费者!

Spring AMQP

快速开始

交换机和队列都是直接在控制台创建,消息的发送和接收在Java应用中实现!

简单案例:直接向队列发送消息,不经过交换机

image-20250528120304174

引入依赖

<!--AMQP依赖,包含RabbitMQ-->
 <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

配置MQ地址,在publisherconsumer服务的application.yml中添加配置:

spring:
  rabbitmq:
    host: localhost # 你的虚拟机IP
    port: 5672 # 端口
    virtual-host: /hmall # 虚拟主机
    username: hmall # 用户名
    password: 123 # 密码

消息发送:

然后在publisher服务中编写测试类SpringAmqpTest,并利用**RabbitTemplate**实现消息发送:

@SpringBootTest
public class SpringAmqpTest {

    @Autowired
    private RabbitTemplate rabbitTemplate;

    @Test
    public void testSimpleQueue() {
        // 队列名称
        String queueName = "simple.queue";
        // 消息
        String message = "hello, spring amqp!";
        // 发送消息
        rabbitTemplate.convertAndSend(queueName, message);
    }
}

convertAndSend如果 2 个参数,第一个表示队列名,第二个表示消息;

消息接收

@Component
public class SpringRabbitListener {
     // 利用RabbitListener来声明要监听的队列信息
    // 将来一旦监听的队列中有了消息,就会推送给当前服务,调用当前方法,处理消息。
    // 可以看到方法体中接收的就是消息体的内容
    @RabbitListener(queues = "simple.queue")
    public void listenSimpleQueueMessage(String msg) throws InterruptedException {
        System.out.println("spring 消费者接收到消息:【" + msg + "】");
    }
}

然后启动启动类,它能自动从队列中取出消息。取出后队列中就没消息了!

交换机

无论是 DirectTopic 还是 Fanout 交换机,你都可以用 同一个 Binding Key 把多条队列绑定到同一个交换机上。

1)fanout:广播给每个绑定的队列

image-20250528133703660

image-20250528132709273

发送消息:

convertAndSend如果 3 个参数,第一个表示交换机,第二个表示RoutingKey,第三个表示消息。

@Test
public void testFanoutExchange() {
     // 交换机名称
     String exchangeName = "hmall.fanout";
     // 消息
     String message = "hello, everyone!";
     rabbitTemplate.convertAndSend(exchangeName, "", message);
}

2)Direct交换机

  • 队列与交换机的绑定,不能是任意绑定了,而是要指定一个RoutingKey(路由key)
  • 消息的发送方在 向 Exchange发送消息时,也必须指定消息的 RoutingKey
  • Exchange不再把消息交给每一个绑定的队列,而是根据消息的Routing Key进行判断,只有队列的Routingkey与消息的 Routing key完全一致,才会接收到消息

注意,RoutingKey不等于队列名称

image-20250528141029943

3)Topic交换机

Topic类型的交换机与Direct相比,都是可以根据RoutingKey把消息路由到不同的队列。

只不过Topic类型交换机可以让队列在绑定BindingKey 的时候使用通配符

BindingKey一般都是有一个或多个单词组成,多个单词之间以.分割

通配符规则:

  • #:匹配一个或多个词
  • *:匹配不多不少恰好1个词

举例:

  • item.#:能够匹配item.spu.insert 或者 item.spu
  • item.*:只能匹配item.spu

转发过程:把发送者传来的 Routing Key 按点分成多级,和各队列的 Binding Key(可以带 *# 通配符)做模式匹配,匹配上的队列统统都能收到消息。

Routing Key和Binding Key

Routing Key(路由键)

  • 由发送者(Producer)在发布消息时指定,附着在消息头上。
  • 用来告诉交换机:“我的这条消息属于哪类/哪个主题”。

Binding Key(绑定键)

  • 由消费者(在应用启动或队列声明时)指定,是把队列绑定到交换机时用的规则。有些 UI 里 Routing Key 等同于 Binding Key!
  • 告诉交换机:“符合这个键的消息,投递到我这个队列”。

交换机本身不设置 Routing Key 或 Binding Key,它只根据类型(Direct/Topic/Fanout/Headers)和已有的“队列–绑定键”关系,把 incoming Routing Key 匹配到对应的队列。

Direct Exchange

  • 路由规则Routing Key === Binding Key(完全一致)
  • 场景:一对一或一对多的精确路由

Topic Exchange

  • 路由规则

    :支持通配符

    • *:匹配一个单词
    • #:匹配零个或多个单词
  • 例:

    • Binding Key绑定键 order.* → 能匹配 order.createdorder.paid
    • 绑定键 order.# → 能匹配 order.created.successorder

Fanout Exchange

  • 路由规则:忽略 Routing/Binding Key,消息广播到所有绑定队列
  • 场景:聊天室广播、缓存失效通知等

消费者处理消息

不同队列: 同一个交换机 + 相同 routing key 绑定到 多个不同的队列 → 每个队列都会收到一份消息,各自独立处理。 👉 相当于多个队列订阅了同类信息,TOPIC

同一个队列: 多个消费者(不管是一个应用里开多个 listener,还是多台实例部署)监听 同一个队列 → 一条消息只会被其中一个消费者消费,起到负载均衡作用。 👉 常用于“任务分摊”。

基于注解声明交换机、队列

前面都是在 RabbitMQ 管理控制台手动创建队列和交换机,开发人员还得把所有配置整理一遍交给运维,既繁琐又容易出错。更好的做法是在应用启动时自动检测所需的队列和交换机,若不存在则直接创建。

基于注解方式来声明

type 默认交换机类型为ExchangeTypes.DIRECT

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue1"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "blue"}
))
public void listenDirectQueue1(String msg){
    System.out.println("消费者1接收到direct.queue1的消息:【" + msg + "】");
}

@RabbitListener(bindings = @QueueBinding(
    value = @Queue(name = "direct.queue2"),
    exchange = @Exchange(name = "hmall.direct", type = ExchangeTypes.DIRECT),
    key = {"red", "yellow"}
))
public void listenDirectQueue2(String msg){
    System.out.println("消费者2接收到direct.queue2的消息:【" + msg + "】");
}

检查队列

  • 如果 RabbitMQ 中已经有名为 direct.queue1 的队列,就不会重复创建;
  • 如果不存在,RabbitAdmin 会自动帮你创建一个。

检查交换机

  • 同理,会查看有没有名为 hmall.direct、类型为 direct 的交换机,若不存在就新建。

检查绑定

  • 最后再去声明绑定关系:把 direct.queue1 绑定到 hmall.direct,并且 routing-key 为 "red""blue"
  • 如果已有相同的绑定(队列、交换机、路由键都一致),也不会再重复创建。

消息转换器

使用JSON方式来做序列化和反序列化,替换掉默认方式。

更小或可压缩的消息体、易读、易调试

1)引入依赖

<dependency>
    <groupId>com.fasterxml.jackson.dataformat</groupId>
    <artifactId>jackson-dataformat-xml</artifactId>
    <version>2.9.10</version>
</dependency>

2)配置消息转换器,在publisherconsumer两个服务的启动类中添加一个Bean即可:

@Bean
public MessageConverter messageConverter(){
    // 1.定义消息转换器
    Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
    // 2.配置自动创建消息id,用于识别不同消息,也可以在业务中基于ID判断是否是重复消息
    jackson2JsonMessageConverter.setCreateMessageIds(true);
    return jackson2JsonMessageConverter;
}

MQ高级

我们要解决消息丢失问题,保证MQ的可靠性,就必须从3个方面入手:

  • 确保生产者一定把消息发送到MQ
  • 确保MQ不会将消息弄丢
  • 确保消费者一定要处理消息

发送者的可靠性

发送者重试

修改发送者模块的application.yaml文件,添加下面的内容:

主要是针对网络连接失败的场景,会自动重试;交换机不存在,不会触发重试。

spring:
  rabbitmq:
    connection-timeout: 1s # 设置MQ的连接超时时间
    template:
      retry:
        enabled: true # 开启超时重试机制
        initial-interval: 1000ms # 失败后的初始等待时间
        multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
        max-attempts: 3 # 最大重试次数
  • 阻塞重试,一般不建议开启。

发送者确认机制

一、机制概述

image-20250529170017117

RabbitMQ 提供两种发送者确认机制,确保消息投递的可靠性:

  • Publisher Confirm:确认消息是否到达 RabbitMQ 服务器
  • Publisher Return:确认消息是否成功路由到队列
二、配置开启

1.在发送者模块的application.yaml中添加配置:

spring:
  rabbitmq:
    publisher-confirm-type: correlated # 开启异步confirm机制
    publisher-returns: true # 开启return机制

confirm类型说明

  • none(默认模式):关闭confirm机制,消息由于网络连接失败也不会提醒。
  • simple:同步阻塞等待MQ的回执
  • correlated:MQ异步回调返回回执

2.每个RabbitTemplate只能配置一个ReturnCallback,因此我们可以在配置类中统一设置。

@Slf4j
@Configuration
@RequiredArgsConstructor
public class MqConfig {
    private final RabbitTemplate rabbitTemplate;

    @PostConstruct
    public void init() {
        // 设置全局ReturnCallback
        rabbitTemplate.setReturnsCallback(returned -> {
            log.error("消息路由失败 - Exchange: {}, RoutingKey: {}, ReplyCode: {}, ReplyText: {}",
                    returned.getExchange(),
                    returned.getRoutingKey(),
                    returned.getReplyCode(),
                    returned.getReplyText());
            
            // 可在此添加告警或重试逻辑
            sendAlert(returned);
        });
    }
}
三、ConfirmCallback 使用

消息发送时设置确认回调CorrelationData

image-20250529180404355

这里的CorrelationData中包含两个核心的东西:

  • id:消息的唯一标示,MQ对不同的消息的回执以此做判断,避免混淆
  • SettableListenableFuture:回执结果的Future对象
public void sendMessageWithConfirmation(String exchange, String routingKey, Object message) {
    // 1. 创建关联数据
    CorrelationData correlationData = new CorrelationData();
    
    // 2. 添加确认回调
    correlationData.getFuture().addCallback(
        result -> {
            if (result.isAck()) {
                log.info("✅ 消息成功到达MQ服务器");
            } else {
                log.error("❌ 消息发送失败: {}", result.getReason());
                // 可在此添加重试逻辑
            }
        },
        ex -> {
            log.error("⚠️ 确认过程发生异常", ex);
        }
    );
    
    // 3. 发送消息
    rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
}
四、消息投递结果分析
场景 网络状态 路由状态 ConfirmCallback ReturnsCallback 最终结果
完全成功 ✅ 成功 ✅ 成功 ACK 不触发 消息入队
网络失败 ❌ 失败 - NACK 不触发 发送失败
路由失败 ✅ 成功 ❌ 失败 ACK 触发 消息丢弃
交换机不存在 ✅ 成功 ❌ 失败 ACK 触发 消息丢弃

端到端投递保障

  • ConfirmCallback 只告诉你:消息“到”了 RabbitMQ 服务器吗?(ACK:到;NACK:没到)
  • ReturnsCallback 只告诉你:到达服务器的消息,能“进”队列吗?(能进就不回;进不了就退)

两者都成功,才能确认:“这条消息真的安全地进了队列,等着消费者去拿。”

  • 🟢 ACK:消息到达MQ服务器(可能路由失败)
  • 🔴 NACK:消息未到达MQ服务器(网络问题)
  • 🔵 Return:消息到达但路由失败(配置问题)

通过组合使用这两种机制,可以实现完整的端到端消息投递保障。如果由于网络问题,NACK了,那么会被correlationData.getFuture().addCallback(...)回调函数捕捉!!!

MQ的可靠性

数据持久化

MQ消息持久化就是指当RabbitMQ服务重启后,消息仍然会保留在队列中不会丢失。

非持久化消息:只存储在内存中;持久化消息:同时存储在内存和磁盘中

为了保证数据的可靠性,必须配置数据持久化(从内存保存到磁盘上),包括:

  • 交换机持久化(选Durable)
  • 队列持久化(选Durable)
  • 消息持久化(选Persistent)

控制台方式:

image-20250530154302987 image-20250530154321546

代码方式,默认都是持久化的,不用变动。

消费者可靠性

消费者确认机制

消费者确认机制 (Consumer Acknowledgement) 是为了确认消费者是否成功处理消息。当消费者处理消息结束后,应该向 RabbitMQ 发送一个回执,告知 RabbitMQ 自己消息处理状态:

  • ack:成功处理消息,RabbitMQ 从队列中删除该消息
  • nack:消息处理失败,RabbitMQ 需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ 从队列中删除该消息
image-20250827215620772

上述的NACK状态时,MQ会不断向消费者重投消息,直至被正确处理!!!

消费者方,通过下面的配置可以修改消费者收到消息后的处理方式:

none:消费者收到消息后,RabbitMQ 立即自动确认(ACK)

manual,手动实现ack;

auto(默认模式),自动档,业务逻辑异常返回nack, 消息解析异常 返回reject,其他ack

spring:
  rabbitmq:
    listener:
      simple:
        acknowledge-mode: auto

消费者重试

  • 类似发送者的重试机制,在消费者出现异常时利用本地重试,而不是无限制的requeue到mq队列。
  • 重试达到最大次数后,会返回reject,消息会被丢弃

修改consumer服务的application.yml文件,添加内容:

spring:
  rabbitmq:
    listener:
      simple:
        retry:
          enabled: true # 开启消费者失败重试
          initial-interval: 1000ms # 初识的失败等待时长为1秒
          multiplier: 1 # 失败的等待时长倍数,下次等待时长 = multiplier * last-interval
          max-attempts: 3 # 最大重试次数
          stateless: true # true无状态(默认);如果业务中包含事务,这里改为false有状态

核心概念:一次事务 vs. 多次事务

想象一下这个场景:你是一个消费者,从MQ收到一条消息,内容是“给用户A的账户增加10元”。你的服务需要执行两个步骤:

  1. 处理业务逻辑(更新数据库,给用户A加钱)。
  2. 确认消息(告诉MQ消息处理成功了)。

这个“处理业务逻辑”和“确认消息”的过程,可以放在一个数据库事务里。

特性 无状态重试 (stateless: true) 有状态重试 (stateless: false)
本质 本地方法重试 消息重新投递
事务范围 所有重试在同一个事务中 每次重试是独立的事务
MQ感知 MQ完全不知情(只投递1次) MQ完全知情(多次投递)
性能 高(无网络开销) 较低(有网络开销)
安全性 (易导致重复操作) (每次失败都回滚)
适用场景 幂等操作、非DB操作(如HTTP调用) 非幂等操作、数据库事务操作

为什么用了 @Transactional必须有状态重试?

假设是无状态重试,重试是在同一次方法调用/同一事务里循环进行的(拦截器内部重试)。

第一次失败抛出异常后,当前事务被标记为 rollback-only

  • 接下来即便你第2次、第3次尝试都“业务成功”,提交时也会失败(因为事务早已不可提交)。

结果:不适合与 @Transactional 搭配做数据库更新;更适合无事务幂等且不涉及DB提交的调用(如外部HTTP、缓存写入等)。

假设是有状态重试(stateless: false

  • 重试通过把异常抛回给容器,让消息重新投递来实现。
    • 每次投递 → 监听方法重新执行 → 新的事务开启。
  • 每次失败都会完整回滚该次事务;下一次重试是干净的事务上下文
  • 达到最大次数后,按照你的配置reject(可配合死信队列/失败交换器),从而避免“消息风暴”。

有状态重试相比不开启重试机制:可以配置有限次重试次数,更加灵活。

失败处理策略

只有在开启了消费者重试机制(即配置了 spring.rabbitmq.listener.simple.retry.enabled: true)时才会生效。

当消息消费重试达到最大次数后,默认会直接丢弃,这在要求高可靠性的场景中不可接受。Spring 提供了 MessageRecoverer接口来自定义最终处理策略,主要有三种实现:

  1. RejectAndDontRequeueRecoverer
    • 默认策略。直接拒绝消息并丢弃。
  2. ImmediateRequeueMessageRecoverer
    • 让消息重新进入队列,再次被消费(可能导致循环)。
  3. RepublishMessageRecoverer推荐方案
    • 将消息路由到一个专用的异常交换机,最终进入异常队列
    • 优势:实现故障隔离,便于后续人工干预自动化修复,是保证消息不丢失的优雅方案。

业务幂等性

在程序开发中,幂等则是指同一个业务,执行一次或多次对业务状态的影响是一致的。如:

  • 根据id删除数据
  • 查询数据
  • 新增数据

但数据的更新往往不是幂等的,如果重复执行可能造成不一样的后果。比如:

  • 取消订单,恢复库存的业务。如果多次恢复就会出现库存重复增加的情况
  • 退款业务。重复退款对商家而言会有经济损失。

所以,我们要尽可能避免业务被重复执行:MQ消息的重复投递、页面卡顿时频繁刷新导致表单重复提交、服务间调用的重试

法一:唯一ID

  1. 每一条消息都生成一个唯一的id,与消息一起投递给消费者。
  2. 消费者接收到消息后处理自己的业务,业务处理成功后将消息ID保存到数据库
  3. 如果下次又收到相同消息,去数据库查询判断是否存在,存在则为重复消息放弃处理。

法一存在业务侵入,因为mq的消息ID与业务无关,现在却多了一张专门记录 ID 的表或结构

法二:业务判断,基于业务本身的逻辑或状态来判断是否是重复的请求或消息,不同的业务场景判断的思路也不一样。

image-20250603151010579

综上,支付服务与交易服务之间的订单状态一致性是如何保证的?

  • 首先,支付服务会正在用户支付成功以后利用MQ消息通知交易服务,完成订单状态同步。
  • 其次,为了保证MQ消息的可靠性,我们采用了生产者确认机制、消费者确认、消费者失败重试等策略,确保消息投递的可靠性
  • 最后,我们还在交易服务设置了定时任务,定期查询订单支付状态。这样即便MQ通知失败,还可以利用定时任务作为兜底方案,确保订单支付状态的最终一致性。

延迟消息

对于超过一定时间未支付的订单,应该立刻取消订单并释放占用的库存。

方案:利用延迟消息实现超时检查

以“订单支付超时时间为30分钟”为例,具体实现流程如下:

  1. 创建订单时:在订单入库的同时,向消息队列发送一条延迟时间为30分钟的消息
  2. 消息等待:此消息不会立即被消费,而是由MQ服务器暂存至延迟时间到期。
  3. 延迟触发:30分钟后,消息队列自动将该消息投递给消费者服务。
  4. 执行检查与操作:消费者接收到消息后,查询该订单的当前支付状态:
    • 若订单仍为“未支付”:则执行取消订单、释放库存等后续操作。
    • 若订单已支付:则忽略此消息,流程结束。

image-20250603154136058

实现延迟消息法一

延迟消息插件

1.下载

GitHub - rabbitmq/rabbitmq-delayed-message-exchange: Delayed Messaging for RabbitMQ

2.上传插件,由于之前docker部署MQ挂载了数据卷

docker volume ls   #查看所有数据卷

docker volume inspect hmall_all_mq-plugins  #获取数据卷的目录

#"Mountpoint": "/var/lib/docker/volumes/hmall_all_mq-plugins/_data"

我们上传插件到该目录下。

3.安装插件

docker exec -it mq rabbitmq-plugins enable rabbitmq_delayed_message_exchange

image-20250603163744968

声明延迟交换机

额外指定参数 delayed = "true"

@RabbitListener(bindings = @QueueBinding(
        value = @Queue(name = "delay.queue", durable = "true"),
        exchange = @Exchange(name = "delay.direct", delayed = "true"),
        key = "delay"
))
public void listenDelayMessage(String msg){
    log.info("接收到delay.queue的延迟消息:{}", msg);
}

发送延迟消息

@Test
void testPublisherDelayMessage() {
    // 1.创建消息
    String message = "hello, delayed message";
    // 2.发送消息,利用消息后置处理器添加消息头
    rabbitTemplate.convertAndSend("delay.direct", "delay", message, new MessagePostProcessor() {
        @Override
        public Message postProcessMessage(Message message) throws AmqpException {
            // 添加延迟消息属性
            message.getMessageProperties().setDelay(5000);
            return message;
        }
    });
}

实现延迟消息法二

RabbitMQ (TTL + 死信队列)

1.配置类(配置交换机和队列)

类型 名称 作用 路由键
交换机 order.exchange 业务交换机:接收原始延迟消息 order.delay.key
队列 order.delay.queue 等待队列:消息在此等待TTL过期 -
交换机 order.delay.exchange 死信交换机:接收过期消息 order.delay.key
队列 order.process.queue 处理队列:最终消费消息的队列 -
@Configuration
public class RabbitMQDelayConfig {

    // 业务交换机
    @Bean
    public DirectExchange orderExchange() {
        return new DirectExchange("order.exchange");
    }

    // 死信交换机(作为延迟消息的目标)
    @Bean
    public DirectExchange orderDelayExchange() {
        return new DirectExchange("order.delay.exchange");
    }

    // 业务队列 - 设置死信参数
    @Bean
    public Queue orderDelayQueue() {
        Map<String, Object> args = new HashMap<>();
        // 消息到期后转发的死信交换机
        args.put("x-dead-letter-exchange", "order.delay.exchange");
        // 死信路由键
        args.put("x-dead-letter-routing-key", "order.delay.key");
        return new Queue("order.delay.queue", true, false, false, args);
    }

    // 最终消费队列
    @Bean
    public Queue orderProcessQueue() {
        return new Queue("order.process.queue");
    }

    // 绑定:业务队列 -> 业务交换机
    @Bean
    public Binding orderDelayBinding() {
        return BindingBuilder.bind(orderDelayQueue())
                .to(orderExchange())
                .with("order.delay.key");
    }

    // 绑定:最终队列 -> 死信交换机
    @Bean
    public Binding orderProcessBinding() {
        return BindingBuilder.bind(orderProcessQueue())
                .to(orderDelayExchange())
                .with("order.delay.key");
    }
}

2. 发送消息(设置TTL)

@Service
@RequiredArgsConstructor
public class OrderService {
    private final RabbitTemplate rabbitTemplate;

    public void createOrder(Order order) {
        // 创建订单逻辑...
        
        // 发送延迟消息(30分钟)
        rabbitTemplate.convertAndSend("order.exchange", "order.delay.key", order.getId(), message -> {
            // 设置消息的TTL为30分钟
            message.getMessageProperties().setExpiration("1800000"); // 毫秒
            return message;
        });
    }
}

3. 消费者

@Component
public class OrderDelayConsumer {
    
    @RabbitListener(queues = "order.process.queue")
    public void processExpiredOrder(String orderId) {
        // 查询订单状态,如果未支付则取消订单
        System.out.println("处理超时订单:" + orderId);
    }
}

超时订单问题

image-20250603171922542

死信交换机

  • 当消息在一个队列中变成“死信(Dead Letter)”后,能被重新投递到的另一个交换机,就是死信交换机(DLX)
  • 绑定到 DLX 的队列叫死信队列(DLQ),专门用来存放这些“死信”消息。

触发条件

  1. 消费者拒绝并不再重投(Consumer Rejection)
    • “消费者这一端”的情况。当消费者明确拒绝消息(发送 basic.rejectbasic.nack)并且设置 requeue=false时,消息会成为死信。
    • 场景:消费者处理消息时遇到无法处理的错误(如业务逻辑错误、数据格式错误),明确告知MQ不要重新投递了。
  2. 消息过期(Message TTL Expired)
    • 这与消费者无关。消息在队列中等待的时间超过了设定的生存时间(TTL),会被自动删除并变成死信。
    • 场景:常用于实现延迟队列。例如,下单15分钟未支付订单取消,就可以将消息TTL设为15分钟,过期后成为死信转到DLQ,由DLQ的消费者来处理取消逻辑。
  3. 队列溢出(Queue Length Limit Exceeded)
    • 这也与消费者无关。当队列的消息数量达到上限时,新来的消息或队列头部的消息(取决于配置)会被丢弃并变成死信。
    • 场景:用于限制队列容量,防止消息无限堆积,保护系统。

配置

必须用编程式方式来声明,不可用注解式。

@Configuration
public class RabbitMQConfig {

    @Value("${spring.rabbitmq.config.producer.exchange}")
    private String businessExchangeName;

    @Value("${spring.rabbitmq.config.producer.topic_team_success.queue}")
    private String businessQueueName;

    @Value("${spring.rabbitmq.config.producer.topic_team_success.routing_key}")
    private String businessRoutingKey;

    // 1. 定义死信交换机(通常一个应用一个就够了)
    @Bean
    public TopicExchange dlxExchange() {
        return new TopicExchange(businessExchangeName + ".dlx", true, false);
    }

    // 2. 定义死信队列
    @Bean
    public Queue dlq() {
        return new Queue(businessQueueName + ".dlq", true);
    }

    // 3. 将死信队列绑定到死信交换机
    @Bean
    public Binding dlqBinding() {
        return BindingBuilder.bind(dlq())
                .to(dlxExchange())
                .with(businessRoutingKey + ".dead"); // 使用新的路由键
    }

    // 4. 定义业务交换机
    @Bean
    public TopicExchange businessExchange() {
        return new TopicExchange(businessExchangeName, true, false);
    }

    // 5. 定义业务队列,并配置死信规则(核心!)
    @Bean
    public Queue businessQueue() {
        Map<String, Object> args = new HashMap<>();
        // 指定死信交换机
        args.put("x-dead-letter-exchange", businessExchangeName + ".dlx");
        // 指定死信的路由键(可选,不指定则使用原消息的路由键)
        args.put("x-dead-letter-routing-key", businessRoutingKey + ".dead");
        
        // 还可以设置其他导致消息成为死信的参数
        // args.put("x-message-ttl", 60000); // 消息60秒过期
        // args.put("x-max-length", 1000); // 队列最大长度1000条

        return new Queue(businessQueueName, true, false, false, args);
    }

    // 6. 将业务队列绑定到业务交换机
    @Bean
    public Binding businessBinding() {
        return BindingBuilder.bind(businessQueue())
                .to(businessExchange())
                .with(businessRoutingKey);
    }
}
© 版权声明
THE END
喜欢就支持一下吧
点赞 0 分享 收藏
评论 抢沙发
取消