RabbitMQ 数据隔离

数据隔离隔离什么

这个其实在前面就应该说的,只不过我给忘记了

在 RabbitMQ 中,数据隔离是指通过技术手段确保不同业务、团队、环境或租户的消息数据相互独立,避免交叉干扰(如消息误消费、权限越界、资源抢占等)。这在多团队共享 RabbitMQ 集群、多租户系统或复杂业务场景中至关重要。

数据隔离一般会有如下隔离:

  • 业务隔离:不同业务线(如电商的订单、支付、物流)的消息不混用,避免业务逻辑混乱。
  • 环境隔离:开发、测试、生产环境的消息严格分离,防止测试数据污染生产。
  • 权限隔离:限制不同角色对消息资源(交换机、队列等)的操作权限(如只读、读写、配置)。
  • 资源隔离:防止某一业务过度占用集群资源(如连接数、队列长度),影响其他业务。

RabbitMQ 提供了多种机制实现数据隔离,核心是虚拟主机(vhost),配合权限控制、命名规范、资源限制等形成了完整的方案。

虚拟主机,Virtual Hosts,也就是右上角那个 vhost,它是 RabbitMQ 最核心的隔离单位

  • vhost 是 RabbitMQ 原生的隔离机制,相当于一个 “独立的 RabbitMQ 实例”,具备以下特性:
    • 资源隔离每个 vhost 有独立的交换机、队列、绑定关系,不同 vhost 的资源完全隔离(如/order vhost 的队列queue1/pay vhost 的queue1是两个完全独立的队列)。
    • 权限隔离:用户权限与 vhost 绑定,一个用户可被授权访问多个 vhost,但在不同 vhost 中权限可不同(如在/test有读写权限,在/prod只有读权限)。
    • 连接隔离:客户端连接 RabbitMQ 时必须指定 vhost,无法跨 vhost 访问资源。
image-20250918195314447

而且还可以通过配置限制 vhost 或用户的资源使用,避免单业务占用过多资源,平衡资源分配:

  • 连接数限制:限制某 vhost 或用户的最大并发连接数(如rabbitmqctl set_vhost_limits /order '{"max-connections": 100}')。
  • 队列大小限制:设置队列的最大消息数或内存占用(如声明队列时指定x-max-lengthx-max-length-bytes)。
  • 消费者限制:限制单个队列的消费者数量,避免过度竞争。

