diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/Event.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/Event.java new file mode 100644 index 000000000..b3fb778bb --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/Event.java @@ -0,0 +1,106 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events; + +import java.time.OffsetDateTime; +import java.util.UUID; + +public class Event { + private final String eventId; + private final T payload; + private OffsetDateTime eventTime; + private String eventType; + private String correlationId; + private String domain; + private String title; + private String description; + private String priority; + private OffsetDateTime timeOccurred; + + public Event(T payload) { + this.eventId = UUID.randomUUID().toString(); + this.payload = payload; + } + + public String getEventId() { + return eventId; + } + + public T getPayload() { + return payload; + } + + public OffsetDateTime getEventTime() { + return eventTime; + } + + public void setEventTime(OffsetDateTime eventTime) { + this.eventTime = eventTime; + } + + public String getEventType() { + return eventType; + } + + public void setEventType(String eventType) { + this.eventType = eventType; + } + + public String getCorrelationId() { + return correlationId; + } + + public void setCorrelationId(String correlationId) { + this.correlationId = correlationId; + } + + public String getDomain() { + return domain; + } + + public void setDomain(String domain) { + this.domain = domain; + } + + public String getTitle() { + return title; + } + + public void setTitle(String title) { + this.title = title; + } + + public String getDescription() { + return description; + } + + public void setDescription(String description) { + this.description = description; + } + + public String getPriority() { + return priority; + } + + public void setPriority(String priority) { + this.priority = priority; + } + + public OffsetDateTime getTimeOccurred() { + return timeOccurred; + } + + public void setTimeOccurred(OffsetDateTime timeOccurred) { + this.timeOccurred = timeOccurred; + } +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaConfiguration.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaConfiguration.java new file mode 100644 index 000000000..07cff20ab --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaConfiguration.java @@ -0,0 +1,56 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.config; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.netflix.conductor.core.listener.TaskStatusListener; +import com.netflix.conductor.core.listener.WorkflowStatusListener; +import com.netflix.conductor.kafka.status.events.listener.TaskStatusKafkaProducer; +import com.netflix.conductor.kafka.status.events.listener.WorkflowStatusKafkaProducer; +import com.netflix.conductor.kafka.status.events.services.KafkaEventService; +import com.netflix.conductor.kafka.status.events.services.KafkaEventServiceImpl; + +@Configuration +@EnableConfigurationProperties(KafkaProperties.class) +@ConditionalOnProperty(name = "conductor.message-publisher.type", havingValue = "kafka") +public class KafkaConfiguration { + + @Bean + public KafkaEventService kafkaEventService(KafkaProperties kafkaProperties) { + return new KafkaEventServiceImpl(kafkaProperties); + } + + @ConditionalOnProperty( + name = "conductor.task-status-listener.type", + havingValue = "kafka", + matchIfMissing = false) + @Bean + public TaskStatusListener taskStatusPublisherRabbitMQ( + KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) { + return new TaskStatusKafkaProducer(kafkaEventService, kafkaProperties); + } + + @ConditionalOnProperty( + name = "conductor.workflow-status-listener.type", + havingValue = "kafka", + matchIfMissing = false) + @Bean + public WorkflowStatusListener workflowStatusListenerRabbitMQ( + KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) { + return new WorkflowStatusKafkaProducer(kafkaEventService, kafkaProperties); + } +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaProperties.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaProperties.java new file mode 100644 index 000000000..b4f3bf07b --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/config/KafkaProperties.java @@ -0,0 +1,67 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.config; + +import java.util.Arrays; +import java.util.List; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +@ConfigurationProperties("conductor.message-publisher.kafka") +public class KafkaProperties { + public String getBootstrapServers() { + return bootstrapServers; + } + + public void setBootstrapServers(String bootstrapServers) { + this.bootstrapServers = bootstrapServers; + } + + public String getTaskStatusTopic() { + return taskStatusTopic; + } + + public void setTaskStatusTopic(String taskStatusTopic) { + this.taskStatusTopic = taskStatusTopic; + } + + public String getWorkflowStatusTopic() { + return workflowStatusTopic; + } + + public void setWorkflowStatusTopic(String workflowStatusTopic) { + this.workflowStatusTopic = workflowStatusTopic; + } + + public List getAllowedTaskStatuses() { + return Arrays.asList(this.allowedTaskStatuses.split(",")); + } + + public void setAllowedTaskStatuses(String allowedTaskStatuses) { + this.allowedTaskStatuses = allowedTaskStatuses; + } + + public boolean isAlwaysPublishWorkflowStatusEnabled() { + return alwaysPublishWorkflowStatusEnabled; + } + + public void setAlwaysPublishWorkflowStatusEnabled(boolean alwaysPublishWorkflowStatusEnabled) { + this.alwaysPublishWorkflowStatusEnabled = alwaysPublishWorkflowStatusEnabled; + } + + private boolean alwaysPublishWorkflowStatusEnabled = true; + private String allowedTaskStatuses; + private String bootstrapServers = ""; + private String taskStatusTopic = "conductor-task-status"; + private String workflowStatusTopic = "conductor-workflow-status"; +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/listener/TaskStatusKafkaProducer.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/listener/TaskStatusKafkaProducer.java new file mode 100644 index 000000000..b6144a3e4 --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/listener/TaskStatusKafkaProducer.java @@ -0,0 +1,105 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.listener; + +import java.time.OffsetDateTime; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.listener.TaskStatusListener; +import com.netflix.conductor.kafka.status.events.Event; +import com.netflix.conductor.kafka.status.events.config.KafkaProperties; +import com.netflix.conductor.kafka.status.events.services.KafkaEventService; +import com.netflix.conductor.model.TaskModel; + +public class TaskStatusKafkaProducer implements TaskStatusListener { + + private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusKafkaProducer.class); + private final KafkaProperties kafkaProperties; + private final KafkaEventService kafkaEventService; + + public TaskStatusKafkaProducer( + KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) { + this.kafkaEventService = kafkaEventService; + this.kafkaProperties = kafkaProperties; + } + + @Override + public void onTaskCompleted(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskScheduled(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskInProgress(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskCanceled(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskFailed(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskFailedWithTerminalError(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskCompletedWithErrors(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskTimedOut(TaskModel task) { + produceMessage(task); + } + + @Override + public void onTaskSkipped(TaskModel task) { + produceMessage(task); + } + + private boolean IsStatusEnabled(TaskModel task) { + return kafkaProperties.getAllowedTaskStatuses().contains(task.getStatus().name()); + } + + private void produceMessage(TaskModel task) { + try { + if (IsStatusEnabled(task)) { + Event event = new Event<>(task); + event.setEventType("Task." + task.getStatus().name()); + event.setEventTime(OffsetDateTime.now()); + kafkaEventService.produce( + event.getEventId(), event, kafkaProperties.getTaskStatusTopic()); + } + + } catch (Exception e) { + LOGGER.error( + "Failed to produce message to topic: {}. Exception: {}", + kafkaProperties.getTaskStatusTopic(), + e); + throw new RuntimeException(e); + } + } +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/listener/WorkflowStatusKafkaProducer.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/listener/WorkflowStatusKafkaProducer.java new file mode 100644 index 000000000..f9f78970e --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/listener/WorkflowStatusKafkaProducer.java @@ -0,0 +1,93 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.listener; + +import java.time.OffsetDateTime; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.listener.WorkflowStatusListener; +import com.netflix.conductor.kafka.status.events.Event; +import com.netflix.conductor.kafka.status.events.config.KafkaProperties; +import com.netflix.conductor.kafka.status.events.services.KafkaEventService; +import com.netflix.conductor.model.WorkflowModel; + +public class WorkflowStatusKafkaProducer implements WorkflowStatusListener { + + private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusKafkaProducer.class); + private final KafkaProperties kafkaProperties; + private final KafkaEventService kafkaEventService; + + public WorkflowStatusKafkaProducer( + KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) { + this.kafkaProperties = kafkaProperties; + this.kafkaEventService = kafkaEventService; + } + + @Override + public void onWorkflowCompletedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || kafkaProperties.isAlwaysPublishWorkflowStatusEnabled()) { + onWorkflowCompleted(workflow); + } + } + + @Override + public void onWorkflowTerminatedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || kafkaProperties.isAlwaysPublishWorkflowStatusEnabled()) { + onWorkflowTerminated(workflow); + } + } + + @Override + public void onWorkflowFinalizedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || kafkaProperties.isAlwaysPublishWorkflowStatusEnabled()) { + onWorkflowFinalized(workflow); + } + } + + @Override + public void onWorkflowFinalized(WorkflowModel workflow) { + produceMessage(workflow); + } + + @Override + public void onWorkflowCompleted(WorkflowModel workflow) { + produceMessage(workflow); + } + + @Override + public void onWorkflowTerminated(WorkflowModel workflow) { + produceMessage(workflow); + } + + private void produceMessage(WorkflowModel workflowModel) { + try { + Event event = new Event<>(workflowModel); + event.setEventType("Workflow." + workflowModel.getStatus().name()); + event.setEventTime(OffsetDateTime.now()); + kafkaEventService.produce( + event.getEventId(), event, kafkaProperties.getWorkflowStatusTopic()); + + } catch (Exception e) { + LOGGER.error( + "Failed to produce message to topic: {}. Exception: {}", + kafkaProperties.getWorkflowStatusTopic(), + e); + throw new RuntimeException(e); + } + } +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventService.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventService.java new file mode 100644 index 000000000..37b0cddf0 --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventService.java @@ -0,0 +1,20 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.services; + +public interface KafkaEventService { + + void produceEvent(T payload, String topic) throws Exception; + + void produce(String key, V message, String topic) throws Exception; +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventServiceImpl.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventServiceImpl.java new file mode 100644 index 000000000..62fd7a236 --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaEventServiceImpl.java @@ -0,0 +1,88 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.services; + +import java.net.InetAddress; +import java.time.OffsetDateTime; +import java.util.Properties; + +import org.apache.kafka.clients.producer.KafkaProducer; +import org.apache.kafka.clients.producer.ProducerConfig; +import org.apache.kafka.clients.producer.ProducerRecord; +import org.apache.kafka.common.serialization.StringSerializer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import com.netflix.conductor.kafka.status.events.Event; +import com.netflix.conductor.kafka.status.events.config.KafkaProperties; + +public class KafkaEventServiceImpl implements KafkaEventService { + + private static final Logger LOGGER = LoggerFactory.getLogger(KafkaEventServiceImpl.class); + + @Autowired private final KafkaProperties kafkaProperties; + + public KafkaEventServiceImpl(KafkaProperties kafkaProperties) { + this.kafkaProperties = kafkaProperties; + } + + @Override + public void produceEvent(T payload, String topic) throws Exception { + Event event = new Event<>(payload); + event.setEventTime(OffsetDateTime.now()); + + produce(event.getEventId(), event, topic); + } + + @Override + public void produce(String key, V message, String topic) throws Exception { + + Properties producerConfig = new Properties(); + producerConfig.put( + ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaProperties.getBootstrapServers()); + producerConfig.put(ProducerConfig.ACKS_CONFIG, "all"); + producerConfig.put( + ProducerConfig.CLIENT_ID_CONFIG, InetAddress.getLocalHost().getHostName()); + producerConfig.put( + ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); + + KafkaJsonSerializer jsonSerializer = new KafkaJsonSerializer<>(); + + try (KafkaProducer producer = + new KafkaProducer<>(producerConfig, new StringSerializer(), jsonSerializer)) { + + final ProducerRecord record = new ProducerRecord<>(topic, key, message); + + producer.send( + record, + (metadata, exception) -> { + if (exception != null) { + LOGGER.error( + "Failed to send message to Kafka topic: {}", topic, exception); + } else { + LOGGER.info( + "Message sent to topic: {} with offset: {}", + topic, + metadata.offset()); + } + }); + + producer.flush(); + + } catch (Exception e) { + LOGGER.error("Failed to create producer.", e); + throw e; + } + } +} diff --git a/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaJsonSerializer.java b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaJsonSerializer.java new file mode 100644 index 000000000..73be059b3 --- /dev/null +++ b/kafka/src/main/java/com/netflix/conductor/kafka/status/events/services/KafkaJsonSerializer.java @@ -0,0 +1,49 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.kafka.status.events.services; + +import java.util.Map; + +import org.apache.kafka.common.errors.SerializationException; +import org.apache.kafka.common.serialization.Serializer; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; + +public class KafkaJsonSerializer implements Serializer { + + private final ObjectMapper objectMapper; + + public KafkaJsonSerializer() { + this.objectMapper = new ObjectMapper(); + this.objectMapper.registerModule(new JavaTimeModule()); + this.objectMapper.disable(SerializationFeature.WRITE_DATES_AS_TIMESTAMPS); + } + + @Override + public void configure(Map configs, boolean isKey) {} + + @Override + public byte[] serialize(String topic, T data) { + try { + return this.objectMapper.writeValueAsBytes(data); + } catch (JsonProcessingException e) { + throw new SerializationException(e); + } + } + + @Override + public void close() {} +} diff --git a/rabbitmq-message-publisher/build.gradle b/rabbitmq-message-publisher/build.gradle new file mode 100644 index 000000000..0f9929643 --- /dev/null +++ b/rabbitmq-message-publisher/build.gradle @@ -0,0 +1,21 @@ +plugins { + id 'groovy' +} +dependencies { + + implementation project(':conductor-common') + implementation project(':conductor-core') + implementation project(':conductor-amqp') + + implementation "com.rabbitmq:amqp-client:${revAmqpClient}" + + implementation "org.apache.commons:commons-lang3:" + implementation "com.google.guava:guava:${revGuava}" + implementation "io.reactivex:rxjava:${revRxJava}" + + compileOnly 'org.springframework.boot:spring-boot-starter' + compileOnly 'org.springframework.boot:spring-boot-starter-web' + + testImplementation platform('org.junit:junit-bom:5.9.1') + testImplementation 'org.junit.jupiter:junit-jupiter' +} diff --git a/rabbitmq-message-publisher/dependencies.lock b/rabbitmq-message-publisher/dependencies.lock new file mode 100644 index 000000000..d4a23229a --- /dev/null +++ b/rabbitmq-message-publisher/dependencies.lock @@ -0,0 +1,505 @@ +{ + "annotationProcessor": { + "org.springframework.boot:spring-boot-configuration-processor": { + "locked": "3.1.4" + } + }, + "compileClasspath": { + "com.google.guava:guava": { + "locked": "30.0-jre" + }, + "com.netflix.conductor:conductor-amqp": { + "project": true + }, + "com.netflix.conductor:conductor-common": { + "project": true + }, + "com.netflix.conductor:conductor-core": { + "project": true + }, + "com.rabbitmq:amqp-client": { + "locked": "5.13.0" + }, + "io.reactivex:rxjava": { + "locked": "1.2.2" + }, + "org.apache.commons:commons-lang3": { + "locked": "3.12.0" + }, + "org.apache.logging.log4j:log4j-api": { + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-core": { + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-jul": { + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-slf4j-impl": { + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-web": { + "locked": "2.20.0" + }, + "org.springframework.boot:spring-boot-starter": { + "locked": "3.1.4" + }, + "org.springframework.boot:spring-boot-starter-web": { + "locked": "3.1.4" + } + }, + "runtimeClasspath": { + "com.fasterxml.jackson.core:jackson-annotations": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "2.15.2" + }, + "com.fasterxml.jackson.core:jackson-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.15.2" + }, + "com.fasterxml.jackson.core:jackson-databind": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.15.2" + }, + "com.fasterxml.jackson.module:jackson-module-afterburner": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common" + ], + "locked": "2.15.2" + }, + "com.github.ben-manes.caffeine:caffeine": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "3.1.8" + }, + "com.google.guava:guava": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "30.0-jre" + }, + "com.google.protobuf:protobuf-java": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "3.21.12" + }, + "com.jayway.jsonpath:json-path": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "2.8.0" + }, + "com.netflix.conductor:conductor-amqp": { + "project": true + }, + "com.netflix.conductor:conductor-annotations": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common" + ], + "project": true + }, + "com.netflix.conductor:conductor-common": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-core" + ], + "project": true + }, + "com.netflix.conductor:conductor-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "project": true + }, + "com.netflix.spectator:spectator-api": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "0.122.0" + }, + "com.rabbitmq:amqp-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "5.13.0" + }, + "com.spotify:completable-futures": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "0.3.3" + }, + "commons-io:commons-io": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "2.7" + }, + "io.reactivex:rxjava": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-core" + ], + "locked": "1.2.2" + }, + "jakarta.activation:jakarta.activation-api": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "2.1.2" + }, + "jakarta.xml.bind:jakarta.xml.bind-api": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "4.0.1" + }, + "org.apache.bval:bval-jsr": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.0.5" + }, + "org.apache.commons:commons-lang3": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "3.12.0" + }, + "org.apache.logging.log4j:log4j-api": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-annotations", + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-annotations", + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-jul": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-annotations", + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-slf4j-impl": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-annotations", + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-web": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-annotations", + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.20.0" + }, + "org.openjdk.nashorn:nashorn-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "15.4" + }, + "org.springdoc:springdoc-openapi-starter-webmvc-ui": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common" + ], + "locked": "2.1.0" + } + }, + "testCompileClasspath": { + "com.google.guava:guava": { + "locked": "30.0-jre" + }, + "com.netflix.conductor:conductor-amqp": { + "project": true + }, + "com.netflix.conductor:conductor-common": { + "project": true + }, + "com.netflix.conductor:conductor-core": { + "project": true + }, + "com.rabbitmq:amqp-client": { + "locked": "5.13.0" + }, + "io.reactivex:rxjava": { + "locked": "1.2.2" + }, + "junit:junit": { + "locked": "4.13.2" + }, + "org.apache.commons:commons-lang3": { + "locked": "3.12.0" + }, + "org.apache.logging.log4j:log4j-api": { + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-core": { + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-jul": { + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-slf4j-impl": { + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-web": { + "locked": "2.20.0" + }, + "org.junit.jupiter:junit-jupiter": { + "locked": "5.9.3" + }, + "org.junit.vintage:junit-vintage-engine": { + "locked": "5.9.3" + }, + "org.junit:junit-bom": { + "locked": "5.9.1" + }, + "org.springframework.boot:spring-boot-starter-log4j2": { + "locked": "3.1.4" + }, + "org.springframework.boot:spring-boot-starter-test": { + "locked": "3.1.4" + } + }, + "testRuntimeClasspath": { + "com.fasterxml.jackson.core:jackson-annotations": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "2.15.2" + }, + "com.fasterxml.jackson.core:jackson-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.15.2" + }, + "com.fasterxml.jackson.core:jackson-databind": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.15.2" + }, + "com.fasterxml.jackson.module:jackson-module-afterburner": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common" + ], + "locked": "2.15.2" + }, + "com.github.ben-manes.caffeine:caffeine": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "3.1.8" + }, + "com.google.guava:guava": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "30.0-jre" + }, + "com.google.protobuf:protobuf-java": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "3.21.12" + }, + "com.jayway.jsonpath:json-path": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "2.8.0" + }, + "com.netflix.conductor:conductor-amqp": { + "project": true + }, + "com.netflix.conductor:conductor-annotations": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common" + ], + "project": true + }, + "com.netflix.conductor:conductor-common": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-core" + ], + "project": true + }, + "com.netflix.conductor:conductor-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "project": true + }, + "com.netflix.spectator:spectator-api": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "0.122.0" + }, + "com.rabbitmq:amqp-client": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp" + ], + "locked": "5.13.0" + }, + "com.spotify:completable-futures": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "0.3.3" + }, + "commons-io:commons-io": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "2.7" + }, + "io.reactivex:rxjava": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-core" + ], + "locked": "1.2.2" + }, + "jakarta.activation:jakarta.activation-api": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "2.1.2" + }, + "jakarta.xml.bind:jakarta.xml.bind-api": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "4.0.1" + }, + "junit:junit": { + "locked": "4.13.2" + }, + "org.apache.bval:bval-jsr": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.0.5" + }, + "org.apache.commons:commons-lang3": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "3.12.0" + }, + "org.apache.logging.log4j:log4j-api": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-annotations", + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-annotations", + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-jul": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-annotations", + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-slf4j-impl": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-annotations", + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.20.0" + }, + "org.apache.logging.log4j:log4j-web": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-amqp", + "com.netflix.conductor:conductor-annotations", + "com.netflix.conductor:conductor-common", + "com.netflix.conductor:conductor-core" + ], + "locked": "2.20.0" + }, + "org.junit.jupiter:junit-jupiter": { + "locked": "5.9.3" + }, + "org.junit.vintage:junit-vintage-engine": { + "locked": "5.9.3" + }, + "org.junit:junit-bom": { + "locked": "5.9.1" + }, + "org.openjdk.nashorn:nashorn-core": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-core" + ], + "locked": "15.4" + }, + "org.springdoc:springdoc-openapi-starter-webmvc-ui": { + "firstLevelTransitive": [ + "com.netflix.conductor:conductor-common" + ], + "locked": "2.1.0" + }, + "org.springframework.boot:spring-boot-starter-log4j2": { + "locked": "3.1.4" + }, + "org.springframework.boot:spring-boot-starter-test": { + "locked": "3.1.4" + } + } +} \ No newline at end of file diff --git a/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java new file mode 100644 index 000000000..0033bf5a8 --- /dev/null +++ b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQConfiguration.java @@ -0,0 +1,81 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.config; + +import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty; +import org.springframework.boot.context.properties.EnableConfigurationProperties; +import org.springframework.context.annotation.Bean; +import org.springframework.context.annotation.Configuration; + +import com.netflix.conductor.contribs.queue.amqp.AMQPConnection; +import com.netflix.conductor.contribs.queue.amqp.config.AMQPRetryPattern; +import com.netflix.conductor.core.listener.TaskStatusListener; +import com.netflix.conductor.core.listener.WorkflowStatusListener; +import com.netflix.conductor.rabbitmq.listener.TaskStatusPublisherRabbitMQ; +import com.netflix.conductor.rabbitmq.listener.WorkflowStatusPublisherRabbitMQ; +import com.netflix.conductor.rabbitmq.services.RabbitMQService; +import com.netflix.conductor.rabbitmq.services.RabbitMQServiceImpl; + +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.Address; +import com.rabbitmq.client.ConnectionFactory; + +@Configuration +@EnableConfigurationProperties(RabbitMQProperties.class) +@ConditionalOnProperty(name = "conductor.message-publisher.type", havingValue = "rabbitmq") +public class RabbitMQConfiguration { + @Bean + public AMQPConnection amqpConnection(RabbitMQProperties rabbitMQProperties) { + ConnectionFactory connectionFactory = new ConnectionFactory(); + + connectionFactory.setHost(rabbitMQProperties.getHosts()); + connectionFactory.setPort(rabbitMQProperties.getPort()); + connectionFactory.setUsername(rabbitMQProperties.getUsername()); + connectionFactory.setPassword(rabbitMQProperties.getPassword()); + + Address[] addresses = + new Address[] { + new Address(rabbitMQProperties.getHosts(), rabbitMQProperties.getPort()) + }; + + AMQPRetryPattern retryPattern = new AMQPRetryPattern(); + + return AMQPConnection.getInstance(connectionFactory, addresses, retryPattern); + } + + @Bean + public RabbitMQService rabbitMQService( + AMQPConnection amqpConnection, ObjectMapper objectMapper) { + return new RabbitMQServiceImpl(amqpConnection, objectMapper); + } + + @ConditionalOnProperty( + name = "conductor.workflow-status-listener.type", + havingValue = "rabbitmq", + matchIfMissing = false) + @Bean + public WorkflowStatusListener workflowStatusListenerRabbitMQ( + RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { + return new WorkflowStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties); + } + + @ConditionalOnProperty( + name = "conductor.task-status-listener.type", + havingValue = "rabbitmq", + matchIfMissing = false) + @Bean + public TaskStatusListener taskStatusPublisherRabbitMQ( + RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { + return new TaskStatusPublisherRabbitMQ(rabbitMQService, rabbitMQProperties); + } +} diff --git a/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java new file mode 100644 index 000000000..f99a70308 --- /dev/null +++ b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/config/RabbitMQProperties.java @@ -0,0 +1,137 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.config; + +import java.util.Arrays; +import java.util.List; + +import org.springframework.boot.context.properties.ConfigurationProperties; + +import com.rabbitmq.client.ConnectionFactory; + +@ConfigurationProperties("conductor.message-publisher.rabbitmq") +public class RabbitMQProperties { + private String hosts = ConnectionFactory.DEFAULT_HOST; + private String username = ConnectionFactory.DEFAULT_USER; + private String password = ConnectionFactory.DEFAULT_PASS; + + private int port = ConnectionFactory.DEFAULT_AMQP_PORT; + private int maxChannelCount = 5000; + private int limit = 50; + private int duration = 1000; + + private String virtualHost = ConnectionFactory.DEFAULT_VHOST; + + private String allowedTaskStatuses; + + private String workflowStatusExchange; + private String taskStatusExchange; + + private boolean alwaysPublishWorkflowStatusEnabled = true; + + public String getHosts() { + return hosts; + } + + public void setHosts(String hosts) { + this.hosts = hosts; + } + + public String getUsername() { + return username; + } + + public void setUsername(String username) { + this.username = username; + } + + public String getPassword() { + return password; + } + + public void setPassword(String password) { + this.password = password; + } + + public int getPort() { + return port; + } + + public void setPort(int port) { + this.port = port; + } + + public int getMaxChannelCount() { + return maxChannelCount; + } + + public void setMaxChannelCount(int maxChannelCount) { + this.maxChannelCount = maxChannelCount; + } + + public int getLimit() { + return limit; + } + + public void setLimit(int limit) { + this.limit = limit; + } + + public int getDuration() { + return duration; + } + + public void setDuration(int duration) { + this.duration = duration; + } + + public String getVirtualHost() { + return virtualHost; + } + + public void setVirtualHost(String virtualHost) { + this.virtualHost = virtualHost; + } + + public String getWorkflowStatusExchange() { + return workflowStatusExchange; + } + + public void setWorkflowStatusExchange(String workflowStatusExchange) { + this.workflowStatusExchange = workflowStatusExchange; + } + + public String getTaskStatusExchange() { + return taskStatusExchange; + } + + public void setTaskStatusExchange(String taskStatusExchange) { + this.taskStatusExchange = taskStatusExchange; + } + + public List getAllowedTaskStatuses() { + return Arrays.asList(this.allowedTaskStatuses.split(",")); + } + + public void setAllowedTaskStatuses(String allowedTaskStatuses) { + this.allowedTaskStatuses = allowedTaskStatuses; + } + + public boolean isAlwaysPublishWorkflowStatusEnabled() { + return alwaysPublishWorkflowStatusEnabled; + } + + public void setAlwaysPublishWorkflowStatusEnabled(boolean alwaysPublishWorkflowStatusEnabled) { + this.alwaysPublishWorkflowStatusEnabled = alwaysPublishWorkflowStatusEnabled; + } +} diff --git a/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java new file mode 100644 index 000000000..20c1d073d --- /dev/null +++ b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/TaskStatusPublisherRabbitMQ.java @@ -0,0 +1,97 @@ +/* + * Copyright 2023 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.listener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.listener.TaskStatusListener; +import com.netflix.conductor.model.TaskModel; +import com.netflix.conductor.rabbitmq.config.RabbitMQProperties; +import com.netflix.conductor.rabbitmq.services.RabbitMQService; + +public class TaskStatusPublisherRabbitMQ implements TaskStatusListener { + + private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusPublisherRabbitMQ.class); + + private final RabbitMQService rabbitMQService; + private final RabbitMQProperties rabbitMQProperties; + + public TaskStatusPublisherRabbitMQ( + RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { + this.rabbitMQService = rabbitMQService; + this.rabbitMQProperties = rabbitMQProperties; + } + + @Override + public void onTaskScheduled(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskInProgress(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskCanceled(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskFailed(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskFailedWithTerminalError(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskCompleted(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskCompletedWithErrors(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskTimedOut(TaskModel task) { + publishMessage(task); + } + + @Override + public void onTaskSkipped(TaskModel task) { + publishMessage(task); + } + + private boolean IsStatusEnabled(TaskModel task) { + return rabbitMQProperties.getAllowedTaskStatuses().contains(task.getStatus().name()); + } + + private void publishMessage(TaskModel task) { + try { + if (IsStatusEnabled(task)) + rabbitMQService.publishMessage(rabbitMQProperties.getTaskStatusExchange(), task); + } catch (Exception e) { + LOGGER.error( + "Failed to publish message to exchange: {}. Exception: {}", + rabbitMQProperties.getTaskStatusExchange(), + e); + throw new RuntimeException(e); + } + } +} diff --git a/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java new file mode 100644 index 000000000..972d5ae55 --- /dev/null +++ b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/listener/WorkflowStatusPublisherRabbitMQ.java @@ -0,0 +1,86 @@ +/* + * Copyright 2023 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.listener; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.netflix.conductor.core.listener.WorkflowStatusListener; +import com.netflix.conductor.model.WorkflowModel; +import com.netflix.conductor.rabbitmq.config.RabbitMQProperties; +import com.netflix.conductor.rabbitmq.services.RabbitMQService; + +public class WorkflowStatusPublisherRabbitMQ implements WorkflowStatusListener { + + private final Logger LOGGER = LoggerFactory.getLogger(WorkflowStatusPublisherRabbitMQ.class); + private final RabbitMQService rabbitMQService; + private final RabbitMQProperties rabbitMQProperties; + + public WorkflowStatusPublisherRabbitMQ( + RabbitMQService rabbitMQService, RabbitMQProperties rabbitMQProperties) { + this.rabbitMQService = rabbitMQService; + this.rabbitMQProperties = rabbitMQProperties; + } + + @Override + public void onWorkflowCompletedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) { + onWorkflowCompleted(workflow); + } + } + + @Override + public void onWorkflowTerminatedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) { + onWorkflowTerminated(workflow); + } + } + + @Override + public void onWorkflowFinalizedIfEnabled(WorkflowModel workflow) { + if (workflow.getWorkflowDefinition().isWorkflowStatusListenerEnabled() + || rabbitMQProperties.isAlwaysPublishWorkflowStatusEnabled()) { + onWorkflowFinalized(workflow); + } + } + + @Override + public void onWorkflowCompleted(WorkflowModel workflow) { + publishMessage(workflow); + } + + @Override + public void onWorkflowTerminated(WorkflowModel workflow) { + publishMessage(workflow); + } + + @Override + public void onWorkflowFinalized(WorkflowModel workflow) { + publishMessage(workflow); + } + + private void publishMessage(WorkflowModel workflow) { + try { + rabbitMQService.publishMessage( + rabbitMQProperties.getWorkflowStatusExchange(), workflow); + } catch (Exception e) { + LOGGER.error( + "Failed to publish message to exchange: {}. Exception: {}", + rabbitMQProperties.getWorkflowStatusExchange(), + e); + throw new RuntimeException(e); + } + } +} diff --git a/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java new file mode 100644 index 000000000..932785a5a --- /dev/null +++ b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQService.java @@ -0,0 +1,25 @@ +/* + * Copyright 2023 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.services; + +import com.rabbitmq.client.BuiltinExchangeType; + +public interface RabbitMQService { + void publishMessage(String exchangeName, T content) throws Exception; + + void publishMessage( + String exchangeName, BuiltinExchangeType exchangeType, String routingKey, T content) + throws Exception; + + void close(); +} diff --git a/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQServiceImpl.java b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQServiceImpl.java new file mode 100644 index 000000000..e8c5bee6d --- /dev/null +++ b/rabbitmq-message-publisher/src/main/java/com/netflix/conductor/rabbitmq/services/RabbitMQServiceImpl.java @@ -0,0 +1,84 @@ +/* + * Copyright 2023 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.rabbitmq.services; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; + +import com.netflix.conductor.contribs.queue.amqp.AMQPConnection; +import com.netflix.conductor.contribs.queue.amqp.util.ConnectionType; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.rabbitmq.client.BuiltinExchangeType; +import com.rabbitmq.client.Channel; + +public class RabbitMQServiceImpl implements RabbitMQService { + + private static final Logger LOGGER = LoggerFactory.getLogger(RabbitMQServiceImpl.class); + + private final AMQPConnection amqpConnection; + + @Autowired private final ObjectMapper objectMapper; + + public RabbitMQServiceImpl(AMQPConnection amqpConnection, ObjectMapper objectMapper) { + this.amqpConnection = amqpConnection; + this.objectMapper = objectMapper; + } + + @Override + public void publishMessage(String exchangeName, T content) throws Exception { + Channel channel = amqpConnection.getOrCreateChannel(ConnectionType.PUBLISHER, exchangeName); + + channel.exchangeDeclare(exchangeName, BuiltinExchangeType.FANOUT, false); + + String jsonMessage = serializeContent(content); + + channel.basicPublish(exchangeName, "", null, jsonMessage.getBytes()); + + amqpConnection.returnChannel(ConnectionType.PUBLISHER, channel); + } + + @Override + public void publishMessage( + String exchangeName, BuiltinExchangeType exchangeType, String routingKey, T content) + throws Exception { + Channel channel = amqpConnection.getOrCreateChannel(ConnectionType.PUBLISHER, exchangeName); + + channel.exchangeDeclare(exchangeName, exchangeType, false); + + String jsonMessage = serializeContent(content); + + channel.basicPublish(exchangeName, routingKey, null, jsonMessage.getBytes()); + + amqpConnection.returnChannel(ConnectionType.PUBLISHER, channel); + } + + @Override + public void close() { + amqpConnection.close(); + } + + private String serializeContent(T content) { + try { + return objectMapper.writeValueAsString(content); + } catch (JsonProcessingException e) { + LOGGER.error( + "Failed to serialize message of type: {} to String. Exception: {}", + content.getClass(), + e); + throw new RuntimeException(e); + } + } +} diff --git a/server/build.gradle b/server/build.gradle index 09c0ce0c2..bfc483055 100644 --- a/server/build.gradle +++ b/server/build.gradle @@ -56,6 +56,8 @@ dependencies { //Event Listener implementation project(':conductor-workflow-event-listener') + implementation project(':conductor-rabbitmq-message-publisher') + implementation 'org.springframework.boot:spring-boot-starter' implementation 'org.springframework.boot:spring-boot-starter-validation' diff --git a/settings.gradle b/settings.gradle index 605765ede..4bd05f0b3 100644 --- a/settings.gradle +++ b/settings.gradle @@ -77,6 +77,8 @@ include 'nats' include 'nats-streaming' include 'test-harness' +include 'rabbitmq-message-publisher' rootProject.children.each {it.name="conductor-${it.name}"} +