之前项目中没有提到的一些额外的内容

为Kafka配置事务消息

Kafka事务允许生产者将多条消息作为一个原子操作发送,要么全部成功,要么全部失败。这确保了数据的一致性。

默认情况下,Spring-kafka自动生成的KafkaTemplate实例,是不具有事务消息发送能力的。需要使用如下配置激活事务特性。事务激活后,所有的消息发送只能在发生事务的方法内执行了,不然就会抛一个没有事务交易的异常

1
spring.kafka.producer.transaction-id-prefix=kafka_tx.

事务有一个 Exactly-Once语义,即使重试,消息也只会被处理一次

添加事务的配置

在配置文件中添加事务的配置

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
spring:
application:
name: kafka-message

# Kafka 配置
kafka:
bootstrap-servers: localhost:9092

# 生产者配置
producer:
retries: 3
batch-size: 16384
buffer-memory: 33554432
key-serializer: org.apache.kafka.common.serialization.StringSerializer
value-serializer: org.springframework.kafka.support.serializer.JsonSerializer
acks: all
compression-type: gzip

# ========== 事务相关配置 ==========
# 事务ID前缀(必须配置,用于标识事务生产者)
# 每个生产者实例的事务ID必须唯一
transaction-id-prefix: kafka-tx-

# 其他事务相关属性
properties:
# 事务超时时间(毫秒),默认60秒
# 如果事务在此时间内未提交或回滚,broker会自动中止事务
transaction.timeout.ms: 60000

# 幂等性开启(事务模式下自动开启,但显式配置更清晰)
# 确保重试不会导致消息重复
enable.idempotence: true

# 最大飞行请求数(事务模式下必须设置为5或更小)
max.in.flight.requests.per.connection: 5

# 消费者配置
consumer:
group-id: kafka-message-group
enable-auto-commit: true
auto-commit-interval: 1000
key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer
auto-offset-reset: earliest
properties:
spring.json.trusted.packages: "*"
max.poll.records: 500

# ========== 事务消费者配置 ==========
# 隔离级别:read_committed 只读取已提交的消息(事务消费者必须配置)
# read_uncommitted 可以读取未提交的消息(默认值)
isolation.level: read_committed

# 监听器配置
listener:
type: single
ack-mode: batch
concurrency: 3

# 服务器端口
server:
port: 8085

# 日志配置
logging:
level:
root: INFO
org.springframework.kafka: INFO
hbnu.project.kafkamessage: DEBUG
  • transaction-id-prefix:事务ID前缀,每个生产者实例会自动添加后缀形成唯一ID
  • enable.idempotence: true:开启幂等性,防止重试导致消息重复
  • isolation.level: read_committed:消费者只读取已提交的事务消息

然后我们需要一个事务的配置类

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
/**
* Kafka 事务配置类
*
* 核心概念:
* 1. KafkaTransactionManager:Spring的事务管理器,管理Kafka事务的生命周期
* 2. ProducerFactory:创建支持事务的Producer实例
* 3. @Transactional:使用Spring的声明式事务
*
* 事务工作流程:
* 1. beginTransaction():开始事务
* 2. send():发送消息(缓存在本地)
* 3. commitTransaction():提交事务(消息对消费者可见)
* 4. abortTransaction():回滚事务(消息被丢弃)
*/
@Configuration
@EnableKafka
@EnableTransactionManagement // 开启Spring事务管理
public class KafkaTransactionConfig {

@Value("${spring.kafka.bootstrap-servers}")
private String bootstrapServers;

@Value("${spring.kafka.producer.transaction-id-prefix}")
private String transactionIdPrefix;

/**
* 配置事务生产者工厂
*
* 关键配置:
* - TRANSACTIONAL_ID_CONFIG:事务ID,必须全局唯一
* - ENABLE_IDEMPOTENCE_CONFIG:开启幂等性
* - ACKS_CONFIG:必须设置为all,确保消息不丢失
* - MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION:最大飞行请求数,事务模式下<=5
*/
@Bean
public ProducerFactory<String, Object> transactionalProducerFactory() {
Map<String, Object> configProps = new HashMap<>();

// 基础配置
configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
org.springframework.kafka.support.serializer.JsonSerializer.class);

// 事务必需配置
configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix);
configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true);
configProps.put(ProducerConfig.ACKS_CONFIG, "all");

// 性能配置
configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5);
configProps.put(ProducerConfig.RETRIES_CONFIG, 3);
configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384);

// 超时配置
configProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000);

DefaultKafkaProducerFactory<String, Object> factory =
new DefaultKafkaProducerFactory<>(configProps);

return factory;
}

/**
* 配置事务KafkaTemplate
*
* 这个KafkaTemplate支持Spring的@Transactional注解
* 可以与其他Spring事务(如数据库事务)配合使用
*/
@Bean
public KafkaTemplate<String, Object> transactionalKafkaTemplate() {
return new KafkaTemplate<>(transactionalProducerFactory());
}

/**
* 配置Kafka事务管理器
*
* KafkaTransactionManager的作用:
* 1. 管理事务的生命周期(开始、提交、回滚)
* 2. 与Spring的@Transactional注解集成
* 3. 支持与其他事务管理器配合(如DataSourceTransactionManager)
*
* 注意:
* - 每个事务管理器管理一个ProducerFactory
* - 不同的事务操作需要不同的事务管理器实例
*/
@Bean
public KafkaTransactionManager<String, Object> kafkaTransactionManager() {
return new KafkaTransactionManager<>(transactionalProducerFactory());
}

/**
* 配置事务消费者工厂
*
* 关键配置:
* - ISOLATION_LEVEL_CONFIG:设置为read_committed,只读取已提交的消息
* - ENABLE_AUTO_COMMIT_CONFIG:手动提交offset
*/
@Bean
public ConsumerFactory<String, Object> transactionalConsumerFactory() {
Map<String, Object> props = new HashMap<>();

props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
props.put(ConsumerConfig.GROUP_ID_CONFIG, "transaction-group");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
org.springframework.kafka.support.serializer.JsonDeserializer.class);

