之前项目中没有提到的一些额外的内容
为Kafka配置事务消息
Kafka事务允许生产者将多条消息作为一个原子操作发送,要么全部成功,要么全部失败。这确保了数据的一致性。
默认情况下,Spring-kafka自动生成的KafkaTemplate实例,是不具有事务消息发送能力的。需要使用如下配置激活事务特性。事务激活后,所有的消息发送只能在发生事务的方法内执行了,不然就会抛一个没有事务交易的异常
1
| spring.kafka.producer.transaction-id-prefix=kafka_tx.
|
事务有一个 Exactly-Once语义,即使重试,消息也只会被处理一次
添加事务的配置
在配置文件中添加事务的配置
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69
| spring: application: name: kafka-message kafka: bootstrap-servers: localhost:9092 producer: retries: 3 batch-size: 16384 buffer-memory: 33554432 key-serializer: org.apache.kafka.common.serialization.StringSerializer value-serializer: org.springframework.kafka.support.serializer.JsonSerializer acks: all compression-type: gzip transaction-id-prefix: kafka-tx- properties: transaction.timeout.ms: 60000 enable.idempotence: true max.in.flight.requests.per.connection: 5 consumer: group-id: kafka-message-group enable-auto-commit: true auto-commit-interval: 1000 key-deserializer: org.apache.kafka.common.serialization.StringDeserializer value-deserializer: org.springframework.kafka.support.serializer.JsonDeserializer auto-offset-reset: earliest properties: spring.json.trusted.packages: "*" max.poll.records: 500 isolation.level: read_committed listener: type: single ack-mode: batch concurrency: 3
server: port: 8085
logging: level: root: INFO org.springframework.kafka: INFO hbnu.project.kafkamessage: DEBUG
|
transaction-id-prefix:事务ID前缀,每个生产者实例会自动添加后缀形成唯一ID
enable.idempotence: true:开启幂等性,防止重试导致消息重复
isolation.level: read_committed:消费者只读取已提交的事务消息
然后我们需要一个事务的配置类
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135
|
@Configuration @EnableKafka @EnableTransactionManagement public class KafkaTransactionConfig {
@Value("${spring.kafka.bootstrap-servers}") private String bootstrapServers;
@Value("${spring.kafka.producer.transaction-id-prefix}") private String transactionIdPrefix;
@Bean public ProducerFactory<String, Object> transactionalProducerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); configProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonSerializer.class); configProps.put(ProducerConfig.TRANSACTIONAL_ID_CONFIG, transactionIdPrefix); configProps.put(ProducerConfig.ENABLE_IDEMPOTENCE_CONFIG, true); configProps.put(ProducerConfig.ACKS_CONFIG, "all"); configProps.put(ProducerConfig.MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION, 5); configProps.put(ProducerConfig.RETRIES_CONFIG, 3); configProps.put(ProducerConfig.BATCH_SIZE_CONFIG, 16384); configProps.put(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, 60000); DefaultKafkaProducerFactory<String, Object> factory = new DefaultKafkaProducerFactory<>(configProps); return factory; }
@Bean public KafkaTemplate<String, Object> transactionalKafkaTemplate() { return new KafkaTemplate<>(transactionalProducerFactory()); }
@Bean public KafkaTransactionManager<String, Object> kafkaTransactionManager() { return new KafkaTransactionManager<>(transactionalProducerFactory()); }
@Bean public ConsumerFactory<String, Object> transactionalConsumerFactory() { Map<String, Object> props = new HashMap<>(); props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); props.put(ConsumerConfig.GROUP_ID_CONFIG, "transaction-group"); props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.springframework.kafka.support.serializer.JsonDeserializer.class); props.put(ConsumerConfig.ISOLATION_LEVEL_CONFIG, "read_committed"); props.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false); props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); props.put("spring.json.trusted.packages", "*"); return new DefaultKafkaConsumerFactory<>(props); }
@Bean public ConcurrentKafkaListenerContainerFactory<String, Object> transactionalKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(transactionalConsumerFactory()); factory.getContainerProperties().setAckMode( org.springframework.kafka.listener.ContainerProperties.AckMode.MANUAL); return factory; } }
|
创建事务生产者
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185
|
@Slf4j @Service public class KafkaTransactionalProducerService {
@Autowired private KafkaTemplate<String, Object> transactionalKafkaTemplate;
@Transactional(transactionManager = "kafkaTransactionManager") public void sendMessagesInTransaction(UserMessage userMessage, OrderMessage orderMessage) { try { log.info("开始事务发送消息..."); log.info("发送用户消息: {}", userMessage); transactionalKafkaTemplate.send("user-message-topic", String.valueOf(userMessage.getUserId()), userMessage); log.info("处理业务逻辑..."); log.info("发送订单消息: {}", orderMessage); transactionalKafkaTemplate.send("order-message-topic", String.valueOf(orderMessage.getOrderId()), orderMessage); log.info("事务消息发送成功,等待提交..."); } catch (Exception e) { log.error("事务消息发送失败,将回滚: {}", e.getMessage()); throw new RuntimeException("事务执行失败", e); } }
@Transactional(transactionManager = "kafkaTransactionManager") public void createOrderWithTransaction(UserMessage userMessage, OrderMessage orderMessage) { log.info("===================================="); log.info("开始事务:创建订单"); try { log.info("步骤1:发送用户通知"); transactionalKafkaTemplate.send("user-message-topic", String.valueOf(userMessage.getUserId()), userMessage); log.info("步骤2:校验订单信息"); validateOrder(orderMessage); log.info("步骤3:发送订单消息"); transactionalKafkaTemplate.send("order-message-topic", String.valueOf(orderMessage.getOrderId()), orderMessage); log.info("步骤4:发送库存扣减消息"); String inventoryMessage = String.format("减少库存:产品ID=%d, 数量=%d", orderMessage.getProductId(), orderMessage.getQuantity()); transactionalKafkaTemplate.send("inventory-topic", inventoryMessage); log.info("所有消息发送完成,提交事务"); log.info("===================================="); } catch (Exception e) { log.error("订单创建失败,回滚所有消息: {}", e.getMessage()); log.info("===================================="); throw new RuntimeException("订单创建失败", e); } }
public void sendWithProgrammaticTransaction(String topic, Object message) { log.info("使用编程式事务发送消息..."); transactionalKafkaTemplate.executeInTransaction(operations -> { try { log.info("事务内发送消息到 {}: {}", topic, message); operations.send(topic, message); log.info("事务内的其他操作..."); return true; } catch (Exception e) { log.error("事务执行失败: {}", e.getMessage()); throw new RuntimeException("事务失败", e); } }); log.info("编程式事务执行完成"); }
@Transactional(transactionManager = "kafkaTransactionManager") public void sendBatchInTransaction(String topic, java.util.List<String> messages) { log.info("===================================="); log.info("批量事务发送 {} 条消息", messages.size()); int count = 0; for (String message : messages) { transactionalKafkaTemplate.send(topic, message); count++; log.debug("已发送 {}/{} 条消息", count, messages.size()); } log.info("批量消息发送完成,提交事务"); log.info("===================================="); }
private void validateOrder(OrderMessage order) { if (order.getAmount().doubleValue() <= 0) { throw new IllegalArgumentException("订单金额必须大于0"); } if (order.getQuantity() <= 0) { throw new IllegalArgumentException("订单数量必须大于0"); } log.info("订单校验通过"); } }
|
事务消费者
那么事务消费者我只举一个方法,不同的内容就在于,指定containerFactory = "transactionalKafkaListenerContainerFactory"为事务配置的工厂类
设置isolation.level=read_committed,消费者只读取已提交的事务消息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
|
@KafkaListener( topics = "order-message-topic", groupId = "transaction-order-group", containerFactory = "transactionalKafkaListenerContainerFactory" ) public void consumeTransactionalOrderMessage(OrderMessage orderMessage, Acknowledgment ack) { log.info("===================================="); log.info("事务消费者接收到订单消息:"); log.info(" 订单ID: {}", orderMessage.getOrderId()); log.info(" 产品: {}", orderMessage.getProductName()); log.info(" 金额: {}", orderMessage.getAmount()); log.info(" 状态: {}", orderMessage.getStatus()); log.info(" 说明: 此消息已经过事务提交确认"); log.info("===================================="); try { processOrder(orderMessage); ack.acknowledge(); } catch (Exception e) { log.error("订单处理失败: {}", e.getMessage()); } }
|
这样,Kafkai就可以成功的集成事务
消息重试和死信队列
Kafka 本身不直接提供重试机制,而是通过 Spring Kafka
的重试配置和异常处理器实现。核心思路是:消费失败时,将消息重新放入待处理队列,等待一段时间后再次消费,直到达到最大重试次数。
那么一般情况下,我们需要定义死信队列的主题
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32
|
@Bean public NewTopic deadLetterTopic() { return TopicBuilder.name("retry-business-topic.DLT") .partitions(3) .replicas(1) .config("retention.ms", "604800000") .build(); }
@Bean public NewTopic retryTopic() { return TopicBuilder.name("retry-business-topic.RETRY") .partitions(3) .replicas(1) .build(); }
|
那么对于重试的配置,我们一般情况下是在application.yml中配置重试参数,控制重试次数、间隔等:
1 2 3 4 5 6 7 8 9 10 11
| spring: kafka: listener: retry: enabled: true max-attempts: 3 backoff: initial-interval: 1000ms multiplier: 2.0 max-interval: 10000ms
|
对于死信队列的流程,一般是这样的,消息处理失败 -> 重试N次 ->
仍然失败 -> 发送到死信队列 -> 人工处理或告警
所以结合死信队列的错误重试,我们需要在其对应的配置类中配置错误处理器,以指数退避为例子
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
| @Bean public CommonErrorHandler exponentialBackOffErrorHandler(KafkaTemplate<String, Object> kafkaTemplate) { DeadLetterPublishingRecoverer recoverer = new DeadLetterPublishingRecoverer( kafkaTemplate, (record, ex) -> { log.error("===== 消息进入死信队列 ====="); log.error("原始Topic: {}", record.topic()); log.error("原始Partition: {}", record.partition()); log.error("原始Offset: {}", record.offset()); log.error("失败原因: {}", ex.getMessage()); log.error("消息内容: {}", record.value()); log.error("============================"); return new TopicPartition(record.topic() + ".DLT", record.partition()); } );
ExponentialBackOffWithMaxRetries exponentialBackOff = new ExponentialBackOffWithMaxRetries(5); exponentialBackOff.setInitialInterval(1000L); exponentialBackOff.setMultiplier(2.0); exponentialBackOff.setMaxInterval(10000L);
DefaultErrorHandler errorHandler = new DefaultErrorHandler(recoverer, exponentialBackOff);
errorHandler.addNotRetryableExceptions( IllegalArgumentException.class, NullPointerException.class );
errorHandler.setLogLevel(org.springframework.kafka.KafkaException.Level.ERROR); return errorHandler; }
|
在这个配置中,我们使用了DeadLetterPublishingRecoverer,它是
Spring Kafka 提供的死信消息转发器,作用是将
“达到最大重试次数仍失败” 的消息转发到死信队列(DLT)。
我们通过 Lambda 表达式定义了死信队列的路由逻辑:
1 2 3 4
| (record, ex) -> { return new TopicPartition(record.topic() + ".DLT", record.partition()); }
|
而且转发到死信队列的消息不仅包含原消息的key和value,还会自动添加死信头信息,DefaultErrorHandler是
Spring Kafka 2.8 + 引入的新一代异常处理器,整合了 “重试逻辑” 和
“死信转发”
这样的死信和重试流程就是这样子了
1
| 0秒(失败) -> 1秒(重试1) -> 3秒(重试2) -> 7秒(重试3) -> 15秒(重试4) -> 25秒(重试5) -> DLQ
|
对了,上述的不重试异常指的是某些异常重试无意义,应该直接进入死信队列
虽然你也可以只重试不DLQ,这样消息一种不丢,但是消费也被一直阻塞
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| @Bean public CommonErrorHandler retryOnlyErrorHandler() { FixedBackOff backOff = new FixedBackOff(3000L, 10L); DefaultErrorHandler errorHandler = new DefaultErrorHandler(null, backOff); errorHandler.setRetryListeners((record, ex, deliveryAttempt) -> { log.error("消息重试第 {} 次失败. Topic: {}, Partition: {}, Offset: {}", deliveryAttempt, record.topic(), record.partition(), record.offset()); }); return errorHandler; }
|
生产者不用咋动,生产者配鸡毛死信和重试,露头就被秒了?
我们一般把 Kafka
死信队列消费者服务单列出来一个类,一般就是正常处理消息内容,只不过是放到死信处理的萝莉里处理,然后打印各种需要的信息
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225 226 227 228 229 230 231 232 233 234 235 236 237 238 239 240 241 242 243 244 245 246 247 248 249 250 251 252 253 254 255 256 257 258 259 260 261 262 263 264 265 266 267 268 269 270 271 272 273 274 275 276 277 278 279 280 281 282 283 284 285 286 287 288 289 290 291 292 293 294 295 296 297 298 299 300 301 302 303 304 305 306
| /** * Kafka 死信队列消费者服务 * * ==================== 核心功能 ==================== * * 1. 消费死信队列中的消息 * 2. 记录失败消息的详细信息 * 3. 进行告警或人工介入处理 * 4. 可以尝试修复并重新发送到原队列 * * ==================== 死信队列消息特点 ==================== * * 1. 包含原始消息内容 * 2. 包含失败原因(异常信息) * 3. 包含原始Topic、Partition、Offset信息 * 4. 包含重试次数等元数据 * * ==================== 处理策略 ==================== * * 1. 记录日志:详细记录失败原因,便于排查 * 2. 发送告警:通知相关人员处理 * 3. 存储数据库:持久化失败消息,便于后续处理 * 4. 人工修复:修复数据后重新发送 * 5. 业务补偿:执行补偿逻辑 * * ==================== 使用建议 ==================== * * 1. 死信队列消费者不应该再抛出异常 * 2. 使用try-catch包裹所有处理逻辑 * 3. 记录详细的日志信息 * 4. 建立监控和告警机制 * 5. 定期清理或归档死信消息 */ @Slf4j @Service public class KafkaDeadLetterConsumerService {
/** * 消费业务Topic的死信队列 * * Topic命名规范:原Topic + .DLT * 例如:retry-business-topic.DLT */ @KafkaListener( topics = "retry-business-topic.DLT", groupId = "dlq-business-group" ) public void consumeBusinessDLQ( ConsumerRecord<String, String> record, @Header(value = KafkaHeaders.EXCEPTION_MESSAGE, required = false) String exceptionMessage, @Header(value = KafkaHeaders.EXCEPTION_STACKTRACE, required = false) String stackTrace, @Header(value = KafkaHeaders.ORIGINAL_TOPIC, required = false) String originalTopic, @Header(value = KafkaHeaders.ORIGINAL_PARTITION, required = false) Integer originalPartition, @Header(value = KafkaHeaders.ORIGINAL_OFFSET, required = false) Long originalOffset) { log.error("╔════════════════════════════════════════════════════════════════╗"); log.error("║ 收到死信队列消息 - 业务Topic ║"); log.error("╚════════════════════════════════════════════════════════════════╝"); // 原始消息信息 log.error("【原始消息信息】"); log.error(" 原始Topic: {}", originalTopic != null ? originalTopic : "未知"); log.error(" 原始Partition: {}", originalPartition != null ? originalPartition : "未知"); log.error(" 原始Offset: {}", originalOffset != null ? originalOffset : "未知"); // 死信队列信息 log.error("【死信队列信息】"); log.error(" DLQ Topic: {}", record.topic()); log.error(" DLQ Partition: {}", record.partition()); log.error(" DLQ Offset: {}", record.offset()); log.error(" DLQ Timestamp: {}", new java.util.Date(record.timestamp())); // 消息内容 log.error("【消息内容】"); log.error(" Key: {}", record.key()); log.error(" Value: {}", record.value()); // 失败原因 log.error("【失败原因】"); log.error(" 异常消息: {}", exceptionMessage != null ? exceptionMessage : "无异常信息"); if (stackTrace != null && !stackTrace.isEmpty()) { log.error(" 异常堆栈(前200字符): {}", stackTrace.length() > 200 ? stackTrace.substring(0, 200) + "..." : stackTrace); } log.error("════════════════════════════════════════════════════════════════"); try { // 处理死信消息 handleDeadLetterMessage(record, exceptionMessage, originalTopic); log.info("死信消息处理完成"); } catch (Exception e) { // 死信队列消费者捕获所有异常,避免再次进入死信队列 log.error("处理死信消息时发生异常(已捕获): {}", e.getMessage()); } }
/** * 消费用户消息的死信队列 */ @KafkaListener( topics = "user-message-topic.DLT", groupId = "dlq-user-group" ) public void consumeUserMessageDLQ( ConsumerRecord<String, UserMessage> record, @Header(value = KafkaHeaders.EXCEPTION_MESSAGE, required = false) String exceptionMessage) { log.error("╔════════════════════════════════════════════════════════════════╗"); log.error("║ 收到死信队列消息 - 用户消息 ║"); log.error("╚════════════════════════════════════════════════════════════════╝"); UserMessage userMessage = record.value(); log.error("【死信用户消息】"); log.error(" 用户ID: {}", userMessage.getUserId()); log.error(" 用户名: {}", userMessage.getUsername()); log.error(" 邮箱: {}", userMessage.getEmail()); log.error(" 操作: {}", userMessage.getOperation()); log.error(" 时间: {}", userMessage.getTimestamp()); log.error("【失败原因】"); log.error(" 异常: {}", exceptionMessage); log.error("════════════════════════════════════════════════════════════════"); try { // 可以尝试修复数据并重新处理 handleFailedUserMessage(userMessage, exceptionMessage); } catch (Exception e) { log.error("处理用户死信消息时发生异常: {}", e.getMessage()); } }
/** * 消费订单消息的死信队列 */ @KafkaListener( topics = "order-message-topic.DLT", groupId = "dlq-order-group" ) public void consumeOrderMessageDLQ( ConsumerRecord<String, OrderMessage> record, @Header(value = KafkaHeaders.EXCEPTION_MESSAGE, required = false) String exceptionMessage) { log.error("╔════════════════════════════════════════════════════════════════╗"); log.error("║ 收到死信队列消息 - 订单消息 ║"); log.error("╚════════════════════════════════════════════════════════════════╝"); OrderMessage orderMessage = record.value(); log.error("【死信订单消息】"); log.error(" 订单ID: {}", orderMessage.getOrderId()); log.error(" 用户ID: {}", orderMessage.getUserId()); log.error(" 产品: {}", orderMessage.getProductName()); log.error(" 金额: {}", orderMessage.getAmount()); log.error(" 数量: {}", orderMessage.getQuantity()); log.error(" 状态: {}", orderMessage.getStatus()); log.error(" 创建时间: {}", orderMessage.getCreateTime()); log.error("【失败原因】"); log.error(" 异常: {}", exceptionMessage); log.error("════════════════════════════════════════════════════════════════"); try { // 处理失败的订单 handleFailedOrderMessage(orderMessage, exceptionMessage); } catch (Exception e) { log.error("处理订单死信消息时发生异常: {}", e.getMessage()); } }
// ==================== 私有处理方法 ====================
/** * 处理死信消息的通用逻辑 * * 可以执行以下操作: * 1. 保存到数据库 * 2. 发送告警通知 * 3. 记录到监控系统 * 4. 触发人工审核流程 */ private void handleDeadLetterMessage( ConsumerRecord<String, String> record, String exceptionMessage, String originalTopic) { log.info("开始处理死信消息..."); // 1. 保存到数据库(示例) saveToDatabase(record, exceptionMessage, originalTopic); // 2. 发送告警 sendAlert(record, exceptionMessage); // 3. 记录到监控系统 recordMetrics(originalTopic); log.info("死信消息处理完成"); }
/** * 处理失败的用户消息 */ private void handleFailedUserMessage(UserMessage userMessage, String exceptionMessage) { log.info("处理失败的用户消息: userId={}", userMessage.getUserId()); // 根据失败原因,执行不同的补偿逻辑 if (exceptionMessage != null && exceptionMessage.contains("参数")) { log.warn("参数错误,需要人工修复数据"); // 触发人工审核流程 } else { log.warn("系统错误,可能需要重试"); // 可以在修复系统后,重新发送到原Topic } }
/** * 处理失败的订单消息 */ private void handleFailedOrderMessage(OrderMessage orderMessage, String exceptionMessage) { log.info("处理失败的订单消息: orderId={}", orderMessage.getOrderId()); // 执行订单补偿逻辑 if (exceptionMessage != null && exceptionMessage.contains("库存")) { log.warn("库存服务失败,可能需要手动释放预占库存"); // 执行库存回滚 } else if (exceptionMessage != null && exceptionMessage.contains("金额")) { log.error("金额异常,需要人工审核"); // 冻结订单,等待人工处理 } }
/** * 保存到数据库(示例方法) */ private void saveToDatabase( ConsumerRecord<String, String> record, String exceptionMessage, String originalTopic) { log.debug("保存死信消息到数据库"); log.debug(" Topic: {}", originalTopic); log.debug(" 消息: {}", record.value()); log.debug(" 异常: {}", exceptionMessage); // 实际项目中,这里应该调用数据库服务 // Example: // DeadLetterRecord dlRecord = new DeadLetterRecord(); // dlRecord.setOriginalTopic(originalTopic); // dlRecord.setMessageContent(record.value()); // dlRecord.setExceptionMessage(exceptionMessage); // dlRecord.setCreateTime(LocalDateTime.now()); // deadLetterRepository.save(dlRecord); }
/** * 发送告警(示例方法) */ private void sendAlert(ConsumerRecord<String, String> record, String exceptionMessage) { log.warn("【告警】发现死信消息"); log.warn(" Topic: {}", record.topic()); log.warn(" 异常: {}", exceptionMessage); // 实际项目中,这里应该调用告警服务 // Example: // alertService.send(AlertLevel.ERROR, // "Kafka死信队列告警", // "Topic: " + record.topic() + ", 异常: " + exceptionMessage); // 或发送邮件、短信、钉钉等通知 }
/** * 记录监控指标(示例方法) */ private void recordMetrics(String originalTopic) { log.debug("记录死信队列指标: topic={}", originalTopic); // 实际项目中,这里应该上报到监控系统 // Example: // metricsService.increment("kafka.dlq.count", // Tags.of("topic", originalTopic)); }
/** * 重新发送消息到原Topic(用于手动修复后重试) * * 注意:需要注入KafkaTemplate */ public void retrySendToOriginalTopic(String originalTopic, String message) { log.info("重新发送消息到原Topic: {}", originalTopic); log.info("消息内容: {}", message); // 需要注入KafkaTemplate并调用send方法 // kafkaTemplate.send(originalTopic, message); log.info("消息已重新发送"); } }
|
我们配置了各种重试的内容,那么此时消费者会在抛出可重试异常的时候重试,当消息消费失败时,按照配置类中的内容自动重试了,再失败就进去DLQ
在重试过程中,如果消费者崩溃,消息可能会被重复消费,所以说重试一般配合幂等性一起实现。
而且,重试会阻塞其他消息,因为默认情况下,同一分区的消息会按顺序处理,所以重试会阻塞后续消息。这就是为什么,我考虑使用专门的重试Topic
如何防止死信队列无限增长?
1 2 3 4 5 6 7
| @Bean public NewTopic deadLetterTopic() { return TopicBuilder.name("business-topic.DLT") .config("retention.ms", "604800000") // 7天 .config("cleanup.policy", "delete") .build(); }
|
可以针对不同异常使用不同的重试策略吗?太可以了,抱着你的自定义ErrorHandler去玩吧
1 2 3 4 5 6 7 8 9 10 11 12 13
| public class CustomErrorHandler extends DefaultErrorHandler { @Override public void handleRemaining(Exception thrownException, ...) { if (thrownException instanceof NetworkException) { applyBackOff(new FixedBackOff(500, 10)); } else if (thrownException instanceof ServiceException) { applyBackOff(new ExponentialBackOff(5000, 2.0)); } super.handleRemaining(thrownException, ...); } }
|
ReplyingKafkaTemplate获得消息回复
RPC 调用知不知道,而ReplyingKafkaTemplate就是 Spring
Kafka 提供的请求 -
响应模式核心工具,很像rpc模式,能让生产者发送消息后,同步或异步接收消费者的回复
而ReplyingKafkaTemplate本质是 “生产者 + 消费者”
的封装,通过两个主题实现请求 - 响应:
- 请求主题(Request
Topic):生产者发送请求消息到该主题,消费者监听并处理。
- 响应主题(Reply
Topic):消费者处理完成后,将回复消息发送到该主题,
ReplyingKafkaTemplate内置的消费者监听并接收回复。
也就是说,流程变成这样了,生产者发送请求(Request
Topic)→消费者监听并处理→消费者发送回复(Reply
Topic)→ReplyingKafkaTemplate接收回复
它有一些专属的配置
1 2 3 4 5 6 7
| template: default-topic: request-topic reply-template: default-topic: reply-topic consumer: group-id: reply-consumer-group
|
ReplyingKafkaTemplate需要依赖ProducerFactory(生产者工厂)和ConcurrentKafkaListenerContainerFactory(回复消费者的容器工厂),需手动注册为
Spring Bean
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
| @Configuration public class KafkaReplyConfig {
@Bean public ProducerFactory<String, Object> producerFactory(Map<String, Object> producerConfigs) { return new DefaultKafkaProducerFactory<>(producerConfigs); }
@Bean public ConcurrentKafkaListenerContainerFactory<String, Object> replyContainerFactory( ConsumerFactory<String, Object> consumerFactory) { ConcurrentKafkaListenerContainerFactory<String, Object> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory); return factory; }
@Bean public ReplyingKafkaTemplate<String, Object, Object> replyingKafkaTemplate( ProducerFactory<String, Object> producerFactory, ConcurrentKafkaListenerContainerFactory<String, Object> replyContainerFactory) { ConcurrentMessageListenerContainer<String, Object> replyContainer = replyContainerFactory.createContainer("reply-topic"); replyContainer.getContainerProperties().setGroupId("reply-consumer-group"); return new ReplyingKafkaTemplate<>(producerFactory, replyContainer); } }
|
生产者通过ReplyingKafkaTemplate发送请求消息,并同步 /
异步接收回复。核心方法是sendAndReceive()(同步)和send()+
回调(异步)。
要注意,请求消息和回复消息都必须包含KafkaHeaders.CORRELATION_ID(唯一标识),否则ReplyingKafkaTemplate无法将回复与请求匹配,导致生产者永远收不到回复。
响应主题可通过两种方式指定:
- 请求头携带(
setHeader(KafkaHeaders.REPLY_TOPIC, "reply-topic")):灵活,支持动态切换主题。
- 配置文件默认(
spring.kafka.reply-template.default-topic):固定,适合单一响应主题场景。
当需要确认回复后再继续相关业务的场景,就可以使用发送请求后阻塞等待回复
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51
| @Autowired private ReplyingKafkaTemplate<String, TaskRequestDTO, TaskReplyDTO> replyingKafkaTemplate;
@Autowired private KafkaTemplate<String, Object> kafkaTemplate;
public TaskReplyDTO sendSyncRequest(TaskRequestDTO taskRequest) { try { String correlationId = "corr-" + System.currentTimeMillis() + "-" + taskRequest.getTaskId(); Message<TaskRequestDTO> requestMessage = MessageBuilder .withPayload(taskRequest) .setHeader(KafkaHeaders.CORRELATION_ID, correlationId) .setHeader(KafkaHeaders.REPLY_TOPIC, "reply-topic") .build();
log.info("发送请求消息:taskId={}, correlationId={}", taskRequest.getTaskId(), correlationId);
RequestReplyFuture<String, TaskRequestDTO, TaskReplyDTO> future = replyingKafkaTemplate.sendAndReceive(requestMessage);
Message<TaskReplyDTO> replyMessage = future.get(); TaskReplyDTO reply = replyMessage.getPayload();
log.info("收到回复消息:taskId={}, success={}, message={}", reply.getTaskId(), reply.isSuccess(), reply.getMessage());
return reply;
} catch (Exception e) { log.error("发送请求或接收回复失败:", e); TaskReplyDTO errorReply = new TaskReplyDTO(); errorReply.setTaskId(taskRequest.getTaskId()); errorReply.setSuccess(false); errorReply.setMessage("请求处理失败:" + e.getMessage()); errorReply.setTimestamp(System.currentTimeMillis()); return errorReply; } }
|
但是这个过程也可以是异步的,也就是发送请求后不阻塞,通过回调接收回复,不会影响业务的进行
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
|
public void sendAsyncRequest(TaskRequestDTO taskRequest) { String correlationId = "corr-" + System.currentTimeMillis() + "-" + taskRequest.getTaskId(); Message<TaskRequestDTO> requestMessage = MessageBuilder .withPayload(taskRequest) .setHeader(KafkaHeaders.CORRELATION_ID, correlationId) .setHeader(KafkaHeaders.REPLY_TOPIC, "reply-topic") .build();
log.info("异步发送请求消息:taskId={}, correlationId={}", taskRequest.getTaskId(), correlationId);
RequestReplyFuture<String, TaskRequestDTO, TaskReplyDTO> future = replyingKafkaTemplate.sendAndReceive(requestMessage);
future.whenComplete((replyMessage, ex) -> { if (ex == null) { TaskReplyDTO reply = replyMessage.getPayload(); log.info("异步收到回复:taskId={}, success={}", reply.getTaskId(), reply.isSuccess()); } else { log.error("异步接收回复失败:correlationId={}", correlationId, ex); } }); }
|
那么消费者需监听 “请求主题”,处理业务逻辑后,将回复消息发送到
“响应主题”,并通过KafkaHeaders.CORRELATION_ID关联请求和回复。
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55
| import lombok.extern.slf4j.Slf4j; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.kafka.annotation.KafkaListener; import org.springframework.kafka.core.KafkaTemplate; import org.springframework.kafka.support.KafkaHeaders; import org.springframework.messaging.Message; import org.springframework.messaging.handler.annotation.Header; import org.springframework.messaging.handler.annotation.Payload; import org.springframework.messaging.support.MessageBuilder; import org.springframework.stereotype.Service;
@Slf4j @Service public class KafkaReplyConsumerService {
@Autowired private KafkaTemplate<String, Object> kafkaTemplate;
@KafkaListener(topics = "request-topic", groupId = "request-consumer-group") public void handleRequest( @Payload TaskRequestDTO request, // 请求消息体 @Header(KafkaHeaders.CORRELATION_ID) String correlationId, // 请求关联ID @Header(KafkaHeaders.REPLY_TOPIC) String replyTopic) {
log.info("收到请求消息:taskId={}, correlationId={}, replyTopic={}", request.getTaskId(), correlationId, replyTopic);
TaskReplyDTO reply = processTask(request);
Message<TaskReplyDTO> replyMessage = MessageBuilder .withPayload(reply) .setHeader(KafkaHeaders.CORRELATION_ID, correlationId) .build();
kafkaTemplate.send(replyTopic, replyMessage); log.info("发送回复消息:taskId={}, correlationId={}", reply.getTaskId(), correlationId); }
private TaskReplyDTO processTask(TaskRequestDTO request) { } }
|
ReplyingKafkaTemplate内置的回复消费者组
ID(reply-consumer-group)必须唯一,不能与其他消费者组重复,否则会导致回复消息被其他消费者消费,生产者收不到回复。
而且请求和回复消息的序列化器(如JsonSerializer)和反序列化器(如JsonDeserializer)必须一致,且spring.json.trusted.packages必须包含
DTO 类的包路径,否则会报 “反序列化安全异常”。
@KafkaListener注解监听器生命周期
这里还有一点我在项目示例中没说,就是@KafkaListener注解监听器生命周期
@KafkaListener注解的监听器的生命周期是可以控制的,默认情况下,@KafkaListener的参数autoStartup = "true"。也就是自动启动消费,但是也可以同过KafkaListenerEndpointRegistry来干预他的生命周期。
KafkaListenerEndpointRegistry是 Spring 提供的
“监听器注册表”,所有通过@KafkaListener定义的监听器都会被注册到这里,通过它可统一管理生命周期。
KafkaListenerEndpointRegistry有三个动作方法分别如:start(),pause(),resume()启动,停止,继续。
也就是说,可以根据这个实现按需启停消费
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
| @KafkaListener`的`autoStartup`参数决定监听器是否随 Spring 启动而自动启动,默认值为`"true"`(字符串类型,支持 SpEL 表达式)。若需手动控制启动(如延迟消费),可将`autoStartup`设为`"false"
@KafkaListener(topics = "auto-start-topic", groupId = "auto-group", id = "auto-listener") public void autoStartConsumer(String message) { log.info("自动启动监听器接收:{}", message); }
@KafkaListener( topics = "manual-start-topic", groupId = "manual-group", id = "manual-listener", // 必须指定id,否则无法通过registry定位 autoStartup = "false" // 关闭自动启动 ) public void manualStartConsumer(String message) { log.info("手动启动监听器接收:{}", message); }
|
如果关闭了自动启动,必须给监听器指定id(如id = "manual-listener"),KafkaListenerEndpointRegistry需通过id找到对应的监听器实例。
start
就是启动监听器(start()),用于启动autoStartup = "false"的监听器,或重启已停止的监听器。
暂停监听器(pause())就是用于暂停监听器的消费(暂停后不再从
Kafka 拉取消息,但不会关闭容器)。暂停后,Kafka 的消费者组会认为该消费者
“暂时不可用”,不会重新分配分区;恢复后可继续从暂停前的 offset
消费,不会丢失消息。
用于恢复被暂停的监听器,继续从暂停前的 offset 消费消息。
一般情况下,通过健康检查(如 Spring Boot
Actuator)监测下游服务状态,故障时暂停消费,是为数不多用到这个的场景
1 2 3 4 5 6 7 8 9 10 11 12
| @Scheduled(fixedRate = 30000) public void checkDownstreamHealth() { boolean isHealthy = downstreamHealthService.check("inventory-service"); if (!isHealthy && !registry.getListenerContainer("order-listener").isPaused()) { log.warn("库存服务故障,暂停订单消息消费"); lifecycleService.pauseListener("order-listener"); } else if (isHealthy && registry.getListenerContainer("order-listener").isPaused()) { log.info("库存服务恢复,恢复订单消息消费"); lifecycleService.resumeListener("order-listener"); } }
|
SendTo消息转发
在 Spring Kafka 中,@SendTo注解是实现 “消息自动转发”
的核心工具,它是简化消息转发的核心注解,通过 “声明式配置”
一定程度的替代了KafkaTemplate.send()手动发送逻辑。
其实除了做发送响应语义外,@SendTo注解还可以带一个参数,指定转发的Topic队列。
最常见的就是消息多重加工
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
| @Slf4j @Service public class OrderForwardConsumer {
@KafkaListener(topics = "order-create-topic", groupId = "order-group") @SendTo("order-notify-topic") public NotifyMessage handleOrder(OrderMessage order) { log.info("处理订单:{}", order.getOrderId()); NotifyMessage notify = new NotifyMessage(); notify.setOrderId(order.getOrderId()); notify.setUserId(order.getUserId()); notify.setContent("您的订单已创建,订单号:" + order.getOrderId()); notify.setCreateTime(LocalDateTime.now()); return notify; } }
|
你也可以在转发的消息中添加自定义头信息,转发给别人的时候返回Message对象
注意返回值的类型需与转发主题的消息类型匹配
通过 SpEL
表达式动态指定转发主题(如根据消息内容中的字段动态选择主题),适合需要按业务规则路由的场景,懂得都懂,不演示了
结合上面的内容,在请求 -
响应模式中(使用ReplyingKafkaTemplate),@SendTo可自动将回复消息发送到请求消息指定的响应主题,无需手动获取REPLY_TOPIC头信息,极大简化代码。
之前讲解ReplyingKafkaTemplate时,消费者需要手动获取REPLY_TOPIC头信息并发送回复:
1 2 3 4 5
| @KafkaListener(topics = "request-topic") public void handleRequest( @Payload TaskRequestDTO request, @Header(KafkaHeaders.REPLY_TOPIC) String replyTopic, // 手动获取响应主题 @Header(KafkaHeaders.CORRELATION_ID) String correlationId) {
|
@SendTo可自动识别请求消息中的REPLY_TOPIC头信息,将返回值作为回复消息发送到该主题,且自动携带CORRELATION_ID:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
| @Service public class RequestReplyConsumer {
@KafkaListener(topics = "request-topic", groupId = "request-reply-group") @SendTo public TaskReplyDTO handleRequest(TaskRequestDTO request) { log.info("处理请求:{}", request.getTaskId()); TaskReplyDTO reply = new TaskReplyDTO(); reply.setTaskId(request.getTaskId()); reply.setSuccess(true); reply.setMessage("请求处理完成"); return reply; } }
|
而@SendTo无参数时,Spring Kafka
会自动读取请求消息中的KafkaHeaders.REPLY_TOPIC头信息,将回复发送到该主题。回复消息会自动携带原请求的CORRELATION_ID,无敌了