RabbitMQ 的可靠性

RabbitMQ 的可靠性核心目标是 “确保消息不丢失、不重复消费,最终被正确处理”。其中,避免重复消费是可靠性的关键环节 —— 即使因网络波动、系统故障等导致消息被重复投递,业务结果也需保持一致。

避免重复消费就是保证消息就算被重复发送也只被消费一次,这就是我们下面要讲的消息幂等性。

举个例子,我们在购物的时候,已经支付完成,但是消息没有正确的被消费,前端发送请求查询支付状态时,肯定是查询交易服务状态,会发现业务订单未支付,而用户自己知道已经支付成功,这就导致用户体验不一致。

生产者可靠性

生产者重试机制,通过在配置文件中添加相关配置打开重试机制

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
connection-timeout: 1s # 设置MQ的连接超时时间
template:
retry:
enabled: true # 开启超时重试机制
initial-interval: 1000ms # 失败后的初始等待时间
multiplier: 1 # 失败后下次的等待时长倍数,下次等待时长 = initial-interval * multiplier
max-attempts: 3 # 最大重试次数

分布式系统的网络不稳定的话,利用重试机制可以有效提高消息发送的成功率。不过SpringAMQP提供的重试机制是阻塞式的重试,也就是说多次重试等待的过程中,当前线程是被阻塞的。

如果对于业务性能有要求,建议禁用重试机制。如果一定要使用,请合理配置等待时长和重试次数,当然也可以考虑使用异步线程来执行发送消息的代码。

生产者的重试机制可能会导致消息被重复发送,生产者发送消息后,因网络波动未收到 RabbitMQ 的确认(Publisher Confirm),误认为消息未发送成功,触发重试机制,导致同一消息被多次发送到队列。

所以也会有一个生产者确认机制,确保 RabbitMQ 成功接收消息后,生产者才停止重试。

生产者发送消息后,RabbitMQ 在 成功接收并处理消息(如持久化到磁盘、路由到队列)后,会向生产者返回一个 确认信号(acknowledgement)

  • 若生产者收到确认信号:说明消息已被 RabbitMQ 接收,无需重试;
  • 若未收到确认信号(或收到否定确认):说明消息未被正确处理,生产者可根据原因决定是否重试。

但是,生产者确认机制对性能影响较大,无特殊需要不要开启,而且一般情况下,只要生产者与MQ之间的网路连接顺畅,基本不会出现发送消息丢失的情况,因此大多数情况下我们无需考虑这种问题。

RabbitMQ 的 生产者消息确认机制 包含 Publisher Confirm(发布确认)Publisher Return(发布回退) 两个核心子机制

机制 触发阶段 核心作用 反馈结果类型
Publisher Confirm 消息到达 MQ 后的 最终处理 告知生产者 “消息是否被 MQ 成功接收并稳定存储” 成功(ACK) / 失败(NACK)
Publisher Return 消息到达交换机,但 路由失败 告知生产者 “消息已到交换机,但未找到匹配队列” 仅失败回退(含失败原因)

所以会存在如下四种结果

  • 消息成功到达交换机,但因路由规则不匹配(如路由键错误、队列未绑定),无法投递到任何队列,此时触发 Return 回退,同时 Confirm 返回 ACK。

    • 为什么路由失败还返回 ACK? 因为 Confirm 的核心是 “确认 MQ 是否接收消息”,而非 “消息是否最终入队”。消息到达交换机即视为 MQ 已接收,路由失败是后续的业务配置问题,不影响 Confirm 的 ACK 反馈。
    • 如何处理这种情况? 需在 Return 回调中记录日志,人工排查路由配置(如补充队列绑定),不建议重试(路由键错误属于永久故障,重试仍会失败)。
  • 临时消息投递到了MQ,并且入队成功,返回ACK,告知投递成功。也就是消息为非持久化(deliveryMode=1),成功到达交换机并路由到队列,MQ 仅将消息存入内存(不写磁盘),此时 Confirm 返回 ACK,Return 不触发。

    • 临时消息若 MQ 重启,内存中的消息会丢失,但 Confirm 已返回 ACK,生产者会误认为消息安全。
  • 持久消息(deliveryMode=2)投递到 MQ,入队并完成持久化,此时 Confirm 返回 ACK,Return 不触发。

    • 持久消息的 ACK 条件需同时满足 “入队成功 + 持久化完成”,确保消息不会因 MQ 重启丢失;
    • 持久化会增加磁盘 I/O 开销,吞吐量低于临时消息
  • 可能存在其它情况,消息未被 MQ 成功接收或稳定存储,此时 Confirm 返回 NACK(ack=false),Return 不触发(因消息未到达交换机,无路由环节),常见情况包括

    失败场景 具体流程
    交换机不存在 生产者指定不存在的交换机 exchange-not-exist → MQ 接收消息后检查交换机不存在 → Confirm 返回 NACK,cause 为 “NOT_FOUND”
    网络波动导致消息丢失 生产者发送消息后,网络中断 → 消息未到达 MQ → MQ 无反馈,生产者超时后 Confirm 返回 NACK,cause 为 “Connection reset”
    MQ 磁盘满(持久化失败) 持久消息入队后,MQ 尝试持久化但磁盘满 → 无法完成持久化 → Confirm 返回 NACK,cause 为 “disk full”
    • NACK 的核心含义:“MQ 未成功接收消息,或无法稳定存储消息”,生产者需根据 cause 判断是否重试;
    • 一般我们这样配置重试策略,仅对 临时故障(如网络波动、MQ 临时过载)重试,且限制重试次数(如 3 次);对 永久故障(如交换机不存在、磁盘满)直接记录日志,人工介入。

默认两种机制都是关闭状态,需要通过配置文件来开启。

1
2
3
4
spring:
rabbitmq:
publisher-confirm-type: correlated # 开启确认机制(CORRELATED 异步回调模式)
publisher-returns: true # 开启消息回退(路由失败时回调)

Spring AMQP 中通过 spring.rabbitmq.publisher-confirm-type 配置确认模式,支持三种类型:

模式 说明
NONE 关闭确认机制(默认值)
CORRELATED 异步回调确认:消息发送后,RabbitMQ 处理完成后触发回调方法
SIMPLE 同步确认:发送消息后阻塞等待确认,或批量确认(性能较差)

CORRELATED 模式是首选,通过回调函数异步处理确认结果,不阻塞主线程,性能要好一些。