// 事务消费者必需配置
props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed");
props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

// JSON反序列化配置
props.put("spring.json.trusted.packages", "*");

return new DefaultKafkaConsumerFactory<>(props);
}

/**
* 配置事务消费者监听器容器工厂
*/
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> transactionalKafkaListenerContainerFactory() {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(transactionalConsumerFactory());

// 手动ACK模式
factory.getContainerProperties().setAckMode(
org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL);

return factory;
}
}

创建事务生产者

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
/**
* Kafka 事务生产者服务
*
* 事务使用方式:
* 1. 声明式事务:使用@Transactional注解(推荐)
* 2. 编程式事务:使用executeInTransaction()方法
*
* 事务保证:
* - 多条消息要么全部发送成功,要么全部失败
* - 发生异常时自动回滚
* - 消费者只能看到已提交的消息
*/
@Slf4j
@Service
public class KafkaTransactionalProducerService {

@Autowired
private KafkaTemplate<String, Object> transactionalKafkaTemplate;

/**
* 方式一:使用@Transactional注解(声明式事务)
*
* 优点:
* - 代码简洁,自动管理事务
* - 异常自动回滚
* - 与Spring其他事务(如数据库)集成方便
*
* 工作原理:
* 1. 方法开始时自动调用beginTransaction()
* 2. 方法正常结束时自动调用commitTransaction()
* 3. 抛出RuntimeException时自动调用abortTransaction()
*
* @param userMessage 用户消息
* @param orderMessage 订单消息
*/
@Transactional(transactionManager = "kafkaTransactionManager")
public void sendMessagesInTransaction(UserMessage userMessage, OrderMessage orderMessage) {
try {
log.info("开始事务发送消息...");

// 发送用户消息
log.info("发送用户消息: {}", userMessage);
transactionalKafkaTemplate.send("user-message-topic",
String.valueOf(userMessage.getUserId()),
userMessage);

// 模拟业务处理
log.info("处理业务逻辑...");

// 发送订单消息
log.info("发送订单消息: {}", orderMessage);
transactionalKafkaTemplate.send("order-message-topic",
String.valueOf(orderMessage.getOrderId()),
orderMessage);

log.info("事务消息发送成功,等待提交...");

// 方法正常结束,Spring自动提交事务

} catch (Exception e) {
log.error("事务消息发送失败,将回滚: {}", e.getMessage());
// 抛出RuntimeException,Spring自动回滚事务
throw new RuntimeException("事务执行失败", e);
}
}

/**
* 事务发送 - 带业务校验
*
* 场景:订单创建时需要同时发送多条消息
* - 用户消息:通知用户
* - 订单消息:记录订单
* - 库存消息:扣减库存
*
* 如果任何一步失败,所有消息都不会发送
*/
@Transactional(transactionManager = "kafkaTransactionManager")
public void createOrderWithTransaction(UserMessage userMessage, OrderMessage orderMessage) {
log.info("====================================");
log.info("开始事务:创建订单");

try {
// 步骤1:发送用户通知消息
log.info("步骤1:发送用户通知");
transactionalKafkaTemplate.send("user-message-topic",
String.valueOf(userMessage.getUserId()),
userMessage);

// 步骤2:业务校验(模拟)
log.info("步骤2:校验订单信息");
validateOrder(orderMessage);

// 步骤3:发送订单消息
log.info("步骤3:发送订单消息");
transactionalKafkaTemplate.send("order-message-topic",
String.valueOf(orderMessage.getOrderId()),
orderMessage);

// 步骤4:发送库存扣减消息(模拟)
log.info("步骤4:发送库存扣减消息");
String inventoryMessage = String.format("减少库存:产品ID=%d, 数量=%d",
orderMessage.getProductId(),
orderMessage.getQuantity());
transactionalKafkaTemplate.send("inventory-topic", inventoryMessage);

log.info("所有消息发送完成,提交事务");
log.info("====================================");

} catch (Exception e) {
log.error("订单创建失败,回滚所有消息: {}", e.getMessage());
log.info("====================================");
throw new RuntimeException("订单创建失败", e);
}
}

/**
* 方式二:编程式事务
*
* 优点:
* - 更灵活的控制
* - 可以在方法内部控制事务范围
*
* 使用场景:
* - 需要在同一方法中执行多个独立事务
* - 需要精确控制事务边界
*/
public void sendWithProgrammaticTransaction(String topic, Object message) {
log.info("使用编程式事务发送消息...");

// executeInTransaction会自动管理事务
transactionalKafkaTemplate.executeInTransaction(operations -> {
try {
log.info("事务内发送消息到 {}: {}", topic, message);
operations.send(topic, message);

// 可以在这里发送多条消息
log.info("事务内的其他操作...");

return true;
} catch (Exception e) {
log.error("事务执行失败: {}", e.getMessage());
// 返回null或抛出异常都会导致事务回滚
throw new RuntimeException("事务失败", e);
}
});

log.info("编程式事务执行完成");
}

/**
* 批量事务发送
*
* 将多条消息在一个事务中批量发送
*/
@Transactional(transactionManager = "kafkaTransactionManager")
public void sendBatchInTransaction(String topic, java.util.List<String> messages) {
log.info("====================================");
log.info("批量事务发送 {} 条消息", messages.size());

int count = 0;
for (String message : messages) {
transactionalKafkaTemplate.send(topic, message);
count++;
log.debug("已发送 {}/{} 条消息", count, messages.size());
}

log.info("批量消息发送完成,提交事务");
log.info("====================================");
}

// ==================== 私有辅助方法 ====================

/**
* 订单校验(模拟)
*/
private void validateOrder(OrderMessage order) {
if (order.getAmount().doubleValue() <= 0) {
throw new IllegalArgumentException("订单金额必须大于0");
}
if (order.getQuantity() <= 0) {
throw new IllegalArgumentException("订单数量必须大于0");
}
log.info("订单校验通过");
}
}

