微服务项目内如何集成 Kafka

和其他组件要考虑的事情一样,还是涉及到在哪用,用给谁,如何配置的这三个问题

在 Spring Cloud 微服务项目中集成 Kafka,主要涉及生产者(发送消息)和消费者(接收消息)的实现,以及与 Spring Cloud 生态的适配(如服务发现、配置中心等)

Spring创建了一个项目Spring-kafka,封装了Apache 的Kafka-client,用于在Spring项目里快速集成kafka。

依赖如下

1
2
3
4
5
6
7
8
9
10
11
<!-- Spring Kafka核心依赖 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

<!-- 可选:Spring Cloud Stream(如需通过Stream简化集成) -->
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-starter-stream-kafka</artifactId>
</dependency>

然后这时候,不要忘记打开你的 Kafka,记录 Broker 地址(localhost:9092),然后在application.yml(或bootstrap.yml,结合配置中心)中配置 Kafka 连接信息,常用的配置是这样的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
spring:
kafka:
# Kafka Broker地址(多节点用逗号分隔)
bootstrap-servers: localhost:9092

# 生产者配置
producer:
key-serializer: org.apache.kafka.common.serialization.StringSerializer # 键序列化器
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer # 值序列化器(JSON格式)
acks: 1 # 消息确认机制(0:不确认;1:Leader确认;all:所有副本确认)
retries: 3 # 重试次数
batch-size: 16384 # 批处理大小(字节)
buffer-memory: 33554432 # 缓冲区大小(字节)

# 消费者配置
consumer:
group-id: service-order-consumer # 消费者组ID(同一组内消息只被消费一次)
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest # 偏移量重置策略(earliest/latest/none)
enable-auto-commit: false # 关闭自动提交偏移量(建议手动提交)

# 监听器配置(消费者监听相关)
listener:
ack-mode: manual_immediate # 手动提交偏移量(立即生效)
concurrency: 3 # 消费者并发数(建议小于等于分区数)

这里要注意,若使用 Spring Cloud Config/Nacos 配置中心,可将bootstrap-servers等配置抽离到配置中心,避免硬编码。因为消息队列的信息还是比较敏感的。

然后通过KafkaTemplate发送消息,支持同步 / 异步发送,发送了如何接受呢?通过@KafkaListener注解监听指定主题,处理消息。

这是最基本的使用,发送消息时注入一个KafkaTemplate,接收消息时添加一个@KafkaListener注解即可。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/**
* @author: kl @kailing.pub
* @date: 2019/5/30
*/
@SpringBootApplication
@RestController
public class Application {

private final Logger logger = LoggerFactory.getLogger(Application.class);

public static void main(String[] args) {
SpringApplication.run(Application.class, args);
}

@Autowired
private KafkaTemplate<Object, Object> template;

@GetMapping("/send/{input}")
public void sendFoo(@PathVariable String input) {
this.template.send("topic_input", input);
}
@KafkaListener(id = "webGroup", topics = "topic_input")
public void listen(String input) {
logger.info("input value: {}" , input);
}
}

如果出现处理消费失败的消息,想要进行处理,可以配置死信主题(如order-create-topic.DLT),通过@KafkaListener监听死信主题进行重试或人工处理。

之后我们就用实际项目中如何组合进这些内容来分析

Spring Cloud 微服务生态中集成 Kafka

添加依赖

1
2
3
4
5
<!-- Spring Kafka:Kafka集成核心依赖 -->
<dependency>
<groupId>org.springframework.kafka</groupId>
<artifactId>spring-kafka</artifactId>
</dependency>

编写配置文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
spring:
application:
name: kafka-message

# Kafka 配置
kafka:
# Kafka 服务器地址(如果是集群,可以配置多个,用逗号分隔)
bootstrap-servers: localhost:9092

# 生产者配置
producer:
# 消息重试次数(发送失败时的重试次数)
retries: 3
# 批量发送大小(当消息累积到这个大小时一起发送,提高性能)
batch-size: 16384
# 缓冲区大小(生产者可以用来缓冲等待发送到服务器的记录的总内存字节)
buffer-memory: 33554432
# Key 序列化器(将Key对象转换为字节)
key-serializer: org.apache.kafka.common.serialization.StringSerializer
# Value 序列化器(将Value对象转换为字节)
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
# ACK 确认级别(all/-1: 等待所有副本确认,1: 只等待leader确认,0: 不等待确认)
acks: all
# 消息压缩类型(可选:none, gzip, snappy, lz4, zstd)
compression-type: gzip

