柔性事务

柔性事务的分类

在电商领域等高并发的互联网场景下,刚性事务在数据库性能和处理能力上都暴露出了瓶颈。

柔性事务有两个特性:基本可用和柔性状态。

  • 基本可用是指分布式系统出现故障的时候允许损失一部分的可用性。
  • 柔性状态是指允许系统存在中间状态,这个中间状态不会影响系统整体的可用性,比如数据库读写分离的主从同步延迟等。柔性事务的一致性指的是最终一致性。

柔性事务主要分为补偿型通知型,其中,补偿型事务都是同步的,通知型事务都是异步的。

补偿型事务通过 “正向执行 + 逆向补偿” 的机制保证最终一致性,属于同步事务(执行过程中需要等待各环节完成),典型子类型有 TCCSaga

  • 补偿型事务又分:TCC、Saga;

通知型事务通过异步消息传递保证最终一致性,属于异步事务(执行过程中无需同步等待,通过消息队列或重试机制完成),典型子类型有 MQ 事务消息最大努力通知型

  • 通知型事务分:MQ事务消息、最大努力通知型。

补偿型事务

补偿型号事务的设计理念以 “正向执行 + 逆向补偿” 为核心,通过主动回滚已完成的操作来保证最终一致性,属于同步事务(执行过程中需等待各环节完成)。

补偿型事务将长事务拆分为多个可独立执行、可补偿的子操作。

每个子操作都需定义 “正向执行逻辑” 和 “逆向补偿逻辑”(如 “扣款” 的补偿是 “退款”,“扣库存” 的补偿是 “加库存”)。

若任意子操作失败,会按逆序执行所有已完成子操作的补偿逻辑,回滚整个事务。这是其一致性的保障。

补偿型事务通常出现在对强一致性要求较高的业务,如金融交易(转账、支付)、电商核心订单流程(下单 - 扣款 - 发货需严格一致)。而且通常是跨系统的复杂业务流程,如供应链采购(下单、付款、生产、物流等环节需联动回滚)。

通知型事务

以 “异步消息传递 + 重试机制” 为核心,通过异步通信保证最终一致性,属于异步事务(执行过程中无需同步等待,依赖消息队列或重试完成)。

主流实现是通过MQ(消息队列)来通知其他事务参与者自己事务的执行状态,引入MQ组件,通过消息队列(MQ)或异步通知机制传递事务指令,有效的使得发起方和接收方解耦,各参与者都可以异步执行,所以通知型事务又被称为异步事务

其中若接收方未正确处理,通过 “多次重试” 保障送达;接收方需实现幂等性(同一消息多次处理结果一致)。它不追求实时一致,允许短暂中间状态,最终通过异步流程达成一致。

通知型事务主要适用于那些需要异步更新数据,并且对数据的实时性要求较低的场景,主要包含:异步确保型事务最大努力通知事务两种。

  • 异步确保型事务:主要适用于内部系统的数据最终一致性保障,因为内部相对比较可控,如订单和购物车、收货与清算、支付与结算等等场景;
  • 最大努力通知:主要用于外部系统,因为外部的网络环境更加复杂和不可信,所以只能尽最大努力去通知实现数据最终一致性,比如充值平台与运营商、支付对接等等跨网络系统级别对接;
image-20251103203554686

一般在跨系统异步通信场景,如订单生成后异步通知仓储系统发货、支付成功后异步通知积分系统加分等业务会出现这种事务。

异步确保型事务

异步确保型事务是通知型柔性事务的一种,核心是通过异步消息传递实现分布式系统的最终一致性,适用于对实时性要求低、但需保障数据最终一致的内部系统场景(如电商订单与积分、支付与结算等)。

异步确保型事务将一系列同步的事务操作修改为基于消息队列异步执行的操作,来避免分布式事务中同步阻塞带来的数据操作性能的下降。

核心特性包括:

  • 异步执行:事务发起方与参与方通过消息队列解耦,无需同步等待。
  • 最终一致性:允许短暂中间状态,通过重试、幂等机制确保数据最终一致。

MQ事务消息方案

基于MQ的事务消息方案主要依靠MQ的半消息机制来实现投递消息和参与者自身本地事务的一致性保障,又有 2PC 的二阶段提交来确保本地事务与消息投递的原子性

半消息机制实现原理其实借鉴的2PC的思路,是二阶段提交的广义拓展。

半消息是在原有队列消息执行后的逻辑,如果后面的本地逻辑出错,则不发送该消息,如果通过则告知MQ发送;

流程图如下

image-20251103204244040

