前言

介绍RabbitMQ

RabbitMQ 是一款基于 AMQP(Advanced Message Queuing Protocol,高级消息队列协议) 实现的开源消息队列(MQ)中间件,由 Erlang 语言开发,因其高可靠性、灵活的路由机制、丰富的功能特性,被广泛用于分布式系统中的异步通信、服务解耦、流量削峰等场景。

RabbitMQ 最初起源于金融系统,用于在分布式系统中存储转发消息,在易用性、扩展性、高可用性等方面表现不俗。

消息队列(MQ)的核心作用是 “解耦、异步、削峰”,而 RabbitMQ 作为其中的典型实现,通过规范的消息传递流程、灵活的路由策略和完善的可靠性机制,实现了分布式系统中不同组件 / 服务之间的 “间接通信”—— 即发送方(生产者)无需直接调用接收方(消费者),而是通过 “消息” 作为载体,借助 RabbitMQ 完成传递,从而降低系统耦合度、提升响应速度、缓冲流量峰值。

RabbitMQ 核心概念

RabbitMQ 核心组件和架构

在学习使用 RabbitMQ 之前,必须了解其相关的各种基础概念

RabbitMQ 的消息传递模型与其他 MQ(如 Kafka 基于 “主题 - 分区”)有显著差异,其核心通过 “交换机(Exchange)、队列(Queue)、绑定(Binding)” 实现灵活的消息路由,组件关系如下:

image-20250730171217357

