diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/Event.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/Event.java new file mode 100644 index 000000000..b3fb778bb --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/Event.java @@ -0,0 +1,106 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events; + +import java.time.OffsetDateTime; +import java.util.UUID; + +public class Event { + private final String eventId; + private final T payload; + private OffsetDateTime eventTime; + private String eventType; + private String correlationId; + private String domain; + private String title; + private String description; + private String priority; + private OffsetDateTime timeOccurred; + + public Event(T payload) { + this.eventId = UUID.randomUUID().toString(); + this.payload = payload; + } + + public String getEventId() { + return eventId; + } + + public T getPayload() { + return payload; + } + + public OffsetDateTime getEventTime() { + return eventTime; + } + + public void setEventTime(OffsetDateTime eventTime) { + this.eventTime = eventTime; + } + + public String getEventType() { + return eventType; + } + + public void setEventType(String eventType) { + this.eventType = eventType; + } + + public String getCorrelationId() { + return correlationId; + } + + public void setCorrelationId(String correlationId) { + this.correlationId = correlationId; + } + + public String getDomain() { + return domain; + } + + public void setDomain(String domain) { + this.domain = domain; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public String getPriority() { + return priority; + } + + public void setPriority(String priority) { + this.priority = priority; + } + + public OffsetDateTime getTimeOccurred() { + return timeOccurred; + } + + public void setTimeOccurred(OffsetDateTime timeOccurred) { + this.timeOccurred = timeOccurred; + } +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaConfiguration.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaConfiguration.java new file mode 100644 index 000000000..07cff20ab --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaConfiguration.java @@ -0,0 +1,56 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.config; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.netflix.conductor.core.listener.TaskStatusListener; +import com.netflix.conductor.core.listener.WorkflowStatusListener; +import com.netflix.conductor.kafka.status.events.listener.TaskStatusKafkaProducer; +import com.netflix.conductor.kafka.status.events.listener.WorkflowStatusKafkaProducer; +import com.netflix.conductor.kafka.status.events.services.KafkaEventService; +import com.netflix.conductor.kafka.status.events.services.KafkaEventServiceImpl; + +@Configuration +@EnableConfigurationProperties(KafkaProperties.class) +@ConditionalOnProperty(name = "conductor.message-publisher.type", havingValue = "kafka") +public class KafkaConfiguration { + + @Bean + public KafkaEventService kafkaEventService(KafkaProperties kafkaProperties) { + return new KafkaEventServiceImpl(kafkaProperties); + } + + @ConditionalOnProperty( + name = "conductor.task-status-listener.type", + havingValue = "kafka", + matchIfMissing = false) + @Bean + public TaskStatusListener taskStatusPublisherRabbitMQ( + KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) { + return new TaskStatusKafkaProducer(kafkaEventService, kafkaProperties); + } + + @ConditionalOnProperty( + name = "conductor.workflow-status-listener.type", + havingValue = "kafka", + matchIfMissing = false) + @Bean + public WorkflowStatusListener workflowStatusListenerRabbitMQ( + KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) { + return new WorkflowStatusKafkaProducer(kafkaEventService, kafkaProperties); + } +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaProperties.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaProperties.java new file mode 100644 index 000000000..b4f3bf07b --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaProperties.java @@ -0,0 +1,67 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.config; + +import java.util.Arrays; +import java.util.List; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("conductor.message-publisher.kafka") +public class KafkaProperties { + public String getBootstrapServers() { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + + public String getTaskStatusTopic() { + return taskStatusTopic; + } + + public void setTaskStatusTopic(String taskStatusTopic) { + this.taskStatusTopic = taskStatusTopic; + } + + public String getWorkflowStatusTopic() { + return workflowStatusTopic; + } + + public void setWorkflowStatusTopic(String workflowStatusTopic) { + this.workflowStatusTopic = workflowStatusTopic; + } + + public List getAllowedTaskStatuses() { + return Arrays.asList(this.allowedTaskStatuses.split(",")); + } + + public void setAllowedTaskStatuses(String allowedTaskStatuses) { + this.allowedTaskStatuses = allowedTaskStatuses; + } + + public boolean isAlwaysPublishWorkflowStatusEnabled() { + return alwaysPublishWorkflowStatusEnabled; + } + + public void setAlwaysPublishWorkflowStatusEnabled(boolean alwaysPublishWorkflowStatusEnabled) { + this.alwaysPublishWorkflowStatusEnabled = alwaysPublishWorkflowStatusEnabled; + } + + private boolean alwaysPublishWorkflowStatusEnabled = true; + private String allowedTaskStatuses; + private String bootstrapServers = ""; + private String taskStatusTopic = "conductor-task-status"; + private String workflowStatusTopic = "conductor-workflow-status"; +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/listener/TaskStatusKafkaProducer.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/listener/TaskStatusKafkaProducer.java new file mode 100644 index 000000000..b6144a3e4 --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/listener/TaskStatusKafkaProducer.java @@ -0,0 +1,105 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.listener; + +import java.time.OffsetDateTime; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.listener.TaskStatusListener; +import com.netflix.conductor.kafka.status.events.Event; +import com.netflix.conductor.kafka.status.events.config.KafkaProperties; +import com.netflix.conductor.kafka.status.events.services.KafkaEventService; +import com.netflix.conductor.model.TaskModel; + +public class TaskStatusKafkaProducer implements TaskStatusListener { + + private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusKafkaProducer.class); + private final KafkaProperties kafkaProperties; + private final KafkaEventService kafkaEventService; + + public TaskStatusKafkaProducer( + KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) { + this.kafkaEventService = kafkaEventService; + this.kafkaProperties = kafkaProperties; + } + + @Override + public void onTaskCompleted(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskScheduled(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskInProgress(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskCanceled(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskFailed(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskFailedWithTerminalError(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskCompletedWithErrors(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskTimedOut(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskSkipped(TaskModel task) { + produceMessage(task); + } + + private boolean IsStatusEnabled(TaskModel task) { + return kafkaProperties.getAllowedTaskStatuses().contains(task.getStatus().name()); + } + + private void produceMessage(TaskModel task) { + try { + if (IsStatusEnabled(task)) { + Event event = new Event<>(task); + event.setEventType("Task." + task.getStatus().name()); + event.setEventTime(OffsetDateTime.now()); + kafkaEventService.produce( + event.getEventId(), event, kafkaProperties.getTaskStatusTopic()); + } + + } catch (Exception e) { + LOGGER.error( + "Failed to produce message to topic: {}. Exception: {}", + kafkaProperties.getTaskStatusTopic(), + e); + throw new RuntimeException(e); + } + } +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/listener/WorkflowStatusKafkaProducer.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/listener/WorkflowStatusKafkaProducer.java new file mode 100644 index 000000000..f9f78970e --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/listener/WorkflowStatusKafkaProducer.java @@ -0,0 +1,93 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.listener; + +import java.time.OffsetDateTime; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.listener.WorkflowStatusListener; +import com.netflix.conductor.kafka.status.events.Event; +import com.netflix.conductor.kafka.status.events.config.KafkaProperties; +import com.netflix.conductor.kafka.status.events.services.KafkaEventService; +import com.netflix.conductor.model.WorkflowModel; + +public class WorkflowStatusKafkaProducer implements WorkflowStatusListener { + + private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusKafkaProducer.class); + private final KafkaProperties kafkaProperties; + private final KafkaEventService kafkaEventService; + + public WorkflowStatusKafkaProducer( + KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) { + this.kafkaProperties = kafkaProperties; + this.kafkaEventService = kafkaEventService; + } + + @Override + public void onWorkflowCompletedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || kafkaProperties.isAlwaysPublishWorkflowStatusEnabled()) { + onWorkflowCompleted(workflow); + } + } + + @Override + public void onWorkflowTerminatedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || kafkaProperties.isAlwaysPublishWorkflowStatusEnabled()) { + onWorkflowTerminated(workflow); + } + } + + @Override + public void onWorkflowFinalizedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || kafkaProperties.isAlwaysPublishWorkflowStatusEnabled()) { + onWorkflowFinalized(workflow); + } + } + + @Override + public void onWorkflowFinalized(WorkflowModel workflow) { + produceMessage(workflow); + } + + @Override + public void onWorkflowCompleted(WorkflowModel workflow) { + produceMessage(workflow); + } + + @Override + public void onWorkflowTerminated(WorkflowModel workflow) { + produceMessage(workflow); + } + + private void produceMessage(WorkflowModel workflowModel) { + try { + Event event = new Event<>(workflowModel); + event.setEventType("Workflow." + workflowModel.getStatus().name()); + event.setEventTime(OffsetDateTime.now()); + kafkaEventService.produce( + event.getEventId(), event, kafkaProperties.getWorkflowStatusTopic()); + + } catch (Exception e) { + LOGGER.error( + "Failed to produce message to topic: {}. Exception: {}", + kafkaProperties.getWorkflowStatusTopic(), + e); + throw new RuntimeException(e); + } + } +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventService.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventService.java new file mode 100644 index 000000000..37b0cddf0 --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventService.java @@ -0,0 +1,20 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.services; + +public interface KafkaEventService { + + void produceEvent(T payload, String topic) throws Exception; + + void produce(String key, V message, String topic) throws Exception; +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventServiceImpl.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventServiceImpl.java new file mode 100644 index 000000000..62fd7a236 --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventServiceImpl.java @@ -0,0 +1,88 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.services; + +import java.net.InetAddress; +import java.time.OffsetDateTime; +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import com.netflix.conductor.kafka.status.events.Event; +import com.netflix.conductor.kafka.status.events.config.KafkaProperties; + +public class KafkaEventServiceImpl implements KafkaEventService { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventServiceImpl.class); + + @Autowired private final KafkaProperties kafkaProperties; + + public KafkaEventServiceImpl(KafkaProperties kafkaProperties) { + this.kafkaProperties = kafkaProperties; + } + + @Override + public void produceEvent(T payload, String topic) throws Exception { + Event event = new Event<>(payload); + event.setEventTime(OffsetDateTime.now()); + + produce(event.getEventId(), event, topic); + } + + @Override + public void produce(String key, V message, String topic) throws Exception { + + Properties producerConfig = new Properties(); + producerConfig.put( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); + producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); + producerConfig.put( + ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); + producerConfig.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + KafkaJsonSerializer jsonSerializer = new KafkaJsonSerializer<>(); + + try (KafkaProducer producer = + new KafkaProducer<>(producerConfig, new StringSerializer(), jsonSerializer)) { + + final ProducerRecord record = new ProducerRecord<>(topic, key, message); + + producer.send( + record, + (metadata, exception) -> { + if (exception != null) { + LOGGER.error( + "Failed to send message to Kafka topic: {}", topic, exception); + } else { + LOGGER.info( + "Message sent to topic: {} with offset: {}", + topic, + metadata.offset()); + } + }); + + producer.flush(); + + } catch (Exception e) { + LOGGER.error("Failed to create producer.", e); + throw e; + } + } +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaJsonSerializer.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaJsonSerializer.java new file mode 100644 index 000000000..73be059b3 --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaJsonSerializer.java @@ -0,0 +1,49 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.services; + +import java.util.Map; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +public class KafkaJsonSerializer implements Serializer { + + private final ObjectMapper objectMapper; + + public KafkaJsonSerializer() { + this.objectMapper = new ObjectMapper(); + this.objectMapper.registerModule(new JavaTimeModule()); + this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Override + public void configure(Map configs, boolean isKey) {} + + @Override + public byte[] serialize(String topic, T data) { + try { + return this.objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + throw new SerializationException(e); + } + } + + @Override + public void close() {} +}