详细分析流程中的每一步:

  1. 首先,事务发起方向 MQ 发送 “半消息”(对消费者不可见)。
  2. 如果半消息发送正常,MQ通知发送方消息发送成功;
  3. 在发送半消息成功后,发起方执行本地业务,并记录事务状态。
  4. 确认消息的投递,若本地事务成功,通知 MQ 提交半消息(消费者可见);若失败,通知 MQ 回滚(丢弃消息)。
  5. 如果消息是rollback, MQ将丢弃该消息不投递;如果是commit,MQ将会消息发送给消息订阅方;
  6. 订阅方收到消息后,根据消息执行本地事务
  7. 订阅方如果执行本地事务成功后,再从MQ中将该消息标记为已消费;
  8. 如果执行本地事务过程中,执行端挂掉,或者超时,MQ服务器端将不停的询问producer来获取事务状态,直到消费成功或达到上限。
  9. Consumer 端的消费成功机制有MQ保证

举个例子,假设存在业务规则:某笔订单成功后,为用户加一定的积分。

在这条规则里,管理订单数据源的服务为事务发起方,管理积分数据源的服务为事务跟随者。

从这个过程可以看到,基于消息队列实现的事务存在以下操作:

  • 订单服务创建订单,提交本地事务
  • 订单服务发布一条消息
  • 积分服务收到消息后加积分

我们可以看到它的整体流程是比较简单的,同时业务开发工作量也不大:

  • 编写订单服务里订单创建的逻辑
  • 编写积分服务里增加积分的逻辑

可以看到该事务形态过程简单,性能消耗小,发起方与跟随方之间的流量峰谷可以使用队列填平,同时业务开发工作量也基本与单机事务没有差别,都不需要编写反向的业务逻辑过程

因此基于消息队列实现的事务是我们除了单机事务外最优先考虑使用的形态。

基于阿里 RocketMQ实现MQ异步确保型事务

首先,有一些第三方的MQ是支持事务消息的,这些消息队列,支持半消息机制,比如RocketMQ,ActiveMQ。但是有一些常用的MQ也不支持事务消息,比如 RabbitMQ 和 Kafka 都不支持。

image-20251104103134036

以阿里的 RocketMQ 中间件为例,其思路大致为(以 “订单服务 A 通知积分服务 B 增加积分为例”继续上面的例子)

步骤 1:发送半消息(Producer 端:A 系统发起)

  • 操作:订单服务 A 作为 Producer,向 RocketMQ Broker 发送一条 “半消息”(Half Message)。
    • 半消息包含完整业务内容(如 “订单 ID:1001,用户 ID:2001,需加积分:100”),但消息属性被标记为 “半消息”,存储在 Broker 内置的特殊 Topic RMQ_SYS_TRANS_HALF_TOPIC 中,且 queueId 固定为 0。
    • 此时消息对消费者(积分服务 B)完全不可见(因消费者订阅的是业务 Topic,而非 RMQ_SYS_TRANS_HALF_TOPIC),避免 B 提前消费未确认的消息。
  • 目标:先确保消息在 Broker 中可靠存储,为后续与本地事务绑定做准备。

步骤 2:Broker 确认半消息存储(Broker 端响应)

  • 操作:Broker 收到半消息后,按普通消息的存储逻辑写入磁盘(确保持久化),并向 A 系统返回 “存储成功” 的响应(如 SendResult)。
  • 关键:只有确认半消息成功存储后,A 系统才会执行本地事务,避免 “本地事务执行成功但消息丢失” 的不一致。

步骤 3:执行本地事务(Producer 端:A 系统核心业务)

  • 操作:A 系统收到 Broker 的 “存储成功” 响应后,执行本地核心业务(如创建订单),并在同一数据库事务中记录本地事务状态(成功 / 失败)。
    • 例:A 系统在数据库中执行 INSERT INTO orders (id, user_id, amount) VALUES (1001, 2001, 500),同时记录事务日志 trans_log (msg_id, status) 为 “处理中”。
  • 目标:将 “半消息发送” 与 “本地业务执行” 通过 “先存消息、再执行业务” 的顺序绑定,确保业务执行前消息已安全落地。

步骤 4:提交 / 回滚半消息(Producer 端:通知 Broker 最终状态)

  • 操作:A 系统根据本地事务结果,向 Broker 发送 “提交” 或 “回滚” 指令(Oneway 方式,即发送后不等待响应,提升性能)。
    • 若本地事务成功:发送 “提交” 指令,Broker 需将半消息从 RMQ_SYS_TRANS_HALF_TOPIC 迁移到业务 Topic(如 order积分通知Topic),此时消息对 B 系统可见。
    • 若本地事务失败:发送 “回滚” 指令,Broker 直接丢弃 RMQ_SYS_TRANS_HALF_TOPIC 中的半消息,不通知 B 系统。
  • 风险点:Oneway 指令可能丢失(如网络故障),导致 Broker 无法收到最终状态,需依赖后续 “反查机制” 兜底。

