From 02df0fc270fa228023a5211186d48f764498274e Mon Sep 17 00:00:00 2001 From: rmsamitha Date: Tue, 31 Oct 2023 13:36:12 +0530 Subject: [PATCH] Fix duplicate event issue when data flow is slow --- .../siddhi/extension/io/kafka/source/KafkaConsumerThread.java | 1 - .../java/io/siddhi/extension/io/kafka/source/KafkaSource.java | 4 +--- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaConsumerThread.java b/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaConsumerThread.java index 23092b3d..77ba5809 100644 --- a/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaConsumerThread.java +++ b/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaConsumerThread.java @@ -126,7 +126,6 @@ void pause() { } void resume() { - restore(); paused = false; try { lock.lock(); diff --git a/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaSource.java b/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaSource.java index 3b8a8253..600f5dd1 100644 --- a/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaSource.java +++ b/component/src/main/java/io/siddhi/extension/io/kafka/source/KafkaSource.java @@ -440,10 +440,8 @@ public void pause() { public void resume() { if (consumerKafkaGroup != null) { consumerKafkaGroup.resume(); - if (LOG.isDebugEnabled()) { - LOG.debug("Kafka Adapter resumed for topic(s): " + optionHolder.validateAndGetStaticValue + LOG.info("Kafka Adapter resumed for topic(s): " + optionHolder.validateAndGetStaticValue (ADAPTOR_SUBSCRIBER_TOPIC)); - } } }