核心组件:

  • 生产者(Producer 或者 Publisher):发送消息的应用 / 服务,消息内容通常包含业务数据(如订单信息),负责将消息发送到交换机
  • 交换机(Exchange):负责接收生产者发送的消息,根据 “绑定规则” 将消息路由到对应绑定的队列。交换机是 RabbitMQ 路由灵活性的核心(区别于其他 MQ 直接将消息发送到队列)。
    • Direct Exchange(直连交换机)
      • 路由规则:消息的 Routing Key 必须与队列绑定的 Binding Key 完全匹配,才会被路由到该队列,它会匹配 Routing Key 精确投递。
      • 适用场景:一对一通信(如 “订单创建” 消息仅需被 “订单支付服务” 处理)。
    • Fanout Exchange(扇出交换机)
      • 路由规则:忽略 Routing Key,将消息 广播到所有与该交换机绑定的队列
      • 适用场景:一对多通知(如 “商品上架” 消息需同时被 “库存服务”“搜索服务”“推荐服务” 处理)。
    • Topic Exchange(主题交换机)
      • 路由规则:支持 Routing Key 与 Binding Key 用 “通配符” 匹配,也就是基于模式匹配(* 匹配单个单词,# 匹配多个单词,单词间用 . 分隔)。
      • 示例:若队列绑定的 Binding Key 为 order.#,则 Routing Key 为 order.createorder.pay.success 的消息都会被路由到该队列。
      • 适用场景:多维度分类的消息路由(如按 “业务类型。操作。状态” 分类的消息)。
    • Headers Exchange(头交换机)
      • 路由规则:忽略 Routing Key,根据消息 “头部属性(Headers)” 的键值对与绑定规则匹配(如 {"type": "log", "level": "error"})。
      • 适用场景:需要基于多属性过滤消息的场景(较少用,因 Topic 交换机已能满足大部分灵活路由需求)。
    • 多说一句,Kafka 只有 topic 的概念。这是因为 Kafka 的设计上消息只用存一份,通过游标,发送后不立即删除消息。多个消费者组可以互不影响的消费。这是 Kafka 的一大改进。也是为什么 Kafka 比 RabbitMQ 更抗压的原因。
  • 队列(Queue):存储消息的容器,消息最终会被投递到队列,等待消费者获取。队列是 “持久化” 的最小单位(可配置是否持久化,避免服务重启丢失消息)。队列可以绑定多个 Routing Key,也可以绑定到多个 交换机
  • 绑定(Binding):定义交换机与队列之间的关联关系,用于匹配交换机队列的路由规则,包含 “路由键(Routing Key)” 和 “匹配规则”,决定消息从交换机如何路由到队列。
  • 消费者(Consumer):从队列中获取并处理消息的应用 / 服务,消费者监听队列,并处理接收到的消息。
  • 其中,broker 是消息代理,指的是负责接收、存储、路由和转发消息的核心服务器 / 中间件节点。RabbitMQ 中,其运行的服务实例本身就是一个 Broker

那么我们知道了这些内容,RabbitMQ 的架构图就很容易理解了

image-20250730171258497

所以说,我们可以理清消息流转的简易的全流程如下

1
生产者 → 消息(带 Routing Key)→ 交换机(根据 Binding 规则)→ 队列(存储消息)→ 消费者(获取并处理)  
  • 生产者发送消息时,需指定消息的 “目标交换机” 和 “路由键(Routing Key)”;
  • 交换机根据自身类型(如 Direct、Topic 等)和与队列的绑定规则(Binding Key),判断消息应路由到哪些队列;
  • 队列存储消息,等待消费者连接并获取;
  • 消费者通过 “订阅” 队列,持续获取消息并处理。

RabbitMQ 作为 MQ 的核心特性

作为消息队列,RabbitMQ 的核心竞争力体现在 “可靠传递”“灵活扩展” 两方面:

消息可靠传递机制(避免消息丢失)

  • 持久化
    • 交换机、队列可配置为 “持久化”(durable: true),确保服务重启后不丢失元数据;
    • 消息可设置 delivery_mode=2,标记为持久化消息,确保被写入磁盘(非仅存于内存)。
  • 确认机制
    • 生产者确认(Publisher Confirm):生产者可开启确认模式,RabbitMQ 会在消息被交换机接收并路由到队列后,向生产者返回确认信号(确保消息未丢失在 “生产者→交换机” 环节);
    • 消费者确认(Consumer ACK):消费者处理完消息后,需手动发送 ACK 信号(默认自动 ACK,建议关闭),RabbitMQ 收到 ACK 后才删除队列中的消息(避免消费者崩溃导致消息丢失)。
      • 其中 ACK 是消息确认机制,ACK得不到确认消息会被重新投递。
  • 备份交换机(Alternate Exchange):若消息无法被目标交换机路由(无匹配队列),可配置备份交换机,将消息路由到 “死信队列” 或 “备份队列”,避免消息被丢弃。

其他核心特性

  • 消息优先级:消息可设置 priority 属性(0-9,数值越大优先级越高),队列会优先投递高优先级消息(适用于 “紧急订单” 等场景)。

  • 消息集群(Clustering):多个 RabbitMQ 服务器可以组成一个集群,形成一个逻辑 Broker 。

  • 多种协议(Multi-protocol):RabbitMQ 支持多种消息队列协议,比如 STOMP、MQTT 等等。

  • 延迟队列:通过 “死信交换机”+“消息过期时间(TTL)” 实现延迟消息(如 “订单 30 分钟未支付自动取消”)。

  • 限流:消费者可设置 prefetch_count,限制每次从队列获取的消息数量(避免消费者因处理能力不足被消息压垮)。

  • 集群与高可用:支持多节点集群,通过 “镜像队列(Mirror Queue)” 将队列数据同步到多个节点,确保单节点故障时消息不丢失、服务不中断。

  • 管理界面(Management UI):RabbitMQ 提供了一个易用的用户界面,使得用户可以监控和管理消息 Broker 的许多方面。

  • 跟踪机制(Tracing):如果消息异常,RabbitMQ 提供了消息跟踪机制,使用者可以找出发生了什么。

  • 插件机制(Plugin System):RabbitMQ 提供了许多插件,来从多方面进行扩展,也可以编

    写自己的插件。

RabbitMQ的原理解析

AMQP协议

基本介绍

它由摩根大通集团联合其他公司共同设计。是一个提供统一消息服务的应用层标准高级消息队列协议,是应用层协议的一个开放标准,为面向消息的中间件设计。基于此协议的客户端与消息中间件可传递消息,并不受客户端/中间件不同产品,不同的开发语言等条件的限制。

AMQP(Advanced Message Queuing Protocol,高级消息队列协议)是一个开放的应用层协议,旨在规范分布式系统中消息的传递流程、组件交互和可靠性保障,这个协议我们需要知道,它使不同厂商的消息中间件(如 RabbitMQ、Apache Qpid 等)和客户端能基于统一标准通信。

RabbitMQ 是 AMQP 协议(主要是 v0-9-1 版本)的经典实现,其核心架构、消息流转逻辑和可靠性机制完全遵循 AMQP 规范。

在 AMQP 出现前,消息队列领域没有统一标准:不同中间件(如早期的 ActiveMQ、自研 MQ)的消息格式、交互方式差异极大,导致客户端与中间件、中间件之间难以兼容(例如,Java 客户端无法直接连接 Python 实现的 MQ)。

AMQP 的核心目标是 “定义一套通用的消息传递规范”,包括:

  • 消息的结构(如何打包数据);
  • 参与消息传递的组件(如交换器、队列)及它们的职责;
  • 组件之间的交互规则(如消息如何从生产者到队列,如何被消费);
  • 可靠性保障机制(如消息如何避免丢失、如何确认)。

通过这套规范,任何遵循 AMQP 的客户端(无论语言)都能与任何遵循 AMQP 的中间件(无论厂商)通信,实现 “跨语言、跨平台” 的消息交互

AMQP 核心组件

而 AMQP 定义了一套 “消息传递模型”,包含 6 个核心组件,这些组件的职责和交互规则是协议的核心。RabbitMQ 完全实现了这套模型,这套组件我会再说一次,可以联系 RabbitMQ 的数据模型来对比看两者

而这时候,我又就要掏出这张图了

image-20250730165740942

RabbitMQ的核心实际上就是AMQP的核心,把AMQP讲明白,RabbitMQ 也就差不多了

消息(Message):AMQP 的 “数据载体”

AMQP 对 “消息” 的结构进行了规范,它不仅包含业务数据(如订单信息),还包含元数据(用于控制消息的路由、持久化等行为)。

  • 结构组成
    • 消息体(Body):二进制数据(如 JSON 字符串、字节流),AMQP 不关心其内容(由生产者和消费者自行解析);
    • 消息头(Properties/Headers):元数据键值对,AMQP 定义了标准字段,例如:
      • routing-key:路由键(用于交换器路由消息);
      • delivery-mode:投递模式(1 = 非持久化,2 = 持久化,控制消息是否写入磁盘);
      • content-type:消息体类型(如application/json,帮助消费者解析);
      • expiration:过期时间(消息在队列中存活的最大时长,单位毫秒);
      • priority:优先级(0-9,数值越高越先被投递)。
  • RabbitMQ 的消息结构完全遵循 AMQP 规范。例如,当生产者通过 RabbitMQ 客户端发送消息时,必须指定routing-keydelivery-mode等元数据,RabbitMQ 会根据这些元数据处理消息(如持久化、路由)。

交换器(Exchange):AMQP 的 “消息路由器”

交换器是 AMQP 中接收生产者消息并路由到队列的核心组件,其核心职责是 “根据规则将消息分发到对应的队列”。

  • AMQP 对交换器的定义
    • 交换器必须有唯一名称(在一个 Broker 内);
    • 交换器有 “类型”(Type),不同类型对应不同的路由规则;
    • 交换器可配置 “持久化”(durable):服务重启后是否保留(元数据不丢失);
    • 交换器可配置 “自动删除”(auto-delete):当最后一个绑定关系解除后自动删除。
  • RabbitMQ 中的实现: RabbitMQ 的交换器完全遵循 AMQP 定义,且实现了 AMQP 规范的 4 种核心类型(与前文提到的一致):
    • Direct(直连)、Fanout(扇出)、Topic(主题)、Headers(头); 例如,当创建 Direct 交换器时,RabbitMQ 会严格按照 “Routing Key 与 Binding Key 完全匹配” 的规则路由消息,这正是 AMQP 对 Direct 类型的规范。

队列(Queue):AMQP 的 “消息存储容器”

队列是 AMQP 中存储消息的组件,消息最终会被投递到队列,等待消费者获取。

  • AMQP 对队列的定义
    • 队列有唯一名称(在一个 Broker 内);
    • 队列可配置 “持久化”(durable):服务重启后队列元数据和持久化消息不丢失;
    • 队列可配置 “排他性”(exclusive):仅当前连接可见,连接关闭后自动删除(适合临时队列);
    • 队列可配置 “自动删除”(auto-delete):最后一个消费者取消订阅后自动删除;
    • 队列有 “消息溢出策略”:当队列满时(如设置了最大长度),如何处理新消息(丢弃、返回给生产者等)。
  • RabbitMQ 中的实现: RabbitMQ 的队列完全支持 AMQP 的配置项。例如:
    • 持久化队列(durable=true)在 RabbitMQ 重启后仍存在,且其中的持久化消息(delivery-mode=2)会被恢复;
    • 排他队列(exclusive=true)仅在创建它的信道(Channel)所在连接有效,连接关闭后立即删除,这是 RabbitMQ 对 AMQP 排他性的严格实现。
  • 多说一下队列,MessageQueue消息队列中,一个队列里的一条消息,也就是同一个message ID对应的消息,不管有多少个消费者来分摊压力,也只能被消费一次。消息队列和消费者之间有 ack 机制,消息一旦确认安全送达,RabbitMQ服务端就可以安全删除消息了。

绑定(Binding):AMQP 的 “交换器与队列的桥梁”

绑定是 AMQP 中定义交换器与队列关联关系的组件,它包含 “绑定规则”,决定交换器如何将消息路由到队列。

  • AMQP 对绑定的定义
    • 绑定是 “交换器→队列” 的单向关联(一个交换器可绑定多个队列,一个队列可被多个交换器绑定);
    • 绑定包含 “绑定键(Binding Key)”:与消息的 “路由键(Routing Key)” 配合,作为路由规则的判断依据;
    • 绑定可附加 “参数(Arguments)”:用于更复杂的匹配(如 Headers 交换器的头信息匹配)。
  • RabbitMQ 中的实现: RabbitMQ 的绑定完全遵循 AMQP 规则。例如:
    • 当用 Direct 交换器时,只有消息的 Routing Key 与绑定的 Binding Key 完全一致,才会被路由到队列;
    • 当用 Topic 交换器时,Binding Key 支持*#通配符,这是 AMQP 对 Topic 类型绑定规则的规范,RabbitMQ 严格实现了这一逻辑。
  • RabbitMQ 中,Binding 是 MessageQueue 与 Exchange 之间的连接,Exchange 只能给 Binding 的 MessageQueue 发送消息。

连接(Connection)与信道(Channel):AMQP 的 “通信通道”

AMQP 定义了客户端与消息中间件(Broker)的通信方式,核心是 “连接” 和 “信道”,用于减少 TCP 连接开销。

  • AMQP 对连接的定义
    • 连接是客户端与 Broker 之间的TCP 长连接,负责传输所有数据;
    • 连接需要认证(用户名 / 密码),确保安全性;
    • 连接可配置 “心跳检测”(Heartbeat):定期发送心跳包,检测连接是否存活。
  • AMQP 对信道的定义
    • 信道是 “共享一个 TCP 连接的虚拟连接”,客户端通过信道发送 / 接收消息、创建组件(交换器 / 队列);
    • 一个连接可包含多个信道(理论上无上限),信道之间相互隔离(独立的会话);
    • 信道的目的是减少 TCP 连接数量(TCP 连接创建 / 维护成本高),例如一个应用可通过一个 TCP 连接创建 10 个信道,分别处理不同业务。
  • RabbitMQ 中的实现: RabbitMQ 严格遵循 AMQP 的连接 / 信道模型:
    • 客户端(如 Java 的 RabbitMQ Client)必须先通过 TCP 连接到 RabbitMQ(默认端口 5672),并完成认证;
    • 所有操作(发送消息、创建队列等)都必须通过信道执行,RabbitMQ 通过信道 ID 区分不同信道的请求;
    • 例如,一个 Spring Boot 应用连接 RabbitMQ 时,通常会创建一个 TCP 连接,再通过连接池管理多个信道,大幅降低连接开销。

Broker:AMQP 的 “消息中间件实例”

Broker 是 AMQP 对 “消息中间件服务实例” 的抽象,它包含所有核心组件(交换器、队列、绑定等),负责接收客户端连接、处理消息路由和存储。

  • AMQP 对 Broker 的定义: Broker 是消息传递的 “中枢”,管理所有交换器、队列和绑定,协调生产者和消费者的交互。
  • RabbitMQ 中的体现: 一个 RabbitMQ 服务实例就是一个典型的 AMQP Broker。例如,我们启动的rabbitmq-server进程就是一个 Broker,它内部维护了所有交换器、队列、绑定关系,以及客户端的连接和信道。

AMQP三层协议

在整个消息流转中,三层结构分工明确,协同支撑通信:

  • Transport Layer(传输层):负责底层二进制数据传输,将所有命令、应答、消息体封装为 “帧”(Frame),通过 TCP 连接传输,并处理信道复用(一个 TCP 连接可承载多个信道,提升效率)、错误检查等。
  • Session Layer(会话层):负责命令与应答的同步交互,确保客户端与 Broker 之间的命令按顺序执行(如客户端发送Queue.Declare后,等待 Broker 返回Queue.Declare-OK才继续下一步),并处理通信中的错误(如连接中断后的重试逻辑)。
  • Module Layer(模块层):提供具体业务命令(如声明队列、发送消息、订阅消费等),是客户端实现业务逻辑的直接接口,所有核心操作均通过该层命令完成。

我草,这好像TCP的传输层和应用层那块,667

AMQP 的消息流转流程

AMQP 协议定义了消息从生产者到消费者的完整流转规则,RabbitMQ 严格按照这个流程处理消息,具体步骤如下:

  1. 协议握手:确定通信协议版本

    客户端与 Broker 建立 TCP 连接后,首先进行协议版本确认,这是后续所有交互的前提:

    • 客户端向 Broker 发送协议头报文AMQP\x00\x00\x09\x01(其中00\x00\x09\x01对应 AMQP 0-9-1 版本),明确本次通信使用 AMQP 0-9-1 协议。

    • Broker 收到后,若支持该版本,则进入连接建立阶段;若不支持,则断开连接。

  2. 建立 Connection(连接):客户端与 Broker 的 “物理连接”

    Connection 是客户端与 Broker 之间的 TCP 连接封装,包含认证、参数协商步骤,涉及 6 个核心命令交互(由 Module Layer 定义,Session Layer 负责同步,Transport Layer 传输):

    1. Connection.Start / Connection.Start-OK

      • Broker 发送Connection.Start:包含 Broker 支持的认证机制(如用户名密码)、服务器属性(如版本、操作系统)等。
      • 客户端回复Connection.Start-OK:携带认证信息(如用户名、密码)、客户端属性(如客户端名称、版本),完成身份验证。
    2. Connection.Tune / Connection.Tune-OK

      • Broker 发送Connection.Tune:协商连接参数(如最大帧大小、信道最大数量、心跳间隔)。

      • 客户端回复Connection.Tune-OK:确认接受这些参数(或修改后返回,Broker 需再次确认)。

    3. Connection.Open / Connection.Open-OK

      • 客户端发送Connection.Open:指定要连接的虚拟主机(Virtual Host,用于资源隔离)。

      • Broker 回复Connection.Open-OK:确认连接成功,此时 Connection 正式建立。

  3. 创建 Channel(信道):Connection 上的 “逻辑通道”

    AMQP 通过 “信道” 实现 TCP 连接的复用(一个 Connection 可创建多个 Channel),减少 TCP 连接开销。创建过程如下:

    • 客户端发送Channel.Open:请求创建一个信道(需指定信道 ID,如 1、2 等)。

    • Broker 回复Channel.Open-OK:确认信道创建成功。 后续所有业务操作(声明队列、发送消息等)均在具体信道上执行,Transport Layer 会通过帧中的 “信道 ID” 区分不同信道的请求。

  4. 生产者发送消息的完整流程

    生产者通过已创建的 Channel,完成交换机 / 队列声明、绑定关系建立,最终发送消息到 Broker:

    1. 声明交换机(Exchange)

      • 客户端发送Exchange.Declare:指定交换机名称、类型(如 direct、topic)、是否持久化、是否自动删除等属性。
      • Broker 回复Exchange.Declare-OK:确认交换机声明成功(若已存在且属性匹配,则直接返回成功)。
    2. 声明队列(Queue)

      • 客户端发送Queue.Declare:指定队列名称、是否持久化、是否排他(仅当前连接可见)、是否自动删除等属性。

      • Broker 回复Queue.Declare-OK:确认队列声明成功,并返回队列实际名称(若未指定名称,Broker 会自动生成)、消息数、消费者数等信息。

    3. 绑定交换机与队列(Binding)

      • 客户端发送Queue.Bind:指定要绑定的队列、交换机,以及路由键(Routing Key)、绑定参数(如 headers 类型的匹配规则)。

      • Broker 回复Queue.Bind-OK:确认绑定关系建立,此后交换机收到匹配路由键的消息时,会将其投递到该队列。

    4. 发送消息(Basic.Publish)

      • 客户端发送Basic.Publish:包含目标交换机名称、路由键(Routing Key)、消息属性(如持久化标记、优先级)等元数据。

      • 紧接着,客户端通过 Transport Layer 发送Content-Header 帧:包含消息体的元数据(如消息体大小、MIME 类型、自定义属性)。

      • 最后发送Content-Body 帧:消息的实际内容(如 JSON 字符串、二进制数据)。

      • Broker 收到后,根据交换机类型和绑定规则,将消息路由到匹配的队列(无需返回应答,除非开启消息确认机制)。

  5. 消费者接收消息的完整流程

    消费者通过 Channel 订阅队列,获取并处理消息,涉及以下步骤:

    1. 设置消费参数(Basic.Qos)

      • 客户端发送Basic.Qos:指定消费者最多可持有未确认的消息数量(如prefetch_count=1表示处理完一条消息并确认后,才会接收下一条),避免消息堆积。

      • Broker 回复Basic.Qos-OK:确认 Qos 参数生效。

    2. 订阅队列(Basic.Consume)

      • 客户端发送Basic.Consume:指定要消费的队列名称、是否自动确认消息(auto_ack=false表示手动确认)、消费者标签(标识消费者)等。

      • Broker 回复Basic.Consume-OK:确认订阅成功,并返回消费者标签(后续用于取消订阅)。

    3. 接受并且处理消息

      • Broker 通过Basic.Deliver帧向消费者推送消息:包含消息的路由键、交换机、消息体等。

      • 消费者处理消息后,若auto_ack=false,需发送Basic.Ack帧确认消息已处理(Broker 收到后会从队列中删除该消息);若处理失败,可发送Basic.NackBasic.Reject让消息重新入队或丢弃。

  6. 连接关闭(可选)

    当通信结束时,客户端与 Broker 通过以下命令关闭连接:

    • 客户端发送Channel.Close关闭信道,Broker 回复Channel.Close-OK

    • 客户端发送Connection.Close关闭连接,Broker 回复Connection.Close-OK,最终 TCP 连接断开。

Rabbit 中 Broker 的体现

RabbitMQ 作为 AMQP 协议的典型实现,其运行的服务实例本身就是一个 Broker。具体来说:

  • 当你启动 RabbitMQ 服务(如通过rabbitmq-server命令),这个服务进程就是 Broker 的实体。
  • 它内部管理着一系列核心组件:交换机(Exchange)、队列(Queue)、绑定(Binding)、虚拟主机(Virtual Host)等,这些组件共同协作完成消息的接收、存储和路由。
  • 生产者通过 AMQP 协议连接到 RabbitMQ Broker,将消息发送到指定的交换机;Broker 根据交换机与队列的绑定规则,将消息投递到匹配的队列;消费者再从队列中获取消息。

简单说,Broker 就是消息队列的 “服务器本体”,是协调生产者和消费者完成消息传递的核心角色。RabbitMQ 的所有功能(如路由策略、持久化、高可用等)都是通过 Broker 来实现的。

具体来说,Broker 的核心功能包括:

  1. 接收消息:从生产者(Producer)处接收发送的消息。
  2. 存储消息:根据配置(如持久化策略)暂时或永久存储消息(例如,当消费者未在线时,Broker 会将消息保存在队列中等待消费)。
  3. 路由消息:根据预设的规则(如 AMQP 中的交换机 Exchange 与队列 Queue 的绑定关系),将消息从生产者路由到对应的消费者。
  4. 转发消息:将路由后的消息推送给订阅了对应队列的消费者,或等待消费者主动拉取。
  5. 管理连接:维护生产者、消费者与 Broker 之间的网络连接(如 TCP 连接),并处理认证、授权等安全逻辑。

RabbitMQ消息流转流程

一条消息的完整生命周期通常遵循以下核心路径: Producer -> Exchange -> (Binding) -> Queue -> Consumer

下图直观地展示了这一完整流程,包含了成功处理和异常情况

flowchart TD
    A[生产者 Producer]
    B[交换机 Exchange]
    C{路由成功?}
    D[队列 Queue]
    E[⚠️ 消息丢失]
    F[消费者 Consumer]
    G[死信队列 DLQ]
    H[死信消费者]

    subgraph F_Process[消费者处理流程]
        F1[接收消息]
        F2{处理成功?}
        F3[发送ACK<br>消息被删除]
        F4[发送NACK<br>requeue=true]
        F5[发送NACK<br>requeue=false]
    end

    A -->|1. 创建并发送消息| B
    B -->|2. 根据规则路由| C
    C -->|是| D
    C -->|否| E
    D -->|3. 消息入队等待| F
    
    F --> F1
    F1 --> F2
    F2 -->|是| F3
    F2 -->|否/异常| F4
    F2 -->|否/严重失败| F5
    
    F4 -->|4. 重新投递| D
    F5 -->|5. 进入死信队列| G
    G -->|6. 死信处理| H

首先第一步是生产者发送消息:

  • 从创建消息对象开始,一般是OrderMessage 对象。
  • 然后进行消息转换,RabbitTemplate 会使用配置的 MessageConverter消息转换器,将 Java 对象序列化为 JSON 格式的字节数组,然后设置必要的属性,组合成一个 RabbitMQ 库能理解的 Message 对象。
  • 之后,获取资源,RabbitTemplate 从背后的 CachingConnectionFactory 中获取一个与 RabbitMQ 服务器的 TCP 连接 (Connection)。然后从连接中获取一个 通道 (Channel)。通道是建立在连接之上的轻量级逻辑链接,几乎所有的 AMQP 操作都在通道上进行。Spring 会缓存通道以提高性能。
  • 然后发布消息,通过通道,调用 AMQP 的 basicPublish 方法。
    • 如果开启了发布确认(Publisher Confirms),生产者会异步等待 Broker 返回一个确认信号,表示消息已成功到达服务器(但不一定被路由到队列)。这是一种高级的可靠性保证机制。

至此,生产者的工作就完成了。消息已经离开了你的应用,进入了 RabbitMQ Broker。

接下来进入了交换机路由的部分,由RabbitMQ Broker执行,它不存储消息

  • 接收消息: Broker 接收到生产者发来的消息。
  • 查找交换机: Broker 根据 basicPublish 中指定的 exchange 名称找到对应的交换机。查找的逻辑与设置的模式有关系
  • 产生路由结果
    • 成功: 消息被路由到一个或多个队列。消息会被存入这些队列中。
    • 失败: 如果消息无法被路由到任何队列(例如,没有匹配的绑定规则),消息会被丢弃(默认行为)或者配置为返回给生产者(通过设置 mandatory=true 参数)。

接下来还是 Broker 的任务,进行队列存储

  • 消息入队: 从交换机过来的消息被存入队列。队列本质上是一个 FIFO(先进先出)的缓冲区。
  • 持久化:
    • 如果队列和消息都被设置为 durable(持久化),那么消息会被写入磁盘,即使 RabbitMQ 服务器重启,消息也不会丢失。
    • 如果只有队列是持久的,而非持久化的消息在服务器重启后也会丢失。
  • 状态等待: 消息在队列中处于 “Ready” 状态,等待消费者来取走。你可以在 RabbitMQ 的管理界面中看到这个消息。

接下来就是消费者消费信息了

  • 首先会建立连接,在 Spring 启动时,注解了 @RabbitListener 的消费者会被创建长连接和通道。这些通道会持续监听指定的队列

  • 接下来进行推送消息,RabbitMQ 默认使用 推送模型 (Push API)。一旦队列中有消息,Broker 会主动通过通道将消息“推”送给空闲的消费者。另一种不常用的模式是拉取模型 basicGet,由消费者主动去“拉”消息

  • 消息交付:Broker 通过通道将消息发送给消费者。消息状态从 “Ready” 变为 “Unacked” (未确认)。这条消息不会被删除,也不会投递给其他消费者,直到收到确认。

  • 处理消息: 你的处理消息的方法被 Spring 调用。Spring 首先会帮你将消息体反序列化回 OrderMessage 对象。然后执行你的业务逻辑(打印日志、模拟处理延时)。

  • 消息确认 (Acknowledgment):

    • 这是最关键的一步。处理完成后,消费者必须告知 Broker 消息的处理结果。
    • 自动确认 (默认): 如果你的方法成功执行完毕(没有抛出异常),Spring 会自动向 Broker 发送一个 basicAck 命令。
    • 手动确认: 如果配置了 acknowledge-mode: manual,你需要在代码中显式调用 channel.basicAck()channel.basicNack()

    最终结果:

    • Ack (确认): Broker 收到 basicAck 后,才会将这条“Unacked”状态的消息从队列中永久删除。旅程成功结束。
    • Nack/Reject (否定确认/拒绝):
      • 如果你的方法抛出了异常,或者在手动模式下调用了 basicNack,Broker 会根据 requeue 参数决定下一步动作。
      • requeue = true: Broker 会将这条消息重新放回队列(或交给另一个消费者),消息状态变回 “Ready”。这可能导致消息被无限次重复处理,需要谨慎使用。
      • requeue = false: Broker 会直接丢弃消息。如果配置了死信交换机 (DLX),消息会被转移到死信队列。

AMQP的事务机制

和数据库一样,消息发送了就要保证被处理到,而 AMQP 和数据库的很相似,都是通过 “原子性操作” 确保消息的发送、处理等关键环节要么完全成功,要么完全失败,避免出现中间状态

在 RabbitMQ 中,AMQP 事务机制主要通过一系列命令交互实现,具体保证的内容及实现逻辑如下:

AMQP 事务机制保证的核心目标

  1. 消息发送的原子性 确保生产者发送的消息要么被 Broker(RabbitMQ 服务器)成功接收并持久化(如写入队列),要么完全不被接收(若事务回滚),避免 “消息已发送但 Broker 未处理” 或 “Broker 处理了但生产者误以为失败” 的情况。
  2. 消息确认的一致性 确保消费者对消息的处理状态(确认或拒绝)被 Broker 正确记录,避免 “消费者已处理但 Broker 未标记确认” 导致的消息重复投递,或 “消费者未处理但 Broker 已标记确认” 导致的消息丢失。
  3. 操作的可靠性同步 通过事务的提交与回滚机制,在网络波动、Broker 故障等异常情况下,保证客户端(生产者 / 消费者)与 Broker 的状态一致。

而 AMQP 的事务实现流程和数据库的也很类似,但是略有细节不同,所以我细说一下

AMQP 事务通过Tx.Select(开启事务)、Tx.Commit(提交事务)、Tx.Rollback(回滚事务)三组命令实现,贯穿生产者发送消息和消费者处理消息的全流程。

生产者发送消息的事务流程

生产者通过事务机制确保消息被 Broker 正确接收,步骤如下:

  • 步骤 1:开启事务 生产者通过Tx.Select命令通知 Broker “开启事务”,Broker 返回Tx.Select-OK表示事务已激活。此时,后续发送的消息不会被 Broker 立即处理,而是暂存于事务上下文。
  • 步骤 2:执行消息发送操作 生产者使用Basic.Publish命令发送消息(包含Content-HeaderContent-Body),Broker 接收到消息后暂存(不立即写入队列),等待事务提交。
  • 步骤 3:提交或回滚事务
    • 若生产者确认消息发送无误,发送Tx.Commit命令,Broker 收到后将暂存的消息正式写入队列,并返回Tx.Commit-OK,事务完成。
    • 若发送过程中出现异常(如网络中断、Broker 错误),生产者发送Tx.Rollback命令,Broker 丢弃暂存的消息,返回Tx.Rollback-OK,事务回滚,消息不被处理。

消费者处理消息的事务流程

消费者通过事务机制确保消息处理结果被 Broker 正确记录,步骤如下:

  • 步骤 1:开启事务 消费者通过Tx.Select命令开启事务,Broker 返回Tx.Select-OK,后续的消息确认 / 拒绝操作进入事务上下文。
  • 步骤 2:接收并处理消息 消费者通过Basic.Consume订阅队列,接收消息后进行业务处理(如写入数据库、调用接口等)。此时,Broker 仍将消息标记为 “未确认”,等待消费者的最终指令。
  • 步骤 3:提交或回滚事务
    • 若业务处理成功,消费者发送Basic.Ack(确认消息),再发送Tx.Commit,Broker 收到后将消息标记为 “已确认” 并从队列中移除,返回Tx.Commit-OK
    • 若业务处理失败,消费者发送Tx.Rollback,Broker 会将消息重新标记为 “未处理”(可被其他消费者再次消费),返回Tx.Rollback-OK

其实这里事务的开启还带来了一个问题,就是会出现阻塞:事务提交 / 回滚前,客户端会阻塞等待 Broker 响应,若网络延迟高,会进一步降低效率。

为弥补事务的性能缺陷,RabbitMQ 引入了 “发布确认机制”(Publisher Confirms),其通过异步确认消息发送状态,在保证可靠性的同时提升了吞吐量。但事务机制的优势在于 “严格的原子性”,适用于对数据一致性要求极高的场景(如金融交易)。

但是最为优雅的实现是 IBM 的 Websphere MQ。因为这是收费的,所以研究的人不多。它通过消息序列号保证消息不丢失、不重传。

通道为每条消息的传送分配一个序列号,它会自动累积增值。消息序列号由发送通道分配,是通道的一个永久属性,每当发送一条消息,消息序列号就加一。通道的相关属性SEQWRAP标识序号的最大值,缺省为999,999,999。序列号越界后自动归零,从头开始。

正常情况下,通道两端的消息序列号或者相等或相差为一。双方对前面的某一条或一批消息是否发送成功理解不一致。在解决了不确定的消息后,可以用MQSC命令通过重置消息序号将双方调整到一致。一旦连接断开后,通道重连时双方会将消息序号同步。

消息队列的持久化

image-20250909110158068

RabbitMQ 消息队列的持久化主要根据

持久化简单来说就是将数据存入磁盘,而不是存在内存中随服务器重启断开而消失,使数据能够永久保存。

消息持久化的目的是为了防止消息在 RabbitMQ 服务器意外重启或崩溃时丢失。这是一个“可靠性”特性。

要实现真正的消息不丢失,需要同时满足三个层次的持久化

  • 队列持久化

    • 在声明队列时,指定队列是持久化的。这样RabbitMQ 重启后,队列本身仍然存在。
    1
    2
    3
    4
    5
    @Bean
    public Queue durableQueue() {
    // 第二个参数 durable = true 表示队列持久化
    return new Queue("durable.queue", true, false, false);
    }
    • 注意:如果队列已经存在,尝试用不同的 durable 参数重新声明会报错。必须先删除已存在的队列。
  • 消息持久化

    • 在发送消息时,指定消息的交付模式为持久化。将消息体本身写入磁盘。RabbitMQ 重启后,持久化队列中的持久化消息会被恢复。

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      // 方法一:通过 MessagePostProcessor 设置(推荐,灵活)
      public void sendDurableMessage(OrderMessage message) {
      rabbitTemplate.convertAndSend("exchange", "routingKey", message, new MessagePostProcessor() {
      @Override
      public Message postProcessMessage(Message message) throws AmqpException {
      // 设置消息为持久化模式
      message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
      return message;
      }
      });
      }

      // 方法二:配置默认消息转换器(全局生效)
      // 如果你使用 Jackson2JsonMessageConverter,默认就会将消息设置为持久化。
      @Bean
      public MessageConverter jsonMessageConverter() {
      return new Jackson2JsonMessageConverter();
      }

      写入磁盘比只存入内存慢,会牺牲一些吞吐量,换来了可靠性。

  • 交换机持久化

    • 在声明交换机时,指定交换机是持久化的。RabbitMQ 重启后,交换机本身仍然存在。

      1
      2
      3
      4
      5
      @Bean
      public DirectExchange durableExchange() {
      // 第二个参数 durable = true 表示交换机持久化
      return new DirectExchange("durable.exchange", true, false);
      }

      如果交换机不存在,消息就无法被路由,所以交换机的持久化是必须的。

只有同时满足了交换机持久化队列持久化消息持久化,才能保证消息在服务器重启后不丢失。缺一不可。

但是持久化会显著增加磁盘 I/O 操作,降低消息吞吐量。应根据业务对可靠性和性能的要求进行权衡。例如,日志消息可以是非持久化的,而订单、支付消息必须是持久化的。

即使在持久化模式下,消息从接收到写入磁盘之间仍然有一个短暂的窗口期。如果 RabbitMQ 在这个时间点崩溃,消息仍然可能会丢失。对于极致可靠性的场景,需要使用 Publisher Confirms(发布者确认) 机制。

RabbitMQ的消息分发策略

再重述一次,MQ消息队列有如下几个角色:

  1. Producer:消息生产者。负责产生和发送消息到 Broker
  2. Broker:消息处理中心。负责消息存储、确认、重试等,一般其中会包含多个 queue
  3. Consumer:消息消费者。负责从 Broker 中获取消息,并进行相应处理
image-20250909110250983

消息分发策略解决的是 “当一条队列有多个消费者时,消息如何分配给这些消费者” 的问题。这是一个“效率与公平”的平衡问题。

一般的获取方式无外乎外推(push)或者(pull)两种方式,典型的git就有推拉机制,我们发送的http请求就是一种典型的拉取数据库数据返回的过程。而消息队列MQ是一种推送过程,而这些推机制会适用到很多的业务场景,也有很多对应的推机制策略

RabbitMQ 主要提供了两种核心的分发策略:

  • 轮询分发 (Round-Robin) - 默认策略

    • RabbitMQ 会依次将队列中的第 1 条消息发给消费者1,第 2 条发给消费者2,第 3 条发给消费者1,以此类推,循环往复。
    • 每个消费者处理的消息数量大致相同。而且无论消费者处理速度是快是慢,得到的消息数量都是一样的。这可能导致处理速度慢的消费者积压,而处理速度快的消费者空闲。
  • 公平分发/预取模式 (Fair Dispatch / Prefetch Count)

    • 这是更常用、更智能的策略。它通过限制每个消费者未确认(Unacked) 的消息数量来实现。

    • 核心配置prefetchCount(预取值)

      1
      2
      3
      4
      5
      6
      7
      spring:
      rabbitmq:
      listener:
      simple:
      prefetch: 1 # 每个消费者最多只能同时处理1条未确认的消息
      # 或者在消费者端注解配置
      # @RabbitListener(queues = "queue.name", ackMode = "MANUAL", concurrency = "2")
    • 工作机制

      1. 消费者连接到 Broker 时,会告知其 prefetchCount(例如 1)。
      2. Broker 会向该消费者持续发送消息,直到该消费者拥有的 “未确认”消息数达到 prefetchCount 上限。
      3. 达到上限后,Broker 将暂停向该消费者发送新消息,直到它确认了至少一条消息为止。
      4. 处理速度快的消费者确认消息更快,因此它能从 Broker 那里获得更多的新消息。
    • 特点

      • 能者多劳:将消息分发给空闲的、处理能力强的消费者,最大化整体消费效率。
      • 防止压垮:防止 Broker 一次性推送大量消息给一个慢速消费者,导致其内存溢出或崩溃。
      • 必须开启手动确认:此模式通常与 acknowledge-mode: manual 配合使用。
    • prefetchCount 值的设置

      • 1: 最保守的设置。消费者处理完一条,再获取下一条。能实现严格的公平分发,但可能限制吞吐量。
      • >1 (例如 10, 50, 100): 提高吞吐量。允许消费者有一个小的“缓存”,在处理上一条消息的同时,预取下一条消息,减少网络往返开销。值的大小取决于消费者的处理能力和消息的大小。
      • 0: 表示无限制。相当于回退到简单的轮询模式,可能压垮消费者。

消息分发策略的机制和对比

ActiveMQ RabbitMQ Kafka RocketMQ
发布订阅 支持 支持 支持 支持
轮询分发 支持 支持 支持 /
公平分发 / 支持 支持 /
重发 支持 支持 / 支持
消息拉取 / 支持 支持 支持

RabbitMQ的高可用和高可靠

所谓高可用:是指产品在规定的条件和规定的时刻或时间内处于可执行规定功能状态的能力。

当业务量增加时,请求也过大,一台消息中间件服务器的会触及硬件(CPU、内存、磁盘)的极限,一台消息服务器你已经无法满足业务的需求,所以消息中间件必须支持集群部署,来达到高可用的目的。

所谓高可靠 (Reliability):关注的是单条消息的安全性。确保消息不丢失不重复按顺序**地被处理。它解决的是“数据一致性”问题。

两者相辅相成。高可用是为高可靠提供基础保障的,如果一个系统频繁宕机,消息可靠性就无从谈起。

高可靠 (Reliability) - 确保消息不丢失

消息的旅程是 Producer -> Broker -> Consumer,高可靠机制需要在这三个环节都做好保障。

生产者端可靠性 (Producer -> Broker)

防止消息在发送过程中丢失。

事务机制

  • 机制:类似于数据库事务,通过 channel.txSelect(), channel.txCommit(), channel.txRollback() 实现。
  • 缺点同步阻塞,吞吐量会下降 200-300 倍,性能极差,生产环境极少使用。

发布者确认 (Publisher Confirms) - 生产标准

  • 机制:一种异步、轻量的确认机制。

    1. 生产者将信道设置为 confirm 模式 (channel.confirmSelect())。
    2. 所有在该信道上发布的消息都会被分配一个唯一的 ID。
    3. 消息被 Broker 接收后,Broker 会异步发送一个 basic.ack 确认信号给生产者(包含消息 ID)。
    4. 如果消息无法路由到队列(例如,没有匹配的绑定),Broker 会发送一个 basic.nack 信号。
  • 优点:异步非阻塞,性能极高。

  • 代码示例

    1
    2
    3
    4
    spring:
    rabbitmq:
    publisher-confirm-type: correlated # 开启相关性确认
    publisher-returns: true # 开启返回模式(用于处理不可路由的消息)
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    // 设置回调
    rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
    if (ack) {
    log.info("消息已成功到达Broker, ID: {}", correlationData.getId());
    } else {
    log.error("消息未能到达Broker, 原因: {}", cause);
    // 重发或记录日志
    }
    });

    // 处理不可路由的消息
    rabbitTemplate.setReturnsCallback(returned -> {
    log.error("消息被Broker退回! msg: {}, replyCode: {}, replyText: {}",
    new String(returned.getMessage().getBody()),
    returned.getReplyCode(),
    returned.getReplyText());
    });

    // 发送消息时携带关联数据
    rabbitTemplate.convertAndSend(exchange, routingKey, message,
    new CorrelationData(orderId.toString()));