事务消费者

那么事务消费者我只举一个方法,不同的内容就在于,指定containerFactory = "transactionalKafkaListenerContainerFactory"为事务配置的工厂类

设置isolation.level=read_committed,消费者只读取已提交的事务消息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
/**
* 消费事务订单消息
*/
@KafkaListener(
topics = "order-message-topic",
groupId = "transaction-order-group",
containerFactory = "transactionalKafkaListenerContainerFactory"
)
public void consumeTransactionalOrderMessage(OrderMessage orderMessage, Acknowledgment ack) {
log.info("====================================");
log.info("事务消费者接收到订单消息:");
log.info(" 订单ID: {}", orderMessage.getOrderId());
log.info(" 产品: {}", orderMessage.getProductName());
log.info(" 金额: {}", orderMessage.getAmount());
log.info(" 状态: {}", orderMessage.getStatus());
log.info(" 说明: 此消息已经过事务提交确认");
log.info("====================================");

try {
// 处理订单
processOrder(orderMessage);

// 提交offset
ack.acknowledge();

} catch (Exception e) {
log.error("订单处理失败: {}", e.getMessage());
}
}

这样,Kafkai就可以成功的集成事务

消息重试和死信队列

Kafka 本身不直接提供重试机制,而是通过 Spring Kafka 的重试配置异常处理器实现。核心思路是:消费失败时,将消息重新放入待处理队列,等待一段时间后再次消费,直到达到最大重试次数。

那么一般情况下,我们需要定义死信队列的主题

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
/**
* 主题:死信队列主题
*
* 命名规范:
* - 原主题名.DLT (Dead Letter Topic)
* - 例如:retry-business-topic.DLT
*
* Spring Kafka默认使用这种命名规范
*/
@Bean
public NewTopic deadLetterTopic() {
return TopicBuilder.name("retry-business-topic.DLT")
.partitions(3)
.replicas(1)
// 死信队列消息保留时间更长(7天)
.config("retention.ms", "604800000")
.build();
}

/**
* 主题:重试主题(可选,高级用法)
*
* 某些场景下,可以创建专门的重试主题
* 消息先进入重试主题,延迟后再消费
*/
@Bean
public NewTopic retryTopic() {
return TopicBuilder.name("retry-business-topic.RETRY")
.partitions(3)
.replicas(1)
.build();
}

那么对于重试的配置,我们一般情况下是在application.yml中配置重试参数,控制重试次数、间隔等:

1
2
3
4
5
6
7
8
9
10
11
spring:
kafka:
listener:
# 重试机制核心配置(针对异常未被捕获的情况)
retry:
enabled: true # 开启重试(默认true)
max-attempts: 3 # 最大重试次数(包含首次消费,共3次:1次正常+2次重试)
backoff:
initial-interval: 1000ms # 首次重试间隔(1秒)
multiplier: 2.0 # 重试间隔乘数(第二次间隔=1000*2=2秒,第三次=2000*2=4秒)
max-interval: 10000ms # 最大重试间隔(不超过10秒)

对于死信队列的流程,一般是这样的,消息处理失败 -> 重试N次 -> 仍然失败 -> 发送到死信队列 -> 人工处理或告警

所以结合死信队列的错误重试,我们需要在其对应的配置类中配置错误处理器,以指数退避为例子

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
@Bean
public CommonErrorHandler exponentialBackOffErrorHandler(KafkaTemplate<String, Object> kafkaTemplate) {
// 创建死信队列发布恢复器(带自定义头信息)
DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer(
kafkaTemplate,
(record, ex) -> {
log.error("===== 消息进入死信队列 =====");
log.error("原始Topic: {}", record.topic());
log.error("原始Partition: {}", record.partition());
log.error("原始Offset: {}", record.offset());
log.error("失败原因: {}", ex.getMessage());
log.error("消息内容: {}", record.value());
log.error("============================");

return new TopicPartition(record.topic() + ".DLT", record.partition());
}
);

// 配置指数退避策略
ExponentialBackOffWithMaxRetries exponentialBackOff = new ExponentialBackOffWithMaxRetries(5);
exponentialBackOff.setInitialInterval(1000L); // 初始间隔1秒
exponentialBackOff.setMultiplier(2.0); // 每次翻倍
exponentialBackOff.setMaxInterval(10000L); // 最大间隔10秒

// 创建错误处理器
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, exponentialBackOff);

// 配置不重试的异常
errorHandler.addNotRetryableExceptions(
IllegalArgumentException.class, // 参数错误
NullPointerException.class // 空指针
);

// 设置日志级别(可以看到每次重试的日志)
errorHandler.setLogLevel(org.springframework.kafka.KafkaException.Level.ERROR);

return errorHandler;
}

在这个配置中,我们使用了DeadLetterPublishingRecoverer,它是 Spring Kafka 提供的死信消息转发器,作用是将 “达到最大重试次数仍失败” 的消息转发到死信队列(DLT)。

我们通过 Lambda 表达式定义了死信队列的路由逻辑:

1
2
3
4
(record, ex) -> {
// 死信队列命名规则:原主题名 + ".DLT",分区与原消息相同
return new TopicPartition(record.topic() + ".DLT", record.partition());
}

而且转发到死信队列的消息不仅包含原消息的keyvalue,还会自动添加死信头信息DefaultErrorHandler是 Spring Kafka 2.8 + 引入的新一代异常处理器,整合了 “重试逻辑” 和 “死信转发”

这样的死信和重试流程就是这样子了

1
0秒(失败) -> 1秒(重试1) -> 3秒(重试2) -> 7秒(重试3) -> 15秒(重试4) -> 25秒(重试5) -> DLQ

