Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[Java Client v4] External storage support + Event refactoring #274

Merged
merged 1 commit into from
Oct 3, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,17 @@
import java.io.IOException;
import java.net.InetSocketAddress;

import com.netflix.conductor.client.automator.events.PollCompleted;
import com.netflix.conductor.client.automator.events.PollFailure;
import com.netflix.conductor.client.automator.events.PollStarted;
import com.netflix.conductor.client.automator.events.TaskExecutionCompleted;
import com.netflix.conductor.client.automator.events.TaskExecutionFailure;
import com.netflix.conductor.client.automator.events.TaskExecutionStarted;
import com.netflix.conductor.client.events.task.TaskPayloadUsedEvent;
import com.netflix.conductor.client.events.task.TaskResultPayloadSizeEvent;
import com.netflix.conductor.client.events.taskrunner.PollCompleted;
import com.netflix.conductor.client.events.taskrunner.PollFailure;
import com.netflix.conductor.client.events.taskrunner.PollStarted;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionCompleted;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionFailure;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionStarted;
import com.netflix.conductor.client.events.workflow.WorkflowInputPayloadSizeEvent;
import com.netflix.conductor.client.events.workflow.WorkflowPayloadUsedEvent;
import com.netflix.conductor.client.events.workflow.WorkflowStartedEvent;
import com.netflix.conductor.client.metrics.MetricsCollector;

import com.sun.net.httpserver.HttpServer;
Expand All @@ -31,8 +36,12 @@ public class PrometheusMetricsCollector implements MetricsCollector {

private static final PrometheusMeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT);

private static final int DEFAULT_PORT = 9991;

private static final String DEFAULT_ENDPOINT = "/metrics";

/**
 * Starts the server with the default settings by delegating to
 * {@link #startServer(int, String)} with {@code DEFAULT_PORT} (9991) and
 * {@code DEFAULT_ENDPOINT} ("/metrics").
 *
 * @throws IOException if the delegated {@link #startServer(int, String)} call throws it
 */
public void startServer() throws IOException {
    // Single delegation only: the former hard-coded call used the same values as the
    // constants, and keeping both would start the server twice.
    startServer(DEFAULT_PORT, DEFAULT_ENDPOINT);
}