Broker 端可靠性 - 持久化铁三角

防止消息在 RabbitMQ 服务器内部丢失(如重启、崩溃)。

  1. 交换机持久化new DirectExchange("my.exchange", true, false);
  2. 队列持久化new Queue("my.queue", true, false, false);
  3. 消息持久化message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);

必须三者同时配置,才能保证消息在服务器重启后不丢失。

消费者端可靠性 (Broker -> Consumer)

防止消息在消费过程中丢失。

  • 手动确认 (Manual Acknowledgement) - 生产标准

    • 机制:消费者在处理完业务逻辑后,手动向 Broker 发送确认 (basicAck)。处理失败则拒绝 (basicNack)。

    • 配置

      1
      2
      3
      4
      5
      spring:
      rabbitmq:
      listener:
      simple:
      acknowledge-mode: manual # 关闭自动ACK,开启手动ACK
    • 代码示例

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      @RabbitListener(queues = "my.queue")
      public void handleMessage(OrderMessage message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) {
      try {
      // 1. 处理业务逻辑
      processOrder(message);
      // 2. 处理成功,手动ACK
      channel.basicAck(deliveryTag, false); // false: 不批量确认
      } catch (BusinessException e) {
      // 3. 业务处理失败(可重试异常),重新入队
      channel.basicNack(deliveryTag, false, true); // requeue=true
      } catch (Exception e) {
      // 4. 系统异常(不可重试异常),不入队,可记录日志或送入死信队列
      channel.basicNack(deliveryTag, false, false); // requeue=false
      }
      }
  • 死信队列 (DLX - Dead Letter Exchange)

    • 作用:处理那些被消费者拒绝且不重新入队 (requeue=false) 的消息,或者消息过期、队列满等情况的消息。

    • 机制:为一个正常队列设置死信交换机和路由键。当该队列中的消息成为“死信”时,会自动被转发到死信交换机,进而路由到死信队列,由专门的消费者进行后续处理(如分析、报警、人工干预)。

    • 配置

      1
      2
      3
      4
      5
      6
      7
      @Bean
      public Queue businessQueue() {
      Map<String, Object> args = new HashMap<>();
      args.put("x-dead-letter-exchange", "dlx.exchange"); // 指定死信交换机
      args.put("x-dead-letter-routing-key", "dlx.key"); // 指定死信路由键
      return new Queue("business.queue", true, false, false, args);
      }

