RabbitMQ 的本地部署安装

安装Erlang环境

首先,RabbitMQ 我们前面说过,是用 Erlang 写的

那么我们肯定要安装 Erlang 环境

Windows版下载地址:http://www.erlang.org/download/otp_win64_17.3.exe

打开安装包,第一步这个界面用于自定义 Erlang 安装内容

image-20250731154907733

说一下,其中

  • Microsoft DLL’s (present):是 Windows 系统依赖的动态链接库(DLL 文件)。如果系统里已有必要的 DLL,安装程序会检测到(显示 “present”),一般保持默认不勾选也没问题(依赖通常已存在或会自动处理 )。
  • Erlang是 Erlang 运行环境的核心组件,包含虚拟机、基础库等,必须装才能用 Erlang 。
  • Erlang Documentation是Erlang 的官方文档(手册、指南等),想学习 / 查阅 Erlang 语法、函数用法时很有用,建议勾选(占空间不大,299.3MB 包含核心 + 文档,整体体积算小 )。

之后就是选择路径,然后一顿 next

image-20250731155242927

装完了,然后就可以安装RabbitMQ了

中间有可能会弹出安装C++库类,确认安装

RabbitMQ的安装—压缩包

首先下载RabbitMQ 的Windows版本,地址: http://www.rabbitmq.com/

这是 RabbitMQ 的官方文档,你可以在这里进行内容的查阅

image-20250731155438735

来到他们的发行版的页面,https://github.com/rabbitmq/rabbitmq-server/releases/tag/v4.1.2

推荐选择 rabbitmq-server-windows-4.1.2.zip 这个文件,如果你是linux就是.rpm不多说

这个版本解压下来环境变量一配置就能直接用,我不喜欢安装包,如果是安装包你就选择exe

按下 Win + R 组合键,打开 “运行” 对话框,输入 sysdm.cpl 并回车,打开 “系统属性” 窗口。

在 “系统属性” 窗口中,切换到 “高级” 选项卡,点击右下方的 “环境变量” 按钮。

在 “环境变量” 窗口的 “系统变量” 部分,找到 Path 变量,点击 “编辑” 按钮。

在弹出的 “编辑环境变量” 窗口中,点击 “新建”,然后输入 RabbitMQ 解压目录下的 sbin 文件夹路径 (例如你的路径是 D:\rabbitmq_server-4.1.2\sbin )。

image-20250731161152316

上面忘记说了,没正确配置 ERLANG_HOME 环境变量,没有配就会这样

image-20250731161503443

打开 Erlang 安装路径,确认里面有 binerts-XX 等文件夹,这就是 Erlang 的根目录,记为 {ERLANG_INSTALL_PATH} 。右键 “此电脑”→“属性”→“高级系统设置”→“环境变量”。