public void startServer(int port, String endpoint) throws IOException {
Expand Down Expand Up @@ -83,4 +92,29 @@ public void consume(TaskExecutionFailure e) {
var timer = prometheusRegistry.timer("task_execution_failure", "type", e.getTaskType());
timer.record(e.getDuration());
}

/**
 * Currently a no-op: no metric is recorded for {@link TaskPayloadUsedEvent} yet.
 *
 * @param e the received event; ignored for now
 */
@Override
public void consume(TaskPayloadUsedEvent e) {
//TODO implement
}

/**
 * Currently a no-op: no metric is recorded for {@link TaskResultPayloadSizeEvent} yet.
 *
 * @param e the received event; ignored for now
 */
@Override
public void consume(TaskResultPayloadSizeEvent e) {
//TODO implement
}

/**
 * Currently a no-op: no metric is recorded for {@link WorkflowPayloadUsedEvent} yet.
 *
 * @param event the received event; ignored for now
 */
@Override
public void consume(WorkflowPayloadUsedEvent event) {
//TODO implement
}

/**
 * Currently a no-op: no metric is recorded for {@link WorkflowInputPayloadSizeEvent} yet.
 *
 * @param event the received event; ignored for now
 */
@Override
public void consume(WorkflowInputPayloadSizeEvent event) {
//TODO implement
}

/**
 * Currently a no-op: no metric is recorded for {@link WorkflowStartedEvent} yet.
 *
 * @param event the received event; ignored for now
 */
@Override
public void consume(WorkflowStartedEvent event) {
//TODO implement
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,29 +19,26 @@
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

import org.apache.commons.lang3.concurrent.BasicThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.client.automator.events.PollCompleted;
import com.netflix.conductor.client.automator.events.PollFailure;
import com.netflix.conductor.client.automator.events.PollStarted;
import com.netflix.conductor.client.automator.events.TaskExecutionCompleted;
import com.netflix.conductor.client.automator.events.TaskExecutionFailure;
import com.netflix.conductor.client.automator.events.TaskExecutionStarted;
import com.netflix.conductor.client.automator.events.TaskRunnerEvent;
import com.netflix.conductor.client.automator.filters.PollFilter;
import com.netflix.conductor.client.config.PropertyFactory;
import com.netflix.conductor.client.events.dispatcher.EventDispatcher;
import com.netflix.conductor.client.events.taskrunner.PollCompleted;
import com.netflix.conductor.client.events.taskrunner.PollFailure;
import com.netflix.conductor.client.events.taskrunner.PollStarted;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionCompleted;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionFailure;
import com.netflix.conductor.client.events.taskrunner.TaskExecutionStarted;
import com.netflix.conductor.client.events.taskrunner.TaskRunnerEvent;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.worker.Worker;
import com.netflix.conductor.common.metadata.tasks.Task;
Expand All @@ -66,7 +63,7 @@ class TaskRunner {
private String domain;
private volatile boolean pollingAndExecuting = true;
private final List<PollFilter> pollFilters;
private final Map<Class<? extends TaskRunnerEvent>, List<Consumer<? extends TaskRunnerEvent>>> listeners;
private final EventDispatcher<TaskRunnerEvent> eventDispatcher;

TaskRunner(Worker worker,
TaskClient taskClient,
Expand All @@ -76,7 +73,7 @@ class TaskRunner {
int threadCount,
int taskPollTimeout,
List<PollFilter> pollFilters,
Map<Class<? extends TaskRunnerEvent>, List<Consumer<? extends TaskRunnerEvent>>> listeners) {
EventDispatcher<TaskRunnerEvent> eventDispatcher) {
this.worker = worker;
this.taskClient = taskClient;
this.updateRetryCount = updateRetryCount;
Expand All @@ -85,7 +82,7 @@ class TaskRunner {
this.taskType = worker.getTaskDefName();
this.permits = new Semaphore(threadCount);
this.pollFilters = pollFilters;
this.listeners = listeners;
this.eventDispatcher = eventDispatcher;

//1. Is there a worker level override?
this.domain = PropertyFactory.getString(taskType, Worker.PROP_DOMAIN, null);
Expand Down Expand Up @@ -167,7 +164,7 @@ public void shutdown(int timeout) {
}

private List<Task> pollTasksForWorker() {
publish(new PollStarted(taskType));
eventDispatcher.publish(new PollStarted(taskType));

if (worker.paused()) {
LOGGER.trace("Worker {} has been paused. Not polling anymore!", worker.getClass());
Expand Down Expand Up @@ -199,7 +196,7 @@ private List<Task> pollTasksForWorker() {
stopwatch.stop();
long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
LOGGER.debug("Time taken to poll {} task with a batch size of {} is {} ms", taskType, tasks.size(), elapsed);
publish(new PollCompleted(taskType, elapsed));
eventDispatcher.publish(new PollCompleted(taskType, elapsed));
} catch (Throwable e) {
permits.release(pollCount - tasks.size());

Expand All @@ -219,7 +216,7 @@ private List<Task> pollTasksForWorker() {
}

long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS);
publish(new PollFailure(taskType, elapsed, e));
eventDispatcher.publish(new PollFailure(taskType, elapsed, e));
}

return tasks;
Expand All @@ -243,7 +240,7 @@ private List<Task> pollTask(String domain, int count) {
};

private void processTask(Task task) {
publish(new TaskExecutionStarted(taskType, task.getTaskId(), worker.getIdentity()));
eventDispatcher.publish(new TaskExecutionStarted(taskType, task.getTaskId(), worker.getIdentity()));
LOGGER.trace("Executing task: {} of type: {} in worker: {} at {}", task.getTaskId(), taskType, worker.getClass().getSimpleName(), worker.getIdentity());
LOGGER.trace("task {} is getting executed after {} ms of getting polled", task.getTaskId(), (System.currentTimeMillis() - task.getStartTime()));
Stopwatch stopwatch = Stopwatch.createStarted();
Expand Down Expand Up @@ -280,15 +277,15 @@ private void executeTask(Worker worker, Task task) {
worker.getIdentity());
result = worker.execute(task);
stopwatch.stop();
publish(new TaskExecutionCompleted(taskType, task.getTaskId(), worker.getIdentity(), stopwatch.elapsed(TimeUnit.MILLISECONDS)));
eventDispatcher.publish(new TaskExecutionCompleted(taskType, task.getTaskId(), worker.getIdentity(), stopwatch.elapsed(TimeUnit.MILLISECONDS)));
result.setWorkflowInstanceId(task.getWorkflowInstanceId());
result.setTaskId(task.getTaskId());
result.setWorkerId(worker.getIdentity());
} catch (Exception e) {
if (stopwatch.isRunning()) {
stopwatch.stop();
}
publish(new TaskExecutionFailure(taskType, task.getTaskId(), worker.getIdentity(), e, stopwatch.elapsed(TimeUnit.MILLISECONDS)));
eventDispatcher.publish(new TaskExecutionFailure(taskType, task.getTaskId(), worker.getIdentity(), e, stopwatch.elapsed(TimeUnit.MILLISECONDS)));

LOGGER.error(
"Unable to execute task: {} of type: {}",
Expand Down Expand Up @@ -350,10 +347,15 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo
}
}

//FIXME
/**
 * Passes the result's output data to
 * {@code taskClient.evaluateAndUploadLargePayload(...)} and returns whatever that call returns.
 *
 * <p>If the client rejects the payload with an {@link IllegalArgumentException}, the result is
 * mutated in place: the exception message becomes the reason for incompletion, the output data
 * is cleared, and the status is set to {@code FAILED_WITH_TERMINAL_ERROR}.
 *
 * @param result   the task result whose output data is evaluated; modified on rejection
 * @param taskType the task type forwarded to the task client
 * @return the client's return value (presumably the external storage location when the payload
 *         was uploaded -- confirm against {@code TaskClient}), or an empty {@link Optional}
 *         when the payload was rejected
 */
private Optional<String> upload(TaskResult result, String taskType) {
    // The former "do nothing" stub that returned early is gone; it made this code unreachable.
    try {
        return taskClient.evaluateAndUploadLargePayload(result.getOutputData(), taskType);
    } catch (IllegalArgumentException iae) {
        result.setReasonForIncompletion(iae.getMessage());
        result.setOutputData(null);
        result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR);
        return Optional.empty();
    }
}

private <T, R> R retryOperation(Function<T, R> operation, int count, T input, String opName) {
Expand All @@ -370,42 +372,6 @@ private <T, R> R retryOperation(Function<T, R> operation, int count, T input, St
throw new RuntimeException("Exhausted retries performing " + opName);
}

private void publish(TaskRunnerEvent event) {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Logic Moved to EventDispatcher

if (noListeners(event)) {
return;
}

CompletableFuture.runAsync(() -> {
List<Consumer<? extends TaskRunnerEvent>> eventListeners = getEventListeners(event);
for (Consumer<? extends TaskRunnerEvent> listener : eventListeners) {
((Consumer<TaskRunnerEvent>) listener).accept(event);
}
});
}

/**
 * Tells whether nobody is registered to receive {@code event}.
 *
 * @param event the event about to be published
 * @return {@code true} when both the list registered under the event's concrete class and the
 *         list registered under {@link TaskRunnerEvent} are null or empty
 */
private boolean noListeners(TaskRunnerEvent event) {
    List<Consumer<? extends TaskRunnerEvent>> forEventClass = this.listeners.get(event.getClass());
    List<Consumer<? extends TaskRunnerEvent>> forEveryEvent = this.listeners.get(TaskRunnerEvent.class);

    // Any class-specific listener is enough to answer "no".
    if (forEventClass != null && !forEventClass.isEmpty()) {
        return false;
    }
    return forEveryEvent == null || forEveryEvent.isEmpty();
}

/**
 * Collects the listeners that should receive {@code event}: those registered under the event's
 * concrete class and those registered under {@link TaskRunnerEvent} itself.
 *
 * @param event the event about to be delivered
 * @return the class-specific lookup result as-is (possibly {@code null}) when there are no
 *         catch-all listeners; the catch-all list when there are no class-specific listeners;
 *         otherwise a new list with the class-specific listeners followed by the catch-all ones
 */
private List<Consumer<? extends TaskRunnerEvent>> getEventListeners(TaskRunnerEvent event) {
    List<Consumer<? extends TaskRunnerEvent>> forEventClass = this.listeners.get(event.getClass());
    List<Consumer<? extends TaskRunnerEvent>> forEveryEvent = this.listeners.get(TaskRunnerEvent.class);

    boolean hasCatchAll = forEveryEvent != null && !forEveryEvent.isEmpty();
    if (!hasCatchAll) {
        return forEventClass;
    }

    boolean hasSpecific = forEventClass != null && !forEventClass.isEmpty();
    if (!hasSpecific) {
        return forEveryEvent;
    }

    // Both groups are populated: class-specific listeners go first.
    return Stream.concat(forEventClass.stream(), forEveryEvent.stream())
            .collect(Collectors.toList());
}

private void handleException(Throwable t, TaskResult result, Worker worker, Task task) {
LOGGER.error(String.format("Error while executing task %s", task.toString()), t);
result.setStatus(TaskResult.Status.FAILED);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,15 +24,11 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.netflix.conductor.client.automator.events.PollCompleted;
import com.netflix.conductor.client.automator.events.PollFailure;
import com.netflix.conductor.client.automator.events.PollStarted;
import com.netflix.conductor.client.automator.events.TaskExecutionCompleted;
import com.netflix.conductor.client.automator.events.TaskExecutionFailure;
import com.netflix.conductor.client.automator.events.TaskExecutionStarted;
import com.netflix.conductor.client.automator.events.TaskRunnerEvent;
import com.netflix.conductor.client.automator.filters.PollFilter;
import com.netflix.conductor.client.config.ConductorClientConfiguration;
import com.netflix.conductor.client.events.dispatcher.EventDispatcher;
import com.netflix.conductor.client.events.listeners.ListenerRegister;
import com.netflix.conductor.client.events.taskrunner.TaskRunnerEvent;
import com.netflix.conductor.client.http.TaskClient;
import com.netflix.conductor.client.metrics.MetricsCollector;
import com.netflix.conductor.client.worker.Worker;
Expand All @@ -56,7 +52,7 @@ public class TaskRunnerConfigurer {
private final List<TaskRunner> taskRunners;
private ScheduledExecutorService scheduledExecutorService;
private final List<PollFilter> pollFilters;
private final Map<Class<? extends TaskRunnerEvent>, List<Consumer<? extends TaskRunnerEvent>>> listeners;
private final EventDispatcher<TaskRunnerEvent> eventDispatcher;

/**
* @see TaskRunnerConfigurer.Builder
Expand All @@ -76,7 +72,7 @@ private TaskRunnerConfigurer(TaskRunnerConfigurer.Builder builder) {
this.workers = new LinkedList<>();
this.threadCount = builder.threadCount;
this.pollFilters = builder.pollFilters;
this.listeners = builder.listeners;
this.eventDispatcher = builder.eventDispatcher;
builder.workers.forEach(this.workers::add);
taskRunners = new LinkedList<>();
}
Expand Down Expand Up @@ -163,7 +159,7 @@ private void startWorker(Worker worker) {
threadCountForTask,
taskPollTimeout,
pollFilters,
listeners);
eventDispatcher);
// startWorker(worker) is executed by several threads.
// taskRunners.add(taskRunner) without synchronization could lead to a race condition and unpredictable behavior,
// including potential null values being inserted or corrupted state.
Expand Down Expand Up @@ -194,7 +190,7 @@ public static class Builder {
private Map<String /* taskType */, Integer /* timeoutInMillisecond */> taskPollTimeout = new HashMap<>();
private Map<String /* taskType */, Integer /* timeoutInMillisecond */> taskPollCount = new HashMap<>();
private final List<PollFilter> pollFilters = new LinkedList<>();
private final Map<Class<? extends TaskRunnerEvent>, List<Consumer<? extends TaskRunnerEvent>>> listeners = new HashMap<>();
private final EventDispatcher<TaskRunnerEvent> eventDispatcher = new EventDispatcher<>();

public Builder(TaskClient taskClient, Iterable<Worker> workers) {
Preconditions.checkNotNull(taskClient, "TaskClient cannot be null");
Expand Down Expand Up @@ -325,29 +321,12 @@ public Builder withPollFilter(PollFilter filter) {
}

/**
 * Registers a listener for the given event type with this builder's event dispatcher.
 *
 * @param eventType the event class the listener is registered under
 * @param listener  the callback handed to {@code eventDispatcher.register(...)}
 * @param <T>       the event type
 * @return this builder, for chaining
 */
public <T extends TaskRunnerEvent> Builder withListener(Class<T> eventType, Consumer<T> listener) {
    // Register with the dispatcher only; the legacy listeners map is no longer written here.
    eventDispatcher.register(eventType, listener);
    return this;
}

/**
 * Registers the given metrics collector with this builder's event dispatcher by delegating to
 * {@code ListenerRegister.register(metricsCollector, eventDispatcher)}.
 *
 * @param metricsCollector the collector to register
 * @return this builder, for chaining
 */
public Builder withMetricsCollector(MetricsCollector metricsCollector) {
    // The per-event registrations into the legacy listeners map are gone; ListenerRegister
    // is now the single place that wires the collector (presumably covering the same
    // poll/execution events -- confirm against ListenerRegister).
    ListenerRegister.register(metricsCollector, eventDispatcher);
    return this;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,4 +47,11 @@ public interface ConductorClientConfiguration {
* and the task/workflow execution fails.
*/
boolean isExternalPayloadStorageEnabled();

/**
* @return the flag that controls whether the size barriers on workflow and task payloads
* (both input and output) are enforced.
* SEE: <a href="https://conductor-oss.github.io/conductor/documentation/advanced/externalpayloadstorage.html">External Payload Storage</a>
*/
boolean isEnforceThresholds();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Copyright 2020 Conductor Authors.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on
* an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the
* specific language governing permissions and limitations under the License.
*/
package com.netflix.conductor.client.config;

/**
* A default implementation of {@link ConductorClientConfiguration} where external payload barriers
* are disabled. SEE: <a href="https://conductor-oss.github.io/conductor/documentation/advanced/externalpayloadstorage.html">External Payload Storage</a>
*/
public class DefaultConductorClientConfiguration implements ConductorClientConfiguration {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This and the upload/download logic were taken from Client V3 (current OSS client)


/**
 * @return the default workflow input payload threshold: 5120 (in KB, per the method name)
 */
@Override
public int getWorkflowInputPayloadThresholdKB() {
return 5120;
}

/**
 * @return the default maximum workflow input payload threshold: 10240 (in KB, per the method name)
 */
@Override
public int getWorkflowInputMaxPayloadThresholdKB() {
return 10240;
}

/**
 * @return the default task output payload threshold: 3072 (in KB, per the method name)
 */
@Override
public int getTaskOutputPayloadThresholdKB() {
return 3072;
}

/**
 * @return the default maximum task output payload threshold: 10240 (in KB, per the method name)
 */
@Override
public int getTaskOutputMaxPayloadThresholdKB() {
return 10240;
}

/**
 * @return always {@code false}: this default configuration reports external payload storage
 *         as disabled
 */
@Override
public boolean isExternalPayloadStorageEnabled() {
return false;
}

/**
 * @return always {@code false}: this default configuration does not enforce payload size
 *         thresholds
 */
@Override
public boolean isEnforceThresholds() {
return false;
}
}
Loading
Loading