配置 RabbitMQ 配置,初始化交换机,队列和回调策略

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
import org.springframework.amqp.core.*;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {

// 声明交换机(持久化)
@Bean
public DirectExchange businessExchange() {
return ExchangeBuilder.directExchange("business.exchange")
.durable(true) // 持久化交换机
.build();
}

// 声明队列(持久化)
@Bean
public Queue businessQueue() {
return QueueBuilder.durable("business.queue")
.build();
}

// 绑定交换机与队列(路由键:business.key)
@Bean
public Binding businessBinding(DirectExchange businessExchange, Queue businessQueue) {
return BindingBuilder.bind(businessQueue)
.to(businessExchange)
.with("business.key");
}

// 配置 RabbitTemplate,设置 Confirm 和 Return 回调
@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

// 1. 设置 Confirm 回调(消息最终处理结果)
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (correlationData == null) {
return; // 无关联数据,忽略
}
String messageId = correlationData.getId(); // 获取消息唯一ID

if (ack) {
// ACK:消息已被 MQ 接收并处理(入队/持久化完成)
System.out.println("[Confirm] 消息处理成功:" + messageId);
} else {
// NACK:消息未被 MQ 接收或处理失败
System.err.println("[Confirm] 消息处理失败:" + messageId + ",原因:" + cause);
// 记录失败消息,后续人工处理
recordFailedMessage(messageId, cause);
}
});

// 2. 设置 Return 回调(路由失败回退)
rabbitTemplate.setReturnsCallback(returnedMessage -> {
String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
System.err.println("[Return] 消息路由失败:" +
"消息ID=" + messageId +
",交换机=" + returnedMessage.getExchange() +
",路由键=" + returnedMessage.getRoutingKey() +
",原因=" + returnedMessage.getReplyText());
// 路由失败属于配置问题,记录日志后人工排查
recordRoutingError(messageId, returnedMessage);
});

// 3. 强制消息路由失败时回退(不丢弃消息)
rabbitTemplate.setMandatory(true);

return rabbitTemplate;
}

// 记录发送失败的消息(实际项目中可存储到数据库)
private void recordFailedMessage(String messageId, String cause) {
// 示例:仅打印日志,实际应持久化到数据库
System.err.println("【失败记录】消息ID=" + messageId + ",原因=" + cause);
}

// 记录路由失败的消息
private void recordRoutingError(String messageId, ReturnedMessage returnedMessage) {
// 示例:仅打印日志,实际应持久化到数据库
System.err.println("【路由错误记录】消息ID=" + messageId + ",详情=" + returnedMessage);
}
}

MQ的可靠性

MQ的可靠性我们一般从数据的持久化入手,在控制台的Exchanges页面,添加交换机时可以配置交换机的Durability参数,设置为Durable就是持久化模式,Transient就是临时模式。

image-20250919165538605

在控制台的Queues页面,添加队列时,同样可以配置队列的Durability参数

image-20250919165546810

消费者可靠性

消费者确认机制

为了确认消费者是否成功处理消息,RabbitMQ提供了消费者确认机制,当消费者处理消息结束后,应该向RabbitMQ发送一个回执,告知RabbitMQ自己消息处理状态。回执有三种可选值:

  • ack:成功处理消息,RabbitMQ从队列中删除该消息
  • nack:消息处理失败,RabbitMQ需要再次投递消息
  • reject:消息处理失败并拒绝该消息,RabbitMQ从队列中删除该消息

一般reject方式用的较少,除非是消息格式有问题。因此大多数情况下我们需要将消息处理的代码通过try catch机制捕获,消息处理成功时返回ack,处理失败时返回nack

由于消息回执的处理代码比较统一,因此SpringAMQP帮我们实现了消息确认。并允许我们通过配置文件设置ACK处理方式,有三种模式:

模式 核心逻辑 适用场景
AUTO(默认) Spring 自动根据业务代码是否抛出异常决定 ACK 类型: 无异常:自动发送 ack(删除消息) 如果是业务异常:自动发送 nack(重新入队,或进入死信队列) 如果是消息处理或校验异常,自动返回reject; 大多数场景,无需手动处理 ACK,简化代码
MANUAL(手动) 完全由开发者控制 ACK 发送:需通过 Channel 对象手动调用 basicAck/basicNack,否则消息会一直处于 Unacked 状态 复杂业务场景(如需批量确认、根据业务结果动态决定是否重入队)
NONE(无确认) 消费者不发送任何 ACK 回执,RabbitMQ 认为消息 “被投递即成功处理”,直接删除消息 对消息可靠性要求极低的场景(如日志收集),不推荐用于核心业务

AUTO 模式是 Spring AMQP 的默认配置,核心是 “业务无异常则 ACK,有异常则 NACK”,无需手动操作 Channel,适合大多数简单业务。

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: AUTO # 开启 AUTO 确认模式
prefetch: 1 # 每次从队列拉取1条消息,处理完再拉取下一条(避免消息堆积在消费者内存)
retry:
enabled: true # 配合 AUTO 模式,失败时触发重试(可选)
max-attempts: 3 # 最大重试次数(含首次处理)
initial-interval: 2000 # 重试间隔2秒

而业务代码无需关注 ACK 逻辑,只需专注业务处理,异常会自动触发 NACK:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class AutoAckConsumer {

// 监听指定队列,AUTO 模式下无需手动处理 ACK
@RabbitListener(queues = "business.queue")
public void processMessage(String message) {
try {
// 1. 执行业务逻辑(如订单状态更新、数据入库)
System.out.println("收到消息:" + message);
doBusiness(message);

// 2. 无异常 → Spring 自动发送 ACK,RabbitMQ 删除消息
} catch (RuntimeException e) {
// 3. 有业务异常 → Spring 自动发送 NACK,触发重试(重试耗尽后进入死信队列)
System.err.println("消息处理失败,触发重试:" + e.getMessage());
throw e; // 必须抛出异常,否则 Spring 会误判处理成功,发送 ACK
} catch (){
// 3. 有消息转换异常 → Spring 自动发送 reject,不触发重试
System.err.println("消息处理失败,触发重试:" + e.getMessage());
throw new MessageConversionException("故意的");
}
}

// 模拟业务逻辑
private void doBusiness(String message) {
// 示例:若消息包含“error”,模拟业务失败
if (message.contains("error")) {
throw new RuntimeException("业务处理异常:包含非法内容");
}
System.out.println("业务处理成功:" + message);
}
}
  • 重试与 NACK 的关系AUTO 模式下,业务抛出异常后,Spring 会先触发重试机制(若开启),重试耗尽仍失败则发送 nack 并拒绝重入队,这就代表需提前配置死信队列,否则消息会被丢弃;
  • 异常捕获注意:若手动捕获异常且不重新抛出,Spring 会认为处理成功并发送 ack,导致失败消息被误删,因此 必须抛出异常 才能触发 NACK。

MANUAL 模式下,开发者需通过 Channel 对象手动调用 ACK 方法,适合需要 “精细控制消息状态” 的场景(如批量确认、根据业务结果动态决定是否重入队)。