步骤 5:Broker 处理最终状态(Broker 端状态流转)

  • 操作:Broker 收到 “提交 / 回滚” 指令后,从RMQ_SYS_TRANS_HALF_TOPIC中找到对应半消息,执行如下操作:
    • 提交:复制半消息到业务 Topic 的对应队列(消息内容不变,仅变更存储位置),此时 B 系统可通过订阅业务 Topic 消费该消息。
    • 回滚:直接删除 RMQ_SYS_TRANS_HALF_TOPIC 中的半消息,不做其他处理。
  • 目标:确保只有本地事务成功的消息才会被消费者处理,避免 “业务失败但消息被消费” 的不一致。

步骤 6:消息反查机制(Broker 兜底校验)

  • 触发场景:若 A 系统的 “提交 / 回滚” 指令丢失(如 A 系统宕机、网络中断),Broker 中 RMQ_SYS_TRANS_HALF_TOPIC 的半消息会处于 “超时未完成” 状态(默认超时时间 60 秒)。
  • 操作:Broker 启动TransactionalMessageCheckService定时任务(默认每 1 分钟执行一次),扫描超时半消息,向对应的 A 系统发送 “反查请求”(通过 Producer 预设的checkLocalTransaction接口)。
    • A 系统收到反查请求后,查询本地事务日志(如 trans_log),返回实际状态(成功 / 失败)。
    • Broker 根据反查结果,重复步骤 5 的 “提交 / 回滚” 操作;若反查失败(如 A 系统仍不可用),则后续继续反查(默认最多 15 次,可配置)。
  • 目标:解决 “指令丢失” 导致的消息状态悬而未决问题,确保最终一致性。

步骤 7:消费者消费消息(Consumer 端:B 系统处理)

  • 操作:B 系统订阅业务 Topic,消费到 “已提交” 的消息后,执行本地事务(如增加积分:UPDATE user_points SET points = points + 100 WHERE user_id = 2001)。至于B是否能消费成功,消费失败是否重试,这属于正常消息消费需要考虑的问题
  • 可靠性保障:
    • 重试机制:若 B 系统消费失败(如数据库宕机),RocketMQ 会将消息放入重试队列(%RETRY%+消费者组名),间隔一定时间(如 1s、5s、10s…)重新投递,直到消费成功或达到最大重试次数(默认 16 次)。
    • 幂等性处理:B 系统需实现幂等(如通过订单 ID 作为唯一键,避免重复加积分),因重试可能导致消息重复消费。
  • 目标:确保跨系统的业务操作最终执行成功,达成全局最终一致性。

也就是说,半消息机制确保了,消息先落地,再执行业务,避免消息丢失;而且反查机制解决 “指令丢失” 的极端情况,兜底了保障状态的一致;

最终,整个流程通过异步化设计(无需 A 系统同步等待 B 系统处理结果),在保证最终一致性的同时,兼顾了高吞吐和低延迟,非常适合电商、支付等核心场景的跨系统联动。

有这么一段代码

image-20251104103520703

这段代码的核心是 Producer 端通过 TransactionListener 接口,实现 “本地事务执行” 和 “反查状态” 的回调逻辑,是 RocketMQ 事务消息的核心

TransactionListener 接口提供两个核心方法

其中,executeLocalTransaction 方法在Broker 确认 “半消息” 存储成功后,主动回调该方法。然后在这个方法里执行本地业务事务(比如订单服务的 “创建订单” 操作)。它会返回这样的信息,LocalTransactionState.COMMIT_MESSAGE(本地事务成功,通知 Broker 提交半消息)、LocalTransactionState.ROLLBACK_MESSAGE(本地事务失败,通知 Broker 回滚半消息)。

其中,checkLocalTransaction 方法在Broker 发起消息反查时(比如半消息超时未处理),主动回调该方法。在这个方法里查询本地事务的最终状态(比如查数据库里订单是否真的创建成功)。返回同样返回 COMMIT_MESSAGEROLLBACK_MESSAGE,给 Broker 一个明确的 “最终判决”。

TransactionMQProducer 是RocketMQ 专门用于事务消息的 Producer 类,需要指定生产者组(如代码中的 testTransactionMsg),并绑定上面的 TransactionListener 回调。发送事务消息时,调用 sendMessageInTransaction 方法,触发整个 “半消息发送 → 本地事务执行 → 状态回调” 的流程。

image-20251104103948712