对了,上述的不重试异常指的是某些异常重试无意义,应该直接进入死信队列

虽然你也可以只重试不DLQ,这样消息一种不丢,但是消费也被一直阻塞

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
@Bean
public CommonErrorHandler retryOnlyErrorHandler() {
// 不提供DeadLetterPublishingRecoverer,重试失败后会停止消费
FixedBackOff backOff = new FixedBackOff(3000L, 10L); // 3秒间隔,最多重试10次

DefaultErrorHandler errorHandler = new DefaultErrorHandler(null, backOff);

// 重试用尽后的处理(记录日志、发送告警等)
errorHandler.setRetryListeners((record, ex, deliveryAttempt) -> {
log.error("消息重试第 {} 次失败. Topic: {}, Partition: {}, Offset: {}",
deliveryAttempt,
record.topic(),
record.partition(),
record.offset());
});

return errorHandler;
}

生产者不用咋动,生产者配鸡毛死信和重试,露头就被秒了?

我们一般把 Kafka 死信队列消费者服务单列出来一个类,一般就是正常处理消息内容,只不过是放到死信处理的萝莉里处理,然后打印各种需要的信息

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
/**
* Kafka 死信队列消费者服务
*
* ==================== 核心功能 ====================
*
* 1. 消费死信队列中的消息
* 2. 记录失败消息的详细信息
* 3. 进行告警或人工介入处理
* 4. 可以尝试修复并重新发送到原队列
*
* ==================== 死信队列消息特点 ====================
*
* 1. 包含原始消息内容
* 2. 包含失败原因(异常信息)
* 3. 包含原始Topic、Partition、Offset信息
* 4. 包含重试次数等元数据
*
* ==================== 处理策略 ====================
*
* 1. 记录日志:详细记录失败原因,便于排查
* 2. 发送告警:通知相关人员处理
* 3. 存储数据库:持久化失败消息,便于后续处理
* 4. 人工修复:修复数据后重新发送
* 5. 业务补偿:执行补偿逻辑
*
* ==================== 使用建议 ====================
*
* 1. 死信队列消费者不应该再抛出异常
* 2. 使用try-catch包裹所有处理逻辑
* 3. 记录详细的日志信息
* 4. 建立监控和告警机制
* 5. 定期清理或归档死信消息
*/
@Slf4j
@Service
public class KafkaDeadLetterConsumerService {

/**
* 消费业务Topic的死信队列
*
* Topic命名规范:原Topic + .DLT
* 例如:retry-business-topic.DLT
*/
@KafkaListener(
topics = "retry-business-topic.DLT",
groupId = "dlq-business-group"
)
public void consumeBusinessDLQ(
ConsumerRecord<String, String> record,
@Header(value = KafkaHeaders.EXCEPTION_MESSAGE, required = false) String exceptionMessage,
@Header(value = KafkaHeaders.EXCEPTION_STACKTRACE, required = false) String stackTrace,
@Header(value = KafkaHeaders.ORIGINAL_TOPIC, required = false) String originalTopic,
@Header(value = KafkaHeaders.ORIGINAL_PARTITION, required = false) Integer originalPartition,
@Header(value = KafkaHeaders.ORIGINAL_OFFSET, required = false) Long originalOffset) {

log.error("╔════════════════════════════════════════════════════════════════╗");
log.error("║ 收到死信队列消息 - 业务Topic ║");
log.error("╚════════════════════════════════════════════════════════════════╝");

// 原始消息信息
log.error("【原始消息信息】");
log.error(" 原始Topic: {}", originalTopic != null ? originalTopic : "未知");
log.error(" 原始Partition: {}", originalPartition != null ? originalPartition : "未知");
log.error(" 原始Offset: {}", originalOffset != null ? originalOffset : "未知");

// 死信队列信息
log.error("【死信队列信息】");
log.error(" DLQ Topic: {}", record.topic());
log.error(" DLQ Partition: {}", record.partition());
log.error(" DLQ Offset: {}", record.offset());
log.error(" DLQ Timestamp: {}", new java.util.Date(record.timestamp()));

// 消息内容
log.error("【消息内容】");
log.error(" Key: {}", record.key());
log.error(" Value: {}", record.value());

// 失败原因
log.error("【失败原因】");
log.error(" 异常消息: {}", exceptionMessage != null ? exceptionMessage : "无异常信息");
if (stackTrace != null && !stackTrace.isEmpty()) {
log.error(" 异常堆栈(前200字符): {}",
stackTrace.length() > 200 ? stackTrace.substring(0, 200) + "..." : stackTrace);
}

log.error("════════════════════════════════════════════════════════════════");

try {
// 处理死信消息
handleDeadLetterMessage(record, exceptionMessage, originalTopic);

log.info("死信消息处理完成");

} catch (Exception e) {
// 死信队列消费者捕获所有异常,避免再次进入死信队列
log.error("处理死信消息时发生异常(已捕获): {}", e.getMessage());
}
}

/**
* 消费用户消息的死信队列
*/
@KafkaListener(
topics = "user-message-topic.DLT",
groupId = "dlq-user-group"
)
public void consumeUserMessageDLQ(
ConsumerRecord<String, UserMessage> record,
@Header(value = KafkaHeaders.EXCEPTION_MESSAGE, required = false) String exceptionMessage) {

log.error("╔════════════════════════════════════════════════════════════════╗");
log.error("║ 收到死信队列消息 - 用户消息 ║");
log.error("╚════════════════════════════════════════════════════════════════╝");

UserMessage userMessage = record.value();

log.error("【死信用户消息】");
log.error(" 用户ID: {}", userMessage.getUserId());
log.error(" 用户名: {}", userMessage.getUsername());
log.error(" 邮箱: {}", userMessage.getEmail());
log.error(" 操作: {}", userMessage.getOperation());
log.error(" 时间: {}", userMessage.getTimestamp());

log.error("【失败原因】");
log.error(" 异常: {}", exceptionMessage);

log.error("════════════════════════════════════════════════════════════════");

try {
// 可以尝试修复数据并重新处理
handleFailedUserMessage(userMessage, exceptionMessage);

} catch (Exception e) {
log.error("处理用户死信消息时发生异常: {}", e.getMessage());
}
}

/**
* 消费订单消息的死信队列
*/
@KafkaListener(
topics = "order-message-topic.DLT",
groupId = "dlq-order-group"
)
public void consumeOrderMessageDLQ(
ConsumerRecord<String, OrderMessage> record,
@Header(value = KafkaHeaders.EXCEPTION_MESSAGE, required = false) String exceptionMessage) {

log.error("╔════════════════════════════════════════════════════════════════╗");
log.error("║ 收到死信队列消息 - 订单消息 ║");
log.error("╚════════════════════════════════════════════════════════════════╝");

OrderMessage orderMessage = record.value();

log.error("【死信订单消息】");
log.error(" 订单ID: {}", orderMessage.getOrderId());
log.error(" 用户ID: {}", orderMessage.getUserId());
log.error(" 产品: {}", orderMessage.getProductName());
log.error(" 金额: {}", orderMessage.getAmount());
log.error(" 数量: {}", orderMessage.getQuantity());
log.error(" 状态: {}", orderMessage.getStatus());
log.error(" 创建时间: {}", orderMessage.getCreateTime());

log.error("【失败原因】");
log.error(" 异常: {}", exceptionMessage);

log.error("════════════════════════════════════════════════════════════════");

try {
// 处理失败的订单
handleFailedOrderMessage(orderMessage, exceptionMessage);

} catch (Exception e) {
log.error("处理订单死信消息时发生异常: {}", e.getMessage());
}
}

// ==================== 私有处理方法 ====================

/**
* 处理死信消息的通用逻辑
*
* 可以执行以下操作:
* 1. 保存到数据库
* 2. 发送告警通知
* 3. 记录到监控系统
* 4. 触发人工审核流程
*/
private void handleDeadLetterMessage(
ConsumerRecord<String, String> record,
String exceptionMessage,
String originalTopic) {

log.info("开始处理死信消息...");

// 1. 保存到数据库(示例)
saveToDatabase(record, exceptionMessage, originalTopic);

// 2. 发送告警
sendAlert(record, exceptionMessage);

// 3. 记录到监控系统
recordMetrics(originalTopic);

log.info("死信消息处理完成");
}

/**
* 处理失败的用户消息
*/
private void handleFailedUserMessage(UserMessage userMessage, String exceptionMessage) {
log.info("处理失败的用户消息: userId={}", userMessage.getUserId());

// 根据失败原因,执行不同的补偿逻辑
if (exceptionMessage != null && exceptionMessage.contains("参数")) {
log.warn("参数错误,需要人工修复数据");
// 触发人工审核流程
} else {
log.warn("系统错误,可能需要重试");
// 可以在修复系统后,重新发送到原Topic
}
}

/**
* 处理失败的订单消息
*/
private void handleFailedOrderMessage(OrderMessage orderMessage, String exceptionMessage) {
log.info("处理失败的订单消息: orderId={}", orderMessage.getOrderId());

// 执行订单补偿逻辑
if (exceptionMessage != null && exceptionMessage.contains("库存")) {
log.warn("库存服务失败,可能需要手动释放预占库存");
// 执行库存回滚
} else if (exceptionMessage != null && exceptionMessage.contains("金额")) {
log.error("金额异常,需要人工审核");
// 冻结订单,等待人工处理
}
}

/**
* 保存到数据库(示例方法)
*/
private void saveToDatabase(
ConsumerRecord<String, String> record,
String exceptionMessage,
String originalTopic) {

log.debug("保存死信消息到数据库");
log.debug(" Topic: {}", originalTopic);
log.debug(" 消息: {}", record.value());
log.debug(" 异常: {}", exceptionMessage);

// 实际项目中,这里应该调用数据库服务
// Example:
// DeadLetterRecord dlRecord = new DeadLetterRecord();
// dlRecord.setOriginalTopic(originalTopic);
// dlRecord.setMessageContent(record.value());
// dlRecord.setExceptionMessage(exceptionMessage);
// dlRecord.setCreateTime(LocalDateTime.now());
// deadLetterRepository.save(dlRecord);
}

/**
* 发送告警(示例方法)
*/
private void sendAlert(ConsumerRecord<String, String> record, String exceptionMessage) {
log.warn("【告警】发现死信消息");
log.warn(" Topic: {}", record.topic());
log.warn(" 异常: {}", exceptionMessage);

// 实际项目中,这里应该调用告警服务
// Example:
// alertService.send(AlertLevel.ERROR,
// "Kafka死信队列告警",
// "Topic: " + record.topic() + ", 异常: " + exceptionMessage);

// 或发送邮件、短信、钉钉等通知
}

/**
* 记录监控指标(示例方法)
*/
private void recordMetrics(String originalTopic) {
log.debug("记录死信队列指标: topic={}", originalTopic);

// 实际项目中,这里应该上报到监控系统
// Example:
// metricsService.increment("kafka.dlq.count",
// Tags.of("topic", originalTopic));
}

/**
* 重新发送消息到原Topic(用于手动修复后重试)
*
* 注意:需要注入KafkaTemplate
*/
public void retrySendToOriginalTopic(String originalTopic, String message) {
log.info("重新发送消息到原Topic: {}", originalTopic);
log.info("消息内容: {}", message);

// 需要注入KafkaTemplate并调用send方法
// kafkaTemplate.send(originalTopic, message);

log.info("消息已重新发送");
}
}