1
2
3
4
5
6
spring:
rabbitmq:
listener:
simple:
acknowledge-mode: MANUAL # 开启手动确认模式
prefetch: 5 # 批量拉取5条消息,处理完批量确认(提高吞吐量)

消息消费者的业务代码需通过 @Header(AmqpHeaders.DELIVERY_TAG) 获取消息的 deliveryTag(RabbitMQ 分配的消息唯一标识),再调用 Channel 方法发送 ACK/NACK:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
@Component
public class ManualAckConsumer {

// 监听队列,手动控制 ACK
@RabbitListener(queues = "business.queue")
public void processMessage(
Message message, // 消息对象(含消息体、消息头)
Channel channel, // RabbitMQ 原生 Channel 对象
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag // 消息唯一标识
) throws IOException {
String msgContent = new String(message.getBody());
try {
// 1. 执行业务逻辑(如调用第三方接口、事务操作)
boolean isSuccess = doBusinessWithResult(msgContent);

if (isSuccess) {
// 2. 业务成功:手动发送 ACK(删除消息)
// 第二个参数:true=批量确认(确认当前 deliveryTag 及之前的所有消息),false=仅确认当前消息
channel.basicAck(deliveryTag, false);
System.out.println("手动 ACK 成功:" + msgContent);
} else {
// 3. 业务失败(可重试):发送 NACK 并重新入队
// 第三个参数:true=重新入队,false=拒绝入队(进入死信队列)
channel.basicNack(deliveryTag, false, true);
System.out.println("手动 NACK 并重新入队:" + msgContent);
}
} catch (Exception e) {
// 4. 严重异常(如消息格式错误):发送 NACK 并拒绝入队(避免无限重试)
channel.basicNack(deliveryTag, false, false);
System.err.println("手动 NACK 并拒绝入队:" + msgContent + ",原因:" + e.getMessage());
}
}

// 模拟带返回结果的业务逻辑
private boolean doBusinessWithResult(String message) {
// 示例:消息包含“retry”则返回失败(需重试),否则返回成功
return !message.contains("retry");
}
}
  • basicAck(deliveryTag, multiple)
    • deliveryTag:当前消息的唯一标识(从消息头获取);
    • multipletrue 表示 “确认当前及之前所有未确认的消息”(批量确认),false 表示 “仅确认当前消息”。
  • basicNack(deliveryTag, multiple, requeue)
    • requeuetrue 表示 “消息重新入队”(可重试),false 表示 “消息拒绝入队”(需配置死信队列,否则消息被删除)。
  • basicReject(deliveryTag, requeue)
    • 功能与 basicNack 类似,但不支持批量操作(multiple 参数),仅能拒绝单条消息,实际使用中 basicNack 更灵活。

注意,无论是 AUTO 还是 MANUAL 模式,当消息处理失败需要重试时,需注意 “避免无限重试”—— 重试耗尽后应将消息转入死信队列(DLQ),而非反复重入队占用资源,关于这部分在下面消费者重连机制进行说明

消费者的重试机制

在未开启本地重试时,若消费者处理消息失败,如抛出异常,会触发 RabbitMQ 的 nack 机制,消息会被重新入队(requeue),然后再次投递到消费者。如果消费者始终处理失败,消息会陷入 “失败 → 重入队 → 再投递 → 再失败” 的无限循环,这会导致很严重的性能问题

针对消费者的重试机制通过限制重试的次数和本地处理,避免了这种无限循环,同时给消息处理失败一个补救机会

Spring AMQP 通过 spring.rabbitmq.listener.simple.retry 配置本地重试

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启本地重试(核心开关)
initial-interval: 1000ms # 首次重试的等待时间(1秒)
multiplier: 1 # 重试间隔的倍数(下次等待 = 上次间隔 × multiplier)
max-attempts: 3 # 最大重试次数(含首次处理,共3次)
stateless: true # 无状态模式(若业务含事务,需设为false)

当开启本地重试后,消息处理的完整流程如下(以 max-attempts=3 为例):

  1. 首次处理:消费者接收到消息,执行业务逻辑;
    • 若成功(无异常):正常返回 ack,消息被 MQ 删除;
    • 若失败(抛异常):不回退消息到 MQ,进入本地重试流程。
  2. 第一次重试:等待 initial-interval(1 秒)后,重新执行业务逻辑;
    • 若成功:返回 ack,消息删除;
    • 若失败:继续等待(间隔 = initial-interval × multiplier),准备第二次重试。
  3. 第二次重试:等待指定间隔后再次执行;
    • 若成功:返回 ack
    • 若失败:重试次数耗尽(已达 max-attempts=3),进入最终处理。
  4. 重试耗尽后:Spring AMQP 会自动抛出 AmqpRejectAndDontRequeueException 异常

重试耗尽了消息一定就要被删除吗?我看未必,实际上也是配置死信队列,重试耗尽后,可以让 Spring 返回 nack 而非 reject,并指定 requeue=false,消息会自动路由到死信队列。

1
2
3
4
5
6
7
8
9
10
@RabbitListener(queues = "business.queue")
public void processMessage(String message) {
try {
// 业务逻辑
doBusiness(message);
} catch (Exception e) {
// 重试耗尽后,手动抛出异常,指定不重入队(进入死信队列)
throw new AmqpRejectAndDontRequeueException("重试耗尽,路由到死信队列", e);
}
}

默认情况下,所有未捕获的异常都会触发重试,但可通过 RetryTemplate 自定义 “可重试异常” 和 “不可重试异常”:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
@Bean
public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
factory.setConnectionFactory(connectionFactory);

// 配置重试模板
RetryTemplate retryTemplate = new RetryTemplate();
// 设置可重试异常(仅网络异常重试)
retryTemplate.setRetryPolicy(new SimpleRetryPolicy(3,
Collections.singletonMap(NetworkException.class, true),
false)); // 其他异常不重试
factory.setRetryTemplate(retryTemplate);
return factory;
}

消费者重连机制

当消费者与 RabbitMQ 服务器的连接因网络波动、MQ 服务重启、连接超时等原因中断时,消费者能自动尝试重新建立连接并恢复消息消费,避免因连接中断导致的消费停滞。

Spring AMQP(基于 Spring 生态的 RabbitMQ 客户端封装)已经内置了成熟的消费者重连机制,无需开发者手动编写复杂的重连逻辑,只需理解其核心原理、默认配置及自定义优化方式即可。当消费者与 MQ 的连接中断,通常由以下「非预期故障」触发,此时重连机制会自动生效。

