Skip to content

Commit

Permalink
fix: safely completing doClose() (confluentinc#818)
Browse files Browse the repository at this point in the history
  • Loading branch information
gtassone authored and skl83 committed Sep 2, 2024
1 parent ab277c9 commit 23cf4d3
Showing 1 changed file with 35 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -629,6 +629,25 @@ private void waitForClose(Duration timeout) throws TimeoutException, ExecutionEx
}

private void doClose(Duration timeout) throws TimeoutException, ExecutionException, InterruptedException {
// fixes github issue #809 - ensure doClose() state transition to CLOSED
// by catching unhandled exceptions in subsystems during close
try {
innerDoClose(timeout);
} catch (Exception e) {
log.error("exception during close", e);
throw e;
} finally {
deregisterMeters();
pcMetrics.close();
log.debug("Close complete.");
this.state = CLOSED;
if (this.getFailureCause() != null) {
log.error("PC closed due to error: {}", getFailureCause(), null);
}
}
}

private void innerDoClose(Duration timeout) throws TimeoutException, ExecutionException, InterruptedException {
log.debug("Starting close process (state: {})...", state);

// Drain and pause polling - keeps consumer alive for later commit, but paused
Expand Down Expand Up @@ -678,23 +697,26 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti
if (Thread.currentThread().isInterrupted()) {
log.warn("control thread interrupted - may lead to issues with transactional commit lock acquisition");
}
commitOffsetsThatAreReady();

try {
commitOffsetsThatAreReady();
} catch (Exception e) {
log.warn("failed to commit during close sequence", e);
}
// only close consumer once producer has committed it's offsets (tx'l)
log.debug("Closing and waiting for broker poll system...");
brokerPollSubsystem.closeAndWait();
try {
brokerPollSubsystem.closeAndWait();
} catch (Exception e) {
log.warn("failed to close brokerPollSubsystem during close sequence", e);
}

maybeCloseConsumer();
try {
maybeCloseConsumer();
} catch (Exception e) {
log.warn("failed to maybeCloseConsumer during close sequence", e);
}

producerManager.ifPresent(x -> x.close(timeout));
deregisterMeters();
pcMetrics.close();
log.debug("Close complete.");
this.state = CLOSED;

if (this.getFailureCause() != null) {
log.error("PC closed due to error: {}", getFailureCause(), null);
}
}

/**
Expand Down Expand Up @@ -1482,4 +1504,4 @@ private void clearCommitCommand() {
}
}

}
}

0 comments on commit 23cf4d3

Please sign in to comment.