我们配置了各种重试的内容,那么此时消费者会在抛出可重试异常的时候重试,当消息消费失败时,按照配置类中的内容自动重试了,再失败就进去DLQ

在重试过程中,如果消费者崩溃,消息可能会被重复消费,所以说重试一般配合幂等性一起实现。

而且,重试会阻塞其他消息,因为默认情况下,同一分区的消息会按顺序处理,所以重试会阻塞后续消息。这就是为什么,我考虑使用专门的重试Topic

如何防止死信队列无限增长?

1
2
3
4
5
6
7
@Bean
public NewTopic deadLetterTopic() {
return TopicBuilder.name("business-topic.DLT")
.config("retention.ms", "604800000") // 7天
.config("cleanup.policy", "delete")
.build();
}

可以针对不同异常使用不同的重试策略吗?太可以了,抱着你的自定义ErrorHandler去玩吧

1
2
3
4
5
6
7
8
9
10
11
12
13
public class CustomErrorHandler extends DefaultErrorHandler {
@Override
public void handleRemaining(Exception thrownException, ...) {
if (thrownException instanceof NetworkException) {
// 网络异常:快速重试
applyBackOff(new FixedBackOff(500, 10));
} else if (thrownException instanceof ServiceException) {
// 服务异常:慢速重试
applyBackOff(new ExponentialBackOff(5000, 2.0));
}
super.handleRemaining(thrownException, ...);
}
}