高可用 (Availability) - 确保服务不中断

高可用主要通过集群镜像队列 来实现。

这也就是为什么 RabbitMQ 早期版本的需要和 Zookeeper 一起下

普通集群模式

  • 目标:提高吞吐量和性能,不是真正的高可用
  • 机制
    • 多个 RabbitMQ 节点组成一个集群。
    • 队列的元数据(名称、属性、绑定信息)在所有节点之间同步。
    • 队列的消息内容只存在于创建该队列的节点上(主节点)。其他节点(从节点)仅持有该队列的指针。
  • 问题:如果主节点宕机,队列消息无法访问,直到主节点恢复。消费者无法消费,生产者也无法向该队列发送消息。服务会中断。
image-20250909143659620

镜像队列模式 (Mirrored Queues) - 生产高可用标准

类似于Master-slave主从共享数据的部署方式

  • 目标:实现真正的高可用,防止单点故障。
  • 机制
    • 队列中的消息内容会在集群中的所有节点或多个节点之间进行镜像同步
    • 每个镜像队列都有一个主节点 (Master) 和若干个镜像节点 (Mirror)
    • 所有客户端的操作(生产、消费)都首先在主节点上执行,然后主节点将操作结果广播给所有镜像节点。
  • 优点
    • 故障转移 (Failover):如果主节点宕机,资历最老的镜像节点会自动提升为新的主节点,继续提供服务。整个过程对客户端是透明的(客户端可能需要配置自动重连机制)。
    • 服务不中断:只要集群中还有一个节点存活,队列就可用。
  • 策略配置
    • 通过 policy 来为匹配的队列设置镜像策略。