这张架构图完整呈现了从Producer 发送半消息Consumer 消费消息的全流程,我们可以按步骤拆解:

  1. 生产者(Producer)侧流程

    • 步骤 1:初始化与发送半消息
      • 系统启动时,初始化 TransactionMQProducer(指定生产者组),并绑定 TransactionListener
      • 调用 sendMessageInTransaction 发送 “半消息” 到 Broker 的 RMQ_SYS_TRANS_HALF_TOPIC(半消息队列,消费者不可见)。
    • 步骤 2:执行本地事务(executeLocalTransaction 回调)
      • Broker 确认半消息存储成功后,回调 executeLocalTransaction 方法,执行本地业务(如 “创建订单” 并记录事务状态为 “处理中”)。
      • 根据本地事务结果,返回 COMMITROLLBACK 状态给 Broker。
    • 步骤 3:消息反查(checkLocalTransaction 回调)
      • 若 Broker 长时间未收到步骤 2 的状态(比如网络故障),会启动定时任务扫描半消息队列,发起反查请求
      • 回调 checkLocalTransaction 方法,查询本地事务的最终状态(如查数据库确认订单是否真的创建成功),再次返回 COMMITROLLBACK
  2. Broker 侧流程

    • 半消息暂存与状态流转
      • 半消息先存在RMQ_SYS_TRANS_HALF_TOPIC中,根据 Producer 返回的状态,执行:
        • COMMIT:将半消息复制到真实业务 Topic(如 order积分通知Topic),此时消费者可见。
        • ROLLBACK:直接删除半消息,不通知消费者。
        • UNKNOWN(反查也失败):将半消息放入延迟队列SCHEDULE_TOPIC_),等待后续再次反查。
    • 重试与兜底机制
      • 若消费者消费失败,消息会进入重试队列%RETRY%+消费者组名),间隔一段时间后重新投递。
      • 若重试多次仍失败,最终进入死信队列%DLQ%+消费者组名),由人工介入处理。
  3. 消费者(Consumer)侧流程

    • 消费者启动时,订阅真实业务 Topic,注册消息监听器(如 MessageListener)。

    • 消费到消息后,执行本地业务(如 “增加用户积分”),并返回消费状态:

      • CONSUME_SUCCESS:消费成功,Broker 确认消息已处理。
      • RECONSUME_LATER:消费失败,Broker 将消息放入重试队列,后续重试。

本地表方案

本地消息表方案是实现分布式事务最终一致性的经典方案,在不依赖 MQ 事务消息特性的情况下,解决跨系统间的数据一致性问题

有时候我们目前的MQ组件并不支持事务消息,我这个项目里面都用了很多的 Kafka 的内容了,那我干嘛非要加个 RocketMQ,我们想尽量少的侵入业务方。这时我们需要另外一种方案“基于DB本地消息表“。

本地消息表最初由eBay 提出来解决分布式事务的问题。是目前业界使用的比较多的方案之一,它的核心思想就是将分布式事务拆分成本地事务进行处理。

一句话,通过 “数据库本地事务 + 消息表轮询” 实现异步一致性。

本地消息表的核心思路是:将跨系统的分布式事务,转化为单个系统的本地事务 + 异步消息通信

  • 用 “本地数据库事务” 保证 “业务操作” 和 “消息记录” 的原子性(要么都成功,要么都失败);
  • 用 “消息表 + 定时任务” 保证消息可靠投递到 MQ,再由 MQ 通知接收方处理;
  • 用 “消费重试 + 幂等性” 保证接收方最终能正确处理消息,达成全局一致。

本地消息表的流程是这样的

image-20251104104242606

简单分析一下上述的流程:以 “订单服务 A 通知物流服务 B 发货” 为例

阶段 1:生产方(A 系统)的本地事务与消息记录

这一步是整个方案的核心,通过 “本地事务” 确保业务和消息的绑定。

  1. 开启本地数据库事务:订单服务 A 在处理 “创建订单” 业务时,先开启数据库事务(如 MySQL 的BEGIN)。

  2. 执行核心业务逻辑:在事务中完成订单创建(如INSERT INTO orders (id, user_id, status) VALUES (1001, 2001, '已下单'))。

  3. 写入本地消息表:同一事务中,向 A 系统的本地消息表插入一条 “待发送” 消息(消息表与订单表在同一数据库)

    1
    2
    3
    4
    5
    6
    7
    INSERT INTO local_message (
    message_id, -- 唯一消息ID(如UUID)
    topic, -- 消息主题(如“订单发货通知”)
    content, -- 消息内容(如“订单1001需发货”)
    status, -- 消息状态(0:待发送,1:已发送,2:已确认,3:失败)
    create_time -- 创建时间
    ) VALUES ('msg-123', 'order_delivery', '{"orderId":1001}', 0, NOW())
  4. 提交本地事务:若业务和消息表插入都成功,提交事务(COMMIT);任意一步失败,回滚事务(ROLLBACK),确保 “订单创建成功” 和 “消息记录成功” 必须同时发生。