命名空间通过统一的命名规范区分不同业务 / 环境的资源,配合 vhost 使用可增强隔离清晰度,规范资源命名,常见命名格式:

  • 交换机:{环境}.{业务线}.{功能}.exchange(如prod.order.notify.direct
  • 队列:{环境}.{业务线}.{功能}.queue(如test.pay.refund.fanout
  • 路由键:{业务线}.{操作}(如order.create

权限控制通过 “用户 - 权限 - vhost” 三元组控制访问,权限分为三级:

  • configure:是否允许配置资源(创建 / 删除交换机、队列等)。
  • write:是否允许发送消息(向交换机发布消息)。
  • read:是否允许消费消息(从队列获取消息、确认消息)。

数据隔离实践

以 “多业务线共享 RabbitMQ 集群” 为例

首先,我们先创建用户,进入 RabbitMQ 控制台,点击 Admin,选择 Users

点击 Add a user,填写用户信息:

  • Username:可以根据项目命名
  • Password:如 123
  • Tag:在这里选择创建的角色的权限级别,Admin 为最高
image-20250918195448101

这些权限是从后往前权限能力依次提高的,鼠标放上去他会告诉你不同权限能做到的有什么

image-20250918200529970

然后我们规划 vhost,为用户创建虚拟主机,按照环境和业务进行划分

登录新创建的用户,并尝试操作虚拟主机

  • 登录的用户只能操作与自己关联的虚拟主机中的队列和交换机。如果用户尝试操作其他虚拟主机中的队列,将会遇到权限错误,这就实现了数据隔离。

进入 RabbitMQ 控制台,点击 Virtual Hosts,然后点击 Add a virtual host,为用户创建新的虚拟主机

这个就是虚拟主机页面,在这里默认还有一个虚拟主机是/,这个是默认的,属于默认用户,我们创建一个/prod/order代表生产环境的订单业务,其中的Description可以描述虚拟主机的用途,然后新创建的虚拟主机会自动分配给当前登录用户

image-20250918200828602

其中涉及到的默认的交换机也会被自动创建

image-20250918200930363

我们在尝试删除其他虚拟主机的队列的时候,就会因为权限问题被拒绝

image-20250918201403941

在代码中声明交换机 / 队列时,按统一格式命名,例如订单业务的交换机:

1
2
3
4
5
6
7
8
// Java示例:声明订单业务的交换机(绑定到/prod/order vhost)
@Bean
public DirectExchange orderExchange() {
// 命名格式:环境.业务线.功能.类型
return ExchangeBuilder.directExchange("prod.order.notify.direct")
.durable(true) // 持久化
.build();
}

针对核心业务 vhost 设置资源上限,防止雪崩:

1
2
3
4
5
6
7
8
9
10
11
12
# 限制/prod/order的最大连接数为200
rabbitmqctl set_vhost_limits /prod/order '{"max-connections": 200}'

# 限制队列最大消息数(代码中声明时配置)
@Bean
public Queue orderQueue() {
Map<String, Object> args = new HashMap<>();
args.put("x-max-length", 10000); // 最大10000条消息
return QueueBuilder.durable("prod.order.notify.queue")
.withArguments(args)
.build();
}

最后链接的时候客户端连接时必须指定 vhost,否则默认连接/(根 vhost):

1
2
3
4
5
6
7
// Java客户端连接示例
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("rabbitmq-host");
factory.setPort(5672);
factory.setVirtualHost("/prod/order"); // 指定vhost
factory.setUsername("order_user");
factory.setPassword("P@ssw0rd");

RabbitMQ 死信队列

死信队列的工作原理

死信队列(DLQ)是 RabbitMQ 中一种特殊的消息处理机制,用于存放那些无法被正常消费的 “死信消息”。它本质上是一个普通队列,专门接收因特定原因无法被处理的消息,所以才被叫做死信队列,它的出现便于后续分析和处理,避免消息丢失或业务异常。

死信,在官网中对应的单词为“Dead Letter”,可以看出翻译确实非常的简单粗暴。消息成为死信(Dead Letter)需满足以下任一条件:

  1. 消息被拒绝(Rejected) 消费者使用basic.rejectbasic.nack方法拒绝消费消息,且requeue参数设为false(不重新入队)。
  2. 消息过期(TTL 过期)
    • 消息本身设置了过期时间(expiration属性),且超过该时间未被消费。
    • 消息所在队列设置了统一过期时间(x-message-ttl),消息在队列中停留时间超过该值。
  3. 队列达到最大长度 队列通过x-max-lengthx-max-length-bytes设置了最大容量,当新消息进入导致队列溢出时,最早的消息会成为死信。

“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。

死信队列的实现依赖 “死信交换机(Dead Letter Exchange,DLX)”,整体流程如下:

  1. 声明普通队列时绑定死信交换机 在创建业务队列时,通过参数指定该队列的死信交换机(x-dead-letter-exchange)和可选的死信路由键(x-dead-letter-routing-key)。
  2. 消息成为死信后自动转发 当消息满足死信条件时,RabbitMQ 会自动将其转发到绑定的死信交换机,再由死信交换机根据路由规则投递到死信队列。
  3. 死信队列的消费与处理 死信队列中的消息可由专门的消费者处理(如人工干预、重试机制等),这就避免原业务队列受影响。

那么“死信”被丢到死信队列中后,会发生什么变化呢?

如果队列配置了参数 x-dead-letter-routing-key 的话,“死信”的路由key将会被替换成该参数对应的值。如果没有设置,则保留该消息原有的路由key。例如,如果原有消息的路由key是testA,被发送到业务Exchage中,然后被投递到业务队列 QueueA 中,如果该队列没有配置参数x-dead-letter-routing-key,则该消息成为死信后,将保留原有的路由keytestA,如果配置了该参数,并且值设置为testB,那么该消息成为死信后,路由key将会被替换为testB,然后被抛到死信交换机中。另外,由于被抛到了死信交换机,所以消息的Exchange Name也会被替换为死信交换机的名称。

死信消息的 Header 中东西比较多,它是 RabbitMQ 为死信消息添加的额外元数据,用于记录消息成为死信的原因、来源等关键信息

Header 字段 含义说明 出现场景
x-death 死信消息的核心元数据集合,是一个数组(可能包含多条记录,如多次成为死信的情况) 所有死信场景都会出现
x-death[].reason 消息成为死信的具体原因 - expired:消息过期 - rejected:被消费者拒绝(requeue=false) - maxlen:队列达到最大长度导致溢出 - immediate:已废弃(早期版本用于 “立即投递” 失败)
x-death[].queue 消息成为死信前所在的原队列名称 所有死信场景都会出现
x-death[].exchange 原队列绑定的交换机名称 所有死信场景都会出现
x-death[].routing-keys 消息被路由到原队列时使用的路由键(数组形式) 所有死信场景都会出现
x-death[].time 消息成为死信的时间戳(Unix 时间,毫秒级) 所有死信场景都会出现
x-death[].count 消息第几次成为死信(同一消息可能因重试等原因多次进入死信队列) 消息多次成为死信时出现
x-death[].original-expiration 原消息的过期时间(仅当因过期成为死信时存在) 消息因 TTL 过期成为死信时

其实也通过 RabbitMQ 管理界面查看

  1. 进入死信队列详情页(Queues and Streams → 选择死信队列)。

  2. 点击 Get messages 按钮(可选择 Ack modeNack requeue true 避免消息被删除)。

  3. 在获取的消息列表中,点击某条消息的 Headers 展开,即可看到 x-death 等死信相关 Header。

死信队列的配置与实现

如何配置死信队列呢?大概可以分为以下步骤:

  • 配置业务队列,绑定到业务交换机上
  • 为业务队列配置死信交换机和路由key
  • 为死信交换机配置死信队列

可以看到和配置正常的队列类似,所以我们能得知,死信队列的配置并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。

有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型。

一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。

接下来开始上代码

DeadLetterQueueConfig.java - 死信队列配置类,定义了死信交换机、死信队列、业务队列等

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
@Configuration
public class DeadLetterQueueConfig {

// 死信交换机名称
public static final String DEAD_LETTER_EXCHANGE = "ergou.dead.letter.exchange";

// 死信队列名称
public static final String DEAD_LETTER_QUEUE = "ergou.dead.letter.queue";

// 死信路由键
public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter";

// 业务队列名称
public static final String BUSINESS_QUEUE = "ergou.business.queue";

// 业务交换机名称
public static final String BUSINESS_EXCHANGE = "ergou.business.exchange";

// 业务路由键
public static final String BUSINESS_ROUTING_KEY = "business";

/**
* 声明死信交换机
* 类型为Direct直连交换机
*/
@Bean
public DirectExchange deadLetterExchange() {
return new DirectExchange(DEAD_LETTER_EXCHANGE);
}

/**
* 声明死信队列
*/
@Bean
public Queue deadLetterQueue() {
return QueueBuilder.durable(DEAD_LETTER_QUEUE).build();
}

/**
* 绑定死信队列到死信交换机
*/
@Bean
public Binding deadLetterBinding() {
return BindingBuilder.bind(deadLetterQueue())
.to(deadLetterExchange())
.with(DEAD_LETTER_ROUTING_KEY);
}

/**
* 声明业务交换机
*/
@Bean
public DirectExchange businessExchange() {
return new DirectExchange(BUSINESS_EXCHANGE);
}

/**
* 声明业务队列
* 设置消息过期时间为10秒
* 设置死信交换机和死信路由键
*/
@Bean
public Queue businessQueue() {
Map<String, Object> args = new HashMap<>(3);
// 设置死信交换机
args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE);
// 设置死信路由键
args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY);
// 设置消息过期时间 10秒
args.put("x-message-ttl", 10000);
// 设置队列最大长度
// args.put("x-max-length", 10);

return QueueBuilder.durable(BUSINESS_QUEUE)
.withArguments(args)
.build();
}

/**
* 绑定业务队列到业务交换机
*/
@Bean
public Binding businessBinding() {
return BindingBuilder.bind(businessQueue())
.to(businessExchange())
.with(BUSINESS_ROUTING_KEY);
}
}

业务队列的消息生产者和消费者我将忽略,直接来看死信队列消息消费者,它用于处理死信队列中的消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Component
@Slf4j
public class DeadLetterQueueConsumer {

/**
* 监听死信队列的消息
* 处理那些在业务队列中被拒绝或过期的消息
*/
@RabbitListener(queues = DeadLetterQueueConfig.DEAD_LETTER_QUEUE)
public void receiveDeadLetter(OrderMessage message) {
log.info("死信队列消费者接收到消息: {}", message);

// 记录死信消息
log.info("记录死信消息: 订单号={}, 产品={}, 数量={}",
message.getOrderNo(),
message.getProductName(),
message.getQuantity());

// 这里可以进行死信消息的处理逻辑
// 例如:记录日志、发送告警、重试处理、人工干预等

log.info("死信消息处理完成: {}", message.getOrderNo());
}
}

测试用的控制器也展示部分了

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
/**
* 发送消息到业务队列
* 如果消息的quantity > 5,消费者会拒绝消息,消息会被路由到死信队列
*/
@PostMapping("/send")
public Map<String, Object> sendMessage(@RequestParam("productName") String productName,
@RequestParam(value = "quantity", defaultValue = "1") Integer quantity) {
Map<String, Object> result = new HashMap<>();

try {
// 创建订单消息
OrderMessage orderMessage = messageProducer.createSampleOrder(System.currentTimeMillis(), productName);
orderMessage.setQuantity(quantity);

// 发送消息到业务队列
businessQueueProducer.sendMessage(orderMessage);

result.put("success", true);
result.put("message", "消息已发送到业务队列");
result.put("orderMessage", orderMessage);

log.info("消息已发送到业务队列: {}", orderMessage);
} catch (Exception e) {
result.put("success", false);
result.put("message", "发送消息失败: " + e.getMessage());
log.error("发送消息失败: {}", e.getMessage(), e);
}

return result;
}

/**
* 发送带TTL的消息到业务队列
* 消息会在指定的TTL时间后过期,然后被路由到死信队列
*/
@PostMapping("/send-ttl")
public Map<String, Object> sendMessageWithTTL(@RequestParam("productName") String productName,
@RequestParam(value = "quantity", defaultValue = "1") Integer quantity,
@RequestParam(value = "ttl", defaultValue = "5000") Long ttl) {
Map<String, Object> result = new HashMap<>();

try {
// 创建订单消息
OrderMessage orderMessage = messageProducer.createSampleOrder(System.currentTimeMillis(), productName);
orderMessage.setQuantity(quantity);

// 发送消息到业务队列,并设置TTL
businessQueueProducer.sendMessageWithTTL(orderMessage, ttl);

result.put("success", true);
result.put("message", "消息已发送到业务队列,TTL=" + ttl + "ms");
result.put("orderMessage", orderMessage);
result.put("ttl", ttl);

log.info("消息已发送到业务队列,TTL={}: {}", ttl, orderMessage);
} catch (Exception e) {
result.put("success", false);
result.put("message", "发送消息失败: " + e.getMessage());
log.error("发送消息失败: {}", e.getMessage(), e);
}

return result;
}

一个队列可以配置多个死信队列,根据不同的条件路由到不同的死信队列,而且死信队列也可以配置死信队列,形成链式结构,但要注意避免循环依赖

启动 Spring Boot 项目,项目启动后会自动创建以下队列和交换机

  • 死信交换机: ergou.dead.letter.exchange
  • 死信队列: ergou.dead.letter.queue
  • 业务交换机: ergou.business.exchange
  • 业务队列: ergou.business.queue
image-20250918203802788
image-20250918203814357

在这个场景中,我们将发送一个 quantity > 5 的消息到业务队列,消费者会拒绝这个消息,消息会被路由到死信队列。

image-20250918204009573

可以看到日志,业务队列消费者会拒绝这个消息,消息被路由到死信队列

image-20250918204231595

你也可以试试发送一个ttl过长的进行测试,消息会在指定的 TTL 时间后过期,然后被路由到死信队列,POST http://localhost:8091/dead-letter/send-ttl?productName=过期商品&quantity=3&ttl=5000

在某些场景下,我们可能需要直接发送消息到死信队列,而不经过业务队列

image-20250918204940060

可以看到,死信队列接受到了这个消息,所以流程到此为止就打通了。

RabbitMQ 延迟队列

什么是延迟队列

延迟队列和死信队列息息相关,一定要了解死信队列

延时队列,它再怎么样也是队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。

其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。

延时队列就是用来存放需要在指定时间被处理的元素的队列。

那么延时队列在什么情况下可以很好的使用呢?

像订单超时自动取消这种带有对特定业务有时间有效期的这种

工作流程

  • 下单并发送延迟消息:用户在电商平台下单后,系统会生成订单数据,并发送一条带有延迟时间(如 30 分钟)的消息到延迟队列。消息内容包含订单编号等关键信息。
  • 消息在队列中等待:该消息进入延迟队列后,会在队列中等待 30 分钟。这期间,若用户完成支付,支付系统会更新订单状态为 “已支付”。
  • 超时后处理:30 分钟时间一到,如果订单仍未支付,消息过期。过期消息会根据设置被转发到死信队列(若采用死信队列方案),或者直接触发相应的消费者处理逻辑(若使用延迟消息插件方案)。消费者获取到该消息后,检查订单状态。若订单仍为 “未支付”,则执行取消订单操作,如更新订单状态为 “已取消”、释放订单占用的库存等。

使用延迟队列的优势

  • 提升用户体验与运营效率:对于用户而言,未支付订单在超时后自动取消,避免了用户遗忘支付导致订单长时间占用资源,同时也为其他用户释放了商品库存,提高了商品的流转效率。从运营角度看,减少了人工干预订单取消的工作,降低了运营成本,提高了整体运营效率。
  • 系统性能优化:相比使用轮询机制检查订单支付状态,延迟队列不会频繁查询数据库,减少了数据库的压力;与依赖 Redis 失效 key 相比,RabbitMQ 能够更好地处理大量订单数据,削峰填谷,避免 Redis 承受过高压力,提升了系统的稳定性和可靠性。

如何进行实现

  • 死信队列方案:需声明一个普通业务队列和一个死信队列,并配置业务队列的死信交换机和死信路由键。同时,设置业务队列的消息过期时间(TTL)为 30 分钟。当消息在业务队列中过期后,会被转发到死信队列,由死信队列的消费者进行订单取消操作。
  • 延迟消息插件方案:先安装rabbitmq_delayed_message_exchange插件。声明一个类型为x - delayed - message的交换机和普通队列,并将它们绑定。生产者发送消息时,设置消息的延迟时间为 30 分钟。交换机接收到消息后,会根据延迟时间将消息存储,在延迟时间到达后,将消息投递到对应的队列,消费者获取消息后执行订单取消操作。

TTL

该部分引用自https://blog.csdn.net/Mou_O/article/details/106093749,为后来补充

在详细讲解延时队列之前,还需要先介绍一下RabbitMQ中的一个高级特性——TTL(Time To Live)

TTL是什么呢?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。

有两种方式设置 TTL 值,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:

1
2
3
Map<String, Object> args = new HashMap<String, Object>();
args.put("x-message-ttl", 6000);
channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);