点击 “新建”,变量名填 ERLANG_HOME ,变量值填 Erlang 的根目录路径(比如 C:\Program Files\Erlang OTP

image-20250731161624944

然后找到 Path 变量,添加 %ERLANG_HOME%\bin ,然后依次点 “确定” 保存。

输入 erl ,回车后能进入 Erlang 交互界面,说明 Erlang 环境正常,输入 halt().退出

image-20250731161941523

然后使用rabbitmq-server start启动

image-20250731164208776

这个只是后台启动服务了,想要web管理页面,需要rabbitmq-plugins enable rabbitmq_management,注意,这个需要重启,才能生效rabbitmq-server stoprabbitmq-server start

大功告成,端口是15672

image-20250731164459790

默认账号密码都是 guest 进去之后看到是这样的

image-20250908084853781

配置RabbitMQ

我们进行登录,RabbitMQ 默认的登录用户名和密码都是 guest

不过需要注意的是,RabbitMQ 在一些较新的配置策略中,默认情况下,guest 用户只能从 localhost (也就是本地环回地址)访问管理界面 。如果尝试从其他 IP 地址访问并使用 guest 用户登录,会遇到权限拒绝的错误。

那么我们需要添加一个账号,我们要新增一个用户,选择超级管理员权限

输入以下命令创建新用户(假设创建用户名为 new_user,密码为 new_password ,实际使用时请替换为你自己设定的用户名和密码):

1
rabbitmqctl add_user new_user new_password

在命令提示符(管理员身份)中输入以下命令,将 new_user 标记为超级管理员(administrator):

1
rabbitmqctl set_user_tags new_user administrator
  • set_user_tags:用于给用户分配角色标签的命令。
  • administrator:是 RabbitMQ 最高权限的角色,拥有管理所有资源(队列、交换机、用户等)的权限。

默认情况下,RabbitMQ 有一个名为 / 的虚拟主机(类似独立的命名空间),需要给新用户授予对它的操作权限:

1
rabbitmqctl set_permissions -p / new_user ".*" ".*" ".*"
  • -p /:指定虚拟主机为默认的 /
  • "*.*" "*.*" "*.*":分别表示允许用户对该虚拟主机下的所有资源执行 配置、写、读 操作(.* 是正则表达式,表示匹配所有)。

进行验证

1
2
3
4
5
# 查看所有用户列表
rabbitmqctl list_users

# 查看指定用户的权限(以 new_user 为例)
rabbitmqctl list_user_permissions new_user
  1. 确保 RabbitMQ 服务已启动(若未启动,执行 rabbitmq-server start)。
  2. 打开浏览器访问 http://localhost:15672
  3. 输入新创建的用户名(new_user)和密码(new_password),即可登录管理界面,此时拥有与默认 guest 用户相同的超级管理员权限。

如果你是docker

在 Docker 中安装 RabbitMQ 其实很简单,比你在windows本地搞好很多

拉取 RabbitMQ 镜像

RabbitMQ 官方提供了 Docker 镜像,我们直接从 Docker Hub 拉取即可。推荐使用带管理界面的版本(management标签),方便后续操作。执行以下命令拉取镜像:

1
docker pull rabbitmq:management
  • docker pull:从 Docker Hub 下载镜像的命令。
  • rabbitmq:management:RabbitMQ 镜像的名称及标签,management表示该镜像包含 Web 管理界面。

创建并启动 RabbitMQ 容器

镜像拉取完成后,需要通过镜像创建并启动一个 RabbitMQ 容器。执行以下命令:

1
2
3
4
5
6
7
docker run -d \
--name my-rabbitmq \
-p 5672:5672 \
-p 15672:15672 \
-e RABBITMQ_DEFAULT_USER=admin \
-e RABBITMQ_DEFAULT_PASS=admin123 \
rabbitmq:management

命令参数解释:

  • -d:后台运行容器(守护进程模式)。
  • --name my-rabbitmq:给容器起一个名字(my-rabbitmq,可自定义),方便后续操作。
  • -p 5672:5672:端口映射,将容器内的 5672 端口(RabbitMQ 默认通信端口)映射到宿主机的 5672 端口,供程序连接使用。
  • -p 15672:15672:将容器内的 15672 端口(RabbitMQ 管理界面端口)映射到宿主机的 15672 端口,用于访问 Web 管理界面。
  • -e RABBITMQ_DEFAULT_USER=admin:设置 RabbitMQ 默认管理员用户名(admin,可自定义)。
  • -e RABBITMQ_DEFAULT_PASS=admin123:设置管理员密码(admin123,可自定义)。
  • rabbitmq:management:基于该镜像创建容器。

验证容器是否启动成功

执行以下命令查看容器状态:

1
docker ps

如果看到名为my-rabbitmq的容器,且STATUS列显示Up(例如Up 5 seconds),说明启动成功。

如果启动失败,可以用以下命令查看日志排查问题:

1
docker logs my-rabbitmq

访问 RabbitMQ 管理界面

启动成功后,通过宿主机的 IP 地址(或localhost,如果在本机操作)访问管理界面:

在浏览器中输入: http://你的Linux主机IP:15672

  • 如果是在 Linux 本机操作,可直接输入 http://localhost:15672
  • 如果是远程服务器,需要替换为服务器的公网 IP。

访问后会出现登录界面,输入步骤 2 中设置的用户名(admin)和密码(admin123),即可进入管理界面。

管理界面可以查看队列、交换机、连接等信息,非常直观,适合初学者使用,我是推荐装这个

RabbitMQ 的实践

RabbitMQ 如何发送消息

RabbitMQ 发送消息的过程涉及生产者(发送方)、交换机(Exchange)、队列(Queue)和绑定关系(Binding)四个核心组件。生产者不会直接将消息发送到队列,而是先发送到交换机,再由交换机根据绑定规则将消息路由到对应的队列。

发送消息的核心流程上面其实说过了,这里按照写代码的流程再说一次

  1. 建立连接:生产者通过 TCP 连接到 RabbitMQ 服务器,创建信道(Channel)。
  2. 声明资源:确保交换机、队列及绑定关系存在(若不存在则创建)。
  3. 构建消息:消息包含内容体(Body)和属性(Properties,如消息 ID、过期时间等)。
  4. 发送消息:通过信道将消息发送到指定交换机,并指定路由键(Routing Key)。
  5. 消息确认:RabbitMQ 确认消息接收后,生产者可进行后续处理。

以 Spring Boot 集成 RabbitMQ 为例

在依赖和配置文件写好之后,声明交换机、队列和绑定关系

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitMqConfig {

// 1. 声明交换机(这里以 Direct 类型为例,精确路由)
@Bean
public DirectExchange demoExchange() {
// 参数:交换机名称、是否持久化、是否自动删除、附加参数
return ExchangeBuilder.directExchange("demo.exchange")
.durable(true) // 持久化(服务重启后不丢失)
.autoDelete(false) // 不自动删除
.build();
}

// 2. 声明队列
@Bean
public Queue demoQueue() {
// 参数:队列名称、是否持久化、是否排他、是否自动删除、附加参数
return QueueBuilder.durable("demo.queue")
.durable(true) // 持久化消息
.exclusive(false) // 非排他(允许多个连接访问)
.autoDelete(false) // 不自动删除
.build();
}

// 3. 绑定交换机和队列(指定路由键)
@Bean
public Binding demoBinding(DirectExchange demoExchange, Queue demoQueue) {
// 将队列通过路由键 "demo.routing.key" 绑定到交换机
return BindingBuilder.bind(demoQueue)
.to(demoExchange)
.with("demo.routing.key");
}
}

使用 RabbitTemplate 发送消息,它支持多种消息类型

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.connection.CorrelationData;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import java.util.UUID;

@Component
public class MessageProducer {

@Autowired
private RabbitTemplate rabbitTemplate;

/**
* 发送简单文本消息
*/
public void sendSimpleMessage(String content) {
// 参数:交换机名称、路由键、消息内容
rabbitTemplate.convertAndSend(
"demo.exchange", // 交换机名称
"demo.routing.key", // 路由键(需与绑定关系一致)
content // 消息内容(字符串)
);
System.out.println("发送简单消息:" + content);
}

/**
* 发送带属性的消息(如过期时间、消息ID等)
*/
public void sendMessageWithProperties(String content) {
// 生成全局唯一消息ID(用于幂等性处理)
String messageId = UUID.randomUUID().toString();

// 设置消息属性(过期时间、消息ID等)
MessagePostProcessor postProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
// 设置消息ID(会被 RabbitMQ 记录在消息头中)
message.getMessageProperties().setMessageId(messageId);
// 设置消息过期时间(5秒后未消费则成为死信)
message.getMessageProperties().setExpiration("5000");
return message;
}
};

// 发送消息并携带属性
rabbitTemplate.convertAndSend(
"demo.exchange",
"demo.routing.key",
content,
postProcessor // 消息处理器(设置属性)
);
System.out.println("发送带属性的消息(ID:" + messageId + "):" + content);
}

/**
* 发送对象消息(自动序列化为JSON)
*/
public void sendObjectMessage() {
// 定义一个业务对象(如订单)
Order order = new Order();
order.setOrderId("ORDER_123456");
order.setAmount(99.9);
order.setStatus("PENDING");

// 发送对象(Spring AMQP 会自动序列化为JSON,需确保对象可序列化)
rabbitTemplate.convertAndSend(
"demo.exchange",
"demo.routing.key",
order
);
System.out.println("发送对象消息:" + order);
}