Spring AMQP 的重连能力依赖于其底层的 RabbitConnectionFactory(连接工厂)和 SimpleMessageListenerContainer(消息监听容器,消费者核心组件),核心逻辑如下:

  1. 连接状态监听SimpleMessageListenerContainer 会持续监控与 MQ 的连接 / 通道状态,一旦检测到「连接断开」「通道关闭」等异常(如 IOExceptionConnectionShutdownException),立即触发重连流程;
  2. 重连任务调度:连接中断后,容器会通过 定时任务(ScheduledExecutorService) 周期性尝试重新创建连接;
  3. 连接重建与消费恢复
    • 若重连成功(成功创建 ConnectionChannel),容器会自动重新声明队列、交换机、绑定关系(若配置了 declarables),并恢复消息监听(从队列头部或上次消费位置继续拉取消息);
    • 若重连失败,容器会按照「重试间隔」继续重试,直到连接恢复或手动停止。

Spring AMQP 的重连机制默认是开启状态,且提供了合理的默认参数,大多数场景下无需修改即可满足需求。核心默认配置如下:

配置项 默认值 作用说明
重连开启状态 自动开启 无需手动配置,连接中断后自动触发重连
初始重连间隔 1000ms(1 秒) 第一次重连失败后,等待 1 秒再尝试第二次重连
最大重连间隔 30000ms(30 秒) 重连间隔会逐渐递增(避免频繁重试占用资源),最终稳定在 30 秒一次
重连重试次数 无限次 除非手动停止消费者或 MQ 永久不可用,否则会一直重试(保障最大程度的恢复能力)
通道恢复机制 自动恢复 若仅通道(Channel)断开(连接未断),会自动重建通道,无需重新建立连接

配置文件配置如下:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
spring:
rabbitmq:
host: 127.0.0.1 # MQ 服务器地址
port: 5672 # MQ 端口
username: guest # 用户名
password: guest # 密码
virtual-host: / # 虚拟主机
# 消费者监听容器配置(以 simple 容器为例,direct 容器配置类似)
listener:
simple:
# 1. 连接恢复相关配置(重连核心参数)
retry:
enabled: true # 开启消费者重试
recovery-interval: 2000ms # 重连间隔

# 2. 连接超时与心跳配置(减少连接异常概率)
container-type: simple # 监听容器类型(simple/direct)
connection-timeout: 5000ms # 连接超时时间(5 秒,避免长时间阻塞)
# 3. 连接工厂级别的重连配置
connection-factory:
cache:
channel:
size: 5 # 缓存的通道数,减少重连时重建通道的开销
connection:
size: 2 # 缓存的连接数,重连时优先使用缓存连接
requested-heartbeat: 30s # 心跳间隔(30 秒):避免连接被防火墙/MQ 主动断开

失败处理策略

在消息发送本地测试达到最大重试次数后,消息会被丢弃。这在某些对于消息可靠性要求较高的业务场景下,显然不太合适了。

因此 Spring 允许我们自定义重试次数耗尽后的消息处理策略,这个策略是由MessageRecovery接口来定义的,它有3个不同实现:

  • RejectAndDontRequeueRecoverer:重试耗尽后,直接reject,丢弃消息。默认就是这种方式

    • 当重试次数达到 max-attempts 后,该实现会直接返回 reject 指令,并设置「不重新入队」(requeue=false),RabbitMQ 会将消息从队列中删除(不会转发到其他地方)。

    • 如果需要显式指定,可通过以下方式配置:

      1
      2
      3
      4
      5
      6
      7
      8
      @Configuration
      public class RabbitConfig {
      @Bean
      public MessageRecovery messageRecovery() {
      // 重试耗尽后,拒绝并丢弃消息(默认行为)
      return new RejectAndDontRequeueRecoverer();
      }
      }
  • ImmediateRequeueMessageRecoverer:重试耗尽后,返回nack,消息重新入队

    • 重试次数用完后,该实现会返回 requeue=true 的指令,RabbitMQ 会将消息重新加入原队列尾部,等待下一次被消费者拉取处理。相当于无限重试
  • RepublishMessageRecoverer:重试耗尽后,将失败消息投递到指定的交换机

    • 这是生产环境最常用的策略:重试耗尽后,将消息转发到死信队列(DLQ),而非直接丢弃。

    • 它的原理比较复杂,消息重试耗尽后,DeadLetterPublishingRecoverer 会读取原队列的死信交换机(DLX),然后将失败消息封装为「死信消息」,并添加额外的死信属性(x-death),再通过 DLX 将死信消息路由到绑定的 DLQ 中。

    • 这种情况下需要创建并且配置死信交换机、死信队列及各种绑定,然后配置DeadLetterPublishingRecoverer

    • 示例的代码如下

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      12
      13
      14
      15
      16
      17
      18
      19
      20
      21
      22
      23
      24
      25
      26
      27
      28
      29
      30
      31
      32
      33
      34
      35
      36
      37
      38
      39
      40
      41
      42
      43
      44
      45
      46
      @Configuration
      public class RabbitRecoveryConfig {
      @Autowired
      private AmqpTemplate amqpTemplate; // Spring 提供的 RabbitMQ 模板,用于发送消息

      @Bean
      public MessageRecovery messageRecovery() {
      // 重试耗尽后,将消息转发到死信队列
      return new DeadLetterPublishingRecoverer(amqpTemplate,
      // 可选:自定义死信交换机和路由键(若原队列未配置,可在此指定)
      (message, cause) -> {
      // 此处逻辑:若原队列未配置死信交换机,可手动指定
      // 示例:返回死信交换机和路由键的封装对象
      return new DeadLetterPublishingRecoverer.Destination(
      "dlx.exchange", "dlx.routing.key");
      });
      }

      // 补充:配置消费者重试基础参数(开启重试、设置最大次数等)
      @Bean
      public SimpleRabbitListenerContainerFactory rabbitListenerContainerFactory(ConnectionFactory connectionFactory) {
      SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
      factory.setConnectionFactory(connectionFactory);
      // 开启本地重试
      factory.setRetryTemplate(retryTemplate());
      // 重试耗尽后,调用 MessageRecovery 处理
      factory.setMessageRecovery(messageRecovery());
      return factory;
      }

      // 定义重试模板(设置重试间隔、最大次数等)
      private RetryTemplate retryTemplate() {
      RetryTemplate retryTemplate = new RetryTemplate();
      // 重试策略:固定间隔重试
      FixedBackOffPolicy backOffPolicy = new FixedBackOffPolicy();
      backOffPolicy.setBackOffPeriod(1000); // 重试间隔 1 秒(对应 initial-interval)
      retryTemplate.setBackOffPolicy(backOffPolicy);

      // 终止策略:最大重试 3 次(对应 max-attempts)
      SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
      retryPolicy.setMaxAttempts(3);
      retryTemplate.setRetryPolicy(retryPolicy);

      return retryTemplate;
      }
      }