这样所有被投递到该队列的消息都最多不会存活超过6s。另一种方式便是针对每条消息设置TTL,代码如下

1
2
3
4
AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder();
builder.expiration("6000");
AMQP.BasicProperties properties = builder.build();
channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());

这样这条消息的过期时间也被设置成了6s。

但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。

另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。

延迟队列的实现

延迟队列就是消息希望被延迟多久之后处理,TTL则刚好能让消息在延迟多久之后成为死信,另一方面,成为死信的消息都会被投递到死信队列里,这样只需要消费者一直消费死信队列里的消息就万事大吉了,因为里面的消息都是希望被立即处理的消息。

从下图可以大致看出消息的流向:

image-20250919151933754

下面我们就实际编写代码实现

延迟队列有两种实现方式,一种是上面的基于TTL和死信队列的延迟队列,还有一种利用RabbitMQ的插件,先说第一种我们学习的比较常规的。

首先我们要进行延迟队列的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
@Configuration
public class DelayQueueConfig {

// ================ 基于TTL和死信队列的延迟队列 ================

// 延迟队列交换机
public static final String DELAY_EXCHANGE_TTL = "ergou.delay.ttl.exchange";

// 延迟队列(实际上是一个普通队列,但设置了消息过期后转发到目标队列)
public static final String DELAY_QUEUE_TTL = "ergou.delay.ttl.queue";

// 延迟队列路由键
public static final String DELAY_ROUTING_KEY_TTL = "delay.ttl";

// 目标交换机(消息延迟到期后,转发到此交换机)
public static final String TARGET_EXCHANGE_TTL = "ergou.delay.ttl.target.exchange";

// 目标队列(消息延迟到期后,转发到此队列)
public static final String TARGET_QUEUE_TTL = "ergou.delay.ttl.target.queue";

// 目标路由键
public static final String TARGET_ROUTING_KEY_TTL = "delay.ttl.target";

/**
* 声明延迟交换机
*/
@Bean
public DirectExchange delayExchangeTTL() {
return new DirectExchange(DELAY_EXCHANGE_TTL);
}

/**
* 声明延迟队列
* 设置队列的TTL,并指定死信交换机和路由键
*/
@Bean
public Queue delayQueueTTL() {
Map<String, Object> args = new HashMap<>(3);
// 设置死信交换机
args.put("x-dead-letter-exchange", TARGET_EXCHANGE_TTL);
// 设置死信路由键
args.put("x-dead-letter-routing-key", TARGET_ROUTING_KEY_TTL);
// 设置消息过期时间(毫秒)- 这里设置为0,表示不设置队列级别的TTL,而是在发送消息时设置
// args.put("x-message-ttl", 10000);

return QueueBuilder.durable(DELAY_QUEUE_TTL)
.withArguments(args)
.build();
}

/**
* 绑定延迟队列到延迟交换机
*/
@Bean
public Binding delayBindingTTL() {
return BindingBuilder.bind(delayQueueTTL())
.to(delayExchangeTTL())
.with(DELAY_ROUTING_KEY_TTL);
}

/**
* 声明目标交换机
*/
@Bean
public DirectExchange targetExchangeTTL() {
return new DirectExchange(TARGET_EXCHANGE_TTL);
}

/**
* 声明目标队列
*/
@Bean
public Queue targetQueueTTL() {
return QueueBuilder.durable(TARGET_QUEUE_TTL).build();
}

/**
* 绑定目标队列到目标交换机
*/
@Bean
public Binding targetBindingTTL() {
return BindingBuilder.bind(targetQueueTTL())
.to(targetExchangeTTL())
.with(TARGET_ROUTING_KEY_TTL);
}
}