/**
* 带确认机制的消息发送(确保消息被 RabbitMQ 接收)
*/
public void sendMessageWithConfirm(String content) {
// 生成消息唯一标识(用于确认回调)
CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

// 发送消息并设置确认回调
rabbitTemplate.convertAndSend(
"demo.exchange",
"demo.routing.key",
content,
correlationData
);

// 回调:消息被 RabbitMQ 确认接收后触发
correlationData.getFuture().addCallback(
confirm -> {
if (confirm.isAck()) { // ack:消息成功接收
System.out.println("消息发送成功,ID:" + correlationData.getId());
} else { // nack:消息接收失败
System.err.println("消息发送失败,ID:" + correlationData.getId() + ",原因:" + confirm.getReason());
}
},
ex -> System.err.println("消息确认异常:" + ex.getMessage())
);
}

我这个没写内部类,自己写一个看看去,消息确认什么的就不写了,我们测试一下

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMqSenderDemo implements CommandLineRunner {

@Autowired
private MessageProducer producer;

public static void main(String[] args) {
SpringApplication.run(RabbitMqSenderDemo.class, args);
}

@Override
public void run(String... args) throws Exception {
// 测试发送不同类型的消息
producer.sendSimpleMessage("Hello, RabbitMQ!");
producer.sendMessageWithProperties("带过期时间的消息");
producer.sendObjectMessage();
producer.sendMessageWithConfirm("带确认机制的消息");
}
}

RabbitMQ同步调用和异步调用

同步调用

同步调用是指生产者发送消息后,会阻塞等待消费者的响应,直到接收到响应结果后才继续执行后续操作。这种方式常用于需要立即获取处理结果,对响应及时性要求较高的场景,例如简单的查询请求。

