From 68c0d0c6444bd3d37e2ebd4d00ef10a09380d400 Mon Sep 17 00:00:00 2001 From: Wu Shilin Date: Thu, 25 Jul 2024 20:36:05 +0800 Subject: [PATCH] Fix consumer timeout after broker restart --- .../internal/ConsumerOffsetCommitter.java | 15 ++++++++++++++- 1 file changed, 14 insertions(+), 1 deletion(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerOffsetCommitter.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerOffsetCommitter.java index c4393fab1..8b4e46241 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerOffsetCommitter.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerOffsetCommitter.java @@ -90,7 +90,20 @@ protected void commitOffsets(final Map offset switch (commitMode) { case PERIODIC_CONSUMER_SYNC -> { log.debug("Committing offsets Sync"); - consumerMgr.commitSync(offsetsToSend); + while(true) { + try { + consumerMgr.commitSync(offsetsToSend); + //break when offset commit is okay, never throw exception to the main threads + break; + } catch(Throwable t) { + log.error("Failed to commit offset. Retrying in 10 seconds", t); + try { + Thread.sleep(10000L); + } catch(InterruptedException ite) { + log.info("Giving up offset commit due to interruption"); + } + } + } } case PERIODIC_CONSUMER_ASYNCHRONOUS -> { //