接下来我们看发送延迟消息的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
@Component
@RequiredArgsConstructor
@Slf4j
public class DelayQueueProducer {

private final RabbitTemplate rabbitTemplate;

/**
* 发送延迟消息(基于TTL和死信队列)
* @param message 订单消息
* @param delayTime 延迟时间(毫秒)
*/
public void sendDelayMessageWithTTL(OrderMessage message, long delayTime) {
log.info("发送延迟消息(基于TTL和死信队列),延迟时间={}ms: {}", delayTime, message);

// 使用MessagePostProcessor设置消息属性
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息过期时间
message.getMessageProperties().setExpiration(String.valueOf(delayTime));
return message;
}
};

rabbitTemplate.convertAndSend(
DelayQueueConfig.DELAY_EXCHANGE_TTL,
DelayQueueConfig.DELAY_ROUTING_KEY_TTL,
message,
messagePostProcessor
);
}
}

然后我们进行延迟消息的接收和消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
/**
* 延迟队列消费者
* 用于处理延迟队列中的消息
*/
@Component
@Slf4j
public class DelayQueueConsumer {

private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");

/**
* 监听基于TTL和死信队列的延迟队列
* 注意:这里监听的是目标队列,而不是延迟队列
*/
@RabbitListener(queues = DelayQueueConfig.TARGET_QUEUE_TTL)
public void receiveDelayMessageTTL(OrderMessage message) {
String now = LocalDateTime.now().format(formatter);
log.info("接收到TTL延迟消息,当前时间:{}, 消息内容: {}", now, message);

// 处理订单逻辑
processOrder(message);
}

/**
* 处理订单逻辑
*/
private void processOrder(OrderMessage message) {
log.info("处理订单: {}, 产品: {}, 数量: {}, 价格: {}",
message.getOrderNo(),
message.getProductName(),
message.getQuantity(),
message.getPrice());

// 模拟订单处理
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

log.info("订单处理完成: {}", message.getOrderNo());
}
}