我们定义消息发送方和消息接收方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class SynchronousSender {

private static final String QUEUE_NAME = "sync_queue";

@Autowired
private RabbitTemplate rabbitTemplate;

public String send(String message) {
// 同步发送消息并等待响应,第二个参数为超时时间(单位:毫秒)
return (String) rabbitTemplate.convertSendAndReceive(QUEUE_NAME, message, 5000);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class SynchronousReceiver {

@RabbitListener(queues = "sync_queue")
public String receive(String message) {
System.out.println("Received message: " + message);
// 模拟业务处理,这里简单返回处理后的消息
return "Processed: " + message;
}
}

然后测试同步调用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.CommandLineRunner;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

@SpringBootApplication
public class RabbitMqSyncApp implements CommandLineRunner {

@Autowired
private SynchronousSender sender;

public static void main(String[] args) {
SpringApplication.run(RabbitMqSyncApp.class, args);
}

@Override
public void run(String... args) throws Exception {
String response = sender.send("Hello, RabbitMQ!");
System.out.println("Received response: " + response);
}
}

异步调用

异步调用是指生产者发送消息后,不会阻塞等待消费者的响应,而是继续执行后续操作。消费者在接收到消息后,会在后台线程中进行处理。通常会使用回调机制,当消费者处理完消息后,通过回调函数通知生产者处理结果。这种方式适用于对响应及时性要求不高,或者处理过程耗时较长的场景,例如文件上传后的处理、批量数据计算等。

依旧是定义消息发送方和消息接收方

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;

@Component
public class AsynchronousSender {

private static final String QUEUE_NAME = "async_queue";

@Autowired
private RabbitTemplate rabbitTemplate;

public void send(String message) {
ListenableFuture future = rabbitTemplate.convertSendAndReceive(QUEUE_NAME, message);
future.addCallback(new ListenableFutureCallback() {
@Override
public void onSuccess(Object result) {
System.out.println("Received response asynchronously: " + result);
}

@Override
public void onFailure(Throwable ex) {
System.out.println("Failed to receive response: " + ex.getMessage());
}
});
}
}

接收方没啥变化,不写了,测试也没啥变化,Autowried引入别引错了就行

同步调用:优点是逻辑简单,能立即获取响应结果;缺点是如果处理过程耗时较长,会阻塞生产者线程,影响系统的并发性能。

异步调用:优点是不阻塞生产者线程,提高系统的并发处理能力,适用于耗时较长的任务;缺点是代码相对复杂,需要处理回调逻辑,并且对结果的获取不如同步调用直接。

RabbitMQ声明队列和交换机的方式

Spring AMQP 基本 API

Spring AMQP 是 Spring 框架对 AMQP 协议的实现,主要用于简化 RabbitMQ 的开发。它提供了一系列高层 API,封装了 RabbitMQ 底层的连接管理、消息发送、消费等操作。

Spring AMQP 的核心组件

  1. ConnectionFactory
    • 作用:创建和管理与 RabbitMQ 服务器的连接(Connection)和信道(Channel)。
    • 常用实现:CachingConnectionFactory(默认实现,支持连接和信道缓存,提高性能)。
  2. RabbitTemplate
    • 作用:消息发送和接收的核心工具类,封装了消息序列化、发送、接收、确认等操作。
    • 特点:线程安全,可在整个应用中共享使用。
  3. Exchange/Queue/Binding
    • 作用:对应 RabbitMQ 的交换机、队列、绑定关系,通过这些对象声明 AMQP 资源。可以使用工厂类创建,简化队列,交换机的创建过程。绑定队列和交换机时,需要使用BindingBuilder来创建Binding对象。这个对象表示一个队列和一个交换机之间的绑定关系,它定义了消息应该如何从交换机路由到队列。
    • 注:这些对象仅用于定义元数据,实际资源创建由 Spring AMQP 自动完成(连接 RabbitMQ 时)。
  4. MessageListenerContainer
    • 作用:管理消息消费者的生命周期,负责创建消费者、监听队列、处理消息,并支持并发消费、消息确认等。
    • 常用实现:SimpleMessageListenerContainer(支持动态调整并发数)、DirectMessageListenerContainer(轻量级,性能更好)。
  5. Message
    • 作用:封装消息的载体,包含消息体(body)和消息属性(MessageProperties,如消息 ID、过期时间等)。

一般写的时候,首先我们需要配置 ConnectionFactory,用于创建连接工厂,配置 RabbitMQ 服务器地址、端口、账号等信息。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class ConnectionConfig {

@Bean
public ConnectionFactory connectionFactory() {
CachingConnectionFactory factory = new CachingConnectionFactory();
factory.setHost("localhost"); // RabbitMQ 服务器地址
factory.setPort(5672); // 端口(默认5672)
factory.setUsername("guest");// 用户名
factory.setPassword("guest");// 密码
factory.setVirtualHost("/"); // 虚拟主机

// 配置信道缓存(可选)
factory.setChannelCacheSize(10); // 缓存的信道数量
return factory;
}
}

然后调用RabbitTemplate ,它是发送和接收消息的核心工具,提供了丰富的方法:

方法分类 常用方法 说明
发送消息 convertAndSend(String exchange, String routingKey, Object message) 发送消息(自动序列化消息体,支持任意对象)
send(String exchange, String routingKey, Message message) 发送 Message 对象(需手动构建消息体和属性)
同步接收消息 receiveAndConvert(String queueName) 从队列接收消息并自动反序列化
receive(String queueName) 从队列接收 Message 对象
回调配置 setConfirmCallback(ConfirmCallback callback) 设置生产者确认回调(消息是否到达交换机)
setReturnsCallback(ReturnsCallback callback) 设置消息回退回调(到达交换机但路由失败时触发)
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
import org.springframework.amqp.AmqpException;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessagePostProcessor;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class RabbitTemplateDemo {

@Autowired
private RabbitTemplate rabbitTemplate;

// 1. 发送简单消息(自动序列化)
public void sendSimpleMessage() {
String message = "Hello, Spring AMQP!";
// 参数:交换机、路由键、消息内容
rabbitTemplate.convertAndSend("demo.exchange", "demo.key", message);
}

// 2. 发送带属性的消息
public void sendMessageWithProperties() {
MessagePostProcessor postProcessor = new MessagePostProcessor() {
@Override
public Message postProcessMessage(Message message) throws AmqpException {
message.getMessageProperties().setMessageId("MSG_001"); // 消息ID
message.getMessageProperties().setExpiration("10000"); // 过期时间(10秒)
return message;
}
};
// 携带消息处理器发送
rabbitTemplate.convertAndSend("demo.exchange", "demo.key", "带属性的消息", postProcessor);
}

// 3. 同步接收消息
public Object receiveMessage() {
// 从队列接收消息并反序列化
return rabbitTemplate.receiveAndConvert("demo.queue");
}

// 4. 配置回调(通常在配置类中设置)
public void configureCallbacks(RabbitTemplate rabbitTemplate) {
// 生产者确认回调(消息是否到达交换机)
rabbitTemplate.setConfirmCallback((correlationData, ack, cause) -> {
if (ack) {
System.out.println("消息已到达交换机");
} else {
System.err.println("消息未到达交换机,原因:" + cause);
}
});

// 消息回退回调(路由失败)
rabbitTemplate.setReturnsCallback(returnedMessage -> {
System.err.println("消息路由失败:" + returnedMessage.getReplyText());
});
}
}

然后声明 Exchange/Queue/Binding,通过 @Bean 声明 AMQP 资源,Spring AMQP 会自动将这些资源创建到 RabbitMQ 中。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class AmqpResourcesConfig {

// 1. 声明交换机(Direct类型)
@Bean
public DirectExchange demoDirectExchange() {
// 参数:名称、是否持久化、是否自动删除、附加参数
return ExchangeBuilder.directExchange("demo.direct.exchange")
.durable(true) // 持久化
.autoDelete(false) // 不自动删除
.build();
}

// 2. 声明队列
@Bean
public Queue demoQueue() {
// 参数:名称、是否持久化、是否排他、是否自动删除、附加参数
return QueueBuilder.durable("demo.queue")
.durable(true)
.exclusive(false)
.autoDelete(false)
// 附加参数(如死信配置)
.withArgument("x-dead-letter-exchange", "demo.dlq.exchange")
.build();
}

// 3. 绑定交换机和队列
@Bean
public Binding demoBinding(DirectExchange demoDirectExchange, Queue demoQueue) {
// 将队列通过路由键 "demo.key" 绑定到交换机
return BindingBuilder.bind(demoQueue)
.to(demoDirectExchange)
.with("demo.key");
}
}

最后就是编写消息的消费了,产生了就要消费,Spring AMQP 提供两种消费方式:注解式(@RabbitListener)和接口式(实现 MessageListener)。

注解式,使用 @RabbitListener 注解快速定义消费者,简化代码:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class AnnotationConsumer {

// 监听指定队列
@RabbitListener(queues = "demo.queue")
public void handleMessage(String message) {
System.out.println("收到消息:" + message);
}

// 消费对象消息(自动反序列化为Order对象)
@RabbitListener(queues = "order.queue")
public void handleOrder(Order order) {
System.out.println("收到订单:" + order.getOrderId());
}

// 消费Message对象(可获取消息属性)
@RabbitListener(queues = "advanced.queue")
public void handleAdvancedMessage(Message message) {
String body = new String(message.getBody());
String messageId = message.getMessageProperties().getMessageId();
System.out.println("消息ID:" + messageId + ",内容:" + body);
}
}

接口式,实现 MessageListener 接口,适合复杂场景:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
import org.springframework.amqp.core.Message;
import org.springframework.amqp.core.MessageListener;
import org.springframework.stereotype.Component;

@Component
public class InterfaceConsumer implements MessageListener {

@Override
public void onMessage(Message message) {
String body = new String(message.getBody());
System.out.println("接口式消费:" + body);
}
}

// 配置监听器容器
@Configuration
public class ListenerContainerConfig {

@Bean
public SimpleMessageListenerContainer messageListenerContainer(
ConnectionFactory connectionFactory, InterfaceConsumer consumer) {

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("demo.queue"); // 监听的队列
container.setMessageListener(consumer); // 设置消息监听器
container.setConcurrentConsumers(2); // 并发消费者数量
container.setAcknowledgeMode(AcknowledgeMode.AUTO); // 自动确认消息
return container;
}
}

收到了消息还需要确认,Spring AMQP 支持三种消息确认模式,通过 AcknowledgeMode 配置,消息确认模式控制消费端如何通知 RabbitMQ 消息处理状态。

模式 说明
AUTO(默认) 自动确认:消息处理成功后自动确认;抛出异常时拒绝消息并根据配置决定是否重入队。
MANUAL 手动确认:需调用 channel.basicAck() 手动确认,basicNack() 拒绝。
NONE 不确认:RabbitMQ 认为消息一旦投递成功即被确认(不推荐,可能丢失消息)。

手动确认示例

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
import com.rabbitmq.client.Channel;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Component;

@Component
public class ManualAckConsumer {

@RabbitListener(queues = "manual.queue")
public void handleMessage(Message message,
Channel channel,
@Header(AmqpHeaders.DELIVERY_TAG) long deliveryTag) throws Exception {
try {
String body = new String(message.getBody());
System.out.println("处理消息:" + body);
// 手动确认消息(第二个参数:是否批量确认)
channel.basicAck(deliveryTag, false);
} catch (Exception e) {
// 处理失败,拒绝消息并重新入队(第二个参数:是否批量;第三个参数:是否重入队)
channel.basicNack(deliveryTag, false, true);
}
}
}

// 配置确认模式为手动
@Configuration
public class ManualAckConfig {
@Bean
public SimpleMessageListenerContainer manualAckContainer(
ConnectionFactory connectionFactory, ManualAckConsumer consumer) {

SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
container.setQueueNames("manual.queue");
container.setMessageListener(consumer);
container.setAcknowledgeMode(AcknowledgeMode.MANUAL); // 手动确认
return container;
}
}

RabbitMQ的简单模式测试

依赖及其配置相关处理

在这里也就展示 RabbitMQ 如何融合 Spring 系列进行开发

RabbitMQ的简单模式,也就是 Hello World模式,中最简单的也就是一个生产者、一个消费者、一个队列;生产者P发送消息到队列Q,一个消费者C接收消息。它的拓扑结构非常简单:

1
Producer -> Queue -> Consumer

接下来我们来用 Java 代码实现一下

首先让我们导入 RabbitMQ 的依赖

1
2
3
4
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

这个实际上是Spring Boot 提供的 RabbitMQ starter 依赖,它内部自动引入了 RabbitMQ 的 Java 客户端(amqp-client)以及 Spring AMQP 框架相关组件,一般情况下我们用这个

但是单个的 RabbitMQ 依赖在这

1
2
3
4
5
6
7
8
9
10
11
12
<!--rabbitmq 依赖客户端-->
<dependency>
<groupId>com.rabbitmq</groupId>
<artifactId>amqp-client</artifactId>
<version>5.8.0</version>
</dependency>
<!--操作文件流的一个依赖-->
<dependency>
<groupId>commons-io</groupId>
<artifactId>commons-io</artifactId>
<version>2.6</version>
</dependency>

RabbitMQ还有个测试依赖,方便对 RabbitMQ 相关代码(如消息监听、生产者)进行单元测试

1
2
3
4
5
<dependency>
<groupId>org.springframework.amqp</groupId>
<artifactId>spring-rabbit-test</artifactId>
<scope>test</scope>
</dependency>

之后,我们填写一些 RabbitMQ 的配置

1
2
3
4
5
6
# RabbitMQ配置
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/

具体什么意思相信大家肯定看得懂

那么相应配置类编写如下,简单模式的配置类非常简单

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* RabbitMQ简单队列配置
* 简单模式(Simple Queue)是最基础的消息模式,一个生产者对应一个消费者
*/
@Configuration
public class SimpleQueueConfig {

// 定义简单队列名称
public static final String SIMPLE_QUEUE = "ergou.simple.queue";

/**
* 创建简单队列
* 简单队列不需要交换机,消息直接发送到队列
*/
@Bean
public Queue simpleQueue() {
// 参数说明:
// 1. name: 队列名称
// 2. durable: 是否持久化,true表示RabbitMQ重启后队列仍然存在
// 3. exclusive: 是否排他性,true表示仅限于声明它的连接可见,连接关闭后队列删除
// 4. autoDelete: 是否自动删除,true表示当最后一个消费者断开连接后队列自动删除
return new Queue(SIMPLE_QUEUE, true, false, false);
}
}

编写消息生产者

生产者的核心作用是 “创建消息→指定交换机 / 队列→发送到 RabbitMQ”

编写消息生产者的流程如下

  • 创建 RabbitMQ 核心组件(交换机、队列、绑定关系)
    • 这个看情况,不一定就写到这里,一般情况下,会写到配置类中,在配置类中声明并且绑定会更清晰
  • 编写生产者代码,发送消息
    • 编写生产者的消息逻辑,然后通过 RabbitTemplate(Spring AMQP 提供的工具类)发送消息,支持发送字符串、对象(需序列化)等类型
    • 若发送对象,默认使用 JDK 序列化(需实现 Serializable),推荐配置 JSON 序列化
  • 之后就是在别的地方调用生产者发送消息

其实详细的编写步骤就是

  1. 创建连接工厂 ConnectionFactory
  2. 通过连接工厂创建连接 Connection
  3. 通过连接获取通道 Channel
  4. 通过通道声明队列 Queue
  5. 发送消息到队列 Queue 中

但是代码里确实没有显式地看到这些步骤。这正是 Spring Boot 和 Spring AMQP 的强大之处——它通过自动配置(Auto-Configuration)和模板化编程(Template Pattern)帮你封装了所有这些底层细节。

Spring 在读取到你yaml或者properties的配置文件的时候,自动创建了一个配置好的 CachingConnectionFactory

生产者编写注入 RabbitTemplate,这是 Spring AMQP 提供的用于发送和接收消息的核心工具类。

RabbitTemplate (生产者) 和 @RabbitListener (消费者) 内部都持有一个 ConnectionFactory

当需要发送消息时,RabbitTemplate 会从 ConnectionFactory 获取一个 Connection,然后从 Connection 创建一个 Channel,用这个 Channel 来执行 AMQP 命令(如 basicPublish)。完成后,Spring 通常会复用这个 Channel(得益于 CachingConnectionFactory)而不是关闭它,这是高性能的关键

在简单模式下,exchange 参数为空字符串 (""),这代表使用 默认的匿名交换机 (Default Exchange)。默认交换机会将消息路由到指定的 routingKey 同名的队列中。所以这里 routingKey 就是你的队列名 SIMPLE_QUEUE

rabbitTemplate.convertAndSend() 方法中的 convert 部分,会自动将你的 OrderMessage Java 对象通过 MessageConverter(默认是 SimpleMessageConverter,但常配置为 Jackson2JsonMessageConverter)转换成字节数组进行传输。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import edu.software.ergoutree.rabbitmqmessgae.config.SimpleQueueConfig;
import edu.software.ergoutree.rabbitmqmessgae.model.OrderMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

/**
* 简单队列消息生产者
* 简单模式下,消息直接发送到队列,不需要交换机
*/
@Component
@RequiredArgsConstructor
@Slf4j
public class SimpleQueueProducer {

private final RabbitTemplate rabbitTemplate;

/**
* 发送消息到简单队列
* @param message 订单消息
*/
public void sendMessage(OrderMessage message) {
log.info("发送简单队列消息: {}", message);
// 简单模式下,routingKey就是队列名称,exchange参数为空字符串
rabbitTemplate.convertAndSend("", SimpleQueueConfig.SIMPLE_QUEUE, message);
}
}

加入按照原来 RabbitMQ 的思路去编写,代码就会很长,但是每一步会很清晰

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class NativeProducer {
private final static String QUEUE_NAME = "ergou.simple.queue";

public static void main(String[] argv) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");

// 2. 通过连接工厂创建连接
// 3. 通过连接创建通道
// 这里使用 try-with-resources 语法自动关闭连接和通道
try (Connection connection = factory.newConnection();
Channel channel = connection.createChannel()) {

// 4. 通过通道声明队列 (参数要和你的Spring配置一致!)
// queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete, Map<String, Object> arguments)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);

// 准备消息
String message = "Hello, Native RabbitMQ!";

// 5. 发送消息到队列
// basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body)
channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));

System.out.println(" [x] Sent '" + message + "'");
}
}
}

