Skip to content

Commit

Permalink
add kafka task and workflow status producers
Browse files Browse the repository at this point in the history
  • Loading branch information
darkobuvac committed Nov 19, 2024
1 parent bfe3f79 commit 678dafe
Show file tree
Hide file tree
Showing 8 changed files with 584 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.kafka.status.events;

import java.time.OffsetDateTime;
import java.util.UUID;

public class Event<T> {
private final String eventId;
private final T payload;
private OffsetDateTime eventTime;
private String eventType;
private String correlationId;
private String domain;
private String title;
private String description;
private String priority;
private OffsetDateTime timeOccurred;

public Event(T payload) {
this.eventId = UUID.randomUUID().toString();
this.payload = payload;
}

public String getEventId() {
return eventId;
}

public T getPayload() {
return payload;
}

public OffsetDateTime getEventTime() {
return eventTime;
}

public void setEventTime(OffsetDateTime eventTime) {
this.eventTime = eventTime;
}

public String getEventType() {
return eventType;
}

public void setEventType(String eventType) {
this.eventType = eventType;
}

public String getCorrelationId() {
return correlationId;
}

public void setCorrelationId(String correlationId) {
this.correlationId = correlationId;
}

public String getDomain() {
return domain;
}

public void setDomain(String domain) {
this.domain = domain;
}

public String getTitle() {
return title;
}

public void setTitle(String title) {
this.title = title;
}

public String getDescription() {
return description;
}

public void setDescription(String description) {
this.description = description;
}

public String getPriority() {
return priority;
}

public void setPriority(String priority) {
this.priority = priority;
}

public OffsetDateTime getTimeOccurred() {
return timeOccurred;
}

public void setTimeOccurred(OffsetDateTime timeOccurred) {
this.timeOccurred = timeOccurred;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.kafka.status.events.config;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.kafka.status.events.listener.TaskStatusKafkaProducer;
import com.netflix.conductor.kafka.status.events.listener.WorkflowStatusKafkaProducer;
import com.netflix.conductor.kafka.status.events.services.KafkaEventService;
import com.netflix.conductor.kafka.status.events.services.KafkaEventServiceImpl;

@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
@ConditionalOnProperty(name = "conductor.message-publisher.type", havingValue = "kafka")
public class KafkaConfiguration {

@Bean
public KafkaEventService kafkaEventService(KafkaProperties kafkaProperties) {
return new KafkaEventServiceImpl(kafkaProperties);
}

@ConditionalOnProperty(
name = "conductor.task-status-listener.type",
havingValue = "kafka",
matchIfMissing = false)
@Bean
public TaskStatusListener taskStatusPublisherRabbitMQ(
KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
return new TaskStatusKafkaProducer(kafkaEventService, kafkaProperties);
}

@ConditionalOnProperty(
name = "conductor.workflow-status-listener.type",
havingValue = "kafka",
matchIfMissing = false)
@Bean
public WorkflowStatusListener workflowStatusListenerRabbitMQ(
KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
return new WorkflowStatusKafkaProducer(kafkaEventService, kafkaProperties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.kafka.status.events.config;

import java.util.Arrays;
import java.util.List;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("conductor.message-publisher.kafka")
public class KafkaProperties {
public String getBootstrapServers() {
return bootstrapServers;
}

public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}

public String getTaskStatusTopic() {
return taskStatusTopic;
}

public void setTaskStatusTopic(String taskStatusTopic) {
this.taskStatusTopic = taskStatusTopic;
}

public String getWorkflowStatusTopic() {
return workflowStatusTopic;
}

public void setWorkflowStatusTopic(String workflowStatusTopic) {
this.workflowStatusTopic = workflowStatusTopic;
}

public List<String> getAllowedTaskStatuses() {
return Arrays.asList(this.allowedTaskStatuses.split(","));
}

public void setAllowedTaskStatuses(String allowedTaskStatuses) {
this.allowedTaskStatuses = allowedTaskStatuses;
}

public boolean isAlwaysPublishWorkflowStatusEnabled() {
return alwaysPublishWorkflowStatusEnabled;
}

public void setAlwaysPublishWorkflowStatusEnabled(boolean alwaysPublishWorkflowStatusEnabled) {
this.alwaysPublishWorkflowStatusEnabled = alwaysPublishWorkflowStatusEnabled;
}

private boolean alwaysPublishWorkflowStatusEnabled = true;
private String allowedTaskStatuses;
private String bootstrapServers = "";
private String taskStatusTopic = "conductor-task-status";
private String workflowStatusTopic = "conductor-workflow-status";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.kafka.status.events.listener;

import java.time.OffsetDateTime;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.kafka.status.events.Event;
import com.netflix.conductor.kafka.status.events.config.KafkaProperties;
import com.netflix.conductor.kafka.status.events.services.KafkaEventService;
import com.netflix.conductor.model.TaskModel;

public class TaskStatusKafkaProducer implements TaskStatusListener {

private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusKafkaProducer.class);
private final KafkaProperties kafkaProperties;
private final KafkaEventService kafkaEventService;

public TaskStatusKafkaProducer(
KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
this.kafkaEventService = kafkaEventService;
this.kafkaProperties = kafkaProperties;
}

@Override
public void onTaskCompleted(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskScheduled(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskInProgress(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskCanceled(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskFailed(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskFailedWithTerminalError(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskCompletedWithErrors(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskTimedOut(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskSkipped(TaskModel task) {
produceMessage(task);
}

private boolean IsStatusEnabled(TaskModel task) {
return kafkaProperties.getAllowedTaskStatuses().contains(task.getStatus().name());
}

private void produceMessage(TaskModel task) {
try {
if (IsStatusEnabled(task)) {
Event<TaskModel> event = new Event<>(task);
event.setEventType("Task." + task.getStatus().name());
event.setEventTime(OffsetDateTime.now());
kafkaEventService.produce(
event.getEventId(), event, kafkaProperties.getTaskStatusTopic());
}

} catch (Exception e) {
LOGGER.error(
"Failed to produce message to topic: {}. Exception: {}",
kafkaProperties.getTaskStatusTopic(),
e);
throw new RuntimeException(e);
}
}
}
Loading

0 comments on commit 678dafe

Please sign in to comment.