# 消费者配置
consumer:
# 消费者组ID(相同组ID的消费者会分摊消息,不同组ID会各自消费所有消息)
group-id: kafka-message-group
# 自动提交offset开关
enable-auto-commit: true
# 自动提交offset的时间间隔
auto-commit-interval: 1000
# Key 反序列化器
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
# Value 反序列化器
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
# 当没有初始offset或offset失效时的策略
# latest: 从最新的消息开始消费
# earliest: 从最早的消息开始消费
# none: 抛出异常
auto-offset-reset: earliest
# 配置信任的包(JSON反序列化时需要)
properties:
spring.json.trusted.packages: "*"
# 每次poll操作最多拉取的记录数
max.poll.records: 500

# 监听器配置
listener:
# 监听器类型:single(单条消息)、batch(批量消息)
type: single
# ACK模式
# RECORD: 每处理一条记录提交一次
# BATCH: 处理完一批记录后提交
# MANUAL: 手动提交
# MANUAL_IMMEDIATE: 手动立即提交
ack-mode: batch
# 并发数(消费者线程数)
concurrency: 3

# 服务器端口
server:
port: 8085

# 日志配置
logging:
level:
root: INFO
org.springframework.kafka: INFO
hbnu.project.kafkamessage: DEBUG

其中涉及到的很多重要的参数,在我们之前讲工作原理的时候就提到这些分别是什么了

定义配置类

在这里我定义了一个配置类用于创建主题并且配置参数,这里的核心组件是 KafkaAdmin

默认情况下,如果在使用KafkaTemplate发送消息时,Topic不存在,会创建一个新的Topic,默认的分区数和副本数为如下Broker参数来设定

如果Kafka Broker支持(1.0.0或更高版本),则如果发现现有Topic的Partition 数少于设置的Partition 数,则会新增新的Partition分区。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
/*
* Kafka Topic 配置类
*
* 作用:
* 1. 自动创建Kafka主题(Topic)
* 2. 配置主题的分区数、副本数等参数
*/
@Configuration
public class KafkaTopicConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

/**
* 创建KafkaAdmin Bean
* KafkaAdmin负责创建、修改、删除主题等管理操作
*/
@Bean
public KafkaAdmin kafkaAdmin() {
Map<String, Object> configs = new HashMap<>();
configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
return new KafkaAdmin(configs);
}

/**
* 创建一个简单的主题 - 用于字符串消息
*
* @return NewTopic 配置
*
* 参数说明:
* - name: 主题名称
* - partitions(3): 分区数为3,意味着消息会被分配到3个分区中
* - replicas(1): 副本数为1,生产环境建议至少2-3个副本
*/
@Bean
public NewTopic simpleMessageTopic() {
return TopicBuilder.name("simple-message-topic")
.partitions(3)
.replicas(1)
.build();
}

/**
* 创建一个用于对象消息的主题
* 使用compact清理策略,适合存储用户状态等需要保留最新值的场景
*/
@Bean
public NewTopic userMessageTopic() {
return TopicBuilder.name("user-message-topic")
.partitions(3)
.replicas(1)
// 配置压缩策略:delete(删除旧消息)或compact(保留每个key的最新值)
.config("cleanup.policy", "delete")
// 消息保留时间(毫秒),这里设置为7天
.config("retention.ms", "604800000")
.build();
}

/**
* 创建一个用于订单消息的主题
* 分区数更多,适合高吞吐量场景
*/
@Bean
public NewTopic orderMessageTopic() {
return TopicBuilder.name("order-message-topic")
.partitions(5)
.replicas(1)
.build();
}

/**
* 创建一个用于批量消息的主题
*/
@Bean
public NewTopic batchMessageTopic() {
return TopicBuilder.name("batch-message-topic")
.partitions(3)
.replicas(1)
.build();
}
}


关于KafkaAdmin有几个常用的用法如下:

  • setFatalIfBrokerNotAvailable(true):默认这个值是False的,在Broker不可用时,不影响Spring 上下文的初始化。如果你觉得Broker不可用影响正常业务需要显示的将这个值设置为True
  • setAutoCreate(false) : 默认值为True,也就是 Kafka 实例化后会自动创建已经实例化的 NewTopic 对象
  • initialize():setAutoCreate为false时,需要我们程序显示的调用admin的initialize()方法来初始化NewTopic对象

定义实体类

先定义信息的实体,订单消息实体类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package hbnu.project.kafkamessage.model;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

import java.io.Serializable;
import java.math.BigDecimal;
import java.time.LocalDateTime;

