Skip to content

Commit

Permalink
- Adding back external storage support + Event refactoring
Browse files Browse the repository at this point in the history
  • Loading branch information
jmigueprieto committed Oct 2, 2024
1 parent 1fc924f commit bfa68e8
Show file tree
Hide file tree
Showing 31 changed files with 1,001 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
import java.io.IOException;
import java.net.InetSocketAddress;

import com.netflix.conductor.client.automator.events.PollCompleted;
import com.netflix.conductor.client.automator.events.PollFailure;
import com.netflix.conductor.client.automator.events.PollStarted;
import com.netflix.conductor.client.automator.events.TaskExecutionCompleted;
import com.netflix.conductor.client.automator.events.TaskExecutionFailure;
import com.netflix.conductor.client.automator.events.TaskExecutionStarted;
import com.netflix.conductor.client.events.task.TaskPayloadUsedEvent;
import com.netflix.conductor.client.events.task.TaskResultPayloadSizeEvent;
import com.netflix.conductor.client.events.taskrunner.PollCompleted;
import com.netflix.conductor.client.events.taskrunner.PollFailure;
import com.netflix.conductor.client.events.taskrunner.PollStarted;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionCompleted;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionFailure;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionStarted;
import com.netflix.conductor.client.events.workflow.WorkflowInputPayloadSizeEvent;
import com.netflix.conductor.client.events.workflow.WorkflowPayloadUsedEvent;
import com.netflix.conductor.client.events.workflow.WorkflowStartedEvent;
import com.netflix.conductor.client.metrics.MetricsCollector;