那么 Config 类如下,使用 @Configuration 表明这是一个配置类。

  • 当你使用 @Bean 在配置类中声明 Queue, Exchange, Binding 时,Spring 在应用启动时,会自动用当前的 ConnectionChannel 去 RabbitMQ 服务器上声明这些资源。这就是为什么你不需要手动写代码去 channel.queueDeclare()
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/**
* RabbitMQ简单队列配置
* 简单模式(Simple Queue)是最基础的消息模式,一个生产者对应一个消费者
*/
@Configuration
public class SimpleQueueConfig {

// 定义简单队列名称
public static final String SIMPLE_QUEUE = "ergou.simple.queue";

/**
* 创建简单队列
* 简单队列不需要交换机,消息直接发送到队列
*/
@Bean
public Queue simpleQueue() {
// 参数说明:
// 1. name: 队列名称
// 2. durable: 是否持久化,true表示RabbitMQ重启后队列仍然存在
// 3. exclusive: 是否排他性,true表示仅限于声明它的连接可见,连接关闭后队列删除
// 4. autoDelete: 是否自动删除,true表示当最后一个消费者断开连接后队列自动删除
return new Queue(SIMPLE_QUEUE, true, false, false);
}
}

编写消息消费者

消费者的核心作用是 “监听指定队列→接收消息→处理业务逻辑”,步骤更简洁,主要通过注解实现监听:

  • 编写消费者代码,监听队列
    • 使用 @RabbitListener 注解指定监听的队列名,Spring 会自动创建消费者并监听队列,消息到达时触发 @RabbitHandler 方法处理:
  • 然后根据业务需求,可添加消息确认、重试、异常处理等配置

