diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/Event.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/Event.java new file mode 100644 index 000000000..b3fb778bb --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/Event.java @@ -0,0 +1,106 @@ +/* + * Copyright 2024 Conductor Authors. + *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *
+ * http://www.apache.org/licenses/LICENSE-2.0 + *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events;
+
+import java.time.OffsetDateTime;
+import java.util.UUID;
+
+public class Event
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.config;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.netflix.conductor.core.listener.TaskStatusListener;
+import com.netflix.conductor.core.listener.WorkflowStatusListener;
+import com.netflix.conductor.kafka.status.events.listener.TaskStatusKafkaProducer;
+import com.netflix.conductor.kafka.status.events.listener.WorkflowStatusKafkaProducer;
+import com.netflix.conductor.kafka.status.events.services.KafkaEventService;
+import com.netflix.conductor.kafka.status.events.services.KafkaEventServiceImpl;
+
+@Configuration
+@EnableConfigurationProperties(KafkaProperties.class)
+@ConditionalOnProperty(name = "conductor.message-publisher.type", havingValue = "kafka")
+public class KafkaConfiguration {
+
+ @Bean
+ public KafkaEventService kafkaEventService(KafkaProperties kafkaProperties) {
+ return new KafkaEventServiceImpl(kafkaProperties);
+ }
+
+ @ConditionalOnProperty(
+ name = "conductor.task-status-listener.type",
+ havingValue = "kafka",
+ matchIfMissing = false)
+ @Bean
+ public TaskStatusListener taskStatusPublisherRabbitMQ(
+ KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
+ return new TaskStatusKafkaProducer(kafkaEventService, kafkaProperties);
+ }
+
+ @ConditionalOnProperty(
+ name = "conductor.workflow-status-listener.type",
+ havingValue = "kafka",
+ matchIfMissing = false)
+ @Bean
+ public WorkflowStatusListener workflowStatusListenerRabbitMQ(
+ KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
+ return new WorkflowStatusKafkaProducer(kafkaEventService, kafkaProperties);
+ }
+}
diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaProperties.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaProperties.java
new file mode 100644
index 000000000..b4f3bf07b
--- /dev/null
+++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaProperties.java
@@ -0,0 +1,67 @@
+/*
+ * Copyright 2024 Conductor Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.config;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+@ConfigurationProperties("conductor.message-publisher.kafka")
+public class KafkaProperties {
+ public String getBootstrapServers() {
+ return bootstrapServers;
+ }
+
+ public void setBootstrapServers(String bootstrapServers) {
+ this.bootstrapServers = bootstrapServers;
+ }
+
+ public String getTaskStatusTopic() {
+ return taskStatusTopic;
+ }
+
+ public void setTaskStatusTopic(String taskStatusTopic) {
+ this.taskStatusTopic = taskStatusTopic;
+ }
+
+ public String getWorkflowStatusTopic() {
+ return workflowStatusTopic;
+ }
+
+ public void setWorkflowStatusTopic(String workflowStatusTopic) {
+ this.workflowStatusTopic = workflowStatusTopic;
+ }
+
+ public List
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.listener;
+
+import java.time.OffsetDateTime;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.conductor.core.listener.TaskStatusListener;
+import com.netflix.conductor.kafka.status.events.Event;
+import com.netflix.conductor.kafka.status.events.config.KafkaProperties;
+import com.netflix.conductor.kafka.status.events.services.KafkaEventService;
+import com.netflix.conductor.model.TaskModel;
+
+public class TaskStatusKafkaProducer implements TaskStatusListener {
+
+ private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusKafkaProducer.class);
+ private final KafkaProperties kafkaProperties;
+ private final KafkaEventService kafkaEventService;
+
+ public TaskStatusKafkaProducer(
+ KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
+ this.kafkaEventService = kafkaEventService;
+ this.kafkaProperties = kafkaProperties;
+ }
+
+ @Override
+ public void onTaskCompleted(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskScheduled(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskInProgress(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskCanceled(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskFailed(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskFailedWithTerminalError(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskCompletedWithErrors(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskTimedOut(TaskModel task) {
+ produceMessage(task);
+ }
+
+ @Override
+ public void onTaskSkipped(TaskModel task) {
+ produceMessage(task);
+ }
+
+ private boolean IsStatusEnabled(TaskModel task) {
+ return kafkaProperties.getAllowedTaskStatuses().contains(task.getStatus().name());
+ }
+
+ private void produceMessage(TaskModel task) {
+ try {
+ if (IsStatusEnabled(task)) {
+ Event
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.listener;
+
+import java.time.OffsetDateTime;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.conductor.core.listener.WorkflowStatusListener;
+import com.netflix.conductor.kafka.status.events.Event;
+import com.netflix.conductor.kafka.status.events.config.KafkaProperties;
+import com.netflix.conductor.kafka.status.events.services.KafkaEventService;
+import com.netflix.conductor.model.WorkflowModel;
+
+public class WorkflowStatusKafkaProducer implements WorkflowStatusListener {
+
+ private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusKafkaProducer.class);
+ private final KafkaProperties kafkaProperties;
+ private final KafkaEventService kafkaEventService;
+
+ public WorkflowStatusKafkaProducer(
+ KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
+ this.kafkaProperties = kafkaProperties;
+ this.kafkaEventService = kafkaEventService;
+ }
+
+ @Override
+ public void onWorkflowCompletedIfEnabled(WorkflowModel workflow) {
+ if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()
+ || kafkaProperties.isAlwaysPublishWorkflowStatusEnabled()) {
+ onWorkflowCompleted(workflow);
+ }
+ }
+
+ @Override
+ public void onWorkflowTerminatedIfEnabled(WorkflowModel workflow) {
+ if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()
+ || kafkaProperties.isAlwaysPublishWorkflowStatusEnabled()) {
+ onWorkflowTerminated(workflow);
+ }
+ }
+
+ @Override
+ public void onWorkflowFinalizedIfEnabled(WorkflowModel workflow) {
+ if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()
+ || kafkaProperties.isAlwaysPublishWorkflowStatusEnabled()) {
+ onWorkflowFinalized(workflow);
+ }
+ }
+
+ @Override
+ public void onWorkflowFinalized(WorkflowModel workflow) {
+ produceMessage(workflow);
+ }
+
+ @Override
+ public void onWorkflowCompleted(WorkflowModel workflow) {
+ produceMessage(workflow);
+ }
+
+ @Override
+ public void onWorkflowTerminated(WorkflowModel workflow) {
+ produceMessage(workflow);
+ }
+
+ private void produceMessage(WorkflowModel workflowModel) {
+ try {
+ Event
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.services;
+
+public interface KafkaEventService {
+
+
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.services;
+
+import java.net.InetAddress;
+import java.time.OffsetDateTime;
+import java.util.Properties;
+
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.common.serialization.StringSerializer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import com.netflix.conductor.kafka.status.events.Event;
+import com.netflix.conductor.kafka.status.events.config.KafkaProperties;
+
+public class KafkaEventServiceImpl implements KafkaEventService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventServiceImpl.class);
+
+ @Autowired private final KafkaProperties kafkaProperties;
+
+ public KafkaEventServiceImpl(KafkaProperties kafkaProperties) {
+ this.kafkaProperties = kafkaProperties;
+ }
+
+ @Override
+ public
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.kafka.status.events.services;
+
+import java.util.Map;
+
+import org.apache.kafka.common.errors.SerializationException;
+import org.apache.kafka.common.serialization.Serializer;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.SerializationFeature;
+import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule;
+
+public class KafkaJsonSerializer
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.rabbitmq.config;
+
+import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
+import org.springframework.boot.context.properties.EnableConfigurationProperties;
+import org.springframework.context.annotation.Bean;
+import org.springframework.context.annotation.Configuration;
+
+import com.netflix.conductor.contribs.queue.amqp.AMQPConnection;
+import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern;
+import com.netflix.conductor.core.listener.TaskStatusListener;
+import com.netflix.conductor.core.listener.WorkflowStatusListener;
+import com.netflix.conductor.rabbitmq.listener.TaskStatusPublisherRabbitMQ;
+import com.netflix.conductor.rabbitmq.listener.WorkflowStatusPublisherRabbitMQ;
+import com.netflix.conductor.rabbitmq.services.RabbitMQService;
+import com.netflix.conductor.rabbitmq.services.RabbitMQServiceImpl;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.Address;
+import com.rabbitmq.client.ConnectionFactory;
+
+@Configuration
+@EnableConfigurationProperties(RabbitMQProperties.class)
+@ConditionalOnProperty(name = "conductor.message-publisher.type", havingValue = "rabbitmq")
+public class RabbitMQConfiguration {
+ @Bean
+ public AMQPConnection amqpConnection(RabbitMQProperties rabbitMQProperties) {
+ ConnectionFactory connectionFactory = new ConnectionFactory();
+
+ connectionFactory.setHost(rabbitMQProperties.getHosts());
+ connectionFactory.setPort(rabbitMQProperties.getPort());
+ connectionFactory.setUsername(rabbitMQProperties.getUsername());
+ connectionFactory.setPassword(rabbitMQProperties.getPassword());
+
+ Address[] addresses =
+ new Address[] {
+ new Address(rabbitMQProperties.getHosts(), rabbitMQProperties.getPort())
+ };
+
+ AMQPRetryPattern retryPattern = new AMQPRetryPattern();
+
+ return AMQPConnection.getInstance(connectionFactory, addresses, retryPattern);
+ }
+
+ @Bean
+ public RabbitMQService rabbitMQService(
+ AMQPConnection amqpConnection, ObjectMapper objectMapper) {
+ return new RabbitMQServiceImpl(amqpConnection, objectMapper);
+ }
+
+ @ConditionalOnProperty(
+ name = "conductor.workflow-status-listener.type",
+ havingValue = "rabbitmq",
+ matchIfMissing = false)
+ @Bean
+ public WorkflowStatusListener workflowStatusListenerRabbitMQ(
+ RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) {
+ return new WorkflowStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties);
+ }
+
+ @ConditionalOnProperty(
+ name = "conductor.task-status-listener.type",
+ havingValue = "rabbitmq",
+ matchIfMissing = false)
+ @Bean
+ public TaskStatusListener taskStatusPublisherRabbitMQ(
+ RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) {
+ return new TaskStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties);
+ }
+}
diff --git a/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java
new file mode 100644
index 000000000..f99a70308
--- /dev/null
+++ b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java
@@ -0,0 +1,137 @@
+/*
+ * Copyright 2024 Conductor Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.rabbitmq.config;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.springframework.boot.context.properties.ConfigurationProperties;
+
+import com.rabbitmq.client.ConnectionFactory;
+
+@ConfigurationProperties("conductor.message-publisher.rabbitmq")
+public class RabbitMQProperties {
+ private String hosts = ConnectionFactory.DEFAULT_HOST;
+ private String username = ConnectionFactory.DEFAULT_USER;
+ private String password = ConnectionFactory.DEFAULT_PASS;
+
+ private int port = ConnectionFactory.DEFAULT_AMQP_PORT;
+ private int maxChannelCount = 5000;
+ private int limit = 50;
+ private int duration = 1000;
+
+ private String virtualHost = ConnectionFactory.DEFAULT_VHOST;
+
+ private String allowedTaskStatuses;
+
+ private String workflowStatusExchange;
+ private String taskStatusExchange;
+
+ private boolean alwaysPublishWorkflowStatusEnabled = true;
+
+ public String getHosts() {
+ return hosts;
+ }
+
+ public void setHosts(String hosts) {
+ this.hosts = hosts;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public int getPort() {
+ return port;
+ }
+
+ public void setPort(int port) {
+ this.port = port;
+ }
+
+ public int getMaxChannelCount() {
+ return maxChannelCount;
+ }
+
+ public void setMaxChannelCount(int maxChannelCount) {
+ this.maxChannelCount = maxChannelCount;
+ }
+
+ public int getLimit() {
+ return limit;
+ }
+
+ public void setLimit(int limit) {
+ this.limit = limit;
+ }
+
+ public int getDuration() {
+ return duration;
+ }
+
+ public void setDuration(int duration) {
+ this.duration = duration;
+ }
+
+ public String getVirtualHost() {
+ return virtualHost;
+ }
+
+ public void setVirtualHost(String virtualHost) {
+ this.virtualHost = virtualHost;
+ }
+
+ public String getWorkflowStatusExchange() {
+ return workflowStatusExchange;
+ }
+
+ public void setWorkflowStatusExchange(String workflowStatusExchange) {
+ this.workflowStatusExchange = workflowStatusExchange;
+ }
+
+ public String getTaskStatusExchange() {
+ return taskStatusExchange;
+ }
+
+ public void setTaskStatusExchange(String taskStatusExchange) {
+ this.taskStatusExchange = taskStatusExchange;
+ }
+
+ public List
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.rabbitmq.listener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.conductor.core.listener.TaskStatusListener;
+import com.netflix.conductor.model.TaskModel;
+import com.netflix.conductor.rabbitmq.config.RabbitMQProperties;
+import com.netflix.conductor.rabbitmq.services.RabbitMQService;
+
+public class TaskStatusPublisherRabbitMQ implements TaskStatusListener {
+
+ private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusPublisherRabbitMQ.class);
+
+ private final RabbitMQService rabbitMQService;
+ private final RabbitMQProperties rabbitMQProperties;
+
+ public TaskStatusPublisherRabbitMQ(
+ RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) {
+ this.rabbitMQService = rabbitMQService;
+ this.rabbitMQProperties = rabbitMQProperties;
+ }
+
+ @Override
+ public void onTaskScheduled(TaskModel task) {
+ publishMessage(task);
+ }
+
+ @Override
+ public void onTaskInProgress(TaskModel task) {
+ publishMessage(task);
+ }
+
+ @Override
+ public void onTaskCanceled(TaskModel task) {
+ publishMessage(task);
+ }
+
+ @Override
+ public void onTaskFailed(TaskModel task) {
+ publishMessage(task);
+ }
+
+ @Override
+ public void onTaskFailedWithTerminalError(TaskModel task) {
+ publishMessage(task);
+ }
+
+ @Override
+ public void onTaskCompleted(TaskModel task) {
+ publishMessage(task);
+ }
+
+ @Override
+ public void onTaskCompletedWithErrors(TaskModel task) {
+ publishMessage(task);
+ }
+
+ @Override
+ public void onTaskTimedOut(TaskModel task) {
+ publishMessage(task);
+ }
+
+ @Override
+ public void onTaskSkipped(TaskModel task) {
+ publishMessage(task);
+ }
+
+ private boolean IsStatusEnabled(TaskModel task) {
+ return rabbitMQProperties.getAllowedTaskStatuses().contains(task.getStatus().name());
+ }
+
+ private void publishMessage(TaskModel task) {
+ try {
+ if (IsStatusEnabled(task))
+ rabbitMQService.publishMessage(rabbitMQProperties.getTaskStatusExchange(), task);
+ } catch (Exception e) {
+ LOGGER.error(
+ "Failed to publish message to exchange: {}. Exception: {}",
+ rabbitMQProperties.getTaskStatusExchange(),
+ e);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java
new file mode 100644
index 000000000..972d5ae55
--- /dev/null
+++ b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java
@@ -0,0 +1,86 @@
+/*
+ * Copyright 2023 Conductor Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.rabbitmq.listener;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.netflix.conductor.core.listener.WorkflowStatusListener;
+import com.netflix.conductor.model.WorkflowModel;
+import com.netflix.conductor.rabbitmq.config.RabbitMQProperties;
+import com.netflix.conductor.rabbitmq.services.RabbitMQService;
+
+public class WorkflowStatusPublisherRabbitMQ implements WorkflowStatusListener {
+
+ private final Logger LOGGER = LoggerFactory.getLogger(WorkflowStatusPublisherRabbitMQ.class);
+ private final RabbitMQService rabbitMQService;
+ private final RabbitMQProperties rabbitMQProperties;
+
+ public WorkflowStatusPublisherRabbitMQ(
+ RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) {
+ this.rabbitMQService = rabbitMQService;
+ this.rabbitMQProperties = rabbitMQProperties;
+ }
+
+ @Override
+ public void onWorkflowCompletedIfEnabled(WorkflowModel workflow) {
+ if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()
+ || rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) {
+ onWorkflowCompleted(workflow);
+ }
+ }
+
+ @Override
+ public void onWorkflowTerminatedIfEnabled(WorkflowModel workflow) {
+ if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()
+ || rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) {
+ onWorkflowTerminated(workflow);
+ }
+ }
+
+ @Override
+ public void onWorkflowFinalizedIfEnabled(WorkflowModel workflow) {
+ if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled()
+ || rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) {
+ onWorkflowFinalized(workflow);
+ }
+ }
+
+ @Override
+ public void onWorkflowCompleted(WorkflowModel workflow) {
+ publishMessage(workflow);
+ }
+
+ @Override
+ public void onWorkflowTerminated(WorkflowModel workflow) {
+ publishMessage(workflow);
+ }
+
+ @Override
+ public void onWorkflowFinalized(WorkflowModel workflow) {
+ publishMessage(workflow);
+ }
+
+ private void publishMessage(WorkflowModel workflow) {
+ try {
+ rabbitMQService.publishMessage(
+ rabbitMQProperties.getWorkflowStatusExchange(), workflow);
+ } catch (Exception e) {
+ LOGGER.error(
+ "Failed to publish message to exchange: {}. Exception: {}",
+ rabbitMQProperties.getWorkflowStatusExchange(),
+ e);
+ throw new RuntimeException(e);
+ }
+ }
+}
diff --git a/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java
new file mode 100644
index 000000000..932785a5a
--- /dev/null
+++ b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java
@@ -0,0 +1,25 @@
+/*
+ * Copyright 2023 Conductor Authors.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.rabbitmq.services;
+
+import com.rabbitmq.client.BuiltinExchangeType;
+
+public interface RabbitMQService {
+
+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
+ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations under the License.
+ */
+package com.netflix.conductor.rabbitmq.services;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.beans.factory.annotation.Autowired;
+
+import com.netflix.conductor.contribs.queue.amqp.AMQPConnection;
+import com.netflix.conductor.contribs.queue.amqp.util.ConnectionType;
+
+import com.fasterxml.jackson.core.JsonProcessingException;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.rabbitmq.client.BuiltinExchangeType;
+import com.rabbitmq.client.Channel;
+
+public class RabbitMQServiceImpl implements RabbitMQService {
+
+ private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQServiceImpl.class);
+
+ private final AMQPConnection amqpConnection;
+
+ @Autowired private final ObjectMapper objectMapper;
+
+ public RabbitMQServiceImpl(AMQPConnection amqpConnection, ObjectMapper objectMapper) {
+ this.amqpConnection = amqpConnection;
+ this.objectMapper = objectMapper;
+ }
+
+ @Override
+ public