阶段 2:生产方(A 系统)的消息投递与重试

确保消息从本地消息表可靠发送到 MQ,避免消息 “躺在表里不出去”。

  1. 独立线程扫描消息表:A 系统启动一个定时任务(如每 10 秒一次),专门扫描消息表中 “状态为 0(待发送)” 或 “状态为 3(发送失败)且重试次数未达上限” 的消息。
  2. 投递消息到 MQ:对扫描到的消息,调用 MQ 的发送接口(如producer.send()),将消息发送到指定 Topic(如 “order_delivery”)。
  3. 根据 MQ 反馈更新消息状态:
    • 若 MQ 返回发送成功(ACK):更新消息表状态为 “1(已发送)”(UPDATE local_message SET status=1 WHERE message_id='msg-123')。
    • 若发送失败(如网络超时):不更新状态(保持 0 或 3),等待下次定时任务重试(可记录重试次数,超过上限标记为 “死信”)。

阶段 3:消费方(B 系统)的消息处理与反馈

确保接收方正确处理消息,并通知生产方 “已处理完成”。

  1. 消费 MQ 消息:物流服务 B 订阅 “order_delivery” Topic,收到消息后解析内容(如 “订单 1001 需发货”)。
  2. 执行本地业务逻辑:B 系统在自己的本地事务中处理业务(如创建物流单:INSERT INTO logistics (id, order_id, status) VALUES (3001, 1001, '待发货'))。
  3. 处理结果反馈:
    • 成功:B 系统向 MQ 发送 “消费成功” 的确认(如 RocketMQ 的CONSUME_SUCCESS),同时可向 A 系统发送 “处理完成” 的通知(如调用 A 的 API,或通过另一条消息)。A 系统收到后,将消息表状态更新为 “2(已确认)”。
    • 失败:
      • 若为临时失败(如数据库忙):B 系统返回 “重试”(如RECONSUME_LATER),MQ 会将消息放入重试队列,间隔一段时间后重新投递。
      • 若为业务失败(如订单不存在):B 系统向 A 系统发送 “补偿消息”(如调用 A 的回滚接口),A 系统收到后执行订单回滚(如UPDATE orders SET status='已取消'),并将消息表状态标记为 “3(失败)”。

阶段 4:兜底机制(定时对账与补单)

解决极端情况下的消息丢失或处理失败(如 A 的投递线程挂了、B 处理成功但没通知 A 这种)。

  1. 生产方定时检查 “已发送未确认” 消息:A 系统的定时任务扫描 “状态为 1(已发送)且超过超时时间(如 30 分钟)” 的消息,这些消息可能是 B 处理失败或通知丢失导致的。
  2. 消费方定时检查本地业务完整性:B 系统定时扫描 “未处理” 的物流单(如状态为 “待发货” 超过 1 小时),主动向 A 系统查询对应订单的消息状态。
  3. 自动补单或人工介入:若发现消息未处理,触发重新投递或人工核对,确保最终一致性。不要小瞧这一步,不少苦逼的错误都是手动去介入的

因为业务操作和消息记录在同一个本地事务中,只要订单创建成功,消息就一定在消息表中,避免 “订单创建了但没发消息” 的情况,所以这样设计能保证一致性?

因为使用了 MQ,即使第一次发送 MQ 失败,定时任务会反复重试,直到消息成功投递,避免 “消息在表里但没发到 MQ” 的情况。当消费方处理失败时,MQ 会重试投递,确保 “消息到了 B 但没处理” 的情况被修复。

本地消息表优缺点:

优点:

  • 本地消息表建设成本比较低,实现了可靠消息的传递确保了分布式事务的最终一致性。
  • 无需提供回查方法,进一步减少的业务的侵入。
  • 在某些场景下,还可以进一步利用注解等形式进行解耦,有可能实现无业务代码侵入式的实现。

缺点:

  • 本地消息表与业务耦合在一起,难于做成通用性,不可独立伸缩。
  • 本地消息表是基于数据库来做的,而数据库是要读写磁盘IO的,因此在高并发下是有性能瓶颈的

与上面说的 RocketMQ 事务消息对比

MQ事务消息:

  • 需要MQ支持半消息机制或者类似特性,在重复投递上具有比较好的去重处理;
  • 具有比较大的业务侵入性,需要业务方进行改造,提供对应的本地操作成功的回查功能;

DB本地消息表:

  • 使用了数据库来存储事务消息,降低了对MQ的要求,但是增加了存储成本;
  • 事务消息使用了异步投递,增大了消息重复投递的可能性;
