From b4a595f19828cc31b702479794eab3173f41546d Mon Sep 17 00:00:00 2001 From: Martini <110882768+martini612@users.noreply.github.com> Date: Mon, 15 Apr 2024 14:15:04 +0900 Subject: [PATCH 01/25] if status-listener error, task not executed --- .../netflix/conductor/service/ExecutionService.java | 13 ++++++++++++- 1 file changed, 12 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java index 2c985dec0..4fb3f78ea 100644 --- a/core/src/main/java/com/netflix/conductor/service/ExecutionService.java +++ b/core/src/main/java/com/netflix/conductor/service/ExecutionService.java @@ -189,7 +189,18 @@ public List poll( .map(executionDAOFacade::getTaskModel) .filter(Objects::nonNull) .filter(task -> TaskModel.Status.IN_PROGRESS.equals(task.getStatus())) - .forEach(taskStatusListener::onTaskInProgress); + .forEach( + task -> { + try { + taskStatusListener.onTaskInProgress(task); + } catch (Exception e) { + String errorMsg = + String.format( + "Error while notifying TaskStatusListener: %s for workflow: %s", + task.getTaskId(), task.getWorkflowInstanceId()); + LOGGER.error(errorMsg, e); + } + }); executionDAOFacade.updateTaskLastPoll(taskType, domain, workerId); Monitors.recordTaskPoll(queueName); tasks.forEach(this::ackTaskReceived); From 3c428d229e191b767140e636730a67d0ddf0d507 Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Wed, 31 Jul 2024 11:38:43 +0900 Subject: [PATCH 02/25] Address missing time part for task logs. (cherry picked from commit c84b5f4bd94cf4a1afb32d2e3e7996849c0b0728) --- .../com/netflix/conductor/postgres/dao/PostgresIndexDAO.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java index 674fcd02d..b0481a293 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresIndexDAO.java @@ -233,7 +233,7 @@ public List getTaskExecutionLogs(String taskId) { log.setLog(rs.getString("log")); log.setTaskId(rs.getString("task_id")); log.setCreatedTime( - rs.getDate("created_time").getTime()); + rs.getTimestamp("created_time").getTime()); result.add(log); } return result; From d94115741d9ad81adc2061422b1eabf58bcdaa7b Mon Sep 17 00:00:00 2001 From: Matthew Avery Date: Thu, 8 Aug 2024 16:20:39 -0400 Subject: [PATCH 03/25] Add beans and deserializers to @WorkerTask methods. --- .../sdk/workflow/executor/WorkflowExecutor.java | 9 +++++++++ .../sdk/workflow/executor/task/AnnotatedWorker.java | 5 +++++ .../executor/task/AnnotatedWorkerExecutor.java | 10 ++++++++++ 3 files changed, 24 insertions(+) diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java index 15ba64683..d80d55928 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java @@ -36,6 +36,7 @@ import com.netflix.conductor.sdk.workflow.utils.ObjectMapperProvider; import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; import com.sun.jersey.api.client.ClientHandler; import com.sun.jersey.api.client.config.DefaultClientConfig; @@ -241,4 +242,12 @@ public TaskClient getTaskClient() { public WorkflowClient getWorkflowClient() { return workflowClient; } + + public void addBean(Object bean) { + annotatedWorkerExecutor.addBean(bean); + } + + public void registerModule(Module module) { + annotatedWorkerExecutor.registerModule(module); + } } diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorker.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorker.java index 51b8abcff..57a3a8241 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorker.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorker.java @@ -27,6 +27,7 @@ import com.netflix.conductor.sdk.workflow.utils.ObjectMapperProvider; import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; public class AnnotatedWorker implements Worker { @@ -51,6 +52,10 @@ public AnnotatedWorker(String name, Method workerMethod, Object obj) { om.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false); } + void registerModule(Module module) { + om.registerModule(module); + } + @Override public String getTaskDefName() { return name; diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java index 56c06003b..fe237f867 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/task/AnnotatedWorkerExecutor.java @@ -23,6 +23,7 @@ import com.netflix.conductor.client.worker.Worker; import com.netflix.conductor.sdk.workflow.task.WorkerTask; +import com.fasterxml.jackson.databind.Module; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Strings; import com.google.common.reflect.ClassPath; @@ -51,6 +52,8 @@ public class AnnotatedWorkerExecutor { private WorkerConfiguration workerConfiguration; + private Set modules = new HashSet<>(); + public AnnotatedWorkerExecutor(TaskClient taskClient) { this.taskClient = taskClient; this.workerConfiguration = new WorkerConfiguration(); @@ -189,6 +192,9 @@ public void startPolling() { (taskName, method) -> { Object obj = workerClassObjs.get(taskName); AnnotatedWorker executor = new AnnotatedWorker(taskName, method, obj); + for (Module module : modules) { + executor.registerModule(module); + } executor.setPollingInterval(workerToPollingInterval.get(taskName)); executors.add(executor); }); @@ -218,4 +224,8 @@ List getExecutors() { TaskRunnerConfigurer getTaskRunner() { return taskRunner; } + + public void registerModule(Module module) { + modules.add(module); + } } From 620ce9f0280df4c3054b4e9d06e6815bd0b2c512 Mon Sep 17 00:00:00 2001 From: Yarden Shoham Date: Mon, 12 Aug 2024 17:29:16 +0000 Subject: [PATCH 04/25] Fix monaco-editor not loading when hosting the app on a subpath It didn't take the base path into account Signed-off-by: Yarden Shoham --- ui/src/App.jsx | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/ui/src/App.jsx b/ui/src/App.jsx index 64292db5a..97427e1c0 100644 --- a/ui/src/App.jsx +++ b/ui/src/App.jsx @@ -27,6 +27,7 @@ import CustomRoutes from "./plugins/CustomRoutes"; import AppBarModules from "./plugins/AppBarModules"; import CustomAppBarButtons from "./plugins/CustomAppBarButtons"; import Workbench from "./pages/workbench/Workbench"; +import { getBasename } from "./utils/helpers"; const useStyles = makeStyles((theme) => ({ root: { @@ -136,5 +137,5 @@ export default function App() { if (process.env.REACT_APP_MONACO_EDITOR_USING_CDN === "false") { // Change the source of the monaco files, see https://github.com/suren-atoyan/monaco-react/issues/168#issuecomment-762336713 - loader.config({ paths: { vs: '/monaco-editor/min/vs' } }); + loader.config({ paths: { vs: getBasename() + 'monaco-editor/min/vs' } }); } From eddb905678a957d514390de73107534f1490ca71 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Sat, 17 Aug 2024 11:09:26 -0700 Subject: [PATCH 05/25] update orkes-queue version --- dependencies.gradle | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/dependencies.gradle b/dependencies.gradle index 0e3a83adb..9c6343a77 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -45,7 +45,7 @@ ext { revJsr311Api = '1.1.1' revMockServerClient = '5.12.0' revSpringDoc = '2.1.0' - revOrkesQueues = '1.0.7' + revOrkesQueues = '1.0.9' revPowerMock = '2.0.9' revProtoBuf = '3.21.12' revProtogenAnnotations = '1.0.0' From 9609a5004cfe3530de457754041676ea1b56f153 Mon Sep 17 00:00:00 2001 From: liivw <164842155+liivw@users.noreply.github.com> Date: Tue, 20 Aug 2024 11:56:32 +0800 Subject: [PATCH 06/25] Update index.md - fixed broken link --- docs/index.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/index.md b/docs/index.md index 89809f95c..5e1efe073 100644 --- a/docs/index.md +++ b/docs/index.md @@ -14,7 +14,7 @@ hide: Conductor is a platform originally created at Netflix to orchestrate workflows that span across microservices.
From feeb3470edb552804859ab3dfe15674f19929bbd Mon Sep 17 00:00:00 2001 From: Yong Sheng Tan Date: Fri, 23 Aug 2024 05:02:20 +0800 Subject: [PATCH 07/25] Fix allowing update task to modify creation date (#243) --- .../service/MetadataServiceImpl.java | 2 + .../service/MetadataServiceTest.java | 49 ++++++++++++++++--- 2 files changed, 43 insertions(+), 8 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java index d48b95a42..1399e9f08 100644 --- a/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java @@ -82,6 +82,8 @@ public void updateTaskDef(TaskDef taskDefinition) { } taskDefinition.setUpdatedBy(WorkflowContext.get().getClientApp()); taskDefinition.setUpdateTime(System.currentTimeMillis()); + taskDefinition.setCreateTime(existing.getCreateTime()); + taskDefinition.setCreatedBy(existing.getCreatedBy()); metadataDAO.updateTaskDef(taskDefinition); } diff --git a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java index e4f827340..255703be5 100644 --- a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java @@ -16,6 +16,7 @@ import org.junit.Test; import org.junit.runner.RunWith; +import org.mockito.stubbing.Answer; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.boot.autoconfigure.EnableAutoConfiguration; import org.springframework.boot.test.context.TestConfiguration; @@ -54,7 +55,6 @@ public class MetadataServiceTest { @TestConfiguration static class TestMetadataConfiguration { - @Bean public MetadataDAO metadataDAO() { return mock(MetadataDAO.class); @@ -72,8 +72,23 @@ public MetadataService metadataService( MetadataDAO metadataDAO, ConductorProperties properties) { EventHandlerDAO eventHandlerDAO = mock(EventHandlerDAO.class); + Map taskDefinitions = new HashMap<>(); + when(metadataDAO.getAllWorkflowDefs()).thenReturn(mockWorkflowDefs()); + Answer upsertTaskDef = + (invocation) -> { + TaskDef argument = invocation.getArgument(0, TaskDef.class); + taskDefinitions.put(argument.getName(), argument); + return argument; + }; + when(metadataDAO.createTaskDef(any(TaskDef.class))).then(upsertTaskDef); + when(metadataDAO.updateTaskDef(any(TaskDef.class))).then(upsertTaskDef); + when(metadataDAO.getTaskDef(any())) + .then( + invocation -> + taskDefinitions.get(invocation.getArgument(0, String.class))); + return new MetadataServiceImpl(metadataDAO, eventHandlerDAO, properties); } @@ -175,7 +190,6 @@ public void testUpdateTaskDefNotExisting() { TaskDef taskDef = new TaskDef(); taskDef.setName("test"); taskDef.setOwnerEmail("sample@test.com"); - when(metadataDAO.getTaskDef(any())).thenReturn(null); metadataService.updateTaskDef(taskDef); } @@ -184,7 +198,6 @@ public void testUpdateTaskDefDaoException() { TaskDef taskDef = new TaskDef(); taskDef.setName("test"); taskDef.setOwnerEmail("sample@test.com"); - when(metadataDAO.getTaskDef(any())).thenReturn(null); metadataService.updateTaskDef(taskDef); } @@ -198,6 +211,27 @@ public void testRegisterTaskDef() { verify(metadataDAO, times(1)).createTaskDef(any(TaskDef.class)); } + @Test + public void testUpdateTask() { + String taskDefName = "another-task"; + TaskDef taskDef = new TaskDef(); + taskDef.setName(taskDefName); + taskDef.setOwnerEmail("sample@test.com"); + taskDef.setRetryCount(1); + metadataService.registerTaskDef(Collections.singletonList(taskDef)); + TaskDef before = metadataService.getTaskDef(taskDefName); + + taskDef.setRetryCount(2); + taskDef.setCreatedBy("someone-else"); + taskDef.setCreateTime(1000L); + metadataService.updateTaskDef(taskDef); + verify(metadataDAO, times(1)).updateTaskDef(any(TaskDef.class)); + + TaskDef after = metadataService.getTaskDef(taskDefName); + assertEquals(2, after.getRetryCount()); + assertEquals(before.getCreateTime(), after.getCreateTime()); + } + @Test(expected = ConstraintViolationException.class) public void testUpdateWorkflowDefNull() { try { @@ -272,7 +306,6 @@ public void testUpdateWorkflowDef() { workflowTask.setName("hello"); tasks.add(workflowTask); workflowDef.setTasks(tasks); - when(metadataDAO.getTaskDef(any())).thenReturn(new TaskDef()); metadataService.updateWorkflowDef(Collections.singletonList(workflowDef)); verify(metadataDAO, times(1)).updateWorkflowDef(workflowDef); } @@ -302,7 +335,7 @@ public void testUpdateWorkflowDefWithCaseExpression() { workflowTask.setCaseExpression("1 >0abcd"); tasks.add(workflowTask); workflowDef.setTasks(tasks); - when(metadataDAO.getTaskDef(any())).thenReturn(new TaskDef()); + BulkResponse bulkResponse = metadataService.updateWorkflowDef(Collections.singletonList(workflowDef)); } @@ -332,7 +365,7 @@ public void testUpdateWorkflowDefWithJavscriptEvaluator() { workflowTask.setDecisionCases(decisionCases); tasks.add(workflowTask); workflowDef.setTasks(tasks); - when(metadataDAO.getTaskDef(any())).thenReturn(new TaskDef()); + BulkResponse bulkResponse = metadataService.updateWorkflowDef(Collections.singletonList(workflowDef)); } @@ -419,7 +452,7 @@ public void testRegisterWorkflowDef() { workflowTask.setName("hello"); tasks.add(workflowTask); workflowDef.setTasks(tasks); - when(metadataDAO.getTaskDef(any())).thenReturn(new TaskDef()); + metadataService.registerWorkflowDef(workflowDef); verify(metadataDAO, times(1)).createWorkflowDef(workflowDef); assertEquals(2, workflowDef.getSchemaVersion()); @@ -437,7 +470,7 @@ public void testValidateWorkflowDef() { workflowTask.setName("hello"); tasks.add(workflowTask); workflowDef.setTasks(tasks); - when(metadataDAO.getTaskDef(any())).thenReturn(new TaskDef()); + metadataService.validateWorkflowDef(workflowDef); verify(metadataDAO, times(1)).createWorkflowDef(workflowDef); assertEquals(2, workflowDef.getSchemaVersion()); From 1815e13b79c6e630a5456664630e0369940df59a Mon Sep 17 00:00:00 2001 From: David Gracza <37271364+gr4cza@users.noreply.github.com> Date: Thu, 8 Aug 2024 23:47:39 +0200 Subject: [PATCH 08/25] Update javax dependencies in the client to jakarta Replaced `javax.ws.rs` with `jakarta.ws.rs` and updated related imports and client request handling logic. Modified several classes to use Jakarta libraries instead of deprecated javax libraries and updated dependencies to the latest versions. --- client/build.gradle | 6 +- .../conductor/client/http/ClientBase.java | 141 +++++++----------- .../client/http/ClientRequestHandler.java | 58 +++---- .../conductor/client/http/EventClient.java | 39 ++--- .../conductor/client/http/MetadataClient.java | 34 ++--- .../conductor/client/http/PayloadStorage.java | 9 +- .../conductor/client/http/TaskClient.java | 34 ++--- .../conductor/client/http/WorkflowClient.java | 34 ++--- .../client/http/EventClientSpec.groovy | 25 +++- .../client/http/MetadataClientSpec.groovy | 12 +- .../client/http/TaskClientSpec.groovy | 18 +-- .../client/http/WorkflowClientSpec.groovy | 18 +-- dependencies.gradle | 8 +- java-sdk/build.gradle | 5 +- .../workflow/executor/WorkflowExecutor.java | 15 +- test-harness/build.gradle | 2 +- 16 files changed, 177 insertions(+), 281 deletions(-) diff --git a/client/build.gradle b/client/build.gradle index 2ffd092ed..03ce9ca78 100644 --- a/client/build.gradle +++ b/client/build.gradle @@ -19,8 +19,8 @@ dependencies { compileOnly 'org.jetbrains:annotations:23.0.0' implementation project(':conductor-common') - implementation "com.sun.jersey:jersey-client:${revJersey}" - implementation "javax.ws.rs:javax.ws.rs-api:${revJAXRS}" + implementation "org.glassfish.jersey.core:jersey-client:${revJersey}" + implementation "jakarta.ws.rs:jakarta.ws.rs-api:${revJAXRS}" implementation "org.glassfish.jersey.core:jersey-common:${revJerseyCommon}" implementation "com.netflix.spectator:spectator-api:${revSpectator}" @@ -29,7 +29,7 @@ dependencies { } implementation "com.amazonaws:aws-java-sdk-core:${revAwsSdk}" - implementation "com.fasterxml.jackson.jaxrs:jackson-jaxrs-json-provider" + implementation "com.fasterxml.jackson.jakarta.rs:jackson-jakarta-rs-json-provider" implementation "com.fasterxml.jackson.datatype:jackson-datatype-jsr310" implementation "org.apache.commons:commons-lang3" diff --git a/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java b/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java index 5326abe4d..42d4d7858 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java +++ b/client/src/main/java/com/netflix/conductor/client/http/ClientBase.java @@ -20,8 +20,6 @@ import java.util.Map; import java.util.function.Function; -import javax.ws.rs.core.UriBuilder; - import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.Validate; import org.slf4j.Logger; @@ -31,7 +29,6 @@ import com.netflix.conductor.client.config.DefaultConductorClientConfiguration; import com.netflix.conductor.client.exception.ConductorClientException; import com.netflix.conductor.common.config.ObjectMapperProvider; -import com.netflix.conductor.common.model.BulkResponse; import com.netflix.conductor.common.run.ExternalStorageLocation; import com.netflix.conductor.common.utils.ExternalPayloadStorage; import com.netflix.conductor.common.validation.ErrorResponse; @@ -39,11 +36,12 @@ import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import com.sun.jersey.api.client.ClientHandlerException; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.GenericType; -import com.sun.jersey.api.client.UniformInterfaceException; -import com.sun.jersey.api.client.WebResource.Builder; +import jakarta.ws.rs.client.Entity; +import jakarta.ws.rs.client.Invocation; +import jakarta.ws.rs.core.GenericType; +import jakarta.ws.rs.core.Response; +import jakarta.ws.rs.core.UriBuilder; +import lombok.Getter; /** Abstract client for the REST server */ public abstract class ClientBase { @@ -86,35 +84,40 @@ protected void delete(String url, Object... uriVariables) { protected void deleteWithUriVariables( Object[] queryParams, String url, Object... uriVariables) { - delete(queryParams, url, uriVariables, null); - } - - protected BulkResponse deleteWithRequestBody(Object[] queryParams, String url, Object body) { - return delete(queryParams, url, null, body); + delete(queryParams, url, uriVariables); } - private BulkResponse delete( - Object[] queryParams, String url, Object[] uriVariables, Object body) { + private void delete(Object[] queryParams, String url, Object[] uriVariables) { URI uri = null; - BulkResponse response = null; try { uri = getURIBuilder(root + url, queryParams).build(uriVariables); - response = requestHandler.delete(uri, body); + Response response = requestHandler.delete(uri); + if (response.getStatus() >= 300) { + throw new UniformInterfaceException(response); + } } catch (UniformInterfaceException e) { handleUniformInterfaceException(e, uri); } catch (RuntimeException e) { handleRuntimeException(e, uri); } - return response; } protected void put(String url, Object[] queryParams, Object request, Object... uriVariables) { URI uri = null; try { uri = getURIBuilder(root + url, queryParams).build(uriVariables); - requestHandler.getWebResourceBuilder(uri, request).put(); + Entity entity; + if (request != null) { + entity = Entity.json(request); + } else { + entity = Entity.text(""); + } + Response response = requestHandler.getWebResourceBuilder(uri).put(entity); + if (response.getStatus() >= 300) { + throw new UniformInterfaceException(response); + } } catch (RuntimeException e) { - handleException(uri, e); + handleException(e, uri); } } @@ -139,7 +142,7 @@ protected T postForEntity( request, queryParams, responseType, - builder -> builder.post(responseType), + builder -> builder.post(Entity.json(request), responseType), uriVariables); } @@ -154,7 +157,7 @@ protected T postForEntity( request, queryParams, responseType, - builder -> builder.post(responseType), + builder -> builder.post(Entity.json(request), responseType), uriVariables); } @@ -163,14 +166,17 @@ private T postForEntity( Object request, Object[] queryParams, Object responseType, - Function postWithEntity, + Function postWithEntity, Object... uriVariables) { URI uri = null; try { uri = getURIBuilder(root + url, queryParams).build(uriVariables); - Builder webResourceBuilder = requestHandler.getWebResourceBuilder(uri, request); + Invocation.Builder webResourceBuilder = requestHandler.getWebResourceBuilder(uri); if (responseType == null) { - webResourceBuilder.post(); + Response response = webResourceBuilder.post(Entity.json(request)); + if (response.getStatus() >= 300) { + throw new UniformInterfaceException(response); + } return null; } return postWithEntity.apply(webResourceBuilder); @@ -185,29 +191,29 @@ private T postForEntity( protected T getForEntity( String url, Object[] queryParams, Class responseType, Object... uriVariables) { return getForEntity( - url, queryParams, response -> response.getEntity(responseType), uriVariables); + url, queryParams, response -> response.readEntity(responseType), uriVariables); } protected T getForEntity( String url, Object[] queryParams, GenericType responseType, Object... uriVariables) { return getForEntity( - url, queryParams, response -> response.getEntity(responseType), uriVariables); + url, queryParams, response -> response.readEntity(responseType), uriVariables); } private T getForEntity( String url, Object[] queryParams, - Function entityProvider, + Function entityProvider, Object... uriVariables) { URI uri = null; - ClientResponse clientResponse; + Response response; try { uri = getURIBuilder(root + url, queryParams).build(uriVariables); - clientResponse = requestHandler.get(uri); - if (clientResponse.getStatus() < 300) { - return entityProvider.apply(clientResponse); + response = requestHandler.get(uri); + if (response.getStatus() < 300) { + return entityProvider.apply(response); } else { - throw new UniformInterfaceException(clientResponse); + throw new UniformInterfaceException(response); } } catch (UniformInterfaceException e) { handleUniformInterfaceException(e, uri); @@ -296,15 +302,6 @@ protected boolean isNewerJacksonVersion() { return version.getMajorVersion() == 2 && version.getMinorVersion() >= 12; } - private void handleClientHandlerException(ClientHandlerException exception, URI uri) { - String errorMessage = - String.format( - "Unable to invoke Conductor API with uri: %s, failure to process request or response", - uri); - LOGGER.error(errorMessage, exception); - throw new ConductorClientException(errorMessage, exception); - } - private void handleRuntimeException(RuntimeException exception, URI uri) { String errorMessage = String.format( @@ -315,73 +312,49 @@ private void handleRuntimeException(RuntimeException exception, URI uri) { } private void handleUniformInterfaceException(UniformInterfaceException exception, URI uri) { - ClientResponse clientResponse = exception.getResponse(); - if (clientResponse == null) { - throw new ConductorClientException( - String.format("Unable to invoke Conductor API with uri: %s", uri)); - } - try { - if (clientResponse.getStatus() < 300) { + Response response = exception.getResponse(); + try (response) { + if (response == null) { + throw new ConductorClientException( + String.format("Unable to invoke Conductor API with uri: %s", uri)); + } + if (response.getStatus() < 300) { return; } - String errorMessage = clientResponse.getEntity(String.class); + String errorMessage = response.readEntity(String.class); LOGGER.warn( "Unable to invoke Conductor API with uri: {}, unexpected response from server: statusCode={}, responseBody='{}'.", uri, - clientResponse.getStatus(), + response.getStatus(), errorMessage); ErrorResponse errorResponse; try { errorResponse = objectMapper.readValue(errorMessage, ErrorResponse.class); } catch (IOException e) { - throw new ConductorClientException(clientResponse.getStatus(), errorMessage); + throw new ConductorClientException(response.getStatus(), errorMessage); } - throw new ConductorClientException(clientResponse.getStatus(), errorResponse); + throw new ConductorClientException(response.getStatus(), errorResponse); } catch (ConductorClientException e) { throw e; - } catch (ClientHandlerException e) { - handleClientHandlerException(e, uri); } catch (RuntimeException e) { handleRuntimeException(e, uri); - } finally { - clientResponse.close(); } } - private void handleException(URI uri, RuntimeException e) { + private void handleException(RuntimeException e, URI uri) { if (e instanceof UniformInterfaceException) { handleUniformInterfaceException(((UniformInterfaceException) e), uri); - } else if (e instanceof ClientHandlerException) { - handleClientHandlerException((ClientHandlerException) e, uri); } else { handleRuntimeException(e, uri); } } - /** - * Converts ClientResponse object to string with detailed debug information including status - * code, media type, response headers, and response body if exists. - */ - private String clientResponseToString(ClientResponse response) { - if (response == null) { - return null; - } - StringBuilder builder = new StringBuilder(); - builder.append("[status: ").append(response.getStatus()); - builder.append(", media type: ").append(response.getType()); - if (response.getStatus() != 404) { - try { - String responseBody = response.getEntity(String.class); - if (responseBody != null) { - builder.append(", response body: ").append(responseBody); - } - } catch (RuntimeException ignore) { - // Ignore if there is no response body, or IO error - it may have already been read - // in certain scenario. - } + @Getter + static class UniformInterfaceException extends RuntimeException { + private final Response response; + + public UniformInterfaceException(Response response) { + this.response = response; } - builder.append(", response headers: ").append(response.getHeaders()); - builder.append("]"); - return builder.toString(); } } diff --git a/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java b/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java index 164cb579b..527059ea5 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java +++ b/client/src/main/java/com/netflix/conductor/client/http/ClientRequestHandler.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Conductor Authors. + * Copyright 2024 Conductor Authors. *

* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -14,27 +14,25 @@ import java.net.URI; -import javax.ws.rs.core.MediaType; +import org.glassfish.jersey.client.ClientConfig; import com.netflix.conductor.common.config.ObjectMapperProvider; -import com.netflix.conductor.common.model.BulkResponse; import com.fasterxml.jackson.core.Version; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; -import com.fasterxml.jackson.jaxrs.json.JacksonJsonProvider; -import com.sun.jersey.api.client.Client; -import com.sun.jersey.api.client.ClientHandler; -import com.sun.jersey.api.client.ClientResponse; -import com.sun.jersey.api.client.WebResource; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.filter.ClientFilter; +import com.fasterxml.jackson.jakarta.rs.json.JacksonJsonProvider; +import jakarta.ws.rs.client.Client; +import jakarta.ws.rs.client.ClientBuilder; +import jakarta.ws.rs.client.ClientRequestFilter; +import jakarta.ws.rs.client.Invocation; +import jakarta.ws.rs.core.MediaType; +import jakarta.ws.rs.core.Response; public class ClientRequestHandler { private final Client client; - public ClientRequestHandler( - ClientConfig config, ClientHandler handler, ClientFilter... filters) { + public ClientRequestHandler(ClientConfig config, ClientRequestFilter... filters) { ObjectMapper objectMapper = new ObjectMapperProvider().getObjectMapper(); // https://github.com/FasterXML/jackson-databind/issues/2683 @@ -43,40 +41,26 @@ public ClientRequestHandler( } JacksonJsonProvider provider = new JacksonJsonProvider(objectMapper); - config.getSingletons().add(provider); + config.register(provider); - if (handler == null) { - this.client = Client.create(config); - } else { - this.client = new Client(handler, config); - } + this.client = ClientBuilder.newClient(config); - for (ClientFilter filter : filters) { - this.client.addFilter(filter); + for (ClientRequestFilter filter : filters) { + this.client.register(filter); } } - public BulkResponse delete(URI uri, Object body) { - if (body != null) { - return client.resource(uri) - .type(MediaType.APPLICATION_JSON_TYPE) - .delete(BulkResponse.class, body); - } else { - client.resource(uri).delete(); - } - return null; + public Response delete(URI uri) { + return client.target(uri).request().delete(); } - public ClientResponse get(URI uri) { - return client.resource(uri) - .accept(MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN) - .get(ClientResponse.class); + public Response get(URI uri) { + return client.target(uri).request(MediaType.APPLICATION_JSON, MediaType.TEXT_PLAIN).get(); } - public WebResource.Builder getWebResourceBuilder(URI URI, Object entity) { - return client.resource(URI) - .type(MediaType.APPLICATION_JSON) - .entity(entity) + public Invocation.Builder getWebResourceBuilder(URI uri) { + return client.target(uri) + .request(MediaType.APPLICATION_JSON) .accept(MediaType.TEXT_PLAIN, MediaType.APPLICATION_JSON); } diff --git a/client/src/main/java/com/netflix/conductor/client/http/EventClient.java b/client/src/main/java/com/netflix/conductor/client/http/EventClient.java index 1782d0177..5cf964189 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/EventClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/EventClient.java @@ -1,5 +1,5 @@ /* - * Copyright 2022 Conductor Authors. + * Copyright 2024 Conductor Authors. *

* Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at @@ -15,67 +15,50 @@ import java.util.List; import org.apache.commons.lang3.Validate; +import org.glassfish.jersey.client.ClientConfig; import com.netflix.conductor.client.config.ConductorClientConfiguration; import com.netflix.conductor.client.config.DefaultConductorClientConfiguration; import com.netflix.conductor.common.metadata.events.EventHandler; -import com.sun.jersey.api.client.ClientHandler; -import com.sun.jersey.api.client.GenericType; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.api.client.filter.ClientFilter; +import jakarta.ws.rs.client.ClientRequestFilter; +import jakarta.ws.rs.core.GenericType; // Client class for all Event Handler operations public class EventClient extends ClientBase { - private static final GenericType> eventHandlerList = - new GenericType>() {}; + private static final GenericType> eventHandlerList = new GenericType<>() {}; /** Creates a default metadata client */ public EventClient() { - this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null); + this(new ClientConfig(), new DefaultConductorClientConfiguration()); } /** * @param clientConfig REST Client configuration */ public EventClient(ClientConfig clientConfig) { - this(clientConfig, new DefaultConductorClientConfiguration(), null); - } - - /** - * @param clientConfig REST Client configuration - * @param clientHandler Jersey client handler. Useful when plugging in various http client - * interaction modules (e.g. ribbon) - */ - public EventClient(ClientConfig clientConfig, ClientHandler clientHandler) { - this(clientConfig, new DefaultConductorClientConfiguration(), clientHandler); + this(clientConfig, new DefaultConductorClientConfiguration()); } /** * @param config config REST Client configuration - * @param handler handler Jersey client handler. Useful when plugging in various http client - * interaction modules (e.g. ribbon) * @param filters Chain of client side filters to be applied per request */ - public EventClient(ClientConfig config, ClientHandler handler, ClientFilter... filters) { - this(config, new DefaultConductorClientConfiguration(), handler, filters); + public EventClient(ClientConfig config, ClientRequestFilter... filters) { + this(config, new DefaultConductorClientConfiguration(), filters); } /** * @param config REST Client configuration * @param clientConfiguration Specific properties configured for the client, see {@link * ConductorClientConfiguration} - * @param handler Jersey client handler. Useful when plugging in various http client interaction - * modules (e.g. ribbon) * @param filters Chain of client side filters to be applied per request */ public EventClient( ClientConfig config, ConductorClientConfiguration clientConfiguration, - ClientHandler handler, - ClientFilter... filters) { - super(new ClientRequestHandler(config, handler, filters), clientConfiguration); + ClientRequestFilter... filters) { + super(new ClientRequestHandler(config, filters), clientConfiguration); } EventClient(ClientRequestHandler requestHandler) { diff --git a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java index 2db8df237..c2ae5e1cc 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/MetadataClient.java @@ -15,17 +15,15 @@ import java.util.List; import org.apache.commons.lang3.Validate; +import org.glassfish.jersey.client.ClientConfig; import com.netflix.conductor.client.config.ConductorClientConfiguration; import com.netflix.conductor.client.config.DefaultConductorClientConfiguration; import com.netflix.conductor.common.metadata.tasks.TaskDef; import com.netflix.conductor.common.metadata.workflow.WorkflowDef; -import com.sun.jersey.api.client.ClientHandler; -import com.sun.jersey.api.client.GenericType; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.api.client.filter.ClientFilter; +import jakarta.ws.rs.client.ClientRequestFilter; +import jakarta.ws.rs.core.GenericType; public class MetadataClient extends ClientBase { @@ -34,49 +32,35 @@ public class MetadataClient extends ClientBase { /** Creates a default metadata client */ public MetadataClient() { - this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null); + this(new ClientConfig(), new DefaultConductorClientConfiguration()); } /** * @param clientConfig REST Client configuration */ public MetadataClient(ClientConfig clientConfig) { - this(clientConfig, new DefaultConductorClientConfiguration(), null); - } - - /** - * @param clientConfig REST Client configuration - * @param clientHandler Jersey client handler. Useful when plugging in various http client - * interaction modules (e.g. ribbon) - */ - public MetadataClient(ClientConfig clientConfig, ClientHandler clientHandler) { - this(clientConfig, new DefaultConductorClientConfiguration(), clientHandler); + this(clientConfig, new DefaultConductorClientConfiguration()); } /** * @param config config REST Client configuration - * @param handler handler Jersey client handler. Useful when plugging in various http client - * interaction modules (e.g. ribbon) * @param filters Chain of client side filters to be applied per request */ - public MetadataClient(ClientConfig config, ClientHandler handler, ClientFilter... filters) { - this(config, new DefaultConductorClientConfiguration(), handler, filters); + public MetadataClient(ClientConfig config, ClientRequestFilter... filters) { + this(config, new DefaultConductorClientConfiguration(), filters); } /** * @param config REST Client configuration * @param clientConfiguration Specific properties configured for the client, see {@link * ConductorClientConfiguration} - * @param handler Jersey client handler. Useful when plugging in various http client interaction - * modules (e.g. ribbon) * @param filters Chain of client side filters to be applied per request */ public MetadataClient( ClientConfig config, ConductorClientConfiguration clientConfiguration, - ClientHandler handler, - ClientFilter... filters) { - super(new ClientRequestHandler(config, handler, filters), clientConfiguration); + ClientRequestFilter... filters) { + super(new ClientRequestHandler(config, filters), clientConfiguration); } MetadataClient(ClientRequestHandler requestHandler) { diff --git a/client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java b/client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java index 668e50047..98f54cf0e 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java +++ b/client/src/main/java/com/netflix/conductor/client/http/PayloadStorage.java @@ -15,13 +15,7 @@ import java.io.BufferedOutputStream; import java.io.IOException; import java.io.InputStream; -import java.net.HttpURLConnection; -import java.net.MalformedURLException; -import java.net.URI; -import java.net.URISyntaxException; -import java.net.URL; - -import javax.ws.rs.core.Response; +import java.net.*; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -31,6 +25,7 @@ import com.netflix.conductor.common.utils.ExternalPayloadStorage; import com.amazonaws.util.IOUtils; +import jakarta.ws.rs.core.Response; /** An implementation of {@link ExternalPayloadStorage} for storing large JSON payload data. */ class PayloadStorage implements ExternalPayloadStorage { diff --git a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java index 35d48b2e3..7b4e120fb 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/TaskClient.java @@ -22,6 +22,7 @@ import org.apache.commons.lang3.ObjectUtils; import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; +import org.glassfish.jersey.client.ClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,11 +39,8 @@ import com.netflix.conductor.common.utils.ExternalPayloadStorage; import com.netflix.conductor.common.utils.ExternalPayloadStorage.PayloadType; -import com.sun.jersey.api.client.ClientHandler; -import com.sun.jersey.api.client.GenericType; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.api.client.filter.ClientFilter; +import jakarta.ws.rs.client.ClientRequestFilter; +import jakarta.ws.rs.core.GenericType; /** Client for conductor task management including polling for task, updating task status etc. */ public class TaskClient extends ClientBase { @@ -68,49 +66,35 @@ public class TaskClient extends ClientBase { /** Creates a default task client */ public TaskClient() { - this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null); + this(new ClientConfig(), new DefaultConductorClientConfiguration()); } /** * @param config REST Client configuration */ public TaskClient(ClientConfig config) { - this(config, new DefaultConductorClientConfiguration(), null); + this(config, new DefaultConductorClientConfiguration()); } /** * @param config REST Client configuration - * @param handler Jersey client handler. Useful when plugging in various http client interaction - * modules (e.g. ribbon) - */ - public TaskClient(ClientConfig config, ClientHandler handler) { - this(config, new DefaultConductorClientConfiguration(), handler); - } - - /** - * @param config REST Client configuration - * @param handler Jersey client handler. Useful when plugging in various http client interaction - * modules (e.g. ribbon) * @param filters Chain of client side filters to be applied per request */ - public TaskClient(ClientConfig config, ClientHandler handler, ClientFilter... filters) { - this(config, new DefaultConductorClientConfiguration(), handler, filters); + public TaskClient(ClientConfig config, ClientRequestFilter... filters) { + this(config, new DefaultConductorClientConfiguration(), filters); } /** * @param config REST Client configuration * @param clientConfiguration Specific properties configured for the client, see {@link * ConductorClientConfiguration} - * @param handler Jersey client handler. Useful when plugging in various http client interaction - * modules (e.g. ribbon) * @param filters Chain of client side filters to be applied per request */ public TaskClient( ClientConfig config, ConductorClientConfiguration clientConfiguration, - ClientHandler handler, - ClientFilter... filters) { - super(new ClientRequestHandler(config, handler, filters), clientConfiguration); + ClientRequestFilter... filters) { + super(new ClientRequestHandler(config, filters), clientConfiguration); } TaskClient(ClientRequestHandler requestHandler) { diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index 14deabff3..a16ae421f 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -18,6 +18,7 @@ import org.apache.commons.lang3.StringUtils; import org.apache.commons.lang3.Validate; +import org.glassfish.jersey.client.ClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -34,11 +35,8 @@ import com.netflix.conductor.common.run.WorkflowTestRequest; import com.netflix.conductor.common.utils.ExternalPayloadStorage; -import com.sun.jersey.api.client.ClientHandler; -import com.sun.jersey.api.client.GenericType; -import com.sun.jersey.api.client.config.ClientConfig; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.api.client.filter.ClientFilter; +import jakarta.ws.rs.client.ClientRequestFilter; +import jakarta.ws.rs.core.GenericType; public class WorkflowClient extends ClientBase { @@ -52,49 +50,35 @@ public class WorkflowClient extends ClientBase { /** Creates a default workflow client */ public WorkflowClient() { - this(new DefaultClientConfig(), new DefaultConductorClientConfiguration(), null); + this(new ClientConfig(), new DefaultConductorClientConfiguration()); } /** * @param config REST Client configuration */ public WorkflowClient(ClientConfig config) { - this(config, new DefaultConductorClientConfiguration(), null); + this(config, new DefaultConductorClientConfiguration()); } /** * @param config REST Client configuration - * @param handler Jersey client handler. Useful when plugging in various http client interaction - * modules (e.g. ribbon) - */ - public WorkflowClient(ClientConfig config, ClientHandler handler) { - this(config, new DefaultConductorClientConfiguration(), handler); - } - - /** - * @param config REST Client configuration - * @param handler Jersey client handler. Useful when plugging in various http client interaction - * modules (e.g. ribbon) * @param filters Chain of client side filters to be applied per request */ - public WorkflowClient(ClientConfig config, ClientHandler handler, ClientFilter... filters) { - this(config, new DefaultConductorClientConfiguration(), handler, filters); + public WorkflowClient(ClientConfig config, ClientRequestFilter... filters) { + this(config, new DefaultConductorClientConfiguration(), filters); } /** * @param config REST Client configuration * @param clientConfiguration Specific properties configured for the client, see {@link * ConductorClientConfiguration} - * @param handler Jersey client handler. Useful when plugging in various http client interaction - * modules (e.g. ribbon) * @param filters Chain of client side filters to be applied per request */ public WorkflowClient( ClientConfig config, ConductorClientConfiguration clientConfiguration, - ClientHandler handler, - ClientFilter... filters) { - super(new ClientRequestHandler(config, handler, filters), clientConfiguration); + ClientRequestFilter... filters) { + super(new ClientRequestHandler(config, filters), clientConfiguration); } WorkflowClient(ClientRequestHandler requestHandler) { diff --git a/client/src/test/groovy/com/netflix/conductor/client/http/EventClientSpec.groovy b/client/src/test/groovy/com/netflix/conductor/client/http/EventClientSpec.groovy index bb121c56d..ec1602736 100644 --- a/client/src/test/groovy/com/netflix/conductor/client/http/EventClientSpec.groovy +++ b/client/src/test/groovy/com/netflix/conductor/client/http/EventClientSpec.groovy @@ -14,8 +14,8 @@ package com.netflix.conductor.client.http import com.netflix.conductor.common.metadata.events.EventHandler -import com.sun.jersey.api.client.ClientResponse -import com.sun.jersey.api.client.WebResource +import jakarta.ws.rs.client.Invocation +import jakarta.ws.rs.core.Response import spock.lang.Subject import spock.lang.Unroll @@ -33,36 +33,47 @@ class EventClientSpec extends ClientSpecification { given: EventHandler handler = new EventHandler() URI uri = createURI("event") + Invocation.Builder builder = Mock(Invocation.Builder.class) + Response response = Mock(Response.class) + when: eventClient.registerEventHandler(handler) then: - 1 * requestHandler.getWebResourceBuilder(uri, handler) >> Mock(WebResource.Builder.class) + 1 * requestHandler.getWebResourceBuilder(uri) >> builder + 1 * builder.post(_) >> response + 1 * response.getStatus() >> 200 } def "update event handler"() { given: EventHandler handler = new EventHandler() URI uri = createURI("event") + Invocation.Builder builder = Mock(Invocation.Builder.class) + Response response = Mock(Response.class) when: eventClient.updateEventHandler(handler) then: - 1 * requestHandler.getWebResourceBuilder(uri, handler) >> Mock(WebResource.Builder.class) + 1 * requestHandler.getWebResourceBuilder(uri) >> builder + 1 * builder.put(_) >> response + 1 * response.getStatus() >> 200 } def "unregister event handler"() { given: String eventName = "test" URI uri = createURI("event/$eventName") + Response response = Mock(Response.class) when: eventClient.unregisterEventHandler(eventName) then: - 1 * requestHandler.delete(uri, null) + 1 * requestHandler.delete(uri) >> response + 1 * response.getStatus() >> 200 } @Unroll @@ -77,8 +88,8 @@ class EventClientSpec extends ClientSpecification { then: eventHandlers && eventHandlers.size() == 2 - 1 * requestHandler.get(uri) >> Mock(ClientResponse.class) { - getEntity(_) >> handlers + 1 * requestHandler.get(uri) >> Mock(Response.class) { + readEntity(_) >> handlers } where: diff --git a/client/src/test/groovy/com/netflix/conductor/client/http/MetadataClientSpec.groovy b/client/src/test/groovy/com/netflix/conductor/client/http/MetadataClientSpec.groovy index 13790d3fc..3e010fadd 100644 --- a/client/src/test/groovy/com/netflix/conductor/client/http/MetadataClientSpec.groovy +++ b/client/src/test/groovy/com/netflix/conductor/client/http/MetadataClientSpec.groovy @@ -15,7 +15,7 @@ package com.netflix.conductor.client.http import com.netflix.conductor.client.exception.ConductorClientException import com.netflix.conductor.common.metadata.workflow.WorkflowDef -import com.sun.jersey.api.client.ClientResponse +import jakarta.ws.rs.core.Response import spock.lang.Subject class MetadataClientSpec extends ClientSpecification { @@ -33,12 +33,14 @@ class MetadataClientSpec extends ClientSpecification { String workflowName = 'test' int version = 1 URI uri = createURI("metadata/workflow/$workflowName/$version") + Response response = Mock(Response.class) when: metadataClient.unregisterWorkflowDef(workflowName, version) then: - 1 * requestHandler.delete(uri, null) + 1 * requestHandler.delete(uri) >> response + 1 * response.getStatus() >> 200 } def "workflow delete throws exception"() { @@ -51,7 +53,7 @@ class MetadataClientSpec extends ClientSpecification { metadataClient.unregisterWorkflowDef(workflowName, version) then: - 1 * requestHandler.delete(uri, null) >> { throw new RuntimeException(clientResponse) } + 1 * requestHandler.delete(uri) >> { throw new RuntimeException() } def ex = thrown(ConductorClientException.class) ex.message == "Unable to invoke Conductor API with uri: $uri, runtime exception occurred" } @@ -87,8 +89,8 @@ class MetadataClientSpec extends ClientSpecification { metadataClient.getAllWorkflowsWithLatestVersions() then: - 1 * requestHandler.get(uri) >> Mock(ClientResponse.class) { - getEntity(_) >> result + 1 * requestHandler.get(uri) >> Mock(Response.class) { + getEntity() >> result } } } diff --git a/client/src/test/groovy/com/netflix/conductor/client/http/TaskClientSpec.groovy b/client/src/test/groovy/com/netflix/conductor/client/http/TaskClientSpec.groovy index 2caba4490..2d924deeb 100644 --- a/client/src/test/groovy/com/netflix/conductor/client/http/TaskClientSpec.groovy +++ b/client/src/test/groovy/com/netflix/conductor/client/http/TaskClientSpec.groovy @@ -16,7 +16,7 @@ import com.netflix.conductor.common.metadata.tasks.Task import com.netflix.conductor.common.run.SearchResult import com.netflix.conductor.common.run.TaskSummary -import com.sun.jersey.api.client.ClientResponse +import jakarta.ws.rs.core.Response import spock.lang.Subject class TaskClientSpec extends ClientSpecification { @@ -42,8 +42,8 @@ class TaskClientSpec extends ClientSpecification { SearchResult searchResult = taskClient.search(query) then: - 1 * requestHandler.get(uri) >> Mock(ClientResponse.class) { - getEntity(_) >> result + 1 * requestHandler.get(uri) >> Mock(Response.class) { + readEntity(_) >> result } searchResult.totalHits == result.totalHits @@ -64,8 +64,8 @@ class TaskClientSpec extends ClientSpecification { SearchResult searchResult = taskClient.searchV2('my_complex_query') then: - 1 * requestHandler.get(uri) >> Mock(ClientResponse.class) { - getEntity(_) >> result + 1 * requestHandler.get(uri) >> Mock(Response.class) { + readEntity(_) >> result } searchResult.totalHits == result.totalHits @@ -90,8 +90,8 @@ class TaskClientSpec extends ClientSpecification { SearchResult searchResult = taskClient.search(start, size, sort, freeText, query) then: - 1 * requestHandler.get(uri) >> Mock(ClientResponse.class) { - getEntity(_) >> result + 1 * requestHandler.get(uri) >> Mock(Response.class) { + readEntity(_) >> result } searchResult.totalHits == result.totalHits @@ -116,8 +116,8 @@ class TaskClientSpec extends ClientSpecification { SearchResult searchResult = taskClient.searchV2(start, size, sort, freeText, query) then: - 1 * requestHandler.get(uri) >> Mock(ClientResponse.class) { - getEntity(_) >> result + 1 * requestHandler.get(uri) >> Mock(Response.class) { + readEntity(_) >> result } searchResult.totalHits == result.totalHits diff --git a/client/src/test/groovy/com/netflix/conductor/client/http/WorkflowClientSpec.groovy b/client/src/test/groovy/com/netflix/conductor/client/http/WorkflowClientSpec.groovy index b4f30aeb1..eea105b60 100644 --- a/client/src/test/groovy/com/netflix/conductor/client/http/WorkflowClientSpec.groovy +++ b/client/src/test/groovy/com/netflix/conductor/client/http/WorkflowClientSpec.groovy @@ -17,7 +17,7 @@ import com.netflix.conductor.common.run.SearchResult import com.netflix.conductor.common.run.Workflow import com.netflix.conductor.common.run.WorkflowSummary -import com.sun.jersey.api.client.ClientResponse +import jakarta.ws.rs.core.Response import spock.lang.Subject class WorkflowClientSpec extends ClientSpecification { @@ -43,8 +43,8 @@ class WorkflowClientSpec extends ClientSpecification { SearchResult searchResult = workflowClient.search(query) then: - 1 * requestHandler.get(uri) >> Mock(ClientResponse.class) { - getEntity(_) >> result + 1 * requestHandler.get(uri) >> Mock(Response.class) { + readEntity(_) >> result } searchResult.totalHits == result.totalHits @@ -65,8 +65,8 @@ class WorkflowClientSpec extends ClientSpecification { SearchResult searchResult = workflowClient.searchV2('my_complex_query') then: - 1 * requestHandler.get(uri) >> Mock(ClientResponse.class) { - getEntity(_) >> result + 1 * requestHandler.get(uri) >> Mock(Response.class) { + readEntity(_) >> result } searchResult.totalHits == result.totalHits @@ -91,8 +91,8 @@ class WorkflowClientSpec extends ClientSpecification { SearchResult searchResult = workflowClient.search(start, size, sort, freeText, query) then: - 1 * requestHandler.get(uri) >> Mock(ClientResponse.class) { - getEntity(_) >> result + 1 * requestHandler.get(uri) >> Mock(Response.class) { + readEntity(_) >> result } searchResult.totalHits == result.totalHits @@ -117,8 +117,8 @@ class WorkflowClientSpec extends ClientSpecification { SearchResult searchResult = workflowClient.searchV2(start, size, sort, freeText, query) then: - 1 * requestHandler.get(uri) >> Mock(ClientResponse.class) { - getEntity(_) >> result + 1 * requestHandler.get(uri) >> Mock(Response.class) { + readEntity(_) >> result } searchResult.totalHits == result.totalHits diff --git a/dependencies.gradle b/dependencies.gradle index 9c6343a77..e4033cba5 100644 --- a/dependencies.gradle +++ b/dependencies.gradle @@ -26,7 +26,7 @@ ext { revDynoQueues = '2.0.20' revElasticSearch6 = '6.8.23' revEmbeddedRedis = '0.6' - revEurekaClient = '1.10.10' + revEurekaClient = '2.0.2' revGroovy = '4.0.9' revGrpc = '1.57.2' revGuava = '33.2.1-jre' @@ -36,10 +36,10 @@ ext { revProtoBuf = '3.21.12' revJakartaAnnotation = '2.1.1' revJAXB = '4.0.1' - revJAXRS = '2.1.1' + revJAXRS = '4.0.0' revJedis = '3.3.0' - revJersey = '1.19.4' - revJerseyCommon = '2.22.2' + revJersey = '3.1.7' + revJerseyCommon = '3.1.7' revJsonPath = '2.4.0' revJq = '0.0.13' revJsr311Api = '1.1.1' diff --git a/java-sdk/build.gradle b/java-sdk/build.gradle index 91ce0f4cb..9c16f6da7 100644 --- a/java-sdk/build.gradle +++ b/java-sdk/build.gradle @@ -8,8 +8,8 @@ dependencies { implementation "com.fasterxml.jackson.core:jackson-databind:${revFasterXml}" implementation "com.google.guava:guava:${revGuava}" implementation "cglib:cglib:3.3.0" - implementation "com.sun.jersey:jersey-client:${revJersey}" - implementation "javax.ws.rs:javax.ws.rs-api:${revJAXRS}" + implementation "org.glassfish.jersey.core:jersey-client:${revJersey}" + implementation "jakarta.ws.rs:jakarta.ws.rs-api:${revJAXRS}" implementation "org.glassfish.jersey.core:jersey-common:${revJerseyCommon}" implementation "org.openjdk.nashorn:nashorn-core:15.4" @@ -30,4 +30,3 @@ test { } } sourceSets.main.java.srcDirs += ['example/java', 'example/resources'] - diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java index d80d55928..f71073a3a 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/executor/WorkflowExecutor.java @@ -19,6 +19,7 @@ import java.util.Map; import java.util.concurrent.*; +import org.glassfish.jersey.client.ClientConfig; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -38,9 +39,7 @@ import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.databind.Module; import com.fasterxml.jackson.databind.ObjectMapper; -import com.sun.jersey.api.client.ClientHandler; -import com.sun.jersey.api.client.config.DefaultClientConfig; -import com.sun.jersey.api.client.filter.ClientFilter; +import jakarta.ws.rs.client.ClientRequestFilter; public class WorkflowExecutor { @@ -91,17 +90,15 @@ public WorkflowExecutor(String apiServerURL) { } public WorkflowExecutor( - String apiServerURL, int pollingInterval, ClientFilter... clientFilter) { + String apiServerURL, int pollingInterval, ClientRequestFilter... clientFilter) { - taskClient = new TaskClient(new DefaultClientConfig(), (ClientHandler) null, clientFilter); + taskClient = new TaskClient(new ClientConfig(), clientFilter); taskClient.setRootURI(apiServerURL); - workflowClient = - new WorkflowClient(new DefaultClientConfig(), (ClientHandler) null, clientFilter); + workflowClient = new WorkflowClient(new ClientConfig(), clientFilter); workflowClient.setRootURI(apiServerURL); - metadataClient = - new MetadataClient(new DefaultClientConfig(), (ClientHandler) null, clientFilter); + metadataClient = new MetadataClient(new ClientConfig(), clientFilter); metadataClient.setRootURI(apiServerURL); annotatedWorkerExecutor = new AnnotatedWorkerExecutor(taskClient, pollingInterval); diff --git a/test-harness/build.gradle b/test-harness/build.gradle index c250eddf6..db73b2706 100644 --- a/test-harness/build.gradle +++ b/test-harness/build.gradle @@ -38,6 +38,6 @@ dependencies { testImplementation "org.testcontainers:elasticsearch:${revTestContainer}" testImplementation('junit:junit:4.13.2') testImplementation "org.junit.vintage:junit-vintage-engine" - testImplementation "javax.ws.rs:javax.ws.rs-api:${revJAXRS}" + testImplementation "jakarta.ws.rs:jakarta.ws.rs-api:${revJAXRS}" testImplementation "org.glassfish.jersey.core:jersey-common:${revJerseyCommon}" } From a161f6fbbea6e189e7ec7ae4d5142b0c6a740b81 Mon Sep 17 00:00:00 2001 From: Kay Ulbrich Date: Fri, 23 Aug 2024 16:08:27 +0200 Subject: [PATCH 09/25] Add build arg YARN_OPTS to be added to "yarn" on Docker build --- docker/docker-compose-mysql.yaml | 2 ++ docker/docker-compose-postgres-es7.yaml | 2 ++ docker/docker-compose-postgres.yaml | 2 ++ docker/docker-compose.yaml | 2 ++ docker/server/Dockerfile | 3 ++- 5 files changed, 10 insertions(+), 1 deletion(-) diff --git a/docker/docker-compose-mysql.yaml b/docker/docker-compose-mysql.yaml index 5c587e321..8c2c3753e 100644 --- a/docker/docker-compose-mysql.yaml +++ b/docker/docker-compose-mysql.yaml @@ -10,6 +10,8 @@ services: build: context: ../ dockerfile: docker/server/Dockerfile + args: + YARN_OPTS: ${YARN_OPTS} networks: - internal ports: diff --git a/docker/docker-compose-postgres-es7.yaml b/docker/docker-compose-postgres-es7.yaml index 1dff23682..383be9500 100644 --- a/docker/docker-compose-postgres-es7.yaml +++ b/docker/docker-compose-postgres-es7.yaml @@ -9,6 +9,8 @@ services: build: context: ../ dockerfile: docker/server/Dockerfile + args: + YARN_OPTS: ${YARN_OPTS} networks: - internal ports: diff --git a/docker/docker-compose-postgres.yaml b/docker/docker-compose-postgres.yaml index 1e86fc69a..d1a51cefe 100644 --- a/docker/docker-compose-postgres.yaml +++ b/docker/docker-compose-postgres.yaml @@ -9,6 +9,8 @@ services: build: context: ../ dockerfile: docker/server/Dockerfile + args: + YARN_OPTS: ${YARN_OPTS} networks: - internal ports: diff --git a/docker/docker-compose.yaml b/docker/docker-compose.yaml index 7c32c1437..9425f735a 100644 --- a/docker/docker-compose.yaml +++ b/docker/docker-compose.yaml @@ -9,6 +9,8 @@ services: build: context: ../ dockerfile: docker/server/Dockerfile + args: + YARN_OPTS: ${YARN_OPTS} networks: - internal ports: diff --git a/docker/server/Dockerfile b/docker/server/Dockerfile index 4dd41bad1..97a22463e 100644 --- a/docker/server/Dockerfile +++ b/docker/server/Dockerfile @@ -25,6 +25,7 @@ RUN ls -ltr # 1. Builder stage 2 # =========================================================================================================== FROM alpine:3.20 AS ui-builder +ARG YARN_OPTS LABEL maintainer="Orkes OSS " @@ -36,7 +37,7 @@ WORKDIR /conductor/ui # Include monaco sources into bundle (instead of using CDN) ENV REACT_APP_MONACO_EDITOR_USING_CDN=false -RUN yarn install && cp -r node_modules/monaco-editor public/ && yarn build +RUN yarn ${YARN_OPTS} install && cp -r node_modules/monaco-editor public/ && yarn ${YARN_OPTS} build RUN ls -ltr RUN echo "Done building UI" From aafc53c33732c27ffade4ebed0ddbe2e84b19557 Mon Sep 17 00:00:00 2001 From: Kay Ulbrich Date: Fri, 23 Aug 2024 16:33:26 +0200 Subject: [PATCH 10/25] Extended Docker build documentation --- docker/README.md | 32 ++++++++++++++++++++++++++++++++ 1 file changed, 32 insertions(+) diff --git a/docker/README.md b/docker/README.md index e19a916ee..2f80e85d0 100644 --- a/docker/README.md +++ b/docker/README.md @@ -49,3 +49,35 @@ TODO: Link to the helm charts | [docker-compose.yaml](docker-compose.yaml) | Redis + Elasticsearch 7 | | [docker-compose-postgres.yaml](docker-compose-postgres.yaml) | Postgres + Elasticsearch 7 | | [docker-compose-mysql.yaml](docker-compose-mysql.yaml) | Mysql + Elasticsearch 7 | + +### Network errors during UI build with yarn + +It has been observed, that the UI build may fail with an error message like + +``` +> [linux/arm64 ui-builder 5/7] RUN yarn install && cp -r node_modules/monaco-editor public/ && yarn build: +269.9 at Object.onceWrapper (node:events:633:28) +269.9 at TLSSocket.emit (node:events:531:35) +269.9 at Socket._onTimeout (node:net:590:8) +269.9 at listOnTimeout (node:internal/timers:573:17) +269.9 at process.processTimers (node:internal/timers:514:7) +269.9 info Visit https://yarnpkg.com/en/docs/cli/install for documentation about this command. +281.2 info There appears to be trouble with your network connection. Retrying... +313.5 info There appears to be trouble with your network connection. Retrying... +920.3 info There appears to be trouble with your network connection. Retrying... +953.6 info There appears to be trouble with your network connection. Retrying... +``` + +This can happen, even if the network resources `yarn` tries to use are available, but there is too much network latency. `yarn` accepts the option `--network-timeout` to set a custom timeout in milliseconds. + +For passing arguments to `yarn`, in [this Dockerfile](server/Dockerfile) the build arg `YARN_OPTS` has been added. This argument will be added to each `yarn` call. When using one of the `docker-compose-*` files, you can set this via the eponymous environment variable `YARN_OPTS`, e.g.: + +``` +YARN_OPTS='--network-timeout 10000000' docker compose -f docker-compose.yaml up +``` + +When building a Docker image using `docker`, you must call it like e.g. + +``` +docker build --build-arg='YARN_OPTS=--network-timeout 10000000' .. -f server/Dockerfile -t oss-conductor:v3.21.4 +``` From 42251ac39c7de0cdcaa0f4b11617762269e9c9e6 Mon Sep 17 00:00:00 2001 From: Kay Ulbrich Date: Fri, 23 Aug 2024 16:47:52 +0200 Subject: [PATCH 11/25] Added to README --- docker/README.md | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/docker/README.md b/docker/README.md index 2f80e85d0..d096159a1 100644 --- a/docker/README.md +++ b/docker/README.md @@ -70,7 +70,9 @@ It has been observed, that the UI build may fail with an error message like This can happen, even if the network resources `yarn` tries to use are available, but there is too much network latency. `yarn` accepts the option `--network-timeout` to set a custom timeout in milliseconds. -For passing arguments to `yarn`, in [this Dockerfile](server/Dockerfile) the build arg `YARN_OPTS` has been added. This argument will be added to each `yarn` call. When using one of the `docker-compose-*` files, you can set this via the eponymous environment variable `YARN_OPTS`, e.g.: +For passing arguments to `yarn`, in [this Dockerfile](server/Dockerfile) the build arg `YARN_OPTS` has been added. This argument will be added to each `yarn` call. It is not compulsory to set this argument. + +When using one of the `docker-compose-*` files, you can set this via the eponymous environment variable `YARN_OPTS`, e.g.: ``` YARN_OPTS='--network-timeout 10000000' docker compose -f docker-compose.yaml up From faa0e6b77b7aca2f65a521b2d8517d34d07a1917 Mon Sep 17 00:00:00 2001 From: Kay Ulbrich Date: Fri, 23 Aug 2024 17:30:26 +0200 Subject: [PATCH 12/25] Add curl to standard Docker image --- docker/server/Dockerfile | 1 + 1 file changed, 1 insertion(+) diff --git a/docker/server/Dockerfile b/docker/server/Dockerfile index 4dd41bad1..1562ea999 100644 --- a/docker/server/Dockerfile +++ b/docker/server/Dockerfile @@ -50,6 +50,7 @@ LABEL maintainer="Orkes OSS " RUN apk add openjdk17 RUN apk add nginx +RUN apk add curl # Make app folders RUN mkdir -p /app/config /app/logs /app/libs From 896e49efd1919d37de4b32100805e4347f9052bc Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Fri, 23 Aug 2024 14:44:08 -0700 Subject: [PATCH 13/25] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 5f0407a40..b05b2e0bf 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -![Conductor](docs/img/logo.svg) +![Conductor](https://assets.conductor-oss.org/logo.png) [![Github release](https://img.shields.io/github/v/release/conductor-oss/conductor.svg)](https://GitHub.com/Netflix/conductor-oss/releases) [![License](https://img.shields.io/github/license/conductor-oss/conductor.svg)](http://www.apache.org/licenses/LICENSE-2.0) From 6aac51ea92b98d5e0e7c101938c3dc4b1183e2d9 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Fri, 23 Aug 2024 14:49:34 -0700 Subject: [PATCH 14/25] Update README.md --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index b05b2e0bf..8c80070e0 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,4 @@ -![Conductor](https://assets.conductor-oss.org/logo.png) + [![Github release](https://img.shields.io/github/v/release/conductor-oss/conductor.svg)](https://GitHub.com/Netflix/conductor-oss/releases) [![License](https://img.shields.io/github/license/conductor-oss/conductor.svg)](http://www.apache.org/licenses/LICENSE-2.0) From e1f495a6c99d11261adbe99b921a9b88a82ce604 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Fri, 23 Aug 2024 14:53:45 -0700 Subject: [PATCH 15/25] Update README.md --- README.md | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/README.md b/README.md index 8c80070e0..0996501e4 100644 --- a/README.md +++ b/README.md @@ -1,4 +1,5 @@ - +![Conductor OSS Logo](https://assets.conductor-oss.org/logo.png "Conductor OSS") + [![Github release](https://img.shields.io/github/v/release/conductor-oss/conductor.svg)](https://GitHub.com/Netflix/conductor-oss/releases) [![License](https://img.shields.io/github/license/conductor-oss/conductor.svg)](http://www.apache.org/licenses/LICENSE-2.0) From 29f5a9adae5be181329d8fc5eae249bca6243a81 Mon Sep 17 00:00:00 2001 From: Viren Baraiya Date: Fri, 23 Aug 2024 15:00:04 -0700 Subject: [PATCH 16/25] update logo --- ui/src/plugins/AppLogo.jsx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ui/src/plugins/AppLogo.jsx b/ui/src/plugins/AppLogo.jsx index aa29ec996..14b1eae1e 100644 --- a/ui/src/plugins/AppLogo.jsx +++ b/ui/src/plugins/AppLogo.jsx @@ -13,6 +13,6 @@ const useStyles = makeStyles((theme) => ({ export default function AppLogo() { const classes = useStyles(); - const logoPath = getBasename() + 'logo.svg'; + const logoPath = 'https://assets.conductor-oss.org/logo.png'; return Conductor; } From 218a6001012d87a9040a951e887678565f906492 Mon Sep 17 00:00:00 2001 From: kay-horst <37873708+kay-horst@users.noreply.github.com> Date: Sat, 24 Aug 2024 13:42:46 +0200 Subject: [PATCH 17/25] Corrected wording --- docker/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docker/README.md b/docker/README.md index d096159a1..1074cbe10 100644 --- a/docker/README.md +++ b/docker/README.md @@ -72,7 +72,7 @@ This can happen, even if the network resources `yarn` tries to use are available For passing arguments to `yarn`, in [this Dockerfile](server/Dockerfile) the build arg `YARN_OPTS` has been added. This argument will be added to each `yarn` call. It is not compulsory to set this argument. -When using one of the `docker-compose-*` files, you can set this via the eponymous environment variable `YARN_OPTS`, e.g.: +When using one of the `docker-compose-*` files, you can set this via the environment variable `YARN_OPTS`, e.g.: ``` YARN_OPTS='--network-timeout 10000000' docker compose -f docker-compose.yaml up From 467a6a6bd02d243b9c348f1eae8770b82c9c2f92 Mon Sep 17 00:00:00 2001 From: kay-horst <37873708+kay-horst@users.noreply.github.com> Date: Sat, 24 Aug 2024 15:39:22 +0200 Subject: [PATCH 18/25] Improved wording --- docker/README.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/docker/README.md b/docker/README.md index 1074cbe10..809260c1c 100644 --- a/docker/README.md +++ b/docker/README.md @@ -68,9 +68,9 @@ It has been observed, that the UI build may fail with an error message like 953.6 info There appears to be trouble with your network connection. Retrying... ``` -This can happen, even if the network resources `yarn` tries to use are available, but there is too much network latency. `yarn` accepts the option `--network-timeout` to set a custom timeout in milliseconds. +This does not necessarily mean, that the network is unavailable, but can be caused by too high latency, as well. `yarn` accepts the option `--network-timeout <#ms>` to set a custom timeout in milliseconds. -For passing arguments to `yarn`, in [this Dockerfile](server/Dockerfile) the build arg `YARN_OPTS` has been added. This argument will be added to each `yarn` call. It is not compulsory to set this argument. +For passing arguments to `yarn`, in [this Dockerfile](server/Dockerfile) the _optional_ build arg `YARN_OPTS` has been added. This argument will be added to each `yarn` call. When using one of the `docker-compose-*` files, you can set this via the environment variable `YARN_OPTS`, e.g.: From 05df715947b9e7608ea0a8e1c973624e4ece3e8e Mon Sep 17 00:00:00 2001 From: junaidHussain-clari <91540528+junaidHussain-clari@users.noreply.github.com> Date: Tue, 27 Aug 2024 15:42:01 +0530 Subject: [PATCH 19/25] made changes to pass down the taskToDomain map to child workflow in case of events --- .../core/events/SimpleActionProcessor.java | 14 +++++- .../conductor/core/execution/tasks/Event.java | 1 + .../events/TestSimpleActionProcessor.java | 43 +++++++++++++++++++ 3 files changed, 57 insertions(+), 1 deletion(-) diff --git a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java index 474d7cfa3..46fcd545d 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java @@ -32,6 +32,7 @@ import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; +import org.springframework.util.CollectionUtils; /** * Action Processor subscribes to the Event Actions queue and processes the actions (e.g. start @@ -196,10 +197,19 @@ private Map startWorkflow( Map workflowInput = parametersUtils.replace(inputParams, payload); Map paramsMap = new HashMap<>(); + // extracting taskToDomain map from the event payload + paramsMap.put("taskToDomain","${taskToDomain}"); Optional.ofNullable(params.getCorrelationId()) .ifPresent(value -> paramsMap.put("correlationId", value)); Map replaced = parametersUtils.replace(paramsMap, payload); + // if taskToDomain is absent from event handler definition, and taskDomain Map is passed as a part of payload + // then assign payload taskToDomain map to the new workflow instance + final Map taskToDomain = params.getTaskToDomain() != null ? + params.getTaskToDomain() : + (Map) replaced.get("taskToDomain"); + + workflowInput.put("conductor.event.messageId", messageId); workflowInput.put("conductor.event.name", event); @@ -212,7 +222,9 @@ private Map startWorkflow( .orElse(params.getCorrelationId())); startWorkflowInput.setWorkflowInput(workflowInput); startWorkflowInput.setEvent(event); - startWorkflowInput.setTaskToDomain(params.getTaskToDomain()); + if (!CollectionUtils.isEmpty(taskToDomain)) { + startWorkflowInput.setTaskToDomain(taskToDomain); + } String workflowId = workflowExecutor.startWorkflow(startWorkflowInput); diff --git a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java index 9674efe07..a46cc69f1 100644 --- a/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java +++ b/core/src/main/java/com/netflix/conductor/core/execution/tasks/Event.java @@ -62,6 +62,7 @@ public void start(WorkflowModel workflow, TaskModel task, WorkflowExecutor workf payload.put("workflowType", workflow.getWorkflowName()); payload.put("workflowVersion", workflow.getWorkflowVersion()); payload.put("correlationId", workflow.getCorrelationId()); + payload.put("taskToDomain", workflow.getTaskToDomain()); task.setStatus(TaskModel.Status.IN_PROGRESS); task.addOutput(payload); diff --git a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java b/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java index eef93c734..afda4c769 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java +++ b/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java @@ -114,6 +114,49 @@ public void testStartWorkflow_correlationId() throws Exception { assertEquals(taskToDomain, capturedValue.getTaskToDomain()); } + @Test + public void testStartWorkflow_taskDomain() throws Exception { + StartWorkflow startWorkflow = new StartWorkflow(); + startWorkflow.setName("testWorkflow"); + startWorkflow.getInput().put("testInput", "${testId}"); + + Action action = new Action(); + action.setAction(Type.start_workflow); + action.setStart_workflow(startWorkflow); + + Object payload = + objectMapper.readValue( + "{ \"testId\": \"test_1\", \"taskToDomain\":{\"testTask\":\"testDomain\"} }", Object.class); + + Map taskToDomain = new HashMap<>(); + taskToDomain.put("testTask", "testDomain"); + + WorkflowDef workflowDef = new WorkflowDef(); + workflowDef.setName("testWorkflow"); + workflowDef.setVersion(1); + + when(workflowExecutor.startWorkflow(any())).thenReturn("workflow_1"); + + Map output = + actionProcessor.execute(action, payload, "testEvent", "testMessage"); + + assertNotNull(output); + assertEquals("workflow_1", output.get("workflowId")); + + ArgumentCaptor startWorkflowInputArgumentCaptor = + ArgumentCaptor.forClass(StartWorkflowInput.class); + + verify(workflowExecutor).startWorkflow(startWorkflowInputArgumentCaptor.capture()); + StartWorkflowInput capturedValue = startWorkflowInputArgumentCaptor.getValue(); + + assertEquals("test_1", capturedValue.getWorkflowInput().get("testInput")); + assertEquals(taskToDomain, capturedValue.getTaskToDomain()); + assertEquals( + "testMessage", capturedValue.getWorkflowInput().get("conductor.event.messageId")); + assertEquals("testEvent", capturedValue.getWorkflowInput().get("conductor.event.name")); + } + + @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void testStartWorkflow() throws Exception { From 70b06c77510614aa3e4a0b855538137cbcd960a1 Mon Sep 17 00:00:00 2001 From: junaidHussain-clari <91540528+junaidHussain-clari@users.noreply.github.com> Date: Tue, 27 Aug 2024 15:46:31 +0530 Subject: [PATCH 20/25] ran gradlew spotlessApply for formatting --- .../core/events/SimpleActionProcessor.java | 15 ++++++++------- .../core/events/TestSimpleActionProcessor.java | 4 ++-- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java index 46fcd545d..84e723439 100644 --- a/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java +++ b/core/src/main/java/com/netflix/conductor/core/events/SimpleActionProcessor.java @@ -19,6 +19,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Component; +import org.springframework.util.CollectionUtils; import com.netflix.conductor.common.metadata.events.EventHandler.Action; import com.netflix.conductor.common.metadata.events.EventHandler.StartWorkflow; @@ -32,7 +33,6 @@ import com.netflix.conductor.metrics.Monitors; import com.netflix.conductor.model.TaskModel; import com.netflix.conductor.model.WorkflowModel; -import org.springframework.util.CollectionUtils; /** * Action Processor subscribes to the Event Actions queue and processes the actions (e.g. start @@ -198,17 +198,18 @@ private Map startWorkflow( Map paramsMap = new HashMap<>(); // extracting taskToDomain map from the event payload - paramsMap.put("taskToDomain","${taskToDomain}"); + paramsMap.put("taskToDomain", "${taskToDomain}"); Optional.ofNullable(params.getCorrelationId()) .ifPresent(value -> paramsMap.put("correlationId", value)); Map replaced = parametersUtils.replace(paramsMap, payload); - // if taskToDomain is absent from event handler definition, and taskDomain Map is passed as a part of payload + // if taskToDomain is absent from event handler definition, and taskDomain Map is passed + // as a part of payload // then assign payload taskToDomain map to the new workflow instance - final Map taskToDomain = params.getTaskToDomain() != null ? - params.getTaskToDomain() : - (Map) replaced.get("taskToDomain"); - + final Map taskToDomain = + params.getTaskToDomain() != null + ? params.getTaskToDomain() + : (Map) replaced.get("taskToDomain"); workflowInput.put("conductor.event.messageId", messageId); workflowInput.put("conductor.event.name", event); diff --git a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java b/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java index afda4c769..3e57142b7 100644 --- a/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java +++ b/core/src/test/java/com/netflix/conductor/core/events/TestSimpleActionProcessor.java @@ -126,7 +126,8 @@ public void testStartWorkflow_taskDomain() throws Exception { Object payload = objectMapper.readValue( - "{ \"testId\": \"test_1\", \"taskToDomain\":{\"testTask\":\"testDomain\"} }", Object.class); + "{ \"testId\": \"test_1\", \"taskToDomain\":{\"testTask\":\"testDomain\"} }", + Object.class); Map taskToDomain = new HashMap<>(); taskToDomain.put("testTask", "testDomain"); @@ -156,7 +157,6 @@ public void testStartWorkflow_taskDomain() throws Exception { assertEquals("testEvent", capturedValue.getWorkflowInput().get("conductor.event.name")); } - @SuppressWarnings({"unchecked", "rawtypes"}) @Test public void testStartWorkflow() throws Exception { From 72c1f42861f1cc3226a94ce3f48e1f81256605a0 Mon Sep 17 00:00:00 2001 From: Pablo Cuadrado Date: Mon, 2 Sep 2024 20:54:40 -0300 Subject: [PATCH 21/25] Remove unused import --- ui/src/plugins/AppLogo.jsx | 1 - 1 file changed, 1 deletion(-) diff --git a/ui/src/plugins/AppLogo.jsx b/ui/src/plugins/AppLogo.jsx index 14b1eae1e..72933b613 100644 --- a/ui/src/plugins/AppLogo.jsx +++ b/ui/src/plugins/AppLogo.jsx @@ -1,6 +1,5 @@ import React from "react"; import { makeStyles } from "@material-ui/core/styles"; -import { getBasename } from "../utils/helpers"; import { cleanDuplicateSlash } from "./fetch"; const useStyles = makeStyles((theme) => ({ From 279ec23d6855ef121f55d2b5e4b901db9fa7a94d Mon Sep 17 00:00:00 2001 From: Kay Ulbrich Date: Sat, 24 Aug 2024 17:43:16 +0200 Subject: [PATCH 22/25] Add Jackson Kotlin module to ObjectMapper instances --- common/build.gradle | 2 +- .../netflix/conductor/common/config/ObjectMapperProvider.java | 2 ++ java-sdk/build.gradle | 2 +- .../conductor/sdk/workflow/utils/ObjectMapperProvider.java | 2 ++ 4 files changed, 6 insertions(+), 2 deletions(-) diff --git a/common/build.gradle b/common/build.gradle index 8332cb312..f6bdf1230 100644 --- a/common/build.gradle +++ b/common/build.gradle @@ -21,6 +21,7 @@ dependencies { implementation "com.fasterxml.jackson.core:jackson-core:${revFasterXml}" // https://github.com/FasterXML/jackson-modules-base/tree/master/afterburner implementation "com.fasterxml.jackson.module:jackson-module-afterburner:${revFasterXml}" + implementation "com.fasterxml.jackson.module:jackson-module-kotlin:${revFasterXml}" testImplementation 'org.springframework.boot:spring-boot-starter-validation' testImplementation "org.springdoc:springdoc-openapi-starter-webmvc-ui:${revSpringDoc}" @@ -53,4 +54,3 @@ task protogen(dependsOn: jar, type: JavaExec) { "com.netflix.conductor.common", ) } - diff --git a/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java b/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java index 5e3a5562c..815dcc2d0 100644 --- a/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java +++ b/common/src/main/java/com/netflix/conductor/common/config/ObjectMapperProvider.java @@ -20,6 +20,7 @@ import com.fasterxml.jackson.databind.SerializationFeature; import com.fasterxml.jackson.datatype.jsr310.JavaTimeModule; import com.fasterxml.jackson.module.afterburner.AfterburnerModule; +import com.fasterxml.jackson.module.kotlin.KotlinModule; /** * A Factory class for creating a customized {@link ObjectMapper}. This is only used by the @@ -58,6 +59,7 @@ private static ObjectMapper _getObjectMapper() { objectMapper.registerModule(new JsonProtoModule()); objectMapper.registerModule(new JavaTimeModule()); objectMapper.registerModule(new AfterburnerModule()); + objectMapper.registerModule(new KotlinModule.Builder().build()); return objectMapper; } } diff --git a/java-sdk/build.gradle b/java-sdk/build.gradle index 91ce0f4cb..c8aba12ad 100644 --- a/java-sdk/build.gradle +++ b/java-sdk/build.gradle @@ -6,6 +6,7 @@ dependencies { implementation project(':conductor-client') implementation "com.fasterxml.jackson.core:jackson-databind:${revFasterXml}" + implementation "com.fasterxml.jackson.module:jackson-module-kotlin:${revFasterXml}" implementation "com.google.guava:guava:${revGuava}" implementation "cglib:cglib:3.3.0" implementation "com.sun.jersey:jersey-client:${revJersey}" @@ -30,4 +31,3 @@ test { } } sourceSets.main.java.srcDirs += ['example/java', 'example/resources'] - diff --git a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/utils/ObjectMapperProvider.java b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/utils/ObjectMapperProvider.java index 1721c8236..998297457 100644 --- a/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/utils/ObjectMapperProvider.java +++ b/java-sdk/src/main/java/com/netflix/conductor/sdk/workflow/utils/ObjectMapperProvider.java @@ -18,6 +18,7 @@ import com.fasterxml.jackson.databind.DeserializationFeature; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.SerializationFeature; +import com.fasterxml.jackson.module.kotlin.KotlinModule; public class ObjectMapperProvider { @@ -35,6 +36,7 @@ public ObjectMapper getObjectMapper() { // objectMapper.setSerializationInclusion(JsonInclude.Include.); objectMapper.registerModule(new JsonProtoModule()); + objectMapper.registerModule(new KotlinModule.Builder().build()); return objectMapper; } } From e72500cbc98aedde7d1281bccb7644a21f3b3675 Mon Sep 17 00:00:00 2001 From: Miguel Prieto Date: Wed, 11 Sep 2024 15:29:15 -0400 Subject: [PATCH 23/25] Fix CI build-ui. Upgrading to v3 due to deprecation. SEE: https://github.blog/changelog/2024-02-13-deprecation-notice-v1-and-v2-of-the-artifact-actions/ --- .github/workflows/ci.yml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/.github/workflows/ci.yml b/.github/workflows/ci.yml index a25c97dd2..8172e83ad 100644 --- a/.github/workflows/ci.yml +++ b/.github/workflows/ci.yml @@ -89,14 +89,14 @@ jobs: component: true - name: Archive test screenshots - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v3 if: failure() with: name: cypress-screenshots path: ui/cypress/screenshots - name: Archive test videos - uses: actions/upload-artifact@v2 + uses: actions/upload-artifact@v3 if: always() with: name: cypress-videos From 4524c9cf0bc59997ccbb910532daece85f34b77e Mon Sep 17 00:00:00 2001 From: Miguel Prieto Date: Wed, 11 Sep 2024 19:37:02 -0400 Subject: [PATCH 24/25] The assertions made by these tests were wrong. We have no guarantees on which integration_task_2 will be polled and completed first. --- test-harness/build.gradle | 1 + ...rchicalForkJoinSubworkflowRerunSpec.groovy | 24 +++++++++++++++---- ...hicalForkJoinSubworkflowRestartSpec.groovy | 24 +++++++++++++++---- 3 files changed, 41 insertions(+), 8 deletions(-) diff --git a/test-harness/build.gradle b/test-harness/build.gradle index db73b2706..5f8a64a59 100644 --- a/test-harness/build.gradle +++ b/test-harness/build.gradle @@ -40,4 +40,5 @@ dependencies { testImplementation "org.junit.vintage:junit-vintage-engine" testImplementation "jakarta.ws.rs:jakarta.ws.rs-api:${revJAXRS}" testImplementation "org.glassfish.jersey.core:jersey-common:${revJerseyCommon}" + testImplementation "org.awaitility:awaitility:${revAwaitility}" } diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy index 1a8f1dff4..8496eef94 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRerunSpec.groovy @@ -12,6 +12,8 @@ */ package com.netflix.conductor.test.integration +import java.util.concurrent.TimeUnit + import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task @@ -30,6 +32,8 @@ import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOI import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask +import static org.awaitility.Awaitility.await + class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { @Shared @@ -230,10 +234,23 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { tasks[3].status == Task.Status.IN_PROGRESS } - when: "poll and complete the integration_task_2 task in the root workflow" + when: "poll and complete integration_task_2 in root and mid level workflow" def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId - workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId + // The root workflow has an integration_task_2. Its subworkflow also has an integration_task_2. + // We have NO guarantees which will be polled and completed first, so the assertions done in previous versions of this test were wrong. + await().atMost(10, TimeUnit.SECONDS).until { + workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) + def rootWf = workflowExecutionService.getExecutionStatus(rootWorkflowId, true) + def midWf = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true) + + rootWf.status == Workflow.WorkflowStatus.RUNNING && + rootWf.tasks[2].taskType == 'integration_task_2' && + rootWf.tasks[2].status == Task.Status.COMPLETED && + midWf.status == Workflow.WorkflowStatus.RUNNING && + midWf.tasks[2].taskType == 'integration_task_2' && + midWf.tasks[2].status == Task.Status.COMPLETED + } then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId @@ -251,9 +268,8 @@ class HierarchicalForkJoinSubworkflowRerunSpec extends AbstractSpecification { tasks[3].status == Task.Status.IN_PROGRESS } - when: "poll and complete the integration_task_2 task in the root-level workflow" + when: "mid level workflow is in RUNNING state" def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId - workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" diff --git a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy index 1d9cc0b11..c50670bf0 100644 --- a/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy +++ b/test-harness/src/test/groovy/com/netflix/conductor/test/integration/HierarchicalForkJoinSubworkflowRestartSpec.groovy @@ -12,6 +12,8 @@ */ package com.netflix.conductor.test.integration +import java.util.concurrent.TimeUnit + import org.springframework.beans.factory.annotation.Autowired import com.netflix.conductor.common.metadata.tasks.Task @@ -29,6 +31,8 @@ import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_JOI import static com.netflix.conductor.common.metadata.tasks.TaskType.TASK_TYPE_SUB_WORKFLOW import static com.netflix.conductor.test.util.WorkflowTestUtil.verifyPolledAndAcknowledgedTask +import static org.awaitility.Awaitility.await + class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { @Shared @@ -231,10 +235,23 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { tasks[3].status == Task.Status.IN_PROGRESS } - when: "poll and complete the integration_task_2 task in the root workflow" + when: "poll and complete integration_task_2 in root and mid level workflow" def rootJoinId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTaskByRefName("fanouttask_join").taskId - workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) def newMidLevelWorkflowId = workflowExecutionService.getExecutionStatus(rootWorkflowId, true).getTasks().get(1).subWorkflowId + // The root workflow has an integration_task_2. Its subworkflow also has an integration_task_2. + // We have NO guarantees which will be polled and completed first, so the assertions done in previous versions of this test were wrong. + await().atMost(10, TimeUnit.SECONDS).until { + workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) + def rootWf = workflowExecutionService.getExecutionStatus(rootWorkflowId, true) + def midWf = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true) + + rootWf.status == Workflow.WorkflowStatus.RUNNING && + rootWf.tasks[2].taskType == 'integration_task_2' && + rootWf.tasks[2].status == Task.Status.COMPLETED && + midWf.status == Workflow.WorkflowStatus.RUNNING && + midWf.tasks[2].taskType == 'integration_task_2' && + midWf.tasks[2].status == Task.Status.COMPLETED + } then: "verify that a new mid level workflow is created and is in RUNNING state" newMidLevelWorkflowId != midLevelWorkflowId @@ -251,9 +268,8 @@ class HierarchicalForkJoinSubworkflowRestartSpec extends AbstractSpecification { tasks[3].status == Task.Status.IN_PROGRESS } - when: "poll and complete the integration_task_2 task in the mid-level workflow" + when: "mid level workflow is in RUNNING state" def midJoinId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTaskByRefName("fanouttask_join").taskId - workflowTestUtil.pollAndCompleteTask('integration_task_2', 'task2.integration.worker', ['op': 'task2.done']) def newLeafWorkflowId = workflowExecutionService.getExecutionStatus(newMidLevelWorkflowId, true).getTasks().get(1).subWorkflowId then: "verify that a new leaf workflow is created and is in RUNNING state" From dec0f4d467ddee27391697bb80f4f52e46ac945d Mon Sep 17 00:00:00 2001 From: Robert Karlsson Date: Fri, 13 Sep 2024 10:07:41 +0900 Subject: [PATCH 25/25] Handle duplicate event inserts gracefully when using Postgresql --- .../netflix/conductor/postgres/dao/PostgresExecutionDAO.java | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java index aa5233f6b..597c3c7ff 100644 --- a/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java +++ b/postgres-persistence/src/main/java/com/netflix/conductor/postgres/dao/PostgresExecutionDAO.java @@ -903,7 +903,8 @@ private boolean insertEventExecution(Connection connection, EventExecution event String INSERT_EVENT_EXECUTION = "INSERT INTO event_execution (event_handler_name, event_name, message_id, execution_id, json_data) " - + "VALUES (?, ?, ?, ?, ?)"; + + "VALUES (?, ?, ?, ?, ?) " + + "ON CONFLICT DO NOTHING"; int count = query( connection,