测试的 RESTFUL 的控制器如下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
/**
* 发送延迟消息(基于TTL和死信队列)
* @param productName 产品名称
* @param quantity 数量
* @param delayTime 延迟时间(毫秒)
* @return 结果
*/
@PostMapping("/send-ttl")
public Map<String, Object> sendDelayMessageWithTTL(
@RequestParam("productName") String productName,
@RequestParam(value = "quantity", defaultValue = "1") Integer quantity,
@RequestParam(value = "delayTime", defaultValue = "10000") Long delayTime) {

Map<String, Object> result = new HashMap<>();

try {
// 创建订单消息
OrderMessage orderMessage = messageProducer.createSampleOrder(System.currentTimeMillis(), productName);
orderMessage.setQuantity(quantity);

// 记录发送时间
String sendTime = LocalDateTime.now().format(formatter);

// 发送延迟消息
delayQueueProducer.sendDelayMessageWithTTL(orderMessage, delayTime);

// 计算预计到达时间
String expectedTime = LocalDateTime.now().plusNanos(delayTime * 1000000).format(formatter);

result.put("success", true);
result.put("message", "延迟消息已发送(基于TTL和死信队列)");
result.put("orderMessage", orderMessage);
result.put("delayTime", delayTime + "ms");
result.put("sendTime", sendTime);
result.put("expectedTime", expectedTime);

log.info("延迟消息已发送(基于TTL和死信队列),延迟时间={}ms,发送时间={},预计到达时间={}: {}",
delayTime, sendTime, expectedTime, orderMessage);
} catch (Exception e) {
result.put("success", false);
result.put("message", "发送延迟消息失败: " + e.getMessage());
log.error("发送延迟消息失败: {}", e.getMessage(), e);
}

return result;
}