消息幂等性

什么是消息幂等性

在分布式系统中,消息的可靠传输是一项重要的任务,而 RabbitMQ 作为常用的消息中间件,消息幂等性是保证消息可靠处理的关键特性之一。

消息幂等性指的是,无论一条消息被重复处理多少次,最终的业务结果都应该是一致的,就好像这条消息只被处理了一次一样。对消息来说,就是“不管这条消息被消费多少次,业务结果都不会重复”。

比如,在电商系统中,支付消息如果不保证幂等性,当消息重复消费时,可能导致用户被重复扣款;而保证幂等性后,即使消息多次被消费,用户也只会被扣款一次。

在使用 RabbitMQ 过程中,消息重复的情况可能在以下场景中出现:

  • 生产者消息确认机制问题:生产者发送消息后,由于网络波动等原因,没有及时收到 RabbitMQ 发送的确认应答,生产者会认为消息发送失败而进行重发,但实际上 RabbitMQ 可能已经接收到了消息,这就导致消息重复。
  • 消费者消息确认机制问题:消费者从 RabbitMQ 获取消息并处理完成后,在发送确认消息给 RabbitMQ 的过程中,网络出现故障,RabbitMQ 没有收到确认,就会认为消息没有被成功消费,当消费者重新连接时,RabbitMQ 会再次推送该消息,造成消息重复消费。
  • RabbitMQ 集群故障:在 RabbitMQ 集群环境下,当节点发生故障转移时,可能会导致部分消息的状态记录不准确,进而引发消息的重复发送或消费。

所以,消息幂等是分布式系统的“底线要求”,尤其是涉及写操作的场景。

RabbitMQ的ACK机制和持久化(durable)能保证消息“不丢失”,但无法直接解决重复消费。比如:

  • 消费者处理消息时抛异常,RabbitMQ会把消息重新放回队列,下次消费者可能再次拉取到它;
  • 生产者发送消息时因网络问题重试,导致Broker收到多条相同消息(虽然概率低,但必须防)。

四种幂等性解决方案

保证消息不被重复消费一定会有一定的决策。

利用重试机制

MQ 消费者的幂等性问题,主要在于 MQ 的重试机制,因为网络原因或客户端延迟消费导致重复消费。

因为可能会出现如下的情况,消费者在消费 MQ 中的消息时,MQ 已把消息发送给消费者,消费者在给 MQ 返回 ack 时网络中断,故 MQ 未收到确认信息,该条消息会重新发给其他的消费者,或者在网络重连后再次发送给该消费者,但实际上该消费者已成功消费了该条消息,造成消费者消费了重复的消息

在 RabbitMQ 消息处理中,重试机制用于应对临时故障(如网络波动、依赖服务短暂不可用),确保消息最终能被成功处理;

所以什么时候需要重试呢?

  • 消费者获取到消息后,调用第三方接口,但接口暂时无法访问,是否需要重试? 需要重试
  • 消费者获取到消息后,抛出数据转换异常,是否需要重试? 不需要,因为重试也没用

Spring AMQP 提供了成熟的重试机制,可通过 RetryTemplate 配置消费者重试(处理消费阶段的异常)和生产者重试(处理发送阶段的异常),一般重试都是消费者重试

首先,通过 application.yml 或 Java 配置类开启并定制重试策略:

1
2
3
4
5
6
7
8
9
10
11
spring:
rabbitmq:
listener:
simple:
retry:
enabled: true # 开启重试
max-attempts: 3 # 最大重试次数(包括首次消费,共3次)
initial-interval: 1000 # 首次重试间隔(毫秒)
multiplier: 2 # 重试间隔乘数(下次间隔 = 上次间隔 * 乘数)
max-interval: 5000 # 最大重试间隔(毫秒)
acknowledge-mode: AUTO # 确认模式(AUTO/MANUAL,结合重试使用)
  • 参数说明
    • max-attempts: 3:消息最多被处理 3 次(1 次正常消费 + 2 次重试)。
    • initial-interval: 1000:首次失败后,1 秒后重试。
    • multiplier: 2:第二次重试间隔为 1000*2=2000 毫秒,第三次为 4000 毫秒(不超过 max-interval)。

当重试达到最大次数后仍失败,需指定 “最终处理策略”(默认将消息丢弃,不推荐)。通常有两种方案:

  • 方案 1:将消息转发到死信队列(DLQ),后续人工干预。
  • 方案 2:记录日志并手动确认消息(避免消息无限重试)。

配置死信队列示例(重试耗尽后进入死信队列):

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
@Configuration
public class RetryDeadLetterConfig {
// 1. 声明死信交换机和死信队列
@Bean
public DirectExchange dlqExchange() {
return ExchangeBuilder.directExchange("retry.dlq.exchange").durable(true).build();
}

@Bean
public Queue dlqQueue() {
return QueueBuilder.durable("retry.dlq.queue").build();
}

@Bean
public Binding dlqBinding() {
return BindingBuilder.bind(dlqQueue()).to(dlqExchange()).with("retry.dlq.key");
}

// 2. 声明业务队列,并关联死信配置(重试耗尽后进入死信队列)
@Bean
public Queue businessQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-dead-letter-exchange", "retry.dlq.exchange"); // 死信交换机
args.put("x-dead-letter-routing-key", "retry.dlq.key"); // 死信路由键
return QueueBuilder.durable("retry.business.queue")
.withArguments(args)
.build();
}
}

那么上述都是针对重试的操作,那么到底,如何保证消息幂等性,不被重复消费?

这个问题也算是 MQ 面试当中经常考察的一点,因为无论是什么 MQ 都会有这个问题。

这个就需要结合我们接下来提到的策略了。

利用全局唯一 ID

原理:为每条消息生成一个全局唯一的 ID,例如使用 UUID 或者业务系统中具有唯一性的业务单号(如订单号)。在消费者处理消息之前,先根据这个唯一 ID 去查询本地的处理记录(可以存储在数据库、缓存如 Redis 中),如果已经存在处理记录,说明该消息已经被处理过,直接跳过;如果不存在,则进行业务处理,并将处理记录保存到本地。

那么实现思路也很清晰了

  1. 生产者给每条消息打一个 “全局唯一 ID”(类似身份证),随消息一起发送;
  2. 消费者收到消息后,先查 “这个 ID 是否已处理过”(用 Redis 记录处理状态);
  3. 若已处理:直接跳过业务逻辑,避免重复执行;
  4. 若未处理:执行业务逻辑 → 成功后标记 “ID 已处理” → 失败则抛异常触发重试。

示例

