From bfa68e82817cf1f5560f0db0793fdf285740b138 Mon Sep 17 00:00:00 2001 From: Miguel Prieto Date: Mon, 30 Sep 2024 17:55:59 -0400 Subject: [PATCH] - Adding back external storage support + Event refactoring --- .../PrometheusMetricsCollector.java | 48 ++++- .../client/automator/TaskRunner.java | 84 +++------ .../automator/TaskRunnerConfigurer.java | 39 +--- .../config/ConductorClientConfiguration.java | 7 + .../DefaultConductorClientConfiguration.java | 50 +++++ .../client/events/ConductorClientEvent.java | 24 +++ .../events/dispatcher/EventDispatcher.java | 90 +++++++++ .../events/listeners/ListenerRegister.java | 52 ++++++ .../events/listeners/TaskClientListener.java | 27 +++ .../listeners/TaskRunnerEventsListener.java | 36 ++++ .../listeners/WorkflowClientListener.java | 28 +++ .../client/events/task/TaskClientEvent.java | 26 +++ .../events/task/TaskPayloadUsedEvent.java | 28 +++ .../task/TaskResultPayloadSizeEvent.java | 25 +++ .../taskrunner}/PollCompleted.java | 2 +- .../taskrunner}/PollFailure.java | 2 +- .../taskrunner}/PollStarted.java | 2 +- .../taskrunner}/TaskExecutionCompleted.java | 2 +- .../taskrunner}/TaskExecutionFailure.java | 2 +- .../taskrunner}/TaskExecutionStarted.java | 2 +- .../taskrunner}/TaskRunnerEvent.java | 7 +- .../events/workflow/WorkflowClientEvent.java | 27 +++ .../WorkflowInputPayloadSizeEvent.java | 26 +++ .../workflow/WorkflowPayloadUsedEvent.java | 28 +++ .../events/workflow/WorkflowStartedEvent.java | 33 ++++ .../conductor/client/http/PayloadStorage.java | 174 ++++++++++++++++++ .../conductor/client/http/TaskClient.java | 123 ++++++++++++- .../conductor/client/http/WorkflowClient.java | 118 +++++++++++- .../client/metrics/MetricsCollector.java | 22 +-- .../examples/events/EventListenerExample.java | 6 +- .../client/http/OrkesTaskClient.java | 5 - 31 files changed, 1001 insertions(+), 144 deletions(-) create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/config/DefaultConductorClientConfiguration.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/ConductorClientEvent.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/dispatcher/EventDispatcher.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/ListenerRegister.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/TaskClientListener.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/TaskRunnerEventsListener.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/WorkflowClientListener.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/task/TaskClientEvent.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/task/TaskPayloadUsedEvent.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/task/TaskResultPayloadSizeEvent.java rename conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/{automator/events => events/taskrunner}/PollCompleted.java (94%) rename conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/{automator/events => events/taskrunner}/PollFailure.java (94%) rename conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/{automator/events => events/taskrunner}/PollStarted.java (93%) rename conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/{automator/events => events/taskrunner}/TaskExecutionCompleted.java (95%) rename conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/{automator/events => events/taskrunner}/TaskExecutionFailure.java (95%) rename conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/{automator/events => events/taskrunner}/TaskExecutionStarted.java (94%) rename conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/{automator/events => events/taskrunner}/TaskRunnerEvent.java (79%) create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowClientEvent.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowInputPayloadSizeEvent.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowPayloadUsedEvent.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowStartedEvent.java create mode 100644 conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client-metrics/src/main/java/com/netflix/conductor/client/metrics/prometheus/PrometheusMetricsCollector.java b/conductor-clients/java/conductor-java-sdk/conductor-client-metrics/src/main/java/com/netflix/conductor/client/metrics/prometheus/PrometheusMetricsCollector.java index 307da9109..e94f633f2 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client-metrics/src/main/java/com/netflix/conductor/client/metrics/prometheus/PrometheusMetricsCollector.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client-metrics/src/main/java/com/netflix/conductor/client/metrics/prometheus/PrometheusMetricsCollector.java @@ -15,12 +15,17 @@ import java.io.IOException; import java.net.InetSocketAddress; -import com.netflix.conductor.client.automator.events.PollCompleted; -import com.netflix.conductor.client.automator.events.PollFailure; -import com.netflix.conductor.client.automator.events.PollStarted; -import com.netflix.conductor.client.automator.events.TaskExecutionCompleted; -import com.netflix.conductor.client.automator.events.TaskExecutionFailure; -import com.netflix.conductor.client.automator.events.TaskExecutionStarted; +import com.netflix.conductor.client.events.task.TaskPayloadUsedEvent; +import com.netflix.conductor.client.events.task.TaskResultPayloadSizeEvent; +import com.netflix.conductor.client.events.taskrunner.PollCompleted; +import com.netflix.conductor.client.events.taskrunner.PollFailure; +import com.netflix.conductor.client.events.taskrunner.PollStarted; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionCompleted; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionFailure; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionStarted; +import com.netflix.conductor.client.events.workflow.WorkflowInputPayloadSizeEvent; +import com.netflix.conductor.client.events.workflow.WorkflowPayloadUsedEvent; +import com.netflix.conductor.client.events.workflow.WorkflowStartedEvent; import com.netflix.conductor.client.metrics.MetricsCollector; import com.sun.net.httpserver.HttpServer; @@ -31,8 +36,12 @@ public class PrometheusMetricsCollector implements MetricsCollector { private static final PrometheusMeterRegistry prometheusRegistry = new PrometheusMeterRegistry(PrometheusConfig.DEFAULT); + private static final int DEFAULT_PORT = 9991; + + private static final String DEFAULT_ENDPOINT = "/metrics"; + public void startServer() throws IOException { - startServer(9991, "/metrics"); + startServer(DEFAULT_PORT, DEFAULT_ENDPOINT); } public void startServer(int port, String endpoint) throws IOException { @@ -83,4 +92,29 @@ public void consume(TaskExecutionFailure e) { var timer = prometheusRegistry.timer("task_execution_failure", "type", e.getTaskType()); timer.record(e.getDuration()); } + + @Override + public void consume(TaskPayloadUsedEvent e) { + //TODO implement + } + + @Override + public void consume(TaskResultPayloadSizeEvent e) { + //TODO implement + } + + @Override + public void consume(WorkflowPayloadUsedEvent event) { + //TODO implement + } + + @Override + public void consume(WorkflowInputPayloadSizeEvent event) { + //TODO implement + } + + @Override + public void consume(WorkflowStartedEvent event) { + //TODO implement + } } diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java index 3c3d9bfbd..c481bd456 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunner.java @@ -19,29 +19,26 @@ import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Semaphore; import java.util.concurrent.TimeUnit; -import java.util.function.Consumer; import java.util.function.Function; -import java.util.stream.Collectors; -import java.util.stream.Stream; import org.apache.commons.lang3.concurrent.BasicThreadFactory; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.netflix.conductor.client.automator.events.PollCompleted; -import com.netflix.conductor.client.automator.events.PollFailure; -import com.netflix.conductor.client.automator.events.PollStarted; -import com.netflix.conductor.client.automator.events.TaskExecutionCompleted; -import com.netflix.conductor.client.automator.events.TaskExecutionFailure; -import com.netflix.conductor.client.automator.events.TaskExecutionStarted; -import com.netflix.conductor.client.automator.events.TaskRunnerEvent; import com.netflix.conductor.client.automator.filters.PollFilter; import com.netflix.conductor.client.config.PropertyFactory; +import com.netflix.conductor.client.events.dispatcher.EventDispatcher; +import com.netflix.conductor.client.events.taskrunner.PollCompleted; +import com.netflix.conductor.client.events.taskrunner.PollFailure; +import com.netflix.conductor.client.events.taskrunner.PollStarted; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionCompleted; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionFailure; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionStarted; +import com.netflix.conductor.client.events.taskrunner.TaskRunnerEvent; import com.netflix.conductor.client.http.TaskClient; import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.common.metadata.tasks.Task; @@ -66,7 +63,7 @@ class TaskRunner { private String domain; private volatile boolean pollingAndExecuting = true; private final List pollFilters; - private final Map, List>> listeners; + private final EventDispatcher eventDispatcher; TaskRunner(Worker worker, TaskClient taskClient, @@ -76,7 +73,7 @@ class TaskRunner { int threadCount, int taskPollTimeout, List pollFilters, - Map, List>> listeners) { + EventDispatcher eventDispatcher) { this.worker = worker; this.taskClient = taskClient; this.updateRetryCount = updateRetryCount; @@ -85,7 +82,7 @@ class TaskRunner { this.taskType = worker.getTaskDefName(); this.permits = new Semaphore(threadCount); this.pollFilters = pollFilters; - this.listeners = listeners; + this.eventDispatcher = eventDispatcher; //1. Is there a worker level override? this.domain = PropertyFactory.getString(taskType, Worker.PROP_DOMAIN, null); @@ -167,7 +164,7 @@ public void shutdown(int timeout) { } private List pollTasksForWorker() { - publish(new PollStarted(taskType)); + eventDispatcher.publish(new PollStarted(taskType)); if (worker.paused()) { LOGGER.trace("Worker {} has been paused. Not polling anymore!", worker.getClass()); @@ -199,7 +196,7 @@ private List pollTasksForWorker() { stopwatch.stop(); long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); LOGGER.debug("Time taken to poll {} task with a batch size of {} is {} ms", taskType, tasks.size(), elapsed); - publish(new PollCompleted(taskType, elapsed)); + eventDispatcher.publish(new PollCompleted(taskType, elapsed)); } catch (Throwable e) { permits.release(pollCount - tasks.size()); @@ -219,7 +216,7 @@ private List pollTasksForWorker() { } long elapsed = stopwatch.elapsed(TimeUnit.MILLISECONDS); - publish(new PollFailure(taskType, elapsed, e)); + eventDispatcher.publish(new PollFailure(taskType, elapsed, e)); } return tasks; @@ -243,7 +240,7 @@ private List pollTask(String domain, int count) { }; private void processTask(Task task) { - publish(new TaskExecutionStarted(taskType, task.getTaskId(), worker.getIdentity())); + eventDispatcher.publish(new TaskExecutionStarted(taskType, task.getTaskId(), worker.getIdentity())); LOGGER.trace("Executing task: {} of type: {} in worker: {} at {}", task.getTaskId(), taskType, worker.getClass().getSimpleName(), worker.getIdentity()); LOGGER.trace("task {} is getting executed after {} ms of getting polled", task.getTaskId(), (System.currentTimeMillis() - task.getStartTime())); Stopwatch stopwatch = Stopwatch.createStarted(); @@ -280,7 +277,7 @@ private void executeTask(Worker worker, Task task) { worker.getIdentity()); result = worker.execute(task); stopwatch.stop(); - publish(new TaskExecutionCompleted(taskType, task.getTaskId(), worker.getIdentity(), stopwatch.elapsed(TimeUnit.MILLISECONDS))); + eventDispatcher.publish(new TaskExecutionCompleted(taskType, task.getTaskId(), worker.getIdentity(), stopwatch.elapsed(TimeUnit.MILLISECONDS))); result.setWorkflowInstanceId(task.getWorkflowInstanceId()); result.setTaskId(task.getTaskId()); result.setWorkerId(worker.getIdentity()); @@ -288,7 +285,7 @@ private void executeTask(Worker worker, Task task) { if (stopwatch.isRunning()) { stopwatch.stop(); } - publish(new TaskExecutionFailure(taskType, task.getTaskId(), worker.getIdentity(), e, stopwatch.elapsed(TimeUnit.MILLISECONDS))); + eventDispatcher.publish(new TaskExecutionFailure(taskType, task.getTaskId(), worker.getIdentity(), e, stopwatch.elapsed(TimeUnit.MILLISECONDS))); LOGGER.error( "Unable to execute task: {} of type: {}", @@ -350,10 +347,15 @@ private void updateTaskResult(int count, Task task, TaskResult result, Worker wo } } - //FIXME private Optional upload(TaskResult result, String taskType) { - // do nothing - return Optional.empty(); + try { + return taskClient.evaluateAndUploadLargePayload(result.getOutputData(), taskType); + } catch (IllegalArgumentException iae) { + result.setReasonForIncompletion(iae.getMessage()); + result.setOutputData(null); + result.setStatus(TaskResult.Status.FAILED_WITH_TERMINAL_ERROR); + return Optional.empty(); + } } private R retryOperation(Function operation, int count, T input, String opName) { @@ -370,42 +372,6 @@ private R retryOperation(Function operation, int count, T input, St throw new RuntimeException("Exhausted retries performing " + opName); } - private void publish(TaskRunnerEvent event) { - if (noListeners(event)) { - return; - } - - CompletableFuture.runAsync(() -> { - List> eventListeners = getEventListeners(event); - for (Consumer listener : eventListeners) { - ((Consumer) listener).accept(event); - } - }); - } - - private boolean noListeners(TaskRunnerEvent event) { - List> specificEventListeners = this.listeners.get(event.getClass()); - List> promiscuousListeners = this.listeners.get(TaskRunnerEvent.class); - - return (specificEventListeners == null || specificEventListeners.isEmpty()) - && (promiscuousListeners == null || promiscuousListeners.isEmpty()); - } - - private List> getEventListeners(TaskRunnerEvent event) { - List> specificEventListeners = this.listeners.get(event.getClass()); - List> promiscuousListeners = this.listeners.get(TaskRunnerEvent.class); - if (promiscuousListeners == null || promiscuousListeners.isEmpty()) { - return specificEventListeners; - } - - if (specificEventListeners == null || specificEventListeners.isEmpty()) { - return promiscuousListeners; - } - - return Stream.concat(specificEventListeners.stream(), promiscuousListeners.stream()) - .collect(Collectors.toList()); - } - private void handleException(Throwable t, TaskResult result, Worker worker, Task task) { LOGGER.error(String.format("Error while executing task %s", task.toString()), t); result.setStatus(TaskResult.Status.FAILED); diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java index a04acd622..b04ed2ed1 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/TaskRunnerConfigurer.java @@ -24,15 +24,11 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.netflix.conductor.client.automator.events.PollCompleted; -import com.netflix.conductor.client.automator.events.PollFailure; -import com.netflix.conductor.client.automator.events.PollStarted; -import com.netflix.conductor.client.automator.events.TaskExecutionCompleted; -import com.netflix.conductor.client.automator.events.TaskExecutionFailure; -import com.netflix.conductor.client.automator.events.TaskExecutionStarted; -import com.netflix.conductor.client.automator.events.TaskRunnerEvent; import com.netflix.conductor.client.automator.filters.PollFilter; import com.netflix.conductor.client.config.ConductorClientConfiguration; +import com.netflix.conductor.client.events.dispatcher.EventDispatcher; +import com.netflix.conductor.client.events.listeners.ListenerRegister; +import com.netflix.conductor.client.events.taskrunner.TaskRunnerEvent; import com.netflix.conductor.client.http.TaskClient; import com.netflix.conductor.client.metrics.MetricsCollector; import com.netflix.conductor.client.worker.Worker; @@ -56,7 +52,7 @@ public class TaskRunnerConfigurer { private final List taskRunners; private ScheduledExecutorService scheduledExecutorService; private final List pollFilters; - private final Map, List>> listeners; + private final EventDispatcher eventDispatcher; /** * @see TaskRunnerConfigurer.Builder @@ -76,7 +72,7 @@ private TaskRunnerConfigurer(TaskRunnerConfigurer.Builder builder) { this.workers = new LinkedList<>(); this.threadCount = builder.threadCount; this.pollFilters = builder.pollFilters; - this.listeners = builder.listeners; + this.eventDispatcher = builder.eventDispatcher; builder.workers.forEach(this.workers::add); taskRunners = new LinkedList<>(); } @@ -163,7 +159,7 @@ private void startWorker(Worker worker) { threadCountForTask, taskPollTimeout, pollFilters, - listeners); + eventDispatcher); // startWorker(worker) is executed by several threads. // taskRunners.add(taskRunner) without synchronization could lead to a race condition and unpredictable behavior, // including potential null values being inserted or corrupted state. @@ -194,7 +190,7 @@ public static class Builder { private Map taskPollTimeout = new HashMap<>(); private Map taskPollCount = new HashMap<>(); private final List pollFilters = new LinkedList<>(); - private final Map, List>> listeners = new HashMap<>(); + private final EventDispatcher eventDispatcher = new EventDispatcher<>(); public Builder(TaskClient taskClient, Iterable workers) { Preconditions.checkNotNull(taskClient, "TaskClient cannot be null"); @@ -325,29 +321,12 @@ public Builder withPollFilter(PollFilter filter) { } public Builder withListener(Class eventType, Consumer listener) { - listeners.computeIfAbsent(eventType, k -> new LinkedList<>()).add(listener); + eventDispatcher.register(eventType, listener); return this; } public Builder withMetricsCollector(MetricsCollector metricsCollector) { - listeners.computeIfAbsent(PollFailure.class, k -> new LinkedList<>()) - .add((Consumer) metricsCollector::consume); - - listeners.computeIfAbsent(PollCompleted.class, k -> new LinkedList<>()) - .add((Consumer) metricsCollector::consume); - - listeners.computeIfAbsent(PollStarted.class, k -> new LinkedList<>()) - .add((Consumer) metricsCollector::consume); - - listeners.computeIfAbsent(TaskExecutionStarted.class, k -> new LinkedList<>()) - .add((Consumer) metricsCollector::consume); - - listeners.computeIfAbsent(TaskExecutionCompleted.class, k -> new LinkedList<>()) - .add((Consumer) metricsCollector::consume); - - listeners.computeIfAbsent(TaskExecutionFailure.class, k -> new LinkedList<>()) - .add((Consumer) metricsCollector::consume); - + ListenerRegister.register(metricsCollector, eventDispatcher); return this; } } diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/config/ConductorClientConfiguration.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/config/ConductorClientConfiguration.java index bb5a28926..d43f2518b 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/config/ConductorClientConfiguration.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/config/ConductorClientConfiguration.java @@ -47,4 +47,11 @@ public interface ConductorClientConfiguration { * and the task/workflow execution fails. */ boolean isExternalPayloadStorageEnabled(); + + /** + * @return the flag which controls whether to enforce or not the barriers on the size + * of workflow and task payloads for both input and output. + * SEE: External Payload Storage + */ + boolean isEnforceThresholds(); } diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/config/DefaultConductorClientConfiguration.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/config/DefaultConductorClientConfiguration.java new file mode 100644 index 000000000..9ba023b98 --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/config/DefaultConductorClientConfiguration.java @@ -0,0 +1,50 @@ +/* + * Copyright 2020 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.config; + +/** + * A default implementation of {@link ConductorClientConfiguration} where external payload barriers + * is disabled. SEE: SEE: External Payload Storage + */ +public class DefaultConductorClientConfiguration implements ConductorClientConfiguration { + + @Override + public int getWorkflowInputPayloadThresholdKB() { + return 5120; + } + + @Override + public int getWorkflowInputMaxPayloadThresholdKB() { + return 10240; + } + + @Override + public int getTaskOutputPayloadThresholdKB() { + return 3072; + } + + @Override + public int getTaskOutputMaxPayloadThresholdKB() { + return 10240; + } + + @Override + public boolean isExternalPayloadStorageEnabled() { + return false; + } + + @Override + public boolean isEnforceThresholds() { + return false; + } +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/ConductorClientEvent.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/ConductorClientEvent.java new file mode 100644 index 000000000..c1e3045f3 --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/ConductorClientEvent.java @@ -0,0 +1,24 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events; + +import java.time.Instant; + +import lombok.Getter; +import lombok.ToString; + +@Getter +@ToString +public abstract class ConductorClientEvent { + private final Instant time = Instant.now(); +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/dispatcher/EventDispatcher.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/dispatcher/EventDispatcher.java new file mode 100644 index 000000000..5bfdbd7fd --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/dispatcher/EventDispatcher.java @@ -0,0 +1,90 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events.dispatcher; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CopyOnWriteArrayList; +import java.util.function.Consumer; +import java.util.stream.Collectors; +import java.util.stream.Stream; + +import com.netflix.conductor.client.events.ConductorClientEvent; + +public class EventDispatcher { + + private final Map, List>> listeners; + + public EventDispatcher() { + this.listeners = new ConcurrentHashMap<>(); + } + + public void register(Class clazz, Consumer listener) { + // CopyOnWriteArrayList is thread-safe. It's particularly useful in scenarios where + // reads are far more frequent than writes (typical for event listeners). + listeners.computeIfAbsent(clazz, k -> new CopyOnWriteArrayList<>()).add(listener); + } + + public void unregister(Class clazz, Consumer listener) { + List> consumers = listeners.get(clazz); + if (consumers != null) { + consumers.remove(listener); + if (consumers.isEmpty()) { + listeners.remove(clazz); + } + } + } + + public void publish(T event) { + if (noListeners(event)) { + return; + } + + CompletableFuture.runAsync(() -> { + List> eventListeners = getEventListeners(event); + for (Consumer listener : eventListeners) { + ((Consumer) listener).accept(event); + } + }); + } + + private boolean noListeners(T event) { + if (listeners.isEmpty()) { + return true; + } + + var specificEventListeners = listeners.get(event.getClass()); + var promiscuousListeners = listeners.get(ConductorClientEvent.class); + + return (specificEventListeners == null || specificEventListeners.isEmpty()) + && (promiscuousListeners == null || promiscuousListeners.isEmpty()); + } + + private List> getEventListeners(T event) { + var specificEventListeners = listeners.get(event.getClass()); + var promiscuousListeners = listeners.get(ConductorClientEvent.class); + if (promiscuousListeners == null || promiscuousListeners.isEmpty()) { + return specificEventListeners; + } + + if (specificEventListeners == null || specificEventListeners.isEmpty()) { + return promiscuousListeners; + } + + return Stream.concat(specificEventListeners.stream(), promiscuousListeners.stream()) + .collect(Collectors.toList()); + } + +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/ListenerRegister.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/ListenerRegister.java new file mode 100644 index 000000000..2b0db419d --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/ListenerRegister.java @@ -0,0 +1,52 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events.listeners; + +import com.netflix.conductor.client.events.dispatcher.EventDispatcher; +import com.netflix.conductor.client.events.task.TaskClientEvent; +import com.netflix.conductor.client.events.task.TaskPayloadUsedEvent; +import com.netflix.conductor.client.events.task.TaskResultPayloadSizeEvent; +import com.netflix.conductor.client.events.taskrunner.PollCompleted; +import com.netflix.conductor.client.events.taskrunner.PollFailure; +import com.netflix.conductor.client.events.taskrunner.PollStarted; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionCompleted; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionFailure; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionStarted; +import com.netflix.conductor.client.events.taskrunner.TaskRunnerEvent; +import com.netflix.conductor.client.events.workflow.WorkflowClientEvent; +import com.netflix.conductor.client.events.workflow.WorkflowInputPayloadSizeEvent; +import com.netflix.conductor.client.events.workflow.WorkflowPayloadUsedEvent; +import com.netflix.conductor.client.events.workflow.WorkflowStartedEvent; + +public class ListenerRegister { + + public static void register(TaskRunnerEventsListener listener, EventDispatcher dispatcher) { + dispatcher.register(PollFailure.class, listener::consume); + dispatcher.register(PollCompleted.class, listener::consume); + dispatcher.register(PollStarted.class, listener::consume); + dispatcher.register(TaskExecutionStarted.class, listener::consume); + dispatcher.register(TaskExecutionCompleted.class, listener::consume); + dispatcher.register(TaskExecutionFailure.class, listener::consume); + } + + public static void register(TaskClientListener listener, EventDispatcher dispatcher) { + dispatcher.register(TaskResultPayloadSizeEvent.class, listener::consume); + dispatcher.register(TaskPayloadUsedEvent.class, listener::consume); + } + + public static void register(WorkflowClientListener listener, EventDispatcher dispatcher) { + dispatcher.register(WorkflowStartedEvent.class, listener::consume); + dispatcher.register(WorkflowInputPayloadSizeEvent.class, listener::consume); + dispatcher.register(WorkflowPayloadUsedEvent.class, listener::consume); + } +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/TaskClientListener.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/TaskClientListener.java new file mode 100644 index 000000000..d1ec8b74c --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/TaskClientListener.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events.listeners; + +import com.netflix.conductor.client.events.task.TaskPayloadUsedEvent; +import com.netflix.conductor.client.events.task.TaskResultPayloadSizeEvent; + +/** + * TaskPayloadUsedEvent and TaskRecordPayloadSizeEvent are only published when + * conductorClientConfiguration.isEnforceThresholds == true. + */ +public interface TaskClientListener { + + void consume(TaskPayloadUsedEvent e); + + void consume(TaskResultPayloadSizeEvent e); +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/TaskRunnerEventsListener.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/TaskRunnerEventsListener.java new file mode 100644 index 000000000..b5b0c4f40 --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/TaskRunnerEventsListener.java @@ -0,0 +1,36 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events.listeners; + +import com.netflix.conductor.client.events.taskrunner.PollCompleted; +import com.netflix.conductor.client.events.taskrunner.PollFailure; +import com.netflix.conductor.client.events.taskrunner.PollStarted; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionCompleted; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionFailure; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionStarted; + +public interface TaskRunnerEventsListener { + + void consume(PollFailure e); + + void consume(PollCompleted e); + + void consume(PollStarted e); + + void consume(TaskExecutionStarted e); + + void consume(TaskExecutionCompleted e); + + void consume(TaskExecutionFailure e); + +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/WorkflowClientListener.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/WorkflowClientListener.java new file mode 100644 index 000000000..8e47e4dcf --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/listeners/WorkflowClientListener.java @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events.listeners; + +import com.netflix.conductor.client.events.workflow.WorkflowInputPayloadSizeEvent; +import com.netflix.conductor.client.events.workflow.WorkflowPayloadUsedEvent; +import com.netflix.conductor.client.events.workflow.WorkflowStartedEvent; + +/** + * WorkflowPayloadUsedEvent and WorkflowRecordPayloadSizeEvent are only published when + * conductorClientConfiguration.isEnforceThresholds == true. + */ +public interface WorkflowClientListener { + + void consume(WorkflowPayloadUsedEvent event); + void consume(WorkflowInputPayloadSizeEvent event); + void consume(WorkflowStartedEvent event); +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/task/TaskClientEvent.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/task/TaskClientEvent.java new file mode 100644 index 000000000..18687aedd --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/task/TaskClientEvent.java @@ -0,0 +1,26 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events.task; + +import com.netflix.conductor.client.events.ConductorClientEvent; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; + +@AllArgsConstructor +@Getter +@ToString +public abstract class TaskClientEvent extends ConductorClientEvent { + private final String taskType; +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/task/TaskPayloadUsedEvent.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/task/TaskPayloadUsedEvent.java new file mode 100644 index 000000000..35db493dd --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/task/TaskPayloadUsedEvent.java @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events.task; + +import lombok.Getter; + +@Getter +public class TaskPayloadUsedEvent extends TaskClientEvent { + + private final String operation; + private final String payloadType; + + public TaskPayloadUsedEvent(String taskType, String operation, String payloadType) { + super(taskType); + this.operation = operation; + this.payloadType = payloadType; + } +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/task/TaskResultPayloadSizeEvent.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/task/TaskResultPayloadSizeEvent.java new file mode 100644 index 000000000..6a6baa809 --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/task/TaskResultPayloadSizeEvent.java @@ -0,0 +1,25 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events.task; + +import lombok.Getter; + +@Getter +public class TaskResultPayloadSizeEvent extends TaskClientEvent { + private final long size; + + public TaskResultPayloadSizeEvent(String taskType, long size) { + super(taskType); + this.size = size; + } +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/PollCompleted.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/PollCompleted.java similarity index 94% rename from conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/PollCompleted.java rename to conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/PollCompleted.java index 15274c1b9..6383feb46 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/PollCompleted.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/PollCompleted.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package com.netflix.conductor.client.automator.events; +package com.netflix.conductor.client.events.taskrunner; import java.time.Duration; diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/PollFailure.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/PollFailure.java similarity index 94% rename from conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/PollFailure.java rename to conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/PollFailure.java index 3a5d375dd..5905023e1 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/PollFailure.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/PollFailure.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package com.netflix.conductor.client.automator.events; +package com.netflix.conductor.client.events.taskrunner; import java.time.Duration; diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/PollStarted.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/PollStarted.java similarity index 93% rename from conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/PollStarted.java rename to conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/PollStarted.java index 62068bfb0..c97e6167c 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/PollStarted.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/PollStarted.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package com.netflix.conductor.client.automator.events; +package com.netflix.conductor.client.events.taskrunner; import lombok.ToString; diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/TaskExecutionCompleted.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/TaskExecutionCompleted.java similarity index 95% rename from conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/TaskExecutionCompleted.java rename to conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/TaskExecutionCompleted.java index 2ca962d1c..409ed54a2 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/TaskExecutionCompleted.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/TaskExecutionCompleted.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package com.netflix.conductor.client.automator.events; +package com.netflix.conductor.client.events.taskrunner; import java.time.Duration; diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/TaskExecutionFailure.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/TaskExecutionFailure.java similarity index 95% rename from conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/TaskExecutionFailure.java rename to conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/TaskExecutionFailure.java index 3ec43c7b2..e608ca9f6 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/TaskExecutionFailure.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/TaskExecutionFailure.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package com.netflix.conductor.client.automator.events; +package com.netflix.conductor.client.events.taskrunner; import java.time.Duration; diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/TaskExecutionStarted.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/TaskExecutionStarted.java similarity index 94% rename from conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/TaskExecutionStarted.java rename to conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/TaskExecutionStarted.java index 6ab388653..78f41a65f 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/TaskExecutionStarted.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/TaskExecutionStarted.java @@ -10,7 +10,7 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package com.netflix.conductor.client.automator.events; +package com.netflix.conductor.client.events.taskrunner; import lombok.Getter; import lombok.ToString; diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/TaskRunnerEvent.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/TaskRunnerEvent.java similarity index 79% rename from conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/TaskRunnerEvent.java rename to conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/TaskRunnerEvent.java index 48474f134..dbc56a421 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/automator/events/TaskRunnerEvent.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/taskrunner/TaskRunnerEvent.java @@ -10,9 +10,9 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. */ -package com.netflix.conductor.client.automator.events; +package com.netflix.conductor.client.events.taskrunner; -import java.time.Instant; +import com.netflix.conductor.client.events.ConductorClientEvent; import lombok.AllArgsConstructor; import lombok.Getter; @@ -21,7 +21,6 @@ @AllArgsConstructor @Getter @ToString -public abstract class TaskRunnerEvent { - private final Instant time = Instant.now(); +public abstract class TaskRunnerEvent extends ConductorClientEvent { private final String taskType; } diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowClientEvent.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowClientEvent.java new file mode 100644 index 000000000..835ab448f --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowClientEvent.java @@ -0,0 +1,27 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events.workflow; + +import com.netflix.conductor.client.events.ConductorClientEvent; + +import lombok.AllArgsConstructor; +import lombok.Getter; +import lombok.ToString; + +@AllArgsConstructor +@Getter +@ToString +public abstract class WorkflowClientEvent extends ConductorClientEvent { + private final String name; + private final Integer version; +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowInputPayloadSizeEvent.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowInputPayloadSizeEvent.java new file mode 100644 index 000000000..048edc558 --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowInputPayloadSizeEvent.java @@ -0,0 +1,26 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events.workflow; + +import lombok.Getter; + +@Getter +public class WorkflowInputPayloadSizeEvent extends WorkflowClientEvent { + + private final long size; + + public WorkflowInputPayloadSizeEvent(String name, Integer version, long size) { + super(name, version); + this.size = size; + } +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowPayloadUsedEvent.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowPayloadUsedEvent.java new file mode 100644 index 000000000..9781ac9d4 --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowPayloadUsedEvent.java @@ -0,0 +1,28 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events.workflow; + +import lombok.Getter; + +@Getter +public class WorkflowPayloadUsedEvent extends WorkflowClientEvent { + + private final String operation; + private final String payloadType; + + public WorkflowPayloadUsedEvent(String name, Integer version, String operation, String payloadType) { + super(name, version); + this.operation = operation; + this.payloadType = payloadType; + } +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowStartedEvent.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowStartedEvent.java new file mode 100644 index 000000000..0998a21d6 --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/events/workflow/WorkflowStartedEvent.java @@ -0,0 +1,33 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.events.workflow; + +import lombok.Getter; + +@Getter +public class WorkflowStartedEvent extends WorkflowClientEvent { + + private final boolean success; + + private final Throwable throwable; + + public WorkflowStartedEvent(String name, Integer version) { + this(name, version, true, null); + } + + public WorkflowStartedEvent(String name, Integer version, boolean success, Throwable t) { + super(name, version); + this.success = success; + this.throwable = t; + } +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java new file mode 100644 index 000000000..4a0dbfeb8 --- /dev/null +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java @@ -0,0 +1,174 @@ +/* + * Copyright 2024 Conductor Authors. + *

+ * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + */ +package com.netflix.conductor.client.http; + +import java.io.BufferedInputStream; +import java.io.BufferedOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.net.HttpURLConnection; +import java.net.MalformedURLException; +import java.net.URI; +import java.net.URISyntaxException; +import java.net.URL; + +import org.jetbrains.annotations.NotNull; + +import com.netflix.conductor.client.exception.ConductorClientException; +import com.netflix.conductor.common.run.ExternalStorageLocation; +import com.netflix.conductor.common.utils.ExternalPayloadStorage; + +import com.fasterxml.jackson.core.type.TypeReference; +import lombok.extern.slf4j.Slf4j; + +/** An implementation of {@link ExternalPayloadStorage} for storing large JSON payload data. */ +@Slf4j +class PayloadStorage implements ExternalPayloadStorage { + + private static final int BUFFER_SIZE = 1024 * 32; + + private final ConductorClient client; + + PayloadStorage(ConductorClient client) { + this.client = client; + } + + @Override + public ExternalStorageLocation getLocation(Operation operation, PayloadType payloadType, String path) { + ConductorClientRequest request = ConductorClientRequest.builder() + .method(ConductorClientRequest.Method.GET) + .path("/{resource}/externalstoragelocation") + .addPathParam("resource", getResource(operation, payloadType)) + .addQueryParam("path", path) + .addQueryParam("operation", operation.toString()) + .addQueryParam("payloadType", payloadType.toString()) + .build(); + + ConductorClientResponse resp = client.execute(request, new TypeReference<>() { + }); + + return resp.getData(); + } + + @NotNull + private String getResource(Operation operation, PayloadType payloadType) { + switch (payloadType) { + case WORKFLOW_INPUT: + case WORKFLOW_OUTPUT: + return "workflow"; + case TASK_INPUT: + case TASK_OUTPUT: + return "tasks"; + } + + throw new ConductorClientException(String.format("Invalid payload type: %s for operation: %s", + payloadType, operation)); + } + + /** + * Uploads the payload to the uri specified. + * + * @param uri the location to which the object is to be uploaded + * @param payload an {@link InputStream} containing the json payload which is to be uploaded + * @param payloadSize the size of the json payload in bytes + * @throws ConductorClientException if the upload fails due to an invalid path or an error from + * external storage + */ + @Override + public void upload(String uri, InputStream payload, long payloadSize) { + HttpURLConnection connection = null; + try { + URL url = new URI(uri).toURL(); + + connection = (HttpURLConnection) url.openConnection(); + connection.setDoOutput(true); + connection.setRequestMethod("PUT"); + + try (BufferedOutputStream bufferedOutputStream = new BufferedOutputStream(connection.getOutputStream())) { + byte[] buffer = new byte[BUFFER_SIZE]; + int bytesRead; + long totalBytes = 0; + while ((bytesRead = payload.read(buffer)) != -1) { + bufferedOutputStream.write(buffer, 0, bytesRead); + totalBytes += bytesRead; + } + bufferedOutputStream.flush(); + + int responseCode = connection.getResponseCode(); + if (!isSuccessful(responseCode)) { + String errorMsg = String.format("Unable to upload. Response code: %d", responseCode); + log.error(errorMsg); + throw new ConductorClientException(errorMsg); + } + log.debug("Uploaded {} bytes to uri: {}, with HTTP response code: {}", totalBytes, uri, responseCode); + } + } catch (URISyntaxException | MalformedURLException e) { + String errorMsg = String.format("Invalid path specified: %s", uri); + log.error(errorMsg, e); + throw new ConductorClientException(e); + } catch (IOException e) { + String errorMsg = String.format("Error uploading to path: %s", uri); + log.error(errorMsg, e); + throw new ConductorClientException(e); + } finally { + if (connection != null) { + connection.disconnect(); + } + try { + if (payload != null) { + payload.close(); + } + } catch (IOException e) { + log.warn("Unable to close input stream when uploading to uri: {}", uri); + } + } + } + + /** + * Downloads the payload from the given uri. + * + * @param uri the location from where the object is to be downloaded + * @return an inputstream of the payload in the external storage + * @throws ConductorClientException if the download fails due to an invalid path or an error + * from external storage + */ + @Override + public InputStream download(String uri) { + try { + URL url = new URI(uri).toURL(); + HttpURLConnection connection = (HttpURLConnection) url.openConnection(); + connection.setDoOutput(false); + + int responseCode = connection.getResponseCode(); + if (responseCode == HttpURLConnection.HTTP_OK) { + log.debug("Download completed with HTTP response code: {}", connection.getResponseCode()); + return new BufferedInputStream(connection.getInputStream()); + } + String errorMsg = String.format("Unable to download. Response code: %d", responseCode); + log.error(errorMsg); + throw new ConductorClientException(errorMsg); + } catch (URISyntaxException | MalformedURLException e) { + String errorMsg = String.format("Invalid uri specified: %s", uri); + log.error(errorMsg, e); + throw new ConductorClientException(e); + } catch (IOException e) { + String errorMsg = String.format("Error downloading from uri: %s", uri); + log.error(errorMsg, e); + throw new ConductorClientException(e); + } + } + + private boolean isSuccessful(int responseCode) { + return responseCode >= 200 && responseCode < 300; + } +} diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index 279028a12..4b0ec5851 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -12,6 +12,10 @@ */ package com.netflix.conductor.client.http; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.util.List; import java.util.Map; import java.util.Optional; @@ -19,28 +23,58 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; +import com.netflix.conductor.client.config.ConductorClientConfiguration; +import com.netflix.conductor.client.config.DefaultConductorClientConfiguration; +import com.netflix.conductor.client.events.dispatcher.EventDispatcher; +import com.netflix.conductor.client.events.listeners.ListenerRegister; +import com.netflix.conductor.client.events.listeners.TaskClientListener; +import com.netflix.conductor.client.events.task.TaskClientEvent; +import com.netflix.conductor.client.events.task.TaskPayloadUsedEvent; +import com.netflix.conductor.client.events.task.TaskResultPayloadSizeEvent; +import com.netflix.conductor.client.exception.ConductorClientException; import com.netflix.conductor.client.http.ConductorClientRequest.Method; +import com.netflix.conductor.common.config.ObjectMapperProvider; import com.netflix.conductor.common.metadata.tasks.PollData; import com.netflix.conductor.common.metadata.tasks.Task; import com.netflix.conductor.common.metadata.tasks.TaskExecLog; import com.netflix.conductor.common.metadata.tasks.TaskResult; +import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.TaskSummary; import com.netflix.conductor.common.utils.ExternalPayloadStorage; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; /** Client for conductor task management including polling for task, updating task status etc. */ +@Slf4j public final class TaskClient { + private final ObjectMapper objectMapper = new ObjectMapperProvider().getObjectMapper(); + + private final ConductorClientConfiguration conductorClientConfiguration; + + private final EventDispatcher eventDispatcher = new EventDispatcher<>(); + + private PayloadStorage payloadStorage; + private ConductorClient client; /** Creates a default task client */ public TaskClient() { + // client will be set once root uri is set + this(null, new DefaultConductorClientConfiguration()); } public TaskClient(ConductorClient client) { + this(client, new DefaultConductorClientConfiguration()); + } + + public TaskClient(ConductorClient client, ConductorClientConfiguration config) { this.client = client; + this.payloadStorage = new PayloadStorage(client); + this.conductorClientConfiguration = config; } /** @@ -54,6 +88,11 @@ public void setRootURI(String rootUri) { client.shutdown(); } client = new ConductorClient(rootUri); + payloadStorage = new PayloadStorage(client); + } + + public void registerListener(TaskClientListener listener) { + ListenerRegister.register(listener, eventDispatcher); } /** @@ -143,11 +182,36 @@ public void updateTask(TaskResult taskResult) { client.execute(request); } - //TODO FIXME OSS MISMATCH - https://github.com/conductor-oss/conductor-java-sdk/issues/27 public Optional evaluateAndUploadLargePayload(Map taskOutputData, String taskType) { - throw new UnsupportedOperationException("No external storage support YET"); - } + if (!conductorClientConfiguration.isEnforceThresholds()) { + return Optional.empty(); + } + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { + objectMapper.writeValue(byteArrayOutputStream, taskOutputData); + byte[] taskOutputBytes = byteArrayOutputStream.toByteArray(); + long taskResultSize = taskOutputBytes.length; + eventDispatcher.publish(new TaskResultPayloadSizeEvent(taskType, taskResultSize)); + long payloadSizeThreshold = conductorClientConfiguration.getTaskOutputPayloadThresholdKB() * 1024L; + if (taskResultSize > payloadSizeThreshold) { + if (!conductorClientConfiguration.isExternalPayloadStorageEnabled() || taskResultSize + > conductorClientConfiguration.getTaskOutputMaxPayloadThresholdKB() * 1024L) { + throw new IllegalArgumentException( + String.format("The TaskResult payload size: %d is greater than the permissible %d bytes", + taskResultSize, payloadSizeThreshold)); + } + eventDispatcher.publish(new TaskPayloadUsedEvent(taskType, + ExternalPayloadStorage.Operation.WRITE.name(), + ExternalPayloadStorage.PayloadType.TASK_OUTPUT.name())); + return Optional.of(uploadToExternalPayloadStorage(taskOutputBytes, taskResultSize)); + } + return Optional.empty(); + } catch (IOException e) { + String errorMsg = String.format("Unable to update task: %s with task result", taskType); + log.error(errorMsg, e); + throw new ConductorClientException(e); + } + } /** * Ack for the task poll. * @@ -431,12 +495,31 @@ public SearchResult searchV2(Integer start, Integer size, String sort, Str return resp.getData(); } - //TODO FIXME OSS MISMATCH - https://github.com/conductor-oss/conductor-java-sdk/issues/27 - //implement populateTaskPayloads - Download from external Storage and set input and output of task private void populateTaskPayloads(Task task) { - if (StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath()) - || StringUtils.isNotBlank(task.getExternalOutputPayloadStoragePath())) { - throw new UnsupportedOperationException("No external storage support"); + if (!conductorClientConfiguration.isEnforceThresholds()) { + return; + } + + if (StringUtils.isNotBlank(task.getExternalInputPayloadStoragePath())) { + eventDispatcher.publish(new TaskPayloadUsedEvent(task.getTaskDefName(), + ExternalPayloadStorage.Operation.READ.name(), + ExternalPayloadStorage.PayloadType.TASK_INPUT.name())); + task.setInputData( + downloadFromExternalStorage( + ExternalPayloadStorage.PayloadType.TASK_INPUT, + task.getExternalInputPayloadStoragePath())); + task.setExternalInputPayloadStoragePath(null); + } + + if (StringUtils.isNotBlank(task.getExternalOutputPayloadStoragePath())) { + eventDispatcher.publish(new TaskPayloadUsedEvent(task.getTaskDefName(), + ExternalPayloadStorage.Operation.READ.name(), + ExternalPayloadStorage.PayloadType.TASK_OUTPUT.name())); + task.setOutputData( + downloadFromExternalStorage( + ExternalPayloadStorage.PayloadType.TASK_OUTPUT, + task.getExternalOutputPayloadStoragePath())); + task.setExternalOutputPayloadStoragePath(null); } } @@ -456,4 +539,28 @@ private List batchPoll(String taskType, String workerid, String domain, In return resp.getData(); } + + private String uploadToExternalPayloadStorage(byte[] payloadBytes, long payloadSize) { + ExternalStorageLocation externalStorageLocation = + payloadStorage.getLocation(ExternalPayloadStorage.Operation.WRITE, ExternalPayloadStorage.PayloadType.TASK_OUTPUT, ""); + payloadStorage.upload( + externalStorageLocation.getUri(), + new ByteArrayInputStream(payloadBytes), + payloadSize); + return externalStorageLocation.getPath(); + } + + @SuppressWarnings("unchecked") + private Map downloadFromExternalStorage(ExternalPayloadStorage.PayloadType payloadType, String path) { + Validate.notBlank(path, "uri cannot be blank"); + ExternalStorageLocation externalStorageLocation = payloadStorage.getLocation(ExternalPayloadStorage.Operation.READ, + payloadType, path); + try (InputStream inputStream = payloadStorage.download(externalStorageLocation.getUri())) { + return objectMapper.readValue(inputStream, Map.class); + } catch (IOException e) { + String errorMsg = String.format("Unable to download payload from external storage location: %s", path); + log.error(errorMsg, e); + throw new ConductorClientException(e); + } + } } diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index 8330440d7..bb5ae9048 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -12,16 +12,33 @@ */ package com.netflix.conductor.client.http; +import java.io.ByteArrayInputStream; +import java.io.ByteArrayOutputStream; +import java.io.IOException; +import java.io.InputStream; import java.util.List; +import java.util.Map; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; +import com.netflix.conductor.client.config.ConductorClientConfiguration; +import com.netflix.conductor.client.config.DefaultConductorClientConfiguration; +import com.netflix.conductor.client.events.dispatcher.EventDispatcher; +import com.netflix.conductor.client.events.listeners.ListenerRegister; +import com.netflix.conductor.client.events.listeners.WorkflowClientListener; +import com.netflix.conductor.client.events.workflow.WorkflowClientEvent; +import com.netflix.conductor.client.events.workflow.WorkflowInputPayloadSizeEvent; +import com.netflix.conductor.client.events.workflow.WorkflowPayloadUsedEvent; +import com.netflix.conductor.client.events.workflow.WorkflowStartedEvent; +import com.netflix.conductor.client.exception.ConductorClientException; import com.netflix.conductor.client.http.ConductorClientRequest.Method; +import com.netflix.conductor.common.config.ObjectMapperProvider; import com.netflix.conductor.common.metadata.workflow.RerunWorkflowRequest; import com.netflix.conductor.common.metadata.workflow.SkipTaskRequest; import com.netflix.conductor.common.metadata.workflow.StartWorkflowRequest; import com.netflix.conductor.common.model.BulkResponse; +import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.run.SearchResult; import com.netflix.conductor.common.run.Workflow; import com.netflix.conductor.common.run.WorkflowSummary; @@ -29,18 +46,36 @@ import com.netflix.conductor.common.utils.ExternalPayloadStorage; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import lombok.extern.slf4j.Slf4j; - +@Slf4j public final class WorkflowClient { + private final ObjectMapper objectMapper = new ObjectMapperProvider().getObjectMapper(); + + private final ConductorClientConfiguration conductorClientConfiguration; + + private final EventDispatcher eventDispatcher = new EventDispatcher<>(); + private ConductorClient client; + private PayloadStorage payloadStorage; + /** Creates a default workflow client */ public WorkflowClient() { + // client will be set once root uri is set + this(null, new DefaultConductorClientConfiguration()); } public WorkflowClient(ConductorClient client) { + this(client, new DefaultConductorClientConfiguration()); + } + + public WorkflowClient(ConductorClient client, ConductorClientConfiguration config) { this.client = client; + this.payloadStorage = new PayloadStorage(client); + this.conductorClientConfiguration = config; } /** @@ -54,6 +89,11 @@ public void setRootURI(String rootUri) { client.shutdown(); } client = new ConductorClient(rootUri); + payloadStorage = new PayloadStorage(client); + } + + public void registerListener(WorkflowClientListener listener) { + ListenerRegister.register(listener, eventDispatcher); } /** @@ -70,6 +110,10 @@ public String startWorkflow(StartWorkflowRequest startWorkflowRequest) { StringUtils.isBlank(startWorkflowRequest.getExternalInputPayloadStoragePath()), "External Storage Path must not be set"); + if (conductorClientConfiguration.isEnforceThresholds()) { + checkAndUploadToExternalStorage(startWorkflowRequest); + } + ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.POST) .path("/workflow") @@ -79,9 +123,59 @@ public String startWorkflow(StartWorkflowRequest startWorkflowRequest) { ConductorClientResponse resp = client.execute(request, new TypeReference<>() { }); + eventDispatcher.publish(new WorkflowStartedEvent(startWorkflowRequest.getName(), startWorkflowRequest.getVersion())); return resp.getData(); } + private void checkAndUploadToExternalStorage(StartWorkflowRequest startWorkflowRequest) { + try (ByteArrayOutputStream byteArrayOutputStream = new ByteArrayOutputStream()) { + objectMapper.writeValue(byteArrayOutputStream, startWorkflowRequest.getInput()); + byte[] workflowInputBytes = byteArrayOutputStream.toByteArray(); + long workflowInputSize = workflowInputBytes.length; + eventDispatcher.publish(new WorkflowInputPayloadSizeEvent(startWorkflowRequest.getName(), + startWorkflowRequest.getVersion(), workflowInputSize)); + + if (workflowInputSize > conductorClientConfiguration.getWorkflowInputPayloadThresholdKB() * 1024L) { + if (!conductorClientConfiguration.isExternalPayloadStorageEnabled() || + (workflowInputSize > conductorClientConfiguration.getWorkflowInputMaxPayloadThresholdKB() * 1024L)) { + String errorMsg = String.format("Input payload larger than the allowed threshold of: %d KB", + conductorClientConfiguration.getWorkflowInputPayloadThresholdKB()); + throw new ConductorClientException(errorMsg); + } else { + eventDispatcher.publish(new WorkflowPayloadUsedEvent(startWorkflowRequest.getName(), + startWorkflowRequest.getVersion(), + ExternalPayloadStorage.Operation.WRITE.name(), + ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT.name())); + + String externalStoragePath = uploadToExternalPayloadStorage( + workflowInputBytes, + workflowInputSize); + startWorkflowRequest.setExternalInputPayloadStoragePath(externalStoragePath); + startWorkflowRequest.setInput(null); + } + } + } catch (IOException e) { + String errorMsg = String.format("Unable to start workflow:%s, version:%s", + startWorkflowRequest.getName(), startWorkflowRequest.getVersion()); + log.error(errorMsg, e); + + eventDispatcher.publish(new WorkflowStartedEvent(startWorkflowRequest.getName(), + startWorkflowRequest.getVersion(), false, e)); + + throw new ConductorClientException(e); + } + } + + private String uploadToExternalPayloadStorage(byte[] payloadBytes, long payloadSize) { + ExternalStorageLocation externalStorageLocation = + payloadStorage.getLocation(ExternalPayloadStorage.Operation.WRITE, ExternalPayloadStorage.PayloadType.WORKFLOW_INPUT, ""); + payloadStorage.upload( + externalStorageLocation.getUri(), + new ByteArrayInputStream(payloadBytes), + payloadSize); + return externalStorageLocation.getPath(); + } + /** * Retrieve a workflow by workflow id * @@ -465,7 +559,6 @@ public Workflow testWorkflow(WorkflowTestRequest testRequest) { return resp.getData(); } - /** * Populates the workflow output from external payload storage if the external storage path is * specified. @@ -473,9 +566,26 @@ public Workflow testWorkflow(WorkflowTestRequest testRequest) { * @param workflow the workflow for which the output is to be populated. */ private void populateWorkflowOutput(Workflow workflow) { - //TODO FIXME OSS MISMATCH - https://github.com/conductor-oss/conductor-java-sdk/issues/27 if (StringUtils.isNotBlank(workflow.getExternalOutputPayloadStoragePath())) { - throw new UnsupportedOperationException("No external storage support"); + eventDispatcher.publish(new WorkflowPayloadUsedEvent(workflow.getWorkflowName(), + workflow.getWorkflowVersion(), + ExternalPayloadStorage.Operation.READ.name(), + ExternalPayloadStorage.PayloadType.WORKFLOW_OUTPUT.name())); + workflow.setOutput(downloadFromExternalStorage(workflow.getExternalOutputPayloadStoragePath())); + } + } + + @SuppressWarnings("unchecked") + private Map downloadFromExternalStorage(String path) { + Validate.notBlank(path, "uri cannot be blank"); + ExternalStorageLocation externalStorageLocation = payloadStorage.getLocation(ExternalPayloadStorage.Operation.READ, + ExternalPayloadStorage.PayloadType.WORKFLOW_OUTPUT, path); + try (InputStream inputStream = payloadStorage.download(externalStorageLocation.getUri())) { + return objectMapper.readValue(inputStream, Map.class); + } catch (IOException e) { + String errorMsg = String.format("Unable to download payload from external storage location: %s", path); + log.error(errorMsg, e); + throw new ConductorClientException(e); } } diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/metrics/MetricsCollector.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/metrics/MetricsCollector.java index 48275d8f7..82cc1d188 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/metrics/MetricsCollector.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/metrics/MetricsCollector.java @@ -12,24 +12,10 @@ */ package com.netflix.conductor.client.metrics; -import com.netflix.conductor.client.automator.events.PollCompleted; -import com.netflix.conductor.client.automator.events.PollFailure; -import com.netflix.conductor.client.automator.events.PollStarted; -import com.netflix.conductor.client.automator.events.TaskExecutionCompleted; -import com.netflix.conductor.client.automator.events.TaskExecutionFailure; -import com.netflix.conductor.client.automator.events.TaskExecutionStarted; +import com.netflix.conductor.client.events.listeners.TaskClientListener; +import com.netflix.conductor.client.events.listeners.TaskRunnerEventsListener; +import com.netflix.conductor.client.events.listeners.WorkflowClientListener; -public interface MetricsCollector { +public interface MetricsCollector extends TaskRunnerEventsListener, WorkflowClientListener, TaskClientListener { - void consume(PollFailure e); - - void consume(PollCompleted e); - - void consume(PollStarted e); - - void consume(TaskExecutionStarted e); - - void consume(TaskExecutionCompleted e); - - void consume(TaskExecutionFailure e); } diff --git a/conductor-clients/java/conductor-java-sdk/examples/src/main/java/com/netflix/conductor/sdk/examples/events/EventListenerExample.java b/conductor-clients/java/conductor-java-sdk/examples/src/main/java/com/netflix/conductor/sdk/examples/events/EventListenerExample.java index 8d437ebb0..9d4368bd4 100644 --- a/conductor-clients/java/conductor-java-sdk/examples/src/main/java/com/netflix/conductor/sdk/examples/events/EventListenerExample.java +++ b/conductor-clients/java/conductor-java-sdk/examples/src/main/java/com/netflix/conductor/sdk/examples/events/EventListenerExample.java @@ -17,9 +17,9 @@ import java.util.List; import com.netflix.conductor.client.automator.TaskRunnerConfigurer; -import com.netflix.conductor.client.automator.events.PollCompleted; -import com.netflix.conductor.client.automator.events.PollFailure; -import com.netflix.conductor.client.automator.events.TaskExecutionFailure; +import com.netflix.conductor.client.events.taskrunner.PollCompleted; +import com.netflix.conductor.client.events.taskrunner.PollFailure; +import com.netflix.conductor.client.events.taskrunner.TaskExecutionFailure; import com.netflix.conductor.client.http.TaskClient; import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.common.metadata.tasks.Task; diff --git a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java index 73d9ebe68..3bba49946 100644 --- a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java +++ b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesTaskClient.java @@ -17,7 +17,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.Optional; import org.apache.commons.lang3.Validate; @@ -153,10 +152,6 @@ public void updateTask(TaskResult taskResult) { taskClient.updateTask(taskResult); } - public Optional evaluateAndUploadLargePayload(Map taskOutputData, String taskType) { - return taskClient.evaluateAndUploadLargePayload(taskOutputData, taskType); - } - public void logMessageForTask(String taskId, String logMessage) { taskClient.logMessageForTask(taskId, logMessage); }