From f01f7f553e7ba21c2ac0c8500c5a49a948d5fa90 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Fri, 29 Nov 2024 16:03:19 +0900 Subject: [PATCH 01/21] Added support for kafka event queue --- kafka-event-queue/README.md | 24 + kafka-event-queue/build.gradle | 40 ++ .../config/KafkaEventQueueConfiguration.java | 102 ++++ .../config/KafkaEventQueueProperties.java | 197 ++++++++ .../config/KafkaEventQueueProvider.java | 47 ++ .../eventqueue/KafkaObservableQueue.java | 459 ++++++++++++++++++ ...itional-spring-configuration-metadata.json | 27 ++ server/build.gradle | 1 + settings.gradle | 1 + ...itional-spring-configuration-metadata.json | 10 + 10 files changed, 908 insertions(+) create mode 100644 kafka-event-queue/README.md create mode 100644 kafka-event-queue/build.gradle create mode 100644 kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueConfiguration.java create mode 100644 kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java create mode 100644 kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProvider.java create mode 100644 kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java create mode 100644 kafka-event-queue/src/main/resources/META-INF/additional-spring-configuration-metadata.json diff --git a/kafka-event-queue/README.md b/kafka-event-queue/README.md new file mode 100644 index 000000000..bb2ea5390 --- /dev/null +++ b/kafka-event-queue/README.md @@ -0,0 +1,24 @@ +# Event Queue +## Published Artifacts + +Group: `com.netflix.conductor` + +| Published Artifact | Description | +| ----------- | ----------- | +| conductor-kafka-event-queue | Support for integration with Kafka and consume events from it. 
| + +## Modules +### Kafka +https://kafka.apache.org/ + +Provides ability to publish and consume messages from Kafka +#### Configuration +(Default values shown below) +```properties +conductor.event-queues.kafka.enabled=true +conductor.event-queues.kafka.bootstrap-servers=kafka:29092 +conductor.event-queues.kafka.groupId=conductor-consumers +conductor.event-queues.kafka.topic=conductor-event +conductor.event-queues.kafka.dlqTopic=conductor-dlq +conductor.event-queues.kafka.pollTimeDuration=100 +``` diff --git a/kafka-event-queue/build.gradle b/kafka-event-queue/build.gradle new file mode 100644 index 000000000..7aeabf6be --- /dev/null +++ b/kafka-event-queue/build.gradle @@ -0,0 +1,40 @@ +plugins { + id 'java' +} + +dependencies { + // Core Conductor dependencies + implementation project(':conductor-common') + implementation project(':conductor-core') + + // Spring Boot support + implementation 'org.springframework.boot:spring-boot-starter' + + // Apache Commons Lang for utility classes + implementation 'org.apache.commons:commons-lang3' + + // Reactive programming support with RxJava + implementation "io.reactivex:rxjava:${revRxJava}" + + // SBMTODO: Remove Guava dependency if possible + // Guava should only be included if specifically needed + implementation "com.google.guava:guava:${revGuava}" + + // Removed AWS SQS SDK as we are transitioning to Kafka + // implementation "com.amazonaws:aws-java-sdk-sqs:${revAwsSdk}" + + // Test dependencies + testImplementation 'org.springframework.boot:spring-boot-starter-test' + testImplementation project(':conductor-common').sourceSets.test.output + + // Add Kafka client dependency + implementation 'org.apache.kafka:kafka-clients:3.5.1' + + // Add SLF4J API for logging + implementation 'org.slf4j:slf4j-api:2.0.9' + + // Add SLF4J binding for logging with Logback + runtimeOnly 'ch.qos.logback:logback-classic:1.4.11' +} + + diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueConfiguration.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueConfiguration.java new file mode 100644 index 000000000..810e40cf7 --- /dev/null +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueConfiguration.java @@ -0,0 +1,102 @@ +/* + * Copyright 2024 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafkaeq.config; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; + +import org.apache.commons.lang3.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.netflix.conductor.core.config.ConductorProperties; +import com.netflix.conductor.core.events.EventQueueProvider; +import com.netflix.conductor.core.events.queue.ObservableQueue; +import com.netflix.conductor.kafkaeq.eventqueue.KafkaObservableQueue.Builder; +import com.netflix.conductor.model.TaskModel.Status; + +@Configuration +@EnableConfigurationProperties(KafkaEventQueueProperties.class) +@ConditionalOnProperty(name = "conductor.event-queues.kafka.enabled", havingValue = "true") +public class KafkaEventQueueConfiguration { + + @Autowired private KafkaEventQueueProperties kafkaProperties; + + private static final Logger LOGGER = + LoggerFactory.getLogger(KafkaEventQueueConfiguration.class); + + public KafkaEventQueueConfiguration(KafkaEventQueueProperties kafkaProperties) { + this.kafkaProperties = kafkaProperties; + } + + @Bean + public EventQueueProvider kafkaEventQueueProvider() { + return new KafkaEventQueueProvider(kafkaProperties); + } + + @ConditionalOnProperty( + name = "conductor.default-event-queue.type", + havingValue = "kafka", + matchIfMissing = false) + @Bean + public Map getQueues( + ConductorProperties conductorProperties, KafkaEventQueueProperties properties) { + try { + + LOGGER.debug( + "Starting to create KafkaObservableQueues with properties: {}", properties); + + String stack = + Optional.ofNullable(conductorProperties.getStack()) + .filter(stackName -> stackName.length() > 0) + .map(stackName -> stackName + "_") + .orElse(""); + + LOGGER.debug("Using stack: {}", stack); + + Status[] statuses = new Status[] {Status.COMPLETED, Status.FAILED}; + Map queues = new HashMap<>(); + + for (Status status : statuses) { + // Log the status being processed + LOGGER.debug("Processing status: {}", status); + + String queuePrefix = + StringUtils.isBlank(properties.getListenerQueuePrefix()) + ? 
conductorProperties.getAppId() + "_kafka_notify_" + stack + : properties.getListenerQueuePrefix(); + + LOGGER.debug("queuePrefix: {}", queuePrefix); + + String topicName = queuePrefix + status.name(); + + LOGGER.debug("topicName: {}", topicName); + + final ObservableQueue queue = new Builder(properties).build(topicName); + queues.put(status, queue); + } + + LOGGER.debug("Successfully created queues: {}", queues); + return queues; + } catch (Exception e) { + LOGGER.error("Failed to create KafkaObservableQueues", e); + throw new RuntimeException("Failed to getQueues on KafkaEventQueueConfiguration", e); + } + } +} diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java new file mode 100644 index 000000000..b41ed7e75 --- /dev/null +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java @@ -0,0 +1,197 @@ +/* + * Copyright 2024 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafkaeq.config; + +import java.time.Duration; +import java.util.HashMap; +import java.util.Map; + +import org.apache.kafka.clients.admin.AdminClientConfig; +import org.apache.kafka.clients.consumer.ConsumerConfig; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.logging.log4j.core.config.plugins.validation.constraints.NotBlank; +import org.springframework.boot.context.properties.ConfigurationProperties; +import org.springframework.validation.annotation.Validated; + +@ConfigurationProperties("conductor.event-queues.kafka") +@Validated +public class KafkaEventQueueProperties { + + /** Kafka bootstrap servers (comma-separated). */ + @NotBlank(message = "Bootstrap servers must not be blank") + private String bootstrapServers = "kafka:29092"; + + /** Primary topic for the Kafka queue. */ + private String topic = "conductor-event"; + + /** Dead Letter Queue (DLQ) topic for failed messages. */ + private String dlqTopic = "conductor-dlq"; + + /** Prefix for dynamically created Kafka topics, if applicable. */ + private String listenerQueuePrefix = ""; + + /** The polling interval for Kafka (in milliseconds). */ + private Duration pollTimeDuration = Duration.ofMillis(100); + + /** Additional properties for consumers, producers, and admin clients. */ + private Map consumer = new HashMap<>(); + + private Map producer = new HashMap<>(); + private Map admin = new HashMap<>(); + + // Getters and setters + public String getBootstrapServers() { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + + public String getTopic() { + return topic; + } + + public void setTopic(String topic) { + this.topic = topic; + } + + public String getDlqTopic() { + return dlqTopic; + } + + public void setDlqTopic(String dlqTopic) { + this.dlqTopic = dlqTopic; + } + + public String getListenerQueuePrefix() { + return listenerQueuePrefix; + } + + public void setListenerQueuePrefix(String listenerQueuePrefix) { + this.listenerQueuePrefix = listenerQueuePrefix; + } + + public Duration getPollTimeDuration() { + return pollTimeDuration; + } + + public void setPollTimeDuration(Duration pollTimeDuration) { + this.pollTimeDuration = pollTimeDuration; + } + + public Map getConsumer() { + return consumer; + } + + public void setConsumer(Map consumer) { + this.consumer = consumer; + } + + public Map getProducer() { + return producer; + } + + public void setProducer(Map producer) { + this.producer = producer; + } + + public Map getAdmin() { + return admin; + } + + public void setAdmin(Map admin) { + this.admin = admin; + } + + /** + * Generates configuration properties for Kafka consumers. Maps against `ConsumerConfig` keys. 
+ */ + public Map toConsumerConfig() { + Map config = mapProperties(ConsumerConfig.configNames(), consumer); + // Ensure key.deserializer and value.deserializer are always set + setDefaultIfNullOrEmpty( + config, + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + setDefaultIfNullOrEmpty( + config, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringDeserializer"); + + setDefaultIfNullOrEmpty(config, ConsumerConfig.GROUP_ID_CONFIG, "conductor-group"); + setDefaultIfNullOrEmpty(config, ConsumerConfig.CLIENT_ID_CONFIG, "consumer-client"); + return config; + } + + /** + * Generates configuration properties for Kafka producers. Maps against `ProducerConfig` keys. + */ + public Map toProducerConfig() { + Map config = mapProperties(ProducerConfig.configNames(), producer); + setDefaultIfNullOrEmpty( + config, + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + setDefaultIfNullOrEmpty( + config, + ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, + "org.apache.kafka.common.serialization.StringSerializer"); + + setDefaultIfNullOrEmpty(config, ProducerConfig.CLIENT_ID_CONFIG, "admin-client"); + return config; + } + + /** + * Generates configuration properties for Kafka AdminClient. Maps against `AdminClientConfig` + * keys. + */ + public Map toAdminConfig() { + Map config = mapProperties(AdminClientConfig.configNames(), admin); + setDefaultIfNullOrEmpty(config, ConsumerConfig.CLIENT_ID_CONFIG, "admin-client"); + return config; + } + + /** + * Filters and maps properties based on the allowed keys for a specific Kafka client + * configuration. + * + * @param allowedKeys The keys allowed for the specific Kafka client configuration. + * @param inputProperties The user-specified properties to filter. + * @return A filtered map containing only valid properties. + */ + private Map mapProperties( + Iterable allowedKeys, Map inputProperties) { + Map config = new HashMap<>(); + for (String key : allowedKeys) { + if (inputProperties.containsKey(key)) { + config.put(key, inputProperties.get(key)); + } + } + + setDefaultIfNullOrEmpty( + config, AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers); // Ensure + // bootstrapServers + // is + // always added + return config; + } + + private void setDefaultIfNullOrEmpty( + Map config, String key, String defaultValue) { + Object value = config.get(key); + if (value == null || (value instanceof String && ((String) value).isBlank())) { + config.put(key, defaultValue); + } + } +} diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProvider.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProvider.java new file mode 100644 index 000000000..af54a7447 --- /dev/null +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProvider.java @@ -0,0 +1,47 @@ +/* + * Copyright 2024 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafkaeq.config; + +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.events.EventQueueProvider; +import com.netflix.conductor.core.events.queue.ObservableQueue; +import com.netflix.conductor.kafkaeq.eventqueue.KafkaObservableQueue.Builder; + +public class KafkaEventQueueProvider implements EventQueueProvider { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventQueueProvider.class); + + private final Map queues = new ConcurrentHashMap<>(); + private final KafkaEventQueueProperties properties; + + public KafkaEventQueueProvider(KafkaEventQueueProperties properties) { + this.properties = properties; + } + + @Override + public String getQueueType() { + return "kafka"; + } + + @Override + public ObservableQueue getQueue(String queueURI) { + LOGGER.info("Creating KafkaObservableQueue for topic: {}", queueURI); + + return queues.computeIfAbsent(queueURI, q -> new Builder(properties).build(queueURI)); + } +} diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java new file mode 100644 index 000000000..91f7d7071 --- /dev/null +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java @@ -0,0 +1,459 @@ +/* + * Copyright 2024 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafkaeq.eventqueue; + +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.TimeUnit; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.OffsetSpec; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.core.events.queue.ObservableQueue; +import com.netflix.conductor.kafkaeq.config.KafkaEventQueueProperties; + +import rx.Observable; +import rx.subscriptions.Subscriptions; + +public class KafkaObservableQueue implements ObservableQueue { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaObservableQueue.class); + private static final String QUEUE_TYPE = "kafka"; + + private final String topic; + private volatile AdminClient adminClient; + private final Consumer kafkaConsumer; + private final Producer kafkaProducer; + private final long pollTimeInMS; + private final String dlqTopic; + private final boolean autoCommitEnabled; + private final Map unacknowledgedMessages = new ConcurrentHashMap<>(); + private volatile boolean running = false; + private final KafkaEventQueueProperties properties; + + public KafkaObservableQueue( + String topic, + Properties consumerConfig, + Properties producerConfig, + Properties adminConfig, + KafkaEventQueueProperties properties) { + this.topic = topic; + this.kafkaConsumer = new KafkaConsumer<>(consumerConfig); + this.kafkaProducer = new KafkaProducer<>(producerConfig); + this.properties = properties; + this.pollTimeInMS = properties.getPollTimeDuration().toMillis(); + this.dlqTopic = properties.getDlqTopic(); + this.autoCommitEnabled = + Boolean.parseBoolean( + consumerConfig + .getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + .toString()); + + this.adminClient = AdminClient.create(adminConfig); + } + + @Override + public Observable observe() { + return Observable.create( + subscriber -> { + Observable interval = + Observable.interval(pollTimeInMS, TimeUnit.MILLISECONDS); + + interval.flatMap( + (Long x) -> { + if (!isRunning()) { + return Observable.from(Collections.emptyList()); + } + + try { + ConsumerRecords records = + kafkaConsumer.poll( + this.properties.getPollTimeDuration()); + List messages = new ArrayList<>(); + + for (ConsumerRecord record : records) { + try { + Message message = + new Message( + record.partition() + + "-" + + record + .offset(), // Message ID based on partition and + // offset + record.value(), + null); + String messageId = + record.partition() + + "-" + + record.offset(); + unacknowledgedMessages.put( + messageId, record.offset()); + messages.add(message); + } catch (Exception e) { + LOGGER.error( + "Failed to process record from Kafka: {}", + record, + e); + } + } + + return Observable.from(messages); + } catch (Exception e) 
{ + LOGGER.error( + "Error while polling Kafka for topic: {}", + topic, + e); + return Observable.error(e); + } + }) + .subscribe(subscriber::onNext, subscriber::onError); + + subscriber.add(Subscriptions.create(this::stop)); + }); + } + + @Override + public List ack(List messages) { + // If autocommit is enabled we do not run this code. + if (autoCommitEnabled == true) { + LOGGER.info("Auto commit is enabled. Skipping manual acknowledgment."); + return List.of(); + } + + Map offsetsToCommit = new HashMap<>(); + List failedAcks = new ArrayList<>(); // Collect IDs of failed messages + + for (Message message : messages) { + String messageId = message.getId(); + if (unacknowledgedMessages.containsKey(messageId)) { + try { + String[] parts = messageId.split("-"); + + if (parts.length != 2) { + throw new IllegalArgumentException( + "Invalid message ID format: " + messageId); + } + + // Extract partition and offset from messageId + int partition = Integer.parseInt(parts[0]); + long offset = Long.parseLong(parts[1]); + + // Remove message + unacknowledgedMessages.remove(messageId); + + TopicPartition tp = new TopicPartition(topic, partition); + + LOGGER.debug( + "Parsed messageId: {}, topic: {}, partition: {}, offset: {}", + messageId, + topic, + partition, + offset); + offsetsToCommit.put(tp, new OffsetAndMetadata(offset + 1)); + } catch (Exception e) { + LOGGER.error("Failed to prepare acknowledgment for message: {}", messageId, e); + failedAcks.add(messageId); // Add to failed list if exception occurs + } + } else { + LOGGER.warn("Message ID not found in unacknowledged messages: {}", messageId); + failedAcks.add(messageId); // Add to failed list if not found + } + } + + try { + LOGGER.debug("Committing offsets: {}", offsetsToCommit); + + kafkaConsumer.commitSync(offsetsToCommit); // Commit all collected offsets + } catch (CommitFailedException e) { + LOGGER.warn("Offset commit failed: {}", e.getMessage()); + } catch (OffsetOutOfRangeException e) { + LOGGER.error( + "OffsetOutOfRangeException encountered for topic {}: {}", + e.partitions(), + e.getMessage()); + + // Reset offsets for the out-of-range partition + Map offsetsToReset = new HashMap<>(); + for (TopicPartition partition : e.partitions()) { + long newOffset = + kafkaConsumer.position(partition); // Default to the current position + offsetsToReset.put(partition, new OffsetAndMetadata(newOffset)); + LOGGER.warn("Resetting offset for partition {} to {}", partition, newOffset); + } + + // Commit the new offsets + kafkaConsumer.commitSync(offsetsToReset); + } catch (Exception e) { + LOGGER.error("Failed to commit offsets to Kafka: {}", offsetsToCommit, e); + // Add all message IDs from the current batch to the failed list + failedAcks.addAll(messages.stream().map(Message::getId).toList()); + } + + return failedAcks; // Return IDs of messages that were not successfully acknowledged + } + + @Override + public void nack(List messages) { + for (Message message : messages) { + try { + kafkaProducer.send( + new ProducerRecord<>(dlqTopic, message.getId(), message.getPayload())); + } catch (Exception e) { + LOGGER.error("Failed to send message to DLQ. Message ID: {}", message.getId(), e); + } + } + } + + @Override + public void publish(List messages) { + for (Message message : messages) { + try { + kafkaProducer.send( + new ProducerRecord<>(topic, message.getId(), message.getPayload()), + (metadata, exception) -> { + if (exception != null) { + LOGGER.error( + "Failed to publish message to Kafka. 
Message ID: {}", + message.getId(), + exception); + } else { + LOGGER.info( + "Message published to Kafka. Topic: {}, Partition: {}, Offset: {}", + metadata.topic(), + metadata.partition(), + metadata.offset()); + } + }); + } catch (Exception e) { + LOGGER.error( + "Error publishing message to Kafka. Message ID: {}", message.getId(), e); + } + } + } + + @Override + public boolean rePublishIfNoAck() { + return false; + } + + @Override + public void setUnackTimeout(Message message, long unackTimeout) { + // Kafka does not support visibility timeout; this can be managed externally if + // needed. + } + + @Override + public long size() { + long topicSize = getTopicSizeUsingAdminClient(); + if (topicSize != -1) { + LOGGER.info("Topic size for 'conductor-event': {}", topicSize); + } else { + LOGGER.error("Failed to fetch topic size for 'conductor-event'"); + } + + return topicSize; + } + + private long getTopicSizeUsingAdminClient() { + try { + // Fetch metadata for the topic asynchronously + KafkaFuture topicDescriptionFuture = + adminClient + .describeTopics(Collections.singletonList(topic)) + .topicNameValues() + .get(topic); + + TopicDescription topicDescription = topicDescriptionFuture.get(); + + // Prepare request for latest offsets + Map offsetRequest = new HashMap<>(); + for (TopicPartitionInfo partition : topicDescription.partitions()) { + TopicPartition tp = new TopicPartition(topic, partition.partition()); + offsetRequest.put(tp, OffsetSpec.latest()); + } + + // Fetch offsets asynchronously + Map offsets = + adminClient.listOffsets(offsetRequest).all().get(); + + // Calculate total size by summing offsets + return offsets.values().stream() + .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset) + .sum(); + } catch (Exception e) { + LOGGER.error("Error fetching topic size using AdminClient for topic: {}", topic, e); + return -1; + } + } + + @Override + public void close() { + try { + stop(); + LOGGER.info("KafkaObservableQueue fully stopped and resources closed."); + } catch (Exception e) { + LOGGER.error("Error during close(): {}", e.getMessage(), e); + } + } + + @Override + public void start() { + LOGGER.info("KafkaObservableQueue starting for topic: {}", topic); + if (running) { + LOGGER.warn("KafkaObservableQueue is already running for topic: {}", topic); + return; + } + + try { + running = true; + kafkaConsumer.subscribe( + Collections.singletonList(topic)); // Subscribe to a single topic + LOGGER.info("KafkaObservableQueue started for topic: {}", topic); + } catch (Exception e) { + running = false; + LOGGER.error("Error starting KafkaObservableQueue for topic: {}", topic, e); + } + } + + @Override + public void stop() { + LOGGER.info("Kafka consumer stopping for topic: {}", topic); + if (!running) { + LOGGER.warn("KafkaObservableQueue is already stopped for topic: {}", topic); + return; + } + + try { + running = false; + + try { + kafkaConsumer.unsubscribe(); + kafkaConsumer.close(); + LOGGER.info("Kafka consumer stopped for topic: {}", topic); + } catch (Exception e) { + LOGGER.error("Error stopping Kafka consumer for topic: {}", topic, e); + retryCloseConsumer(); + } + + try { + kafkaProducer.close(); + LOGGER.info("Kafka producer stopped for topic: {}", topic); + } catch (Exception e) { + LOGGER.error("Error stopping Kafka producer for topic: {}", topic, e); + retryCloseProducer(); + } + } catch (Exception e) { + LOGGER.error("Critical error stopping KafkaObservableQueue for topic: {}", topic, e); + } + } + + private void retryCloseConsumer() { + int retries = 3; + while 
(retries > 0) { + try { + kafkaConsumer.close(); + LOGGER.info("Kafka consumer closed successfully on retry."); + return; + } catch (Exception e) { + retries--; + LOGGER.warn( + "Retry failed to close Kafka consumer. Remaining attempts: {}", retries, e); + if (retries == 0) { + LOGGER.error("Exhausted retries for closing Kafka consumer."); + } + } + } + } + + private void retryCloseProducer() { + int retries = 3; + while (retries > 0) { + try { + kafkaProducer.close(); + LOGGER.info("Kafka producer closed successfully on retry."); + return; + } catch (Exception e) { + retries--; + LOGGER.warn( + "Retry failed to close Kafka producer. Remaining attempts: {}", retries, e); + if (retries == 0) { + LOGGER.error("Exhausted retries for closing Kafka producer."); + } + } + } + } + + @Override + public String getType() { + return QUEUE_TYPE; + } + + @Override + public String getName() { + return topic; + } + + @Override + public String getURI() { + return "kafka://" + topic; + } + + @Override + public boolean isRunning() { + return running; + } + + public static class Builder { + private final KafkaEventQueueProperties properties; + + public Builder(KafkaEventQueueProperties properties) { + this.properties = properties; + } + + public KafkaObservableQueue build(final String topic) { + Properties consumerConfig = new Properties(); + consumerConfig.putAll(properties.toConsumerConfig()); + + LOGGER.debug("Kafka Consumer Config: {}", consumerConfig); + + Properties producerConfig = new Properties(); + producerConfig.putAll(properties.toProducerConfig()); + + LOGGER.debug("Kafka Producer Config: {}", producerConfig); + + Properties adminConfig = new Properties(); + adminConfig.putAll(properties.toAdminConfig()); + + LOGGER.debug("Kafka Admin Config: {}", adminConfig); + + try { + return new KafkaObservableQueue( + topic, consumerConfig, producerConfig, adminConfig, properties); + } catch (Exception e) { + LOGGER.error("Failed to initialize KafkaObservableQueue for topic: {}", topic, e); + throw new RuntimeException( + "Failed to initialize KafkaObservableQueue for topic: " + topic, e); + } + } + } +} diff --git a/kafka-event-queue/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/kafka-event-queue/src/main/resources/META-INF/additional-spring-configuration-metadata.json new file mode 100644 index 000000000..4351cdb81 --- /dev/null +++ b/kafka-event-queue/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -0,0 +1,27 @@ +{ + "properties": [ + { + "name": "conductor.event-queues.kafka.enabled", + "type": "java.lang.Boolean", + "description": "Enable the use of Kafka implementation to provide queues for consuming events.", + "sourceType": "com.netflix.conductor.kafkaeq.config.KafkaEventQueueConfiguration" + }, + { + "name": "conductor.default-event-queue.type", + "type": "java.lang.String", + "description": "The default event queue type to listen on for the WAIT task.", + "sourceType": "com.netflix.conductor.kafkaeq.config.KafkaEventQueueConfiguration" + } + ], + "hints": [ + { + "name": "conductor.default-event-queue.type", + "values": [ + { + "value": "kafka", + "description": "Use kafka as the event queue to listen on for the WAIT task." 
+ } + ] + } + ] +} \ No newline at end of file diff --git a/server/build.gradle b/server/build.gradle index fdf7646be..1e5e6181d 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -25,6 +25,7 @@ dependencies { implementation project(':conductor-nats') implementation project(':conductor-nats-streaming') implementation project(':conductor-awssqs-event-queue') + implementation project(':conductor-kafka-event-queue') //External Payload Storage implementation project(':conductor-azureblob-storage') diff --git a/settings.gradle b/settings.gradle index 605765ede..41ac7bbf3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -75,6 +75,7 @@ include 'postgres-external-storage' include 'amqp' include 'nats' include 'nats-streaming' +include 'kafka-event-queue' include 'test-harness' diff --git a/workflow-event-listener/src/main/resources/META-INF/additional-spring-configuration-metadata.json b/workflow-event-listener/src/main/resources/META-INF/additional-spring-configuration-metadata.json index 3156ecbef..0096bbb81 100644 --- a/workflow-event-listener/src/main/resources/META-INF/additional-spring-configuration-metadata.json +++ b/workflow-event-listener/src/main/resources/META-INF/additional-spring-configuration-metadata.json @@ -70,6 +70,12 @@ "description": "Enable the use of NATS Streaming implementation to provide queues for consuming events.", "sourceType": "com.netflix.conductor.contribs.queue.nats.config.NATSStreamConfiguration" }, + { + "name": "conductor.event-queues.kafka.enabled", + "type": "java.lang.Boolean", + "description": "Enable the use of Kafka implementation to provide queues for consuming events.", + "sourceType": "com.netflix.conductor.event-queue.kafkaeq.config.KafkaEventQueueConfiguration" + }, { "name": "conductor.default-event-queue.type", "type": "java.lang.String", @@ -108,6 +114,10 @@ { "value": "nats_stream", "description": "Use NATS Stream as the event queue to listen on for the WAIT task." + }, + { + "value": "kafka", + "description": "Use Kafka as the event queue to listen on for the WAIT task." 
} ] }, From ea0447fea580f204c1cb442cf1ef7421f9685416 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Mon, 2 Dec 2024 14:07:02 +0900 Subject: [PATCH 02/21] Added basic tests --- kafka-event-queue/build.gradle | 12 +- .../eventqueue/KafkaObservableQueue.java | 21 ++ .../eventqueue/KafkaObservableQueueTest.java | 264 ++++++++++++++++++ 3 files changed, 293 insertions(+), 4 deletions(-) create mode 100644 kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java diff --git a/kafka-event-queue/build.gradle b/kafka-event-queue/build.gradle index 7aeabf6be..3b41e131e 100644 --- a/kafka-event-queue/build.gradle +++ b/kafka-event-queue/build.gradle @@ -1,7 +1,3 @@ -plugins { - id 'java' -} - dependencies { // Core Conductor dependencies implementation project(':conductor-common') @@ -26,6 +22,7 @@ dependencies { // Test dependencies testImplementation 'org.springframework.boot:spring-boot-starter-test' testImplementation project(':conductor-common').sourceSets.test.output + // Add Kafka client dependency implementation 'org.apache.kafka:kafka-clients:3.5.1' @@ -37,4 +34,11 @@ dependencies { runtimeOnly 'ch.qos.logback:logback-classic:1.4.11' } +test { + testLogging { + events "passed", "skipped", "failed" + showStandardStreams = true // Enable standard output + } +} + diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java index 91f7d7071..33c5a7991 100644 --- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java @@ -72,6 +72,27 @@ public KafkaObservableQueue( this.adminClient = AdminClient.create(adminConfig); } + public KafkaObservableQueue( + String topic, + Consumer kafkaConsumer, + Producer kafkaProducer, + AdminClient adminClient, + KafkaEventQueueProperties properties) { + this.topic = topic; + this.kafkaConsumer = kafkaConsumer; + this.kafkaProducer = kafkaProducer; + this.adminClient = adminClient; + this.properties = properties; + this.pollTimeInMS = properties.getPollTimeDuration().toMillis(); + this.dlqTopic = properties.getDlqTopic(); + this.autoCommitEnabled = + Boolean.parseBoolean( + properties + .toConsumerConfig() + .getOrDefault(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false") + .toString()); + } + @Override public Observable observe() { return Observable.create( diff --git a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java new file mode 100644 index 000000000..e0b9bd913 --- /dev/null +++ b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java @@ -0,0 +1,264 @@ +/* + * Copyright 2024 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafkaeq.eventqueue; + +import java.lang.reflect.Field; +import java.time.Duration; +import java.util.*; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.kafka.clients.admin.AdminClient; +import org.apache.kafka.clients.admin.DescribeTopicsResult; +import org.apache.kafka.clients.admin.ListOffsetsResult; +import org.apache.kafka.clients.admin.TopicDescription; +import org.apache.kafka.clients.consumer.*; +import org.apache.kafka.clients.producer.*; +import org.apache.kafka.common.KafkaFuture; +import org.apache.kafka.common.TopicPartition; +import org.apache.kafka.common.TopicPartitionInfo; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.ArgumentCaptor; +import org.mockito.Mock; +import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; + +import com.netflix.conductor.core.events.queue.Message; +import com.netflix.conductor.kafkaeq.config.KafkaEventQueueProperties; + +import rx.Observable; + +import static org.junit.jupiter.api.Assertions.*; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyCollection; +import static org.mockito.ArgumentMatchers.anyMap; +import static org.mockito.Mockito.*; + +@SuppressWarnings("unchecked") +@RunWith(SpringJUnit4ClassRunner.class) +public class KafkaObservableQueueTest { + + private KafkaObservableQueue queue; + + @Mock private volatile MockConsumer mockKafkaConsumer; + + @Mock private volatile MockProducer mockKafkaProducer; + + @Mock private volatile AdminClient mockAdminClient; + + @Mock private volatile KafkaEventQueueProperties mockProperties; + + @Before + public void setUp() throws Exception { + System.out.println("Setup called"); + // Create mock instances + this.mockKafkaConsumer = mock(MockConsumer.class); + this.mockKafkaProducer = mock(MockProducer.class); + this.mockAdminClient = mock(AdminClient.class); + this.mockProperties = mock(KafkaEventQueueProperties.class); + + // Mock KafkaEventQueueProperties behavior + when(this.mockProperties.getPollTimeDuration()).thenReturn(Duration.ofMillis(100)); + when(this.mockProperties.getDlqTopic()).thenReturn("test-dlq"); + + // Create an instance of KafkaObservableQueue with the mocks + queue = + new KafkaObservableQueue( + "test-topic", + this.mockKafkaConsumer, + this.mockKafkaProducer, + this.mockAdminClient, + this.mockProperties); + } + + private void injectMockField(Object target, String fieldName, Object mock) throws Exception { + Field field = target.getClass().getDeclaredField(fieldName); + field.setAccessible(true); + field.set(target, mock); + } + + @Test + public void testObserve() throws Exception { + // Prepare mock consumer records + List> records = + List.of( + new ConsumerRecord<>("test-topic", 0, 0, "key-1", "payload-1"), + new ConsumerRecord<>("test-topic", 0, 1, "key-2", "payload-2")); + + ConsumerRecords consumerRecords = + new ConsumerRecords<>(Map.of(new TopicPartition("test-topic", 0), records)); + + // Mock the KafkaConsumer poll behavior + when(mockKafkaConsumer.poll(any(Duration.class))) + .thenReturn(consumerRecords) + .thenReturn( + new ConsumerRecords<>( + Collections.emptyMap())); // 
Subsequent polls return empty + + // Start the queue + queue.start(); + + // Collect emitted messages + List found = new ArrayList<>(); + Observable observable = queue.observe(); + assertNotNull(observable); + observable.subscribe(found::add); + + // Allow polling to run + try { + Thread.sleep(1000); // Adjust duration if necessary + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // Assert results + assertNotNull(queue); + assertEquals(2, found.size()); + assertEquals("payload-1", found.get(0).getPayload()); + assertEquals("payload-2", found.get(1).getPayload()); + } + + @Test + public void testAck() throws Exception { + Map unacknowledgedMessages = new ConcurrentHashMap<>(); + unacknowledgedMessages.put("0-1", 1L); + injectMockField(queue, "unacknowledgedMessages", unacknowledgedMessages); + + Message message = new Message("0-1", "payload", null); + List messages = List.of(message); + + doNothing().when(mockKafkaConsumer).commitSync(anyMap()); + + List failedAcks = queue.ack(messages); + + assertTrue(failedAcks.isEmpty()); + verify(mockKafkaConsumer, times(1)).commitSync(anyMap()); + } + + @Test + public void testNack() { + // Arrange + Message message = new Message("0-1", "payload", null); + List messages = List.of(message); + + // Simulate the Kafka Producer behavior + doAnswer( + invocation -> { + ProducerRecord record = invocation.getArgument(0); + System.out.println("Simulated record sent: " + record); + return null; // Simulate success + }) + .when(mockKafkaProducer) + .send(any(ProducerRecord.class)); + + // Act + queue.nack(messages); + + // Assert + ArgumentCaptor> captor = + ArgumentCaptor.forClass(ProducerRecord.class); + verify(mockKafkaProducer).send(captor.capture()); + + ProducerRecord actualRecord = captor.getValue(); + System.out.println("Captured Record: " + actualRecord); + + // Verify the captured record matches the expected values + assertEquals("test-dlq", actualRecord.topic()); + assertEquals("0-1", actualRecord.key()); + assertEquals("payload", actualRecord.value()); + } + + @Test + public void testPublish() { + Message message = new Message("key-1", "payload", null); + List messages = List.of(message); + + // Mock the behavior of the producer's send() method + when(mockKafkaProducer.send(any(ProducerRecord.class), any())) + .thenAnswer( + invocation -> { + Callback callback = invocation.getArgument(1); + // Simulate a successful send with mock metadata + callback.onCompletion( + new RecordMetadata( + new TopicPartition( + "test-topic", 0), // Topic and partition + 0, // Base offset + 0, // Log append time + 0, // Create time + 10, // Serialized key size + 100 // Serialized value size + ), + null); + return null; + }); + + // Invoke the publish method + queue.publish(messages); + + // Verify that the producer's send() method was called exactly once + verify(mockKafkaProducer, times(1)).send(any(ProducerRecord.class), any()); + } + + @Test + public void testSize() throws Exception { + // Step 1: Mock TopicDescription + TopicDescription topicDescription = + new TopicDescription( + "test-topic", + false, + List.of( + new TopicPartitionInfo( + 0, null, List.of(), List.of())) // One partition + ); + + // Simulate `describeTopics` returning the TopicDescription + DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class); + KafkaFuture mockFutureTopicDescription = + KafkaFuture.completedFuture(topicDescription); + when(mockDescribeTopicsResult.topicNameValues()) + .thenReturn(Map.of("test-topic", 
mockFutureTopicDescription)); + when(mockAdminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult); + + // Step 2: Mock Offsets + ListOffsetsResult.ListOffsetsResultInfo offsetInfo = + new ListOffsetsResult.ListOffsetsResultInfo( + 10, // Mock the offset size + 0, // Leader epoch + null // Timestamp + ); + + ListOffsetsResult mockListOffsetsResult = mock(ListOffsetsResult.class); + KafkaFuture> + mockOffsetsFuture = + KafkaFuture.completedFuture( + Map.of(new TopicPartition("test-topic", 0), offsetInfo)); + when(mockListOffsetsResult.all()).thenReturn(mockOffsetsFuture); + when(mockAdminClient.listOffsets(anyMap())).thenReturn(mockListOffsetsResult); + + // Step 3: Call the `size` method + long size = queue.size(); + + // Step 4: Verify the size is correctly calculated + assertEquals(10, size); // As we mocked 10 as the offset in the partition + } + + @Test + public void testLifecycle() { + queue.start(); + assertTrue(queue.isRunning()); + + queue.stop(); + assertFalse(queue.isRunning()); + } +} From 4be828a868fdc8c18217a79e708b20ee51fe7169 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Mon, 2 Dec 2024 14:26:26 +0900 Subject: [PATCH 03/21] Added readme --- kafka-event-queue/README.md | 76 ++++++++++++++++++++++++++++++++++--- 1 file changed, 70 insertions(+), 6 deletions(-) diff --git a/kafka-event-queue/README.md b/kafka-event-queue/README.md index bb2ea5390..f776be837 100644 --- a/kafka-event-queue/README.md +++ b/kafka-event-queue/README.md @@ -1,4 +1,5 @@ # Event Queue + ## Published Artifacts Group: `com.netflix.conductor` @@ -8,17 +9,80 @@ Group: `com.netflix.conductor` | conductor-kafka-event-queue | Support for integration with Kafka and consume events from it. | ## Modules + ### Kafka + https://kafka.apache.org/ -Provides ability to publish and consume messages from Kafka -#### Configuration -(Default values shown below) +## kafka-event-queue + +Provides ability to consume messages from Kafka + +## Configuration + +To enable the queue use set the following to true. + ```properties conductor.event-queues.kafka.enabled=true +``` + +There are is a set of shared properties these are: + +```properties +# If kafka should be used with event queues like SQS or AMPQ +conductor.default-event-queue.type=kafka + +# the bootstrap server ot use. conductor.event-queues.kafka.bootstrap-servers=kafka:29092 -conductor.event-queues.kafka.groupId=conductor-consumers + +# The topic to listen to conductor.event-queues.kafka.topic=conductor-event -conductor.event-queues.kafka.dlqTopic=conductor-dlq -conductor.event-queues.kafka.pollTimeDuration=100 + +# The dead letter queue to use for events that had some error. +conductor.event-queues.kafka.dlq-topic=conductor-dlq + +# topic prefix combined with conductor.default-event-queue.type +conductor.event-queues.kafka.listener-queue-prefix=conductor_ + +# The polling duration. Start at 500ms and reduce based on how your environment behaves. +conductor.event-queues.kafka.poll-time-duration=500ms +``` + +There are 3 clients that should be configured, there is the Consumer, responsible to consuming messages, Publisher that publishes messages to Kafka and the Admin which handles admin operations. + +The supported properties for the 3 clients are the ones included in `org.apache.kafka:kafka-clients:3.5.1` for each client type. + +## Consumer properties + +Example of consumer settings. 
+ +```properties +conductor.event-queues.kafka.consumer.client.id=consumer-client +conductor.event-queues.kafka.consumer.auto.offset.reset=earliest +conductor.event-queues.kafka.consumer.enable.auto.commit=false +conductor.event-queues.kafka.consumer.fetch.min.bytes=1 +conductor.event-queues.kafka.consumer.max.poll.records=500 +conductor.event-queues.kafka.consumer.group-id=conductor-group +``` + +## Producer properties + +Example of producer settings. + +```properties +conductor.event-queues.kafka.producer.client.id=producer-client +conductor.event-queues.kafka.producer.acks=all +conductor.event-queues.kafka.producer.retries=5 +conductor.event-queues.kafka.producer.batch.size=16384 +conductor.event-queues.kafka.producer.linger.ms=10 +conductor.event-queues.kafka.producer.compression.type=gzip +``` + +## Admin properties + +Example of admin settings. + +```properties +conductor.event-queues.kafka.admin.client.id=admin-client +conductor.event-queues.kafka.admin.connections.max.idle.ms=10000 ``` From 3098dc06d4cd30ddd690f3653ed0de7ab80270d1 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Mon, 2 Dec 2024 14:55:24 +0900 Subject: [PATCH 04/21] Updated read me with usage --- kafka-event-queue/README.md | 28 +++++++++++++++++++++++++++- 1 file changed, 27 insertions(+), 1 deletion(-) diff --git a/kafka-event-queue/README.md b/kafka-event-queue/README.md index f776be837..690e71059 100644 --- a/kafka-event-queue/README.md +++ b/kafka-event-queue/README.md @@ -16,7 +16,33 @@ https://kafka.apache.org/ ## kafka-event-queue -Provides ability to consume messages from Kafka +Provides ability to consume messages from Kafka. + +## Usage + +To use it in an event handler prefix the event with `kafka` followed by the topic. + +Example: +```json +{ + "name": "kafka_test_event_handler", + "event": "kafka:conductor-event", + "actions": [ + { + "action": "start_workflow", + "start_workflow": { + "name": "workflow_triggered_by_kafka", + "input": { + "inlineValue": 1 + } + }, + "expandInlineJSON": true + } + ], + "active": true +} +``` + ## Configuration From e1ed995c859f4adc198459a1af2174be274c0d21 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Mon, 2 Dec 2024 14:57:34 +0900 Subject: [PATCH 05/21] Consolidated test output to match other modules --- kafka-event-queue/build.gradle | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/kafka-event-queue/build.gradle b/kafka-event-queue/build.gradle index 3b41e131e..5b998696b 100644 --- a/kafka-event-queue/build.gradle +++ b/kafka-event-queue/build.gradle @@ -34,11 +34,11 @@ dependencies { runtimeOnly 'ch.qos.logback:logback-classic:1.4.11' } -test { - testLogging { - events "passed", "skipped", "failed" - showStandardStreams = true // Enable standard output - } -} +// test { +// testLogging { +// events "passed", "skipped", "failed" +// showStandardStreams = true // Enable standard output +// } +// } From 2273d3946b6775205e4148493f982ef20a533330 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Tue, 3 Dec 2024 11:13:44 +0900 Subject: [PATCH 06/21] Only create message ID once. Added logging of recived message. Added a new Test for testing JSON in the message. 
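A minimal sketch, not part of this patch, of the partition-offset message ID convention this commit relies on: observe() builds the ID once per record, and ack() later parses the same string back to commit offset + 1. The values below are invented purely for illustration.

```java
// Illustrative only: the "<partition>-<offset>" ID convention used by observe() and ack().
public class MessageIdSketch {
    public static void main(String[] args) {
        int partition = 3;
        long offset = 128L;

        // observe() creates the ID once per ConsumerRecord, e.g. "3-128".
        String messageId = partition + "-" + offset;

        // ack() splits the ID back into partition and offset, then commits offset + 1.
        String[] parts = messageId.split("-");
        int ackPartition = Integer.parseInt(parts[0]);
        long commitOffset = Long.parseLong(parts[1]) + 1;

        System.out.println(ackPartition + " -> commit offset " + commitOffset); // 3 -> commit offset 129
    }
}
```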
--- .../eventqueue/KafkaObservableQueue.java | 23 +++++---- .../eventqueue/KafkaObservableQueueTest.java | 51 +++++++++++++++++-- 2 files changed, 60 insertions(+), 14 deletions(-) diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java index 33c5a7991..28a502e7e 100644 --- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java @@ -114,19 +114,22 @@ public Observable observe() { for (ConsumerRecord record : records) { try { - Message message = - new Message( - record.partition() - + "-" - + record - .offset(), // Message ID based on partition and - // offset - record.value(), - null); String messageId = record.partition() + "-" - + record.offset(); + + record.offset(); // Message ID + // based on + // partition + // and + + String value = record.value(); + LOGGER.debug( + "MessageId: {} value: {}", + messageId, + value); + Message message = + new Message(messageId, value, null); + unacknowledgedMessages.put( messageId, record.offset()); messages.add(message); diff --git a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java index e0b9bd913..4f4aa4fb9 100644 --- a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java +++ b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java @@ -16,6 +16,7 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; @@ -102,8 +103,8 @@ public void testObserve() throws Exception { when(mockKafkaConsumer.poll(any(Duration.class))) .thenReturn(consumerRecords) .thenReturn( - new ConsumerRecords<>( - Collections.emptyMap())); // Subsequent polls return empty + new ConsumerRecords<>(Collections.emptyMap())); // Subsequent polls return + // empty // Start the queue queue.start(); @@ -128,6 +129,48 @@ public void testObserve() throws Exception { assertEquals("payload-2", found.get(1).getPayload()); } + @Test + public void testObserveReadsKafkaValue() { + // Prepare mock Kafka records + List> records = + List.of( + new ConsumerRecord<>( + "test-topic", 0, 0, "key1", "{\"testKey\": \"testValue1\"}"), + new ConsumerRecord<>( + "test-topic", 0, 1, "key2", "{\"testKey\": \"testValue2\"}")); + + ConsumerRecords consumerRecords = + new ConsumerRecords<>(Map.of(new TopicPartition("test-topic", 0), records)); + + // Mock the KafkaConsumer poll behavior + when(mockKafkaConsumer.poll(any(Duration.class))) + .thenReturn(consumerRecords) + .thenReturn( + new ConsumerRecords<>(Collections.emptyMap())); // Subsequent polls return + // empty + + // Start the queue + queue.start(); + + // Collect messages from observe + List collectedMessages = new CopyOnWriteArrayList<>(); + Observable observable = queue.observe(); + assertNotNull(observable); + observable.subscribe(collectedMessages::add); + + // Allow polling to run + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + + // 
Verify results + assertEquals(2, collectedMessages.size()); + assertEquals("{\"testKey\": \"testValue1\"}", collectedMessages.get(0).getPayload()); + assertEquals("{\"testKey\": \"testValue2\"}", collectedMessages.get(1).getPayload()); + } + @Test public void testAck() throws Exception { Map unacknowledgedMessages = new ConcurrentHashMap<>(); @@ -191,8 +234,8 @@ public void testPublish() { // Simulate a successful send with mock metadata callback.onCompletion( new RecordMetadata( - new TopicPartition( - "test-topic", 0), // Topic and partition + new TopicPartition("test-topic", 0), // Topic and + // partition 0, // Base offset 0, // Log append time 0, // Create time From 27158292082c19c0fb196c7de945046557715883 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Tue, 3 Dec 2024 12:18:44 +0900 Subject: [PATCH 07/21] Improved payload formatting, added key and headers to payload. added a new test for verifying payload --- .../eventqueue/KafkaObservableQueue.java | 89 ++++++++- .../eventqueue/KafkaObservableQueueTest.java | 169 +++++++++++++++--- 2 files changed, 226 insertions(+), 32 deletions(-) diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java index 28a502e7e..f785aa302 100644 --- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java @@ -25,6 +25,7 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.header.Header; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -32,6 +33,8 @@ import com.netflix.conductor.core.events.queue.ObservableQueue; import com.netflix.conductor.kafkaeq.config.KafkaEventQueueProperties; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import rx.Observable; import rx.subscriptions.Subscriptions; @@ -50,6 +53,7 @@ public class KafkaObservableQueue implements ObservableQueue { private final Map unacknowledgedMessages = new ConcurrentHashMap<>(); private volatile boolean running = false; private final KafkaEventQueueProperties properties; + private final ObjectMapper objectMapper = new ObjectMapper(); public KafkaObservableQueue( String topic, @@ -117,18 +121,37 @@ public Observable observe() { String messageId = record.partition() + "-" - + record.offset(); // Message ID - // based on - // partition - // and - + + record.offset(); + String key = record.key(); String value = record.value(); + Map headers = new HashMap<>(); + + // Extract headers + if (record.headers() != null) { + for (Header header : record.headers()) { + headers.put( + header.key(), + new String(header.value())); + } + } + + // Log the details LOGGER.debug( - "MessageId: {} value: {}", + "Input values MessageId: {} Key: {} Headers: {} Value: {}", messageId, + key, + headers, value); + + // Construct message + String jsonMessage = + constructJsonMessage( + key, headers, value); + LOGGER.debug("Payload: {}", jsonMessage); + Message message = - new Message(messageId, value, null); + new Message( + messageId, jsonMessage, null); unacknowledgedMessages.put( messageId, record.offset()); @@ -156,6 +179,58 @@ public Observable observe() { }); } + private String constructJsonMessage(String key, Map 
headers, String payload) { + StringBuilder json = new StringBuilder(); + json.append("{"); + json.append("\"key\":\"").append(key != null ? key : "").append("\","); + + // Serialize headers to JSON, handling potential errors + String headersJson = toJson(headers); + if (headersJson != null) { + json.append("\"headers\":").append(headersJson).append(","); + } else { + json.append("\"headers\":{}") + .append(","); // Default to an empty JSON object if headers are invalid + } + + json.append("\"payload\":"); + + // Detect if the payload is valid JSON + if (isJsonValid(payload)) { + json.append(payload); // Embed JSON object directly + } else { + json.append(payload != null ? "\"" + payload + "\"" : "null"); // Treat as plain text + } + + json.append("}"); + return json.toString(); + } + + private boolean isJsonValid(String json) { + if (json == null || json.isEmpty()) { + return false; + } + try { + objectMapper.readTree(json); // Parses the JSON to check validity + return true; + } catch (JsonProcessingException e) { + return false; + } + } + + protected String toJson(Object value) { + if (value == null) { + return null; + } + try { + return objectMapper.writeValueAsString(value); + } catch (JsonProcessingException ex) { + // Log the error and return a placeholder or null + LOGGER.error("Failed to convert object to JSON: {}", value, ex); + return null; + } + } + @Override public List ack(List messages) { // If autocommit is enabled we do not run this code. diff --git a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java index 4f4aa4fb9..8a6f98ff0 100644 --- a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java +++ b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java @@ -16,7 +16,6 @@ import java.time.Duration; import java.util.*; import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.CopyOnWriteArrayList; import org.apache.kafka.clients.admin.AdminClient; import org.apache.kafka.clients.admin.DescribeTopicsResult; @@ -27,6 +26,7 @@ import org.apache.kafka.common.KafkaFuture; import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.TopicPartitionInfo; +import org.apache.kafka.common.header.Headers; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -37,6 +37,8 @@ import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.kafkaeq.config.KafkaEventQueueProperties; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; import rx.Observable; import static org.junit.jupiter.api.Assertions.*; @@ -89,12 +91,27 @@ private void injectMockField(Object target, String fieldName, Object mock) throw } @Test - public void testObserve() throws Exception { - // Prepare mock consumer records + public void testObserveWithHeaders() throws Exception { + // Prepare mock consumer records with diverse headers, keys, and payloads List> records = List.of( new ConsumerRecord<>("test-topic", 0, 0, "key-1", "payload-1"), - new ConsumerRecord<>("test-topic", 0, 1, "key-2", "payload-2")); + new ConsumerRecord<>("test-topic", 0, 1, "key-2", "{\"field\":\"value\"}"), + new ConsumerRecord<>("test-topic", 0, 2, null, "null-key-payload"), + new ConsumerRecord<>("test-topic", 0, 3, "key-3", ""), + new 
ConsumerRecord<>("test-topic", 0, 4, "key-4", "12345"), + new ConsumerRecord<>( + "test-topic", + 0, + 5, + "key-5", + "{\"complex\":{\"nested\":\"value\"}}")); + + // Add headers to each ConsumerRecord + for (int i = 0; i < records.size(); i++) { + ConsumerRecord record = records.get(i); + record.headers().add("header-key-" + i, ("header-value-" + i).getBytes()); + } ConsumerRecords consumerRecords = new ConsumerRecords<>(Map.of(new TopicPartition("test-topic", 0), records)); @@ -102,9 +119,8 @@ public void testObserve() throws Exception { // Mock the KafkaConsumer poll behavior when(mockKafkaConsumer.poll(any(Duration.class))) .thenReturn(consumerRecords) - .thenReturn( - new ConsumerRecords<>(Collections.emptyMap())); // Subsequent polls return - // empty + .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); // Subsequent polls + // return empty // Start the queue queue.start(); @@ -124,20 +140,49 @@ public void testObserve() throws Exception { // Assert results assertNotNull(queue); - assertEquals(2, found.size()); - assertEquals("payload-1", found.get(0).getPayload()); - assertEquals("payload-2", found.get(1).getPayload()); + assertEquals(6, found.size()); // Expect all 6 messages to be processed + + assertEquals("0-0", found.get(0).getId()); + assertEquals("0-1", found.get(1).getId()); + assertEquals("0-2", found.get(2).getId()); + assertEquals("0-3", found.get(3).getId()); + assertEquals("0-4", found.get(4).getId()); + assertEquals("0-5", found.get(5).getId()); + + // Validate headers + for (int i = 0; i < records.size(); i++) { + ConsumerRecord record = records.get(i); + assertNotNull(record.headers()); + assertEquals(1, record.headers().toArray().length); + assertEquals( + "header-value-" + i, + new String(record.headers().lastHeader("header-key-" + i).value())); + } } @Test - public void testObserveReadsKafkaValue() { - // Prepare mock Kafka records + public void testObserveWithComplexPayload() throws Exception { + // Prepare mock consumer records with diverse headers, keys, and payloads List> records = List.of( new ConsumerRecord<>( - "test-topic", 0, 0, "key1", "{\"testKey\": \"testValue1\"}"), + "test-topic", 0, 0, "key-1", "{\"data\":\"payload-1\"}"), + new ConsumerRecord<>("test-topic", 0, 1, "key-2", "{\"field\":\"value\"}"), + new ConsumerRecord<>("test-topic", 0, 2, null, "null-key-payload"), + new ConsumerRecord<>("test-topic", 0, 3, "key-3", ""), + new ConsumerRecord<>("test-topic", 0, 4, "key-4", "12345"), new ConsumerRecord<>( - "test-topic", 0, 1, "key2", "{\"testKey\": \"testValue2\"}")); + "test-topic", + 0, + 5, + "key-5", + "{\"complex\":{\"nested\":\"value\"}}")); + + // Add headers to each ConsumerRecord + for (int i = 0; i < records.size(); i++) { + ConsumerRecord record = records.get(i); + record.headers().add("header-key-" + i, ("header-value-" + i).getBytes()); + } ConsumerRecords consumerRecords = new ConsumerRecords<>(Map.of(new TopicPartition("test-topic", 0), records)); @@ -145,30 +190,104 @@ public void testObserveReadsKafkaValue() { // Mock the KafkaConsumer poll behavior when(mockKafkaConsumer.poll(any(Duration.class))) .thenReturn(consumerRecords) - .thenReturn( - new ConsumerRecords<>(Collections.emptyMap())); // Subsequent polls return - // empty + .thenReturn(new ConsumerRecords<>(Collections.emptyMap())); // Subsequent polls + // return empty // Start the queue queue.start(); - // Collect messages from observe - List collectedMessages = new CopyOnWriteArrayList<>(); + // Collect emitted messages + List found = new ArrayList<>(); 
Observable observable = queue.observe(); assertNotNull(observable); - observable.subscribe(collectedMessages::add); + observable.subscribe(found::add); // Allow polling to run try { - Thread.sleep(1000); + Thread.sleep(1000); // Adjust duration if necessary } catch (InterruptedException e) { Thread.currentThread().interrupt(); } - // Verify results - assertEquals(2, collectedMessages.size()); - assertEquals("{\"testKey\": \"testValue1\"}", collectedMessages.get(0).getPayload()); - assertEquals("{\"testKey\": \"testValue2\"}", collectedMessages.get(1).getPayload()); + // Assert results + assertNotNull(queue); + assertEquals(6, found.size()); // Expect all 6 messages to be processed + + // Validate individual message payloads, keys, and headers in the structured + ObjectMapper objectMapper = new ObjectMapper(); + // JSON format + for (int i = 0; i < records.size(); i++) { + ConsumerRecord record = records.get(i); + Message message = found.get(i); + + String expectedPayload = + constructJsonMessage( + objectMapper, + record.key(), + record.headers().toArray().length > 0 + ? extractHeaders(record.headers()) + : Collections.emptyMap(), + record.value()); + + assertEquals(expectedPayload, message.getPayload()); + } + } + + private Map extractHeaders(Headers headers) { + Map headerMap = new HashMap<>(); + headers.forEach(header -> headerMap.put(header.key(), new String(header.value()))); + return headerMap; + } + + private String constructJsonMessage( + ObjectMapper objectMapper, String key, Map headers, String payload) { + StringBuilder json = new StringBuilder(); + json.append("{"); + json.append("\"key\":\"").append(key != null ? key : "").append("\","); + + // Serialize headers to JSON, handling potential errors + String headersJson = toJson(objectMapper, headers); + if (headersJson != null) { + json.append("\"headers\":").append(headersJson).append(","); + } else { + json.append("\"headers\":{}") + .append(","); // Default to an empty JSON object if headers are invalid + } + + json.append("\"payload\":"); + + // Detect if the payload is valid JSON + if (isJsonValid(objectMapper, payload)) { + json.append(payload); // Embed JSON object directly + } else { + json.append(payload != null ? "\"" + payload + "\"" : "null"); // Treat as plain text + } + + json.append("}"); + return json.toString(); + } + + private boolean isJsonValid(ObjectMapper objectMapper, String json) { + if (json == null || json.isEmpty()) { + return false; + } + try { + objectMapper.readTree(json); // Parses the JSON to check validity + return true; + } catch (JsonProcessingException e) { + return false; + } + } + + protected String toJson(ObjectMapper objectMapper, Object value) { + if (value == null) { + return null; + } + try { + return objectMapper.writeValueAsString(value); + } catch (JsonProcessingException ex) { + return null; + } } @Test From fe40298f2f33831d32968c37aeab0ee76d800266 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Tue, 3 Dec 2024 12:24:59 +0900 Subject: [PATCH 08/21] Updated readme with kafka event data format --- kafka-event-queue/README.md | 26 ++++++++++++++++++++++++-- 1 file changed, 24 insertions(+), 2 deletions(-) diff --git a/kafka-event-queue/README.md b/kafka-event-queue/README.md index 690e71059..ba5a6a592 100644 --- a/kafka-event-queue/README.md +++ b/kafka-event-queue/README.md @@ -20,9 +20,10 @@ Provides ability to consume messages from Kafka. ## Usage -To use it in an event handler prefix the event with `kafka` followed by the topic. 
+To use it in an event handler prefix the event with `kafka` followed by the topic. Example: + ```json { "name": "kafka_test_event_handler", @@ -33,7 +34,7 @@ Example: "start_workflow": { "name": "workflow_triggered_by_kafka", "input": { - "inlineValue": 1 + "payload": "${payload}" } }, "expandInlineJSON": true @@ -43,6 +44,27 @@ Example: } ``` +The data from the kafka event has the format: + +```json +{ + "key": "key-1", + "headers": { + "header-1": "value1" + }, + "payload": { + "first": "Marcelo", + "middle": "Billie", + "last": "Mertz" + } +} +``` + +* `key` is the key field in Kafka message. +* `headers` is the headers in the kafka message. +* `payload` is the message of the Kafka message. + +To access them in the event handler use for example `"${payload}"` to access the payload property, which contains the kafka message data. ## Configuration From d06262b0f1b06eea395b102d65021866084a8409 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Tue, 3 Dec 2024 12:36:29 +0900 Subject: [PATCH 09/21] Added queue metrics --- .../conductor/kafkaeq/eventqueue/KafkaObservableQueue.java | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java index f785aa302..d28b2e1d2 100644 --- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java @@ -32,6 +32,7 @@ import com.netflix.conductor.core.events.queue.Message; import com.netflix.conductor.core.events.queue.ObservableQueue; import com.netflix.conductor.kafkaeq.config.KafkaEventQueueProperties; +import com.netflix.conductor.metrics.Monitors; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; @@ -164,12 +165,16 @@ public Observable observe() { } } + Monitors.recordEventQueueMessagesProcessed( + QUEUE_TYPE, this.topic, messages.size()); return Observable.from(messages); } catch (Exception e) { LOGGER.error( "Error while polling Kafka for topic: {}", topic, e); + Monitors.recordObservableQMessageReceivedErrors( + QUEUE_TYPE); return Observable.error(e); } }) From 1f1ba1dd99ae9c329a1f2162480425fa3e69d227 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Tue, 3 Dec 2024 12:36:45 +0900 Subject: [PATCH 10/21] Updated docs to add kafka --- docs/documentation/configuration/eventhandlers.md | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/docs/documentation/configuration/eventhandlers.md b/docs/documentation/configuration/eventhandlers.md index 4a0c6edc8..f02b0c1f1 100644 --- a/docs/documentation/configuration/eventhandlers.md +++ b/docs/documentation/configuration/eventhandlers.md @@ -3,13 +3,13 @@ Eventing in Conductor provides for loose coupling between workflows and support This includes: -1. Being able to produce an event (message) in an external system like SQS or internal to Conductor. +1. Being able to produce an event (message) in an external system like SQS, Kafka or internal to Conductor. 2. Start a workflow when a specific event occurs that matches the provided criteria. Conductor provides SUB_WORKFLOW task that can be used to embed a workflow inside parent workflow. Eventing supports provides similar capability without explicitly adding dependencies and provides **fire-and-forget** style integrations. 
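For illustration, here is a minimal, self-contained sketch of the `key`/`headers`/`payload` envelope described in the README section above. It mirrors the `constructJsonMessage` logic added in this patch series, but the sample key, header, and payload values are invented for the example, and Jackson is the only dependency assumed.

```java
import java.util.Map;

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;

public class KafkaPayloadEnvelopeSketch {

    private static final ObjectMapper MAPPER = new ObjectMapper();

    public static void main(String[] args) throws Exception {
        // Invented sample data standing in for a single consumed ConsumerRecord
        String key = "key-1";
        Map<String, String> headers = Map.of("header-1", "value1");
        String value = "{\"first\":\"Marcelo\",\"middle\":\"Billie\",\"last\":\"Mertz\"}";

        // Prints the envelope an event handler later reads via "${payload}":
        // {"key":"key-1","headers":{"header-1":"value1"},"payload":{"first":"Marcelo",...}}
        System.out.println(envelope(key, headers, value));
    }

    // Same shape as the queue's constructJsonMessage: a payload that is already
    // valid JSON is embedded as-is, anything else is wrapped as a JSON string.
    static String envelope(String key, Map<String, String> headers, String value)
            throws JsonProcessingException {
        StringBuilder json = new StringBuilder("{");
        json.append("\"key\":\"").append(key != null ? key : "").append("\",");
        json.append("\"headers\":").append(MAPPER.writeValueAsString(headers)).append(",");
        json.append("\"payload\":");
        if (isJson(value)) {
            json.append(value);
        } else {
            json.append(value != null ? MAPPER.writeValueAsString(value) : "null");
        }
        return json.append("}").toString();
    }

    static boolean isJson(String candidate) {
        if (candidate == null || candidate.isEmpty()) {
            return false;
        }
        try {
            MAPPER.readTree(candidate);
            return true;
        } catch (JsonProcessingException e) {
            return false;
        }
    }
}
```

Running it shows that referencing `${payload}` in an event handler resolves to the original Kafka message body, while the key and headers remain available alongside it.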
## Event Task -Event task provides ability to publish an event (message) to either Conductor or an external eventing system like SQS. Event tasks are useful for creating event based dependencies for workflows and tasks. +Event task provides ability to publish an event (message) to either Conductor or an external eventing system like SQS or Kafka. Event tasks are useful for creating event based dependencies for workflows and tasks. See [Event Task](workflowdef/systemtasks/event-task.md) for documentation. @@ -20,7 +20,7 @@ Event handlers are listeners registered that executes an action when a matching 2. Fail a Task 3. Complete a Task -Event Handlers can be configured to listen to Conductor Events or an external event like SQS. +Event Handlers can be configured to listen to Conductor Events or an external event like SQS or Kafka. ## Configuration Event Handlers are configured via ```/event/``` APIs. From 51b3b7dee572cca053a429aeda5b30945c45ea57 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Thu, 5 Dec 2024 12:23:56 +0900 Subject: [PATCH 11/21] Fix dynamic queue addition when new Event handler is added at runtime. Removed conductor.event-queues.kafka.topic property as it is not used. --- kafka-event-queue/README.md | 3 - .../config/KafkaEventQueueProperties.java | 11 --- .../eventqueue/KafkaObservableQueue.java | 80 +++++++++++++++++-- .../eventqueue/KafkaObservableQueueTest.java | 60 ++++++++++++++ 4 files changed, 134 insertions(+), 20 deletions(-) diff --git a/kafka-event-queue/README.md b/kafka-event-queue/README.md index ba5a6a592..5c9e71719 100644 --- a/kafka-event-queue/README.md +++ b/kafka-event-queue/README.md @@ -83,9 +83,6 @@ conductor.default-event-queue.type=kafka # the bootstrap server ot use. conductor.event-queues.kafka.bootstrap-servers=kafka:29092 -# The topic to listen to -conductor.event-queues.kafka.topic=conductor-event - # The dead letter queue to use for events that had some error. conductor.event-queues.kafka.dlq-topic=conductor-dlq diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java index b41ed7e75..2da3c81ff 100644 --- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/config/KafkaEventQueueProperties.java @@ -31,9 +31,6 @@ public class KafkaEventQueueProperties { @NotBlank(message = "Bootstrap servers must not be blank") private String bootstrapServers = "kafka:29092"; - /** Primary topic for the Kafka queue. */ - private String topic = "conductor-event"; - /** Dead Letter Queue (DLQ) topic for failed messages. 
*/ private String dlqTopic = "conductor-dlq"; @@ -58,14 +55,6 @@ public void setBootstrapServers(String bootstrapServers) { this.bootstrapServers = bootstrapServers; } - public String getTopic() { - return topic; - } - - public void setTopic(String topic) { - this.topic = topic; - } - public String getDlqTopic() { return dlqTopic; } diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java index d28b2e1d2..8a65692f8 100644 --- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java @@ -14,6 +14,7 @@ import java.util.*; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; import org.apache.kafka.clients.admin.AdminClient; @@ -368,11 +369,16 @@ public void setUnackTimeout(Message message, long unackTimeout) { @Override public long size() { + if (topicExists(this.topic) == false) { + LOGGER.info("Topic '{}' not available, will refresh metadata.", this.topic); + refreshMetadata(this.topic); + } + long topicSize = getTopicSizeUsingAdminClient(); if (topicSize != -1) { - LOGGER.info("Topic size for 'conductor-event': {}", topicSize); + LOGGER.info("Topic size for '{}': {}", this.topic, topicSize); } else { - LOGGER.error("Failed to fetch topic size for 'conductor-event'"); + LOGGER.error("Failed to fetch topic size for '{}'", this.topic); } return topicSize; @@ -380,7 +386,6 @@ public long size() { private long getTopicSizeUsingAdminClient() { try { - // Fetch metadata for the topic asynchronously KafkaFuture topicDescriptionFuture = adminClient .describeTopics(Collections.singletonList(topic)) @@ -397,17 +402,28 @@ private long getTopicSizeUsingAdminClient() { } // Fetch offsets asynchronously + KafkaFuture> + offsetsFuture = adminClient.listOffsets(offsetRequest).all(); + Map offsets = - adminClient.listOffsets(offsetRequest).all().get(); + offsetsFuture.get(); // Calculate total size by summing offsets return offsets.values().stream() .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset) .sum(); + } catch (ExecutionException e) { + if (e.getCause() + instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) { + LOGGER.warn("Topic '{}' does not exist or partitions unavailable.", topic); + } else { + LOGGER.error("Error fetching offsets for topic '{}': {}", topic, e.getMessage()); + } } catch (Exception e) { - LOGGER.error("Error fetching topic size using AdminClient for topic: {}", topic, e); - return -1; + LOGGER.error( + "General error fetching offsets for topic '{}': {}", topic, e.getMessage()); } + return -1; } @Override @@ -560,4 +576,56 @@ public KafkaObservableQueue build(final String topic) { } } } + + private boolean topicExists(String topic) { + try { + KafkaFuture future = + adminClient + .describeTopics(Collections.singletonList(topic)) + .topicNameValues() + .get(topic); + + future.get(); // Attempt to fetch metadata + return true; + } catch (ExecutionException e) { + if (e.getCause() + instanceof org.apache.kafka.common.errors.UnknownTopicOrPartitionException) { + LOGGER.warn("Topic '{}' does not exist.", topic); + return false; + } + LOGGER.error("Error checking if topic '{}' exists: {}", topic, e.getMessage()); + return false; + } catch (Exception e) { + LOGGER.error("General 
error checking if topic '{}' exists: {}", topic, e.getMessage()); + return false; + } + } + + private void refreshMetadata(String topic) { + adminClient + .describeTopics(Collections.singletonList(topic)) + .topicNameValues() + .get(topic) + .whenComplete( + (topicDescription, exception) -> { + if (exception != null) { + if (exception.getCause() + instanceof + org.apache.kafka.common.errors + .UnknownTopicOrPartitionException) { + LOGGER.warn("Topic '{}' still does not exist.", topic); + } else { + LOGGER.error( + "Error refreshing metadata for topic '{}': {}", + topic, + exception.getMessage()); + } + } else { + LOGGER.info( + "Metadata refreshed for topic '{}': Partitions = {}", + topic, + topicDescription.partitions()); + } + }); + } } diff --git a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java index 8a6f98ff0..01ac9701b 100644 --- a/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java +++ b/kafka-event-queue/src/test/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueueTest.java @@ -415,6 +415,66 @@ public void testSize() throws Exception { assertEquals(10, size); // As we mocked 10 as the offset in the partition } + @Test + public void testSizeWhenTopicExists() throws Exception { + // Mock topic description + TopicDescription topicDescription = + new TopicDescription( + "test-topic", + false, + List.of(new TopicPartitionInfo(0, null, List.of(), List.of()))); + + // Mock offsets + Map offsets = + Map.of( + new TopicPartition("test-topic", 0), + new ListOffsetsResult.ListOffsetsResultInfo( + 10L, // Offset value + 0L, // Log append time (can be 0 if not available) + Optional.empty() // Leader epoch (optional, use a default value like + // 100) + )); + + // Mock AdminClient behavior for describeTopics + DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class); + when(mockDescribeTopicsResult.topicNameValues()) + .thenReturn(Map.of("test-topic", KafkaFuture.completedFuture(topicDescription))); + when(mockAdminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult); + + // Mock AdminClient behavior for listOffsets + ListOffsetsResult mockListOffsetsResult = mock(ListOffsetsResult.class); + when(mockListOffsetsResult.all()).thenReturn(KafkaFuture.completedFuture(offsets)); + when(mockAdminClient.listOffsets(anyMap())).thenReturn(mockListOffsetsResult); + + // Call size + long size = queue.size(); + + // Verify + assertEquals(10L, size); + } + + @Test + public void testSizeWhenTopicDoesNotExist() throws Exception { + // Mock KafkaFuture to simulate a topic-not-found exception + KafkaFuture failedFuture = mock(KafkaFuture.class); + when(failedFuture.get()) + .thenThrow( + new org.apache.kafka.common.errors.UnknownTopicOrPartitionException( + "Topic not found")); + + // Mock DescribeTopicsResult + DescribeTopicsResult mockDescribeTopicsResult = mock(DescribeTopicsResult.class); + when(mockDescribeTopicsResult.topicNameValues()) + .thenReturn(Map.of("test-topic", failedFuture)); + when(mockAdminClient.describeTopics(anyCollection())).thenReturn(mockDescribeTopicsResult); + + // Call size + long size = queue.size(); + + // Verify the result + assertEquals(-1L, size); // Return -1 for non-existent topics + } + @Test public void testLifecycle() { queue.start(); From 16b8ccfe06b9e7fb27711d7323e41691d7f64bb3 Mon Sep 17 00:00:00 2001 
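As an aside on the `size()` implementation above: the queue approximates topic depth by summing the latest offset of every partition through the Kafka `AdminClient`. The sketch below shows that calculation in isolation; the bootstrap server and topic name are placeholders and error handling is omitted, so it is an outline rather than the code from the patch.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Properties;
import java.util.stream.Collectors;

import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.ListOffsetsResult;
import org.apache.kafka.clients.admin.OffsetSpec;
import org.apache.kafka.clients.admin.TopicDescription;
import org.apache.kafka.common.TopicPartition;

public class TopicSizeSketch {

    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        // Placeholder bootstrap server, matching the module's default style
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:29092");

        String topic = "conductor-event"; // placeholder topic name

        try (AdminClient adminClient = AdminClient.create(props)) {
            // 1. Describe the topic to learn its partitions
            TopicDescription description =
                    adminClient
                            .describeTopics(Collections.singletonList(topic))
                            .topicNameValues()
                            .get(topic)
                            .get();

            // 2. Ask for the latest offset of every partition
            Map<TopicPartition, OffsetSpec> request =
                    description.partitions().stream()
                            .collect(
                                    Collectors.toMap(
                                            p -> new TopicPartition(topic, p.partition()),
                                            p -> OffsetSpec.latest()));

            Map<TopicPartition, ListOffsetsResult.ListOffsetsResultInfo> offsets =
                    adminClient.listOffsets(request).all().get();

            // 3. Sum the offsets; this is the figure size() reports for the topic
            long size =
                    offsets.values().stream()
                            .mapToLong(ListOffsetsResult.ListOffsetsResultInfo::offset)
                            .sum();

            System.out.println("Approximate size of '" + topic + "': " + size);
        }
    }
}
```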
From: Robert Karlsson Date: Thu, 5 Dec 2024 13:17:43 +0900 Subject: [PATCH 12/21] Handling thread sync when evenhandler is updated with a new queue --- .../eventqueue/KafkaObservableQueue.java | 51 ++++++++++++------- 1 file changed, 34 insertions(+), 17 deletions(-) diff --git a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java index 8a65692f8..4c798cf6d 100644 --- a/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java +++ b/kafka-event-queue/src/main/java/com/netflix/conductor/kafkaeq/eventqueue/KafkaObservableQueue.java @@ -456,7 +456,7 @@ public void start() { } @Override - public void stop() { + public synchronized void stop() { LOGGER.info("Kafka consumer stopping for topic: {}", topic); if (!running) { LOGGER.warn("KafkaObservableQueue is already stopped for topic: {}", topic); @@ -488,39 +488,56 @@ public void stop() { } private void retryCloseConsumer() { - int retries = 3; - while (retries > 0) { + int attempts = 3; + while (attempts > 0) { try { + kafkaConsumer.unsubscribe(); kafkaConsumer.close(); - LOGGER.info("Kafka consumer closed successfully on retry."); - return; + LOGGER.info("Kafka consumer stopped for topic: {}", topic); + return; // Exit if successful } catch (Exception e) { - retries--; LOGGER.warn( - "Retry failed to close Kafka consumer. Remaining attempts: {}", retries, e); - if (retries == 0) { - LOGGER.error("Exhausted retries for closing Kafka consumer."); + "Error stopping Kafka consumer for topic: {}, attempts remaining: {}", + topic, + attempts - 1, + e); + attempts--; + try { + Thread.sleep(1000); // Wait before retrying + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOGGER.error("Thread interrupted during Kafka consumer shutdown retries"); + break; } } } + LOGGER.error("Failed to stop Kafka consumer for topic: {} after retries", topic); } private void retryCloseProducer() { - int retries = 3; - while (retries > 0) { + int attempts = 3; + while (attempts > 0) { try { kafkaProducer.close(); - LOGGER.info("Kafka producer closed successfully on retry."); - return; + LOGGER.info("Kafka producer stopped for topic: {}", topic); + return; // Exit if successful } catch (Exception e) { - retries--; LOGGER.warn( - "Retry failed to close Kafka producer. 
Remaining attempts: {}", retries, e); - if (retries == 0) { - LOGGER.error("Exhausted retries for closing Kafka producer."); + "Error stopping Kafka producer for topic: {}, attempts remaining: {}", + topic, + attempts - 1, + e); + attempts--; + try { + Thread.sleep(1000); // Wait before retrying + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + LOGGER.error("Thread interrupted during Kafka producer shutdown retries"); + break; } } } + LOGGER.error("Failed to stop Kafka producer for topic: {} after retries", topic); } @Override From 23139a8e8961720aeee9030cf14523f4621b0174 Mon Sep 17 00:00:00 2001 From: James Stuart Milne Date: Tue, 10 Dec 2024 19:20:45 -0300 Subject: [PATCH 13/21] fix: asume a common type --- ui/src/pages/execution/TaskDetails.jsx | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/ui/src/pages/execution/TaskDetails.jsx b/ui/src/pages/execution/TaskDetails.jsx index 8cbfa192f..022894d8e 100644 --- a/ui/src/pages/execution/TaskDetails.jsx +++ b/ui/src/pages/execution/TaskDetails.jsx @@ -48,9 +48,9 @@ export default function TaskDetails({ const selectedTaskRefName = data?.data?.task?.executionData?.status === "PENDING" ? pendingTaskSelection(data?.data?.task)?.workflowTask - ?.taskReferenceName - : taskWithLatestIteration(execution?.tasks, data?.id) - ?.referenceTaskName; + ?.taskReferenceName + : taskWithLatestIteration(execution?.tasks, { ref: data.id }) + ?.referenceTaskName; setSelectedNode(data); setSelectedTask({ ref: selectedTaskRefName }); }} From 6696162fda9b06b9502893fd821d9a68c4e4aedb Mon Sep 17 00:00:00 2001 From: James Stuart Milne Date: Tue, 10 Dec 2024 19:22:14 -0300 Subject: [PATCH 14/21] fix: find task by id or taskReferenceName --- ui/src/pages/execution/RightPanel.jsx | 2 +- ui/src/utils/helpers.js | 28 ++++++++++++++++++--------- 2 files changed, 20 insertions(+), 10 deletions(-) diff --git a/ui/src/pages/execution/RightPanel.jsx b/ui/src/pages/execution/RightPanel.jsx index b3dc7b79e..839a2692c 100644 --- a/ui/src/pages/execution/RightPanel.jsx +++ b/ui/src/pages/execution/RightPanel.jsx @@ -46,7 +46,7 @@ export default function RightPanel({ const taskResult = selectedNode?.data?.task?.executionData?.status === "PENDING" ? pendingTaskSelection(selectedNode?.data?.task) - : taskWithLatestIteration(execution?.tasks, selectedTask?.ref); + : taskWithLatestIteration(execution?.tasks, selectedTask); const dfOptions = useMemo( () => dag && dag.getSiblings(selectedTask), diff --git a/ui/src/utils/helpers.js b/ui/src/utils/helpers.js index 407732f63..405ba06da 100644 --- a/ui/src/utils/helpers.js +++ b/ui/src/utils/helpers.js @@ -48,7 +48,7 @@ export function astToQuery(node) { return `${wrapper ? "(" : ""}${clauses.join(` ${combinator} `)}${ wrapper ? ")" : "" - }`; + }`; } else { return ""; } @@ -99,16 +99,24 @@ export function getBasename() { return _.isEmpty(basename) ? 
"/" : basename; } -export const taskWithLatestIteration = (tasksList, taskReferenceName) => { - const filteredTasks = tasksList?.filter( - (task) => - task?.workflowTask?.taskReferenceName === taskReferenceName || - task?.referenceTaskName === taskReferenceName - ); +export const taskWithLatestIteration = (tasksList = [], selectedTask) => { + const taskReferenceName = selectedTask?.ref; + + const findTaskByReferenceName = (task) => + task?.workflowTask?.taskReferenceName === taskReferenceName || + task?.referenceTaskName === taskReferenceName; + + const findTaskById = (task) => task?.taskId === selectedTask?.id; + + // If reference name is not provided, use taskId to find the task + const findTask = selectedTask?.ref == null ? findTaskById : findTaskByReferenceName; + + const filteredTasks = tasksList?.filter(findTask); if (filteredTasks && filteredTasks.length === 1) { // task without any retry/iteration - return _nth(filteredTasks, 0); + const targetTask = _nth(filteredTasks, 0); + return targetTask; } else if (filteredTasks && filteredTasks.length > 1) { const result = filteredTasks.reduce( (acc, task, idx) => { @@ -121,9 +129,11 @@ export const taskWithLatestIteration = (tasksList, taskReferenceName) => { ); if (result?.idx > -1) { - return _nth(filteredTasks, result.idx); + const targetTask = _nth(filteredTasks, result.idx); + return targetTask; } } + return undefined; }; From 12ebe53ee8797a30acdbb3c574646b585a690309 Mon Sep 17 00:00:00 2001 From: DanMiller192 <55767381+danmiller192@users.noreply.github.com> Date: Tue, 23 Jul 2024 21:22:06 +0100 Subject: [PATCH 15/21] Updated workflow sweeper Updated workflow sweeper to use the workflowOffsetTimeout rather than the task timeout when the task is in progress. --- .../conductor/core/reconciliation/WorkflowSweeper.java | 5 +---- .../conductor/core/reconciliation/TestWorkflowSweeper.java | 4 +++- 2 files changed, 4 insertions(+), 5 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index 3f5a9c5b6..b39d122f0 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -139,10 +139,7 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { } else if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) { postponeDurationSeconds = workflowOffsetTimeout; } else { - postponeDurationSeconds = - (taskModel.getResponseTimeoutSeconds() != 0) - ? 
taskModel.getResponseTimeoutSeconds() + 1 - : workflowOffsetTimeout; + postponeDurationSeconds = workflowOffsetTimeout; } break; } else if (taskModel.getStatus() == Status.SCHEDULED) { diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index 53bd3bb48..042735b70 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -198,7 +198,9 @@ public void testPostponeDurationForTaskInProgressWithResponseTimeoutSet() { workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( - DECIDER_QUEUE, workflowModel.getWorkflowId(), (responseTimeout + 1) * 1000); + DECIDER_QUEUE, + workflowModel.getWorkflowId(), + defaultPostPoneOffSetSeconds * 1000); } @Test From a6be214299a43b6531cf043ef79b580d1fc243c5 Mon Sep 17 00:00:00 2001 From: Yong Sheng Tan Date: Thu, 12 Dec 2024 10:10:06 +0800 Subject: [PATCH 16/21] Add testWorkflow to OrkesWorkflowClient --- .../client/http/OrkesWorkflowClient.java | 5 ++++ .../client/http/WorkflowClientTests.java | 25 +++++++++++++++++++ 2 files changed, 30 insertions(+) diff --git a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java index 0b47c6ff3..6748409e4 100644 --- a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java +++ b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java @@ -36,6 +36,7 @@ import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.WorkflowSummary; +import com.netflix.conductor.common.run.WorkflowTestRequest; import io.orkes.conductor.client.model.CorrelationIdsSearchRequest; import io.orkes.conductor.client.model.WorkflowRun; @@ -202,6 +203,10 @@ public void skipTaskFromWorkflow(String workflowId, String taskReferenceName) { workflowClient.skipTaskFromWorkflow(workflowId, taskReferenceName); } + public Workflow testWorkflow(WorkflowTestRequest testRequest) { + return workflowClient.testWorkflow(testRequest); + } + public SearchResult search(String query) { return workflowClient.search(query); } diff --git a/conductor-clients/java/conductor-java-sdk/tests/src/test/java/io/orkes/conductor/client/http/WorkflowClientTests.java b/conductor-clients/java/conductor-java-sdk/tests/src/test/java/io/orkes/conductor/client/http/WorkflowClientTests.java index 465ec03b8..7e8ec62cf 100644 --- a/conductor-clients/java/conductor-java-sdk/tests/src/test/java/io/orkes/conductor/client/http/WorkflowClientTests.java +++ b/conductor-clients/java/conductor-java-sdk/tests/src/test/java/io/orkes/conductor/client/http/WorkflowClientTests.java @@ -16,6 +16,9 @@ import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import com.netflix.conductor.common.metadata.tasks.TaskResult; +import com.netflix.conductor.common.metadata.workflow.WorkflowTask; +import com.netflix.conductor.common.run.WorkflowTestRequest; import org.junit.jupiter.api.Assertions; import org.junit.jupiter.api.BeforeAll; import org.junit.jupiter.api.Test; @@ 
-184,6 +187,28 @@ void testExecuteWorkflow() { // TODO } + @Test + void testWorkflow() { + WorkflowTask task = new WorkflowTask(); + task.setName("testable-task"); + task.setTaskReferenceName("testable-task-ref"); + + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName("testable-flow"); + workflowDef.setTasks(List.of(task)); + + WorkflowTestRequest testRequest = new WorkflowTestRequest(); + testRequest.setName("testable-flow"); + testRequest.setWorkflowDef(workflowDef); + testRequest.setTaskRefToMockOutput(Map.of( + "testable-task-ref", + List.of(new WorkflowTestRequest.TaskMock(TaskResult.Status.COMPLETED, Map.of("result", "ok"))) + )); + + Workflow workflow = workflowClient.testWorkflow(testRequest); + Assertions.assertEquals("ok", workflow.getOutput().get("result")); + } + StartWorkflowRequest getStartWorkflowRequest() { StartWorkflowRequest startWorkflowRequest = new StartWorkflowRequest(); startWorkflowRequest.setName(Commons.WORKFLOW_NAME); From 832524567869e03cbaa61479e675ebbd6aed0a8f Mon Sep 17 00:00:00 2001 From: DanMiller192 <55767381+danmiller192@users.noreply.github.com> Date: Tue, 19 Nov 2024 16:58:49 +0000 Subject: [PATCH 17/21] Added maximum workflow sweep postpone config --- .../core/config/ConductorProperties.java | 12 +++++++ .../core/reconciliation/WorkflowSweeper.java | 12 ++++++- .../reconciliation/TestWorkflowSweeper.java | 33 ++++++++++++++++++- 3 files changed, 55 insertions(+), 2 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index f07f1485f..7ef41368a 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -42,6 +42,10 @@ public class ConductorProperties { @DurationUnit(ChronoUnit.SECONDS) private Duration workflowOffsetTimeout = Duration.ofSeconds(30); + /** The maximum timeout duration to set when a workflow is pushed to the decider queue. */ + @DurationUnit(ChronoUnit.SECONDS) + private Duration maxPostponeDurationSeconds = Duration.ofSeconds(3600); + /** The number of threads to use to do background sweep on active workflows. */ private int sweeperThreadCount = Runtime.getRuntime().availableProcessors() * 2; @@ -251,6 +255,14 @@ public void setWorkflowOffsetTimeout(Duration workflowOffsetTimeout) { this.workflowOffsetTimeout = workflowOffsetTimeout; } + public Duration getMaxPostponeDurationSeconds() { + return maxPostponeDurationSeconds; + } + + public void setMaxPostponeDurationSeconds(Duration maxPostponeDurationSeconds) { + this.maxPostponeDurationSeconds = maxPostponeDurationSeconds; + } + public int getSweeperThreadCount() { return sweeperThreadCount; } diff --git a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java index b39d122f0..9702eadd6 100644 --- a/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java +++ b/core/src/main/java/com/netflix/conductor/core/reconciliation/WorkflowSweeper.java @@ -139,8 +139,18 @@ void unack(WorkflowModel workflowModel, long workflowOffsetTimeout) { } else if (taskModel.getTaskType().equals(TaskType.TASK_TYPE_HUMAN)) { postponeDurationSeconds = workflowOffsetTimeout; } else { - postponeDurationSeconds = workflowOffsetTimeout; + postponeDurationSeconds = + (taskModel.getResponseTimeoutSeconds() != 0) + ? 
taskModel.getResponseTimeoutSeconds() + 1 + : workflowOffsetTimeout; } + + if (postponeDurationSeconds + > properties.getMaxPostponeDurationSeconds().getSeconds()) { + postponeDurationSeconds = + properties.getMaxPostponeDurationSeconds().getSeconds(); + } + break; } else if (taskModel.getStatus() == Status.SCHEDULED) { Optional taskDefinition = taskModel.getTaskDefinition(); diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index 042735b70..c882ec45f 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -49,6 +49,7 @@ public class TestWorkflowSweeper { private ExecutionLockService executionLockService; private int defaultPostPoneOffSetSeconds = 1800; + private int defaulMmaxPostponeDurationSeconds = 2000000; @Before public void setUp() { @@ -79,6 +80,8 @@ public void testPostponeDurationForHumanTaskType() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -98,6 +101,8 @@ public void testPostponeDurationForWaitTaskType() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -119,6 +124,8 @@ public void testPostponeDurationForWaitTaskTypeWithLongWaitTime() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -174,6 +181,8 @@ public void testPostponeDurationForTaskInProgress() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( @@ -196,11 +205,33 @@ public void testPostponeDurationForTaskInProgressWithResponseTimeoutSet() { when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); + verify(queueDAO) + .setUnackTimeout( + DECIDER_QUEUE, workflowModel.getWorkflowId(), (responseTimeout + 1) * 1000); + } + + @Test + public void + testPostponeDurationForTaskInProgressWithResponseTimeoutSetLongerThanMaxPostponeDuration() { + long responseTimeout = defaulMmaxPostponeDurationSeconds + 1; + WorkflowModel workflowModel = new WorkflowModel(); + workflowModel.setWorkflowId("1"); + TaskModel taskModel = new TaskModel(); + 
taskModel.setTaskId("task1"); + taskModel.setTaskType(TaskType.TASK_TYPE_SIMPLE); + taskModel.setStatus(Status.IN_PROGRESS); + taskModel.setResponseTimeoutSeconds(responseTimeout); + workflowModel.setTasks(List.of(taskModel)); + when(properties.getWorkflowOffsetTimeout()) + .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); + workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( DECIDER_QUEUE, workflowModel.getWorkflowId(), - defaultPostPoneOffSetSeconds * 1000); + defaulMmaxPostponeDurationSeconds * 1000L); } @Test From a8f775fc810be3c436dd6b7e6b12a5e166ab9aac Mon Sep 17 00:00:00 2001 From: DanMiller192 <55767381+danmiller192@users.noreply.github.com> Date: Fri, 13 Dec 2024 21:25:06 +0000 Subject: [PATCH 18/21] Fixed failing test --- .../conductor/core/reconciliation/TestWorkflowSweeper.java | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java index c882ec45f..b1786b6e6 100644 --- a/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java +++ b/core/src/test/java/com/netflix/conductor/core/reconciliation/TestWorkflowSweeper.java @@ -204,6 +204,8 @@ public void testPostponeDurationForTaskInProgressWithResponseTimeoutSet() { workflowModel.setTasks(List.of(taskModel)); when(properties.getWorkflowOffsetTimeout()) .thenReturn(Duration.ofSeconds(defaultPostPoneOffSetSeconds)); + when(properties.getMaxPostponeDurationSeconds()) + .thenReturn(Duration.ofSeconds(defaulMmaxPostponeDurationSeconds)); workflowSweeper.unack(workflowModel, defaultPostPoneOffSetSeconds); verify(queueDAO) .setUnackTimeout( From a5a8f17ae01cdf5bdf21e9fdb66e292a2bc86f35 Mon Sep 17 00:00:00 2001 From: DanMiller192 <55767381+danmiller192@users.noreply.github.com> Date: Fri, 13 Dec 2024 21:34:06 +0000 Subject: [PATCH 19/21] Updated documentation --- .../core/config/ConductorProperties.java | 5 +- docs/documentation/configuration/appconf.md | 85 ++++++++++--------- 2 files changed, 47 insertions(+), 43 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java index 7ef41368a..0bd5e4ed2 100644 --- a/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java +++ b/core/src/main/java/com/netflix/conductor/core/config/ConductorProperties.java @@ -42,7 +42,10 @@ public class ConductorProperties { @DurationUnit(ChronoUnit.SECONDS) private Duration workflowOffsetTimeout = Duration.ofSeconds(30); - /** The maximum timeout duration to set when a workflow is pushed to the decider queue. */ + /** + * The maximum timeout duration to set when a workflow with running task is pushed to the + * decider queue. + */ @DurationUnit(ChronoUnit.SECONDS) private Duration maxPostponeDurationSeconds = Duration.ofSeconds(3600); diff --git a/docs/documentation/configuration/appconf.md b/docs/documentation/configuration/appconf.md index eed71b87d..8086836d0 100644 --- a/docs/documentation/configuration/appconf.md +++ b/docs/documentation/configuration/appconf.md @@ -9,48 +9,49 @@ All of these parameters are grouped under the `conductor.app` namespace. 
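For reference, the sketch below condenses how the sweeper now chooses the postpone duration for an in-progress SIMPLE task, combining the restored `responseTimeoutSeconds + 1` behaviour with the new `maxPostponeDurationSeconds` cap. The numbers are example values only; the actual logic lives in `WorkflowSweeper.unack`.

```java
import java.time.Duration;

public class PostponeDurationSketch {

    /**
     * Mirrors the selection made for an in-progress SIMPLE task: prefer the task's
     * responseTimeoutSeconds + 1, fall back to the workflow offset timeout, and
     * never exceed maxPostponeDurationSeconds.
     */
    static long postponeSeconds(
            long responseTimeoutSeconds,
            Duration workflowOffsetTimeout,
            Duration maxPostponeDuration) {
        long postpone =
                responseTimeoutSeconds != 0
                        ? responseTimeoutSeconds + 1
                        : workflowOffsetTimeout.getSeconds();
        return Math.min(postpone, maxPostponeDuration.getSeconds());
    }

    public static void main(String[] args) {
        Duration offset = Duration.ofSeconds(30);  // workflowOffsetTimeout default
        Duration max = Duration.ofSeconds(3600);   // maxPostponeDurationSeconds default

        System.out.println(postponeSeconds(0, offset, max));    // 30, no response timeout set
        System.out.println(postponeSeconds(120, offset, max));  // 121, responseTimeout + 1
        System.out.println(postponeSeconds(7200, offset, max)); // 3600, capped at the maximum
    }
}
```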
### Configuration -| Field | Type | Description | Notes | -|:-----------------------------------------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:-------------------------------------------------------| -| stack | String | Name of the stack within which the app is running. e.g. `devint`, `testintg`, `staging`, `prod` etc. | Default is "test" | -| appId | String | The ID with which the app has been registered. e.g. `conductor`, `myApp` | Default is "conductor" | -| executorServiceMaxThreadCount | int | The maximum number of threads to be allocated to the executor service threadpool. e.g. `50` | Default is 50 | -| workflowOffsetTimeout | Duration | The timeout duration to set when a workflow is pushed to the decider queue. Example: `30s` or `1m` | Default is 30 seconds | -| sweeperThreadCount | int | The number of threads to use for background sweeping on active workflows. Example: `8` if there are 4 processors (2x4) | Default is 2 times the number of available processors | -| sweeperWorkflowPollTimeout | Duration | The timeout for polling workflows to be swept. Example: `2000ms` or `2s` | Default is 2000 milliseconds | -| eventProcessorThreadCount | int | The number of threads to configure the threadpool in the event processor. Example: `4` | Default is 2 | -| eventMessageIndexingEnabled | boolean | Whether to enable indexing of messages within event payloads. Example: `true` or `false` | Default is true | -| eventExecutionIndexingEnabled | boolean | Whether to enable indexing of event execution results. Example: `true` or `false` | Default is true | -| workflowExecutionLockEnabled | boolean | Whether to enable the workflow execution lock. Example: `true` or `false` | Default is false | -| lockLeaseTime | Duration | The time for which the lock is leased. Example: `60000ms` or `1m` | Default is 60000 milliseconds | -| lockTimeToTry | Duration | The time for which the thread will block in an attempt to acquire the lock. Example: `500ms` or `1s` | Default is 500 milliseconds | -| activeWorkerLastPollTimeout | Duration | The time to consider if a worker is actively polling for a task. Example: `10s` | Default is 10 seconds | -| taskExecutionPostponeDuration | Duration | The time for which a task execution will be postponed if rate-limited or concurrent execution limited. Example: `60s` | Default is 60 seconds | -| taskIndexingEnabled | boolean | Whether to enable indexing of tasks. Example: `true` or `false` | Default is true | -| taskExecLogIndexingEnabled | boolean | Whether to enable indexing of task execution logs. Example: `true` or `false` | Default is true | -| asyncIndexingEnabled | boolean | Whether to enable asynchronous indexing to Elasticsearch. Example: `true` or `false` | Default is false | -| systemTaskWorkerThreadCount | int | The number of threads in the threadpool for system task workers. Example: `8` if there are 4 processors (2x4) | Default is 2 times the number of available processors | -| systemTaskMaxPollCount | int | The maximum number of threads to be polled within the threadpool for system task workers. Example: `8` | Default is equal to systemTaskWorkerThreadCount | -| systemTaskWorkerCallbackDuration | Duration | The interval after which a system task will be checked by the system task worker for completion. 
Example: `30s` | Default is 30 seconds | -| systemTaskWorkerPollInterval | Duration | The interval at which system task queues will be polled by system task workers. Example: `50ms` | Default is 50 milliseconds | -| systemTaskWorkerExecutionNamespace | String | The namespace for the system task workers to provide instance-level isolation. Example: `namespace1`, `namespace2` | Default is an empty string | -| isolatedSystemTaskWorkerThreadCount | int | The number of threads to be used within the threadpool for system task workers in each isolation group. Example: `4` | Default is 1 | -| asyncUpdateShortRunningWorkflowDuration | Duration | The duration of workflow execution qualifying as short-running when async indexing to Elasticsearch is enabled. Example: `30s` | Default is 30 seconds | -| asyncUpdateDelay | Duration | The delay with which short-running workflows will be updated in Elasticsearch when async indexing is enabled. Example: `60s` | Default is 60 seconds | -| ownerEmailMandatory | boolean | Whether to validate the owner email field as mandatory within workflow and task definitions. Example: `true` or `false` | Default is true | -| eventQueueSchedulerPollThreadCount | int | The number of threads used in the Scheduler for polling events from multiple event queues. Example: `8` if there are 4 processors (2x4) | Default is equal to the number of available processors | -| eventQueuePollInterval | Duration | The time interval at which the default event queues will be polled. Example: `100ms` | Default is 100 milliseconds | -| eventQueuePollCount | int | The number of messages to be polled from a default event queue in a single operation. Example: `10` | Default is 10 | -| eventQueueLongPollTimeout | Duration | The timeout for the poll operation on the default event queue. Example: `1000ms` | Default is 1000 milliseconds | -| workflowInputPayloadSizeThreshold | DataSize | The threshold of the workflow input payload size beyond which the payload will be stored in ExternalPayloadStorage. Example: `5120KB` | Default is 5120 kilobytes | -| maxWorkflowInputPayloadSizeThreshold | DataSize | The maximum threshold of the workflow input payload size beyond which input will be rejected and the workflow marked as FAILED. Example: `10240KB` | Default is 10240 kilobytes | -| workflowOutputPayloadSizeThreshold | DataSize | The threshold of the workflow output payload size beyond which the payload will be stored in ExternalPayloadStorage. Example: `5120KB` | Default is 5120 kilobytes | -| maxWorkflowOutputPayloadSizeThreshold | DataSize | The maximum threshold of the workflow output payload size beyond which output will be rejected and the workflow marked as FAILED. Example: `10240KB` | Default is 10240 kilobytes | -| taskInputPayloadSizeThreshold | DataSize | The threshold of the task input payload size beyond which the payload will be stored in ExternalPayloadStorage. Example: `3072KB` | Default is 3072 kilobytes | -| maxTaskInputPayloadSizeThreshold | DataSize | The maximum threshold of the task input payload size beyond which the task input will be rejected and the task marked as FAILED_WITH_TERMINAL_ERROR. Example: `10240KB` | Default is 10240 kilobytes | -| taskOutputPayloadSizeThreshold | DataSize | The threshold of the task output payload size beyond which the payload will be stored in ExternalPayloadStorage. 
Example: `3072KB` | Default is 3072 kilobytes | -| maxTaskOutputPayloadSizeThreshold | DataSize | The maximum threshold of the task output payload size beyond which the task output will be rejected and the task marked as FAILED_WITH_TERMINAL_ERROR. Example: `10240KB` | Default is 10240 kilobytes | -| maxWorkflowVariablesPayloadSizeThreshold | DataSize | The maximum threshold of the workflow variables payload size beyond which the task changes will be rejected and the task marked as FAILED_WITH_TERMINAL_ERROR. Example: `256KB` | Default is 256 kilobytes | -| taskExecLogSizeLimit | int | The maximum size of task execution logs. Example: `10000` | Default is 10 | +| Field | Type | Description | Notes | +|:--------------------------------------------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|:--------------------------------------------------------| +| stack | String | Name of the stack within which the app is running. e.g. `devint`, `testintg`, `staging`, `prod` etc. | Default is "test" | +| appId | String | The ID with which the app has been registered. e.g. `conductor`, `myApp` | Default is "conductor" | +| executorServiceMaxThreadCount | int | The maximum number of threads to be allocated to the executor service threadpool. e.g. `50` | Default is 50 | +| workflowOffsetTimeout | Duration | The timeout duration to set when a workflow is pushed to the decider queue. Example: `30s` or `1m` | Default is 30 seconds | +| maxPostponeDurationSeconds | Duration | The maximum timeout duration to set when a workflow with running task is pushed to the decider queue. Example: `30m` or `1h` | Default is 3600 seconds | +| sweeperThreadCount | int | The number of threads to use for background sweeping on active workflows. Example: `8` if there are 4 processors (2x4) | Default is 2 times the number of available processors | +| sweeperWorkflowPollTimeout | Duration | The timeout for polling workflows to be swept. Example: `2000ms` or `2s` | Default is 2000 milliseconds | +| eventProcessorThreadCount | int | The number of threads to configure the threadpool in the event processor. Example: `4` | Default is 2 | +| eventMessageIndexingEnabled | boolean | Whether to enable indexing of messages within event payloads. Example: `true` or `false` | Default is true | +| eventExecutionIndexingEnabled | boolean | Whether to enable indexing of event execution results. Example: `true` or `false` | Default is true | +| workflowExecutionLockEnabled | boolean | Whether to enable the workflow execution lock. Example: `true` or `false` | Default is false | +| lockLeaseTime | Duration | The time for which the lock is leased. Example: `60000ms` or `1m` | Default is 60000 milliseconds | +| lockTimeToTry | Duration | The time for which the thread will block in an attempt to acquire the lock. Example: `500ms` or `1s` | Default is 500 milliseconds | +| activeWorkerLastPollTimeout | Duration | The time to consider if a worker is actively polling for a task. Example: `10s` | Default is 10 seconds | +| taskExecutionPostponeDuration | Duration | The time for which a task execution will be postponed if rate-limited or concurrent execution limited. Example: `60s` | Default is 60 seconds | +| taskIndexingEnabled | boolean | Whether to enable indexing of tasks. 
Example: `true` or `false` | Default is true | +| taskExecLogIndexingEnabled | boolean | Whether to enable indexing of task execution logs. Example: `true` or `false` | Default is true | +| asyncIndexingEnabled | boolean | Whether to enable asynchronous indexing to Elasticsearch. Example: `true` or `false` | Default is false | +| systemTaskWorkerThreadCount | int | The number of threads in the threadpool for system task workers. Example: `8` if there are 4 processors (2x4) | Default is 2 times the number of available processors | +| systemTaskMaxPollCount | int | The maximum number of threads to be polled within the threadpool for system task workers. Example: `8` | Default is equal to systemTaskWorkerThreadCount | +| systemTaskWorkerCallbackDuration | Duration | The interval after which a system task will be checked by the system task worker for completion. Example: `30s` | Default is 30 seconds | +| systemTaskWorkerPollInterval | Duration | The interval at which system task queues will be polled by system task workers. Example: `50ms` | Default is 50 milliseconds | +| systemTaskWorkerExecutionNamespace | String | The namespace for the system task workers to provide instance-level isolation. Example: `namespace1`, `namespace2` | Default is an empty string | +| isolatedSystemTaskWorkerThreadCount | int | The number of threads to be used within the threadpool for system task workers in each isolation group. Example: `4` | Default is 1 | +| asyncUpdateShortRunningWorkflowDuration | Duration | The duration of workflow execution qualifying as short-running when async indexing to Elasticsearch is enabled. Example: `30s` | Default is 30 seconds | +| asyncUpdateDelay | Duration | The delay with which short-running workflows will be updated in Elasticsearch when async indexing is enabled. Example: `60s` | Default is 60 seconds | +| ownerEmailMandatory | boolean | Whether to validate the owner email field as mandatory within workflow and task definitions. Example: `true` or `false` | Default is true | +| eventQueueSchedulerPollThreadCount | int | The number of threads used in the Scheduler for polling events from multiple event queues. Example: `8` if there are 4 processors (2x4) | Default is equal to the number of available processors | +| eventQueuePollInterval | Duration | The time interval at which the default event queues will be polled. Example: `100ms` | Default is 100 milliseconds | +| eventQueuePollCount | int | The number of messages to be polled from a default event queue in a single operation. Example: `10` | Default is 10 | +| eventQueueLongPollTimeout | Duration | The timeout for the poll operation on the default event queue. Example: `1000ms` | Default is 1000 milliseconds | +| workflowInputPayloadSizeThreshold | DataSize | The threshold of the workflow input payload size beyond which the payload will be stored in ExternalPayloadStorage. Example: `5120KB` | Default is 5120 kilobytes | +| maxWorkflowInputPayloadSizeThreshold | DataSize | The maximum threshold of the workflow input payload size beyond which input will be rejected and the workflow marked as FAILED. Example: `10240KB` | Default is 10240 kilobytes | +| workflowOutputPayloadSizeThreshold | DataSize | The threshold of the workflow output payload size beyond which the payload will be stored in ExternalPayloadStorage. 
Example: `5120KB` | Default is 5120 kilobytes | +| maxWorkflowOutputPayloadSizeThreshold | DataSize | The maximum threshold of the workflow output payload size beyond which output will be rejected and the workflow marked as FAILED. Example: `10240KB` | Default is 10240 kilobytes | +| taskInputPayloadSizeThreshold | DataSize | The threshold of the task input payload size beyond which the payload will be stored in ExternalPayloadStorage. Example: `3072KB` | Default is 3072 kilobytes | +| maxTaskInputPayloadSizeThreshold | DataSize | The maximum threshold of the task input payload size beyond which the task input will be rejected and the task marked as FAILED_WITH_TERMINAL_ERROR. Example: `10240KB` | Default is 10240 kilobytes | +| taskOutputPayloadSizeThreshold | DataSize | The threshold of the task output payload size beyond which the payload will be stored in ExternalPayloadStorage. Example: `3072KB` | Default is 3072 kilobytes | +| maxTaskOutputPayloadSizeThreshold | DataSize | The maximum threshold of the task output payload size beyond which the task output will be rejected and the task marked as FAILED_WITH_TERMINAL_ERROR. Example: `10240KB` | Default is 10240 kilobytes | +| maxWorkflowVariablesPayloadSizeThreshold | DataSize | The maximum threshold of the workflow variables payload size beyond which the task changes will be rejected and the task marked as FAILED_WITH_TERMINAL_ERROR. Example: `256KB` | Default is 256 kilobytes | +| taskExecLogSizeLimit | int | The maximum size of task execution logs. Example: `10000` | Default is 10 | ### Example usage From 2cc59ee9b8088400df774dee48d78b735745943f Mon Sep 17 00:00:00 2001 From: Gulam Mohiuddeen <90964388+gulam159@users.noreply.github.com> Date: Mon, 16 Dec 2024 17:17:04 +0530 Subject: [PATCH 20/21] Enhance logo visibility by supporting light and dark modes in README.md This PR updates the README file to ensure the project logo is visible in both light and dark mode themes on platforms like GitHub. Changes Made: Added a element to conditionally display the logo based on the user's prefers-color-scheme setting. Light mode: Displays the dark logo version (default). Dark mode: Displays the light logo version (optimized for dark backgrounds). Why this change? Previously, the logo was not visible when viewed in dark mode, potentially affecting readability and branding consistency. This update ensures a seamless viewing experience for all users, regardless of their preferred theme. --- README.md | 9 ++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 6be3cba18..82687dad6 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,11 @@ -![Conductor OSS Logo](https://assets.conductor-oss.org/logo.png "Conductor OSS") + + + + + + Logo + +

Scalable Workflow Orchestration From a7b1b5f6221fbfb188345e3d6030eb6a65e9588e Mon Sep 17 00:00:00 2001 From: Gulam Mohiuddeen <90964388+gulam159@users.noreply.github.com> Date: Wed, 18 Dec 2024 01:36:38 +0530 Subject: [PATCH 21/21] docs: Add custom logo link to Conductor documentation site (#342) --- docs/overrides/partials/logo.html | 5 +++++ mkdocs.yml | 1 + 2 files changed, 6 insertions(+) create mode 100644 docs/overrides/partials/logo.html diff --git a/docs/overrides/partials/logo.html b/docs/overrides/partials/logo.html new file mode 100644 index 000000000..85cd2f3f8 --- /dev/null +++ b/docs/overrides/partials/logo.html @@ -0,0 +1,5 @@ +{% if config.theme.logo %} + +{% endif %} \ No newline at end of file diff --git a/mkdocs.yml b/mkdocs.yml index fa4794b0f..fcd47aca7 100644 --- a/mkdocs.yml +++ b/mkdocs.yml @@ -118,6 +118,7 @@ nav: theme: name: material logo: img/logo.svg + custom_dir: docs/overrides features: - navigation.tabs - navigation.tabs.sticky