diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java index fc4b7e53f..cad01e350 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/ConsumerManager.java @@ -94,7 +94,21 @@ public void commitSync(final Map offsetsToSen noWakeups++; while (inProgress) { try { - consumer.commitSync(offsetsToSend); + while(true) { + try { + consumer.commitSync(offsetsToSend); + // break when offset commit is okay. Do not throw exception to main threads + break; + } catch(Throwable t) { + log.error("Failed to commit offset. Retrying in 10 seconds", t); + try { + Thread.sleep(10000L); + } catch(InterruptedException ite) { + log.info("Giving up offset commit due to interruption"); + break; + } + } + } inProgress = false; } catch (WakeupException w) { log.debug("Got woken up, retry. errors: " + erroneousWakups + " none: " + noWakeups + " correct:" + correctPollWakeups, w);