生产者,发送了带唯一ID的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class IdempotentProducer {
@Autowired
private RabbitTemplate rabbitTemplate;

public void sendMessage(String content) {
// 生成全局唯一消息ID
String messageId = UUID.randomUUID().toString();

// 设置消息ID到消息头
// MessagePostProcessor:Spring AMQP提供的“消息后置处理器”,用于修改消息属性
MessagePostProcessor postProcessor = message -> {
// 获取消息的属性对象(MessageProperties),类似HTTP请求头
message.getMessageProperties().setMessageId(messageId);
return message;
};

// 发送消息
rabbitTemplate.convertAndSend(
"retry.business.exchange",
"retry.business.key",
content,
postProcessor
);
System.out.println("发送消息,ID:" + messageId + ",内容:" + content);
}
}

消费者:结合重试与幂等性处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.data.redis.core.RedisTemplate;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

@Component
public class IdempotentConsumer {
@Autowired
private RedisTemplate<String, String> redisTemplate;

// 监听业务队列(开启重试)
@RabbitListener(queues = "retry.business.queue")
public void processMessage(Message message) {
// 从消息头中提取唯一ID:对应生产者设置的messageId
String messageId = message.getMessageProperties().getMessageId();
// 从消息体中提取业务内容:Message的getBody()返回字节数组,需转为字符串
String content = new String(message.getBody());
System.out.println("收到消息,ID:" + messageId + ",内容:" + content);

// 1. 幂等校验第一步:检查该消息是否已处理(Redis中是否有记录)
String redisKey = "processed:" + messageId;
// 判断Redis中是否存在该键,存在则说明消息已处理
Boolean isProcessed = redisTemplate.hasKey(redisKey);

if (Boolean.TRUE.equals(isProcessed)) {
// 2. 找到了键,说明消息已处理:直接返回(不执行业务逻辑)
System.out.println("消息已处理,ID:" + messageId);
return;
}

try {
// 3. 没找到,说明未处理,这才执行业务逻辑(可能抛出临时异常触发重试)
doBusiness(content);

// 4. 业务成功:标记为已处理(设置过期时间,避免Redis内存溢出)
redisTemplate.opsForValue().set(redisKey, "1", 24, TimeUnit.HOURS);
System.out.println("消息处理成功,ID:" + messageId);
} catch (Exception e) {
// 5. 业务失败:抛出异常触发重试(重试耗尽后进入死信队列)
System.err.println("消息处理失败,触发重试,ID:" + messageId + ",原因:" + e.getMessage());
throw e; // 必须抛出异常,否则Spring会认为处理成功,不再重试
}
}

// 模拟业务逻辑(可能抛出临时异常)
private void doBusiness(String content) throws Exception {
// 示例:随机抛出异常模拟临时故障(如数据库连接超时)
if (Math.random() < 0.5) {
throw new Exception("临时故障:依赖服务不可用");
}
// 实际业务逻辑:如下单、扣款等
System.out.println("执行业务逻辑:" + content);
}
}

为什么使用 Redis 呢?因为 Redis 是分布式缓存,支持高并发读写,且hasKey/set操作是单线程执行的(Redis 单线程模型),可避免 “并发重复处理”(如两个消费者同时处理同一条消息)。

数据库唯一约束

利用数据库的唯一索引,让重复消息的写操作直接报错,从而避免重复数据。

这种方案适合基于业务唯一标识(如订单号、支付流水号)的场景,无需额外维护版本号或状态标记。

  1. 唯一约束设计:在业务表中对 “业务唯一标识”(如订单号 order_no)创建唯一索引,确保同一标识的记录只能插入一次。
  2. 消息处理逻辑
    • 消费者收到消息后,尝试向数据库插入或更新含该唯一标识的记录;
    • 若成功:说明是首次处理,正常执行业务;
    • 若触发唯一约束异常(如 DuplicateKeyException):说明是重复消息,直接忽略。

那么数据库表的设计如下

1
2
3
4
5
6
7
8
9
10
11
-- 支付记录表:通过唯一索引确保同一订单只支付一次
CREATE TABLE `t_payment` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT 'ID',
`order_no` varchar(64) NOT NULL COMMENT '订单编号(唯一)',
`pay_amount` decimal(10,2) NOT NULL COMMENT '支付金额',
`pay_time` datetime NOT NULL COMMENT '支付时间',
`pay_status` tinyint NOT NULL COMMENT '支付状态(1:成功)',
PRIMARY KEY (`id`),
-- 核心:对order_no创建唯一索引,确保重复订单无法插入
UNIQUE KEY `uk_order_no` (`order_no`) COMMENT '订单编号唯一约束'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='支付记录表';

实体类忽略,生产者就是正常发送消息,也忽略了,消费者处理有一些不同,主要是尝试插入记录,通过捕获唯一约束异常判断是否为重复消息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
@Component
public class PaymentConsumer {

@Autowired
private PaymentMapper paymentMapper;

/**
* 处理支付成功消息,利用数据库唯一约束保证幂等性
*/
@RabbitListener(queues = "payment.success.queue")
public void handlePaymentSuccess(Message message) {
// 1. 从消息体中获取订单编号
String orderNo = new String(message.getBody());
String messageId = message.getMessageProperties().getMessageId();
System.out.println("收到支付消息:消息ID=" + messageId + ",订单编号=" + orderNo);

// 2. 先查询是否已处理(可选优化:减少数据库异常抛出)
Payment existing = paymentMapper.selectByOrderNo(orderNo);
if (existing != null) {
System.out.println("订单已支付(查询命中):" + orderNo);
return;
}

// 3. 构造支付记录(模拟支付金额和时间)
Payment payment = new Payment();
payment.setOrderNo(orderNo);
payment.setPayAmount(new BigDecimal("99.99")); // 模拟金额
payment.setPayTime(LocalDateTime.now());
payment.setPayStatus(1); // 支付成功

try {
// 4. 尝试插入记录(唯一索引会拦截重复数据)
int rows = paymentMapper.insertPayment(payment);
if (rows > 0) {
System.out.println("支付记录插入成功:" + orderNo);
// 执行后续业务(如更新订单状态、通知用户等)
doAfterPaymentSuccess(orderNo);
}
} catch (DuplicateKeyException e) {
// 5. 捕获唯一约束异常:说明是重复消息,直接忽略
System.out.println("重复支付消息(唯一约束触发):" + orderNo);
} catch (Exception e) {
// 处理其他异常(如数据库连接失败,可触发重试)
System.err.println("支付处理异常:" + e.getMessage());
throw e; // 抛出异常触发Spring AMQP重试
}
}

// 后续业务逻辑(如更新订单状态)
private void doAfterPaymentSuccess(String orderNo) {
System.out.println("执行后续业务:订单" + orderNo + "已支付,更新订单状态");
// 实际场景中可能调用订单服务的API
}
}