通过 RabbitTemplate发送消息,支持发送字符串、对象(需序列化)等类型

消费者中使用 @RabbitListener(queues = ...) 注解来监听指定的队列。一旦队列中有消息,receiveMessage 方法就会被自动触发。

方法参数 OrderMessage message 体现了 Spring AMQP 强大的消息自动转换能力,它会自动将消息体(默认是 JSON 格式)反序列化成 OrderMessage 对象。

详细的步骤就是

  1. 创建连接工厂 ConnectionFactory
  2. 通过连接工厂创建连接 Connection
  3. 通过连接获取通道 Channel
  4. 通过通道接收消息

还是,但是代码里确实没有显式地看到这些步骤。是因为还是RabbitTemplate自动管理了ConnectionChannel,对于消费者,RabbitListenerContainerFactory 会在后台创建并管理一个或多个 Channel,用于持续地监听队列。它负责处理 Channel 的生命周期、消息确认等所有复杂逻辑。自动声明队列和交换机的逻辑也和上面在生产者说的一样。而在消息转换中,消费者端的 @RabbitListener 会自动执行反向操作,将字节数组转换回 OrderMessage 对象。

这些我只是在这里简单说一下,具体内容在源码中分析

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
package edu.software.ergoutree.rabbitmqmessgae.consumer;

import edu.software.ergoutree.rabbitmqmessgae.config.SimpleQueueConfig;
import edu.software.ergoutree.rabbitmqmessgae.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

/**
* 简单队列消息消费者
* 使用@RabbitListener注解监听队列
*/
@Component
@Slf4j
public class SimpleQueueConsumer {

/**
* 监听简单队列的消息
* @param message 接收到的订单消息
*/
@RabbitListener(queues = SimpleQueueConfig.SIMPLE_QUEUE)
public void receiveMessage(OrderMessage message) {
log.info("简单队列接收到消息: {}", message);

// 处理订单消息的业务逻辑
log.info("处理订单: {}, 产品: {}, 数量: {}",
message.getOrderNo(),
message.getProductName(),
message.getQuantity());

// 模拟处理延时
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}

log.info("简单队列订单处理完成: {}", message.getOrderNo());
}
}

那么使用原生消费者进行代码设计就是如下了,很清晰的涉及到了每个步骤

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import com.rabbitmq.client.*;
import java.io.IOException;

