Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add kafka task and workflow status producers #4

Merged
merged 1 commit into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.kafka.status.events;

import java.time.OffsetDateTime;
import java.util.UUID;

public class Event<T> {
private final String eventId;
private final T payload;
private OffsetDateTime eventTime;
private String eventType;
private String correlationId;
private String domain;
private String title;
private String description;
private String priority;
private OffsetDateTime timeOccurred;

public Event(T payload) {
this.eventId = UUID.randomUUID().toString();
this.payload = payload;
}

public String getEventId() {
return eventId;
}

public T getPayload() {
return payload;
}

public OffsetDateTime getEventTime() {
return eventTime;
}

public void setEventTime(OffsetDateTime eventTime) {
this.eventTime = eventTime;
}

public String getEventType() {
return eventType;
}

public void setEventType(String eventType) {
this.eventType = eventType;
}

public String getCorrelationId() {
return correlationId;
}

public void setCorrelationId(String correlationId) {
this.correlationId = correlationId;
}

public String getDomain() {
return domain;
}

public void setDomain(String domain) {
this.domain = domain;
}

public String getTitle() {
return title;
}

public void setTitle(String title) {
this.title = title;
}

public String getDescription() {
return description;
}

public void setDescription(String description) {
this.description = description;
}

public String getPriority() {
return priority;
}

public void setPriority(String priority) {
this.priority = priority;
}

public OffsetDateTime getTimeOccurred() {
return timeOccurred;
}

public void setTimeOccurred(OffsetDateTime timeOccurred) {
this.timeOccurred = timeOccurred;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.kafka.status.events.config;

import org.springframework.boot.autoconfigure.condition.ConditionalOnProperty;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.core.listener.WorkflowStatusListener;
import com.netflix.conductor.kafka.status.events.listener.TaskStatusKafkaProducer;
import com.netflix.conductor.kafka.status.events.listener.WorkflowStatusKafkaProducer;
import com.netflix.conductor.kafka.status.events.services.KafkaEventService;
import com.netflix.conductor.kafka.status.events.services.KafkaEventServiceImpl;

@Configuration
@EnableConfigurationProperties(KafkaProperties.class)
@ConditionalOnProperty(name = "conductor.message-publisher.type", havingValue = "kafka")
public class KafkaConfiguration {

@Bean
public KafkaEventService kafkaEventService(KafkaProperties kafkaProperties) {
return new KafkaEventServiceImpl(kafkaProperties);
}

@ConditionalOnProperty(
name = "conductor.task-status-listener.type",
havingValue = "kafka",
matchIfMissing = false)
@Bean
public TaskStatusListener taskStatusPublisherRabbitMQ(
KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
return new TaskStatusKafkaProducer(kafkaEventService, kafkaProperties);
}

@ConditionalOnProperty(
name = "conductor.workflow-status-listener.type",
havingValue = "kafka",
matchIfMissing = false)
@Bean
public WorkflowStatusListener workflowStatusListenerRabbitMQ(
KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
return new WorkflowStatusKafkaProducer(kafkaEventService, kafkaProperties);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,67 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.kafka.status.events.config;

import java.util.Arrays;
import java.util.List;

import org.springframework.boot.context.properties.ConfigurationProperties;

@ConfigurationProperties("conductor.message-publisher.kafka")
public class KafkaProperties {
public String getBootstrapServers() {
return bootstrapServers;
}

public void setBootstrapServers(String bootstrapServers) {
this.bootstrapServers = bootstrapServers;
}

public String getTaskStatusTopic() {
return taskStatusTopic;
}

public void setTaskStatusTopic(String taskStatusTopic) {
this.taskStatusTopic = taskStatusTopic;
}

public String getWorkflowStatusTopic() {
return workflowStatusTopic;
}

public void setWorkflowStatusTopic(String workflowStatusTopic) {
this.workflowStatusTopic = workflowStatusTopic;
}

public List<String> getAllowedTaskStatuses() {
return Arrays.asList(this.allowedTaskStatuses.split(","));
}

public void setAllowedTaskStatuses(String allowedTaskStatuses) {
this.allowedTaskStatuses = allowedTaskStatuses;
}

public boolean isAlwaysPublishWorkflowStatusEnabled() {
return alwaysPublishWorkflowStatusEnabled;
}

public void setAlwaysPublishWorkflowStatusEnabled(boolean alwaysPublishWorkflowStatusEnabled) {
this.alwaysPublishWorkflowStatusEnabled = alwaysPublishWorkflowStatusEnabled;
}

private boolean alwaysPublishWorkflowStatusEnabled = true;
private String allowedTaskStatuses;
private String bootstrapServers = "";
private String taskStatusTopic = "conductor-task-status";
private String workflowStatusTopic = "conductor-workflow-status";
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,105 @@
/*
* Copyright 2024 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.kafka.status.events.listener;

import java.time.OffsetDateTime;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.core.listener.TaskStatusListener;
import com.netflix.conductor.kafka.status.events.Event;
import com.netflix.conductor.kafka.status.events.config.KafkaProperties;
import com.netflix.conductor.kafka.status.events.services.KafkaEventService;
import com.netflix.conductor.model.TaskModel;

public class TaskStatusKafkaProducer implements TaskStatusListener {

private final Logger LOGGER = LoggerFactory.getLogger(TaskStatusKafkaProducer.class);
private final KafkaProperties kafkaProperties;
private final KafkaEventService kafkaEventService;

public TaskStatusKafkaProducer(
KafkaEventService kafkaEventService, KafkaProperties kafkaProperties) {
this.kafkaEventService = kafkaEventService;
this.kafkaProperties = kafkaProperties;
}

@Override
public void onTaskCompleted(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskScheduled(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskInProgress(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskCanceled(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskFailed(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskFailedWithTerminalError(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskCompletedWithErrors(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskTimedOut(TaskModel task) {
produceMessage(task);
}

@Override
public void onTaskSkipped(TaskModel task) {
produceMessage(task);
}

private boolean IsStatusEnabled(TaskModel task) {
return kafkaProperties.getAllowedTaskStatuses().contains(task.getStatus().name());
}

private void produceMessage(TaskModel task) {
try {
if (IsStatusEnabled(task)) {
Event<TaskModel> event = new Event<>(task);
event.setEventType("Task." + task.getStatus().name());
event.setEventTime(OffsetDateTime.now());
kafkaEventService.produce(
event.getEventId(), event, kafkaProperties.getTaskStatusTopic());
}

} catch (Exception e) {
LOGGER.error(
"Failed to produce message to topic: {}. Exception: {}",
kafkaProperties.getTaskStatusTopic(),
e);
throw new RuntimeException(e);
}
}
}
Loading
Loading