/**
* 订单消息实体类
*/
@Data
@NoArgsConstructor
@AllArgsConstructor
public class OrderMessage implements Serializable {

/**
* 订单ID
*/
private Long orderId;

/**
* 用户ID
*/
private Long userId;

/**
* 产品ID
*/
private Long productId;

/**
* 产品名称
*/
private String productName;

/**
* 订单金额
*/
private BigDecimal amount;

/**
* 订单数量
*/
private Integer quantity;

/**
* 订单状态(CREATED, PAID, SHIPPED, COMPLETED, CANCELLED)
*/
private String status;

/**
* 订单创建时间
*/
private LocalDateTime createTime;
}

然后在实际项目中一般情况下我们还需要传递用户消息,定义其实体类,

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
@Data
@NoArgsConstructor
@AllArgsConstructor
public class UserMessage implements Serializable {

/**
* 用户ID
*/
private Long userId;

/**
* 用户名
*/
private String username;

/**
* 邮箱
*/
private String email;

/**
* 操作类型(CREATE, UPDATE, DELETE等)
*/
private String operation;

/**
* 消息创建时间
*/
private LocalDateTime timestamp;

/**
* 附加信息
*/
private String extraInfo;
}

编写生产者

我们需要编写一个生产者服务的实现,它的目的是封装了多种消息发送方式,适配不同业务场景

先看代码,下面再讲

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
@Slf4j
@Service
public class KafkaProducerService {

@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

/**
* 发送简单的字符串消息
*
* @param topic 主题名称
* @param message 消息内容
*
* 特点:
* - 最简单的发送方式
* - 异步发送,不等待响应
*/
public void sendSimpleMessage(String topic, String message) {
log.info("发送简单消息到主题 [{}]: {}", topic, message);
kafkaTemplate.send(topic, message);
}

/**
* 发送带Key的消息
*
* @param topic 主题名称
* @param key 消息键
* @param message 消息内容
*
* Key的作用:
* 1. 相同Key的消息会被发送到同一个分区,保证顺序性
* 2. 可以用于消息分组和路由
* 3. Compact清理策略下,相同Key只保留最新值
*/
public void sendMessageWithKey(String topic, String key, String message) {
log.info("发送消息到主题 [{}],Key: {}, 内容: {}", topic, key, message);
kafkaTemplate.send(topic, key, message);
}

/**
* 发送用户消息(对象)并处理回调
*
* @param userMessage 用户消息对象
*
* 异步回调说明:
* - whenComplete: 无论成功或失败都会执行
* - 第一个参数:SendResult(发送成功时的结果)
* - 第二个参数:Throwable(发送失败时的异常)
*/
public void sendUserMessage(UserMessage userMessage) {
log.info("发送用户消息: {}", userMessage);

CompletableFuture<SendResult<String, Object>> future =
kafkaTemplate.send("user-message-topic",
String.valueOf(userMessage.getUserId()),
userMessage);

// 异步回调处理
future.whenComplete((result, ex) -> {
if (ex == null) {
// 发送成功
log.info("用户消息发送成功! Topic: {}, Partition: {}, Offset: {}",
result.getRecordMetadata().topic(),
result.getRecordMetadata().partition(),
result.getRecordMetadata().offset());
} else {
// 发送失败
log.error("用户消息发送失败: {}", ex.getMessage());
}
});
}

/**
* 同步发送订单消息
*
* @param orderMessage 订单消息对象
* @return 是否发送成功
*
* 同步发送特点:
* - 调用get()方法会阻塞等待结果
* - 适合需要确认消息已发送的场景
* - 会降低吞吐量,但提高可靠性
*/
public boolean sendOrderMessageSync(OrderMessage orderMessage) {
log.info("同步发送订单消息: {}", orderMessage);

try {
SendResult<String, Object> result = kafkaTemplate
.send("order-message-topic",
String.valueOf(orderMessage.getOrderId()),
orderMessage)
.get(); // 同步等待结果

log.info("订单消息发送成功! Offset: {}", result.getRecordMetadata().offset());
return true;
} catch (Exception e) {
log.error("订单消息发送失败: {}", e.getMessage());
return false;
}
}

/**
* 发送消息到指定分区
*
* @param topic 主题名称
* @param partition 分区编号
* @param key 消息键
* @param message 消息内容
*
* 适用场景:
* - 需要精确控制消息发送到哪个分区
* - 实现自定义的负载均衡策略
*/
public void sendToPartition(String topic, Integer partition, String key, Object message) {
log.info("发送消息到主题 [{}] 的分区 [{}]", topic, partition);
kafkaTemplate.send(topic, partition, key, message);
}

/**
* 批量发送消息
*
* @param topic 主题名称
* @param messages 消息列表
*
* 注意:
* - 虽然是循环发送,但Kafka内部会进行批处理优化
* - 可以通过配置batch-size来调整批处理大小
*/
public void sendBatchMessages(String topic, List<String> messages) {
log.info("批量发送 {} 条消息到主题 [{}]", messages.size(), topic);

messages.forEach(message -> {
kafkaTemplate.send(topic, message);
});

log.info("批量消息发送完成");
}

/**
* 发送消息并立即刷新
*
* flush()方法:
* - 强制发送缓冲区中的所有消息
* - 用于确保消息立即发送,不等待批处理
* - 会影响性能,谨慎使用
*/
public void sendAndFlush(String topic, String message) {
log.info("发送并刷新消息到主题 [{}]: {}", topic, message);
kafkaTemplate.send(topic, message);
kafkaTemplate.flush(); // 立即发送
log.info("消息已刷新发送");
}
}