维度 本地消息表方案 RocketMQ 事务消息方案
依赖组件 数据库(消息表)+ 普通 MQ 支持事务消息的 MQ(如 RocketMQ)
消息暂存 本地数据库消息表 Broker 的半消息队列
可靠性保障 定时任务扫描重试 Broker 反查机制
实现复杂度 需手动设计消息表和定时任务 依赖 MQ 原生机制,代码更简洁
适用场景 不依赖特定 MQ,传统系统改造 已使用 RocketMQ,追求低代码量

最大努力通知

最大努力通知方案的目标,就是发起通知方通过一定的机制,最大努力将业务处理结果通知到接收方。

最大努力通知型的最终一致性:

通过引入定期校验机制实现最终一致性,对业务的侵入性较低,适合于对最终一致性敏感度比较低、业务链路较短的场景。

最大努力通知事务主要用于外部系统,因为外部的网络环境更加复杂和不可信,所以只能尽最大努力去通知实现数据最终一致性,比如充值平台与运营商、支付对接、商户通知等等跨平台、跨企业的系统间业务交互场景

异步确保型事务主要适用于内部系统的数据最终一致性保障,因为内部相对比较可控,比如订单和购物车、收货与清算、支付与结算等等场景。

image-20251104113645228

普通消息是无法解决本地事务执行和消息发送的一致性问题的。因为消息发送是一个网络通信的过程,发送消息的过程就有可能出现发送失败、或者超时的情况。超时有可能发送成功了,有可能发送失败了,消息的发送方是无法确定的,所以此时消息发送方无论是提交事务还是回滚事务,都有可能不一致性出现。

所以,通知型事务的难度在于: 投递消息和参与者本地事务的一致性保障

因为核心要点一致,都是为了保证消息的一致性投递,所以,最大努力通知事务在投递流程上跟异步确保型是一样的,因此也有两个分支

  • 基于MQ自身的事务消息方案
  • 基于DB的本地事务消息表方案

基于MQ自身的事务消息方案

要实现最大努力通知,可以采用 MQ 的 ACK 机制。

最大努力通知事务在投递之前,跟异步确保型流程都差不多,关键在于投递后的处理。

因为异步确保型在于内部的事务处理,所以MQ和系统是直连并且无需严格的权限、安全等方面的思路设计。最大努力通知事务在于第三方系统的对接,所以最大努力通知事务有几个特性

  • 业务主动方在完成业务处理后,向业务被动方(第三方系统)发送通知消息,允许存在消息丢失。
  • 业务主动方提供递增多挡位时间间隔(5min、10min、30min、1h、24h),用于失败重试调用业务被动方的接口;在通知N次之后就不再通知,报警+记日志+人工介入。
  • 业务被动方提供幂等的服务接口,防止通知重复消费。
  • 业务主动方需要有定期校验机制,对业务数据进行兜底;防止业务被动方无法履行责任时进行业务回滚,确保数据最终一致性。

也就是说,最大努力通知的核心是 “尽最大努力确保消息送达,但不保证 100% 成功”,通过 MQ 的重试机制和业务端的幂等设计,在性能与一致性之间做取舍,它允许短暂不一致,通过日志和人工对账弥补极端情况的失败。

  • 发送方:通过 MQ 多次重试投递消息,直到接收方确认或达到重试上限;
  • 接收方:通过幂等处理确保重复接收消息时结果一致,避免业务异常;

它适用于对一致性要求不高、允许少量数据不一致,可通过人工干预修复的场景

还是来看一下其流程

image-20251104114246106
  1. 业务活动的主动方,在完成业务处理之后,向业务活动的被动方发送消息,允许消息丢失。
  2. 主动方可以设置时间阶梯型通知规则,在通知失败后按规则重复通知,直到通知N次后不再通知。
  3. 主动方提供校对查询接口给被动方按需校对查询,用于恢复丢失的业务消息。
  4. 业务活动的被动方如果正常接收了数据,就正常返回响应,并结束事务。
  5. 如果被动方没有正常接收,根据定时策略,向业务活动主动方查询,恢复丢失的业务消息。

以 “支付结果通知订单系统” 为例讲一下上述的流程

首先,发送方发送通知消息

  1. 执行本地业务并记录消息日志

    支付系统完成支付后(如用户支付成功),在本地事务中记录 “支付结果” 和 “待通知消息”(类似本地消息表,但更简化):

    1
    2
    3
    4
    5
    -- 支付记录
    INSERT INTO payment (id, order_id, status) VALUES (pay1001, order1001, 'SUCCESS');
    -- 通知消息日志(用于重试和对账)
    INSERT INTO notify_log (id, order_id, content, status, retry_count)
    VALUES (log1001, order1001, '{"status":"SUCCESS"}', 'UNSENT', 0);
  2. 发送消息到 MQ

    支付系统调用 MQ 的发送接口,将消息(含订单 ID、支付状态)发送到指定 Topic(如 “pay_notify_topic”),并指定重试策略(如 RocketMQ 的重试次数、间隔时间)。

    也就是说,业务活动的主动方,在完成业务处理之后,向业务活动的被动方发送消息,允许消息丢失。

