diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/Event.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/Event.java new file mode 100644 index 000000000..b3fb778bb --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/Event.java @@ -0,0 +1,106 @@ +/* + * Copyright 2024 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events;
+
+import java.time.OffsetDateTime;
+import java.util.UUID;
+
+public class Event
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.config;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.netflix.conductor.core.listener.TaskStatusListener;
+import com.netflix.conductor.core.listener.WorkflowStatusListener;
+import com.netflix.conductor.kafka.status.events.listener.TaskStatusKafkaProducer;
+import com.netflix.conductor.kafka.status.events.listener.WorkflowStatusKafkaProducer;
+import com.netflix.conductor.kafka.status.events.services.KafkaEventService;
+import com.netflix.conductor.kafka.status.events.services.KafkaEventServiceImpl;
+
+@Configuration
+@EnableConfigurationProperties(KafkaProperties.class)
+@ConditionalOnProperty(name = "conductor.message-publisher.type", havingValue = "kafka")
+public class KafkaConfiguration {
+
+ @Bean
+ public KafkaEventService kafkaEventService(KafkaProperties kafkaProperties) {
+ return new KafkaEventServiceImpl(kafkaProperties);
+ }
+
+ @ConditionalOnProperty(
+ name = "conductor.task-status-listener.type",
+ havingValue = "kafka",
+ matchIfMissing = false)
+ @Bean
+ public TaskStatusListener taskStatusPublisherRabbitMQ(
+ KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
+ return new TaskStatusKafkaProducer(kafkaEventService, kafkaProperties);
+ }
+
+ @ConditionalOnProperty(
+ name = "conductor.workflow-status-listener.type",
+ havingValue = "kafka",
+ matchIfMissing = false)
+ @Bean
+ public WorkflowStatusListener workflowStatusListenerRabbitMQ(
+ KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
+ return new WorkflowStatusKafkaProducer(kafkaEventService, kafkaProperties);
+ }
+}
diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaProperties.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaProperties.java
new file mode 100644
index 000000000..b4f3bf07b
--- /dev/null
+++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaProperties.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2024 Conductor Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.config;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("conductor.message-publisher.kafka")
+public class KafkaProperties {
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public String getTaskStatusTopic() {
+ return taskStatusTopic;
+ }
+
+ public void setTaskStatusTopic(String taskStatusTopic) {
+ this.taskStatusTopic = taskStatusTopic;
+ }
+
+ public String getWorkflowStatusTopic() {
+ return workflowStatusTopic;
+ }
+
+ public void setWorkflowStatusTopic(String workflowStatusTopic) {
+ this.workflowStatusTopic = workflowStatusTopic;
+ }
+
+ public List
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.listener;
+
+import java.time.OffsetDateTime;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.conductor.core.listener.TaskStatusListener;
+import com.netflix.conductor.kafka.status.events.Event;
+import com.netflix.conductor.kafka.status.events.config.KafkaProperties;
+import com.netflix.conductor.kafka.status.events.services.KafkaEventService;
+import com.netflix.conductor.model.TaskModel;
+
+public class TaskStatusKafkaProducer implements TaskStatusListener {
+
+ private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusKafkaProducer.class);
+ private final KafkaProperties kafkaProperties;
+ private final KafkaEventService kafkaEventService;
+
+ public TaskStatusKafkaProducer(
+ KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
+ this.kafkaEventService = kafkaEventService;
+ this.kafkaProperties = kafkaProperties;
+ }
+
+ @Override
+ public void onTaskCompleted(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskScheduled(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskInProgress(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskCanceled(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskFailed(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskFailedWithTerminalError(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskCompletedWithErrors(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskTimedOut(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskSkipped(TaskModel task) {
+ produceMessage(task);
+ }
+
+ private boolean IsStatusEnabled(TaskModel task) {
+ return kafkaProperties.getAllowedTaskStatuses().contains(task.getStatus().name());
+ }
+
+ private void produceMessage(TaskModel task) {
+ try {
+ if (IsStatusEnabled(task)) {
+ Event
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.listener;
+
+import java.time.OffsetDateTime;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.conductor.core.listener.WorkflowStatusListener;
+import com.netflix.conductor.kafka.status.events.Event;
+import com.netflix.conductor.kafka.status.events.config.KafkaProperties;
+import com.netflix.conductor.kafka.status.events.services.KafkaEventService;
+import com.netflix.conductor.model.WorkflowModel;
+
+public class WorkflowStatusKafkaProducer implements WorkflowStatusListener {
+
+ private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusKafkaProducer.class);
+ private final KafkaProperties kafkaProperties;
+ private final KafkaEventService kafkaEventService;
+
+ public WorkflowStatusKafkaProducer(
+ KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
+ this.kafkaProperties = kafkaProperties;
+ this.kafkaEventService = kafkaEventService;
+ }
+
+ @Override
+ public void onWorkflowCompletedIfEnabled(WorkflowModel workflow) {
+ if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()
+ || kafkaProperties.isAlwaysPublishWorkflowStatusEnabled()) {
+ onWorkflowCompleted(workflow);
+ }
+ }
+
+ @Override
+ public void onWorkflowTerminatedIfEnabled(WorkflowModel workflow) {
+ if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()
+ || kafkaProperties.isAlwaysPublishWorkflowStatusEnabled()) {
+ onWorkflowTerminated(workflow);
+ }
+ }
+
+ @Override
+ public void onWorkflowFinalizedIfEnabled(WorkflowModel workflow) {
+ if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()
+ || kafkaProperties.isAlwaysPublishWorkflowStatusEnabled()) {
+ onWorkflowFinalized(workflow);
+ }
+ }
+
+ @Override
+ public void onWorkflowFinalized(WorkflowModel workflow) {
+ produceMessage(workflow);
+ }
+
+ @Override
+ public void onWorkflowCompleted(WorkflowModel workflow) {
+ produceMessage(workflow);
+ }
+
+ @Override
+ public void onWorkflowTerminated(WorkflowModel workflow) {
+ produceMessage(workflow);
+ }
+
+ private void produceMessage(WorkflowModel workflowModel) {
+ try {
+ Event
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.services;
+
+public interface KafkaEventService {
+
+
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.services;
+
+import java.net.InetAddress;
+import java.time.OffsetDateTime;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import com.netflix.conductor.kafka.status.events.Event;
+import com.netflix.conductor.kafka.status.events.config.KafkaProperties;
+
+public class KafkaEventServiceImpl implements KafkaEventService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventServiceImpl.class);
+
+ @Autowired private final KafkaProperties kafkaProperties;
+
+ public KafkaEventServiceImpl(KafkaProperties kafkaProperties) {
+ this.kafkaProperties = kafkaProperties;
+ }
+
+ @Override
+ public
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.services;
+
+import java.util.Map;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+public class KafkaJsonSerializer