RabbitMQ 数据隔离
数据隔离隔离什么
这个其实在前面就应该说的,只不过我给忘记了
在 RabbitMQ
中,数据隔离是指通过技术手段确保不同业务、团队、环境或租户的消息数据相互独立,避免交叉干扰(如消息误消费、权限越界、资源抢占等)。这在多团队共享
RabbitMQ 集群、多租户系统或复杂业务场景中至关重要。
数据隔离一般会有如下隔离:
- 业务隔离:不同业务线(如电商的订单、支付、物流)的消息不混用,避免业务逻辑混乱。
- 环境隔离:开发、测试、生产环境的消息严格分离,防止测试数据污染生产。
- 权限隔离:限制不同角色对消息资源(交换机、队列等)的操作权限(如只读、读写、配置)。
- 资源隔离:防止某一业务过度占用集群资源(如连接数、队列长度),影响其他业务。
RabbitMQ
提供了多种机制实现数据隔离,核心是虚拟主机(vhost),配合权限控制、命名规范、资源限制等形成了完整的方案。
虚拟主机,Virtual Hosts,也就是右上角那个 vhost,它是 RabbitMQ
最核心的隔离单位
- vhost 是 RabbitMQ 原生的隔离机制,相当于一个 “独立的 RabbitMQ
实例”,具备以下特性:
- 资源隔离:每个 vhost
有独立的交换机、队列、绑定关系,不同 vhost
的资源完全隔离(如
/order
vhost
的队列queue1
与/pay
vhost
的queue1
是两个完全独立的队列)。
- 权限隔离:用户权限与 vhost
绑定,一个用户可被授权访问多个 vhost,但在不同 vhost
中权限可不同(如在
/test
有读写权限,在/prod
只有读权限)。
- 连接隔离:客户端连接 RabbitMQ 时必须指定
vhost,无法跨 vhost 访问资源。
image-20250918195314447
而且还可以通过配置限制 vhost
或用户的资源使用,避免单业务占用过多资源,平衡资源分配:
- 连接数限制:限制某 vhost
或用户的最大并发连接数(如
rabbitmqctl set_vhost_limits /order '{"max-connections": 100}'
)。
- 队列大小限制:设置队列的最大消息数或内存占用(如声明队列时指定
x-max-length
或x-max-length-bytes
)。
- 消费者限制:限制单个队列的消费者数量,避免过度竞争。
命名空间通过统一的命名规范区分不同业务 / 环境的资源,配合 vhost
使用可增强隔离清晰度,规范资源命名,常见命名格式:
- 交换机:
{环境}.{业务线}.{功能}.exchange
(如prod.order.notify.direct
)
- 队列:
{环境}.{业务线}.{功能}.queue
(如test.pay.refund.fanout
)
- 路由键:
{业务线}.{操作}
(如order.create
)
权限控制通过 “用户 - 权限 - vhost” 三元组控制访问,权限分为三级:
configure
:是否允许配置资源(创建 /
删除交换机、队列等)。
write
:是否允许发送消息(向交换机发布消息)。
read
:是否允许消费消息(从队列获取消息、确认消息)。
数据隔离实践
以 “多业务线共享 RabbitMQ 集群” 为例
首先,我们先创建用户,进入 RabbitMQ 控制台,点击
Admin
,选择 Users
点击 Add a user
,填写用户信息:
- Username:可以根据项目命名
- Password:如
123
。
- Tag:在这里选择创建的角色的权限级别,Admin
为最高
image-20250918195448101
这些权限是从后往前权限能力依次提高的,鼠标放上去他会告诉你不同权限能做到的有什么
image-20250918200529970
然后我们规划 vhost,为用户创建虚拟主机,按照环境和业务进行划分
登录新创建的用户,并尝试操作虚拟主机
- 登录的用户只能操作与自己关联的虚拟主机中的队列和交换机。如果用户尝试操作其他虚拟主机中的队列,将会遇到权限错误,这就实现了数据隔离。
进入 RabbitMQ 控制台,点击 Virtual Hosts
,然后点击
Add a virtual host
,为用户创建新的虚拟主机
这个就是虚拟主机页面,在这里默认还有一个虚拟主机是/
,这个是默认的,属于默认用户,我们创建一个/prod/order
代表生产环境的订单业务,其中的Description可以描述虚拟主机的用途,然后新创建的虚拟主机会自动分配给当前登录用户
image-20250918200828602
其中涉及到的默认的交换机也会被自动创建
image-20250918200930363
我们在尝试删除其他虚拟主机的队列的时候,就会因为权限问题被拒绝
image-20250918201403941
在代码中声明交换机 /
队列时,按统一格式命名,例如订单业务的交换机:
1 2 3 4 5 6 7 8
| @Bean public DirectExchange orderExchange() { return ExchangeBuilder.directExchange("prod.order.notify.direct") .durable(true) .build(); }
|
针对核心业务 vhost 设置资源上限,防止雪崩:
1 2 3 4 5 6 7 8 9 10 11 12
| # 限制/prod/order的最大连接数为200 rabbitmqctl set_vhost_limits /prod/order '{"max-connections": 200}'
# 限制队列最大消息数(代码中声明时配置) @Bean public Queue orderQueue() { Map<String, Object> args = new HashMap<>(); args.put("x-max-length", 10000); return QueueBuilder.durable("prod.order.notify.queue") .withArguments(args) .build(); }
|
最后链接的时候客户端连接时必须指定
vhost,否则默认连接/
(根 vhost):
1 2 3 4 5 6 7
| ConnectionFactory factory = new ConnectionFactory(); factory.setHost("rabbitmq-host"); factory.setPort(5672); factory.setVirtualHost("/prod/order"); factory.setUsername("order_user"); factory.setPassword("P@ssw0rd");
|
RabbitMQ 死信队列
死信队列的工作原理
死信队列(DLQ)是 RabbitMQ
中一种特殊的消息处理机制,用于存放那些无法被正常消费的
“死信消息”。它本质上是一个普通队列,专门接收因特定原因无法被处理的消息,所以才被叫做死信队列,它的出现便于后续分析和处理,避免消息丢失或业务异常。
死信,在官网中对应的单词为“Dead
Letter”,可以看出翻译确实非常的简单粗暴。消息成为死信(Dead
Letter)需满足以下任一条件:
- 消息被拒绝(Rejected)
消费者使用
basic.reject
或basic.nack
方法拒绝消费消息,且requeue
参数设为false
(不重新入队)。
- 消息过期(TTL 过期)
- 消息本身设置了过期时间(
expiration
属性),且超过该时间未被消费。
- 消息所在队列设置了统一过期时间(
x-message-ttl
),消息在队列中停留时间超过该值。
- 队列达到最大长度
队列通过
x-max-length
或x-max-length-bytes
设置了最大容量,当新消息进入导致队列溢出时,最早的消息会成为死信。
“死信”消息会被RabbitMQ进行特殊处理,如果配置了死信队列信息,那么该消息将会被丢进死信队列中,如果没有配置,则该消息将会被丢弃。
死信队列的实现依赖 “死信交换机(Dead Letter
Exchange,DLX)”,整体流程如下:
- 声明普通队列时绑定死信交换机
在创建业务队列时,通过参数指定该队列的死信交换机(
x-dead-letter-exchange
)和可选的死信路由键(x-dead-letter-routing-key
)。
- 消息成为死信后自动转发
当消息满足死信条件时,RabbitMQ
会自动将其转发到绑定的死信交换机,再由死信交换机根据路由规则投递到死信队列。
- 死信队列的消费与处理
死信队列中的消息可由专门的消费者处理(如人工干预、重试机制等),这就避免原业务队列受影响。
那么“死信”被丢到死信队列中后,会发生什么变化呢?
如果队列配置了参数 x-dead-letter-routing-key
的话,“死信”的路由key将会被替换成该参数对应的值。如果没有设置,则保留该消息原有的路由key。例如,如果原有消息的路由key是testA
,被发送到业务Exchage中,然后被投递到业务队列
QueueA
中,如果该队列没有配置参数x-dead-letter-routing-key
,则该消息成为死信后,将保留原有的路由keytestA
,如果配置了该参数,并且值设置为testB
,那么该消息成为死信后,路由key将会被替换为testB
,然后被抛到死信交换机中。另外,由于被抛到了死信交换机,所以消息的Exchange
Name也会被替换为死信交换机的名称。
死信消息的 Header 中东西比较多,它是 RabbitMQ
为死信消息添加的额外元数据,用于记录消息成为死信的原因、来源等关键信息
Header 字段 |
含义说明 |
出现场景 |
x-death |
死信消息的核心元数据集合,是一个数组(可能包含多条记录,如多次成为死信的情况) |
所有死信场景都会出现 |
x-death[].reason |
消息成为死信的具体原因 |
- expired :消息过期 -
rejected :被消费者拒绝(requeue=false ) -
maxlen :队列达到最大长度导致溢出 -
immediate :已废弃(早期版本用于 “立即投递” 失败) |
x-death[].queue |
消息成为死信前所在的原队列名称 |
所有死信场景都会出现 |
x-death[].exchange |
原队列绑定的交换机名称 |
所有死信场景都会出现 |
x-death[].routing-keys |
消息被路由到原队列时使用的路由键(数组形式) |
所有死信场景都会出现 |
x-death[].time |
消息成为死信的时间戳(Unix 时间,毫秒级) |
所有死信场景都会出现 |
x-death[].count |
消息第几次成为死信(同一消息可能因重试等原因多次进入死信队列) |
消息多次成为死信时出现 |
x-death[].original-expiration |
原消息的过期时间(仅当因过期成为死信时存在) |
消息因 TTL 过期成为死信时 |
其实也通过 RabbitMQ 管理界面查看
进入死信队列详情页(Queues and Streams
→
选择死信队列)。
点击 Get messages
按钮(可选择 Ack mode
为 Nack requeue true
避免消息被删除)。
在获取的消息列表中,点击某条消息的 Headers
展开,即可看到 x-death
等死信相关 Header。
死信队列的配置与实现
如何配置死信队列呢?大概可以分为以下步骤:
- 配置业务队列,绑定到业务交换机上
- 为业务队列配置死信交换机和路由key
- 为死信交换机配置死信队列
可以看到和配置正常的队列类似,所以我们能得知,死信队列的配置并不是直接声明一个公共的死信队列,然后所以死信消息就自己跑到死信队列里去了。而是为每个需要使用死信的业务队列配置一个死信交换机,这里同一个项目的死信交换机可以共用一个,然后为每个业务队列分配一个单独的路由key。
有了死信交换机和路由key后,接下来,就像配置业务队列一样,配置死信队列,然后绑定在死信交换机上。也就是说,死信队列并不是什么特殊的队列,只不过是绑定在死信交换机上的队列。死信交换机也不是什么特殊的交换机,只不过是用来接受死信的交换机,所以可以为任何类型。
一般来说,会为每个业务队列分配一个独有的路由key,并对应的配置一个死信队列进行监听,也就是说,一般会为每个重要的业务队列配置一个死信队列。
接下来开始上代码
DeadLetterQueueConfig.java
-
死信队列配置类,定义了死信交换机、死信队列、业务队列等
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88
| @Configuration public class DeadLetterQueueConfig {
public static final String DEAD_LETTER_EXCHANGE = "ergou.dead.letter.exchange"; public static final String DEAD_LETTER_QUEUE = "ergou.dead.letter.queue"; public static final String DEAD_LETTER_ROUTING_KEY = "dead.letter"; public static final String BUSINESS_QUEUE = "ergou.business.queue"; public static final String BUSINESS_EXCHANGE = "ergou.business.exchange"; public static final String BUSINESS_ROUTING_KEY = "business";
@Bean public DirectExchange deadLetterExchange() { return new DirectExchange(DEAD_LETTER_EXCHANGE); }
@Bean public Queue deadLetterQueue() { return QueueBuilder.durable(DEAD_LETTER_QUEUE).build(); }
@Bean public Binding deadLetterBinding() { return BindingBuilder.bind(deadLetterQueue()) .to(deadLetterExchange()) .with(DEAD_LETTER_ROUTING_KEY); }
@Bean public DirectExchange businessExchange() { return new DirectExchange(BUSINESS_EXCHANGE); }
@Bean public Queue businessQueue() { Map<String, Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange", DEAD_LETTER_EXCHANGE); args.put("x-dead-letter-routing-key", DEAD_LETTER_ROUTING_KEY); args.put("x-message-ttl", 10000); return QueueBuilder.durable(BUSINESS_QUEUE) .withArguments(args) .build(); }
@Bean public Binding businessBinding() { return BindingBuilder.bind(businessQueue()) .to(businessExchange()) .with(BUSINESS_ROUTING_KEY); } }
|
业务队列的消息生产者和消费者我将忽略,直接来看死信队列消息消费者,它用于处理死信队列中的消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Component @Slf4j public class DeadLetterQueueConsumer {
@RabbitListener(queues = DeadLetterQueueConfig.DEAD_LETTER_QUEUE) public void receiveDeadLetter(OrderMessage message) { log.info("死信队列消费者接收到消息: {}", message); log.info("记录死信消息: 订单号={}, 产品={}, 数量={}", message.getOrderNo(), message.getProductName(), message.getQuantity()); log.info("死信消息处理完成: {}", message.getOrderNo()); } }
|
测试用的控制器也展示部分了
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63
|
@PostMapping("/send") public Map<String, Object> sendMessage(@RequestParam("productName") String productName, @RequestParam(value = "quantity", defaultValue = "1") Integer quantity) { Map<String, Object> result = new HashMap<>(); try { OrderMessage orderMessage = messageProducer.createSampleOrder(System.currentTimeMillis(), productName); orderMessage.setQuantity(quantity); businessQueueProducer.sendMessage(orderMessage); result.put("success", true); result.put("message", "消息已发送到业务队列"); result.put("orderMessage", orderMessage); log.info("消息已发送到业务队列: {}", orderMessage); } catch (Exception e) { result.put("success", false); result.put("message", "发送消息失败: " + e.getMessage()); log.error("发送消息失败: {}", e.getMessage(), e); } return result; }
@PostMapping("/send-ttl") public Map<String, Object> sendMessageWithTTL(@RequestParam("productName") String productName, @RequestParam(value = "quantity", defaultValue = "1") Integer quantity, @RequestParam(value = "ttl", defaultValue = "5000") Long ttl) { Map<String, Object> result = new HashMap<>(); try { OrderMessage orderMessage = messageProducer.createSampleOrder(System.currentTimeMillis(), productName); orderMessage.setQuantity(quantity); businessQueueProducer.sendMessageWithTTL(orderMessage, ttl); result.put("success", true); result.put("message", "消息已发送到业务队列,TTL=" + ttl + "ms"); result.put("orderMessage", orderMessage); result.put("ttl", ttl); log.info("消息已发送到业务队列,TTL={}: {}", ttl, orderMessage); } catch (Exception e) { result.put("success", false); result.put("message", "发送消息失败: " + e.getMessage()); log.error("发送消息失败: {}", e.getMessage(), e); } return result; }
|
一个队列可以配置多个死信队列,根据不同的条件路由到不同的死信队列,而且死信队列也可以配置死信队列,形成链式结构,但要注意避免循环依赖
启动 Spring Boot 项目,项目启动后会自动创建以下队列和交换机
- 死信交换机:
ergou.dead.letter.exchange
- 死信队列:
ergou.dead.letter.queue
- 业务交换机:
ergou.business.exchange
- 业务队列:
ergou.business.queue
image-20250918203802788
image-20250918203814357
在这个场景中,我们将发送一个 quantity > 5
的消息到业务队列,消费者会拒绝这个消息,消息会被路由到死信队列。
image-20250918204009573
可以看到日志,业务队列消费者会拒绝这个消息,消息被路由到死信队列
image-20250918204231595
你也可以试试发送一个ttl过长的进行测试,消息会在指定的 TTL
时间后过期,然后被路由到死信队列,POST http://localhost:8091/dead-letter/send-ttl?productName=过期商品&quantity=3&ttl=5000
在某些场景下,我们可能需要直接发送消息到死信队列,而不经过业务队列
image-20250918204940060
可以看到,死信队列接受到了这个消息,所以流程到此为止就打通了。
RabbitMQ 延迟队列
什么是延迟队列
延迟队列和死信队列息息相关,一定要了解死信队列
延时队列,它再怎么样也是队列,队列意味着内部的元素是有序的,元素出队和入队是有方向性的,元素从一端进入,从另一端取出。
其次,延时队列,最重要的特性就体现在它的延时属性上,跟普通的队列不一样的是,普通队列中的元素总是等着希望被早点取出处理,而延时队列中的元素则是希望被在指定时间得到取出和处理,所以延时队列中的元素是都是带时间属性的,通常来说是需要被处理的消息或者任务。
延时队列就是用来存放需要在指定时间被处理的元素的队列。
那么延时队列在什么情况下可以很好的使用呢?
像订单超时自动取消这种带有对特定业务有时间有效期的这种
工作流程
- 下单并发送延迟消息:用户在电商平台下单后,系统会生成订单数据,并发送一条带有延迟时间(如
30 分钟)的消息到延迟队列。消息内容包含订单编号等关键信息。
- 消息在队列中等待:该消息进入延迟队列后,会在队列中等待
30 分钟。这期间,若用户完成支付,支付系统会更新订单状态为
“已支付”。
- 超时后处理:30
分钟时间一到,如果订单仍未支付,消息过期。过期消息会根据设置被转发到死信队列(若采用死信队列方案),或者直接触发相应的消费者处理逻辑(若使用延迟消息插件方案)。消费者获取到该消息后,检查订单状态。若订单仍为
“未支付”,则执行取消订单操作,如更新订单状态为
“已取消”、释放订单占用的库存等。
使用延迟队列的优势
- 提升用户体验与运营效率:对于用户而言,未支付订单在超时后自动取消,避免了用户遗忘支付导致订单长时间占用资源,同时也为其他用户释放了商品库存,提高了商品的流转效率。从运营角度看,减少了人工干预订单取消的工作,降低了运营成本,提高了整体运营效率。
- 系统性能优化:相比使用轮询机制检查订单支付状态,延迟队列不会频繁查询数据库,减少了数据库的压力;与依赖
Redis 失效 key 相比,RabbitMQ 能够更好地处理大量订单数据,削峰填谷,避免
Redis 承受过高压力,提升了系统的稳定性和可靠性。
如何进行实现
- 死信队列方案:需声明一个普通业务队列和一个死信队列,并配置业务队列的死信交换机和死信路由键。同时,设置业务队列的消息过期时间(TTL)为
30
分钟。当消息在业务队列中过期后,会被转发到死信队列,由死信队列的消费者进行订单取消操作。
- 延迟消息插件方案:先安装
rabbitmq_delayed_message_exchange
插件。声明一个类型为x - delayed - message
的交换机和普通队列,并将它们绑定。生产者发送消息时,设置消息的延迟时间为
30
分钟。交换机接收到消息后,会根据延迟时间将消息存储,在延迟时间到达后,将消息投递到对应的队列,消费者获取消息后执行订单取消操作。
TTL
该部分引用自https://blog.csdn.net/Mou_O/article/details/106093749,为后来补充
在详细讲解延时队列之前,还需要先介绍一下RabbitMQ中的一个高级特性——TTL(Time To Live)
TTL是什么呢?TTL是RabbitMQ中一个消息或者队列的属性,表明一条消息或者该队列中的所有消息的最大存活时间,单位是毫秒。换句话说,如果一条消息设置了TTL属性或者进入了设置TTL属性的队列,那么这条消息如果在TTL设置的时间内没有被消费,则会成为“死信”。如果同时配置了队列的TTL和消息的TTL,那么较小的那个值将会被使用。
有两种方式设置 TTL
值,第一种是在创建队列的时候设置队列的“x-message-ttl”属性,如下:
1 2 3
| Map<String, Object> args = new HashMap<String, Object>(); args.put("x-message-ttl", 6000); channel.queueDeclare(queueName, durable, exclusive, autoDelete, args);
|
这样所有被投递到该队列的消息都最多不会存活超过6s。另一种方式便是针对每条消息设置TTL,代码如下
1 2 3 4
| AMQP.BasicProperties.Builder builder = new AMQP.BasicProperties.Builder(); builder.expiration("6000"); AMQP.BasicProperties properties = builder.build(); channel.basicPublish(exchangeName, routingKey, mandatory, properties, "msg body".getBytes());
|
这样这条消息的过期时间也被设置成了6s。
但这两种方式是有区别的,如果设置了队列的TTL属性,那么一旦消息过期,就会被队列丢弃,而第二种方式,消息即使过期,也不一定会被马上丢弃,因为消息是否过期是在即将投递到消费者之前判定的,如果当前队列有严重的消息积压情况,则已过期的消息也许还能存活较长时间。
另外,还需要注意的一点是,如果不设置TTL,表示消息永远不会过期,如果将TTL设置为0,则表示除非此时可以直接投递该消息到消费者,否则该消息将会被丢弃。
延迟队列的实现
延迟队列就是消息希望被延迟多久之后处理,TTL则刚好能让消息在延迟多久之后成为死信
,另一方面,成为死信的消息都会被投递到死信队列里
,这样只需要消费者一直消费死信队列里的消息就万事大吉了,因为里面的消息都是希望被立即处理的消息。
从下图可以大致看出消息的流向:
image-20250919151933754
下面我们就实际编写代码实现
延迟队列有两种实现方式,一种是上面的基于TTL和死信队列的延迟队列,还有一种利用RabbitMQ的插件,先说第一种我们学习的比较常规的。
首先我们要进行延迟队列的配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86
| @Configuration public class DelayQueueConfig {
public static final String DELAY_EXCHANGE_TTL = "ergou.delay.ttl.exchange"; public static final String DELAY_QUEUE_TTL = "ergou.delay.ttl.queue"; public static final String DELAY_ROUTING_KEY_TTL = "delay.ttl"; public static final String TARGET_EXCHANGE_TTL = "ergou.delay.ttl.target.exchange"; public static final String TARGET_QUEUE_TTL = "ergou.delay.ttl.target.queue"; public static final String TARGET_ROUTING_KEY_TTL = "delay.ttl.target";
@Bean public DirectExchange delayExchangeTTL() { return new DirectExchange(DELAY_EXCHANGE_TTL); }
@Bean public Queue delayQueueTTL() { Map<String, Object> args = new HashMap<>(3); args.put("x-dead-letter-exchange", TARGET_EXCHANGE_TTL); args.put("x-dead-letter-routing-key", TARGET_ROUTING_KEY_TTL); return QueueBuilder.durable(DELAY_QUEUE_TTL) .withArguments(args) .build(); }
@Bean public Binding delayBindingTTL() { return BindingBuilder.bind(delayQueueTTL()) .to(delayExchangeTTL()) .with(DELAY_ROUTING_KEY_TTL); }
@Bean public DirectExchange targetExchangeTTL() { return new DirectExchange(TARGET_EXCHANGE_TTL); }
@Bean public Queue targetQueueTTL() { return QueueBuilder.durable(TARGET_QUEUE_TTL).build(); }
@Bean public Binding targetBindingTTL() { return BindingBuilder.bind(targetQueueTTL()) .to(targetExchangeTTL()) .with(TARGET_ROUTING_KEY_TTL); } }
|
接下来我们看发送延迟消息的代码
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
| @Component @RequiredArgsConstructor @Slf4j public class DelayQueueProducer {
private final RabbitTemplate rabbitTemplate;
public void sendDelayMessageWithTTL(OrderMessage message, long delayTime) { log.info("发送延迟消息(基于TTL和死信队列),延迟时间={}ms: {}", delayTime, message); MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setExpiration(String.valueOf(delayTime)); return message; } }; rabbitTemplate.convertAndSend( DelayQueueConfig.DELAY_EXCHANGE_TTL, DelayQueueConfig.DELAY_ROUTING_KEY_TTL, message, messagePostProcessor ); } }
|
然后我们进行延迟消息的接收和消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
|
@Component @Slf4j public class DelayQueueConsumer {
private static final DateTimeFormatter formatter = DateTimeFormatter.ofPattern("yyyy-MM-dd HH:mm:ss");
@RabbitListener(queues = DelayQueueConfig.TARGET_QUEUE_TTL) public void receiveDelayMessageTTL(OrderMessage message) { String now = LocalDateTime.now().format(formatter); log.info("接收到TTL延迟消息,当前时间:{}, 消息内容: {}", now, message); processOrder(message); }
private void processOrder(OrderMessage message) { log.info("处理订单: {}, 产品: {}, 数量: {}, 价格: {}", message.getOrderNo(), message.getProductName(), message.getQuantity(), message.getPrice()); try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } log.info("订单处理完成: {}", message.getOrderNo()); } }
|
测试的 RESTFUL 的控制器如下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
|
@PostMapping("/send-ttl") public Map<String, Object> sendDelayMessageWithTTL( @RequestParam("productName") String productName, @RequestParam(value = "quantity", defaultValue = "1") Integer quantity, @RequestParam(value = "delayTime", defaultValue = "10000") Long delayTime) { Map<String, Object> result = new HashMap<>(); try { OrderMessage orderMessage = messageProducer.createSampleOrder(System.currentTimeMillis(), productName); orderMessage.setQuantity(quantity); String sendTime = LocalDateTime.now().format(formatter); delayQueueProducer.sendDelayMessageWithTTL(orderMessage, delayTime); String expectedTime = LocalDateTime.now().plusNanos(delayTime * 1000000).format(formatter); result.put("success", true); result.put("message", "延迟消息已发送(基于TTL和死信队列)"); result.put("orderMessage", orderMessage); result.put("delayTime", delayTime + "ms"); result.put("sendTime", sendTime); result.put("expectedTime", expectedTime); log.info("延迟消息已发送(基于TTL和死信队列),延迟时间={}ms,发送时间={},预计到达时间={}: {}", delayTime, sendTime, expectedTime, orderMessage); } catch (Exception e) { result.put("success", false); result.put("message", "发送延迟消息失败: " + e.getMessage()); log.error("发送延迟消息失败: {}", e.getMessage(), e); } return result; }
|
发送延迟消息,POST http://localhost:8080/delay-queue/send-ttl?productName=测试商品&quantity=2&delayTime=10000
,可以发现10s后消息变成了死信并且进入了死信队列,然后被消费掉,这样,一个简单的延时队列就算是实现了。
接下来再说另一种方式,如果你想使用基于插件的延迟队列,需要先安装
rabbitmq_delayed_message_exchange
插件:https://github.com/rabbitmq/rabbitmq-delayed-message-exchange/releases
将插件复制到RabbitMQ的插件目录:
{RabbitMQ安装目录}/plugins
,然后启动插件
1
| rabbitmq-plugins enable rabbitmq_delayed_message_exchange
|
那么代码就可以这样写
配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| public static final String DELAY_EXCHANGE_PLUGIN = "ergou.delay.plugin.exchange"; public static final String DELAY_QUEUE_PLUGIN = "ergou.delay.plugin.queue"; public static final String DELAY_ROUTING_KEY_PLUGIN = "delay.plugin";
@Bean public CustomExchange delayExchangePlugin() { Map<String, Object> args = new HashMap<>(1); args.put("x-delayed-type", "direct"); return new CustomExchange(DELAY_EXCHANGE_PLUGIN, "x-delayed-message", true, false, args); }
@Bean public Queue delayQueuePlugin() { return QueueBuilder.durable(DELAY_QUEUE_PLUGIN).build(); }
@Bean public Binding delayBindingPlugin() { return BindingBuilder.bind(delayQueuePlugin()) .to(delayExchangePlugin()) .with(DELAY_ROUTING_KEY_PLUGIN) .noargs(); }
|
消费者
1 2 3 4 5 6 7 8 9 10 11
|
@RabbitListener(queues = DelayQueueConfig.DELAY_QUEUE_PLUGIN) public void receiveDelayMessagePlugin(OrderMessage message) { String now = LocalDateTime.now().format(formatter); log.info("接收到Plugin延迟消息,当前时间:{}, 消息内容: {}", now, message); processOrder(message); }
|
生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
public void sendDelayMessageWithPlugin(OrderMessage message, long delayTime) { log.info("发送延迟消息(基于插件),延迟时间={}ms: {}", delayTime, message); MessagePostProcessor messagePostProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setHeader("x-delay", delayTime); return message; } }; rabbitTemplate.convertAndSend( DelayQueueConfig.DELAY_EXCHANGE_PLUGIN, DelayQueueConfig.DELAY_ROUTING_KEY_PLUGIN, message, messagePostProcessor ); }
|