Boss进入2阶段:MQ 投递与重试机制

  1. 首次投递

    MQ 收到消息后,立即推送给订阅 “pay_notify_topic” 的订单系统(接收方)。

  2. 重试触发条件

    • 若接收方未返回确认(如网络超时、服务宕机),MQ 将消息标记为 “未消费成功”;
    • 若接收方返回 “消费失败”(如业务处理异常),MQ 同样触发重试。
  3. MQ 重试策略(以 RocketMQ 为例):

    主动方可以设置时间阶梯型通知规则,在通知失败后按规则重复通知,直到通知N次后不再通知。

    • 重试次数:默认最多 16 次(可配置),超过后放入死信队列%DLQ%+消费者组);
    • 重试间隔:采用阶梯式延迟(1s、5s、10s、30s、1min、2min…),避免短时间内频繁冲击接收方;
    • 消息状态:重试过程中,消息始终保存在 MQ 中,直到确认成功或进入死信队列。

Boss进入3阶段:接收方处理消息

  1. 接收消息并验证幂等

    订单系统收到消息后,首先通过订单 ID(唯一标识)检查是否已处理过该消息(如查本地日志):

    主动方需要提供校对查询的接口,给被动方按需校对查询,用于恢复丢失的业务消息。

    1
    2
    3
    -- 检查是否已处理
    SELECT * FROM order_notify_log WHERE order_id = 'order1001';
    -- 若已处理,直接返回“成功”,避免重复更新
  2. 执行本地业务

    若未处理过,订单系统执行业务逻辑(如更新订单状态为 “已支付”):

    1
    UPDATE orders SET status = 'PAID' WHERE id = 'order1001';
  3. 返回处理结果

    • 处理成功:业务活动的被动方如果正常接收了数据,就正常返回响应,并结束事务。返回 “消费成功” 确认(如 RocketMQ 的CONSUME_SUCCESS),MQ 删除消息或标记为 “已消费”;
    • 处理失败:如果被动方没有正常接收,根据定时策略,向业务活动主动方查询,恢复丢失的业务消息。返回 “重试”(如RECONSUME_LATER),触发 MQ 重试;若为业务不可恢复失败(如订单不存在),记录日志后返回 “成功”(避免无效重试)。

Boss进入4阶段:兜底机制(死信处理与对账)

  1. 死信队列处理

    当消息超过最大重试次数仍未成功,MQ 可将其移入死信队列。发送方可定期扫描死信队列,通过人工介入(如手动触发通知)处理失败消息。

  2. 定时对账

    • 发送方(支付系统)定时扫描 “未通知成功” 的日志(notify_log.status = 'UNSENT'),与接收方(订单系统)的订单状态进行比对;
    • 若发现 “支付成功但订单未更新”,主动重新发送通知或触发人工处理。

本地消息表方案

要实现最大努力通知,可以采用 定期检查本地消息表的机制 。

和上面说的异步消息一样,本地消息表方案的本质是用 “本地数据库事务” 确保业务与消息的原子性,再通过 “定时任务扫描消息表” 实现消息的可靠投递,最终结合 “幂等消费” 保证接收方不重复处理,达成 “最大努力通知” 的最终一致性。

image-20251104114731375

发送消息方:

  • 需要有一个消息表,记录着消息状态相关信息。
  • 业务数据和消息表在同一个数据库,要保证它俩在同一个本地事务。直接利用本地事务,将业务数据和事务消息直接写入数据库。
  • 在本地事务中处理完业务数据和写消息表操作后,通过写消息到 MQ 消息队列。使用专门的投递工作线程进行事务消息投递到MQ,根据投递ACK去删除事务消息表记录
  • 消息会发到消息消费方,如果发送失败,即进行重试。
  • 生产方定时扫描本地消息表,把还没处理完成的消息或者失败的消息再发送一遍。如果有靠谱的自动对账补账逻辑,这种方案还是非常实用的。

详细讲一下上述的流程

环节 1:发送方执行本地事务

  • 操作:发送方在同一个本地数据库事务中,完成业务数据写入消息表记录
    • 业务数据:写入业务表(如 app table 中的订单、支付记录)。
    • 消息表:写入消息表(如 msg table),记录消息内容、状态(如 “待发送”)、重试次数等。
    • 事务保证:若业务或消息表写入失败,整个事务回滚,确保 “业务成功则消息必记录,业务失败则消息不记录”。