import com.sun.net.httpserver.HttpServer;
Expand All @@ -31,8 +36,12 @@ public class PrometheusMetricsCollector implements MetricsCollector {

private static final PrometheusMeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

private static final int DEFAULT_PORT = 9991;

private static final String DEFAULT_ENDPOINT = "/metrics";

public void startServer() throws IOException {
startServer(9991, "/metrics");
startServer(DEFAULT_PORT, DEFAULT_ENDPOINT);
}

public void startServer(int port, String endpoint) throws IOException {
Expand Down Expand Up @@ -83,4 +92,29 @@ public void consume(TaskExecutionFailure e) {
var timer = prometheusRegistry.timer("task_execution_failure", "type", e.getTaskType());
timer.record(e.getDuration());
}

@Override
public void consume(TaskPayloadUsedEvent e) {
//TODO implement
}

@Override
public void consume(TaskResultPayloadSizeEvent e) {
//TODO implement
}

@Override
public void consume(WorkflowPayloadUsedEvent event) {
//TODO implement
}

@Override
public void consume(WorkflowInputPayloadSizeEvent event) {
//TODO implement
}

@Override
public void consume(WorkflowStartedEvent event) {
//TODO implement
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,26 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.client.automator.events.PollCompleted;
import com.netflix.conductor.client.automator.events.PollFailure;
import com.netflix.conductor.client.automator.events.PollStarted;
import com.netflix.conductor.client.automator.events.TaskExecutionCompleted;
import com.netflix.conductor.client.automator.events.TaskExecutionFailure;
import com.netflix.conductor.client.automator.events.TaskExecutionStarted;
import com.netflix.conductor.client.automator.events.TaskRunnerEvent;
import com.netflix.conductor.client.automator.filters.PollFilter;
import com.netflix.conductor.client.config.PropertyFactory;
import com.netflix.conductor.client.events.dispatcher.EventDispatcher;
import com.netflix.conductor.client.events.taskrunner.PollCompleted;
import com.netflix.conductor.client.events.taskrunner.PollFailure;
import com.netflix.conductor.client.events.taskrunner.PollStarted;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionCompleted;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionFailure;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionStarted;
import com.netflix.conductor.client.events.taskrunner.TaskRunnerEvent;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
Expand All @@ -66,7 +63,7 @@ class TaskRunner {
private String domain;
private volatile boolean pollingAndExecuting = true;
private final List<PollFilter> pollFilters;
private final Map<Class<? extends TaskRunnerEvent>, List<Consumer<? extends TaskRunnerEvent>>> listeners;
private final EventDispatcher<TaskRunnerEvent> eventDispatcher;

TaskRunner(Worker worker,
TaskClient taskClient,
Expand All @@ -76,7 +73,7 @@ class TaskRunner {
int threadCount,
int taskPollTimeout,
List<PollFilter> pollFilters,
Map<Class<? extends TaskRunnerEvent>, List<Consumer<? extends TaskRunnerEvent>>> listeners) {
EventDispatcher<TaskRunnerEvent> eventDispatcher) {
this.worker = worker;
this.taskClient = taskClient;
this.updateRetryCount = updateRetryCount;
Expand All @@ -85,7 +82,7 @@ class TaskRunner {
this.taskType = worker.getTaskDefName();
this.permits = new Semaphore(threadCount);
this.pollFilters = pollFilters;
this.listeners = listeners;
this.eventDispatcher = eventDispatcher;

//1. Is there a worker level override?
this.domain = PropertyFactory.getString(taskType, Worker.PROP_DOMAIN, null);
Expand Down Expand Up @@ -167,7 +164,7 @@ public void shutdown(int timeout) {
}

private List<Task> pollTasksForWorker() {
publish(new PollStarted(taskType));
eventDispatcher.publish(new PollStarted(taskType));

if (worker.paused()) {
LOGGER.trace("Worker {} has been paused. Not polling anymore!", worker.getClass());
Expand Down Expand Up @@ -199,7 +196,7 @@ private List<Task> pollTasksForWorker() {
stopwatch.stop();
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
LOGGER.debug("Time taken to poll {} task with a batch size of {} is {} ms", taskType, tasks.size(), elapsed);
publish(new PollCompleted(taskType, elapsed));
eventDispatcher.publish(new PollCompleted(taskType, elapsed));
} catch (Throwable e) {
permits.release(pollCount - tasks.size());

Expand All @@ -219,7 +216,7 @@ private List<Task> pollTasksForWorker() {
}

long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
publish(new PollFailure(taskType, elapsed, e));
eventDispatcher.publish(new PollFailure(taskType, elapsed, e));
}

return tasks;
Expand All @@ -243,7 +240,7 @@ private List<Task> pollTask(String domain, int count) {
};

private void processTask(Task task) {
publish(new TaskExecutionStarted(taskType, task.getTaskId(), worker.getIdentity()));
eventDispatcher.publish(new TaskExecutionStarted(taskType, task.getTaskId(), worker.getIdentity()));
LOGGER.trace("Executing task: {} of type: {} in worker: {} at {}", task.getTaskId(), taskType, worker.getClass().getSimpleName(), worker.getIdentity());
LOGGER.trace("task {} is getting executed after {} ms of getting polled", task.getTaskId(), (System.currentTimeMillis() - task.getStartTime()));
Stopwatch stopwatch = Stopwatch.createStarted();
Expand Down Expand Up @@ -280,15 +277,15 @@ private void executeTask(Worker worker, Task task) {
worker.getIdentity());
result = worker.execute(task);
stopwatch.stop();
publish(new TaskExecutionCompleted(taskType, task.getTaskId(), worker.getIdentity(), stopwatch.elapsed(TimeUnit.MILLISECONDS)));
eventDispatcher.publish(new TaskExecutionCompleted(taskType, task.getTaskId(), worker.getIdentity(), stopwatch.elapsed(TimeUnit.MILLISECONDS)));
result.setWorkflowInstanceId(task.getWorkflowInstanceId());
result.setTaskId(task.getTaskId());
result.setWorkerId(worker.getIdentity());
} catch (Exception e) {
if (stopwatch.isRunning()) {
stopwatch.stop();
}
publish(new TaskExecutionFailure(taskType, task.getTaskId(), worker.getIdentity(), e, stopwatch.elapsed(TimeUnit.MILLISECONDS)));
eventDispatcher.publish(new TaskExecutionFailure(taskType, task.getTaskId(), worker.getIdentity(), e, stopwatch.elapsed(TimeUnit.MILLISECONDS)));

LOGGER.error(
"Unable to execute task: {} of type: {}",
Expand Down Expand Up @@ -350,10 +347,15 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo
}
}

//FIXME
private Optional<String> upload(TaskResult result, String taskType) {
// do nothing
return Optional.empty();
try {
return taskClient.evaluateAndUploadLargePayload(result.getOutputData(), taskType);
} catch (IllegalArgumentException iae) {
result.setReasonForIncompletion(iae.getMessage());
result.setOutputData(null);
result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
return Optional.empty();
}
}

private <T, R> R retryOperation(Function<T, R> operation, int count, T input, String opName) {
Expand All @@ -370,42 +372,6 @@ private <T, R> R retryOperation(Function<T, R> operation, int count, T input, St
throw new RuntimeException("Exhausted retries performing " + opName);
}

private void publish(TaskRunnerEvent event) {
if (noListeners(event)) {
return;
}

CompletableFuture.runAsync(() -> {
List<Consumer<? extends TaskRunnerEvent>> eventListeners = getEventListeners(event);
for (Consumer<? extends TaskRunnerEvent> listener : eventListeners) {
((Consumer<TaskRunnerEvent>) listener).accept(event);
}
});
}

private boolean noListeners(TaskRunnerEvent event) {
List<Consumer<? extends TaskRunnerEvent>> specificEventListeners = this.listeners.get(event.getClass());
List<Consumer<? extends TaskRunnerEvent>> promiscuousListeners = this.listeners.get(TaskRunnerEvent.class);

return (specificEventListeners == null || specificEventListeners.isEmpty())
&& (promiscuousListeners == null || promiscuousListeners.isEmpty());
}

private List<Consumer<? extends TaskRunnerEvent>> getEventListeners(TaskRunnerEvent event) {
List<Consumer<? extends TaskRunnerEvent>> specificEventListeners = this.listeners.get(event.getClass());
List<Consumer<? extends TaskRunnerEvent>> promiscuousListeners = this.listeners.get(TaskRunnerEvent.class);
if (promiscuousListeners == null || promiscuousListeners.isEmpty()) {
return specificEventListeners;
}

if (specificEventListeners == null || specificEventListeners.isEmpty()) {
return promiscuousListeners;
}

return Stream.concat(specificEventListeners.stream(), promiscuousListeners.stream())
.collect(Collectors.toList());
}

private void handleException(Throwable t, TaskResult result, Worker worker, Task task) {
LOGGER.error(String.format("Error while executing task %s", task.toString()), t);
result.setStatus(TaskResult.Status.FAILED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.client.automator.events.PollCompleted;
import com.netflix.conductor.client.automator.events.PollFailure;
import com.netflix.conductor.client.automator.events.PollStarted;
import com.netflix.conductor.client.automator.events.TaskExecutionCompleted;
import com.netflix.conductor.client.automator.events.TaskExecutionFailure;
import com.netflix.conductor.client.automator.events.TaskExecutionStarted;
import com.netflix.conductor.client.automator.events.TaskRunnerEvent;
import com.netflix.conductor.client.automator.filters.PollFilter;
import com.netflix.conductor.client.config.ConductorClientConfiguration;
import com.netflix.conductor.client.events.dispatcher.EventDispatcher;
import com.netflix.conductor.client.events.listeners.ListenerRegister;
import com.netflix.conductor.client.events.taskrunner.TaskRunnerEvent;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.metrics.MetricsCollector;
import com.netflix.conductor.client.worker.Worker;
Expand All @@ -56,7 +52,7 @@ public class TaskRunnerConfigurer {
private final List<TaskRunner> taskRunners;
private ScheduledExecutorService scheduledExecutorService;
private final List<PollFilter> pollFilters;
private final Map<Class<? extends TaskRunnerEvent>, List<Consumer<? extends TaskRunnerEvent>>> listeners;
private final EventDispatcher<TaskRunnerEvent> eventDispatcher;

/**
* @see TaskRunnerConfigurer.Builder
Expand All @@ -76,7 +72,7 @@ private TaskRunnerConfigurer(TaskRunnerConfigurer.Builder builder) {
this.workers = new LinkedList<>();
this.threadCount = builder.threadCount;
this.pollFilters = builder.pollFilters;
this.listeners = builder.listeners;
this.eventDispatcher = builder.eventDispatcher;
builder.workers.forEach(this.workers::add);
taskRunners = new LinkedList<>();
}
Expand Down Expand Up @@ -163,7 +159,7 @@ private void startWorker(Worker worker) {
threadCountForTask,
taskPollTimeout,
pollFilters,
listeners);
eventDispatcher);
// startWorker(worker) is executed by several threads.
// taskRunners.add(taskRunner) without synchronization could lead to a race condition and unpredictable behavior,
// including potential null values being inserted or corrupted state.
Expand Down Expand Up @@ -194,7 +190,7 @@ public static class Builder {
private Map<String /* taskType */, Integer /* timeoutInMillisecond */> taskPollTimeout = new HashMap<>();
private Map<String /* taskType */, Integer /* timeoutInMillisecond */> taskPollCount = new HashMap<>();
private final List<PollFilter> pollFilters = new LinkedList<>();
private final Map<Class<? extends TaskRunnerEvent>, List<Consumer<? extends TaskRunnerEvent>>> listeners = new HashMap<>();
private final EventDispatcher<TaskRunnerEvent> eventDispatcher = new EventDispatcher<>();

public Builder(TaskClient taskClient, Iterable<Worker> workers) {
Preconditions.checkNotNull(taskClient, "TaskClient cannot be null");
Expand Down Expand Up @@ -325,29 +321,12 @@ public Builder withPollFilter(PollFilter filter) {
}

public <T extends TaskRunnerEvent> Builder withListener(Class<T> eventType, Consumer<T> listener) {
listeners.computeIfAbsent(eventType, k -> new LinkedList<>()).add(listener);
eventDispatcher.register(eventType, listener);
return this;
}

public Builder withMetricsCollector(MetricsCollector metricsCollector) {
listeners.computeIfAbsent(PollFailure.class, k -> new LinkedList<>())
.add((Consumer<PollFailure>) metricsCollector::consume);

listeners.computeIfAbsent(PollCompleted.class, k -> new LinkedList<>())
.add((Consumer<PollCompleted>) metricsCollector::consume);

listeners.computeIfAbsent(PollStarted.class, k -> new LinkedList<>())
.add((Consumer<PollStarted>) metricsCollector::consume);

listeners.computeIfAbsent(TaskExecutionStarted.class, k -> new LinkedList<>())
.add((Consumer<TaskExecutionStarted>) metricsCollector::consume);

listeners.computeIfAbsent(TaskExecutionCompleted.class, k -> new LinkedList<>())
.add((Consumer<TaskExecutionCompleted>) metricsCollector::consume);

listeners.computeIfAbsent(TaskExecutionFailure.class, k -> new LinkedList<>())
.add((Consumer<TaskExecutionFailure>) metricsCollector::consume);

ListenerRegister.register(metricsCollector, eventDispatcher);
return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ public interface ConductorClientConfiguration {
* and the task/workflow execution fails.
*/
boolean isExternalPayloadStorageEnabled();

/**
* @return the flag which controls whether to enforce or not the barriers on the size
* of workflow and task payloads for both input and output.
* SEE: <a href="https://conductor-oss.github.io/conductor/documentation/advanced/externalpayloadstorage.html">External Payload Storage</a>
*/
boolean isEnforceThresholds();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2020 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.client.config;

/**
* A default implementation of {@link ConductorClientConfiguration} where external payload barriers
* is disabled. SEE: SEE: <a href="https://conductor-oss.github.io/conductor/documentation/advanced/externalpayloadstorage.html">External Payload Storage</a>
*/
public class DefaultConductorClientConfiguration implements ConductorClientConfiguration {

@Override
public int getWorkflowInputPayloadThresholdKB() {
return 5120;
}

@Override
public int getWorkflowInputMaxPayloadThresholdKB() {
return 10240;
}

@Override
public int getTaskOutputPayloadThresholdKB() {
return 3072;
}

@Override
public int getTaskOutputMaxPayloadThresholdKB() {
return 10240;
}

@Override
public boolean isExternalPayloadStorageEnabled() {
return false;
}

@Override
public boolean isEnforceThresholds() {
return false;
}
}
Loading

0 comments on commit bfa68e8

Please sign in to comment.