From 16b8ccfe06b9e7fb27711d7323e41691d7f64bb3 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Thu, 5 Dec 2024 13:17:43 +0900 Subject: [PATCH] Handling thread sync when evenhandler is updated with a new queue --- .../eventqueue/KafkaObservableQueue.java | 51 ++++++++++++------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java index 8a65692f8..4c798cf6d 100644 --- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java @@ -456,7 +456,7 @@ public void start() { } @Override - public void stop() { + public synchronized void stop() { LOGGER.info("Kafka consumer stopping for topic: {}", topic); if (!running) { LOGGER.warn("KafkaObservableQueue is already stopped for topic: {}", topic); @@ -488,39 +488,56 @@ public void stop() { } private void retryCloseConsumer() { - int retries = 3; - while (retries > 0) { + int attempts = 3; + while (attempts > 0) { try { + kafkaConsumer.unsubscribe(); kafkaConsumer.close(); - LOGGER.info("Kafka consumer closed successfully on retry."); - return; + LOGGER.info("Kafka consumer stopped for topic: {}", topic); + return; // Exit if successful } catch (Exception e) { - retries--; LOGGER.warn( - "Retry failed to close Kafka consumer. Remaining attempts: {}", retries, e); - if (retries == 0) { - LOGGER.error("Exhausted retries for closing Kafka consumer."); + "Error stopping Kafka consumer for topic: {}, attempts remaining: {}", + topic, + attempts - 1, + e); + attempts--; + try { + Thread.sleep(1000); // Wait before retrying + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOGGER.error("Thread interrupted during Kafka consumer shutdown retries"); + break; } } } + LOGGER.error("Failed to stop Kafka consumer for topic: {} after retries", topic); } private void retryCloseProducer() { - int retries = 3; - while (retries > 0) { + int attempts = 3; + while (attempts > 0) { try { kafkaProducer.close(); - LOGGER.info("Kafka producer closed successfully on retry."); - return; + LOGGER.info("Kafka producer stopped for topic: {}", topic); + return; // Exit if successful } catch (Exception e) { - retries--; LOGGER.warn( - "Retry failed to close Kafka producer. Remaining attempts: {}", retries, e); - if (retries == 0) { - LOGGER.error("Exhausted retries for closing Kafka producer."); + "Error stopping Kafka producer for topic: {}, attempts remaining: {}", + topic, + attempts - 1, + e); + attempts--; + try { + Thread.sleep(1000); // Wait before retrying + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOGGER.error("Thread interrupted during Kafka producer shutdown retries"); + break; } } } + LOGGER.error("Failed to stop Kafka producer for topic: {} after retries", topic); } @Override