From 450720183200cc53826179136b53431bbccfc926 Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Mon, 18 Nov 2024 12:16:49 +0530 Subject: [PATCH 1/8] make BulkResponse parametrized --- .../netflix/conductor/common/model/BulkResponse.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java b/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java index b4133b5bf..6d4d16c3e 100644 --- a/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java +++ b/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java @@ -22,12 +22,12 @@ * Response object to return a list of succeeded entities and a map of failed ones, including error * message, for the bulk request. */ -public class BulkResponse { +public class BulkResponse { /** Key - entityId Value - error message processing this entity */ private final Map bulkErrorResults; - private final List bulkSuccessfulResults; + private final List bulkSuccessfulResults; private final String message = "Bulk Request has been processed."; public BulkResponse() { @@ -35,7 +35,7 @@ public BulkResponse() { this.bulkErrorResults = new HashMap<>(); } - public List getBulkSuccessfulResults() { + public List getBulkSuccessfulResults() { return bulkSuccessfulResults; } @@ -43,8 +43,8 @@ public Map getBulkErrorResults() { return bulkErrorResults; } - public void appendSuccessResponse(String id) { - bulkSuccessfulResults.add(id); + public void appendSuccessResponse(T result) { + bulkSuccessfulResults.add(result); } public void appendFailedResponse(String id, String errorMessage) { From b29c947ef76e86da9e93d953a9c19537e5525e38 Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Mon, 18 Nov 2024 13:07:43 +0530 Subject: [PATCH 2/8] code changes --- .../conductor/client/http/WorkflowClient.java | 2 +- .../conductor/common/model/BulkResponse.java | 3 +- .../conductor/client/http/WorkflowClient.java | 2 +- .../client/http/OrkesWorkflowClient.java | 10 +++---- .../client/http/WorkflowBulkResource.java | 2 +- .../conductor/service/MetadataService.java | 2 +- .../service/MetadataServiceImpl.java | 4 +-- .../service/WorkflowBulkService.java | 14 +++++----- .../service/WorkflowBulkServiceImpl.java | 28 +++++++++---------- .../service/MetadataServiceTest.java | 4 +-- .../rest/controllers/MetadataResource.java | 2 +- .../controllers/WorkflowBulkResource.java | 14 +++++----- 12 files changed, 43 insertions(+), 44 deletions(-) diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index a16ae421f..d3fb4bead 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -256,7 +256,7 @@ public void deleteWorkflow(String workflowId, boolean archiveWorkflow) { * @param reason the reason to be logged and displayed * @return the {@link BulkResponse} contains bulkErrorResults and bulkSuccessfulResults */ - public BulkResponse terminateWorkflows(List workflowIds, String reason) { + public BulkResponse terminateWorkflows(List workflowIds, String reason) { Validate.isTrue(!workflowIds.isEmpty(), "workflow id cannot be blank"); return postForEntity( "workflow/bulk/terminate", diff --git a/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java b/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java index 6d4d16c3e..a72d2a9f4 100644 --- a/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java +++ b/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java @@ -56,10 +56,9 @@ public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof BulkResponse)) { + if (!(o instanceof BulkResponse that)) { return false; } - BulkResponse that = (BulkResponse) o; return Objects.equals(bulkSuccessfulResults, that.bulkSuccessfulResults) && Objects.equals(bulkErrorResults, that.bulkErrorResults); } diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index bb5ae9048..63654e4b1 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -255,7 +255,7 @@ public void deleteWorkflow(String workflowId, boolean archiveWorkflow) { * @param reason the reason to be logged and displayed * @return the {@link BulkResponse} contains bulkErrorResults and bulkSuccessfulResults */ - public BulkResponse terminateWorkflows(List workflowIds, String reason) { + public BulkResponse terminateWorkflows(List workflowIds, String reason) { Validate.isTrue(!workflowIds.isEmpty(), "workflow id cannot be blank"); ConductorClientRequest request = ConductorClientRequest.builder() diff --git a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java index 0b47c6ff3..acdef219e 100644 --- a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java +++ b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java @@ -120,23 +120,23 @@ public void terminateWorkflowWithFailure(String workflowId, String reason, boole workflowResource.terminateWithAReason(workflowId, reason, triggerFailureWorkflow); } - public BulkResponse pauseWorkflow(List workflowIds) { + public BulkResponse pauseWorkflow(List workflowIds) { return bulkResource.pauseWorkflows(workflowIds); } - public BulkResponse restartWorkflow(List workflowIds, Boolean useLatestDefinitions) { + public BulkResponse restartWorkflow(List workflowIds, Boolean useLatestDefinitions) { return bulkResource.restartWorkflows(workflowIds, useLatestDefinitions); } - public BulkResponse resumeWorkflow(List workflowIds) { + public BulkResponse resumeWorkflow(List workflowIds) { return bulkResource.resumeWorkflows(workflowIds); } - public BulkResponse retryWorkflow(List workflowIds) { + public BulkResponse retryWorkflow(List workflowIds) { return bulkResource.retryWorkflows(workflowIds); } - public BulkResponse terminateWorkflowsWithFailure(List workflowIds, String reason, boolean triggerFailureWorkflow) { + public BulkResponse terminateWorkflowsWithFailure(List workflowIds, String reason, boolean triggerFailureWorkflow) { return bulkResource.terminateWorkflows(workflowIds, reason, triggerFailureWorkflow); } diff --git a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java index 31db8c05a..d00c636a6 100644 --- a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java +++ b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java @@ -84,7 +84,7 @@ BulkResponse retryWorkflows(List workflowIds) { return resp.getData(); } - public BulkResponse terminateWorkflows(List workflowIds, String reason, boolean triggerFailureWorkflow) { + public BulkResponse terminateWorkflows(List workflowIds, String reason, boolean triggerFailureWorkflow) { ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.POST) .path("/workflow/bulk/terminate") diff --git a/core/src/main/java/com/netflix/conductor/service/MetadataService.java b/core/src/main/java/com/netflix/conductor/service/MetadataService.java index 5edd42ef5..413066689 100644 --- a/core/src/main/java/com/netflix/conductor/service/MetadataService.java +++ b/core/src/main/java/com/netflix/conductor/service/MetadataService.java @@ -69,7 +69,7 @@ void registerTaskDef( /** * @param workflowDefList Workflow definitions to be updated. */ - BulkResponse updateWorkflowDef( + BulkResponse updateWorkflowDef( @NotNull(message = "WorkflowDef list name cannot be null or empty") @Size(min = 1, message = "WorkflowDefList is empty") List<@NotNull(message = "WorkflowDef cannot be null") @Valid WorkflowDef> diff --git a/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java index 1399e9f08..2e9af4acb 100644 --- a/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java @@ -124,8 +124,8 @@ public void updateWorkflowDef(WorkflowDef workflowDef) { /** * @param workflowDefList Workflow definitions to be updated. */ - public BulkResponse updateWorkflowDef(List workflowDefList) { - BulkResponse bulkResponse = new BulkResponse(); + public BulkResponse updateWorkflowDef(List workflowDefList) { + BulkResponse bulkResponse = new BulkResponse<>(); for (WorkflowDef workflowDef : workflowDefList) { try { updateWorkflowDef(workflowDef); diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java index ca240b1a9..8ea185be7 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java @@ -26,7 +26,7 @@ public interface WorkflowBulkService { int MAX_REQUEST_ITEMS = 1000; - BulkResponse pauseWorkflow( + BulkResponse pauseWorkflow( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -34,7 +34,7 @@ BulkResponse pauseWorkflow( "Cannot process more than {max} workflows. Please use multiple requests.") List workflowIds); - BulkResponse resumeWorkflow( + BulkResponse resumeWorkflow( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -42,7 +42,7 @@ BulkResponse resumeWorkflow( "Cannot process more than {max} workflows. Please use multiple requests.") List workflowIds); - BulkResponse restart( + BulkResponse restart( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -51,7 +51,7 @@ BulkResponse restart( List workflowIds, boolean useLatestDefinitions); - BulkResponse retry( + BulkResponse retry( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -59,7 +59,7 @@ BulkResponse retry( "Cannot process more than {max} workflows. Please use multiple requests.") List workflowIds); - BulkResponse terminate( + BulkResponse terminate( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -68,7 +68,7 @@ BulkResponse terminate( List workflowIds, String reason); - BulkResponse deleteWorkflow( + BulkResponse deleteWorkflow( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -77,7 +77,7 @@ BulkResponse deleteWorkflow( List workflowIds, boolean archiveWorkflow); - BulkResponse terminateRemove( + BulkResponse terminateRemove( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java index fcbdbe3bc..30e52e3b1 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java @@ -45,9 +45,9 @@ public WorkflowBulkServiceImpl( * @return bulk response object containing a list of succeeded workflows and a list of failed * ones with errors */ - public BulkResponse pauseWorkflow(List workflowIds) { + public BulkResponse pauseWorkflow(List workflowIds) { - BulkResponse bulkResponse = new BulkResponse(); + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowExecutor.pauseWorkflow(workflowId); @@ -72,8 +72,8 @@ public BulkResponse pauseWorkflow(List workflowIds) { * @return bulk response object containing a list of succeeded workflows and a list of failed * ones with errors */ - public BulkResponse resumeWorkflow(List workflowIds) { - BulkResponse bulkResponse = new BulkResponse(); + public BulkResponse resumeWorkflow(List workflowIds) { + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowExecutor.resumeWorkflow(workflowId); @@ -98,8 +98,8 @@ public BulkResponse resumeWorkflow(List workflowIds) { * @return bulk response object containing a list of succeeded workflows and a list of failed * ones with errors */ - public BulkResponse restart(List workflowIds, boolean useLatestDefinitions) { - BulkResponse bulkResponse = new BulkResponse(); + public BulkResponse restart(List workflowIds, boolean useLatestDefinitions) { + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowExecutor.restart(workflowId, useLatestDefinitions); @@ -123,8 +123,8 @@ public BulkResponse restart(List workflowIds, boolean useLatestDefinitio * @return bulk response object containing a list of succeeded workflows and a list of failed * ones with errors */ - public BulkResponse retry(List workflowIds) { - BulkResponse bulkResponse = new BulkResponse(); + public BulkResponse retry(List workflowIds) { + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowExecutor.retry(workflowId, false); @@ -150,8 +150,8 @@ public BulkResponse retry(List workflowIds) { * @return bulk response object containing a list of succeeded workflows and a list of failed * ones with errors */ - public BulkResponse terminate(List workflowIds, String reason) { - BulkResponse bulkResponse = new BulkResponse(); + public BulkResponse terminate(List workflowIds, String reason) { + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowExecutor.terminateWorkflow(workflowId, reason); @@ -174,8 +174,8 @@ public BulkResponse terminate(List workflowIds, String reason) { * @param workflowIds List of WorkflowIDs of the workflows you want to remove from system. * @param archiveWorkflow Archives the workflow and associated tasks instead of removing them. */ - public BulkResponse deleteWorkflow(List workflowIds, boolean archiveWorkflow) { - BulkResponse bulkResponse = new BulkResponse(); + public BulkResponse deleteWorkflow(List workflowIds, boolean archiveWorkflow) { + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowService.deleteWorkflow( @@ -203,9 +203,9 @@ public BulkResponse deleteWorkflow(List workflowIds, boolean archiveWork * @return bulk response object containing a list of succeeded workflows and a list of failed * ones with errors */ - public BulkResponse terminateRemove( + public BulkResponse terminateRemove( List workflowIds, String reason, boolean archiveWorkflow) { - BulkResponse bulkResponse = new BulkResponse(); + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowExecutor.terminateWorkflow(workflowId, reason); diff --git a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java index 255703be5..3f4c028c1 100644 --- a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java @@ -336,7 +336,7 @@ public void testUpdateWorkflowDefWithCaseExpression() { tasks.add(workflowTask); workflowDef.setTasks(tasks); - BulkResponse bulkResponse = + BulkResponse bulkResponse = metadataService.updateWorkflowDef(Collections.singletonList(workflowDef)); } @@ -366,7 +366,7 @@ public void testUpdateWorkflowDefWithJavscriptEvaluator() { tasks.add(workflowTask); workflowDef.setTasks(tasks); - BulkResponse bulkResponse = + BulkResponse bulkResponse = metadataService.updateWorkflowDef(Collections.singletonList(workflowDef)); } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/MetadataResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/MetadataResource.java index 5ffda1ed4..234351883 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/MetadataResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/MetadataResource.java @@ -59,7 +59,7 @@ public void validate(@RequestBody WorkflowDef workflowDef) { @PutMapping("/workflow") @Operation(summary = "Create or update workflow definition") - public BulkResponse update(@RequestBody List workflowDefs) { + public BulkResponse update(@RequestBody List workflowDefs) { return metadataService.updateWorkflowDef(workflowDefs); } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java index 9005b9343..b9f36679d 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java @@ -49,7 +49,7 @@ public WorkflowBulkResource(WorkflowBulkService workflowBulkService) { */ @PutMapping("/pause") @Operation(summary = "Pause the list of workflows") - public BulkResponse pauseWorkflow(@RequestBody List workflowIds) { + public BulkResponse pauseWorkflow(@RequestBody List workflowIds) { return workflowBulkService.pauseWorkflow(workflowIds); } @@ -62,7 +62,7 @@ public BulkResponse pauseWorkflow(@RequestBody List workflowIds) { */ @PutMapping("/resume") @Operation(summary = "Resume the list of workflows") - public BulkResponse resumeWorkflow(@RequestBody List workflowIds) { + public BulkResponse resumeWorkflow(@RequestBody List workflowIds) { return workflowBulkService.resumeWorkflow(workflowIds); } @@ -76,7 +76,7 @@ public BulkResponse resumeWorkflow(@RequestBody List workflowIds) { */ @PostMapping("/restart") @Operation(summary = "Restart the list of completed workflow") - public BulkResponse restart( + public BulkResponse restart( @RequestBody List workflowIds, @RequestParam(value = "useLatestDefinitions", defaultValue = "false", required = false) boolean useLatestDefinitions) { @@ -92,7 +92,7 @@ public BulkResponse restart( */ @PostMapping("/retry") @Operation(summary = "Retry the last failed task for each workflow from the list") - public BulkResponse retry(@RequestBody List workflowIds) { + public BulkResponse retry(@RequestBody List workflowIds) { return workflowBulkService.retry(workflowIds); } @@ -107,7 +107,7 @@ public BulkResponse retry(@RequestBody List workflowIds) { */ @PostMapping("/terminate") @Operation(summary = "Terminate workflows execution") - public BulkResponse terminate( + public BulkResponse terminate( @RequestBody List workflowIds, @RequestParam(value = "reason", required = false) String reason) { return workflowBulkService.terminate(workflowIds, reason); @@ -120,7 +120,7 @@ public BulkResponse terminate( * @return bulk reponse object containing a list of successfully deleted workflows */ @DeleteMapping("/remove") - public BulkResponse deleteWorkflow( + public BulkResponse deleteWorkflow( @RequestBody List workflowIds, @RequestParam(value = "archiveWorkflow", defaultValue = "true", required = false) boolean archiveWorkflow) { @@ -134,7 +134,7 @@ public BulkResponse deleteWorkflow( * @return bulk response object containing a list of successfully deleted workflows */ @DeleteMapping("/terminate-remove") - public BulkResponse terminateRemove( + public BulkResponse terminateRemove( @RequestBody List workflowIds, @RequestParam(value = "archiveWorkflow", defaultValue = "true", required = false) boolean archiveWorkflow, From d0967c1e293a81c99297a251527fd2345fcb10ce Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Mon, 18 Nov 2024 14:37:19 +0530 Subject: [PATCH 3/8] code changes for terminateWorkflows --- .../java/com/netflix/conductor/client/http/WorkflowClient.java | 2 +- .../java/com/netflix/conductor/common/model/BulkResponse.java | 2 ++ .../java/com/netflix/conductor/client/http/WorkflowClient.java | 2 +- 3 files changed, 4 insertions(+), 2 deletions(-) diff --git a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index d3fb4bead..a16ae421f 100644 --- a/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -256,7 +256,7 @@ public void deleteWorkflow(String workflowId, boolean archiveWorkflow) { * @param reason the reason to be logged and displayed * @return the {@link BulkResponse} contains bulkErrorResults and bulkSuccessfulResults */ - public BulkResponse terminateWorkflows(List workflowIds, String reason) { + public BulkResponse terminateWorkflows(List workflowIds, String reason) { Validate.isTrue(!workflowIds.isEmpty(), "workflow id cannot be blank"); return postForEntity( "workflow/bulk/terminate", diff --git a/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java b/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java index a72d2a9f4..ff35ea5df 100644 --- a/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java +++ b/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java @@ -21,6 +21,8 @@ /** * Response object to return a list of succeeded entities and a map of failed ones, including error * message, for the bulk request. + * + * @param the type of entities included in the successful results */ public class BulkResponse { diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java index 63654e4b1..bb5ae9048 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/client/http/WorkflowClient.java @@ -255,7 +255,7 @@ public void deleteWorkflow(String workflowId, boolean archiveWorkflow) { * @param reason the reason to be logged and displayed * @return the {@link BulkResponse} contains bulkErrorResults and bulkSuccessfulResults */ - public BulkResponse terminateWorkflows(List workflowIds, String reason) { + public BulkResponse terminateWorkflows(List workflowIds, String reason) { Validate.isTrue(!workflowIds.isEmpty(), "workflow id cannot be blank"); ConductorClientRequest request = ConductorClientRequest.builder() From 9906b1557db4e7fb0e3d3bc6bf23ef1ba5755cd1 Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Mon, 18 Nov 2024 14:46:54 +0530 Subject: [PATCH 4/8] code changes for OrkesWorkflowClient --- .../conductor/client/http/OrkesWorkflowClient.java | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java index acdef219e..0b47c6ff3 100644 --- a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java +++ b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/OrkesWorkflowClient.java @@ -120,23 +120,23 @@ public void terminateWorkflowWithFailure(String workflowId, String reason, boole workflowResource.terminateWithAReason(workflowId, reason, triggerFailureWorkflow); } - public BulkResponse pauseWorkflow(List workflowIds) { + public BulkResponse pauseWorkflow(List workflowIds) { return bulkResource.pauseWorkflows(workflowIds); } - public BulkResponse restartWorkflow(List workflowIds, Boolean useLatestDefinitions) { + public BulkResponse restartWorkflow(List workflowIds, Boolean useLatestDefinitions) { return bulkResource.restartWorkflows(workflowIds, useLatestDefinitions); } - public BulkResponse resumeWorkflow(List workflowIds) { + public BulkResponse resumeWorkflow(List workflowIds) { return bulkResource.resumeWorkflows(workflowIds); } - public BulkResponse retryWorkflow(List workflowIds) { + public BulkResponse retryWorkflow(List workflowIds) { return bulkResource.retryWorkflows(workflowIds); } - public BulkResponse terminateWorkflowsWithFailure(List workflowIds, String reason, boolean triggerFailureWorkflow) { + public BulkResponse terminateWorkflowsWithFailure(List workflowIds, String reason, boolean triggerFailureWorkflow) { return bulkResource.terminateWorkflows(workflowIds, reason, triggerFailureWorkflow); } From f878d5c0dbbdb0a9b163e4a1b56c373d2d183cd9 Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Mon, 18 Nov 2024 14:53:54 +0530 Subject: [PATCH 5/8] code changes for WorkflowBulkResource --- .../orkes/conductor/client/http/WorkflowBulkResource.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java index d00c636a6..56c3e92a5 100644 --- a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java +++ b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java @@ -31,7 +31,7 @@ class WorkflowBulkResource { this.client = client; } - BulkResponse pauseWorkflows(List workflowIds) { + BulkResponse pauseWorkflows(List workflowIds) { ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.PUT) .path("/workflow/bulk/pause") @@ -44,7 +44,7 @@ BulkResponse pauseWorkflows(List workflowIds) { return resp.getData(); } - BulkResponse restartWorkflows(List workflowIds, Boolean useLatestDefinitions) { + BulkResponse restartWorkflows(List workflowIds, Boolean useLatestDefinitions) { ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.POST) .path("/workflow/bulk/restart") @@ -58,7 +58,7 @@ BulkResponse restartWorkflows(List workflowIds, Boolean useLatestDefinit return resp.getData(); } - BulkResponse resumeWorkflows(List workflowIds) { + BulkResponse resumeWorkflows(List workflowIds) { ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.PUT) .path("/workflow/bulk/resume") @@ -71,7 +71,7 @@ BulkResponse resumeWorkflows(List workflowIds) { return resp.getData(); } - BulkResponse retryWorkflows(List workflowIds) { + BulkResponse retryWorkflows(List workflowIds) { ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.POST) .path("/workflow/bulk/retry") From c29b2b409388482feffc94622d5d451cd0be402d Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Mon, 18 Nov 2024 15:03:29 +0530 Subject: [PATCH 6/8] code changes for BulkResponse in client --- .../netflix/conductor/common/model/BulkResponse.java | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/model/BulkResponse.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/model/BulkResponse.java index 99b3f48f9..d27d9dc9e 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/model/BulkResponse.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/model/BulkResponse.java @@ -21,15 +21,17 @@ /** * Response object to return a list of succeeded entities and a map of failed ones, including error * message, for the bulk request. + * + * @param the type of entities included in the successful results */ -public class BulkResponse { +public class BulkResponse { /** * Key - entityId Value - error message processing this entity */ private final Map bulkErrorResults; - private final List bulkSuccessfulResults; + private final List bulkSuccessfulResults; private final String message = "Bulk Request has been processed."; @@ -38,7 +40,7 @@ public BulkResponse() { this.bulkErrorResults = new HashMap<>(); } - public List getBulkSuccessfulResults() { + public List getBulkSuccessfulResults() { return bulkSuccessfulResults; } @@ -46,8 +48,8 @@ public Map getBulkErrorResults() { return bulkErrorResults; } - public void appendSuccessResponse(String id) { - bulkSuccessfulResults.add(id); + public void appendSuccessResponse(T result) { + bulkSuccessfulResults.add(result); } public void appendFailedResponse(String id, String errorMessage) { From d9b623bfb74762a99976fe9f882719241de7c41b Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Mon, 18 Nov 2024 22:48:57 +0530 Subject: [PATCH 7/8] Added endpoint --- .../service/WorkflowBulkService.java | 10 ++++ .../service/WorkflowBulkServiceImpl.java | 53 ++++++++++++++----- .../controllers/WorkflowBulkResource.java | 15 ++++++ 3 files changed, 66 insertions(+), 12 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java index 8ea185be7..3be0ffe89 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java @@ -14,6 +14,7 @@ import java.util.List; +import com.netflix.conductor.model.WorkflowModel; import org.springframework.validation.annotation.Validated; import com.netflix.conductor.common.model.BulkResponse; @@ -86,4 +87,13 @@ BulkResponse terminateRemove( List workflowIds, String reason, boolean archiveWorkflow); + + BulkResponse searchWorkflow( + @NotEmpty(message = "WorkflowIds list cannot be null.") + @Size( + max = MAX_REQUEST_ITEMS, + message = + "Cannot process more than {max} workflows. Please use multiple requests.") + List workflowIds, + boolean includeTasks); } diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java index 30e52e3b1..715c77b4c 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java @@ -14,6 +14,7 @@ import java.util.List; +import com.netflix.conductor.model.WorkflowModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -43,7 +44,7 @@ public WorkflowBulkServiceImpl( * * @param workflowIds - list of workflow Ids to perform pause operation on * @return bulk response object containing a list of succeeded workflows and a list of failed - * ones with errors + * ones with errors */ public BulkResponse pauseWorkflow(List workflowIds) { @@ -70,7 +71,7 @@ public BulkResponse pauseWorkflow(List workflowIds) { * * @param workflowIds - list of workflow Ids to perform resume operation on * @return bulk response object containing a list of succeeded workflows and a list of failed - * ones with errors + * ones with errors */ public BulkResponse resumeWorkflow(List workflowIds) { BulkResponse bulkResponse = new BulkResponse<>(); @@ -93,10 +94,10 @@ public BulkResponse resumeWorkflow(List workflowIds) { /** * Restart the list of workflows. * - * @param workflowIds - list of workflow Ids to perform restart operation on + * @param workflowIds - list of workflow Ids to perform restart operation on * @param useLatestDefinitions if true, use latest workflow and task definitions upon restart * @return bulk response object containing a list of succeeded workflows and a list of failed - * ones with errors + * ones with errors */ public BulkResponse restart(List workflowIds, boolean useLatestDefinitions) { BulkResponse bulkResponse = new BulkResponse<>(); @@ -121,7 +122,7 @@ public BulkResponse restart(List workflowIds, boolean useLatestD * * @param workflowIds - list of workflow Ids to perform retry operation on * @return bulk response object containing a list of succeeded workflows and a list of failed - * ones with errors + * ones with errors */ public BulkResponse retry(List workflowIds) { BulkResponse bulkResponse = new BulkResponse<>(); @@ -145,10 +146,10 @@ public BulkResponse retry(List workflowIds) { * Terminate workflows execution. * * @param workflowIds - list of workflow Ids to perform terminate operation on - * @param reason - description to be specified for the terminated workflow for future - * references. + * @param reason - description to be specified for the terminated workflow for future + * references. * @return bulk response object containing a list of succeeded workflows and a list of failed - * ones with errors + * ones with errors */ public BulkResponse terminate(List workflowIds, String reason) { BulkResponse bulkResponse = new BulkResponse<>(); @@ -171,7 +172,7 @@ public BulkResponse terminate(List workflowIds, String reason) { /** * Removes a list of workflows from the system. * - * @param workflowIds List of WorkflowIDs of the workflows you want to remove from system. + * @param workflowIds List of WorkflowIDs of the workflows you want to remove from system. * @param archiveWorkflow Archives the workflow and associated tasks instead of removing them. */ public BulkResponse deleteWorkflow(List workflowIds, boolean archiveWorkflow) { @@ -197,11 +198,11 @@ public BulkResponse deleteWorkflow(List workflowIds, boolean arc /** * Terminates execution for workflows in a list, then removes each workflow. * - * @param workflowIds List of workflow IDs to terminate and delete. - * @param reason Reason for terminating the workflow. + * @param workflowIds List of workflow IDs to terminate and delete. + * @param reason Reason for terminating the workflow. * @param archiveWorkflow Archives the workflow and associated tasks instead of removing them. * @return bulk response object containing a list of succeeded workflows and a list of failed - * ones with errors + * ones with errors */ public BulkResponse terminateRemove( List workflowIds, String reason, boolean archiveWorkflow) { @@ -233,4 +234,32 @@ public BulkResponse terminateRemove( } return bulkResponse; } + + /** + * Fetch workflow details for given workflowIds. + * + * @param workflowIds List of workflow IDs to terminate and delete. + * @param includeTasks includes tasks from workflow + * @return bulk response object containing a list of workflow details + */ + @Override + public BulkResponse searchWorkflow(List workflowIds, boolean includeTasks) { + BulkResponse bulkResponse = new BulkResponse<>(); + for (String workflowId : workflowIds) { + try { + WorkflowModel workflowModel = workflowExecutor.getWorkflow(workflowId, includeTasks); + bulkResponse.appendSuccessResponse(workflowModel); + } catch (Exception e) { + LOGGER.error( + "bulk search exception, workflowId {}, message: {} ", + workflowId, + e.getMessage(), + e); + bulkResponse.appendFailedResponse(workflowId, e.getMessage()); + } + } + return bulkResponse; + } + + } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java index b9f36679d..9a7da24e5 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java @@ -14,6 +14,7 @@ import java.util.List; +import com.netflix.conductor.model.WorkflowModel; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PutMapping; @@ -141,4 +142,18 @@ public BulkResponse terminateRemove( @RequestParam(value = "reason", required = false) String reason) { return workflowBulkService.terminateRemove(workflowIds, reason, archiveWorkflow); } + + /** + * Search workflows for given list of workflows. + * + * @param workflowIds - list of workflow Ids to be searched + * @return bulk response object containing a list of workflows + */ + @PostMapping("/search") + public BulkResponse searchWorkflow( + @RequestBody List workflowIds, + @RequestParam(value = "includeTasks", defaultValue = "true", required = false) + boolean includeTasks) { + return workflowBulkService.searchWorkflow(workflowIds, includeTasks); + } } From 4f6427ccce8c41bd187393d375dbba7f3f46eec7 Mon Sep 17 00:00:00 2001 From: Shailesh Jagannath Padave Date: Mon, 18 Nov 2024 23:01:37 +0530 Subject: [PATCH 8/8] updated code --- .../service/WorkflowBulkService.java | 2 +- .../service/WorkflowBulkServiceImpl.java | 36 +++++++++---------- .../controllers/WorkflowBulkResource.java | 4 +-- 3 files changed, 21 insertions(+), 21 deletions(-) diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java index 3be0ffe89..a82a8d5f2 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java @@ -14,10 +14,10 @@ import java.util.List; -import com.netflix.conductor.model.WorkflowModel; import org.springframework.validation.annotation.Validated; import com.netflix.conductor.common.model.BulkResponse; +import com.netflix.conductor.model.WorkflowModel; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.Size; diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java index 715c77b4c..aadacf9a1 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java @@ -14,7 +14,6 @@ import java.util.List; -import com.netflix.conductor.model.WorkflowModel; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.stereotype.Service; @@ -23,6 +22,7 @@ import com.netflix.conductor.annotations.Trace; import com.netflix.conductor.common.model.BulkResponse; import com.netflix.conductor.core.execution.WorkflowExecutor; +import com.netflix.conductor.model.WorkflowModel; @Audit @Trace @@ -44,7 +44,7 @@ public WorkflowBulkServiceImpl( * * @param workflowIds - list of workflow Ids to perform pause operation on * @return bulk response object containing a list of succeeded workflows and a list of failed - * ones with errors + * ones with errors */ public BulkResponse pauseWorkflow(List workflowIds) { @@ -71,7 +71,7 @@ public BulkResponse pauseWorkflow(List workflowIds) { * * @param workflowIds - list of workflow Ids to perform resume operation on * @return bulk response object containing a list of succeeded workflows and a list of failed - * ones with errors + * ones with errors */ public BulkResponse resumeWorkflow(List workflowIds) { BulkResponse bulkResponse = new BulkResponse<>(); @@ -94,10 +94,10 @@ public BulkResponse resumeWorkflow(List workflowIds) { /** * Restart the list of workflows. * - * @param workflowIds - list of workflow Ids to perform restart operation on + * @param workflowIds - list of workflow Ids to perform restart operation on * @param useLatestDefinitions if true, use latest workflow and task definitions upon restart * @return bulk response object containing a list of succeeded workflows and a list of failed - * ones with errors + * ones with errors */ public BulkResponse restart(List workflowIds, boolean useLatestDefinitions) { BulkResponse bulkResponse = new BulkResponse<>(); @@ -122,7 +122,7 @@ public BulkResponse restart(List workflowIds, boolean useLatestD * * @param workflowIds - list of workflow Ids to perform retry operation on * @return bulk response object containing a list of succeeded workflows and a list of failed - * ones with errors + * ones with errors */ public BulkResponse retry(List workflowIds) { BulkResponse bulkResponse = new BulkResponse<>(); @@ -146,10 +146,10 @@ public BulkResponse retry(List workflowIds) { * Terminate workflows execution. * * @param workflowIds - list of workflow Ids to perform terminate operation on - * @param reason - description to be specified for the terminated workflow for future - * references. + * @param reason - description to be specified for the terminated workflow for future + * references. * @return bulk response object containing a list of succeeded workflows and a list of failed - * ones with errors + * ones with errors */ public BulkResponse terminate(List workflowIds, String reason) { BulkResponse bulkResponse = new BulkResponse<>(); @@ -172,7 +172,7 @@ public BulkResponse terminate(List workflowIds, String reason) { /** * Removes a list of workflows from the system. * - * @param workflowIds List of WorkflowIDs of the workflows you want to remove from system. + * @param workflowIds List of WorkflowIDs of the workflows you want to remove from system. * @param archiveWorkflow Archives the workflow and associated tasks instead of removing them. */ public BulkResponse deleteWorkflow(List workflowIds, boolean archiveWorkflow) { @@ -198,11 +198,11 @@ public BulkResponse deleteWorkflow(List workflowIds, boolean arc /** * Terminates execution for workflows in a list, then removes each workflow. * - * @param workflowIds List of workflow IDs to terminate and delete. - * @param reason Reason for terminating the workflow. + * @param workflowIds List of workflow IDs to terminate and delete. + * @param reason Reason for terminating the workflow. * @param archiveWorkflow Archives the workflow and associated tasks instead of removing them. * @return bulk response object containing a list of succeeded workflows and a list of failed - * ones with errors + * ones with errors */ public BulkResponse terminateRemove( List workflowIds, String reason, boolean archiveWorkflow) { @@ -238,16 +238,18 @@ public BulkResponse terminateRemove( /** * Fetch workflow details for given workflowIds. * - * @param workflowIds List of workflow IDs to terminate and delete. + * @param workflowIds List of workflow IDs to terminate and delete. * @param includeTasks includes tasks from workflow * @return bulk response object containing a list of workflow details */ @Override - public BulkResponse searchWorkflow(List workflowIds, boolean includeTasks) { + public BulkResponse searchWorkflow( + List workflowIds, boolean includeTasks) { BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { - WorkflowModel workflowModel = workflowExecutor.getWorkflow(workflowId, includeTasks); + WorkflowModel workflowModel = + workflowExecutor.getWorkflow(workflowId, includeTasks); bulkResponse.appendSuccessResponse(workflowModel); } catch (Exception e) { LOGGER.error( @@ -260,6 +262,4 @@ public BulkResponse searchWorkflow(List workflowIds, bool } return bulkResponse; } - - } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java index 9a7da24e5..832e9d1da 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java @@ -14,7 +14,6 @@ import java.util.List; -import com.netflix.conductor.model.WorkflowModel; import org.springframework.web.bind.annotation.DeleteMapping; import org.springframework.web.bind.annotation.PostMapping; import org.springframework.web.bind.annotation.PutMapping; @@ -24,6 +23,7 @@ import org.springframework.web.bind.annotation.RestController; import com.netflix.conductor.common.model.BulkResponse; +import com.netflix.conductor.model.WorkflowModel; import com.netflix.conductor.service.WorkflowBulkService; import io.swagger.v3.oas.annotations.Operation; @@ -153,7 +153,7 @@ public BulkResponse terminateRemove( public BulkResponse searchWorkflow( @RequestBody List workflowIds, @RequestParam(value = "includeTasks", defaultValue = "true", required = false) - boolean includeTasks) { + boolean includeTasks) { return workflowBulkService.searchWorkflow(workflowIds, includeTasks); } }