这种情况一般连重试都不用配置

状态机模式

状态机模式就是通过业务状态的流转控制和判断,确保同一消息在不同状态下只生效一次。

要想使用这种,你的业务就要有明确的业务状态流转规则,确保同一消息在不同状态下只能被处理一次,当重复消息到达时,若当前状态不符合预设的流转规则(如 “已支付” 状态无法再次执行 “支付” 操作),则直接拒绝处理,从而实现幂等。

在这种情况下,一般会有明确的流转规则,规定状态之间的合法转换路径(如 “待支付→已支付” 是合法的,“已支付→已支付” 是非法的)。

消息处理逻辑

  • 消费者接收消息后,先查询业务实体的当前状态;
  • 根据状态机规则判断当前状态是否允许执行消息对应的操作;
  • 若允许:执行操作并更新状态;
  • 若不允许:直接忽略消息(重复处理无意义)。

代码没什么特别的,定义状态和编写状态机就忽略了,消费者编写需要注意基于状态机处理消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
/**
* 消费者:利用状态机判断消息是否可处理,保证幂等性
*/
@Component
public class OrderConsumer {

@Autowired
private OrderMapper orderMapper;
@Autowired
private OrderStateMachine stateMachine;

/**
* 处理支付成功消息
*/
@RabbitListener(queues = "order.pay.queue")
public void handlePaymentMessage(Message message) {
// 1. 解析消息:获取订单编号和目标状态
String orderNo = new String(message.getBody());
String messageId = message.getMessageProperties().getMessageId();
int targetStatusCode = (int) message.getMessageProperties().getHeader("targetStatus");
OrderStatus targetStatus = OrderStatus.getByCode(targetStatusCode);
System.out.println("收到支付消息:消息ID=" + messageId + ",订单编号=" + orderNo + ",目标状态=" + targetStatus);

// 2. 查询订单当前状态
Order order = orderMapper.selectByOrderNo(orderNo);
if (order == null) {
System.err.println("订单不存在:" + orderNo);
return;
}
OrderStatus currentStatus = order.getCurrentStatus();

// 3. 状态机校验:当前状态是否允许转换到目标状态
if (!stateMachine.isTransitionValid(currentStatus, targetStatus)) {
System.out.println("状态转换非法(幂等拦截):当前状态=" + currentStatus + ",目标状态=" + targetStatus + ",订单编号=" + orderNo);
return; // 非法转换,直接忽略
}

// 4. 合法转换:执行业务逻辑并更新状态
try {
// 模拟支付业务逻辑(如扣减库存、记录支付日志)
processPayment(order);

// 更新订单状态为“已支付”
order.setStatus(targetStatus);
orderMapper.updateById(order);
System.out.println("订单状态更新成功:" + orderNo + ",从" + currentStatus + "到" + targetStatus);
} catch (Exception e) {
System.err.println("支付处理异常:" + e.getMessage());
throw e; // 抛出异常触发重试(仅针对临时故障)
}
}

// 模拟支付业务逻辑
private void processPayment(Order order) {
System.out.println("执行支付逻辑:订单" + order.getOrderNo() + ",金额" + order.getAmount());
// 实际场景:调用支付接口、扣减库存等
}
}

数据库乐观锁

原理:如果业务操作涉及到数据库,可以利用数据库的乐观锁机制。例如,在更新数据库记录时,为表添加一个版本号字段(version)。每次更新操作时,将版本号作为条件,只有当版本号与数据库中的版本号一致时,才进行更新操作,并且更新完成后将版本号加 1。如果版本号不一致,说明该数据已经被其他线程更新过,当前操作可以忽略,从而实现幂等性。

利用数据库乐观锁实现幂等性的核心逻辑:

  1. 业务表中添加 version 字段(版本号,初始值为 0);
  2. 消费者处理消息时,执行 SQL 更新操作,将 version 作为条件(where id = ? and version = ?);
  3. 若更新成功(影响行数 > 0):说明是首次处理,业务正常执行;
  4. 若更新失败(影响行数 = 0):说明消息已被处理(版本号已变更),直接忽略

那么就涉及到了数据库表的设计上的情况