public class NativeConsumer {
private final static String QUEUE_NAME = "ergou.simple.queue";

public static void main(String[] argv) throws Exception {
// 1. 创建连接工厂
ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
factory.setUsername("guest");
factory.setPassword("guest");

// 2. 创建连接
// 3. 创建通道
// 消费者通常需要长连接,所以不放在try-with-resources中
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

// 4. 声明队列 (确保队列存在)
channel.queueDeclare(QUEUE_NAME, true, false, false, null);
System.out.println(" [*] Waiting for messages. To exit press CTRL+C");

// 5. 创建回调对象,用于接收消息
DeliverCallback deliverCallback = (consumerTag, delivery) -> {
String message = new String(delivery.getBody(), "UTF-8");
System.out.println(" [x] Received '" + message + "'");
// 模拟处理业务
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
System.out.println(" [x] Done");
};

// 6. 告诉通道开始消费队列中的消息
// basicConsume(String queue, boolean autoAck, DeliverCallback callback, CancelCallback cancelCallback)
channel.basicConsume(QUEUE_NAME, true, deliverCallback, consumerTag -> { });
}
}

然后我们创建一个控制器用来发送消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
package edu.software.ergoutree.rabbitmqmessgae.controller;

import edu.software.ergoutree.rabbitmqmessgae.model.OrderMessage;
import edu.software.ergoutree.rabbitmqmessgae.producer.MessageProducer;
import edu.software.ergoutree.rabbitmqmessgae.producer.SimpleQueueProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

/**
* 简单队列消息控制器
* 提供REST API接口发送简单队列消息
*/
@RestController
@RequestMapping("/api/simple")
@RequiredArgsConstructor
public class SimpleQueueController {

private final SimpleQueueProducer simpleQueueProducer;
private final MessageProducer messageProducer;

/**
* 发送简单队列消息
* @param orderId 订单ID
* @param productName 产品名称
* @return 响应结果
*/
@PostMapping("/send")
public Map<String, Object> sendSimpleMessage(@RequestParam Long orderId,
@RequestParam String productName) {
// 创建订单消息
OrderMessage orderMessage = messageProducer.createSampleOrder(orderId, productName);

// 发送到简单队列
simpleQueueProducer.sendMessage(orderMessage);

// 返回结果
Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "简单队列消息发送成功");
result.put("data", orderMessage);
return result;
}
}

运行测试

运行程序,观察消息在rabbitmq-server服务中的过程

首先启动生产者,可以看到创建了一个队列,进行发送可以看到日志打印了相关信息image-20250909105301994

RabbitMQ工作模式实践

工作模式在我们之前提到的就是一个生产者、多个消费者、一个队列,一条消息仅被一个消费者处理

image-20250909105530026它的主要思想是避免排队等待,避免一个消息处理时间过久而无法处理下一个的问题。因此相比简单模式可以有多个消费者,原理就是我们把任务封装为消息并将其发送到队列中,这多个消费者可以一起处理队列中的任务。

RabbitMQ 中的工作模式默认采用轮询的方式,RabbitMQ 将消息平均分配给所有消费者,也就是如果有两个消费者的话,消息逐一分给每个消费者进行消费。接下来我们来用 Java 代码实现一下 Work Queues工作模式,来测试其轮训消费的功能。

消费者需手动确认消息auto-ack=false),处理完一条消息后才会接收下一条(避免消息堆积)。

config 类

工作模式不需要我们特意去往配置文件写什么内容

工作模式和简单模式类似,不需要额外的交换机(用默认匿名交换机即可),核心是声明一个 “供多个消费者竞争的队列”。配置类的作用是在 Spring 启动时,自动向 RabbitMQ 服务器声明队列资源,避免手动调用 channel.queueDeclare()

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package edu.software.ergoutree.rabbitmqmessgae.config;

import org.springframework.amqp.core.Queue;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
* 工作队列模式配置
* 工作队列模式(Work Queue)是一个生产者对应多个消费者,但每个消息只会被一个消费者获取
*/
@Configuration
public class WorkQueueConfig {

// 定义工作队列名称
public static final String WORK_QUEUE = "ergou.work.queue";

/**
* 创建工作队列
* 工作队列也不需要交换机,消息直接发送到队列
*/
@Bean
public Queue workQueue() {
return new Queue(WORK_QUEUE, // 队列名称
true, // durable: 是否持久化
false, // exclusive: 是否排他
false); // autoDelete: 是否自动删除
}
}

分发耗时任务给多个工作者(消费者),以实现并行处理和负载均衡。一个队列可以被多个消费者同时监听。每条消息只能被一个消费者获取和处理。消费者之间是竞争关系。

生产者类

工作模式的生产者逻辑和简单模式几乎一致 —— 都是向指定队列发送消息,区别仅在于 “队列名称用 WORK_QUEUE”。代码中复用了 MessageProducer 类(同时支持直接 / 主题 / 扇形模式),重点 在于“如何发送工作队列消息”

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package edu.software.ergoutree.rabbitmqmessgae.producer;

import edu.software.ergoutree.rabbitmqmessgae.config.RabbitMQConfig;
import edu.software.ergoutree.rabbitmqmessgae.model.OrderMessage;
import lombok.RequiredArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.stereotype.Component;

import java.time.LocalDateTime;

@Component
@RequiredArgsConstructor
@Slf4j
public class MessageProducer {

private final RabbitTemplate rabbitTemplate;

// 发送直接消息
public void sendDirectMessage(OrderMessage message) {
log.info("发送直接消息: {}", message);
rabbitTemplate.convertAndSend(RabbitMQConfig.DIRECT_EXCHANGE,
RabbitMQConfig.DIRECT_ROUTING_KEY,
message);
}

// 发送主题消息
public void sendTopicMessage(OrderMessage message, String routingKey) {
log.info("发送主题消息: {}, 路由键: {}", message, routingKey);
rabbitTemplate.convertAndSend(RabbitMQConfig.TOPIC_EXCHANGE,
routingKey,
message);
}

// 发送扇形消息
public void sendFanoutMessage(OrderMessage message) {
log.info("发送扇形消息: {}", message);
rabbitTemplate.convertAndSend(RabbitMQConfig.FANOUT_EXCHANGE,
"",
message);
}

// 创建示例订单消息
public OrderMessage createSampleOrder(Long orderId, String productName) {
OrderMessage orderMessage = new OrderMessage();
orderMessage.setOrderId(orderId);
orderMessage.setOrderNo("ORDER-" + System.currentTimeMillis());
orderMessage.setProductName(productName);
orderMessage.setPrice(Math.random() * 100);
orderMessage.setQuantity((int)(Math.random() * 10) + 1);
orderMessage.setCreateTime(LocalDateTime.now());
return orderMessage;
}
}