ReplyingKafkaTemplate获得消息回复

RPC 调用知不知道,而ReplyingKafkaTemplate就是 Spring Kafka 提供的请求 - 响应模式核心工具,很像rpc模式,能让生产者发送消息后,同步或异步接收消费者的回复

ReplyingKafkaTemplate本质是 “生产者 + 消费者” 的封装,通过两个主题实现请求 - 响应:

  1. 请求主题(Request Topic):生产者发送请求消息到该主题,消费者监听并处理。
  2. 响应主题(Reply Topic):消费者处理完成后,将回复消息发送到该主题,ReplyingKafkaTemplate内置的消费者监听并接收回复。

也就是说,流程变成这样了,生产者发送请求(Request Topic)消费者监听并处理消费者发送回复(Reply Topic)ReplyingKafkaTemplate接收回复

它有一些专属的配置

1
2
3
4
5
6
7
# ReplyingKafkaTemplate专属配置(内置消费者的配置)
template:
default-topic: request-topic # 默认请求主题
reply-template:
default-topic: reply-topic # 默认响应主题
consumer:
group-id: reply-consumer-group # 内置回复消费者的组ID(必须唯一)

ReplyingKafkaTemplate需要依赖ProducerFactory(生产者工厂)和ConcurrentKafkaListenerContainerFactory(回复消费者的容器工厂),需手动注册为 Spring Bean

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
@Configuration
public class KafkaReplyConfig {

// 1. 生产者工厂(复用Spring自动配置的ProducerFactory,也可自定义)
@Bean
public ProducerFactory<String, Object> producerFactory(Map<String, Object> producerConfigs) {
return new DefaultKafkaProducerFactory<>(producerConfigs);
}

// 2. 回复消费者的容器工厂(用于ReplyingKafkaTemplate内置的回复消费者)
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Object> replyContainerFactory(
ConsumerFactory<String, Object> consumerFactory) {
ConcurrentKafkaListenerContainerFactory<String, Object> factory =
new ConcurrentKafkaListenerContainerFactory<>();
factory.setConsumerFactory(consumerFactory);
return factory;
}

// 3. 注册ReplyingKafkaTemplate(核心Bean)
@Bean
public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate(
ProducerFactory<String, Object> producerFactory,
ConcurrentKafkaListenerContainerFactory<String, Object> replyContainerFactory) {
// 创建回复消费者容器(监听响应主题)
ConcurrentMessageListenerContainer<String, Object> replyContainer =
replyContainerFactory.createContainer("reply-topic");
// 设置回复消费者的组ID(需与配置文件一致)
replyContainer.getContainerProperties().setGroupId("reply-consumer-group");

// 初始化ReplyingKafkaTemplate(生产者工厂 + 回复消费者容器)
return new ReplyingKafkaTemplate<>(producerFactory, replyContainer);
}
}

生产者通过ReplyingKafkaTemplate发送请求消息,并同步 / 异步接收回复。核心方法是sendAndReceive()(同步)和send()+ 回调(异步)。

要注意,请求消息和回复消息都必须包含KafkaHeaders.CORRELATION_ID(唯一标识),否则ReplyingKafkaTemplate无法将回复与请求匹配,导致生产者永远收不到回复。

响应主题可通过两种方式指定:

  • 请求头携带(setHeader(KafkaHeaders.REPLY_TOPIC, "reply-topic")):灵活,支持动态切换主题。
  • 配置文件默认(spring.kafka.reply-template.default-topic):固定,适合单一响应主题场景。

当需要确认回复后再继续相关业务的场景,就可以使用发送请求后阻塞等待回复

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
// 注入ReplyingKafkaTemplate(泛型:key类型,请求消息类型,响应消息类型)
@Autowired
private ReplyingKafkaTemplate<String, TaskRequestDTO, TaskReplyDTO> replyingKafkaTemplate;

// 注入普通KafkaTemplate
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

