diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerOffsetCommitter.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerOffsetCommitter.java index c4393fab1..8b4e46241 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerOffsetCommitter.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerOffsetCommitter.java @@ -90,7 +90,20 @@ protected void commitOffsets(final Map offset switch (commitMode) { case PERIODIC_CONSUMER_SYNC -> { log.debug("Committing offsets Sync"); - consumerMgr.commitSync(offsetsToSend); + while(true) { + try { + consumerMgr.commitSync(offsetsToSend); + //break when offset commit is okay, never throw exception to the main threads + break; + } catch(Throwable t) { + log.error("Failed to commit offset. Retrying in 10 seconds", t); + try { + Thread.sleep(10000L); + } catch(InterruptedException ite) { + log.info("Giving up offset commit due to interruption"); + } + } + } } case PERIODIC_CONSUMER_ASYNCHRONOUS -> { //