生产者的代码编写需要注意如下的逻辑:

  • 依赖 RabbitTemplate:Spring 自动根据 application.properties 中的配置(host、port、username 等)初始化连接工厂,无需手动创建 ConnectionChannel
  • 消息发送:通过 convertAndSend 方法,将 OrderMessage 对象自动序列化为字节数组(默认 JDK 序列化,建议在配置类中替换为 JSON 序列化,避免序列化异常);
  • 路由逻辑:因使用默认匿名交换机,routingKey 必须等于队列名 WORK_QUEUE,否则消息会丢失。

消费者类

工作模式的核心是 “多消费者竞争消息”,所以需要编写多个消费者实例(或同一个消费者类的多个 Bean),同时监听 WORK_QUEUE 队列。代码中通过 @RabbitListener 注解指定监听的队列,Spring 会自动创建消费者线程,消息到达时触发方法处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package edu.software.ergoutree.rabbitmqmessgae.consumer;

import edu.software.ergoutree.rabbitmqmessgae.config.RabbitMQConfig;
import edu.software.ergoutree.rabbitmqmessgae.model.OrderMessage;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class MessageConsumer {

// 监听直接队列
@RabbitListener(queues = RabbitMQConfig.DIRECT_QUEUE)
public void receiveDirectMessage(OrderMessage message) {
log.info("直接队列接收到消息: {}", message);
// 处理订单消息的业务逻辑
processOrder(message);
}

// 监听主题队列1
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_1)
public void receiveTopicMessage1(OrderMessage message) {
log.info("主题队列1接收到消息: {}", message);
// 处理订单消息的业务逻辑
processOrder(message);
}

// 监听主题队列2
@RabbitListener(queues = RabbitMQConfig.TOPIC_QUEUE_2)
public void receiveTopicMessage2(OrderMessage message) {
log.info("主题队列2接收到消息: {}", message);
// 处理订单消息的业务逻辑
processOrder(message);
}

// 监听扇形队列1
@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_1)
public void receiveFanoutMessage1(OrderMessage message) {
log.info("扇形队列1接收到消息: {}", message);
// 处理订单消息的业务逻辑
processOrder(message);
}

// 监听扇形队列2
@RabbitListener(queues = RabbitMQConfig.FANOUT_QUEUE_2)
public void receiveFanoutMessage2(OrderMessage message) {
log.info("扇形队列2接收到消息: {}", message);
// 处理订单消息的业务逻辑
processOrder(message);
}

// 处理订单消息
private void processOrder(OrderMessage message) {
log.info("处理订单: {}, 产品: {}, 数量: {}",
message.getOrderNo(),
message.getProductName(),
message.getQuantity());
// 模拟处理延时
try {
Thread.sleep(500);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
log.info("订单处理完成: {}", message.getOrderNo());
}
}

解释一下:

  • 多消费者监听同一队列:receiveWorkMessage1receiveWorkMessage2 都监听 WORK_QUEUE,Spring 会为每个方法创建独立的消费者线程,实现 “竞争消费”;
  • 和上面一样,方法参数 OrderMessage message 无需手动解析 ——Spring 会将 RabbitMQ 中的字节数组消息,自动反序列化为 OrderMessage 对象
  • 手动确认消息(默认配置):Spring AMQP 默认开启手动确认,只有当 processOrder 方法执行完(无异常),才会向 RabbitMQ 发送 “消息确认”,避免消息在处理中丢失;
  • 轮询分配消息:RabbitMQ 会将队列中的消息 “平均分配” 给每个消费者,即使某个消费者处理速度慢,也不会多分配

控制者类

为了方便测试(比如用 Postman 调用),编写 WorkQueueController 提供 REST API,接收前端参数后,调用生产者发送消息到工作队列。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
package edu.software.ergoutree.rabbitmqmessgae.controller;

import edu.software.ergoutree.rabbitmqmessgae.model.OrderMessage;
import edu.software.ergoutree.rabbitmqmessgae.producer.MessageProducer;
import lombok.RequiredArgsConstructor;
import org.springframework.web.bind.annotation.*;

import java.util.HashMap;
import java.util.Map;

@RestController
@RequestMapping("/api/message")
@RequiredArgsConstructor
public class MessageController {

private final MessageProducer messageProducer;

// 发送直接消息
@PostMapping("/direct")
public Map<String, Object> sendDirectMessage(@RequestParam Long orderId,
@RequestParam String productName) {
OrderMessage orderMessage = messageProducer.createSampleOrder(orderId, productName);
messageProducer.sendDirectMessage(orderMessage);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "直接消息发送成功");
result.put("data", orderMessage);
return result;
}

// 发送主题消息
@PostMapping("/topic")
public Map<String, Object> sendTopicMessage(@RequestParam Long orderId,
@RequestParam String productName,
@RequestParam String routingKey) {
OrderMessage orderMessage = messageProducer.createSampleOrder(orderId, productName);
messageProducer.sendTopicMessage(orderMessage, routingKey);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "主题消息发送成功");
result.put("data", orderMessage);
result.put("routingKey", routingKey);
return result;
}

// 发送扇形消息
@PostMapping("/fanout")
public Map<String, Object> sendFanoutMessage(@RequestParam Long orderId,
@RequestParam String productName) {
OrderMessage orderMessage = messageProducer.createSampleOrder(orderId, productName);
messageProducer.sendFanoutMessage(orderMessage);

Map<String, Object> result = new HashMap<>();
result.put("success", true);
result.put("message", "扇形消息发送成功");
result.put("data", orderMessage);
return result;
}
}

进行测试

用 Postman 或 curl 调用接口,连续发送多条消息

查看项目运行日志,会发现消息被平均分配给两个消费者:

image-20250909152818933

可以发现策略还是轮询,消息被轮询发送到两个工作队列消费者

image-20250909153118617

剩下三个模式类似,我在这里不再演示了

RPC不咋用吧,我是没写过好像