/**
* 同步发送请求并接收回复
* @param taskRequest 请求消息
* @return 响应消息
*/
public TaskReplyDTO sendSyncRequest(TaskRequestDTO taskRequest) {
try {
// 1. 构建请求消息(添加唯一ID,用于关联请求和回复)
String correlationId = "corr-" + System.currentTimeMillis() + "-" + taskRequest.getTaskId();
Message<TaskRequestDTO> requestMessage = MessageBuilder
.withPayload(taskRequest)
// 必须添加CORRELATION_ID:用于ReplyingKafkaTemplate识别哪个回复对应哪个请求
.setHeader(KafkaHeaders.CORRELATION_ID, correlationId)
// 指定响应主题(可选,默认用配置的reply-topic)
.setHeader(KafkaHeaders.REPLY_TOPIC, "reply-topic")
.build();

log.info("发送请求消息:taskId={}, correlationId={}", taskRequest.getTaskId(), correlationId);

// 2. 发送请求并获取Future(阻塞等待回复,默认超时时间由配置决定)
RequestReplyFuture<String, TaskRequestDTO, TaskReplyDTO> future =
replyingKafkaTemplate.sendAndReceive(requestMessage);

// 3. 获取回复结果(阻塞直到收到回复或超时)
Message<TaskReplyDTO> replyMessage = future.get();
TaskReplyDTO reply = replyMessage.getPayload();

log.info("收到回复消息:taskId={}, success={}, message={}",
reply.getTaskId(), reply.isSuccess(), reply.getMessage());

return reply;

} catch (Exception e) {
log.error("发送请求或接收回复失败:", e);
// 异常时返回默认失败响应
TaskReplyDTO errorReply = new TaskReplyDTO();
errorReply.setTaskId(taskRequest.getTaskId());
errorReply.setSuccess(false);
errorReply.setMessage("请求处理失败:" + e.getMessage());
errorReply.setTimestamp(System.currentTimeMillis());
return errorReply;
}
}

但是这个过程也可以是异步的,也就是发送请求后不阻塞,通过回调接收回复,不会影响业务的进行

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
/**
* 异步发送请求,通过回调接收回复
* @param taskRequest 请求消息
*/
public void sendAsyncRequest(TaskRequestDTO taskRequest) {
// 1. 构建请求消息(同同步场景,必须加CORRELATION_ID)
String correlationId = "corr-" + System.currentTimeMillis() + "-" + taskRequest.getTaskId();
Message<TaskRequestDTO> requestMessage = MessageBuilder
.withPayload(taskRequest)
.setHeader(KafkaHeaders.CORRELATION_ID, correlationId)
.setHeader(KafkaHeaders.REPLY_TOPIC, "reply-topic")
.build();

log.info("异步发送请求消息:taskId={}, correlationId={}", taskRequest.getTaskId(), correlationId);

// 2. 发送请求并注册回调(非阻塞)
RequestReplyFuture<String, TaskRequestDTO, TaskReplyDTO> future =
replyingKafkaTemplate.sendAndReceive(requestMessage);

// 3. 异步回调处理回复
future.whenComplete((replyMessage, ex) -> {
if (ex == null) {
// 接收回复成功
TaskReplyDTO reply = replyMessage.getPayload();
log.info("异步收到回复:taskId={}, success={}", reply.getTaskId(), reply.isSuccess());
} else {
// 接收回复失败
log.error("异步接收回复失败:correlationId={}", correlationId, ex);
}
});
}

那么消费者需监听 “请求主题”,处理业务逻辑后,将回复消息发送到 “响应主题”,并通过KafkaHeaders.CORRELATION_ID关联请求和回复。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.KafkaHeaders;
import org.springframework.messaging.Message;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.messaging.handler.annotation.Payload;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class KafkaReplyConsumerService {

// 注入普通KafkaTemplate(用于发送回复消息到响应主题)
@Autowired
private KafkaTemplate<String, Object> kafkaTemplate;

/**
* 监听请求主题,处理请求并发送回复
* @param request 请求消息体
* @param correlationId 请求的唯一关联ID(用于回复关联)
* @param replyTopic 响应主题(从请求头中获取,避免硬编码)
*/
@KafkaListener(topics = "request-topic", groupId = "request-consumer-group")
public void handleRequest(
@Payload TaskRequestDTO request, // 请求消息体
@Header(KafkaHeaders.CORRELATION_ID) String correlationId, // 请求关联ID
@Header(KafkaHeaders.REPLY_TOPIC) String replyTopic) { // 响应主题(从请求头获取)

log.info("收到请求消息:taskId={}, correlationId={}, replyTopic={}",
request.getTaskId(), correlationId, replyTopic);

// 1. 处理业务逻辑(如执行分布式任务)
TaskReplyDTO reply = processTask(request);

// 2. 构建回复消息(必须携带原请求的CORRELATION_ID,否则生产者无法识别)
Message<TaskReplyDTO> replyMessage = MessageBuilder
.withPayload(reply)
// 携带原请求的CORRELATION_ID,确保生产者能匹配到对应的请求
.setHeader(KafkaHeaders.CORRELATION_ID, correlationId)
.build();

// 3. 发送回复消息到响应主题
kafkaTemplate.send(replyTopic, replyMessage);
log.info("发送回复消息:taskId={}, correlationId={}", reply.getTaskId(), correlationId);
}

/**
* 模拟业务逻辑处理(如执行任务、调用其他服务)
*/
private TaskReplyDTO processTask(TaskRequestDTO request) {
}
}

ReplyingKafkaTemplate内置的回复消费者组 ID(reply-consumer-group)必须唯一,不能与其他消费者组重复,否则会导致回复消息被其他消费者消费,生产者收不到回复。

而且请求和回复消息的序列化器(如JsonSerializer)和反序列化器(如JsonDeserializer)必须一致,且spring.json.trusted.packages必须包含 DTO 类的包路径,否则会报 “反序列化安全异常”。

@KafkaListener注解监听器生命周期

这里还有一点我在项目示例中没说,就是@KafkaListener注解监听器生命周期

@KafkaListener注解的监听器的生命周期是可以控制的,默认情况下,@KafkaListener的参数autoStartup = "true"。也就是自动启动消费,但是也可以同过KafkaListenerEndpointRegistry来干预他的生命周期。

KafkaListenerEndpointRegistry是 Spring 提供的 “监听器注册表”,所有通过@KafkaListener定义的监听器都会被注册到这里,通过它可统一管理生命周期。