image-20250909143338245

多活集群

  • 目标:实现跨机房、跨地域的高可用和灾难恢复。
  • 机制
    • Federation:基于 AMQP 协议,允许一个交换机和队列从另一个 Broker 接收消息。更适合持续性的数据同步。
    • Shovel:类似于一个移动消息的“铲子”,可以配置为将消息从一个 Broker 的队列中取出,然后放入另一个 Broker 的交换机上。更适合一次性或定期的数据迁移。
  • 应用场景:异地多活、数据备份、迁移。

RabbitMQ 的几个模式详解

RabbitMQ 基于 AMQP 协议设计了多种灵活的消息传递模式,不同模式对应不同的业务场景,核心区别在于消息路由方式消费者接收规则

简单模式(Simple Mode)

核心特点

  • 最基础的消息传递模式,由单个生产者、单个消费者和一个队列组成。
  • 生产者直接将消息发送到队列,消费者从队列中获取消息,RabbitMQ 仅作为 “消息代理” 转发消息,无需交换机(使用默认交换机AMQP default)。

工作流程

  1. 生产者创建消息,指定目标队列名称,发送至 RabbitMQ;
  2. 消息被存储在队列中,等待消费者连接;
  3. 消费者连接队列并获取消息,处理完成后消息从队列中删除。

