From 5ce0b2f69f09f73cecad7e8b1c623c0c42c4ea28 Mon Sep 17 00:00:00 2001 From: gtassone Date: Wed, 14 Aug 2024 11:28:25 -0400 Subject: [PATCH] fix: safely completing doClose() (#818) Fixes #809 --- .../AbstractParallelEoSStreamProcessor.java | 48 ++++++++++++++----- 1 file changed, 35 insertions(+), 13 deletions(-) diff --git a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java index 719c94cb8..cdad98300 100644 --- a/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java +++ b/parallel-consumer-core/src/main/java/io/confluent/parallelconsumer/internal/AbstractParallelEoSStreamProcessor.java @@ -629,6 +629,25 @@ private void waitForClose(Duration timeout) throws TimeoutException, ExecutionEx } private void doClose(Duration timeout) throws TimeoutException, ExecutionException, InterruptedException { + // fixes github issue #809 - ensure doClose() state transition to CLOSED + // by catching unhandled exceptions in subsystems during close + try { + innerDoClose(timeout); + } catch (Exception e) { + log.error("exception during close", e); + throw e; + } finally { + deregisterMeters(); + pcMetrics.close(); + log.debug("Close complete."); + this.state = CLOSED; + if (this.getFailureCause() != null) { + log.error("PC closed due to error: {}", getFailureCause(), null); + } + } + } + + private void innerDoClose(Duration timeout) throws TimeoutException, ExecutionException, InterruptedException { log.debug("Starting close process (state: {})...", state); // Drain and pause polling - keeps consumer alive for later commit, but paused @@ -678,23 +697,26 @@ private void doClose(Duration timeout) throws TimeoutException, ExecutionExcepti if (Thread.currentThread().isInterrupted()) { log.warn("control thread interrupted - may lead to issues with transactional commit lock acquisition"); } - commitOffsetsThatAreReady(); - + try { + commitOffsetsThatAreReady(); + } catch (Exception e) { + log.warn("failed to commit during close sequence", e); + } // only close consumer once producer has committed it's offsets (tx'l) log.debug("Closing and waiting for broker poll system..."); - brokerPollSubsystem.closeAndWait(); + try { + brokerPollSubsystem.closeAndWait(); + } catch (Exception e) { + log.warn("failed to close brokerPollSubsystem during close sequence", e); + } - maybeCloseConsumer(); + try { + maybeCloseConsumer(); + } catch (Exception e) { + log.warn("failed to maybeCloseConsumer during close sequence", e); + } producerManager.ifPresent(x -> x.close(timeout)); - deregisterMeters(); - pcMetrics.close(); - log.debug("Close complete."); - this.state = CLOSED; - - if (this.getFailureCause() != null) { - log.error("PC closed due to error: {}", getFailureCause(), null); - } } /** @@ -1482,4 +1504,4 @@ private void clearCommitCommand() { } } -} +} \ No newline at end of file