From 410ec36059aeabdd58fbd933a2db4c7a6eddc5e9 Mon Sep 17 00:00:00 2001 From: Allan-QLB Date: Mon, 13 Mar 2023 23:23:26 +0800 Subject: [PATCH] Fix test failure for #159 --- .../extension/io/kafka/source/ConsumerKafkaGroup.java | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/component/src/main/java/io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.java b/component/src/main/java/io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.java index 80ce59df..001901ac 100644 --- a/component/src/main/java/io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.java +++ b/component/src/main/java/io/siddhi/extension/io/kafka/source/ConsumerKafkaGroup.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.List; import java.util.Properties; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -114,7 +115,14 @@ void shutdown() { void run() { try { for (KafkaConsumerThread consumerThread : kafkaConsumerThreadList) { - futureList.add(executorService.submit(consumerThread)); + futureList.add(CompletableFuture.runAsync(consumerThread, executorService).whenComplete( + (ignored, throwable) -> { + if (throwable != null) { + LOG.error("KafkaConsumerThread for topic(s):{} terminated unexpectedly!", + Arrays.toString(topics), throwable); + } + } + )); } } catch (Throwable t) { LOG.error("Error while creating KafkaConsumerThread for topic(s): " + Arrays.toString(topics), t);