diff --git a/docs/documentation/configuration/eventhandlers.md b/docs/documentation/configuration/eventhandlers.md
index 4a0c6edc8..f02b0c1f1 100644
--- a/docs/documentation/configuration/eventhandlers.md
+++ b/docs/documentation/configuration/eventhandlers.md
@@ -3,13 +3,13 @@ Eventing in Conductor provides for loose coupling between workflows and support
This includes:
-1. Being able to produce an event (message) in an external system like SQS or internal to Conductor.
+1. Being able to produce an event (message) in an external system like SQS or Kafka, or internally within Conductor.
2. Start a workflow when a specific event occurs that matches the provided criteria.
Conductor provides a SUB_WORKFLOW task that can be used to embed a workflow inside a parent workflow. Eventing provides a similar capability without explicitly adding dependencies, enabling **fire-and-forget** style integrations.
## Event Task
-Event task provides ability to publish an event (message) to either Conductor or an external eventing system like SQS. Event tasks are useful for creating event based dependencies for workflows and tasks.
+The Event task provides the ability to publish an event (message) to either Conductor or an external eventing system like SQS or Kafka. Event tasks are useful for creating event-based dependencies for workflows and tasks.
See [Event Task](workflowdef/systemtasks/event-task.md) for documentation.
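+
+For example, a hypothetical EVENT task definition that publishes to a Kafka topic (assuming the `kafka:` sink prefix registered by the `kafka-event-queue` module) might look like:
+
+```json
+{
+  "name": "publish_to_kafka",
+  "taskReferenceName": "publish_to_kafka_ref",
+  "type": "EVENT",
+  "sink": "kafka:conductor-event"
+}
+```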
@@ -20,7 +20,7 @@ Event handlers are listeners registered that executes an action when a matching
2. Fail a Task
3. Complete a Task
-Event Handlers can be configured to listen to Conductor Events or an external event like SQS.
+Event Handlers can be configured to listen to Conductor events or to an external event source like SQS or Kafka.
## Configuration
Event Handlers are configured via ```/event/``` APIs.
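+
+For example, a new handler can be registered by POSTing its JSON definition to the event endpoint (a hypothetical sketch; the exact path depends on your server's API prefix):
+
+```shell
+curl -X POST http://localhost:8080/api/event \
+  -H 'Content-Type: application/json' \
+  -d @kafka_test_event_handler.json
+```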
diff --git a/kafka-event-queue/README.md b/kafka-event-queue/README.md
new file mode 100644
index 000000000..5c9e71719
--- /dev/null
+++ b/kafka-event-queue/README.md
@@ -0,0 +1,133 @@
+# Event Queue
+
+## Published Artifacts
+
+Group: `com.netflix.conductor`
+
+| Published Artifact | Description |
+| ----------- | ----------- |
+| conductor-kafka-event-queue | Support for integrating with Kafka and consuming events from it. |
+
+## Modules
+
+### Kafka
+
+https://kafka.apache.org/
+
+## kafka-event-queue
+
+Provides the ability to consume messages from Kafka.
+
+## Usage
+
+To use it in an event handler, prefix the event with `kafka:` followed by the topic name.
+
+Example:
+
+```json
+{
+ "name": "kafka_test_event_handler",
+ "event": "kafka:conductor-event",
+ "actions": [
+ {
+ "action": "start_workflow",
+ "start_workflow": {
+ "name": "workflow_triggered_by_kafka",
+ "input": {
+ "payload": "${payload}"
+ }
+ },
+ "expandInlineJSON": true
+ }
+ ],
+ "active": true
+}
+```
+
+The data from the Kafka event has the following format:
+
+```json
+{
+ "key": "key-1",
+ "headers": {
+ "header-1": "value1"
+ },
+ "payload": {
+ "first": "Marcelo",
+ "middle": "Billie",
+ "last": "Mertz"
+ }
+}
+```
+
+* `key` is the key field of the Kafka message.
+* `headers` contains the headers of the Kafka message.
+* `payload` is the value (body) of the Kafka message.
+
+To access these fields in the event handler, use an expression such as `"${payload}"`, which resolves to the `payload` property containing the Kafka message data.
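+
+Individual fields can be referenced with dotted paths. A hypothetical input mapping against the sample message above could look like this:
+
+```json
+"input": {
+  "firstName": "${payload.first}",
+  "messageKey": "${key}"
+}
+```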
+
+## Configuration
+
+To enable the queue, set the following property to `true`:
+
+```properties
+conductor.event-queues.kafka.enabled=true
+```
+
+There is a set of shared properties:
+
+```properties
+# Use Kafka as the default event queue type (other modules provide e.g. SQS or AMQP)
+conductor.default-event-queue.type=kafka
+
+# The bootstrap servers to use.
+conductor.event-queues.kafka.bootstrap-servers=kafka:29092
+
+# The dead letter topic for events that failed processing.
+conductor.event-queues.kafka.dlq-topic=conductor-dlq
+
+# Topic prefix for the listener queues used with conductor.default-event-queue.type.
+conductor.event-queues.kafka.listener-queue-prefix=conductor_
+
+# The polling duration. Start at 500ms and tune based on how your environment behaves.
+conductor.event-queues.kafka.poll-time-duration=500ms
+```
+
+Three clients should be configured: the Consumer, which consumes messages; the Producer, which publishes messages to Kafka; and the Admin client, which handles administrative operations.
+
+The supported properties for the three clients are those defined in `org.apache.kafka:kafka-clients:3.5.1` for each client type; keys that are not recognized by that client are ignored.
+
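+Any valid kafka-clients key can be passed through per client. For example, a hypothetical secured cluster might add standard security settings for the consumer (the exact values depend on your cluster):
+
+```properties
+conductor.event-queues.kafka.consumer.security.protocol=SASL_SSL
+conductor.event-queues.kafka.consumer.sasl.mechanism=PLAIN
+```
+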
+## Consumer properties
+
+Example of consumer settings.
+
+```properties
+conductor.event-queues.kafka.consumer.client.id=consumer-client
+conductor.event-queues.kafka.consumer.auto.offset.reset=earliest
+conductor.event-queues.kafka.consumer.enable.auto.commit=false
+conductor.event-queues.kafka.consumer.fetch.min.bytes=1
+conductor.event-queues.kafka.consumer.max.poll.records=500
+conductor.event-queues.kafka.consumer.group.id=conductor-group
+```
+
+## Producer properties
+
+Example of producer settings.
+
+```properties
+conductor.event-queues.kafka.producer.client.id=producer-client
+conductor.event-queues.kafka.producer.acks=all
+conductor.event-queues.kafka.producer.retries=5
+conductor.event-queues.kafka.producer.batch.size=16384
+conductor.event-queues.kafka.producer.linger.ms=10
+conductor.event-queues.kafka.producer.compression.type=gzip
+```
+
+## Admin properties
+
+Example of admin settings.
+
+```properties
+conductor.event-queues.kafka.admin.client.id=admin-client
+conductor.event-queues.kafka.admin.connections.max.idle.ms=10000
+```
diff --git a/kafka-event-queue/build.gradle b/kafka-event-queue/build.gradle
new file mode 100644
index 000000000..5b998696b
--- /dev/null
+++ b/kafka-event-queue/build.gradle
@@ -0,0 +1,44 @@
+dependencies {
+ // Core Conductor dependencies
+ implementation project(':conductor-common')
+ implementation project(':conductor-core')
+
+ // Spring Boot support
+ implementation 'org.springframework.boot:spring-boot-starter'
+
+ // Apache Commons Lang for utility classes
+ implementation 'org.apache.commons:commons-lang3'
+
+ // Reactive programming support with RxJava
+ implementation "io.reactivex:rxjava:${revRxJava}"
+
+ // SBMTODO: Remove Guava dependency if possible
+ // Guava should only be included if specifically needed
+ implementation "com.google.guava:guava:${revGuava}"
+
+ // Test dependencies
+ testImplementation 'org.springframework.boot:spring-boot-starter-test'
+ testImplementation project(':conductor-common').sourceSets.test.output
+
+
+ // Add Kafka client dependency
+ implementation 'org.apache.kafka:kafka-clients:3.5.1'
+
+ // Add SLF4J API for logging
+ implementation 'org.slf4j:slf4j-api:2.0.9'
+
+ // Add SLF4J binding for logging with Logback
+ runtimeOnly 'ch.qos.logback:logback-classic:1.4.11'
+}
+
diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueConfiguration.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueConfiguration.java
new file mode 100644
index 000000000..810e40cf7
--- /dev/null
+++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueConfiguration.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2024 Conductor Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafkaeq.config;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Optional;
+
+import org.apache.commons.lang3.StringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.netflix.conductor.core.config.ConductorProperties;
+import com.netflix.conductor.core.events.EventQueueProvider;
+import com.netflix.conductor.core.events.queue.ObservableQueue;
+import com.netflix.conductor.kafkaeq.eventqueue.KafkaObservableQueue.Builder;
+import com.netflix.conductor.model.TaskModel.Status;
+
+@Configuration
+@EnableConfigurationProperties(KafkaEventQueueProperties.class)
+@ConditionalOnProperty(name = "conductor.event-queues.kafka.enabled", havingValue = "true")
+public class KafkaEventQueueConfiguration {
+
+    private final KafkaEventQueueProperties kafkaProperties;
+
+ private static final Logger LOGGER =
+ LoggerFactory.getLogger(KafkaEventQueueConfiguration.class);
+
+ public KafkaEventQueueConfiguration(KafkaEventQueueProperties kafkaProperties) {
+ this.kafkaProperties = kafkaProperties;
+ }
+
+ @Bean
+ public EventQueueProvider kafkaEventQueueProvider() {
+ return new KafkaEventQueueProvider(kafkaProperties);
+ }
+
+ @ConditionalOnProperty(
+ name = "conductor.default-event-queue.type",
+ havingValue = "kafka",
+ matchIfMissing = false)
+ @Bean
+    public Map<Status, ObservableQueue> getQueues(
+ ConductorProperties conductorProperties, KafkaEventQueueProperties properties) {
+ try {
+
+ LOGGER.debug(
+ "Starting to create KafkaObservableQueues with properties: {}", properties);
+
+ String stack =
+ Optional.ofNullable(conductorProperties.getStack())
+ .filter(stackName -> stackName.length() > 0)
+ .map(stackName -> stackName + "_")
+ .orElse("");
+
+ LOGGER.debug("Using stack: {}", stack);
+
+ Status[] statuses = new Status[] {Status.COMPLETED, Status.FAILED};
+            Map<Status, ObservableQueue> queues = new HashMap<>();
+
+ for (Status status : statuses) {
+ // Log the status being processed
+ LOGGER.debug("Processing status: {}", status);
+
+ String queuePrefix =
+ StringUtils.isBlank(properties.getListenerQueuePrefix())
+ ? conductorProperties.getAppId() + "_kafka_notify_" + stack
+ : properties.getListenerQueuePrefix();
+
+ LOGGER.debug("queuePrefix: {}", queuePrefix);
+
+ String topicName = queuePrefix + status.name();
+
+ LOGGER.debug("topicName: {}", topicName);
+
+ final ObservableQueue queue = new Builder(properties).build(topicName);
+ queues.put(status, queue);
+ }
+
+ LOGGER.debug("Successfully created queues: {}", queues);
+ return queues;
+ } catch (Exception e) {
+ LOGGER.error("Failed to create KafkaObservableQueues", e);
+ throw new RuntimeException("Failed to getQueues on KafkaEventQueueConfiguration", e);
+ }
+ }
+}
diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java
new file mode 100644
index 000000000..2da3c81ff
--- /dev/null
+++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2024 Conductor Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafkaeq.config;
+
+import java.time.Duration;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.kafka.clients.admin.AdminClientConfig;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import jakarta.validation.constraints.NotBlank;
+import org.springframework.boot.context.properties.ConfigurationProperties;
+import org.springframework.validation.annotation.Validated;
+
+@ConfigurationProperties("conductor.event-queues.kafka")
+@Validated
+public class KafkaEventQueueProperties {
+
+ /** Kafka bootstrap servers (comma-separated). */
+ @NotBlank(message = "Bootstrap servers must not be blank")
+ private String bootstrapServers = "kafka:29092";
+
+ /** Dead Letter Queue (DLQ) topic for failed messages. */
+ private String dlqTopic = "conductor-dlq";
+
+ /** Prefix for dynamically created Kafka topics, if applicable. */
+ private String listenerQueuePrefix = "";
+
+    /** The polling interval for Kafka. */
+ private Duration pollTimeDuration = Duration.ofMillis(100);
+
+ /** Additional properties for consumers, producers, and admin clients. */
+    private Map<String, Object> consumer = new HashMap<>();
+
+    private Map<String, Object> producer = new HashMap<>();
+    private Map<String, Object> admin = new HashMap<>();
+
+ // Getters and setters
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public String getDlqTopic() {
+ return dlqTopic;
+ }
+
+ public void setDlqTopic(String dlqTopic) {
+ this.dlqTopic = dlqTopic;
+ }
+
+ public String getListenerQueuePrefix() {
+ return listenerQueuePrefix;
+ }
+
+ public void setListenerQueuePrefix(String listenerQueuePrefix) {
+ this.listenerQueuePrefix = listenerQueuePrefix;
+ }
+
+ public Duration getPollTimeDuration() {
+ return pollTimeDuration;
+ }
+
+ public void setPollTimeDuration(Duration pollTimeDuration) {
+ this.pollTimeDuration = pollTimeDuration;
+ }
+
+    public Map<String, Object> getConsumer() {
+        return consumer;
+    }
+
+    public void setConsumer(Map<String, Object> consumer) {
+        this.consumer = consumer;
+    }
+
+    public Map<String, Object> getProducer() {
+        return producer;
+    }
+
+    public void setProducer(Map<String, Object> producer) {
+        this.producer = producer;
+    }
+
+    public Map<String, Object> getAdmin() {
+        return admin;
+    }
+
+    public void setAdmin(Map<String, Object> admin) {
+        this.admin = admin;
+    }
+
+ /**
+ * Generates configuration properties for Kafka consumers. Maps against `ConsumerConfig` keys.
+ */
+    public Map<String, Object> toConsumerConfig() {
+        Map<String, Object> config = mapProperties(ConsumerConfig.configNames(), consumer);
+ // Ensure key.deserializer and value.deserializer are always set
+ setDefaultIfNullOrEmpty(
+ config,
+ ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+ setDefaultIfNullOrEmpty(
+ config,
+ ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringDeserializer");
+
+ setDefaultIfNullOrEmpty(config, ConsumerConfig.GROUP_ID_CONFIG, "conductor-group");
+ setDefaultIfNullOrEmpty(config, ConsumerConfig.CLIENT_ID_CONFIG, "consumer-client");
+ return config;
+ }
+
+ /**
+ * Generates configuration properties for Kafka producers. Maps against `ProducerConfig` keys.
+ */
+    public Map<String, Object> toProducerConfig() {
+        Map<String, Object> config = mapProperties(ProducerConfig.configNames(), producer);
+ setDefaultIfNullOrEmpty(
+ config,
+ ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringSerializer");
+ setDefaultIfNullOrEmpty(
+ config,
+ ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
+ "org.apache.kafka.common.serialization.StringSerializer");
+
+        setDefaultIfNullOrEmpty(config, ProducerConfig.CLIENT_ID_CONFIG, "producer-client");
+ return config;
+ }
+
+ /**
+ * Generates configuration properties for Kafka AdminClient. Maps against `AdminClientConfig`
+ * keys.
+ */
+    public Map<String, Object> toAdminConfig() {
+        Map<String, Object> config = mapProperties(AdminClientConfig.configNames(), admin);
+        setDefaultIfNullOrEmpty(config, AdminClientConfig.CLIENT_ID_CONFIG, "admin-client");
+ return config;
+ }
+
+ /**
+ * Filters and maps properties based on the allowed keys for a specific Kafka client
+ * configuration.
+ *
+ * @param allowedKeys The keys allowed for the specific Kafka client configuration.
+ * @param inputProperties The user-specified properties to filter.
+ * @return A filtered map containing only valid properties.
+ */
+    private Map<String, Object> mapProperties(
+            Iterable<String> allowedKeys, Map<String, Object> inputProperties) {
+        Map<String, Object> config = new HashMap<>();
+ for (String key : allowedKeys) {
+ if (inputProperties.containsKey(key)) {
+ config.put(key, inputProperties.get(key));
+ }
+ }
+
+        // Ensure bootstrapServers is always added
+        setDefaultIfNullOrEmpty(
+                config, AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+        return config;
+ }
+
+ private void setDefaultIfNullOrEmpty(
+            Map<String, Object> config, String key, String defaultValue) {
+ Object value = config.get(key);
+ if (value == null || (value instanceof String && ((String) value).isBlank())) {
+ config.put(key, defaultValue);
+ }
+ }
+}
diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProvider.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProvider.java
new file mode 100644
index 000000000..af54a7447
--- /dev/null
+++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProvider.java
@@ -0,0 +1,47 @@
+/*
+ * Copyright 2024 Conductor Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafkaeq.config;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.conductor.core.events.EventQueueProvider;
+import com.netflix.conductor.core.events.queue.ObservableQueue;
+import com.netflix.conductor.kafkaeq.eventqueue.KafkaObservableQueue.Builder;
+
+public class KafkaEventQueueProvider implements EventQueueProvider {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventQueueProvider.class);
+
+    private final Map<String, ObservableQueue> queues = new ConcurrentHashMap<>();
+ private final KafkaEventQueueProperties properties;
+
+ public KafkaEventQueueProvider(KafkaEventQueueProperties properties) {
+ this.properties = properties;
+ }
+
+ @Override
+ public String getQueueType() {
+ return "kafka";
+ }
+
+ @Override
+ public ObservableQueue getQueue(String queueURI) {
+ LOGGER.info("Creating KafkaObservableQueue for topic: {}", queueURI);
+
+ return queues.computeIfAbsent(queueURI, q -> new Builder(properties).build(queueURI));
+ }
+}
diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java
new file mode 100644
index 000000000..4c798cf6d
--- /dev/null
+++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java
@@ -0,0 +1,648 @@
+/*
+ * Copyright 2024 Conductor Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafkaeq.eventqueue;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.kafka.clients.admin.AdminClient;
+import org.apache.kafka.clients.admin.ListOffsetsResult;
+import org.apache.kafka.clients.admin.OffsetSpec;
+import org.apache.kafka.clients.admin.TopicDescription;
+import org.apache.kafka.clients.consumer.*;
+import org.apache.kafka.clients.producer.*;
+import org.apache.kafka.common.KafkaFuture;
+import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.TopicPartitionInfo;
+import org.apache.kafka.common.header.Header;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.conductor.core.events.queue.Message;
+import com.netflix.conductor.core.events.queue.ObservableQueue;
+import com.netflix.conductor.kafkaeq.config.KafkaEventQueueProperties;
+import com.netflix.conductor.metrics.Monitors;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import rx.Observable;
+import rx.subscriptions.Subscriptions;
+
+public class KafkaObservableQueue implements ObservableQueue {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaObservableQueue.class);
+ private static final String QUEUE_TYPE = "kafka";
+
+ private final String topic;
+ private volatile AdminClient adminClient;
+    private final Consumer<String, String> kafkaConsumer;
+    private final Producer<String, String> kafkaProducer;
+ private final long pollTimeInMS;
+ private final String dlqTopic;
+ private final boolean autoCommitEnabled;
+    private final Map<String, Long> unacknowledgedMessages = new ConcurrentHashMap<>();
+ private volatile boolean running = false;
+ private final KafkaEventQueueProperties properties;
+ private final ObjectMapper objectMapper = new ObjectMapper();
+
+ public KafkaObservableQueue(
+ String topic,
+ Properties consumerConfig,
+ Properties producerConfig,
+ Properties adminConfig,
+ KafkaEventQueueProperties properties) {
+ this.topic = topic;
+ this.kafkaConsumer = new KafkaConsumer<>(consumerConfig);
+ this.kafkaProducer = new KafkaProducer<>(producerConfig);
+ this.properties = properties;
+ this.pollTimeInMS = properties.getPollTimeDuration().toMillis();
+ this.dlqTopic = properties.getDlqTopic();
+ this.autoCommitEnabled =
+ Boolean.parseBoolean(
+ consumerConfig
+ .getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ .toString());
+
+ this.adminClient = AdminClient.create(adminConfig);
+ }
+
+ public KafkaObservableQueue(
+ String topic,
+            Consumer<String, String> kafkaConsumer,
+            Producer<String, String> kafkaProducer,
+ AdminClient adminClient,
+ KafkaEventQueueProperties properties) {
+ this.topic = topic;
+ this.kafkaConsumer = kafkaConsumer;
+ this.kafkaProducer = kafkaProducer;
+ this.adminClient = adminClient;
+ this.properties = properties;
+ this.pollTimeInMS = properties.getPollTimeDuration().toMillis();
+ this.dlqTopic = properties.getDlqTopic();
+ this.autoCommitEnabled =
+ Boolean.parseBoolean(
+ properties
+ .toConsumerConfig()
+ .getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false")
+ .toString());
+ }
+
+ @Override
+    public Observable<Message> observe() {
+ return Observable.create(
+ subscriber -> {
+                    Observable<Long> interval =
+ Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS);
+
+ interval.flatMap(
+ (Long x) -> {
+ if (!isRunning()) {
+ return Observable.from(Collections.emptyList());
+ }
+
+ try {
+                                ConsumerRecords<String, String> records =
+                                        kafkaConsumer.poll(
+                                                this.properties.getPollTimeDuration());
+                                List<Message> messages = new ArrayList<>();
+
+                                for (ConsumerRecord<String, String> record : records) {
+                                    try {
+                                        // Message ID encodes "partition-offset" so that
+                                        // ack() can map it back to a TopicPartition.
+                                        String messageId =
+                                                record.partition()
+                                                        + "-"
+                                                        + record.offset();
+                                        String key = record.key();
+                                        String value = record.value();
+                                        Map<String, String> headers = new HashMap<>();
+
+ // Extract headers
+ if (record.headers() != null) {
+ for (Header header : record.headers()) {
+ headers.put(
+ header.key(),
+ new String(header.value()));
+ }
+ }
+
+ // Log the details
+ LOGGER.debug(
+ "Input values MessageId: {} Key: {} Headers: {} Value: {}",
+ messageId,
+ key,
+ headers,
+ value);
+
+ // Construct message
+ String jsonMessage =
+ constructJsonMessage(
+ key, headers, value);
+ LOGGER.debug("Payload: {}", jsonMessage);
+
+ Message message =
+ new Message(
+ messageId, jsonMessage, null);
+
+ unacknowledgedMessages.put(
+ messageId, record.offset());
+ messages.add(message);
+ } catch (Exception e) {
+ LOGGER.error(
+ "Failed to process record from Kafka: {}",
+ record,
+ e);
+ }
+ }
+
+ Monitors.recordEventQueueMessagesProcessed(
+ QUEUE_TYPE, this.topic, messages.size());
+ return Observable.from(messages);
+ } catch (Exception e) {
+ LOGGER.error(
+ "Error while polling Kafka for topic: {}",
+ topic,
+ e);
+ Monitors.recordObservableQMessageReceivedErrors(
+ QUEUE_TYPE);
+ return Observable.error(e);
+ }
+ })
+ .subscribe(subscriber::onNext, subscriber::onError);
+
+ subscriber.add(Subscriptions.create(this::stop));
+ });
+ }
+
+    private String constructJsonMessage(
+            String key, Map<String, String> headers, String payload) {
+ StringBuilder json = new StringBuilder();
+ json.append("{");
+ json.append("\"key\":\"").append(key != null ? key : "").append("\",");
+
+ // Serialize headers to JSON, handling potential errors
+ String headersJson = toJson(headers);
+ if (headersJson != null) {
+ json.append("\"headers\":").append(headersJson).append(",");
+ } else {
+ json.append("\"headers\":{}")
+ .append(","); // Default to an empty JSON object if headers are invalid
+ }
+
+ json.append("\"payload\":");
+
+ // Detect if the payload is valid JSON
+ if (isJsonValid(payload)) {
+ json.append(payload); // Embed JSON object directly
+ } else {
+            // Treat as plain text: serialize through Jackson so quotes and
+            // backslashes inside the payload are escaped correctly.
+            json.append(payload != null ? toJson(payload) : "null");
+ }
+
+ json.append("}");
+ return json.toString();
+ }
+
+ private boolean isJsonValid(String json) {
+ if (json == null || json.isEmpty()) {
+ return false;
+ }
+ try {
+ objectMapper.readTree(json); // Parses the JSON to check validity
+ return true;
+ } catch (JsonProcessingException e) {
+ return false;
+ }
+ }
+
+ protected String toJson(Object value) {
+ if (value == null) {
+ return null;
+ }
+ try {
+ return objectMapper.writeValueAsString(value);
+ } catch (JsonProcessingException ex) {
+ // Log the error and return a placeholder or null
+ LOGGER.error("Failed to convert object to JSON: {}", value, ex);
+ return null;
+ }
+ }
+
+ @Override
+    public List<String> ack(List<Message> messages) {
+        // If autocommit is enabled we do not run this code.
+        if (autoCommitEnabled) {
+            LOGGER.info("Auto commit is enabled. Skipping manual acknowledgment.");
+            return List.of();
+        }
+
+        Map<TopicPartition, OffsetAndMetadata> offsetsToCommit = new HashMap<>();
+        List<String> failedAcks = new ArrayList<>(); // Collect IDs of failed messages
+
+ for (Message message : messages) {
+ String messageId = message.getId();
+ if (unacknowledgedMessages.containsKey(messageId)) {
+ try {
+ String[] parts = messageId.split("-");
+
+ if (parts.length != 2) {
+ throw new IllegalArgumentException(
+ "Invalid message ID format: " + messageId);
+ }
+
+ // Extract partition and offset from messageId
+ int partition = Integer.parseInt(parts[0]);
+ long offset = Long.parseLong(parts[1]);
+
+ // Remove message
+ unacknowledgedMessages.remove(messageId);
+
+ TopicPartition tp = new TopicPartition(topic, partition);
+
+ LOGGER.debug(
+ "Parsed messageId: {}, topic: {}, partition: {}, offset: {}",
+ messageId,
+ topic,
+ partition,
+ offset);
+ offsetsToCommit.put(tp, new OffsetAndMetadata(offset + 1));
+ } catch (Exception e) {
+ LOGGER.error("Failed to prepare acknowledgment for message: {}", messageId, e);
+ failedAcks.add(messageId); // Add to failed list if exception occurs
+ }
+ } else {
+ LOGGER.warn("Message ID not found in unacknowledged messages: {}", messageId);
+ failedAcks.add(messageId); // Add to failed list if not found
+ }
+ }
+
+ try {
+ LOGGER.debug("Committing offsets: {}", offsetsToCommit);
+
+ kafkaConsumer.commitSync(offsetsToCommit); // Commit all collected offsets
+ } catch (CommitFailedException e) {
+ LOGGER.warn("Offset commit failed: {}", e.getMessage());
+ } catch (OffsetOutOfRangeException e) {
+ LOGGER.error(
+ "OffsetOutOfRangeException encountered for topic {}: {}",
+ e.partitions(),
+ e.getMessage());
+
+ // Reset offsets for the out-of-range partition
+            Map<TopicPartition, OffsetAndMetadata> offsetsToReset = new HashMap<>();
+ for (TopicPartition partition : e.partitions()) {
+ long newOffset =
+ kafkaConsumer.position(partition); // Default to the current position
+ offsetsToReset.put(partition, new OffsetAndMetadata(newOffset));
+ LOGGER.warn("Resetting offset for partition {} to {}", partition, newOffset);
+ }
+
+ // Commit the new offsets
+ kafkaConsumer.commitSync(offsetsToReset);
+ } catch (Exception e) {
+ LOGGER.error("Failed to commit offsets to Kafka: {}", offsetsToCommit, e);
+ // Add all message IDs from the current batch to the failed list
+ failedAcks.addAll(messages.stream().map(Message::getId).toList());
+ }
+
+ return failedAcks; // Return IDs of messages that were not successfully acknowledged
+ }
+
+ @Override
+    public void nack(List<Message> messages) {
+ for (Message message : messages) {
+ try {
+ kafkaProducer.send(
+ new ProducerRecord<>(dlqTopic, message.getId(), message.getPayload()));
+ } catch (Exception e) {
+ LOGGER.error("Failed to send message to DLQ. Message ID: {}", message.getId(), e);
+ }
+ }
+ }
+
+ @Override
+    public void publish(List<Message> messages) {
+ for (Message message : messages) {
+ try {
+ kafkaProducer.send(
+ new ProducerRecord<>(topic, message.getId(), message.getPayload()),
+ (metadata, exception) -> {
+ if (exception != null) {
+ LOGGER.error(
+ "Failed to publish message to Kafka. Message ID: {}",
+ message.getId(),
+ exception);
+ } else {
+ LOGGER.info(
+ "Message published to Kafka. Topic: {}, Partition: {}, Offset: {}",
+ metadata.topic(),
+ metadata.partition(),
+ metadata.offset());
+ }
+ });
+ } catch (Exception e) {
+ LOGGER.error(
+ "Error publishing message to Kafka. Message ID: {}", message.getId(), e);
+ }
+ }
+ }
+
+ @Override
+ public boolean rePublishIfNoAck() {
+ return false;
+ }
+
+ @Override
+ public void setUnackTimeout(Message message, long unackTimeout) {
+ // Kafka does not support visibility timeout; this can be managed externally if
+ // needed.
+ }
+
+ @Override
+ public long size() {
+        if (!topicExists(this.topic)) {
+ LOGGER.info("Topic '{}' not available, will refresh metadata.", this.topic);
+ refreshMetadata(this.topic);
+ }
+
+ long topicSize = getTopicSizeUsingAdminClient();
+ if (topicSize != -1) {
+ LOGGER.info("Topic size for '{}': {}", this.topic, topicSize);
+ } else {
+ LOGGER.error("Failed to fetch topic size for '{}'", this.topic);
+ }
+
+ return topicSize;
+ }
+
+ private long getTopicSizeUsingAdminClient() {
+ try {
+            KafkaFuture<TopicDescription> topicDescriptionFuture =
+ adminClient
+ .describeTopics(Collections.singletonList(topic))
+ .topicNameValues()
+ .get(topic);
+
+ TopicDescription topicDescription = topicDescriptionFuture.get();
+
+ // Prepare request for latest offsets
+            Map<TopicPartition, OffsetSpec> offsetRequest = new HashMap<>();
+ for (TopicPartitionInfo partition : topicDescription.partitions()) {
+ TopicPartition tp = new TopicPartition(topic, partition.partition());
+ offsetRequest.put(tp, OffsetSpec.latest());
+ }
+
+ // Fetch offsets asynchronously
+ KafkaFuture