From f01f7f553e7ba21c2ac0c8500c5a49a948d5fa90 Mon Sep 17 00:00:00 2001
From: Robert Karlsson
Date: Fri, 29 Nov 2024 16:03:19 +0900
Subject: [PATCH 01/12] Added support for kafka event queue

---
 kafka-event-queue/README.md                   |  24 +
 kafka-event-queue/build.gradle                |  40 ++
 .../config/KafkaEventQueueConfiguration.java  | 102 ++++
 .../config/KafkaEventQueueProperties.java     | 197 ++++++++
 .../config/KafkaEventQueueProvider.java       |  47 ++
 .../eventqueue/KafkaObservableQueue.java      | 459 ++++++++++++++++++
 ...itional-spring-configuration-metadata.json |  27 ++
 server/build.gradle                           |   1 +
 settings.gradle                               |   1 +
 ...itional-spring-configuration-metadata.json |  10 +
 10 files changed, 908 insertions(+)
 create mode 100644 kafka-event-queue/README.md
 create mode 100644 kafka-event-queue/build.gradle
 create mode 100644 kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueConfiguration.java
 create mode 100644 kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java
 create mode 100644 kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProvider.java
 create mode 100644 kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java
 create mode 100644 kafka-event-queue/src/main/resources/META-INF/additional-spring-configuration-metadata.json

diff --git a/kafka-event-queue/README.md b/kafka-event-queue/README.md
new file mode 100644
index 000000000..bb2ea5390
--- /dev/null
+++ b/kafka-event-queue/README.md
@@ -0,0 +1,24 @@
+# Event Queue
+## Published Artifacts
+
+Group: `com.netflix.conductor`
+
+| Published Artifact | Description |
+| ----------- | ----------- |
+| conductor-kafka-event-queue | Support for integrating with Kafka and consuming events from it. |
+
+## Modules
+### Kafka
+https://kafka.apache.org/
+
+Provides the ability to publish and consume messages from Kafka.
+#### Configuration
+(Default values shown below)
+```properties
+conductor.event-queues.kafka.enabled=true
+conductor.event-queues.kafka.bootstrap-servers=kafka:29092
+conductor.event-queues.kafka.groupId=conductor-consumers
+conductor.event-queues.kafka.topic=conductor-event
+conductor.event-queues.kafka.dlqTopic=conductor-dlq
+conductor.event-queues.kafka.pollTimeDuration=100
+```
diff --git a/kafka-event-queue/build.gradle b/kafka-event-queue/build.gradle
new file mode 100644
index 000000000..7aeabf6be
--- /dev/null
+++ b/kafka-event-queue/build.gradle
@@ -0,0 +1,40 @@
+plugins {
+    id 'java'
+}
+
+dependencies {
+    // Core Conductor dependencies
+    implementation project(':conductor-common')
+    implementation project(':conductor-core')
+
+    // Spring Boot support
+    implementation 'org.springframework.boot:spring-boot-starter'
+
+    // Apache Commons Lang for utility classes
+    implementation 'org.apache.commons:commons-lang3'
+
+    // Reactive programming support with RxJava
+    implementation "io.reactivex:rxjava:${revRxJava}"
+
+    // SBMTODO: Remove Guava dependency if possible
+    // Guava should only be included if specifically needed
+    implementation "com.google.guava:guava:${revGuava}"
+
+    // Removed AWS SQS SDK as we are transitioning to Kafka
+    // implementation "com.amazonaws:aws-java-sdk-sqs:${revAwsSdk}"
+
+    // Test dependencies
+    testImplementation 'org.springframework.boot:spring-boot-starter-test'
+    testImplementation project(':conductor-common').sourceSets.test.output
+
+    // Add Kafka client dependency
+    implementation 'org.apache.kafka:kafka-clients:3.5.1'
+
+    // Add SLF4J API for logging
+    implementation 'org.slf4j:slf4j-api:2.0.9'
+
+    // Add SLF4J binding for logging with Logback
+    runtimeOnly 'ch.qos.logback:logback-classic:1.4.11'
+}
+
+
diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueConfiguration.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueConfiguration.java
new file mode 100644
index 000000000..810e40cf7
--- /dev/null
+++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueConfiguration.java
@@ -0,0 +1,102 @@
+/*
+ * Copyright 2024 Conductor Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafkaeq.config; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.events.EventQueueProvider; +import com.netflix.conductor.core.events.queue.ObservableQueue; +import com.netflix.conductor.kafkaeq.eventqueue.KafkaObservableQueue.Builder; +import com.netflix.conductor.model.TaskModel.Status; + +@Configuration +@EnableConfigurationProperties(KafkaEventQueueProperties.class) +@ConditionalOnProperty(name = "conductor.event-queues.kafka.enabled", havingValue = "true") +public class KafkaEventQueueConfiguration { + + @Autowired private KafkaEventQueueProperties kafkaProperties; + + private static final Logger LOGGER = + LoggerFactory.getLogger(KafkaEventQueueConfiguration.class); + + public KafkaEventQueueConfiguration(KafkaEventQueueProperties kafkaProperties) { + this.kafkaProperties = kafkaProperties; + } + + @Bean + public EventQueueProvider kafkaEventQueueProvider() { + return new KafkaEventQueueProvider(kafkaProperties); + } + + @ConditionalOnProperty( + name = "conductor.default-event-queue.type", + havingValue = "kafka", + matchIfMissing = false) + @Bean + public Map getQueues( + ConductorProperties conductorProperties, KafkaEventQueueProperties properties) { + try { + + LOGGER.debug( + "Starting to create KafkaObservableQueues with properties: {}", properties); + + String stack = + Optional.ofNullable(conductorProperties.getStack()) + .filter(stackName -> stackName.length() > 0) + .map(stackName -> stackName + "_") + .orElse(""); + + LOGGER.debug("Using stack: {}", stack); + + Status[] statuses = new Status[] {Status.COMPLETED, Status.FAILED}; + Map queues = new HashMap<>(); + + for (Status status : statuses) { + // Log the status being processed + LOGGER.debug("Processing status: {}", status); + + String queuePrefix = + StringUtils.isBlank(properties.getListenerQueuePrefix()) + ? 
conductorProperties.getAppId() + "_kafka_notify_" + stack + : properties.getListenerQueuePrefix(); + + LOGGER.debug("queuePrefix: {}", queuePrefix); + + String topicName = queuePrefix + status.name(); + + LOGGER.debug("topicName: {}", topicName); + + final ObservableQueue queue = new Builder(properties).build(topicName); + queues.put(status, queue); + } + + LOGGER.debug("Successfully created queues: {}", queues); + return queues; + } catch (Exception e) { + LOGGER.error("Failed to create KafkaObservableQueues", e); + throw new RuntimeException("Failed to getQueues on KafkaEventQueueConfiguration", e); + } + } +} diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java new file mode 100644 index 000000000..b41ed7e75 --- /dev/null +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java @@ -0,0 +1,197 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafkaeq.config; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.logging.log4j.core.config.plugins.validation.constraints.NotBlank; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +@ConfigurationProperties("conductor.event-queues.kafka") +@Validated +public class KafkaEventQueueProperties { + + /** Kafka bootstrap servers (comma-separated). */ + @NotBlank(message = "Bootstrap servers must not be blank") + private String bootstrapServers = "kafka:29092"; + + /** Primary topic for the Kafka queue. */ + private String topic = "conductor-event"; + + /** Dead Letter Queue (DLQ) topic for failed messages. */ + private String dlqTopic = "conductor-dlq"; + + /** Prefix for dynamically created Kafka topics, if applicable. */ + private String listenerQueuePrefix = ""; + + /** The polling interval for Kafka (in milliseconds). */ + private Duration pollTimeDuration = Duration.ofMillis(100); + + /** Additional properties for consumers, producers, and admin clients. */ + private Map consumer = new HashMap<>(); + + private Map producer = new HashMap<>(); + private Map admin = new HashMap<>(); + + // Getters and setters + public String getBootstrapServers() { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getDlqTopic() { + return dlqTopic; + } + + public void setDlqTopic(String dlqTopic) { + this.dlqTopic = dlqTopic; + } + + public String getListenerQueuePrefix() { + return listenerQueuePrefix; + } + + public void setListenerQueuePrefix(String listenerQueuePrefix) { + this.listenerQueuePrefix = listenerQueuePrefix; + } + + public Duration getPollTimeDuration() { + return pollTimeDuration; + } + + public void setPollTimeDuration(Duration pollTimeDuration) { + this.pollTimeDuration = pollTimeDuration; + } + + public Map getConsumer() { + return consumer; + } + + public void setConsumer(Map consumer) { + this.consumer = consumer; + } + + public Map getProducer() { + return producer; + } + + public void setProducer(Map producer) { + this.producer = producer; + } + + public Map getAdmin() { + return admin; + } + + public void setAdmin(Map admin) { + this.admin = admin; + } + + /** + * Generates configuration properties for Kafka consumers. Maps against `ConsumerConfig` keys. 
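+     * Unknown keys are filtered out; defaults are applied for the bootstrap servers, deserializers, group id, and client id.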
+ */ + public Map toConsumerConfig() { + Map config = mapProperties(ConsumerConfig.configNames(), consumer); + // Ensure key.deserializer and value.deserializer are always set + setDefaultIfNullOrEmpty( + config, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + setDefaultIfNullOrEmpty( + config, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + + setDefaultIfNullOrEmpty(config, ConsumerConfig.GROUP_ID_CONFIG, "conductor-group"); + setDefaultIfNullOrEmpty(config, ConsumerConfig.CLIENT_ID_CONFIG, "consumer-client"); + return config; + } + + /** + * Generates configuration properties for Kafka producers. Maps against `ProducerConfig` keys. + */ + public Map toProducerConfig() { + Map config = mapProperties(ProducerConfig.configNames(), producer); + setDefaultIfNullOrEmpty( + config, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + setDefaultIfNullOrEmpty( + config, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + + setDefaultIfNullOrEmpty(config, ProducerConfig.CLIENT_ID_CONFIG, "admin-client"); + return config; + } + + /** + * Generates configuration properties for Kafka AdminClient. Maps against `AdminClientConfig` + * keys. + */ + public Map toAdminConfig() { + Map config = mapProperties(AdminClientConfig.configNames(), admin); + setDefaultIfNullOrEmpty(config, ConsumerConfig.CLIENT_ID_CONFIG, "admin-client"); + return config; + } + + /** + * Filters and maps properties based on the allowed keys for a specific Kafka client + * configuration. + * + * @param allowedKeys The keys allowed for the specific Kafka client configuration. + * @param inputProperties The user-specified properties to filter. + * @return A filtered map containing only valid properties. + */ + private Map mapProperties( + Iterable allowedKeys, Map inputProperties) { + Map config = new HashMap<>(); + for (String key : allowedKeys) { + if (inputProperties.containsKey(key)) { + config.put(key, inputProperties.get(key)); + } + } + + setDefaultIfNullOrEmpty( + config, AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // Ensure + // bootstrapServers + // is + // always added + return config; + } + + private void setDefaultIfNullOrEmpty( + Map config, String key, String defaultValue) { + Object value = config.get(key); + if (value == null || (value instanceof String && ((String) value).isBlank())) { + config.put(key, defaultValue); + } + } +} diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProvider.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProvider.java new file mode 100644 index 000000000..af54a7447 --- /dev/null +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProvider.java @@ -0,0 +1,47 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafkaeq.config; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.events.EventQueueProvider; +import com.netflix.conductor.core.events.queue.ObservableQueue; +import com.netflix.conductor.kafkaeq.eventqueue.KafkaObservableQueue.Builder; + +public class KafkaEventQueueProvider implements EventQueueProvider { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventQueueProvider.class); + + private final Map queues = new ConcurrentHashMap<>(); + private final KafkaEventQueueProperties properties; + + public KafkaEventQueueProvider(KafkaEventQueueProperties properties) { + this.properties = properties; + } + + @Override + public String getQueueType() { + return "kafka"; + } + + @Override + public ObservableQueue getQueue(String queueURI) { + LOGGER.info("Creating KafkaObservableQueue for topic: {}", queueURI); + + return queues.computeIfAbsent(queueURI, q -> new Builder(properties).build(queueURI)); + } +} diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java new file mode 100644 index 000000000..91f7d7071 --- /dev/null +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java @@ -0,0 +1,459 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafkaeq.eventqueue; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.core.events.queue.ObservableQueue; +import com.netflix.conductor.kafkaeq.config.KafkaEventQueueProperties; + +import rx.Observable; +import rx.subscriptions.Subscriptions; + +public class KafkaObservableQueue implements ObservableQueue { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaObservableQueue.class); + private static final String QUEUE_TYPE = "kafka"; + + private final String topic; + private volatile AdminClient adminClient; + private final Consumer kafkaConsumer; + private final Producer kafkaProducer; + private final long pollTimeInMS; + private final String dlqTopic; + private final boolean autoCommitEnabled; + private final Map unacknowledgedMessages = new ConcurrentHashMap<>(); + private volatile boolean running = false; + private final KafkaEventQueueProperties properties; + + public KafkaObservableQueue( + String topic, + Properties consumerConfig, + Properties producerConfig, + Properties adminConfig, + KafkaEventQueueProperties properties) { + this.topic = topic; + this.kafkaConsumer = new KafkaConsumer<>(consumerConfig); + this.kafkaProducer = new KafkaProducer<>(producerConfig); + this.properties = properties; + this.pollTimeInMS = properties.getPollTimeDuration().toMillis(); + this.dlqTopic = properties.getDlqTopic(); + this.autoCommitEnabled = + Boolean.parseBoolean( + consumerConfig + .getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + .toString()); + + this.adminClient = AdminClient.create(adminConfig); + } + + @Override + public Observable observe() { + return Observable.create( + subscriber -> { + Observable interval = + Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS); + + interval.flatMap( + (Long x) -> { + if (!isRunning()) { + return Observable.from(Collections.emptyList()); + } + + try { + ConsumerRecords records = + kafkaConsumer.poll( + this.properties.getPollTimeDuration()); + List messages = new ArrayList<>(); + + for (ConsumerRecord record : records) { + try { + Message message = + new Message( + record.partition() + + "-" + + record + .offset(), // Message ID based on partition and + // offset + record.value(), + null); + String messageId = + record.partition() + + "-" + + record.offset(); + unacknowledgedMessages.put( + messageId, record.offset()); + messages.add(message); + } catch (Exception e) { + LOGGER.error( + "Failed to process record from Kafka: {}", + record, + e); + } + } + + return Observable.from(messages); + } catch (Exception e) 
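+                                // Polling failures are logged and propagated to the subscriber via Observable.error below.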
{ + LOGGER.error( + "Error while polling Kafka for topic: {}", + topic, + e); + return Observable.error(e); + } + }) + .subscribe(subscriber::onNext, subscriber::onError); + + subscriber.add(Subscriptions.create(this::stop)); + }); + } + + @Override + public List ack(List messages) { + // If autocommit is enabled we do not run this code. + if (autoCommitEnabled == true) { + LOGGER.info("Auto commit is enabled. Skipping manual acknowledgment."); + return List.of(); + } + + Map offsetsToCommit = new HashMap<>(); + List failedAcks = new ArrayList<>(); // Collect IDs of failed messages + + for (Message message : messages) { + String messageId = message.getId(); + if (unacknowledgedMessages.containsKey(messageId)) { + try { + String[] parts = messageId.split("-"); + + if (parts.length != 2) { + throw new IllegalArgumentException( + "Invalid message ID format: " + messageId); + } + + // Extract partition and offset from messageId + int partition = Integer.parseInt(parts[0]); + long offset = Long.parseLong(parts[1]); + + // Remove message + unacknowledgedMessages.remove(messageId); + + TopicPartition tp = new TopicPartition(topic, partition); + + LOGGER.debug( + "Parsed messageId: {}, topic: {}, partition: {}, offset: {}", + messageId, + topic, + partition, + offset); + offsetsToCommit.put(tp, new OffsetAndMetadata(offset + 1)); + } catch (Exception e) { + LOGGER.error("Failed to prepare acknowledgment for message: {}", messageId, e); + failedAcks.add(messageId); // Add to failed list if exception occurs + } + } else { + LOGGER.warn("Message ID not found in unacknowledged messages: {}", messageId); + failedAcks.add(messageId); // Add to failed list if not found + } + } + + try { + LOGGER.debug("Committing offsets: {}", offsetsToCommit); + + kafkaConsumer.commitSync(offsetsToCommit); // Commit all collected offsets + } catch (CommitFailedException e) { + LOGGER.warn("Offset commit failed: {}", e.getMessage()); + } catch (OffsetOutOfRangeException e) { + LOGGER.error( + "OffsetOutOfRangeException encountered for topic {}: {}", + e.partitions(), + e.getMessage()); + + // Reset offsets for the out-of-range partition + Map offsetsToReset = new HashMap<>(); + for (TopicPartition partition : e.partitions()) { + long newOffset = + kafkaConsumer.position(partition); // Default to the current position + offsetsToReset.put(partition, new OffsetAndMetadata(newOffset)); + LOGGER.warn("Resetting offset for partition {} to {}", partition, newOffset); + } + + // Commit the new offsets + kafkaConsumer.commitSync(offsetsToReset); + } catch (Exception e) { + LOGGER.error("Failed to commit offsets to Kafka: {}", offsetsToCommit, e); + // Add all message IDs from the current batch to the failed list + failedAcks.addAll(messages.stream().map(Message::getId).toList()); + } + + return failedAcks; // Return IDs of messages that were not successfully acknowledged + } + + @Override + public void nack(List messages) { + for (Message message : messages) { + try { + kafkaProducer.send( + new ProducerRecord<>(dlqTopic, message.getId(), message.getPayload())); + } catch (Exception e) { + LOGGER.error("Failed to send message to DLQ. Message ID: {}", message.getId(), e); + } + } + } + + @Override + public void publish(List messages) { + for (Message message : messages) { + try { + kafkaProducer.send( + new ProducerRecord<>(topic, message.getId(), message.getPayload()), + (metadata, exception) -> { + if (exception != null) { + LOGGER.error( + "Failed to publish message to Kafka. 
Message ID: {}", + message.getId(), + exception); + } else { + LOGGER.info( + "Message published to Kafka. Topic: {}, Partition: {}, Offset: {}", + metadata.topic(), + metadata.partition(), + metadata.offset()); + } + }); + } catch (Exception e) { + LOGGER.error( + "Error publishing message to Kafka. Message ID: {}", message.getId(), e); + } + } + } + + @Override + public boolean rePublishIfNoAck() { + return false; + } + + @Override + public void setUnackTimeout(Message message, long unackTimeout) { + // Kafka does not support visibility timeout; this can be managed externally if + // needed. + } + + @Override + public long size() { + long topicSize = getTopicSizeUsingAdminClient(); + if (topicSize != -1) { + LOGGER.info("Topic size for 'conductor-event': {}", topicSize); + } else { + LOGGER.error("Failed to fetch topic size for 'conductor-event'"); + } + + return topicSize; + } + + private long getTopicSizeUsingAdminClient() { + try { + // Fetch metadata for the topic asynchronously + KafkaFuture topicDescriptionFuture = + adminClient + .describeTopics(Collections.singletonList(topic)) + .topicNameValues() + .get(topic); + + TopicDescription topicDescription = topicDescriptionFuture.get(); + + // Prepare request for latest offsets + Map offsetRequest = new HashMap<>(); + for (TopicPartitionInfo partition : topicDescription.partitions()) { + TopicPartition tp = new TopicPartition(topic, partition.partition()); + offsetRequest.put(tp, OffsetSpec.latest()); + } + + // Fetch offsets asynchronously + Map offsets = + adminClient.listOffsets(offsetRequest).all().get(); + + // Calculate total size by summing offsets + return offsets.values().stream() + .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset) + .sum(); + } catch (Exception e) { + LOGGER.error("Error fetching topic size using AdminClient for topic: {}", topic, e); + return -1; + } + } + + @Override + public void close() { + try { + stop(); + LOGGER.info("KafkaObservableQueue fully stopped and resources closed."); + } catch (Exception e) { + LOGGER.error("Error during close(): {}", e.getMessage(), e); + } + } + + @Override + public void start() { + LOGGER.info("KafkaObservableQueue starting for topic: {}", topic); + if (running) { + LOGGER.warn("KafkaObservableQueue is already running for topic: {}", topic); + return; + } + + try { + running = true; + kafkaConsumer.subscribe( + Collections.singletonList(topic)); // Subscribe to a single topic + LOGGER.info("KafkaObservableQueue started for topic: {}", topic); + } catch (Exception e) { + running = false; + LOGGER.error("Error starting KafkaObservableQueue for topic: {}", topic, e); + } + } + + @Override + public void stop() { + LOGGER.info("Kafka consumer stopping for topic: {}", topic); + if (!running) { + LOGGER.warn("KafkaObservableQueue is already stopped for topic: {}", topic); + return; + } + + try { + running = false; + + try { + kafkaConsumer.unsubscribe(); + kafkaConsumer.close(); + LOGGER.info("Kafka consumer stopped for topic: {}", topic); + } catch (Exception e) { + LOGGER.error("Error stopping Kafka consumer for topic: {}", topic, e); + retryCloseConsumer(); + } + + try { + kafkaProducer.close(); + LOGGER.info("Kafka producer stopped for topic: {}", topic); + } catch (Exception e) { + LOGGER.error("Error stopping Kafka producer for topic: {}", topic, e); + retryCloseProducer(); + } + } catch (Exception e) { + LOGGER.error("Critical error stopping KafkaObservableQueue for topic: {}", topic, e); + } + } + + private void retryCloseConsumer() { + int retries = 3; + while 
(retries > 0) { + try { + kafkaConsumer.close(); + LOGGER.info("Kafka consumer closed successfully on retry."); + return; + } catch (Exception e) { + retries--; + LOGGER.warn( + "Retry failed to close Kafka consumer. Remaining attempts: {}", retries, e); + if (retries == 0) { + LOGGER.error("Exhausted retries for closing Kafka consumer."); + } + } + } + } + + private void retryCloseProducer() { + int retries = 3; + while (retries > 0) { + try { + kafkaProducer.close(); + LOGGER.info("Kafka producer closed successfully on retry."); + return; + } catch (Exception e) { + retries--; + LOGGER.warn( + "Retry failed to close Kafka producer. Remaining attempts: {}", retries, e); + if (retries == 0) { + LOGGER.error("Exhausted retries for closing Kafka producer."); + } + } + } + } + + @Override + public String getType() { + return QUEUE_TYPE; + } + + @Override + public String getName() { + return topic; + } + + @Override + public String getURI() { + return "kafka://" + topic; + } + + @Override + public boolean isRunning() { + return running; + } + + public static class Builder { + private final KafkaEventQueueProperties properties; + + public Builder(KafkaEventQueueProperties properties) { + this.properties = properties; + } + + public KafkaObservableQueue build(final String topic) { + Properties consumerConfig = new Properties(); + consumerConfig.putAll(properties.toConsumerConfig()); + + LOGGER.debug("Kafka Consumer Config: {}", consumerConfig); + + Properties producerConfig = new Properties(); + producerConfig.putAll(properties.toProducerConfig()); + + LOGGER.debug("Kafka Producer Config: {}", producerConfig); + + Properties adminConfig = new Properties(); + adminConfig.putAll(properties.toAdminConfig()); + + LOGGER.debug("Kafka Admin Config: {}", adminConfig); + + try { + return new KafkaObservableQueue( + topic, consumerConfig, producerConfig, adminConfig, properties); + } catch (Exception e) { + LOGGER.error("Failed to initialize KafkaObservableQueue for topic: {}", topic, e); + throw new RuntimeException( + "Failed to initialize KafkaObservableQueue for topic: " + topic, e); + } + } + } +} diff --git a/kafka-event-queue/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/kafka-event-queue/src/main/resources/META-INF/additional-spring-configuration-metadata.json new file mode 100644 index 000000000..4351cdb81 --- /dev/null +++ b/kafka-event-queue/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -0,0 +1,27 @@ +{ + "properties": [ + { + "name": "conductor.event-queues.kafka.enabled", + "type": "java.lang.Boolean", + "description": "Enable the use of Kafka implementation to provide queues for consuming events.", + "sourceType": "com.netflix.conductor.kafkaeq.config.KafkaEventQueueConfiguration" + }, + { + "name": "conductor.default-event-queue.type", + "type": "java.lang.String", + "description": "The default event queue type to listen on for the WAIT task.", + "sourceType": "com.netflix.conductor.kafkaeq.config.KafkaEventQueueConfiguration" + } + ], + "hints": [ + { + "name": "conductor.default-event-queue.type", + "values": [ + { + "value": "kafka", + "description": "Use kafka as the event queue to listen on for the WAIT task." 
+ } + ] + } + ] +} \ No newline at end of file diff --git a/server/build.gradle b/server/build.gradle index fdf7646be..1e5e6181d 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -25,6 +25,7 @@ dependencies { implementation project(':conductor-nats') implementation project(':conductor-nats-streaming') implementation project(':conductor-awssqs-event-queue') + implementation project(':conductor-kafka-event-queue') //External Payload Storage implementation project(':conductor-azureblob-storage') diff --git a/settings.gradle b/settings.gradle index 605765ede..41ac7bbf3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -75,6 +75,7 @@ include 'postgres-external-storage' include 'amqp' include 'nats' include 'nats-streaming' +include 'kafka-event-queue' include 'test-harness' diff --git a/workflow-event-listener/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/workflow-event-listener/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 3156ecbef..0096bbb81 100644 --- a/workflow-event-listener/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/workflow-event-listener/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -70,6 +70,12 @@ "description": "Enable the use of NATS Streaming implementation to provide queues for consuming events.", "sourceType": "com.netflix.conductor.contribs.queue.nats.config.NATSStreamConfiguration" }, + { + "name": "conductor.event-queues.kafka.enabled", + "type": "java.lang.Boolean", + "description": "Enable the use of Kafka implementation to provide queues for consuming events.", + "sourceType": "com.netflix.conductor.event-queue.kafkaeq.config.KafkaEventQueueConfiguration" + }, { "name": "conductor.default-event-queue.type", "type": "java.lang.String", @@ -108,6 +114,10 @@ { "value": "nats_stream", "description": "Use NATS Stream as the event queue to listen on for the WAIT task." + }, + { + "value": "kafka", + "description": "Use Kafka as the event queue to listen on for the WAIT task." 
} ] }, From ea0447fea580f204c1cb442cf1ef7421f9685416 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Mon, 2 Dec 2024 14:07:02 +0900 Subject: [PATCH 02/12] Added basic tests --- kafka-event-queue/build.gradle | 12 +- .../eventqueue/KafkaObservableQueue.java | 21 ++ .../eventqueue/KafkaObservableQueueTest.java | 264 ++++++++++++++++++ 3 files changed, 293 insertions(+), 4 deletions(-) create mode 100644 kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java diff --git a/kafka-event-queue/build.gradle b/kafka-event-queue/build.gradle index 7aeabf6be..3b41e131e 100644 --- a/kafka-event-queue/build.gradle +++ b/kafka-event-queue/build.gradle @@ -1,7 +1,3 @@ -plugins { - id 'java' -} - dependencies { // Core Conductor dependencies implementation project(':conductor-common') @@ -26,6 +22,7 @@ dependencies { // Test dependencies testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation project(':conductor-common').sourceSets.test.output + // Add Kafka client dependency implementation 'org.apache.kafka:kafka-clients:3.5.1' @@ -37,4 +34,11 @@ dependencies { runtimeOnly 'ch.qos.logback:logback-classic:1.4.11' } +test { + testLogging { + events "passed", "skipped", "failed" + showStandardStreams = true // Enable standard output + } +} + diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java index 91f7d7071..33c5a7991 100644 --- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java @@ -72,6 +72,27 @@ public KafkaObservableQueue( this.adminClient = AdminClient.create(adminConfig); } + public KafkaObservableQueue( + String topic, + Consumer kafkaConsumer, + Producer kafkaProducer, + AdminClient adminClient, + KafkaEventQueueProperties properties) { + this.topic = topic; + this.kafkaConsumer = kafkaConsumer; + this.kafkaProducer = kafkaProducer; + this.adminClient = adminClient; + this.properties = properties; + this.pollTimeInMS = properties.getPollTimeDuration().toMillis(); + this.dlqTopic = properties.getDlqTopic(); + this.autoCommitEnabled = + Boolean.parseBoolean( + properties + .toConsumerConfig() + .getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + .toString()); + } + @Override public Observable observe() { return Observable.create( diff --git a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java new file mode 100644 index 000000000..e0b9bd913 --- /dev/null +++ b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java @@ -0,0 +1,264 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafkaeq.eventqueue; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.kafkaeq.config.KafkaEventQueueProperties; + +import rx.Observable; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.*; + +@SuppressWarnings("unchecked") +@RunWith(SpringJUnit4ClassRunner.class) +public class KafkaObservableQueueTest { + + private KafkaObservableQueue queue; + + @Mock private volatile MockConsumer mockKafkaConsumer; + + @Mock private volatile MockProducer mockKafkaProducer; + + @Mock private volatile AdminClient mockAdminClient; + + @Mock private volatile KafkaEventQueueProperties mockProperties; + + @Before + public void setUp() throws Exception { + System.out.println("Setup called"); + // Create mock instances + this.mockKafkaConsumer = mock(MockConsumer.class); + this.mockKafkaProducer = mock(MockProducer.class); + this.mockAdminClient = mock(AdminClient.class); + this.mockProperties = mock(KafkaEventQueueProperties.class); + + // Mock KafkaEventQueueProperties behavior + when(this.mockProperties.getPollTimeDuration()).thenReturn(Duration.ofMillis(100)); + when(this.mockProperties.getDlqTopic()).thenReturn("test-dlq"); + + // Create an instance of KafkaObservableQueue with the mocks + queue = + new KafkaObservableQueue( + "test-topic", + this.mockKafkaConsumer, + this.mockKafkaProducer, + this.mockAdminClient, + this.mockProperties); + } + + private void injectMockField(Object target, String fieldName, Object mock) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, mock); + } + + @Test + public void testObserve() throws Exception { + // Prepare mock consumer records + List> records = + List.of( + new ConsumerRecord<>("test-topic", 0, 0, "key-1", "payload-1"), + new ConsumerRecord<>("test-topic", 0, 1, "key-2", "payload-2")); + + ConsumerRecords consumerRecords = + new ConsumerRecords<>(Map.of(new TopicPartition("test-topic", 0), records)); + + // Mock the KafkaConsumer poll behavior + when(mockKafkaConsumer.poll(any(Duration.class))) + .thenReturn(consumerRecords) + .thenReturn( + new ConsumerRecords<>( + Collections.emptyMap())); // 
Subsequent polls return empty + + // Start the queue + queue.start(); + + // Collect emitted messages + List found = new ArrayList<>(); + Observable observable = queue.observe(); + assertNotNull(observable); + observable.subscribe(found::add); + + // Allow polling to run + try { + Thread.sleep(1000); // Adjust duration if necessary + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Assert results + assertNotNull(queue); + assertEquals(2, found.size()); + assertEquals("payload-1", found.get(0).getPayload()); + assertEquals("payload-2", found.get(1).getPayload()); + } + + @Test + public void testAck() throws Exception { + Map unacknowledgedMessages = new ConcurrentHashMap<>(); + unacknowledgedMessages.put("0-1", 1L); + injectMockField(queue, "unacknowledgedMessages", unacknowledgedMessages); + + Message message = new Message("0-1", "payload", null); + List messages = List.of(message); + + doNothing().when(mockKafkaConsumer).commitSync(anyMap()); + + List failedAcks = queue.ack(messages); + + assertTrue(failedAcks.isEmpty()); + verify(mockKafkaConsumer, times(1)).commitSync(anyMap()); + } + + @Test + public void testNack() { + // Arrange + Message message = new Message("0-1", "payload", null); + List messages = List.of(message); + + // Simulate the Kafka Producer behavior + doAnswer( + invocation -> { + ProducerRecord record = invocation.getArgument(0); + System.out.println("Simulated record sent: " + record); + return null; // Simulate success + }) + .when(mockKafkaProducer) + .send(any(ProducerRecord.class)); + + // Act + queue.nack(messages); + + // Assert + ArgumentCaptor> captor = + ArgumentCaptor.forClass(ProducerRecord.class); + verify(mockKafkaProducer).send(captor.capture()); + + ProducerRecord actualRecord = captor.getValue(); + System.out.println("Captured Record: " + actualRecord); + + // Verify the captured record matches the expected values + assertEquals("test-dlq", actualRecord.topic()); + assertEquals("0-1", actualRecord.key()); + assertEquals("payload", actualRecord.value()); + } + + @Test + public void testPublish() { + Message message = new Message("key-1", "payload", null); + List messages = List.of(message); + + // Mock the behavior of the producer's send() method + when(mockKafkaProducer.send(any(ProducerRecord.class), any())) + .thenAnswer( + invocation -> { + Callback callback = invocation.getArgument(1); + // Simulate a successful send with mock metadata + callback.onCompletion( + new RecordMetadata( + new TopicPartition( + "test-topic", 0), // Topic and partition + 0, // Base offset + 0, // Log append time + 0, // Create time + 10, // Serialized key size + 100 // Serialized value size + ), + null); + return null; + }); + + // Invoke the publish method + queue.publish(messages); + + // Verify that the producer's send() method was called exactly once + verify(mockKafkaProducer, times(1)).send(any(ProducerRecord.class), any()); + } + + @Test + public void testSize() throws Exception { + // Step 1: Mock TopicDescription + TopicDescription topicDescription = + new TopicDescription( + "test-topic", + false, + List.of( + new TopicPartitionInfo( + 0, null, List.of(), List.of())) // One partition + ); + + // Simulate `describeTopics` returning the TopicDescription + DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class); + KafkaFuture mockFutureTopicDescription = + KafkaFuture.completedFuture(topicDescription); + when(mockDescribeTopicsResult.topicNameValues()) + .thenReturn(Map.of("test-topic", 
                                mockFutureTopicDescription));
+        when(mockAdminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
+
+        // Step 2: Mock Offsets
+        ListOffsetsResult.ListOffsetsResultInfo offsetInfo =
+                new ListOffsetsResult.ListOffsetsResultInfo(
+                        10, // Mocked end offset for the partition
+                        0, // Timestamp
+                        null // Leader epoch
+                        );
+
+        ListOffsetsResult mockListOffsetsResult = mock(ListOffsetsResult.class);
+        KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>>
+                mockOffsetsFuture =
+                        KafkaFuture.completedFuture(
+                                Map.of(new TopicPartition("test-topic", 0), offsetInfo));
+        when(mockListOffsetsResult.all()).thenReturn(mockOffsetsFuture);
+        when(mockAdminClient.listOffsets(anyMap())).thenReturn(mockListOffsetsResult);
+
+        // Step 3: Call the `size` method
+        long size = queue.size();
+
+        // Step 4: Verify the size is correctly calculated
+        assertEquals(10, size); // As we mocked 10 as the offset in the partition
+    }
+
+    @Test
+    public void testLifecycle() {
+        queue.start();
+        assertTrue(queue.isRunning());
+
+        queue.stop();
+        assertFalse(queue.isRunning());
+    }
+}

From 4be828a868fdc8c18217a79e708b20ee51fe7169 Mon Sep 17 00:00:00 2001
From: Robert Karlsson
Date: Mon, 2 Dec 2024 14:26:26 +0900
Subject: [PATCH 03/12] Added readme

---
 kafka-event-queue/README.md | 76 ++++++++++++++++++++++++++++++++++---
 1 file changed, 70 insertions(+), 6 deletions(-)

diff --git a/kafka-event-queue/README.md b/kafka-event-queue/README.md
index bb2ea5390..f776be837 100644
--- a/kafka-event-queue/README.md
+++ b/kafka-event-queue/README.md
@@ -1,4 +1,5 @@
 # Event Queue
+
 ## Published Artifacts
 
 Group: `com.netflix.conductor`
@@ -8,17 +9,80 @@ Group: `com.netflix.conductor`
 | conductor-kafka-event-queue | Support for integrating with Kafka and consuming events from it. |
 
 ## Modules
+
 ### Kafka
+
 https://kafka.apache.org/
 
-Provides the ability to publish and consume messages from Kafka.
-#### Configuration
-(Default values shown below)
+## kafka-event-queue
+
+Provides ability to consume messages from Kafka
+
+## Configuration
+
+To enable the queue, set the following to true.
+
 ```properties
 conductor.event-queues.kafka.enabled=true
+```
+
+There is a set of shared properties:
+
+```properties
+# If kafka should be used as the default event queue, like SQS or AMQP
+conductor.default-event-queue.type=kafka
+
+# The bootstrap server to use.
 conductor.event-queues.kafka.bootstrap-servers=kafka:29092
-conductor.event-queues.kafka.groupId=conductor-consumers
+
+# The topic to listen to
 conductor.event-queues.kafka.topic=conductor-event
-conductor.event-queues.kafka.dlqTopic=conductor-dlq
-conductor.event-queues.kafka.pollTimeDuration=100
+
+# The dead letter queue to use for events that had some error.
+conductor.event-queues.kafka.dlq-topic=conductor-dlq
+
+# Topic prefix combined with conductor.default-event-queue.type
+conductor.event-queues.kafka.listener-queue-prefix=conductor_
+
+# The polling duration. Start at 500ms and reduce based on how your environment behaves.
+conductor.event-queues.kafka.poll-time-duration=500ms
 ```
+
+There are three clients that should be configured: the Consumer, which consumes messages; the Producer, which publishes messages to Kafka; and the Admin client, which handles administrative operations.
+
+The supported properties for the three clients are the ones included in `org.apache.kafka:kafka-clients:3.5.1` for each client type.
+
+## Consumer properties
+
+Example of consumer settings.
+
+```properties
+conductor.event-queues.kafka.consumer.client.id=consumer-client
+conductor.event-queues.kafka.consumer.auto.offset.reset=earliest
+conductor.event-queues.kafka.consumer.enable.auto.commit=false
+conductor.event-queues.kafka.consumer.fetch.min.bytes=1
+conductor.event-queues.kafka.consumer.max.poll.records=500
+conductor.event-queues.kafka.consumer.group-id=conductor-group
+```
+
+## Producer properties
+
+Example of producer settings.
+
+```properties
+conductor.event-queues.kafka.producer.client.id=producer-client
+conductor.event-queues.kafka.producer.acks=all
+conductor.event-queues.kafka.producer.retries=5
+conductor.event-queues.kafka.producer.batch.size=16384
+conductor.event-queues.kafka.producer.linger.ms=10
+conductor.event-queues.kafka.producer.compression.type=gzip
+```
+
+## Admin properties
+
+Example of admin settings.
+
+```properties
+conductor.event-queues.kafka.admin.client.id=admin-client
+conductor.event-queues.kafka.admin.connections.max.idle.ms=10000
+```

From 3098dc06d4cd30ddd690f3653ed0de7ab80270d1 Mon Sep 17 00:00:00 2001
From: Robert Karlsson
Date: Mon, 2 Dec 2024 14:55:24 +0900
Subject: [PATCH 04/12] Updated readme with usage

---
 kafka-event-queue/README.md | 28 +++++++++++++++++++++++++++-
 1 file changed, 27 insertions(+), 1 deletion(-)

diff --git a/kafka-event-queue/README.md b/kafka-event-queue/README.md
index f776be837..690e71059 100644
--- a/kafka-event-queue/README.md
+++ b/kafka-event-queue/README.md
@@ -16,7 +16,33 @@ https://kafka.apache.org/
 
 ## kafka-event-queue
 
-Provides ability to consume messages from Kafka
+Provides ability to consume messages from Kafka.
+
+## Usage
+
+To use it in an event handler prefix the event with `kafka` followed by the topic.
+
+Example:
+```json
+{
+  "name": "kafka_test_event_handler",
+  "event": "kafka:conductor-event",
+  "actions": [
+    {
+      "action": "start_workflow",
+      "start_workflow": {
+        "name": "workflow_triggered_by_kafka",
+        "input": {
+          "inlineValue": 1
+        }
+      },
+      "expandInlineJSON": true
+    }
+  ],
+  "active": true
+}
+```
+
 
 ## Configuration
 

From e1ed995c859f4adc198459a1af2174be274c0d21 Mon Sep 17 00:00:00 2001
From: Robert Karlsson
Date: Mon, 2 Dec 2024 14:57:34 +0900
Subject: [PATCH 05/12] Consolidated test output to match other modules

---
 kafka-event-queue/build.gradle | 12 ++++++------
 1 file changed, 6 insertions(+), 6 deletions(-)

diff --git a/kafka-event-queue/build.gradle b/kafka-event-queue/build.gradle
index 3b41e131e..5b998696b 100644
--- a/kafka-event-queue/build.gradle
+++ b/kafka-event-queue/build.gradle
@@ -34,11 +34,11 @@ dependencies {
     runtimeOnly 'ch.qos.logback:logback-classic:1.4.11'
 }
 
-test {
-    testLogging {
-        events "passed", "skipped", "failed"
-        showStandardStreams = true // Enable standard output
-    }
-}
+// test {
+//     testLogging {
+//         events "passed", "skipped", "failed"
+//         showStandardStreams = true // Enable standard output
+//     }
+// }
 
 

From 2273d3946b6775205e4148493f982ef20a533330 Mon Sep 17 00:00:00 2001
From: Robert Karlsson
Date: Tue, 3 Dec 2024 11:13:44 +0900
Subject: [PATCH 06/12] Only create message ID once. Added logging of received message. Added a new test for testing JSON in the message.
--- .../eventqueue/KafkaObservableQueue.java | 23 +++++---- .../eventqueue/KafkaObservableQueueTest.java | 51 +++++++++++++++++-- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java index 33c5a7991..28a502e7e 100644 --- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java @@ -114,19 +114,22 @@ public Observable observe() { for (ConsumerRecord record : records) { try { - Message message = - new Message( - record.partition() - + "-" - + record - .offset(), // Message ID based on partition and - // offset - record.value(), - null); String messageId = record.partition() + "-" - + record.offset(); + + record.offset(); // Message ID + // based on + // partition + // and + + String value = record.value(); + LOGGER.debug( + "MessageId: {} value: {}", + messageId, + value); + Message message = + new Message(messageId, value, null); + unacknowledgedMessages.put( messageId, record.offset()); messages.add(message); diff --git a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java index e0b9bd913..4f4aa4fb9 100644 --- a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java +++ b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java @@ -16,6 +16,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; @@ -102,8 +103,8 @@ public void testObserve() throws Exception { when(mockKafkaConsumer.poll(any(Duration.class))) .thenReturn(consumerRecords) .thenReturn( - new ConsumerRecords<>( - Collections.emptyMap())); // Subsequent polls return empty + new ConsumerRecords<>(Collections.emptyMap())); // Subsequent polls return + // empty // Start the queue queue.start(); @@ -128,6 +129,48 @@ public void testObserve() throws Exception { assertEquals("payload-2", found.get(1).getPayload()); } + @Test + public void testObserveReadsKafkaValue() { + // Prepare mock Kafka records + List> records = + List.of( + new ConsumerRecord<>( + "test-topic", 0, 0, "key1", "{\"testKey\": \"testValue1\"}"), + new ConsumerRecord<>( + "test-topic", 0, 1, "key2", "{\"testKey\": \"testValue2\"}")); + + ConsumerRecords consumerRecords = + new ConsumerRecords<>(Map.of(new TopicPartition("test-topic", 0), records)); + + // Mock the KafkaConsumer poll behavior + when(mockKafkaConsumer.poll(any(Duration.class))) + .thenReturn(consumerRecords) + .thenReturn( + new ConsumerRecords<>(Collections.emptyMap())); // Subsequent polls return + // empty + + // Start the queue + queue.start(); + + // Collect messages from observe + List collectedMessages = new CopyOnWriteArrayList<>(); + Observable observable = queue.observe(); + assertNotNull(observable); + observable.subscribe(collectedMessages::add); + + // Allow polling to run + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // 
Verify results + assertEquals(2, collectedMessages.size()); + assertEquals("{\"testKey\": \"testValue1\"}", collectedMessages.get(0).getPayload()); + assertEquals("{\"testKey\": \"testValue2\"}", collectedMessages.get(1).getPayload()); + } + @Test public void testAck() throws Exception { Map unacknowledgedMessages = new ConcurrentHashMap<>(); @@ -191,8 +234,8 @@ public void testPublish() { // Simulate a successful send with mock metadata callback.onCompletion( new RecordMetadata( - new TopicPartition( - "test-topic", 0), // Topic and partition + new TopicPartition("test-topic", 0), // Topic and + // partition 0, // Base offset 0, // Log append time 0, // Create time From 27158292082c19c0fb196c7de945046557715883 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Tue, 3 Dec 2024 12:18:44 +0900 Subject: [PATCH 07/12] Improved payload formatting, added key and headers to payload. added a new test for verifying payload --- .../eventqueue/KafkaObservableQueue.java | 89 ++++++++- .../eventqueue/KafkaObservableQueueTest.java | 169 +++++++++++++++--- 2 files changed, 226 insertions(+), 32 deletions(-) diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java index 28a502e7e..f785aa302 100644 --- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.header.Header; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,8 @@ import com.netflix.conductor.core.events.queue.ObservableQueue; import com.netflix.conductor.kafkaeq.config.KafkaEventQueueProperties; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import rx.Observable; import rx.subscriptions.Subscriptions; @@ -50,6 +53,7 @@ public class KafkaObservableQueue implements ObservableQueue { private final Map unacknowledgedMessages = new ConcurrentHashMap<>(); private volatile boolean running = false; private final KafkaEventQueueProperties properties; + private final ObjectMapper objectMapper = new ObjectMapper(); public KafkaObservableQueue( String topic, @@ -117,18 +121,37 @@ public Observable observe() { String messageId = record.partition() + "-" - + record.offset(); // Message ID - // based on - // partition - // and - + + record.offset(); + String key = record.key(); String value = record.value(); + Map headers = new HashMap<>(); + + // Extract headers + if (record.headers() != null) { + for (Header header : record.headers()) { + headers.put( + header.key(), + new String(header.value())); + } + } + + // Log the details LOGGER.debug( - "MessageId: {} value: {}", + "Input values MessageId: {} Key: {} Headers: {} Value: {}", messageId, + key, + headers, value); + + // Construct message + String jsonMessage = + constructJsonMessage( + key, headers, value); + LOGGER.debug("Payload: {}", jsonMessage); + Message message = - new Message(messageId, value, null); + new Message( + messageId, jsonMessage, null); unacknowledgedMessages.put( messageId, record.offset()); @@ -156,6 +179,58 @@ public Observable observe() { }); } + private String constructJsonMessage(String key, Map 
headers, String payload) { + StringBuilder json = new StringBuilder(); + json.append("{"); + json.append("\"key\":\"").append(key != null ? key : "").append("\","); + + // Serialize headers to JSON, handling potential errors + String headersJson = toJson(headers); + if (headersJson != null) { + json.append("\"headers\":").append(headersJson).append(","); + } else { + json.append("\"headers\":{}") + .append(","); // Default to an empty JSON object if headers are invalid + } + + json.append("\"payload\":"); + + // Detect if the payload is valid JSON + if (isJsonValid(payload)) { + json.append(payload); // Embed JSON object directly + } else { + json.append(payload != null ? "\"" + payload + "\"" : "null"); // Treat as plain text + } + + json.append("}"); + return json.toString(); + } + + private boolean isJsonValid(String json) { + if (json == null || json.isEmpty()) { + return false; + } + try { + objectMapper.readTree(json); // Parses the JSON to check validity + return true; + } catch (JsonProcessingException e) { + return false; + } + } + + protected String toJson(Object value) { + if (value == null) { + return null; + } + try { + return objectMapper.writeValueAsString(value); + } catch (JsonProcessingException ex) { + // Log the error and return a placeholder or null + LOGGER.error("Failed to convert object to JSON: {}", value, ex); + return null; + } + } + @Override public List ack(List messages) { // If autocommit is enabled we do not run this code. diff --git a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java index 4f4aa4fb9..8a6f98ff0 100644 --- a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java +++ b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java @@ -16,7 +16,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; @@ -27,6 +26,7 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.header.Headers; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -37,6 +37,8 @@ import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.kafkaeq.config.KafkaEventQueueProperties; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import rx.Observable; import static org.junit.jupiter.api.Assertions.*; @@ -89,12 +91,27 @@ private void injectMockField(Object target, String fieldName, Object mock) throw } @Test - public void testObserve() throws Exception { - // Prepare mock consumer records + public void testObserveWithHeaders() throws Exception { + // Prepare mock consumer records with diverse headers, keys, and payloads List> records = List.of( new ConsumerRecord<>("test-topic", 0, 0, "key-1", "payload-1"), - new ConsumerRecord<>("test-topic", 0, 1, "key-2", "payload-2")); + new ConsumerRecord<>("test-topic", 0, 1, "key-2", "{\"field\":\"value\"}"), + new ConsumerRecord<>("test-topic", 0, 2, null, "null-key-payload"), + new ConsumerRecord<>("test-topic", 0, 3, "key-3", ""), + new 
+                        new ConsumerRecord<>("test-topic", 0, 4, "key-4", "12345"),
+                        new ConsumerRecord<>(
+                                "test-topic",
+                                0,
+                                5,
+                                "key-5",
+                                "{\"complex\":{\"nested\":\"value\"}}"));
+
+        // Add headers to each ConsumerRecord
+        for (int i = 0; i < records.size(); i++) {
+            ConsumerRecord<String, String> record = records.get(i);
+            record.headers().add("header-key-" + i, ("header-value-" + i).getBytes());
+        }
 
         ConsumerRecords<String, String> consumerRecords =
                 new ConsumerRecords<>(Map.of(new TopicPartition("test-topic", 0), records));
@@ -102,9 +119,8 @@ public void testObserve() throws Exception {
         // Mock the KafkaConsumer poll behavior
         when(mockKafkaConsumer.poll(any(Duration.class)))
                 .thenReturn(consumerRecords)
-                .thenReturn(
-                        new ConsumerRecords<>(Collections.emptyMap())); // Subsequent polls return
-                // empty
+                .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); // Subsequent polls
+                // return empty
 
         // Start the queue
         queue.start();
@@ -124,20 +140,49 @@ public void testObserve() throws Exception {
 
         // Assert results
         assertNotNull(queue);
-        assertEquals(2, found.size());
-        assertEquals("payload-1", found.get(0).getPayload());
-        assertEquals("payload-2", found.get(1).getPayload());
+        assertEquals(6, found.size()); // Expect all 6 messages to be processed
+
+        assertEquals("0-0", found.get(0).getId());
+        assertEquals("0-1", found.get(1).getId());
+        assertEquals("0-2", found.get(2).getId());
+        assertEquals("0-3", found.get(3).getId());
+        assertEquals("0-4", found.get(4).getId());
+        assertEquals("0-5", found.get(5).getId());
+
+        // Validate headers
+        for (int i = 0; i < records.size(); i++) {
+            ConsumerRecord<String, String> record = records.get(i);
+            assertNotNull(record.headers());
+            assertEquals(1, record.headers().toArray().length);
+            assertEquals(
+                    "header-value-" + i,
+                    new String(record.headers().lastHeader("header-key-" + i).value()));
+        }
     }
 
     @Test
-    public void testObserveReadsKafkaValue() {
-        // Prepare mock Kafka records
+    public void testObserveWithComplexPayload() throws Exception {
+        // Prepare mock consumer records with diverse headers, keys, and payloads
         List<ConsumerRecord<String, String>> records =
                 List.of(
                         new ConsumerRecord<>(
-                                "test-topic", 0, 0, "key1", "{\"testKey\": \"testValue1\"}"),
+                                "test-topic", 0, 0, "key-1", "{\"data\":\"payload-1\"}"),
+                        new ConsumerRecord<>("test-topic", 0, 1, "key-2", "{\"field\":\"value\"}"),
+                        new ConsumerRecord<>("test-topic", 0, 2, null, "null-key-payload"),
+                        new ConsumerRecord<>("test-topic", 0, 3, "key-3", ""),
+                        new ConsumerRecord<>("test-topic", 0, 4, "key-4", "12345"),
                         new ConsumerRecord<>(
-                                "test-topic", 0, 1, "key2", "{\"testKey\": \"testValue2\"}"));
+                                "test-topic",
+                                0,
+                                5,
+                                "key-5",
+                                "{\"complex\":{\"nested\":\"value\"}}"));
+
+        // Add headers to each ConsumerRecord
+        for (int i = 0; i < records.size(); i++) {
+            ConsumerRecord<String, String> record = records.get(i);
+            record.headers().add("header-key-" + i, ("header-value-" + i).getBytes());
+        }
 
         ConsumerRecords<String, String> consumerRecords =
                 new ConsumerRecords<>(Map.of(new TopicPartition("test-topic", 0), records));
@@ -145,30 +190,104 @@ public void testObserveReadsKafkaValue() {
         // Mock the KafkaConsumer poll behavior
         when(mockKafkaConsumer.poll(any(Duration.class)))
                 .thenReturn(consumerRecords)
-                .thenReturn(
-                        new ConsumerRecords<>(Collections.emptyMap())); // Subsequent polls return
-                // empty
+                .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); // Subsequent polls
+                // return empty
 
         // Start the queue
         queue.start();
 
-        // Collect messages from observe
-        List<Message> collectedMessages = new CopyOnWriteArrayList<>();
+        // Collect emitted messages
+        List<Message> found = new ArrayList<>();
         Observable<Message> observable = queue.observe();
         assertNotNull(observable);
-        observable.subscribe(collectedMessages::add);
+        observable.subscribe(found::add);
 
         // Allow polling to run
         try {
-            Thread.sleep(1000);
+            Thread.sleep(1000); // Adjust duration if necessary
         } catch (InterruptedException e) {
             Thread.currentThread().interrupt();
         }
 
-        // Verify results
-        assertEquals(2, collectedMessages.size());
-        assertEquals("{\"testKey\": \"testValue1\"}", collectedMessages.get(0).getPayload());
-        assertEquals("{\"testKey\": \"testValue2\"}", collectedMessages.get(1).getPayload());
+        // Assert results
+        assertNotNull(queue);
+        assertEquals(6, found.size()); // Expect all 6 messages to be processed
+
+        // Validate individual message payloads, keys, and headers in the structured
+        // JSON format
+        ObjectMapper objectMapper = new ObjectMapper();
+        for (int i = 0; i < records.size(); i++) {
+            ConsumerRecord<String, String> record = records.get(i);
+            Message message = found.get(i);
+
+            String expectedPayload =
+                    constructJsonMessage(
+                            objectMapper,
+                            record.key(),
+                            record.headers().toArray().length > 0
+                                    ? extractHeaders(record.headers())
+                                    : Collections.emptyMap(),
+                            record.value());
+
+            assertEquals(expectedPayload, message.getPayload());
+        }
+    }
+
+    private Map<String, String> extractHeaders(Headers headers) {
+        Map<String, String> headerMap = new HashMap<>();
+        headers.forEach(header -> headerMap.put(header.key(), new String(header.value())));
+        return headerMap;
+    }
+
+    private String constructJsonMessage(
+            ObjectMapper objectMapper, String key, Map<String, String> headers, String payload) {
+        StringBuilder json = new StringBuilder();
+        json.append("{");
+        json.append("\"key\":\"").append(key != null ? key : "").append("\",");
+
+        // Serialize headers to JSON, handling potential errors
+        String headersJson = toJson(objectMapper, headers);
+        if (headersJson != null) {
+            json.append("\"headers\":").append(headersJson).append(",");
+        } else {
+            json.append("\"headers\":{}")
+                    .append(","); // Default to an empty JSON object if headers are invalid
+        }
+
+        json.append("\"payload\":");
+
+        // Detect if the payload is valid JSON
+        if (isJsonValid(objectMapper, payload)) {
+            json.append(payload); // Embed JSON object directly
+        } else {
+            json.append(payload != null ? "\"" + payload + "\"" : "null"); // Treat as plain text
+        }
+
+        json.append("}");
+        return json.toString();
+    }
+
+    private boolean isJsonValid(ObjectMapper objectMapper, String json) {
+        if (json == null || json.isEmpty()) {
+            return false;
+        }
+        try {
+            objectMapper.readTree(json); // Parses the JSON to check validity
+            return true;
+        } catch (JsonProcessingException e) {
+            return false;
+        }
+    }
+
+    protected String toJson(ObjectMapper objectMapper, Object value) {
+        if (value == null) {
+            return null;
+        }
+        try {
+            return objectMapper.writeValueAsString(value);
+        } catch (JsonProcessingException ex) {
+            return null;
+        }
     }
 
     @Test
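For reference, given the string-building logic in `constructJsonMessage` above, a hypothetical record with key `order-1`, a single `source` header, and the JSON value `{"amount": 42}` would be wrapped into an envelope like this (a sketch traced by hand from the code in this patch, not output captured from a running queue):

```json
{
  "key": "order-1",
  "headers": {
    "source": "billing"
  },
  "payload": {
    "amount": 42
  }
}
```

A value that does not parse as JSON, such as `hello`, would instead be embedded as the quoted string `"payload":"hello"`, and a `null` value becomes `"payload":null`.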
From fe40298f2f33831d32968c37aeab0ee76d800266 Mon Sep 17 00:00:00 2001
From: Robert Karlsson
Date: Tue, 3 Dec 2024 12:24:59 +0900
Subject: [PATCH 08/12] Updated readme with kafka event data format

---
 kafka-event-queue/README.md | 26 ++++++++++++++++++++++++--
 1 file changed, 24 insertions(+), 2 deletions(-)

diff --git a/kafka-event-queue/README.md b/kafka-event-queue/README.md
index 690e71059..ba5a6a592 100644
--- a/kafka-event-queue/README.md
+++ b/kafka-event-queue/README.md
@@ -20,9 +20,10 @@ Provides ability to consume messages from Kafka.
 
 ## Usage
 
-To use it in an event handler prefix the event with `kafka` followed by the topic.
+To use it in an event handler, prefix the event with `kafka` followed by the topic.
 
 Example:
+
 ```json
 {
   "name": "kafka_test_event_handler",
   "event": "kafka:conductor-event",
   "actions": [
     {
       "action": "start_workflow",
@@ -33,7 +34,7 @@ Example:
       "start_workflow": {
         "name": "workflow_triggered_by_kafka",
         "input": {
-          "inlineValue": 1
+          "payload": "${payload}"
         }
       },
       "expandInlineJSON": true
@@ -43,6 +44,27 @@ Example:
     }
   ]
 }
 ```
+
+The data from the Kafka event has the format:
+
+```json
+{
+  "key": "key-1",
+  "headers": {
+    "header-1": "value1"
+  },
+  "payload": {
+    "first": "Marcelo",
+    "middle": "Billie",
+    "last": "Mertz"
+  }
+}
+```
+
+* `key` is the key field of the Kafka message.
+* `headers` contains the headers of the Kafka message.
+* `payload` is the body (value) of the Kafka message.
+
+To access these fields in the event handler, use for example `"${payload}"` to access the payload property, which contains the Kafka message data.
 
 ## Configuration
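Building on the README example above, individual fields of the payload can presumably be referenced with dot notation inside the same `${...}` expression syntax. The snippet below is a sketch under that assumption; `first_name` is a made-up input key, and `${payload.first}` assumes Conductor resolves nested properties of the event payload:

```json
"input": {
  "first_name": "${payload.first}",
  "sender_key": "${key}"
}
```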
From d06262b0f1b06eea395b102d65021866084a8409 Mon Sep 17 00:00:00 2001
From: Robert Karlsson
Date: Tue, 3 Dec 2024 12:36:29 +0900
Subject: [PATCH 09/12] Added queue metrics

---
 .../conductor/kafkaeq/eventqueue/KafkaObservableQueue.java | 5 +++++
 1 file changed, 5 insertions(+)

diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java
index f785aa302..d28b2e1d2 100644
--- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java
+++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java
@@ -32,6 +32,7 @@
 import com.netflix.conductor.core.events.queue.Message;
 import com.netflix.conductor.core.events.queue.ObservableQueue;
 import com.netflix.conductor.kafkaeq.config.KafkaEventQueueProperties;
+import com.netflix.conductor.metrics.Monitors;
 
 import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
@@ -164,12 +165,16 @@
                             }
 
+                            Monitors.recordEventQueueMessagesProcessed(
+                                    QUEUE_TYPE, this.topic, messages.size());
                             return Observable.from(messages);
                         } catch (Exception e) {
                             LOGGER.error(
                                     "Error while polling Kafka for topic: {}", topic, e);
+                            Monitors.recordObservableQMessageReceivedErrors(
+                                    QUEUE_TYPE);
                             return Observable.error(e);
                         }
                     })

From 1f1ba1dd99ae9c329a1f2162480425fa3e69d227 Mon Sep 17 00:00:00 2001
From: Robert Karlsson
Date: Tue, 3 Dec 2024 12:36:45 +0900
Subject: [PATCH 10/12] Updated docs to add kafka

---
 docs/documentation/configuration/eventhandlers.md | 6 +++---
 1 file changed, 3 insertions(+), 3 deletions(-)

diff --git a/docs/documentation/configuration/eventhandlers.md b/docs/documentation/configuration/eventhandlers.md
index 4a0c6edc8..f02b0c1f1 100644
--- a/docs/documentation/configuration/eventhandlers.md
+++ b/docs/documentation/configuration/eventhandlers.md
@@ -3,13 +3,13 @@ Eventing in Conductor provides for loose coupling between workflows and support
 
 This includes:
 
-1. Being able to produce an event (message) in an external system like SQS or internal to Conductor.
+1. Being able to produce an event (message) in an external system like SQS or Kafka, or internal to Conductor.
 2. Start a workflow when a specific event occurs that matches the provided criteria.
 
 Conductor provides SUB_WORKFLOW task that can be used to embed a workflow inside parent workflow. Eventing support provides similar capability without explicitly adding dependencies, and provides **fire-and-forget** style integrations.
 
 ## Event Task
 
-Event task provides ability to publish an event (message) to either Conductor or an external eventing system like SQS.
+Event task provides the ability to publish an event (message) to either Conductor or an external eventing system like SQS or Kafka.
 Event tasks are useful for creating event based dependencies for workflows and tasks.
 
 See [Event Task](workflowdef/systemtasks/event-task.md) for documentation.
@@ -20,7 +20,7 @@ Event handlers are listeners registered that executes an action when a matching
 2. Fail a Task
 3. Complete a Task
 
-Event Handlers can be configured to listen to Conductor Events or an external event like SQS.
+Event Handlers can be configured to listen to Conductor Events or an external event like SQS or Kafka.
 
 ## Configuration
 
 Event Handlers are configured via ```/event/``` APIs.

From 51b3b7dee572cca053a429aeda5b30945c45ea57 Mon Sep 17 00:00:00 2001
From: Robert Karlsson
Date: Thu, 5 Dec 2024 12:23:56 +0900
Subject: [PATCH 11/12] Fix dynamic queue addition when a new event handler is
 added at runtime. Removed the conductor.event-queues.kafka.topic property as
 it is not used.

---
 kafka-event-queue/README.md                   |  3 -
 .../config/KafkaEventQueueProperties.java     | 11 ---
 .../eventqueue/KafkaObservableQueue.java      | 80 +++++++++++++++++--
 .../eventqueue/KafkaObservableQueueTest.java  | 60 ++++++++++++++
 4 files changed, 134 insertions(+), 20 deletions(-)

diff --git a/kafka-event-queue/README.md b/kafka-event-queue/README.md
index ba5a6a592..5c9e71719 100644
--- a/kafka-event-queue/README.md
+++ b/kafka-event-queue/README.md
@@ -83,9 +83,6 @@ conductor.default-event-queue.type=kafka
 # The bootstrap server to use.
 conductor.event-queues.kafka.bootstrap-servers=kafka:29092
 
-# The topic to listen to
-conductor.event-queues.kafka.topic=conductor-event
-
 # The dead letter queue to use for events that had some error.
 conductor.event-queues.kafka.dlq-topic=conductor-dlq
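After this patch, a minimal configuration would look roughly like the following (a sketch assembled only from the properties visible in this README diff; the values shown are the defaults used here, not requirements):

```properties
conductor.default-event-queue.type=kafka
conductor.event-queues.kafka.bootstrap-servers=kafka:29092
conductor.event-queues.kafka.dlq-topic=conductor-dlq
```

Note that `conductor.event-queues.kafka.topic` is intentionally gone: topics now come from the event handler definitions (`kafka:<topic>`) rather than from a single static property.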
diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java
index b41ed7e75..2da3c81ff 100644
--- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java
+++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java
@@ -31,9 +31,6 @@ public class KafkaEventQueueProperties {
     @NotBlank(message = "Bootstrap servers must not be blank")
     private String bootstrapServers = "kafka:29092";
 
-    /** Primary topic for the Kafka queue. */
-    private String topic = "conductor-event";
-
     /** Dead Letter Queue (DLQ) topic for failed messages. */
     private String dlqTopic = "conductor-dlq";
 
@@ -58,14 +55,6 @@ public void setBootstrapServers(String bootstrapServers) {
         this.bootstrapServers = bootstrapServers;
     }
 
-    public String getTopic() {
-        return topic;
-    }
-
-    public void setTopic(String topic) {
-        this.topic = topic;
-    }
-
     public String getDlqTopic() {
         return dlqTopic;
     }
diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java
index d28b2e1d2..8a65692f8 100644
--- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java
+++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java
@@ -14,6 +14,7 @@
 import java.util.*;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutionException;
 import java.util.concurrent.TimeUnit;
 
 import org.apache.kafka.clients.admin.AdminClient;
@@ -368,11 +369,16 @@ public void setUnackTimeout(Message message, long unackTimeout) {
 
     @Override
     public long size() {
+        if (!topicExists(this.topic)) {
+            LOGGER.info("Topic '{}' not available, will refresh metadata.", this.topic);
+            refreshMetadata(this.topic);
+        }
+
         long topicSize = getTopicSizeUsingAdminClient();
         if (topicSize != -1) {
-            LOGGER.info("Topic size for 'conductor-event': {}", topicSize);
+            LOGGER.info("Topic size for '{}': {}", this.topic, topicSize);
         } else {
-            LOGGER.error("Failed to fetch topic size for 'conductor-event'");
+            LOGGER.error("Failed to fetch topic size for '{}'", this.topic);
         }
 
         return topicSize;
@@ -380,7 +386,6 @@ public long size() {
 
     private long getTopicSizeUsingAdminClient() {
         try {
-            // Fetch metadata for the topic asynchronously
            KafkaFuture<TopicDescription> topicDescriptionFuture =
                    adminClient
                            .describeTopics(Collections.singletonList(topic))
@@ -397,17 +402,28 @@ private long getTopicSizeUsingAdminClient() {
             }
 
             // Fetch offsets asynchronously
+            KafkaFuture<Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo>>
+                    offsetsFuture = adminClient.listOffsets(offsetRequest).all();
+
             Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
-                    adminClient.listOffsets(offsetRequest).all().get();
+                    offsetsFuture.get();
 
             // Calculate total size by summing offsets
             return offsets.values().stream()
                     .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset)
                     .sum();
+        } catch (ExecutionException e) {
+            if (e.getCause()
+                    instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) {
+                LOGGER.warn("Topic '{}' does not exist or partitions unavailable.", topic);
+            } else {
+                LOGGER.error("Error fetching offsets for topic '{}': {}", topic, e.getMessage());
+            }
         } catch (Exception e) {
-            LOGGER.error("Error fetching topic size using AdminClient for topic: {}", topic, e);
-            return -1;
+            LOGGER.error(
+                    "General error fetching offsets for topic '{}': {}", topic, e.getMessage());
         }
+        return -1;
     }
 
     @Override
@@ -560,4 +576,56 @@ public KafkaObservableQueue build(final String topic) {
             }
         }
     }
+
+    private boolean topicExists(String topic) {
+        try {
+            KafkaFuture<TopicDescription> future =
+                    adminClient
+                            .describeTopics(Collections.singletonList(topic))
+                            .topicNameValues()
+                            .get(topic);
+
+            future.get(); // Attempt to fetch metadata
+            return true;
+        } catch (ExecutionException e) {
+            if (e.getCause()
+                    instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) {
+                LOGGER.warn("Topic '{}' does not exist.", topic);
+                return false;
+            }
+            LOGGER.error("Error checking if topic '{}' exists: {}", topic, e.getMessage());
+            return false;
+        } catch (Exception e) {
+            LOGGER.error("General error checking if topic '{}' exists: {}", topic, e.getMessage());
+            return false;
+        }
+    }
+
+    private void refreshMetadata(String topic) {
+        adminClient
+                .describeTopics(Collections.singletonList(topic))
+                .topicNameValues()
+                .get(topic)
+                .whenComplete(
+                        (topicDescription, exception) -> {
+                            if (exception != null) {
+                                if (exception.getCause()
+                                        instanceof
+                                        org.apache.kafka.common.errors
+                                                .UnknownTopicOrPartitionException) {
+                                    LOGGER.warn("Topic '{}' still does not exist.", topic);
+                                } else {
+                                    LOGGER.error(
+                                            "Error refreshing metadata for topic '{}': {}",
+                                            topic,
+                                            exception.getMessage());
+                                }
+                            } else {
+                                LOGGER.info(
+                                        "Metadata refreshed for topic '{}': Partitions = {}",
+                                        topic,
+                                        topicDescription.partitions());
+                            }
+                        });
+    }
 }
diff --git a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java
index 8a6f98ff0..01ac9701b 100644
--- a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java
+++ b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java
@@ -415,6 +415,66 @@ public void testSize() throws Exception {
         assertEquals(10, size); // As we mocked 10 as the offset in the partition
     }
 
+    @Test
+    public void testSizeWhenTopicExists() throws Exception {
+        // Mock topic description
+        TopicDescription topicDescription =
+                new TopicDescription(
+                        "test-topic",
+                        false,
+                        List.of(new TopicPartitionInfo(0, null, List.of(), List.of())));
+
+        // Mock offsets
+        Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
+                Map.of(
+                        new TopicPartition("test-topic", 0),
+                        new ListOffsetsResult.ListOffsetsResultInfo(
+                                10L, // Offset value
+                                0L, // Log append time (can be 0 if not available)
+                                Optional.empty() // Leader epoch (optional)
+                                ));
+
+        // Mock AdminClient behavior for describeTopics
+        DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
+        when(mockDescribeTopicsResult.topicNameValues())
+                .thenReturn(Map.of("test-topic", KafkaFuture.completedFuture(topicDescription)));
+        when(mockAdminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
+
+        // Mock AdminClient behavior for listOffsets
+        ListOffsetsResult mockListOffsetsResult = mock(ListOffsetsResult.class);
+        when(mockListOffsetsResult.all()).thenReturn(KafkaFuture.completedFuture(offsets));
+        when(mockAdminClient.listOffsets(anyMap())).thenReturn(mockListOffsetsResult);
+
+        // Call size
+        long size = queue.size();
+
+        // Verify
+        assertEquals(10L, size);
+    }
+
+    @Test
+    public void testSizeWhenTopicDoesNotExist() throws Exception {
+        // Mock KafkaFuture to simulate a topic-not-found exception
+        KafkaFuture<TopicDescription> failedFuture = mock(KafkaFuture.class);
+        when(failedFuture.get())
+                .thenThrow(
+                        new org.apache.kafka.common.errors.UnknownTopicOrPartitionException(
+                                "Topic not found"));
+
+        // Mock DescribeTopicsResult
+        DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class);
+        when(mockDescribeTopicsResult.topicNameValues())
+                .thenReturn(Map.of("test-topic", failedFuture));
+        when(mockAdminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult);
+
+        // Call size
+        long size = queue.size();
+
+        // Verify the result
+        assertEquals(-1L, size); // Return -1 for non-existent topics
+    }
+
     @Test
     public void testLifecycle() {
         queue.start();
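As a standalone illustration of the offset-summing approach that the patched `size()` relies on, the essential AdminClient calls reduce to something like the sketch below. It assumes a reachable broker at `kafka:29092` and an existing `conductor-event` topic, and it omits the error handling and metadata refresh that the patch adds:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;

public class TopicSizeSketch {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");
        try (AdminClient admin = AdminClient.create(props)) {
            String topic = "conductor-event";

            // Describe the topic to learn its partitions
            TopicDescription description =
                    admin.describeTopics(List.of(topic)).topicNameValues().get(topic).get();

            // Ask for the latest (end) offset of every partition
            Map<TopicPartition, OffsetSpec> request = new HashMap<>();
            description
                    .partitions()
                    .forEach(
                            p ->
                                    request.put(
                                            new TopicPartition(topic, p.partition()),
                                            OffsetSpec.latest()));

            // Summing the end offsets yields the figure the patched size() reports
            long size =
                    admin.listOffsets(request).all().get().values().stream()
                            .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset)
                            .sum();
            System.out.println("Size of '" + topic + "': " + size);
        }
    }
}
```

Note that this counts end offsets, so records already removed by retention still contribute to the total; the result is an upper bound on what is actually still readable from the topic.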
From 16b8ccfe06b9e7fb27711d7323e41691d7f64bb3 Mon Sep 17 00:00:00 2001
From: Robert Karlsson
Date: Thu, 5 Dec 2024 13:17:43 +0900
Subject: [PATCH 12/12] Handling thread sync when an event handler is updated
 with a new queue

---
 .../eventqueue/KafkaObservableQueue.java | 51 ++++++++++++-------
 1 file changed, 34 insertions(+), 17 deletions(-)

diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java
index 8a65692f8..4c798cf6d 100644
--- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java
+++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java
@@ -456,7 +456,7 @@ public void start() {
     }
 
     @Override
-    public void stop() {
+    public synchronized void stop() {
         LOGGER.info("Kafka consumer stopping for topic: {}", topic);
         if (!running) {
             LOGGER.warn("KafkaObservableQueue is already stopped for topic: {}", topic);
@@ -488,39 +488,56 @@ public void stop() {
     }
 
     private void retryCloseConsumer() {
-        int retries = 3;
-        while (retries > 0) {
+        int attempts = 3;
+        while (attempts > 0) {
             try {
+                kafkaConsumer.unsubscribe();
                 kafkaConsumer.close();
-                LOGGER.info("Kafka consumer closed successfully on retry.");
-                return;
+                LOGGER.info("Kafka consumer stopped for topic: {}", topic);
+                return; // Exit if successful
             } catch (Exception e) {
-                retries--;
                 LOGGER.warn(
-                        "Retry failed to close Kafka consumer. Remaining attempts: {}", retries, e);
-                if (retries == 0) {
-                    LOGGER.error("Exhausted retries for closing Kafka consumer.");
+                        "Error stopping Kafka consumer for topic: {}, attempts remaining: {}",
+                        topic,
+                        attempts - 1,
+                        e);
+                attempts--;
+                try {
+                    Thread.sleep(1000); // Wait before retrying
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    LOGGER.error("Thread interrupted during Kafka consumer shutdown retries");
+                    break;
                 }
             }
         }
+        LOGGER.error("Failed to stop Kafka consumer for topic: {} after retries", topic);
     }
 
     private void retryCloseProducer() {
-        int retries = 3;
-        while (retries > 0) {
+        int attempts = 3;
+        while (attempts > 0) {
             try {
                 kafkaProducer.close();
-                LOGGER.info("Kafka producer closed successfully on retry.");
-                return;
+                LOGGER.info("Kafka producer stopped for topic: {}", topic);
+                return; // Exit if successful
             } catch (Exception e) {
-                retries--;
                 LOGGER.warn(
-                        "Retry failed to close Kafka producer. Remaining attempts: {}", retries, e);
-                if (retries == 0) {
-                    LOGGER.error("Exhausted retries for closing Kafka producer.");
+                        "Error stopping Kafka producer for topic: {}, attempts remaining: {}",
+                        topic,
+                        attempts - 1,
+                        e);
+                attempts--;
+                try {
+                    Thread.sleep(1000); // Wait before retrying
+                } catch (InterruptedException ie) {
+                    Thread.currentThread().interrupt();
+                    LOGGER.error("Thread interrupted during Kafka producer shutdown retries");
+                    break;
                }
             }
         }
+        LOGGER.error("Failed to stop Kafka producer for topic: {} after retries", topic);
     }
 
     @Override