From 269537f1aa731c9dfdd01434a76fd5e8a8fa9ef2 Mon Sep 17 00:00:00 2001 From: Wu Shilin Date: Thu, 25 Jul 2024 20:37:09 +0800 Subject: [PATCH] Fix consumer timeout issue after broker restart --- .../internal/ConsumerManager.java | 16 +++++++++++++++- 1 file changed, 15 insertions(+), 1 deletion(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java index fc4b7e53f..cad01e350 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java @@ -94,7 +94,21 @@ public void commitSync(final Map offsetsToSen noWakeups++; while (inProgress) { try { - consumer.commitSync(offsetsToSend); + while(true) { + try { + consumer.commitSync(offsetsToSend); + // break when offset commit is okay. Do not throw exception to main threads + break; + } catch(Throwable t) { + log.error("Failed to commit offset. Retrying in 10 seconds", t); + try { + Thread.sleep(10000L); + } catch(InterruptedException ite) { + log.info("Giving up offset commit due to interruption"); + break; + } + } + } inProgress = false; } catch (WakeupException w) { log.debug("Got woken up, retry. errors: " + erroneousWakups + " none: " + noWakeups + " correct:" + correctPollWakeups, w);