KafkaListenerEndpointRegistry有三个动作方法分别如:start(),pause(),resume()启动,停止,继续。

也就是说,可以根据这个实现按需启停消费

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
@KafkaListener`的`autoStartup`参数决定监听器是否随 Spring 启动而自动启动,默认值为`"true"`(字符串类型,支持 SpEL 表达式)。若需手动控制启动(如延迟消费),可将`autoStartup`设为`"false"
// 监听器1:默认自动启动(autoStartup = "true")
@KafkaListener(topics = "auto-start-topic", groupId = "auto-group", id = "auto-listener")
public void autoStartConsumer(String message) {
log.info("自动启动监听器接收:{}", message);
}

// 监听器2:关闭自动启动(需手动调用start())
@KafkaListener(
topics = "manual-start-topic",
groupId = "manual-group",
id = "manual-listener", // 必须指定id,否则无法通过registry定位
autoStartup = "false" // 关闭自动启动
)
public void manualStartConsumer(String message) {
log.info("手动启动监听器接收:{}", message);
}

如果关闭了自动启动,必须给监听器指定id(如id = "manual-listener"),KafkaListenerEndpointRegistry需通过id找到对应的监听器实例。

start 就是启动监听器(start()),用于启动autoStartup = "false"的监听器,或重启已停止的监听器。

暂停监听器(pause())就是用于暂停监听器的消费(暂停后不再从 Kafka 拉取消息,但不会关闭容器)。暂停后,Kafka 的消费者组会认为该消费者 “暂时不可用”,不会重新分配分区;恢复后可继续从暂停前的 offset 消费,不会丢失消息。

用于恢复被暂停的监听器,继续从暂停前的 offset 消费消息。

一般情况下,通过健康检查(如 Spring Boot Actuator)监测下游服务状态,故障时暂停消费,是为数不多用到这个的场景

1
2
3
4
5
6
7
8
9
10
11
12
// 定时检查下游服务健康状态
@Scheduled(fixedRate = 30000) // 每30秒检查一次
public void checkDownstreamHealth() {
boolean isHealthy = downstreamHealthService.check("inventory-service");
if (!isHealthy && !registry.getListenerContainer("order-listener").isPaused()) {
log.warn("库存服务故障,暂停订单消息消费");
lifecycleService.pauseListener("order-listener");
} else if (isHealthy && registry.getListenerContainer("order-listener").isPaused()) {
log.info("库存服务恢复,恢复订单消息消费");
lifecycleService.resumeListener("order-listener");
}
}

SendTo消息转发

在 Spring Kafka 中,@SendTo注解是实现 “消息自动转发” 的核心工具,它是简化消息转发的核心注解,通过 “声明式配置” 一定程度的替代了KafkaTemplate.send()手动发送逻辑。

其实除了做发送响应语义外,@SendTo注解还可以带一个参数,指定转发的Topic队列。

最常见的就是消息多重加工

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
@Slf4j
@Service
public class OrderForwardConsumer {

/**
* 消费订单创建消息,并自动转发到通知主题
* @param order 订单消息
* @return 转发的通知消息(会被自动发送到@SendTo指定的主题)
*/
@KafkaListener(topics = "order-create-topic", groupId = "order-group")
@SendTo("order-notify-topic") // 处理后转发到通知主题
public NotifyMessage handleOrder(OrderMessage order) {
log.info("处理订单:{}", order.getOrderId());

// 构建转发的通知消息
NotifyMessage notify = new NotifyMessage();
notify.setOrderId(order.getOrderId());
notify.setUserId(order.getUserId());
notify.setContent("您的订单已创建,订单号:" + order.getOrderId());
notify.setCreateTime(LocalDateTime.now());

return notify; // 返回值会被自动发送到"order-notify-topic"
}
}

你也可以在转发的消息中添加自定义头信息,转发给别人的时候返回Message对象

注意返回值的类型需与转发主题的消息类型匹配

通过 SpEL 表达式动态指定转发主题(如根据消息内容中的字段动态选择主题),适合需要按业务规则路由的场景,懂得都懂,不演示了

结合上面的内容,在请求 - 响应模式中(使用ReplyingKafkaTemplate),@SendTo可自动将回复消息发送到请求消息指定的响应主题,无需手动获取REPLY_TOPIC头信息,极大简化代码。

之前讲解ReplyingKafkaTemplate时,消费者需要手动获取REPLY_TOPIC头信息并发送回复:

1
2
3
4
5
@KafkaListener(topics = "request-topic")
public void handleRequest(
@Payload TaskRequestDTO request,
@Header(KafkaHeaders.REPLY_TOPIC) String replyTopic, // 手动获取响应主题
@Header(KafkaHeaders.CORRELATION_ID) String correlationId) {

@SendTo可自动识别请求消息中的REPLY_TOPIC头信息,将返回值作为回复消息发送到该主题,且自动携带CORRELATION_ID

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
@Service
public class RequestReplyConsumer {

/**
* 处理请求并自动回复(通过@SendTo简化回复逻辑)
* 无需手动获取REPLY_TOPIC和CORRELATION_ID,Spring Kafka自动处理
*/
@KafkaListener(topics = "request-topic", groupId = "request-reply-group")
@SendTo // 空注解:自动使用请求消息中的REPLY_TOPIC头信息
public TaskReplyDTO handleRequest(TaskRequestDTO request) {
log.info("处理请求:{}", request.getTaskId());

// 处理业务逻辑
TaskReplyDTO reply = new TaskReplyDTO();
reply.setTaskId(request.getTaskId());
reply.setSuccess(true);
reply.setMessage("请求处理完成");
return reply; // 返回值会被自动发送到请求消息指定的响应主题
}
}

@SendTo无参数时,Spring Kafka 会自动读取请求消息中的KafkaHeaders.REPLY_TOPIC头信息,将回复发送到该主题。回复消息会自动携带原请求的CORRELATION_ID,无敌了