应用场景

一对一的简单通信场景,如:

  • 邮件发送:系统将待发送邮件放入队列,邮件服务从队列中取出并发送;
  • 即时聊天:单用户之间的消息传递(如点对点聊天)。

工作模式(Work Queue Mode)

核心特点

  • 一个生产者对应多个消费者,但一条消息仅被一个消费者处理(避免重复消费)。
  • 采用轮询分发策略:RabbitMQ 将消息平均分配给所有消费者(不考虑消费者处理能力差异)。
  • 消费者需手动确认消息auto-ack=false),处理完一条消息后才会接收下一条(避免消息堆积)。

工作流程

  1. 生产者向队列发送大量消息(如任务指令);
  2. 多个消费者同时订阅该队列,RabbitMQ 按顺序将消息轮流分配给每个消费者;
  3. 消费者处理完消息后发送Basic.Ack确认,队列删除该消息并推送下一条。

应用场景

  • 任务分发与并行处理, “任务量大、耗时较长” 的场景,如:
    • 日志处理:多台服务器同时处理日志文件;
    • 订单处理:多个订单服务节点分担订单创建、支付验证等任务,提高处理速度。

发布订阅模式(Publish/Subscribe Mode)

核心特点

  • 生产者发送的消息会被所有订阅的消费者接收(一对多通信)。
  • 依赖Fanout 交换机(扇出交换机):交换机收到消息后,会将其广播到所有与之绑定的队列(忽略路由键)。
  • 每个消费者需绑定独立队列,确保消息被各自接收。