首先,无论是发送消息还是接收消息,我们需要注入一个核心组件

KafkaTemplate,这是 Spring Kafka 提供的核心模板类,封装了 Kafka 生产者 API,简化了消息发送操作。Spring Boot 会自动配置KafkaTemplate,底层关联 Kafka 原生的Producer实例。

其中,KafkaTemplate<String, Object>代表

  • 泛型第一个参数String:消息的key类型(用于分区路由)
  • 泛型第二个参数Object:消息体类型(支持字符串、自定义对象等)

简单字符串的消息就使用了 send方法,这是最基础的异步发送,只指定主题和消息体,不处理回调。当对消息可靠性要求不高,如日志采集、监控数据上报等,会使用这个方法

1
kafkaTemplate.send(topic, message);

注意的是,Kafka 默认异步发送,消息先存入本地缓冲区,满足批处理条件(如达到batch-sizelinger.ms)后批量发送。这个 send 异步发送的方法拆解一下就是如下

1
2
3
4
5
6
7
8
9
10
11
template.send("","").addCallback(new ListenableFutureCallback<SendResult<Object, Object>>() {
@Override
public void onFailure(Throwable throwable) {
......
}

@Override
public void onSuccess(SendResult<Object, Object> objectObjectSendResult) {
....
}
});

发送一个带 key 的消息就指定参数 key,通过key控制消息分区路由,需要保证需要保证消息顺序性等使用,一般情况下,默认通过key.hashCode() % 分区数计算分区索引

1
kafkaTemplate.send(topic, key, message);

Kafka 的每条消息由两部分组成:keyvalue(消息体)。

  • value:实际的业务数据(如字符串、UserMessage对象等),是消费端主要处理的内容。
  • key:可选的元数据(默认null),通常是字符串或序列化后的对象,Kafka 内部会对其进行哈希计算,主要用于分区路由

上述不指定key(如sendSimpleMessage方法),keynull,Kafka 会采用轮询策略将消息分发到不同分区(尽可能均匀分配)。指定 key 就根据key的哈希值计算分区,确保相同key的消息进入同一分区。

key的核心作用是控制消息的分区路由,进而影响消息的顺序性、分组性和存储策略,相同key的消息会被路由到同一分区,相当于按key对消息进行 “分组”,便于消费端按组处理。而且,Kafka 的log.cleanup.policy=compact策略下,会保留相同key最新一条消息

那么这个 key ,一般情况下,我们用业务主键作为key,确保同一主体的消息路由到同一分区。

还可以指定分区发送,指定参数 partition,会忽略路由规则,一般情况下自定义负载均衡用

1
kafkaTemplate.send(topic, partition, key, message);

发送带回调的对象消息,sendUserMessage,它通过CompletableFuture异步回调处理发送结果(成功 / 失败)。

1
2
CompletableFuture<SendResult> future = kafkaTemplate.send(...);
future.whenComplete((result, ex) -> { ... });

它能不阻塞主线程,同时能感知消息是否发送成功,成功时获取消息的分区、偏移量(offset)等元数据,失败会触发重试,当需要发送自定义对象(如UserMessage)且需要确认发送结果的场景,如用户登录通知、消息推送等,一般情况下我们使用这种消息

而且,代码中发送UserMessageOrderMessage等自定义对象时,依赖配置中的value-serializer: JsonSerializer(默认使用 Jackson 序列化),要注意,消费者需配置对应的JsonDeserializer,并指定spring.json.trusted.packages(信任的包路径),避免反序列化失败。

