RabbitMQ 的本地部署安装
安装Erlang环境
首先,RabbitMQ 我们前面说过,是用 Erlang 写的
那么我们肯定要安装 Erlang 环境
Windows版下载地址:http://www.erlang.org/download/otp_win64_17.3.exe
打开安装包,第一步这个界面用于自定义 Erlang
安装内容
image-20250731154907733
说一下,其中
- Microsoft DLL’s (present):是 Windows
系统依赖的动态链接库(DLL 文件)。如果系统里已有必要的
DLL,安装程序会检测到(显示
“present”),一般保持默认不勾选也没问题(依赖通常已存在或会自动处理
)。
- Erlang是 Erlang
运行环境的核心组件,包含虚拟机、基础库等,必须装才能用
Erlang 。
- Erlang Documentation是Erlang
的官方文档(手册、指南等),想学习 / 查阅 Erlang
语法、函数用法时很有用,建议勾选(占空间不大,299.3MB 包含核心 +
文档,整体体积算小 )。
之后就是选择路径,然后一顿 next
image-20250731155242927
装完了,然后就可以安装RabbitMQ了
中间有可能会弹出安装C++库类,确认安装
RabbitMQ的安装—压缩包
首先下载RabbitMQ 的Windows版本,地址: http://www.rabbitmq.com/
这是 RabbitMQ 的官方文档,你可以在这里进行内容的查阅
image-20250731155438735
来到他们的发行版的页面,https://github.com/rabbitmq/rabbitmq-server/releases/tag/v4.1.2
推荐选择 rabbitmq-server-windows-4.1.2.zip
这个文件,如果你是linux就是.rpm
不多说
这个版本解压下来环境变量一配置就能直接用,我不喜欢安装包,如果是安装包你就选择exe
按下 Win + R
组合键,打开 “运行” 对话框,输入
sysdm.cpl
并回车,打开 “系统属性” 窗口。
在 “系统属性” 窗口中,切换到 “高级” 选项卡,点击右下方的 “环境变量”
按钮。
在 “环境变量” 窗口的 “系统变量” 部分,找到 Path
变量,点击 “编辑” 按钮。
在弹出的 “编辑环境变量” 窗口中,点击 “新建”,然后输入 RabbitMQ
解压目录下的 sbin
文件夹路径 (例如你的路径是
D:\rabbitmq_server-4.1.2\sbin
)。
image-20250731161152316
上面忘记说了,没正确配置 ERLANG_HOME
环境变量,没有配就会这样
image-20250731161503443
打开 Erlang 安装路径,确认里面有
bin
、erts-XX
等文件夹,这就是 Erlang
的根目录,记为 {ERLANG_INSTALL_PATH}
。右键
“此电脑”→“属性”→“高级系统设置”→“环境变量”。
点击 “新建”,变量名填 ERLANG_HOME
,变量值填 Erlang
的根目录路径(比如 C:\Program Files\Erlang OTP
)
image-20250731161624944
然后找到 Path
变量,添加 %ERLANG_HOME%\bin
,然后依次点 “确定” 保存。
输入 erl
,回车后能进入 Erlang 交互界面,说明 Erlang
环境正常,输入 halt().
退出
image-20250731161941523
然后使用rabbitmq-server start
启动
image-20250731164208776
这个只是后台启动服务了,想要web管理页面,需要rabbitmq-plugins enable rabbitmq_management
,注意,这个需要重启,才能生效rabbitmq-server stop
,rabbitmq-server start
大功告成,端口是15672
image-20250731164459790
默认账号密码都是 guest 进去之后看到是这样的
image-20250908084853781
配置RabbitMQ
我们进行登录,RabbitMQ 默认的登录用户名和密码都是 guest
。
不过需要注意的是,RabbitMQ
在一些较新的配置策略中,默认情况下,guest
用户只能从
localhost
(也就是本地环回地址)访问管理界面
。如果尝试从其他 IP 地址访问并使用 guest
用户登录,会遇到权限拒绝的错误。
那么我们需要添加一个账号,我们要新增一个用户,选择超级管理员权限
输入以下命令创建新用户(假设创建用户名为
new_user
,密码为 new_password
,实际使用时请替换为你自己设定的用户名和密码):
1
| rabbitmqctl add_user new_user new_password
|
在命令提示符(管理员身份)中输入以下命令,将 new_user
标记为超级管理员(administrator
):
1
| rabbitmqctl set_user_tags new_user administrator
|
set_user_tags
:用于给用户分配角色标签的命令。
administrator
:是 RabbitMQ
最高权限的角色,拥有管理所有资源(队列、交换机、用户等)的权限。
默认情况下,RabbitMQ 有一个名为 /
的虚拟主机(类似独立的命名空间),需要给新用户授予对它的操作权限:
1
| rabbitmqctl set_permissions -p / new_user ".*" ".*" ".*"
|
-p /
:指定虚拟主机为默认的 /
。
"*.*" "*.*" "*.*"
:分别表示允许用户对该虚拟主机下的所有资源执行
配置、写、读 操作(.*
是正则表达式,表示匹配所有)。
进行验证
1 2 3 4 5
| rabbitmqctl list_users
rabbitmqctl list_user_permissions new_user
|
- 确保 RabbitMQ 服务已启动(若未启动,执行
rabbitmq-server start
)。
- 打开浏览器访问
http://localhost:15672
。
- 输入新创建的用户名(
new_user
)和密码(new_password
),即可登录管理界面,此时拥有与默认
guest
用户相同的超级管理员权限。
如果你是docker
在 Docker 中安装 RabbitMQ 其实很简单,比你在windows本地搞好很多
拉取 RabbitMQ 镜像
RabbitMQ 官方提供了 Docker 镜像,我们直接从 Docker Hub
拉取即可。推荐使用带管理界面的版本(management
标签),方便后续操作。执行以下命令拉取镜像:
1
| docker pull rabbitmq:management
|
docker pull
:从 Docker Hub 下载镜像的命令。
rabbitmq:management
:RabbitMQ
镜像的名称及标签,management
表示该镜像包含 Web
管理界面。
创建并启动 RabbitMQ 容器
镜像拉取完成后,需要通过镜像创建并启动一个 RabbitMQ
容器。执行以下命令:
1 2 3 4 5 6 7
| docker run -d \ --name my-rabbitmq \ -p 5672:5672 \ -p 15672:15672 \ -e RABBITMQ_DEFAULT_USER=admin \ -e RABBITMQ_DEFAULT_PASS=admin123 \ rabbitmq:management
|
命令参数解释:
-d
:后台运行容器(守护进程模式)。
--name my-rabbitmq
:给容器起一个名字(my-rabbitmq
,可自定义),方便后续操作。
-p 5672:5672
:端口映射,将容器内的 5672 端口(RabbitMQ
默认通信端口)映射到宿主机的 5672 端口,供程序连接使用。
-p 15672:15672
:将容器内的 15672 端口(RabbitMQ
管理界面端口)映射到宿主机的 15672 端口,用于访问 Web 管理界面。
-e RABBITMQ_DEFAULT_USER=admin
:设置 RabbitMQ
默认管理员用户名(admin
,可自定义)。
-e RABBITMQ_DEFAULT_PASS=admin123
:设置管理员密码(admin123
,可自定义)。
rabbitmq:management
:基于该镜像创建容器。
验证容器是否启动成功
执行以下命令查看容器状态:
如果看到名为my-rabbitmq
的容器,且STATUS
列显示Up
(例如Up 5 seconds
),说明启动成功。
如果启动失败,可以用以下命令查看日志排查问题:
访问 RabbitMQ 管理界面
启动成功后,通过宿主机的 IP
地址(或localhost
,如果在本机操作)访问管理界面:
在浏览器中输入: http://你的Linux主机IP:15672
- 如果是在 Linux 本机操作,可直接输入
http://localhost:15672
。
- 如果是远程服务器,需要替换为服务器的公网 IP。
访问后会出现登录界面,输入步骤 2
中设置的用户名(admin
)和密码(admin123
),即可进入管理界面。
管理界面可以查看队列、交换机、连接等信息,非常直观,适合初学者使用,我是推荐装这个
RabbitMQ 的实践
RabbitMQ 如何发送消息
RabbitMQ
发送消息的过程涉及生产者(发送方)、交换机(Exchange)、队列(Queue)和绑定关系(Binding)四个核心组件。生产者不会直接将消息发送到队列,而是先发送到交换机,再由交换机根据绑定规则将消息路由到对应的队列。
发送消息的核心流程上面其实说过了,这里按照写代码的流程再说一次
- 建立连接:生产者通过 TCP 连接到 RabbitMQ
服务器,创建信道(Channel)。
- 声明资源:确保交换机、队列及绑定关系存在(若不存在则创建)。
- 构建消息:消息包含内容体(Body)和属性(Properties,如消息
ID、过期时间等)。
- 发送消息:通过信道将消息发送到指定交换机,并指定路由键(Routing
Key)。
- 消息确认:RabbitMQ
确认消息接收后,生产者可进行后续处理。
以 Spring Boot 集成 RabbitMQ 为例
在依赖和配置文件写好之后,声明交换机、队列和绑定关系
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
| import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class RabbitMqConfig {
@Bean public DirectExchange demoExchange() { return ExchangeBuilder.directExchange("demo.exchange") .durable(true) .autoDelete(false) .build(); }
@Bean public Queue demoQueue() { return QueueBuilder.durable("demo.queue") .durable(true) .exclusive(false) .autoDelete(false) .build(); }
@Bean public Binding demoBinding(DirectExchange demoExchange, Queue demoQueue) { return BindingBuilder.bind(demoQueue) .to(demoExchange) .with("demo.routing.key"); } }
|
使用 RabbitTemplate
发送消息,它支持多种消息类型
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
| import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.connection.CorrelationData; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
import java.util.UUID;
@Component public class MessageProducer {
@Autowired private RabbitTemplate rabbitTemplate;
public void sendSimpleMessage(String content) { rabbitTemplate.convertAndSend( "demo.exchange", "demo.routing.key", content ); System.out.println("发送简单消息:" + content); }
public void sendMessageWithProperties(String content) { String messageId = UUID.randomUUID().toString(); MessagePostProcessor postProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setMessageId(messageId); message.getMessageProperties().setExpiration("5000"); return message; } };
rabbitTemplate.convertAndSend( "demo.exchange", "demo.routing.key", content, postProcessor ); System.out.println("发送带属性的消息(ID:" + messageId + "):" + content); }
public void sendObjectMessage() { Order order = new Order(); order.setOrderId("ORDER_123456"); order.setAmount(99.9); order.setStatus("PENDING");
rabbitTemplate.convertAndSend( "demo.exchange", "demo.routing.key", order ); System.out.println("发送对象消息:" + order); }
public void sendMessageWithConfirm(String content) { CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());
rabbitTemplate.convertAndSend( "demo.exchange", "demo.routing.key", content, correlationData );
correlationData.getFuture().addCallback( confirm -> { if (confirm.isAck()) { System.out.println("消息发送成功,ID:" + correlationData.getId()); } else { System.err.println("消息发送失败,ID:" + correlationData.getId() + ",原因:" + confirm.getReason()); } }, ex -> System.err.println("消息确认异常:" + ex.getMessage()) ); }
|
我这个没写内部类,自己写一个看看去,消息确认什么的就不写了,我们测试一下
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class RabbitMqSenderDemo implements CommandLineRunner {
@Autowired private MessageProducer producer;
public static void main(String[] args) { SpringApplication.run(RabbitMqSenderDemo.class, args); }
@Override public void run(String... args) throws Exception { producer.sendSimpleMessage("Hello, RabbitMQ!"); producer.sendMessageWithProperties("带过期时间的消息"); producer.sendObjectMessage(); producer.sendMessageWithConfirm("带确认机制的消息"); } }
|
RabbitMQ同步调用和异步调用
同步调用
同步调用是指生产者发送消息后,会阻塞等待消费者的响应,直到接收到响应结果后才继续执行后续操作。这种方式常用于需要立即获取处理结果,对响应及时性要求较高的场景,例如简单的查询请求。
我们定义消息发送方和消息接收方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class SynchronousSender {
private static final String QUEUE_NAME = "sync_queue";
@Autowired private RabbitTemplate rabbitTemplate;
public String send(String message) { return (String) rabbitTemplate.convertSendAndReceive(QUEUE_NAME, message, 5000); } }
|
1 2 3 4 5 6 7 8 9 10 11 12 13
| import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class SynchronousReceiver {
@RabbitListener(queues = "sync_queue") public String receive(String message) { System.out.println("Received message: " + message); return "Processed: " + message; } }
|
然后测试同步调用
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
| import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.CommandLineRunner; import org.springframework.boot.SpringApplication; import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication public class RabbitMqSyncApp implements CommandLineRunner {
@Autowired private SynchronousSender sender;
public static void main(String[] args) { SpringApplication.run(RabbitMqSyncApp.class, args); }
@Override public void run(String... args) throws Exception { String response = sender.send("Hello, RabbitMQ!"); System.out.println("Received response: " + response); } }
|
异步调用
异步调用是指生产者发送消息后,不会阻塞等待消费者的响应,而是继续执行后续操作。消费者在接收到消息后,会在后台线程中进行处理。通常会使用回调机制,当消费者处理完消息后,通过回调函数通知生产者处理结果。这种方式适用于对响应及时性要求不高,或者处理过程耗时较长的场景,例如文件上传后的处理、批量数据计算等。
依旧是定义消息发送方和消息接收方
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import org.springframework.util.concurrent.ListenableFuture; import org.springframework.util.concurrent.ListenableFutureCallback;
@Component public class AsynchronousSender {
private static final String QUEUE_NAME = "async_queue";
@Autowired private RabbitTemplate rabbitTemplate;
public void send(String message) { ListenableFuture future = rabbitTemplate.convertSendAndReceive(QUEUE_NAME, message); future.addCallback(new ListenableFutureCallback() { @Override public void onSuccess(Object result) { System.out.println("Received response asynchronously: " + result); }
@Override public void onFailure(Throwable ex) { System.out.println("Failed to receive response: " + ex.getMessage()); } }); } }
|
接收方没啥变化,不写了,测试也没啥变化,Autowried引入别引错了就行
同步调用:优点是逻辑简单,能立即获取响应结果;缺点是如果处理过程耗时较长,会阻塞生产者线程,影响系统的并发性能。
异步调用:优点是不阻塞生产者线程,提高系统的并发处理能力,适用于耗时较长的任务;缺点是代码相对复杂,需要处理回调逻辑,并且对结果的获取不如同步调用直接。
RabbitMQ声明队列和交换机的方式
Spring AMQP 基本 API
Spring AMQP 是 Spring 框架对 AMQP 协议的实现,主要用于简化 RabbitMQ
的开发。它提供了一系列高层 API,封装了 RabbitMQ
底层的连接管理、消息发送、消费等操作。
Spring AMQP 的核心组件
- ConnectionFactory
- 作用:创建和管理与 RabbitMQ
服务器的连接(
Connection
)和信道(Channel
)。
- 常用实现:
CachingConnectionFactory
(默认实现,支持连接和信道缓存,提高性能)。
- RabbitTemplate
- 作用:消息发送和接收的核心工具类,封装了消息序列化、发送、接收、确认等操作。
- 特点:线程安全,可在整个应用中共享使用。
- Exchange/Queue/Binding
- 作用:对应 RabbitMQ 的交换机、队列、绑定关系,通过这些对象声明 AMQP
资源。可以使用工厂类创建,简化队列,交换机的创建过程。绑定队列和交换机时,需要使用BindingBuilder来创建Binding对象。这个对象表示一个队列和一个交换机之间的绑定关系,它定义了消息应该如何从交换机路由到队列。
- 注:这些对象仅用于定义元数据,实际资源创建由 Spring AMQP
自动完成(连接 RabbitMQ 时)。
- MessageListenerContainer
- 作用:管理消息消费者的生命周期,负责创建消费者、监听队列、处理消息,并支持并发消费、消息确认等。
- 常用实现:
SimpleMessageListenerContainer
(支持动态调整并发数)、DirectMessageListenerContainer
(轻量级,性能更好)。
- Message
- 作用:封装消息的载体,包含消息体(
body
)和消息属性(MessageProperties
,如消息
ID、过期时间等)。
一般写的时候,首先我们需要配置
ConnectionFactory,用于创建连接工厂,配置 RabbitMQ
服务器地址、端口、账号等信息。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
| import org.springframework.amqp.rabbit.connection.CachingConnectionFactory; import org.springframework.amqp.rabbit.connection.ConnectionFactory; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class ConnectionConfig {
@Bean public ConnectionFactory connectionFactory() { CachingConnectionFactory factory = new CachingConnectionFactory(); factory.setHost("localhost"); factory.setPort(5672); factory.setUsername("guest"); factory.setPassword("guest"); factory.setVirtualHost("/");
factory.setChannelCacheSize(10); return factory; } }
|
然后调用RabbitTemplate
,它是发送和接收消息的核心工具,提供了丰富的方法:
方法分类 |
常用方法 |
说明 |
发送消息 |
convertAndSend(String exchange, String routingKey, Object message) |
发送消息(自动序列化消息体,支持任意对象) |
|
send(String exchange, String routingKey, Message message) |
发送 Message 对象(需手动构建消息体和属性) |
同步接收消息 |
receiveAndConvert(String queueName) |
从队列接收消息并自动反序列化 |
|
receive(String queueName) |
从队列接收 Message 对象 |
回调配置 |
setConfirmCallback(ConfirmCallback callback) |
设置生产者确认回调(消息是否到达交换机) |
|
setReturnsCallback(ReturnsCallback callback) |
设置消息回退回调(到达交换机但路由失败时触发) |
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57
| import org.springframework.amqp.AmqpException; import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessagePostProcessor; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component;
@Component public class RabbitTemplateDemo {
@Autowired private RabbitTemplate rabbitTemplate;
public void sendSimpleMessage() { String message = "Hello, Spring AMQP!"; rabbitTemplate.convertAndSend("demo.exchange", "demo.key", message); }
public void sendMessageWithProperties() { MessagePostProcessor postProcessor = new MessagePostProcessor() { @Override public Message postProcessMessage(Message message) throws AmqpException { message.getMessageProperties().setMessageId("MSG_001"); message.getMessageProperties().setExpiration("10000"); return message; } }; rabbitTemplate.convertAndSend("demo.exchange", "demo.key", "带属性的消息", postProcessor); }
public Object receiveMessage() { return rabbitTemplate.receiveAndConvert("demo.queue"); }
public void configureCallbacks(RabbitTemplate rabbitTemplate) { rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> { if (ack) { System.out.println("消息已到达交换机"); } else { System.err.println("消息未到达交换机,原因:" + cause); } });
rabbitTemplate.setReturnsCallback(returnedMessage -> { System.err.println("消息路由失败:" + returnedMessage.getReplyText()); }); } }
|
然后声明 Exchange/Queue/Binding,通过 @Bean
声明 AMQP
资源,Spring AMQP 会自动将这些资源创建到 RabbitMQ 中。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39
| import org.springframework.amqp.core.*; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class AmqpResourcesConfig {
@Bean public DirectExchange demoDirectExchange() { return ExchangeBuilder.directExchange("demo.direct.exchange") .durable(true) .autoDelete(false) .build(); }
@Bean public Queue demoQueue() { return QueueBuilder.durable("demo.queue") .durable(true) .exclusive(false) .autoDelete(false) .withArgument("x-dead-letter-exchange", "demo.dlq.exchange") .build(); }
@Bean public Binding demoBinding(DirectExchange demoDirectExchange, Queue demoQueue) { return BindingBuilder.bind(demoQueue) .to(demoDirectExchange) .with("demo.key"); } }
|
最后就是编写消息的消费了,产生了就要消费,Spring AMQP
提供两种消费方式:注解式(@RabbitListener)和接口式(实现
MessageListener)。
注解式,使用 @RabbitListener
注解快速定义消费者,简化代码:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
| import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component public class AnnotationConsumer {
@RabbitListener(queues = "demo.queue") public void handleMessage(String message) { System.out.println("收到消息:" + message); }
@RabbitListener(queues = "order.queue") public void handleOrder(Order order) { System.out.println("收到订单:" + order.getOrderId()); }
@RabbitListener(queues = "advanced.queue") public void handleAdvancedMessage(Message message) { String body = new String(message.getBody()); String messageId = message.getMessageProperties().getMessageId(); System.out.println("消息ID:" + messageId + ",内容:" + body); } }
|
接口式,实现 MessageListener
接口,适合复杂场景:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
| import org.springframework.amqp.core.Message; import org.springframework.amqp.core.MessageListener; import org.springframework.stereotype.Component;
@Component public class InterfaceConsumer implements MessageListener {
@Override public void onMessage(Message message) { String body = new String(message.getBody()); System.out.println("接口式消费:" + body); } }
@Configuration public class ListenerContainerConfig {
@Bean public SimpleMessageListenerContainer messageListenerContainer( ConnectionFactory connectionFactory, InterfaceConsumer consumer) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames("demo.queue"); container.setMessageListener(consumer); container.setConcurrentConsumers(2); container.setAcknowledgeMode(AcknowledgeMode.AUTO); return container; } }
|
收到了消息还需要确认,Spring AMQP 支持三种消息确认模式,通过
AcknowledgeMode
配置,消息确认模式控制消费端如何通知
RabbitMQ 消息处理状态。
模式 |
说明 |
AUTO (默认) |
自动确认:消息处理成功后自动确认;抛出异常时拒绝消息并根据配置决定是否重入队。 |
MANUAL |
手动确认:需调用 channel.basicAck()
手动确认,basicNack() 拒绝。 |
NONE |
不确认:RabbitMQ
认为消息一旦投递成功即被确认(不推荐,可能丢失消息)。 |
手动确认示例:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| import com.rabbitmq.client.Channel; import org.springframework.amqp.core.Message; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.amqp.support.AmqpHeaders; import org.springframework.messaging.handler.annotation.Header; import org.springframework.stereotype.Component;
@Component public class ManualAckConsumer {
@RabbitListener(queues = "manual.queue") public void handleMessage(Message message, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception { try { String body = new String(message.getBody()); System.out.println("处理消息:" + body); channel.basicAck(deliveryTag, false); } catch (Exception e) { channel.basicNack(deliveryTag, false, true); } } }
@Configuration public class ManualAckConfig { @Bean public SimpleMessageListenerContainer manualAckContainer( ConnectionFactory connectionFactory, ManualAckConsumer consumer) { SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory); container.setQueueNames("manual.queue"); container.setMessageListener(consumer); container.setAcknowledgeMode(AcknowledgeMode.MANUAL); return container; } }
|
RabbitMQ的简单模式测试
依赖及其配置相关处理
在这里也就展示 RabbitMQ 如何融合 Spring 系列进行开发
RabbitMQ的简单模式,也就是 Hello
World模式,中最简单的也就是一个生产者、一个消费者、一个队列;生产者P发送消息到队列Q,一个消费者C接收消息。它的拓扑结构非常简单:
1
| Producer -> Queue -> Consumer
|
接下来我们来用 Java 代码实现一下
首先让我们导入 RabbitMQ 的依赖
1 2 3 4
| <dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId> </dependency>
|
这个实际上是Spring Boot 提供的 RabbitMQ starter
依赖,它内部自动引入了 RabbitMQ 的 Java
客户端(amqp-client
)以及 Spring AMQP
框架相关组件,一般情况下我们用这个
但是单个的 RabbitMQ 依赖在这
1 2 3 4 5 6 7 8 9 10 11 12
| <dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.8.0</version> </dependency> <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> <version>2.6</version> </dependency>
|
RabbitMQ还有个测试依赖,方便对 RabbitMQ
相关代码(如消息监听、生产者)进行单元测试
1 2 3 4 5
| <dependency> <groupId>org.springframework.amqp</groupId> <artifactId>spring-rabbit-test</artifactId> <scope>test</scope> </dependency>
|
之后,我们填写一些 RabbitMQ 的配置
1 2 3 4 5 6
| spring.rabbitmq.host=localhost spring.rabbitmq.port=5672 spring.rabbitmq.username=guest spring.rabbitmq.password=guest spring.rabbitmq.virtual-host=/
|
具体什么意思相信大家肯定看得懂
那么相应配置类编写如下,简单模式的配置类非常简单
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class SimpleQueueConfig {
public static final String SIMPLE_QUEUE = "ergou.simple.queue";
@Bean public Queue simpleQueue() { return new Queue(SIMPLE_QUEUE, true, false, false); } }
|
编写消息生产者
生产者的核心作用是 “创建消息→指定交换机 / 队列→发送到 RabbitMQ”
编写消息生产者的流程如下
- 创建 RabbitMQ 核心组件(交换机、队列、绑定关系)
- 这个看情况,不一定就写到这里,一般情况下,会写到配置类中,在配置类中声明并且绑定会更清晰
- 编写生产者代码,发送消息
- 编写生产者的消息逻辑,然后通过
RabbitTemplate
(Spring
AMQP 提供的工具类)发送消息,支持发送字符串、对象(需序列化)等类型
- 若发送对象,默认使用 JDK 序列化(需实现
Serializable
),推荐配置 JSON 序列化
- 之后就是在别的地方调用生产者发送消息
其实详细的编写步骤就是
- 创建连接工厂 ConnectionFactory
- 通过连接工厂创建连接 Connection
- 通过连接获取通道 Channel
- 通过通道声明队列 Queue
- 发送消息到队列 Queue 中
但是代码里确实没有显式地看到这些步骤。这正是 Spring Boot 和 Spring
AMQP
的强大之处——它通过自动配置(Auto-Configuration)和模板化编程(Template
Pattern)帮你封装了所有这些底层细节。
Spring
在读取到你yaml或者properties的配置文件的时候,自动创建了一个配置好的
CachingConnectionFactory
。
生产者编写注入 RabbitTemplate
,这是 Spring AMQP
提供的用于发送和接收消息的核心工具类。
RabbitTemplate
(生产者) 和 @RabbitListener
(消费者) 内部都持有一个 ConnectionFactory
。
当需要发送消息时,RabbitTemplate
会从
ConnectionFactory
获取一个 Connection
,然后从
Connection
创建一个 Channel
,用这个
Channel
来执行 AMQP 命令(如
basicPublish
)。完成后,Spring 通常会复用这个
Channel
(得益于
CachingConnectionFactory
)而不是关闭它,这是高性能的关键。
在简单模式下,exchange
参数为空字符串
(""
),这代表使用 默认的匿名交换机 (Default
Exchange)。默认交换机会将消息路由到指定的
routingKey
同名的队列中。所以这里 routingKey
就是你的队列名 SIMPLE_QUEUE
。
而rabbitTemplate.convertAndSend()
方法中的
convert
部分,会自动将你的 OrderMessage
Java
对象通过 MessageConverter
(默认是
SimpleMessageConverter
,但常配置为
Jackson2JsonMessageConverter
)转换成字节数组进行传输。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| import edu.software.ergoutree.rabbitmqmessgae.config.SimpleQueueConfig; import edu.software.ergoutree.rabbitmqmessgae.model.OrderMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;
@Component @RequiredArgsConstructor @Slf4j public class SimpleQueueProducer {
private final RabbitTemplate rabbitTemplate;
public void sendMessage(OrderMessage message) { log.info("发送简单队列消息: {}", message); rabbitTemplate.convertAndSend("", SimpleQueueConfig.SIMPLE_QUEUE, message); } }
|
加入按照原来 RabbitMQ
的思路去编写,代码就会很长,但是每一步会很清晰
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
| import com.rabbitmq.client.Channel; import com.rabbitmq.client.Connection; import com.rabbitmq.client.ConnectionFactory;
public class NativeProducer { private final static String QUEUE_NAME = "ergou.simple.queue";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("guest"); factory.setPassword("guest");
try (Connection connection = factory.newConnection(); Channel channel = connection.createChannel()) {
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
String message = "Hello, Native RabbitMQ!"; channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8")); System.out.println(" [x] Sent '" + message + "'"); } } }
|
那么 Config 类如下,使用 @Configuration
表明这是一个配置类。
- 当你使用
@Bean
在配置类中声明 Queue
,
Exchange
, Binding
时,Spring
在应用启动时,会自动用当前的 Connection
和
Channel
去 RabbitMQ
服务器上声明这些资源。这就是为什么你不需要手动写代码去
channel.queueDeclare()
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
@Configuration public class SimpleQueueConfig {
public static final String SIMPLE_QUEUE = "ergou.simple.queue";
@Bean public Queue simpleQueue() { return new Queue(SIMPLE_QUEUE, true, false, false); } }
|
编写消息消费者
消费者的核心作用是
“监听指定队列→接收消息→处理业务逻辑”,步骤更简洁,主要通过注解实现监听:
- 编写消费者代码,监听队列
- 使用
@RabbitListener
注解指定监听的队列名,Spring
会自动创建消费者并监听队列,消息到达时触发 @RabbitHandler
方法处理:
- 然后根据业务需求,可添加消息确认、重试、异常处理等配置
通过
RabbitTemplate
发送消息,支持发送字符串、对象(需序列化)等类型
消费者中使用 @RabbitListener(queues = ...)
注解来监听指定的队列。一旦队列中有消息,receiveMessage
方法就会被自动触发。
方法参数 OrderMessage message
体现了 Spring AMQP
强大的消息自动转换能力,它会自动将消息体(默认是 JSON
格式)反序列化成 OrderMessage
对象。
详细的步骤就是
- 创建连接工厂 ConnectionFactory
- 通过连接工厂创建连接 Connection
- 通过连接获取通道 Channel
- 通过通道接收消息
还是,但是代码里确实没有显式地看到这些步骤。是因为还是RabbitTemplate
自动管理了Connection
和
Channel
,对于消费者,RabbitListenerContainerFactory
会在后台创建并管理一个或多个
Channel
,用于持续地监听队列。它负责处理
Channel
的生命周期、消息确认等所有复杂逻辑。自动声明队列和交换机的逻辑也和上面在生产者说的一样。而在消息转换中,消费者端的
@RabbitListener
会自动执行反向操作,将字节数组转换回
OrderMessage
对象。
这些我只是在这里简单说一下,具体内容在源码中分析
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40
| package edu.software.ergoutree.rabbitmqmessgae.consumer;
import edu.software.ergoutree.rabbitmqmessgae.config.SimpleQueueConfig; import edu.software.ergoutree.rabbitmqmessgae.model.OrderMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class SimpleQueueConsumer {
@RabbitListener(queues = SimpleQueueConfig.SIMPLE_QUEUE) public void receiveMessage(OrderMessage message) { log.info("简单队列接收到消息: {}", message); log.info("处理订单: {}, 产品: {}, 数量: {}", message.getOrderNo(), message.getProductName(), message.getQuantity()); try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } log.info("简单队列订单处理完成: {}", message.getOrderNo()); } }
|
那么使用原生消费者进行代码设计就是如下了,很清晰的涉及到了每个步骤
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
| import com.rabbitmq.client.*; import java.io.IOException;
public class NativeConsumer { private final static String QUEUE_NAME = "ergou.simple.queue";
public static void main(String[] argv) throws Exception { ConnectionFactory factory = new ConnectionFactory(); factory.setHost("localhost"); factory.setUsername("guest"); factory.setPassword("guest");
Connection connection = factory.newConnection(); Channel channel = connection.createChannel();
channel.queueDeclare(QUEUE_NAME, true, false, false, null); System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
DeliverCallback deliverCallback = (consumerTag, delivery) -> { String message = new String(delivery.getBody(), "UTF-8"); System.out.println(" [x] Received '" + message + "'"); try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } System.out.println(" [x] Done"); };
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { }); } }
|
然后我们创建一个控制器用来发送消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46
| package edu.software.ergoutree.rabbitmqmessgae.controller;
import edu.software.ergoutree.rabbitmqmessgae.model.OrderMessage; import edu.software.ergoutree.rabbitmqmessgae.producer.MessageProducer; import edu.software.ergoutree.rabbitmqmessgae.producer.SimpleQueueProducer; import lombok.RequiredArgsConstructor; import org.springframework.web.bind.annotation.*;
import java.util.HashMap; import java.util.Map;
@RestController @RequestMapping("/api/simple") @RequiredArgsConstructor public class SimpleQueueController {
private final SimpleQueueProducer simpleQueueProducer; private final MessageProducer messageProducer;
@PostMapping("/send") public Map<String, Object> sendSimpleMessage(@RequestParam Long orderId, @RequestParam String productName) { OrderMessage orderMessage = messageProducer.createSampleOrder(orderId, productName); simpleQueueProducer.sendMessage(orderMessage);
Map<String, Object> result = new HashMap<>(); result.put("success", true); result.put("message", "简单队列消息发送成功"); result.put("data", orderMessage); return result; } }
|
运行测试
运行程序,观察消息在rabbitmq-server服务中的过程
首先启动生产者,可以看到创建了一个队列,进行发送可以看到日志打印了相关信息
RabbitMQ工作模式实践
工作模式在我们之前提到的就是一个生产者、多个消费者、一个队列,一条消息仅被一个消费者处理
它的主要思想是避免排队等待,避免一个消息处理时间过久而无法处理下一个的问题。因此相比简单模式可以有多个消费者,原理就是我们把任务封装为消息并将其发送到队列中,这多个消费者可以一起处理队列中的任务。
RabbitMQ 中的工作模式默认采用轮询的方式,RabbitMQ
将消息平均分配给所有消费者,也就是如果有两个消费者的话,消息逐一分给每个消费者进行消费。接下来我们来用
Java 代码实现一下
Work Queues
工作模式,来测试其轮训消费的功能。
消费者需手动确认消息(auto-ack=false
),处理完一条消息后才会接收下一条(避免消息堆积)。
config 类
工作模式不需要我们特意去往配置文件写什么内容
工作模式和简单模式类似,不需要额外的交换机(用默认匿名交换机即可),核心是声明一个
“供多个消费者竞争的队列”。配置类的作用是在 Spring 启动时,自动向
RabbitMQ 服务器声明队列资源,避免手动调用
channel.queueDeclare()
。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28
| package edu.software.ergoutree.rabbitmqmessgae.config;
import org.springframework.amqp.core.Queue; import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration;
@Configuration public class WorkQueueConfig {
public static final String WORK_QUEUE = "ergou.work.queue";
@Bean public Queue workQueue() { return new Queue(WORK_QUEUE, true, false, false); } }
|
分发耗时任务给多个工作者(消费者),以实现并行处理和负载均衡。一个队列可以被多个消费者同时监听。每条消息只能被一个消费者获取和处理。消费者之间是竞争关系。
生产者类
工作模式的生产者逻辑和简单模式几乎一致 ——
都是向指定队列发送消息,区别仅在于 “队列名称用
WORK_QUEUE
”。代码中复用了 MessageProducer
类(同时支持直接 / 主题 / 扇形模式),重点
在于“如何发送工作队列消息”
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54
| package edu.software.ergoutree.rabbitmqmessgae.producer;
import edu.software.ergoutree.rabbitmqmessgae.config.RabbitMQConfig; import edu.software.ergoutree.rabbitmqmessgae.model.OrderMessage; import lombok.RequiredArgsConstructor; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.core.RabbitTemplate; import org.springframework.stereotype.Component;
import java.time.LocalDateTime;
@Component @RequiredArgsConstructor @Slf4j public class MessageProducer {
private final RabbitTemplate rabbitTemplate;
public void sendDirectMessage(OrderMessage message) { log.info("发送直接消息: {}", message); rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE, RabbitMQConfig.DIRECT_ROUTING_KEY, message); }
public void sendTopicMessage(OrderMessage message, String routingKey) { log.info("发送主题消息: {}, 路由键: {}", message, routingKey); rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE, routingKey, message); }
public void sendFanoutMessage(OrderMessage message) { log.info("发送扇形消息: {}", message); rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE, "", message); }
public OrderMessage createSampleOrder(Long orderId, String productName) { OrderMessage orderMessage = new OrderMessage(); orderMessage.setOrderId(orderId); orderMessage.setOrderNo("ORDER-" + System.currentTimeMillis()); orderMessage.setProductName(productName); orderMessage.setPrice(Math.random() * 100); orderMessage.setQuantity((int)(Math.random() * 10) + 1); orderMessage.setCreateTime(LocalDateTime.now()); return orderMessage; } }
|
生产者的代码编写需要注意如下的逻辑:
- 依赖
RabbitTemplate
:Spring 自动根据
application.properties
中的配置(host、port、username
等)初始化连接工厂,无需手动创建 Connection
和
Channel
;
- 消息发送:通过
convertAndSend
方法,将
OrderMessage
对象自动序列化为字节数组(默认 JDK
序列化,建议在配置类中替换为 JSON 序列化,避免序列化异常);
- 路由逻辑:因使用默认匿名交换机,
routingKey
必须等于队列名 WORK_QUEUE
,否则消息会丢失。
消费者类
工作模式的核心是
“多消费者竞争消息”,所以需要编写多个消费者实例(或同一个消费者类的多个
Bean),同时监听 WORK_QUEUE
队列。代码中通过
@RabbitListener
注解指定监听的队列,Spring
会自动创建消费者线程,消息到达时触发方法处理。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68
| package edu.software.ergoutree.rabbitmqmessgae.consumer;
import edu.software.ergoutree.rabbitmqmessgae.config.RabbitMQConfig; import edu.software.ergoutree.rabbitmqmessgae.model.OrderMessage; import lombok.extern.slf4j.Slf4j; import org.springframework.amqp.rabbit.annotation.RabbitListener; import org.springframework.stereotype.Component;
@Component @Slf4j public class MessageConsumer {
@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE) public void receiveDirectMessage(OrderMessage message) { log.info("直接队列接收到消息: {}", message); processOrder(message); }
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_1) public void receiveTopicMessage1(OrderMessage message) { log.info("主题队列1接收到消息: {}", message); processOrder(message); }
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_2) public void receiveTopicMessage2(OrderMessage message) { log.info("主题队列2接收到消息: {}", message); processOrder(message); }
@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_1) public void receiveFanoutMessage1(OrderMessage message) { log.info("扇形队列1接收到消息: {}", message); processOrder(message); }
@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_2) public void receiveFanoutMessage2(OrderMessage message) { log.info("扇形队列2接收到消息: {}", message); processOrder(message); }
private void processOrder(OrderMessage message) { log.info("处理订单: {}, 产品: {}, 数量: {}", message.getOrderNo(), message.getProductName(), message.getQuantity()); try { Thread.sleep(500); } catch (InterruptedException e) { Thread.currentThread().interrupt(); } log.info("订单处理完成: {}", message.getOrderNo()); } }
|
解释一下:
- 多消费者监听同一队列:
receiveWorkMessage1
和
receiveWorkMessage2
都监听 WORK_QUEUE
,Spring
会为每个方法创建独立的消费者线程,实现 “竞争消费”;
- 和上面一样,方法参数
OrderMessage message
无需手动解析
——Spring 会将 RabbitMQ 中的字节数组消息,自动反序列化为
OrderMessage
对象
- 手动确认消息(默认配置):Spring AMQP 默认开启手动确认,只有当
processOrder
方法执行完(无异常),才会向 RabbitMQ 发送
“消息确认”,避免消息在处理中丢失;
- 轮询分配消息:RabbitMQ 会将队列中的消息 “平均分配”
给每个消费者,即使某个消费者处理速度慢,也不会多分配
控制者类
为了方便测试(比如用 Postman 调用),编写
WorkQueueController
提供 REST
API,接收前端参数后,调用生产者发送消息到工作队列。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61
| package edu.software.ergoutree.rabbitmqmessgae.controller;
import edu.software.ergoutree.rabbitmqmessgae.model.OrderMessage; import edu.software.ergoutree.rabbitmqmessgae.producer.MessageProducer; import lombok.RequiredArgsConstructor; import org.springframework.web.bind.annotation.*;
import java.util.HashMap; import java.util.Map;
@RestController @RequestMapping("/api/message") @RequiredArgsConstructor public class MessageController {
private final MessageProducer messageProducer;
@PostMapping("/direct") public Map<String, Object> sendDirectMessage(@RequestParam Long orderId, @RequestParam String productName) { OrderMessage orderMessage = messageProducer.createSampleOrder(orderId, productName); messageProducer.sendDirectMessage(orderMessage);
Map<String, Object> result = new HashMap<>(); result.put("success", true); result.put("message", "直接消息发送成功"); result.put("data", orderMessage); return result; }
@PostMapping("/topic") public Map<String, Object> sendTopicMessage(@RequestParam Long orderId, @RequestParam String productName, @RequestParam String routingKey) { OrderMessage orderMessage = messageProducer.createSampleOrder(orderId, productName); messageProducer.sendTopicMessage(orderMessage, routingKey);
Map<String, Object> result = new HashMap<>(); result.put("success", true); result.put("message", "主题消息发送成功"); result.put("data", orderMessage); result.put("routingKey", routingKey); return result; }
@PostMapping("/fanout") public Map<String, Object> sendFanoutMessage(@RequestParam Long orderId, @RequestParam String productName) { OrderMessage orderMessage = messageProducer.createSampleOrder(orderId, productName); messageProducer.sendFanoutMessage(orderMessage);
Map<String, Object> result = new HashMap<>(); result.put("success", true); result.put("message", "扇形消息发送成功"); result.put("data", orderMessage); return result; } }
|
进行测试
用 Postman 或 curl 调用接口,连续发送多条消息
查看项目运行日志,会发现消息被平均分配给两个消费者:
image-20250909152818933
可以发现策略还是轮询,消息被轮询发送到两个工作队列消费者
image-20250909153118617
剩下三个模式类似,我在这里不再演示了
RPC不咋用吧,我是没写过好像