工作流程

  1. 生产者将消息发送到 Fanout 交换机,并指定交换机名称;
  2. 交换机将消息复制到所有绑定的队列(每个队列对应一个消费者);
  3. 所有消费者同时从各自队列中获取消息并处理。

应用场景

  • 消息需要被多角色接收的场景,如:
    • 通知推送:用户充值成功后,系统同时发送短信、邮件、APP 推送通知;
    • 日志广播:分布式系统中,多节点同时接收并处理核心日志(如错误日志)。

路由模式(Routing Mode)

核心特点

  • 消息通过路由键(Routing Key) 精准路由到指定队列,只有匹配路由键的消费者才能接收消息。
  • 依赖Direct 交换机(直连交换机):交换机根据消息的路由键与队列的绑定键(Binding Key)完全匹配来分发消息。

工作流程

  1. 队列与 Direct 交换机绑定,同时指定绑定键(如order.pay);
  2. 生产者发送消息时,指定目标交换机和路由键(如order.pay);
  3. 交换机对比路由键与绑定键,仅将消息投递到绑定键完全匹配的队列,对应消费者接收消息。

应用场景

  • 消息需要按类型精准分发的场景,如:
    • 商品促销:不同促销活动(如 “iPhone13 促销”“华为 Mate50 促销”)绑定不同路由键,仅对应活动接收相关消息;
    • 日志分级:error类型日志路由到告警队列,info类型日志路由到普通日志队列。

