Skip to content

Commit

Permalink
Fix consumer timeout issue after broker restart
Browse files Browse the repository at this point in the history
  • Loading branch information
wushilin authored Jul 25, 2024
1 parent 68c0d0c commit 269537f
Showing 1 changed file with 15 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,21 @@ public void commitSync(final Map<TopicPartition, OffsetAndMetadata> offsetsToSen
noWakeups++;
while (inProgress) {
try {
consumer.commitSync(offsetsToSend);
while(true) {
try {
consumer.commitSync(offsetsToSend);
// break when offset commit is okay. Do not throw exception to main threads
break;
} catch(Throwable t) {
log.error("Failed to commit offset. Retrying in 10 seconds", t);
try {
Thread.sleep(10000L);
} catch(InterruptedException ite) {
log.info("Giving up offset commit due to interruption");
break;
}
}
}
inProgress = false;
} catch (WakeupException w) {
log.debug("Got woken up, retry. errors: " + erroneousWakups + " none: " + noWakeups + " correct:" + correctPollWakeups, w);
Expand Down

0 comments on commit 269537f

Please sign in to comment.