发送延迟消息,POST http://localhost:8080/delay-queue/send-ttl?productName=测试商品&quantity=2&delayTime=10000,可以发现10s后消息变成了死信并且进入了死信队列,然后被消费掉,这样,一个简单的延时队列就算是实现了。

接下来再说另一种方式,如果你想使用基于插件的延迟队列,需要先安装 rabbitmq_delayed_message_exchange 插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases

将插件复制到RabbitMQ的插件目录: {RabbitMQ安装目录}/plugins,然后启动插件

1
rabbitmq-plugins enable rabbitmq_delayed_message_exchange

那么代码就可以这样写

配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
// 延迟交换机(使用插件)
public static final String DELAY_EXCHANGE_PLUGIN = "ergou.delay.plugin.exchange";

// 延迟队列(使用插件)
public static final String DELAY_QUEUE_PLUGIN = "ergou.delay.plugin.queue";

// 延迟路由键(使用插件)
public static final String DELAY_ROUTING_KEY_PLUGIN = "delay.plugin";

/**
* 声明基于插件的延迟交换机
* 注意:需要安装rabbitmq_delayed_message_exchange插件
*/
@Bean
public CustomExchange delayExchangePlugin() {
Map<String, Object> args = new HashMap<>(1);
args.put("x-delayed-type", "direct");
// 创建自定义交换机,类型为x-delayed-message
return new CustomExchange(DELAY_EXCHANGE_PLUGIN, "x-delayed-message", true, false, args);
}