那么还有一种同步发送消息(sendOrderMessageSync

1
SendResult result = kafkaTemplate.send(...).get(); // 阻塞等待结果

它通过get()方法阻塞当前线程,直到获取发送结果(成功 / 异常)。对消息可靠性要求极高,必须确认发送成功后再执行后续业务,如订单创建消息这种,需要用到这种的消息,注意,同步发送会降低吞吐量,性能不好

Kafka还能批量发送,循环调用单条发送,所以和单条的 send 一样,将多条消息缓存到缓冲区,满足batch-size(默认 16KB)或linger.ms(默认 0ms)时批量发送。

1
messages.forEach(message -> kafkaTemplate.send(topic, message));

发送并且立即刷新比较特殊,flush()方法会立即将缓冲区中的消息发送到 Kafka broker,不等待批处理条件。在需要消息立即送达的特殊场景(如实时告警通知)相当有用。但是频繁调用flush()会破坏批处理优化,导致性能下降,谨慎使用。

1
2
kafkaTemplate.send(topic, message);
kafkaTemplate.flush(); // 强制刷新缓冲区

我这边没开事务,若需保证 “本地事务与消息发送” 的原子性(如订单创建成功后才发送消息),可开启事务:

  • 配置transaction-id-prefix: tx-(指定事务 ID 前缀)。
  • 在业务方法上添加@Transactional,并使用kafkaTemplate.executeInTransaction(...)发送消息。

创建消费者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
/**
* Kafka 消费者服务
*
* 核心注解:
* - @KafkaListener: 标记方法为Kafka消息监听器
* - topics: 指定要监听的主题
* - groupId: 指定消费者组ID
*
* 消费者组(Consumer Group):
* - 同一组内的消费者共同消费一个主题,每条消息只被组内一个消费者消费
* - 不同组的消费者各自独立消费同一主题的所有消息
* - 用于实现消息的负载均衡和广播
*/
@Slf4j
@Service
public class KafkaConsumerService {

/**
* 监听简单字符串消息
*
* @param message 接收到的消息内容
*
* 最简单的消费方式:
* - 直接接收消息内容
* - 自动提交offset(根据配置)
*/
@KafkaListener(topics = "simple-message-topic", groupId = "simple-group")
public void consumeSimpleMessage(String message) {
log.info("====================================");
log.info("简单消息消费者接收到消息: {}", message);
log.info("====================================");

// 这里可以添加业务处理逻辑
processSimpleMessage(message);
}

/**
* 监听用户消息(对象)
*
* @param userMessage 自动反序列化的用户消息对象
*
* 特点:
* - Spring Kafka自动将JSON反序列化为对象
* - 需要配置JsonDeserializer
*/
@KafkaListener(topics = "user-message-topic", groupId = "user-group")
public void consumeUserMessage(UserMessage userMessage) {
log.info("====================================");
log.info("用户消息消费者接收到消息:");
log.info(" 用户ID: {}", userMessage.getUserId());
log.info(" 用户名: {}", userMessage.getUsername());
log.info(" 邮箱: {}", userMessage.getEmail());
log.info(" 操作类型: {}", userMessage.getOperation());
log.info(" 时间: {}", userMessage.getTimestamp());
log.info("====================================");

// 根据操作类型处理不同的业务逻辑
switch (userMessage.getOperation()) {
case "CREATE":
log.info("处理用户创建事件");
break;
case "UPDATE":
log.info("处理用户更新事件");
break;
case "DELETE":
log.info("处理用户删除事件");
break;
default:
log.warn("未知的操作类型: {}", userMessage.getOperation());
}
}

/**
* 监听订单消息 - 使用ConsumerRecord获取更多元数据
*
* @param record ConsumerRecord包含消息的所有信息
*
* ConsumerRecord提供的信息:
* - topic(): 主题名称
* - partition(): 分区编号
* - offset(): 消息偏移量
* - key(): 消息键
* - value(): 消息值
* - timestamp(): 消息时间戳
*/
@KafkaListener(topics = "order-message-topic", groupId = "order-group")
public void consumeOrderMessage(ConsumerRecord<String, OrderMessage> record) {
log.info("====================================");
log.info("订单消息消费者接收到消息:");
log.info(" 主题: {}", record.topic());
log.info(" 分区: {}", record.partition());
log.info(" 偏移量: {}", record.offset());
log.info(" Key: {}", record.key());

OrderMessage order = record.value();
log.info(" 订单ID: {}", order.getOrderId());
log.info(" 用户ID: {}", order.getUserId());
log.info(" 产品: {}", order.getProductName());
log.info(" 金额: {}", order.getAmount());
log.info(" 状态: {}", order.getStatus());
log.info("====================================");

// 根据订单状态处理不同逻辑
processOrder(order);
}

/**
* 使用@Payload@Header注解获取消息详情
*
* @param message 消息内容(@Payload
* @param key 消息键
* @param partition 分区编号
* @param offset 偏移量
* @param topic 主题名称
*
* 优点:
* - 可以选择性地获取需要的元数据
* - 代码更清晰,参数含义明确
*/
@KafkaListener(topics = "simple-message-topic", groupId = "detail-group")
public void consumeWithHeaders(
@Payload String message,
@Header(KafkaHeaders.RECEIVED_KEY) String key,
@Header(KafkaHeaders.RECEIVED_PARTITION) int partition,
@Header(KafkaHeaders.OFFSET) long offset,
@Header(KafkaHeaders.RECEIVED_TOPIC) String topic) {

log.info("详细信息消费:");
log.info(" Topic: {}, Partition: {}, Offset: {}", topic, partition, offset);
log.info(" Key: {}, Message: {}", key, message);
}

/**
* 批量消息消费
*
* @param messages 批量消息列表
*
* 配置要求:
* - 需要设置 spring.kafka.listener.type=batch
* - 或在@KafkaListener中设置 containerFactory
*
* 优点:
* - 提高吞吐量
* - 减少网络往返次数
* - 适合高并发场景
*/
@KafkaListener(topics = "batch-message-topic", groupId = "batch-group")
public void consumeBatchMessages(List<String> messages) {
log.info("====================================");
log.info("批量消息消费者接收到 {} 条消息", messages.size());

for (int i = 0; i < messages.size(); i++) {
log.info(" 消息 {}: {}", i + 1, messages.get(i));
}

log.info("====================================");

// 批量处理逻辑
batchProcess(messages);
}

/**
* 手动ACK确认模式
*
* @param message 消息内容
* @param acknowledgment 确认对象
*
* 使用场景:
* - 需要确保消息处理成功后再提交offset
* - 实现精确一次(exactly-once)语义
*
* 配置要求:
* - 需要设置 spring.kafka.listener.ack-mode=MANUAL
*/
@KafkaListener(
topics = "user-message-topic",
groupId = "manual-ack-group",
containerFactory = "kafkaListenerContainerFactory"
)
public void consumeWithManualAck(UserMessage message, Acknowledgment acknowledgment) {
try {
log.info("手动ACK消费者处理消息: {}", message);

// 处理业务逻辑
processUserMessage(message);

// 手动提交offset
acknowledgment.acknowledge();
log.info("消息处理成功,已提交offset");

} catch (Exception e) {
log.error("消息处理失败: {}", e.getMessage());
// 不调用acknowledge(),消息会重新消费
}
}

/**
* 监听特定分区
*
* @param message 消息内容
*
* TopicPartition说明:
* - 可以精确指定要监听的主题和分区
* - 可以指定起始offset
*
* 适用场景:
* - 需要消费特定分区的消息
* - 实现自定义的分区分配策略
*/
@KafkaListener(
groupId = "partition-group",
topicPartitions = @TopicPartition(
topic = "simple-message-topic",
partitions = {"0", "1"} // 只监听分区0和1
)
)
public void consumeFromSpecificPartitions(String message) {
log.info("特定分区消费者接收到消息: {}", message);
}

/**
* 从指定offset开始消费
*
* 适用场景:
* - 重新消费历史数据
* - 跳过某些消息
*/
@KafkaListener(
groupId = "offset-group",
topicPartitions = @TopicPartition(
topic = "simple-message-topic",
partitionOffsets = {
@PartitionOffset(partition = "0", initialOffset = "0"), // 分区0从offset 0开始
@PartitionOffset(partition = "1", initialOffset = "10") // 分区1从offset 10开始
}
)
)
public void consumeFromSpecificOffset(String message) {
log.info("从指定offset消费: {}", message);
}

// ==================== 私有业务处理方法 ====================

private void processSimpleMessage(String message) {
// 实现具体的业务逻辑
try {
// 模拟处理
Thread.sleep(100);
log.debug("简单消息处理完成");
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}

private void processUserMessage(UserMessage userMessage) {
// 实现用户消息的业务逻辑
log.debug("处理用户消息: {}", userMessage.getUserId());
}

private void processOrder(OrderMessage order) {
// 实现订单处理逻辑
log.debug("处理订单: {}", order.getOrderId());

// 根据订单状态执行不同操作
if ("CREATED".equals(order.getStatus())) {
// 新订单处理
log.info("新订单创建,发送通知");
} else if ("PAID".equals(order.getStatus())) {
// 支付完成处理
log.info("订单已支付,准备发货");
}
}

private void batchProcess(List<String> messages) {
// 批量处理逻辑
log.debug("批量处理 {} 条消息", messages.size());
}
}

编写消费者的时候,我们都通过@KafkaListener注解实现不同场景的消费逻辑,差异只不过是体现在 “消息格式”“元数据获取”“确认机制”“消费范围” 四个维度

@KafkaListener是 Spring Kafka 提供的核心注解,用于将方法标记为 Kafka 消息的消费者,实现 “监听指定主题并自动消费消息” 的功能

@KafkaListener包含多个参数

参数名 类型 作用 示例
topics String[] 指定要监听的主题(支持多个),最基础的配置 topics = {"order-topic", "user-topic"}
groupId String 指定消费者组 ID,同一组内消息仅被一个消费者消费 groupId = "order-service-consumer"
topicPartitions TopicPartition[] 精确配置监听的 “主题 + 分区”,支持指定 offset 见下文 “精确控制分区与 offset” 示例
containerFactory String 指定消费者容器工厂,用于自定义消费配置(如批量消费、重试) containerFactory = "batchKafkaListenerContainerFactory"
id String 消费者实例的唯一 ID,用于标识和管理消费者(默认自动生成) id = "order-consumer-01"
concurrency String 该消费者的并发线程数(局部配置,优先级高于全局listener.concurrency concurrency = "3"(3 个线程同时消费)
errorHandler String 指定异常处理器,处理消费过程中的异常(如转发死信队列) errorHandler = "kafkaErrorHandler"

它可以显示的指定消费哪些Topic和分区的消息,设置每个Topic以及分区初始化的偏移量,设置消费线程并发度,设置消息异常处理器

  • 基础字符串的消费consumeSimpleMessage

    • 直接接收字符串类型消息,自动反序列化(依赖StringDeserializer)。消费简单文本消息(如日志、通知文案),无需额外元数据。
    • 而且默认使用全局配置的ack-mode,如果消息体是 JSON 格式的字符串,那就使用对象消费了
  • 自定义对象消费consumeUserMessage

    • 这种情况是项目中最常用的方式,Spring Kafka 自动将 Kafka 中的 JSON 消息反序列化为UserMessage对象,无需手动解析,但是必须在application.yml中配置 JSON 反序列化器及信任包,否则会报安全异常

    • 这也是@KafkaListener最通用的用法,指定 “主题 + 消费者组”,适用于大多数普通消费场景

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      // 监听单个主题
      @KafkaListener(topics = "order-create-topic", groupId = "order-group")
      public void consumeOrderCreate(OrderMessage message) {
      log.info("消费订单创建消息:{}", message);
      }

      // 监听多个主题(用数组指定)
      @KafkaListener(topics = {"user-create-topic", "user-update-topic"}, groupId = "user-group")
      public void consumeUserEvents(UserMessage message) {
      log.info("消费用户事件消息:{}", message);
      }
  • 获取消息元数据consumeOrderMessage

    • 在监听自定义对象消费的时候,如果需要获取更多消息的元数据,可以使用ConsumerRecord,其泛型可以这样定义ConsumerRecord<String, 消息对象> record
    • 需要基于元数据做业务逻辑,或者单纯的日志审计的时候,都很有用
  • 选择性获取元数据consumeWithHeaders

    • 通过@Payload@Header注解,只获取需要的参数(消息体 + 指定元数据),代码更简洁。
    • 其中,@Payload:标记消息体(必须有,对应生产者发送的value)。
    • @Header(KafkaHeaders.XXX):从KafkaHeaders获取指定元数据,常用枚举包括:r
      • RECEIVED_KEY:消息 key
      • RECEIVED_PARTITION:分区号
      • OFFSET:消息 offset
      • RECEIVED_TOPIC:主题名
  • 批量消费consumeBatchMessages

    • 一次性接收多条消息(List<String>),减少单条消费的 IO 开销,提升吞吐量。但是全局开启批量消费模式,否则会报 “参数类型不匹配” 异常,或在@KafkaListener中设置 containerFactory

      1
      2
      3
      4
      5
      spring:
      kafka:
      listener:
      type: batch # 开启批量消费
      batch-listener: true # 兼容旧版本,部分Spring Kafka版本需配置

      还可以配置max-poll-records,控制每次拉取的最大消息数

    • 批量处理失败时,建议拆分单条重试(如循环遍历messages,单条处理 + 捕获异常,避免 “一条失败导致整批重试”)。

  • 手动 ACK 确认(consumeWithManualAck

    • 通过Acknowledgment对象手动提交 offset,确保 “消息处理成功后再确认”,避免消息丢失。其中需要配置containerFactory = "kafkaListenerContainerFactory"或者 spring.kafka.listener.ack-mode=MANUAL

      1
      2
      3
      4
      5
      spring:
      kafka:
      listener:
      ack-mode: MANUAL # 手动确认模式(MANUAL_IMMEDIATE:确认后立即提交,推荐)
      enable-auto-commit: false # 关闭自动提交

      在消费时,只需要在@KafkaListener监听方法的入参加入Acknowledgment 即可,执行到ack.acknowledge()代表提交了偏移量

      1
      2
      3
      4
      5
      6
      @KafkaListener(
      topics = "user-message-topic",
      groupId = "manual-ack-group",
      containerFactory = "kafkaListenerContainerFactory"
      )
      public void consumeWithManualAck(UserMessage message, Acknowledgment acknowledgment)

      如果消息消费成功,就会调用下面的acknowledgment.acknowledge()提交 offset,Kafka 标记该消息已消费。如果失败,不调用acknowledge(),Kafka 会在消费者重启或会话超时后,重新发送该消息,一般在这里结合 “死信队列”(DLQ),处理失败 3 次以上的消息转发到死信主题,避免无限重试。

    • 这里涉及到了@KafkaListener当需要差异化配置(如 “某个消费者用批量消费,其他用单条消费”)时,通过containerFactory指定自定义的容器工厂。

  • 监听特定分区(consumeFromSpecificPartitions

    • 通过@TopicPartition(partitions = {"0", "1"})指定只消费某个主题的特定分区,不参与默认的分区分配。在分区业务隔离时候非常有用,而且可以自定义负载均衡

      1
      2
      3
      4
      5
      6
      7
      8
      9
      10
      11
      @KafkaListener(
      groupId = "order-beijing-group",
      // 精确配置:监听order-topic的分区0和分区1
      topicPartitions = @TopicPartition(
      topic = "order-topic",
      partitions = {"0", "1"} // 分区编号,字符串格式
      )
      )
      public void consumeBeijingOrder(OrderMessage message) {
      log.info("消费北京地区订单(分区0/1):{}", message);
      }
  • 指定 offset 消费(consumeFromSpecificOffset

    需要重新消费历史数据(如 “回溯昨天的日志”)或跳过无效数据时,用@PartitionOffset指定每个分区的起始 Offset。

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    @KafkaListener(
    groupId = "log-backtrack-group",
    topicPartitions = @TopicPartition(
    topic = "log-topic",
    // 配置每个分区的起始Offset
    partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "1000"), // 分区0从Offset 1000开始
    @PartitionOffset(partition = "1", initialOffset = "2000") // 分区1从Offset 2000开始
    }
    )
    )
    public void backtrackLog(String logMessage) {
    log.info("回溯日志消息:{}", logMessage);
    }
    • 通过@PartitionOffset(initialOffset = "10")指定从某个 offset 开始消费,支持重新消费历史数据或跳过无效数据。
    • offset 规则:
      • initialOffset = "0":从分区的第一条消息开始消费(重新消费全量数据)。
      • initialOffset = "100":从 offset 100 开始消费(跳过前 100 条消息)。
    • 注意,该配置只在消费者组首次启动时生效;若已消费过(Kafka 保存了该组的 offset),需先删除 Kafka 中的消费组 offset,再重启消费者。

上述涉及到的消费者基础配置如下

配置项 推荐值 作用
spring.kafka.consumer.group-id 服务名 + 功能(如order-service-payment 避免不同服务共用一个组,导致消息被误消费
spring.kafka.listener.ack-mode MANUAL_IMMEDIATE 核心业务用手动确认,非核心可用AUTO
spring.kafka.consumer.auto-offset-reset latest(默认) 首次启动时,只消费启动后的新消息;需回溯用earliest
spring.kafka.listener.concurrency 等于主题分区数(如3 并发数≤分区数(超过分区数的并发会空闲),提升消费速度
spring.kafka.consumer.max-poll-records 100-500 批量消费时控制单次拉取量,避免 OOM

编写控制器

最后就是编写接口了,发送消息

这个真没啥好说的,正常写然后调用就行

就是注意,一般情况下,会在发送消息的时候,设置创建时间

测试

我们简单的测试一下

来看一下服务的健康情况

image-20251102144503511

要注意确保Kafka默认在9092运行

先测试简单消息的发送

image-20251102144438949

再测试带Key的消息

image-20251102144553033

相同Key的消息会发送到同一分区,便于保证顺序。

测试用户消息,也就是对象消息

image-20251102144737618

这是同步发送的消息,阻塞线程到直到获取发送结果

别忘了,手动 ACK 使用了 Acknowledgment 参数,需要application.yml 中的 ack-mode 是 MANUAL,若指定了 containerFactory = “kafkaListenerContainerFactory”,需要实现,我忘写了))

image-20251102145156914

批量消息测试一下

image-20251102145126008

可以看到消息发送到了指定的分区

image-20251102145315279
image-20251102145341260

草,忘设置 key 了,下面爆了很多异常