@@ -218,6 +218,104 @@ acks 的默认值即为 1,代表我们的消息被 leader 副本接收之后
218218 - 处理完消息再提交:依旧有消息重复消费的风险,和自动提交一样
219219 - 拉取到消息即提交:会有消息丢失的风险。允许消息延时的场景,一般会采用这种方式。然后,通过定时任务在业务不繁忙(比如凌晨)的时候做数据兜底。
220220
221+ # kafka的重试机制
222+ 网上关于spring kafka的默认重试机制文章很多,但大多都是过时的,和实际运行结果完全不一样。以下代码是根据 spring-kafka-2.9.3 源码重新梳理一下。
223+
224+ ## 消费失败会怎么样
225+ 在消费过程中,当其中一个消息消费异常时,会不会卡住后续队列消息的消费?这样业务岂不是卡住了?
226+
227+ 生产者代码:
228+ ``` Java
229+ for (int i = 0 ; i < 10 ; i++ ) {
230+ kafkaTemplate. send(KafkaConst . TEST_TOPIC , String . valueOf(i))
231+ }
232+ ```
233+ 消费者消代码:
234+ ``` Java
235+ @KafkaListener (topics = {KafkaConst . TEST_TOPIC },groupId = " apple" )
236+ private void customer(String message) throws InterruptedException {
237+ log. info(" kafka customer:{}" ,message);
238+ Integer n = Integer . parseInt(message);
239+ if (n% 5 == 0 ){
240+ throw new RuntimeException ();
241+ }
242+ }
243+ ```
244+
245+ 在默认配置下,当消费异常会进行重试,重试多次后会跳过当前消息,继续进行后续消息的消费,不会一直卡在当前消息。下面是一段消费的日志,可以看出当 test-0@95 重试多次后会被跳过。
246+ ``` Java
247+ 2023 - 08- 10 12 : 03 : 32.918 DEBUG 9700 -- - [ntainer#0 - 0 - C - 1 ] o.s.kafka.listener. DefaultErrorHandler : Skipping seek of: test- 0 @95
248+ 2023 - 08- 10 12 : 03 : 32.918 TRACE 9700 -- - [ntainer#0 - 0 - C - 1 ] o.s.kafka.listener. DefaultErrorHandler : Seeking : test- 0 to: 96
249+ 2023 - 08- 10 12 : 03 : 32.918 INFO 9700 -- - [ntainer#0 - 0 - C - 1 ] o.a.k.clients.consumer. KafkaConsumer : [Consumer clientId= consumer- apple- 1 , groupId= apple] Seeking to offset 96 for partition test- 0
250+
251+ ```
252+
253+ ## 默认会重试多少次?
254+ 默认配置下,消费异常会进行重试,重试次数是多少, 重试是否有时间间隔?
255+ 10次。看源码 FailedRecordTracker 类有个 recovered 函数,返回 Boolean 值判断是否要进行重试,下面是这个函数中判断是否重试的逻辑:
256+ ``` Java
257+ FailedRecord failedRecord = getFailedRecordInstance(record, exception, map, topicPartition);
258+ this . retryListeners. forEach(rl - >
259+ rl. failedDelivery(record, exception, failedRecord. getDeliveryAttempts(). get()));
260+ long nextBackOff = failedRecord. getBackOffExecution(). nextBackOff();
261+ if (nextBackOff != BackOffExecution . STOP ) {
262+ this . backOffHandler. onNextBackOff(container, exception, nextBackOff);
263+ return false ;
264+ }
265+ ```
266+ 其中 BackOffExecution.STOP 的值为-1,nextBackOff 的值调用 BackOff 类的 nextBackOff() 函数。如果当前执行次数大于最大执行次数则返回 STOP,既超过这个最大执行次数后才会停止重试。
267+ ```
268+ public long nextBackOff() {
269+ this.currentAttempts++;
270+ if (this.currentAttempts <= getMaxAttempts()) {
271+ return getInterval();
272+ }
273+ else {
274+ return STOP;
275+ }
276+ }
277+ ```
278+ 那么这个 getMaxAttempts 的值又是多少呢?回到最开始,当执行出错会进入 DefaultErrorHandler 。DefaultErrorHandler 默认的构造函数是:
279+ ``` Java
280+ public DefaultErrorHandler() {
281+ this (null , SeekUtils . DEFAULT_BACK_OFF );
282+ }
283+ ```
284+ SeekUtils.DEFAULT_BACK_OFF 定义的是
285+ ``` Java
286+ public static final int DEFAULT_MAX_FAILURES = 10 ;
287+
288+ public static final FixedBackOff DEFAULT_BACK_OFF = new FixedBackOff (0 , DEFAULT_MAX_FAILURES - 1 );
289+ ```
290+ DEFAULT_MAX_FAILURES 的值是10,currentAttempts从0到9,所以总共会执行10次,每次重试的时间间隔为0。
291+
292+ ## 重试失败后的数据如何再次处理
293+
294+ 当达到最大重试次数后,数据会直接被跳过,继续向后进行。当代码修复后,如何重新消费这些重试失败的数据呢?
295+ 死信队列(Dead Letter Queue,简称DLQ)是消息中间件中的一种特殊队列。它主要用于处理无法被消费者正确处理的消息,通常是因为消息格式错误、处理失败、消费超时等情况导致的消息被"丢弃"或"死亡"的情况。
296+ 当消息进入队列后,消费者会尝试处理它。如果处理失败,或者超过一定的重试次数仍无法被成功处理,消息可以发送到死信队列中,而不是被永久性地丢弃。在死信队列中,可以进一步分析、处理这些无法正常消费的消息,以便定位问题、修复错误,并采取适当的措施。
297+
298+ spring kafka 中只需要加上 ` @DltHandler ` 注解即可将重试失败的消息推到死信队列,死信队列的topic是在原 topic 后加上 '.DLT'。然后开启新的消费者消费死信队列即可。
299+ ``` Java
300+ @DltHandler
301+ @KafkaListener (topics = {KafkaConst . TEST_TOPIC }, groupId = " apple" )
302+ private void customer(String message) {
303+ log. info(" kafka customer:{}" , message);
304+ Integer n = Integer . parseInt(message);
305+ if (n % 5 == 0 ) {
306+ throw new RuntimeException ();
307+ }
308+ System . out. println(n);
309+ }
310+
311+ @KafkaListener (topics = {KafkaConst . TEST_TOPIC + " .DLT" }, groupId = " apple" )
312+ private void delCustomer(String message) {
313+ //
314+ }
315+ ```
316+ ## 如何自定义重试次数,以及时间间隔
317+ "......,未完待续。"
318+
221319### Reference
222320
223321- Kafka 官方文档:https://kafka.apache.org/documentation/
0 commit comments