1
2
3
4
5
6
7
8
9
10
11
-- 订单表:包含乐观锁版本号 version
CREATE TABLE `t_order` (
`id` bigint NOT NULL AUTO_INCREMENT COMMENT '订单ID',
`order_no` varchar(64) NOT NULL COMMENT '订单编号(唯一)',
`amount` decimal(10,2) NOT NULL COMMENT '订单金额',
`status` tinyint NOT NULL COMMENT '订单状态(0:待支付,1:已支付,2:已取消)',
`version` int NOT NULL DEFAULT 0 COMMENT '乐观锁版本号',
`create_time` datetime NOT NULL COMMENT '创建时间',
PRIMARY KEY (`id`),
UNIQUE KEY `uk_order_no` (`order_no`) COMMENT '订单编号唯一索引'
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COMMENT='订单表(含乐观锁)';
  • 当第一次处理消息时,version = 0 与数据库匹配,更新成功(rows = 1),version 变为 1;
  • 若消息重复消费(如重试),再次执行时 version = 0 已不匹配(数据库中 version = 1),更新失败(rows = 0),避免重复处理。

乐观锁字段在实体类中也有体现

1
2
3
4
5
6
7
8
9
10
11
12
13
14
import lombok.Data;

import java.math.BigDecimal;
import java.time.LocalDateTime;

@Data
public class Order {
private Long id;
private String orderNo; // 订单编号(业务唯一标识)
private BigDecimal amount;
private Integer status; // 0:待支付,1:已支付,2:已取消
private Integer version; // 乐观锁版本号
private LocalDateTime createTime;
}

生产者(发送带订单信息的消息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class OrderProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送订单支付消息(消息体为订单编号)
*/
public void sendPaymentMessage(String orderNo) {
// 生成全局唯一消息ID(可选,用于追踪,非乐观锁必需)
String messageId = UUID.randomUUID().toString();

// 设置消息ID到消息头
MessagePostProcessor postProcessor = message -> {
message.getMessageProperties().setMessageId(messageId);
return message;
};

// 发送消息:交换机、路由键、消息体(订单编号)
rabbitTemplate.convertAndSend(
"order.exchange",
"order.pay",
orderNo, // 消息体携带订单编号(业务唯一标识)
postProcessor
);
System.out.println("发送支付消息:消息ID=" + messageId + ",订单编号=" + orderNo);
}
}

Repository 层对于乐观锁的处理也要有,这里省略了

消费者(基于乐观锁处理消息)

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
import com.example.entity.Order;
import com.example.mapper.OrderMapper;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class OrderConsumer {

@Autowired
private OrderMapper orderMapper;

/**
* 监听订单支付消息,使用乐观锁确保幂等性
*/
@RabbitListener(queues = "order.pay.queue")
public void handlePaymentMessage(Message message) {
// 1. 从消息体中获取订单编号(消息体为字符串类型)
String orderNo = new String(message.getBody());
String messageId = message.getMessageProperties().getMessageId();
System.out.println("收到支付消息:消息ID=" + messageId + ",订单编号=" + orderNo);

// 2. 查询订单当前信息(含版本号)
Order order = orderMapper.selectByOrderNo(orderNo);
if (order == null) {
System.err.println("订单不存在:" + orderNo);
return;
}

// 3. 检查订单当前状态(避免重复处理已支付的订单)
if (order.getStatus() == 1) {
System.out.println("订单已支付,无需处理:" + orderNo);
return;
}

// 4. 执行乐观锁更新:将订单状态从0(待支付)改为1(已支付)
// 关键:使用当前查询到的version作为条件
int rows = orderMapper.updateStatusWithVersion(
orderNo,
order.getVersion(), // 旧版本号(期望的值)
1 // 新状态(已支付)
);

// 5. 根据更新结果判断是否处理成功
if (rows > 0) {
System.out.println("订单支付成功:" + orderNo + ",版本号从" + order.getVersion() + "更新为" + (order.getVersion() + 1));
// 此处可添加后续业务逻辑(如通知物流、扣减库存等)
} else {
System.out.println("订单已被处理(乐观锁更新失败):" + orderNo + ",当前版本号=" + order.getVersion());
// 无需抛出异常,因为更新失败说明是重复消息,直接忽略
}
}
}

RabbitMQ配置优化

除了这种在业务逻辑上进行处理,通过配置,减少重复消息的发送其实也算是增强了幂等性

生产者配置优化,开启生产者确认机制,确保 RabbitMQ 成功接收消息后,生产者才认为发送成功,避免因 “不确定消息是否发送成功” 而盲目重试。

1
2
3
4
5
6
7
8
9
10
spring:
rabbitmq:
host: localhost
port: 5672
username: guest
password: guest
# 开启生产者确认机制(correlated:异步回调确认)
publisher-confirm-type: correlated
# 开启消息回退机制(消息到达交换机但路由失败时触发回调)
publisher-returns: true

代码层面上可以开启配置确认与回退回调,通过 RabbitTemplate 回调确认消息是否到达交换机和队列,避免重复发送:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitProducerConfig {

@Bean
public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
RabbitTemplate rabbitTemplate = new RabbitTemplate(connectionFactory);

// 1. 消息到达交换机的确认回调
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (correlationData == null) return;
String messageId = correlationData.getId();
if (ack) {
// 消息成功到达交换机:无需处理(避免重复发送)
System.out.println("消息[" + messageId + "]已到达交换机");
} else {
// 消息未到达交换机:记录日志,根据业务决定是否重试(如网络故障)
System.err.println("消息[" + messageId + "]未到达交换机,原因:" + cause);
// 此处可添加重试逻辑(需限制重试次数,避免无限循环)
}
});

// 2. 消息路由失败的回退回调(到达交换机但未路由到队列)
rabbitTemplate.setReturnsCallback(returnedMessage -> {
String messageId = returnedMessage.getMessage().getMessageProperties().getMessageId();
System.err.println("消息[" + messageId + "]路由失败:交换机=" + returnedMessage.getExchange()
+ ",路由键=" + returnedMessage.getRoutingKey() + ",原因:" + returnedMessage.getReplyText());
// 路由失败可能是绑定错误,需人工介入,不建议重试(避免无效消息堆积)
});

return rabbitTemplate;
}
}

一般情况下,我们需要限制生产者重试次数与间隔,生产者发送失败时(如网络超时),需设置有限重试,避免因无限制重试导致消息重复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
import org.springframework.amqp.rabbit.retry.SimpleRetryPolicy;
import org.springframework.amqp.rabbit.retry.StatefulRetryOperationsInterceptor;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.retry.interceptor.RetryOperationsInterceptor;
import org.springframework.retry.policy.TimeoutRetryPolicy;
import org.springframework.retry.support.RetryTemplate;

@Configuration
public class ProducerRetryConfig {

@Bean
public RetryOperationsInterceptor producerRetryInterceptor() {
// 1. 重试策略:最多重试3次,仅对特定异常(如网络异常)重试
SimpleRetryPolicy retryPolicy = new SimpleRetryPolicy();
retryPolicy.setMaxAttempts(3); // 最大重试次数(含首次)
// 仅对网络相关异常重试(排除业务异常)
retryPolicy.setRetryableExceptions(java.net.SocketException.class, java.net.ConnectException.class);

// 2. 退避策略:重试间隔指数增长(1s → 2s → 4s)
ExponentialBackOffPolicy backOffPolicy = new ExponentialBackOffPolicy();
backOffPolicy.setInitialInterval(1000); // 首次间隔1s
backOffPolicy.setMultiplier(2); // 间隔乘数
backOffPolicy.setMaxInterval(4000); // 最大间隔4s

// 3. 配置重试模板
RetryTemplate retryTemplate = new RetryTemplate();
retryTemplate.setRetryPolicy(retryPolicy);
retryTemplate.setBackOffPolicy(backOffPolicy);

// 4. 绑定到生产者拦截器
return RetryInterceptorBuilder.stateless()
.retryOperations(retryTemplate)
.build();
}
}
  • 仅对临时异常(如网络波动)重试,对永久异常(如路由键错误)不重试;
  • 限制重试次数和间隔,避免短时间内大量重复消息冲击系统。

交换机与队列配置上就是主要减少消息丢失与重复路由,交换机和队列的稳定性直接影响消息的路由和存储,不合理的配置可能导致消息丢失,间接引发生产者重试和重复消息。

确保消息在 RabbitMQ 中持久化存储,避免 RabbitMQ 重启后消息丢失,进而引发生产者重新发送

1
2
3
4
5
MessagePostProcessor postProcessor = message -> {
// deliveryMode=2:消息持久化(1:非持久化)
message.getMessageProperties().setDeliveryMode(MessageDeliveryMode.PERSISTENT);
return message;
};

需配合队列持久化使用(队列非持久化时,消息持久化无意义)。

然后,要避免交换机和队列过度绑定,过多的绑定关系可能导致消息被重复路由到多个队列,引发 “伪重复” 消息(同一消息被多个消费者处理)。

如果在消费者上进行配置就是减少重复处理,在需要的时候开启手动确认会好很多,然后配置合适的死信队列