/**
* 声明延迟队列(使用插件)
*/
@Bean
public Queue delayQueuePlugin() {
return QueueBuilder.durable(DELAY_QUEUE_PLUGIN).build();
}

/**
* 绑定延迟队列到延迟交换机(使用插件)
*/
@Bean
public Binding delayBindingPlugin() {
return BindingBuilder.bind(delayQueuePlugin())
.to(delayExchangePlugin())
.with(DELAY_ROUTING_KEY_PLUGIN)
.noargs();
}

消费者

1
2
3
4
5
6
7
8
9
10
11
/**
* 监听基于插件的延迟队列
*/
@RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE_PLUGIN)
public void receiveDelayMessagePlugin(OrderMessage message) {
String now = LocalDateTime.now().format(formatter);
log.info("接收到Plugin延迟消息,当前时间:{}, 消息内容: {}", now, message);

// 处理订单逻辑
processOrder(message);
}

生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* 发送延迟消息(基于RabbitMQ的延迟消息插件)
* 注意:需要安装rabbitmq_delayed_message_exchange插件
* @param message 订单消息
* @param delayTime 延迟时间(毫秒)
*/
public void sendDelayMessageWithPlugin(OrderMessage message, long delayTime) {
log.info("发送延迟消息(基于插件),延迟时间={}ms: {}", delayTime, message);

// 使用MessagePostProcessor设置消息属性
MessagePostProcessor messagePostProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置延迟时间
message.getMessageProperties().setHeader("x-delay", delayTime);
return message;
}
};

rabbitTemplate.convertAndSend(
DelayQueueConfig.DELAY_EXCHANGE_PLUGIN,
DelayQueueConfig.DELAY_ROUTING_KEY_PLUGIN,
message,
messagePostProcessor
);
}