环节 2:发送方向 MQ 发送消息(步骤 2)

  • 操作:发送方启动专门的投递线程,扫描消息表中 “状态为待发送(PENDING)” 或 “发送失败(FAILED)且重试次数未达上限” 的消息,调用 MQ 的发送接口将消息投递到指定 Topic。
  • 关键:投递后不立即删除消息表记录,需等待MQ 的发送回执(ACK) 后再更新状态。

环节 3:MQ 返回发送回执(步骤 3)

  • 操作:MQ 收到消息后,向发送方返回发送结果(成功 / 失败)。
    • 若发送成功:发送方更新消息表状态为 “已发送(SENT)”,并删除消息表中对应的记录(或标记为 “已处理”)。
    • 若发送失败:发送方不更新消息表状态(保持 PENDING 或标记为 FAILED),等待定时任务重试

环节 4:订阅方消费并处理消息

  • MQ 投递消息:MQ 将消息推送给订阅该 Topic 的消息订阅方(如订单系统)。
  • 订阅方处理:
    • 调用通知服务:订阅方收到消息后,调用 “通知服务” 执行业务逻辑(如更新订单状态为 “已支付”)。
    • 记录通知:通知服务将处理结果写入 “通知记录”(用于对账和幂等校验)。
    • 幂等性保障:通知服务需通过唯一标识(如 order_id) 确保重复消息不重复处理(如查通知记录判断是否已处理)。

环节 5:兜底机制(定时回查与重试)

  • 定时回查事务状态:发送方启动定时任务(如每 5 分钟一次),扫描消息表中 “超时未处理” 的消息(如状态为 PENDING 超过 30 分钟),主动触发消息重发业务回滚
  • 生产方定时扫描消息表:对未处理完成或发送失败的消息,再次发起投递,直到达到最大重试次数(如重试 5 次后仍失败,标记为 “死信” 并告警)。

最大努力通知事务在于第三方系统的对接,所以最大努力通知事务有几个特性:

  • “允许消息丢失,但尽最大努力投递”
    • 流程中通过 “消息表记录 + 定时重试”,确保消息不会因一次发送失败而丢失,尽最大努力向订阅方投递。
  • “递增多挡位时间间隔重试”
    • 发送方的定时任务可配置重试间隔(如 5min、10min、30min…),避免短时间内频繁重试导致系统压力,同时提高送达概率。
  • “被动方幂等接口”
    • 订阅方的通知服务通过唯一标识(如 order_id)实现幂等,防止重复消息导致业务异常(如重复更新订单状态)。
  • “定期校验机制兜底”
    • 定时回查和对账逻辑,确保极端情况下(如订阅方始终未处理)业务数据的最终一致性(如主动回滚支付或人工介入)。

最大努力通知事务 VS 异步确保型事务

最大努力通知事务,其实是基于异步确保型事务发展而来适用于外部对接的一种业务实现。他们主要有的是业务差别,如下:

  • 从参与者来说:最大努力通知事务适用于跨平台、跨企业的系统间业务交互;异步确保型事务更适用于同网络体系的内部服务交付。
  • 从消息层面说:最大努力通知事务需要主动推送并提供多档次时间的重试机制来保证数据的通知;而异步确保型事务只需要消息消费者主动去消费。
  • 从数据层面说:最大努力通知事务还需额外的定期校验机制对数据进行兜底,保证数据的最终一致性;而异步确保型事务只需保证消息的可靠投递即可,自身无需对数据进行兜底处理。

通知型事务存在的问题

通知型事务,是无法解决本地事务执行和消息发送的一致性问题的。

因为消息发送是一个网络通信的过程,发送消息的过程就有可能出现发送失败、或者超时的情况。超时有可能发送成功了,有可能发送失败了,消息的发送方是无法确定的,所以此时消息发送方无论是提交事务还是回滚事务,都有可能不一致性出现。

而消息中间件在分布式系统中的核心作用就是异步通讯、应用解耦和并发缓冲(也叫作流量削峰)。在分布式环境下,需要通过网络进行通讯,就引入了数据传输的不确定性,也就是CAP理论中的分区容错性。

消息发送一致性是指产生消息的业务动作与消息发送动作一致,也就是说如果业务操作成功,那么由这个业务操作所产生的消息一定要发送出去,否则就丢失。

image-20251104115626070

常规的MQ队列处理流程无法实现消息的一致性。所以,需要借助半消息、本地消息表,保障一致性。

消息重复发送问题和业务接口幂等性设计

image-20251104115717433

对于未确认的消息,采用按规则重新投递的方式进行处理。

对于以上流程,消息重复发送会导致业务处理接口出现重复调用的问题。消息消费过程中消息重复发送的主要原因就是消费者成功接收处理完消息后,消息中间件没有及时更新投递状态导致的。如果允许消息重复发送,那么消费方应该实现业务接口的幂等性设计。