主题订阅模式(Topics Mode)

核心特点

  • 基于通配符匹配路由键,支持更灵活的消息分发(扩展了路由模式的精准匹配)。
  • 依赖Topic 交换机(主题交换机):支持*(匹配单个单词)和#(匹配多个单词,包括零个)通配符,路由键需以.分隔(如order.pay.success)。

工作流程

  1. 队列与 Topic 交换机绑定,绑定键使用通配符(如order.#匹配所有订单相关消息);
  2. 生产者发送消息时,指定包含多个单词的路由键(如order.refund.failure);
  3. 交换机通过通配符匹配路由键与绑定键,将消息投递到所有匹配的队列。

应用场景

  • 消息需要按 “多级分类” 分发的场景,如:
    • 商品消息:electronics.iphone路由键的消息可被绑定electronics.*(匹配单个子分类)或#.iphone(匹配所有 iPhone 相关)的队列接收;
    • 业务日志:user.loginuser.register消息可被绑定user.*的队列统一处理。

还有模式吗,我不记得了,也没找到,但是这五个覆盖了从 “一对一” 到 “一对多”、从 “精准路由” 到 “灵活匹配” 的各类场景了已经

RPC远程过程调用模式

通常我们认为消息队列是异步的:生产者发送消息后不会立即等待结果。而 RPC 是同步的:客户端发起请求后,会阻塞并等待服务器的响应。

RabbitMQ RPC 模式的巧妙之处在于,它使用异步的消息机制模拟了同步的调用过程。其核心工作流程如下

sequenceDiagram
    participant C as RPC Client
    participant Q_Request as 请求队列
    participant S as RPC Server
    participant Q_Reply as 回调队列

    Note right of C: 1. 准备调用
    C->>C: 生成唯一 CorrelationId<br>创建临时回调队列

    Note right of C: 2. 发送请求
    C->>Q_Request: 发送请求消息<br>(包含回调队列地址和CorrelationId)
    
    Note right of S: 3. 监听并处理
    S->>Q_Request: 监听并获取请求
    S->>S: 执行任务/计算
    S->>Q_Reply: 发送响应消息<br>(携带相同的CorrelationId)

    Note right of C: 4. 等待并匹配响应
    C->>Q_Reply: 监听回调队列
    Q_Reply->>C: 接收到响应
    C->>C: 根据CorrelationId匹配请求

RPC的特点可以这样描述

  • 异步通信但模拟同步调用:底层基于消息队列异步传递,但客户端会 “阻塞等待” 响应(或通过回调机制等待),体验类似本地同步函数调用。
  • 解耦与可扩展性:客户端无需知道服务端的具体地址(只需知道请求队列名),服务端可横向扩展(多个服务端实例监听同一个请求队列,实现负载均衡)。
  • 消息可追溯:通过 correlation_id 确保请求与响应一一对应,避免 “响应丢失” 或 “响应错乱”。
  • 临时队列优化资源:客户端的响应队列是临时的,避免长期占用队列资源,减少运维成本。

首先需要知道什么是临时队列

每个客户端都需要一个唯一的回调队列,且最好能自动清理。所以客户端在启动时声明一个排他的、自动删除的临时队列用于接收消息,等待并处理服务端返回的 “响应消息”。

1
2
3
4
5
6
@Bean
public Queue temporaryReplyQueue() {
// exclusive=true: 排他队列,仅限本连接使用,连接关闭后自动删除
// autoDelete=true: 自动删除
return new Queue("temporary.reply.queue", false, true, true);
}

那么回调队列是什么,就是服务器需要知道将响应发送到哪里,然后客户端在发送请求时,在消息属性中指定一个 reply_to 地址,通常是客户端独占的、临时的队列

1
2
3
4
5
6
// 客户端发送请求时设置
rabbitTemplate.convertAndSend("rpc_exchange", "rpc_routing_key", request, message -> {
message.getMessageProperties().setReplyTo(replyQueueName); // 设置回调队列名
message.getMessageProperties().setCorrelationId(correlationId); // 设置关联ID
return message;
});

然后服务端监听 请求队列,持续接收客户端的请求消息,将处理结果封装为 “响应消息”,发送到客户端指定的 临时响应队列

流程可以拆解为如下:

  1. 客户端初始化
    • 声明一个 临时独占队列exclusive=True,客户端断开连接后自动删除),作为自己的响应队列。
    • 绑定该临时队列到默认交换机("",RabbitMQ 内置的直连交换机),无需额外绑定规则(直连交换机按队列名匹配)。
  2. 客户端发送请求
    • 生成一个唯一的 correlation_id(如 uuid.uuid4().hex)。
    • 构造请求消息,设置两个关键属性:
      • reply_to:临时响应队列的名称(告知服务端 “往这回传结果”)。
      • correlation_id:请求的唯一标识(用于后续匹配响应)。
    • 将请求消息发送到 服务端的请求队列(如 rpc_queue)。
  3. 服务端处理请求
    • 服务端启动时,预先声明 请求队列rpc_queue),并注册消息监听器(持续监听该队列的消息)。
    • 接收到请求消息后,解析消息内容(如需要计算的参数),执行业务逻辑(如计算斐波那契数列、查询数据库)。
    • 处理完成后,构造响应消息,将客户端发送的 correlation_id 原样放入响应消息的 correlation_id 属性中。
    • 通过默认交换机,将响应消息发送到 reply_to 指定的临时响应队列。
  4. 客户端接收响应
    • 客户端监听自己的临时响应队列,当收到响应消息时,提取其 correlation_id
    • 比对该 correlation_id 与自己发起请求时生成的 correlation_id:若匹配,则确认是当前请求的结果,处理响应数据;若不匹配,则忽略(避免接收其他请求的响应)。