diff --git a/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java b/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java index b4133b5bf..ff35ea5df 100644 --- a/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java +++ b/common/src/main/java/com/netflix/conductor/common/model/BulkResponse.java @@ -21,13 +21,15 @@ /** * Response object to return a list of succeeded entities and a map of failed ones, including error * message, for the bulk request. + * + * @param the type of entities included in the successful results */ -public class BulkResponse { +public class BulkResponse { /** Key - entityId Value - error message processing this entity */ private final Map bulkErrorResults; - private final List bulkSuccessfulResults; + private final List bulkSuccessfulResults; private final String message = "Bulk Request has been processed."; public BulkResponse() { @@ -35,7 +37,7 @@ public BulkResponse() { this.bulkErrorResults = new HashMap<>(); } - public List getBulkSuccessfulResults() { + public List getBulkSuccessfulResults() { return bulkSuccessfulResults; } @@ -43,8 +45,8 @@ public Map getBulkErrorResults() { return bulkErrorResults; } - public void appendSuccessResponse(String id) { - bulkSuccessfulResults.add(id); + public void appendSuccessResponse(T result) { + bulkSuccessfulResults.add(result); } public void appendFailedResponse(String id, String errorMessage) { @@ -56,10 +58,9 @@ public boolean equals(Object o) { if (this == o) { return true; } - if (!(o instanceof BulkResponse)) { + if (!(o instanceof BulkResponse that)) { return false; } - BulkResponse that = (BulkResponse) o; return Objects.equals(bulkSuccessfulResults, that.bulkSuccessfulResults) && Objects.equals(bulkErrorResults, that.bulkErrorResults); } diff --git a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/model/BulkResponse.java b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/model/BulkResponse.java index 99b3f48f9..d27d9dc9e 100644 --- a/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/model/BulkResponse.java +++ b/conductor-clients/java/conductor-java-sdk/conductor-client/src/main/java/com/netflix/conductor/common/model/BulkResponse.java @@ -21,15 +21,17 @@ /** * Response object to return a list of succeeded entities and a map of failed ones, including error * message, for the bulk request. + * + * @param the type of entities included in the successful results */ -public class BulkResponse { +public class BulkResponse { /** * Key - entityId Value - error message processing this entity */ private final Map bulkErrorResults; - private final List bulkSuccessfulResults; + private final List bulkSuccessfulResults; private final String message = "Bulk Request has been processed."; @@ -38,7 +40,7 @@ public BulkResponse() { this.bulkErrorResults = new HashMap<>(); } - public List getBulkSuccessfulResults() { + public List getBulkSuccessfulResults() { return bulkSuccessfulResults; } @@ -46,8 +48,8 @@ public Map getBulkErrorResults() { return bulkErrorResults; } - public void appendSuccessResponse(String id) { - bulkSuccessfulResults.add(id); + public void appendSuccessResponse(T result) { + bulkSuccessfulResults.add(result); } public void appendFailedResponse(String id, String errorMessage) { diff --git a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java index 31db8c05a..56c3e92a5 100644 --- a/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java +++ b/conductor-clients/java/conductor-java-sdk/orkes-client/src/main/java/io/orkes/conductor/client/http/WorkflowBulkResource.java @@ -31,7 +31,7 @@ class WorkflowBulkResource { this.client = client; } - BulkResponse pauseWorkflows(List workflowIds) { + BulkResponse pauseWorkflows(List workflowIds) { ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.PUT) .path("/workflow/bulk/pause") @@ -44,7 +44,7 @@ BulkResponse pauseWorkflows(List workflowIds) { return resp.getData(); } - BulkResponse restartWorkflows(List workflowIds, Boolean useLatestDefinitions) { + BulkResponse restartWorkflows(List workflowIds, Boolean useLatestDefinitions) { ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.POST) .path("/workflow/bulk/restart") @@ -58,7 +58,7 @@ BulkResponse restartWorkflows(List workflowIds, Boolean useLatestDefinit return resp.getData(); } - BulkResponse resumeWorkflows(List workflowIds) { + BulkResponse resumeWorkflows(List workflowIds) { ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.PUT) .path("/workflow/bulk/resume") @@ -71,7 +71,7 @@ BulkResponse resumeWorkflows(List workflowIds) { return resp.getData(); } - BulkResponse retryWorkflows(List workflowIds) { + BulkResponse retryWorkflows(List workflowIds) { ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.POST) .path("/workflow/bulk/retry") @@ -84,7 +84,7 @@ BulkResponse retryWorkflows(List workflowIds) { return resp.getData(); } - public BulkResponse terminateWorkflows(List workflowIds, String reason, boolean triggerFailureWorkflow) { + public BulkResponse terminateWorkflows(List workflowIds, String reason, boolean triggerFailureWorkflow) { ConductorClientRequest request = ConductorClientRequest.builder() .method(Method.POST) .path("/workflow/bulk/terminate") diff --git a/core/src/main/java/com/netflix/conductor/service/MetadataService.java b/core/src/main/java/com/netflix/conductor/service/MetadataService.java index 5edd42ef5..413066689 100644 --- a/core/src/main/java/com/netflix/conductor/service/MetadataService.java +++ b/core/src/main/java/com/netflix/conductor/service/MetadataService.java @@ -69,7 +69,7 @@ void registerTaskDef( /** * @param workflowDefList Workflow definitions to be updated. */ - BulkResponse updateWorkflowDef( + BulkResponse updateWorkflowDef( @NotNull(message = "WorkflowDef list name cannot be null or empty") @Size(min = 1, message = "WorkflowDefList is empty") List<@NotNull(message = "WorkflowDef cannot be null") @Valid WorkflowDef> diff --git a/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java index 1399e9f08..2e9af4acb 100644 --- a/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/MetadataServiceImpl.java @@ -124,8 +124,8 @@ public void updateWorkflowDef(WorkflowDef workflowDef) { /** * @param workflowDefList Workflow definitions to be updated. */ - public BulkResponse updateWorkflowDef(List workflowDefList) { - BulkResponse bulkResponse = new BulkResponse(); + public BulkResponse updateWorkflowDef(List workflowDefList) { + BulkResponse bulkResponse = new BulkResponse<>(); for (WorkflowDef workflowDef : workflowDefList) { try { updateWorkflowDef(workflowDef); diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java index ca240b1a9..a82a8d5f2 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkService.java @@ -17,6 +17,7 @@ import org.springframework.validation.annotation.Validated; import com.netflix.conductor.common.model.BulkResponse; +import com.netflix.conductor.model.WorkflowModel; import jakarta.validation.constraints.NotEmpty; import jakarta.validation.constraints.Size; @@ -26,7 +27,7 @@ public interface WorkflowBulkService { int MAX_REQUEST_ITEMS = 1000; - BulkResponse pauseWorkflow( + BulkResponse pauseWorkflow( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -34,7 +35,7 @@ BulkResponse pauseWorkflow( "Cannot process more than {max} workflows. Please use multiple requests.") List workflowIds); - BulkResponse resumeWorkflow( + BulkResponse resumeWorkflow( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -42,7 +43,7 @@ BulkResponse resumeWorkflow( "Cannot process more than {max} workflows. Please use multiple requests.") List workflowIds); - BulkResponse restart( + BulkResponse restart( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -51,7 +52,7 @@ BulkResponse restart( List workflowIds, boolean useLatestDefinitions); - BulkResponse retry( + BulkResponse retry( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -59,7 +60,7 @@ BulkResponse retry( "Cannot process more than {max} workflows. Please use multiple requests.") List workflowIds); - BulkResponse terminate( + BulkResponse terminate( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -68,7 +69,7 @@ BulkResponse terminate( List workflowIds, String reason); - BulkResponse deleteWorkflow( + BulkResponse deleteWorkflow( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -77,7 +78,7 @@ BulkResponse deleteWorkflow( List workflowIds, boolean archiveWorkflow); - BulkResponse terminateRemove( + BulkResponse terminateRemove( @NotEmpty(message = "WorkflowIds list cannot be null.") @Size( max = MAX_REQUEST_ITEMS, @@ -86,4 +87,13 @@ BulkResponse terminateRemove( List workflowIds, String reason, boolean archiveWorkflow); + + BulkResponse searchWorkflow( + @NotEmpty(message = "WorkflowIds list cannot be null.") + @Size( + max = MAX_REQUEST_ITEMS, + message = + "Cannot process more than {max} workflows. Please use multiple requests.") + List workflowIds, + boolean includeTasks); } diff --git a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java index fcbdbe3bc..aadacf9a1 100644 --- a/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java +++ b/core/src/main/java/com/netflix/conductor/service/WorkflowBulkServiceImpl.java @@ -22,6 +22,7 @@ import com.netflix.conductor.annotations.Trace; import com.netflix.conductor.common.model.BulkResponse; import com.netflix.conductor.core.execution.WorkflowExecutor; +import com.netflix.conductor.model.WorkflowModel; @Audit @Trace @@ -45,9 +46,9 @@ public WorkflowBulkServiceImpl( * @return bulk response object containing a list of succeeded workflows and a list of failed * ones with errors */ - public BulkResponse pauseWorkflow(List workflowIds) { + public BulkResponse pauseWorkflow(List workflowIds) { - BulkResponse bulkResponse = new BulkResponse(); + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowExecutor.pauseWorkflow(workflowId); @@ -72,8 +73,8 @@ public BulkResponse pauseWorkflow(List workflowIds) { * @return bulk response object containing a list of succeeded workflows and a list of failed * ones with errors */ - public BulkResponse resumeWorkflow(List workflowIds) { - BulkResponse bulkResponse = new BulkResponse(); + public BulkResponse resumeWorkflow(List workflowIds) { + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowExecutor.resumeWorkflow(workflowId); @@ -98,8 +99,8 @@ public BulkResponse resumeWorkflow(List workflowIds) { * @return bulk response object containing a list of succeeded workflows and a list of failed * ones with errors */ - public BulkResponse restart(List workflowIds, boolean useLatestDefinitions) { - BulkResponse bulkResponse = new BulkResponse(); + public BulkResponse restart(List workflowIds, boolean useLatestDefinitions) { + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowExecutor.restart(workflowId, useLatestDefinitions); @@ -123,8 +124,8 @@ public BulkResponse restart(List workflowIds, boolean useLatestDefinitio * @return bulk response object containing a list of succeeded workflows and a list of failed * ones with errors */ - public BulkResponse retry(List workflowIds) { - BulkResponse bulkResponse = new BulkResponse(); + public BulkResponse retry(List workflowIds) { + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowExecutor.retry(workflowId, false); @@ -150,8 +151,8 @@ public BulkResponse retry(List workflowIds) { * @return bulk response object containing a list of succeeded workflows and a list of failed * ones with errors */ - public BulkResponse terminate(List workflowIds, String reason) { - BulkResponse bulkResponse = new BulkResponse(); + public BulkResponse terminate(List workflowIds, String reason) { + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowExecutor.terminateWorkflow(workflowId, reason); @@ -174,8 +175,8 @@ public BulkResponse terminate(List workflowIds, String reason) { * @param workflowIds List of WorkflowIDs of the workflows you want to remove from system. * @param archiveWorkflow Archives the workflow and associated tasks instead of removing them. */ - public BulkResponse deleteWorkflow(List workflowIds, boolean archiveWorkflow) { - BulkResponse bulkResponse = new BulkResponse(); + public BulkResponse deleteWorkflow(List workflowIds, boolean archiveWorkflow) { + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowService.deleteWorkflow( @@ -203,9 +204,9 @@ public BulkResponse deleteWorkflow(List workflowIds, boolean archiveWork * @return bulk response object containing a list of succeeded workflows and a list of failed * ones with errors */ - public BulkResponse terminateRemove( + public BulkResponse terminateRemove( List workflowIds, String reason, boolean archiveWorkflow) { - BulkResponse bulkResponse = new BulkResponse(); + BulkResponse bulkResponse = new BulkResponse<>(); for (String workflowId : workflowIds) { try { workflowExecutor.terminateWorkflow(workflowId, reason); @@ -233,4 +234,32 @@ public BulkResponse terminateRemove( } return bulkResponse; } + + /** + * Fetch workflow details for given workflowIds. + * + * @param workflowIds List of workflow IDs to terminate and delete. + * @param includeTasks includes tasks from workflow + * @return bulk response object containing a list of workflow details + */ + @Override + public BulkResponse searchWorkflow( + List workflowIds, boolean includeTasks) { + BulkResponse bulkResponse = new BulkResponse<>(); + for (String workflowId : workflowIds) { + try { + WorkflowModel workflowModel = + workflowExecutor.getWorkflow(workflowId, includeTasks); + bulkResponse.appendSuccessResponse(workflowModel); + } catch (Exception e) { + LOGGER.error( + "bulk search exception, workflowId {}, message: {} ", + workflowId, + e.getMessage(), + e); + bulkResponse.appendFailedResponse(workflowId, e.getMessage()); + } + } + return bulkResponse; + } } diff --git a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java index 255703be5..3f4c028c1 100644 --- a/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java +++ b/core/src/test/java/com/netflix/conductor/service/MetadataServiceTest.java @@ -336,7 +336,7 @@ public void testUpdateWorkflowDefWithCaseExpression() { tasks.add(workflowTask); workflowDef.setTasks(tasks); - BulkResponse bulkResponse = + BulkResponse bulkResponse = metadataService.updateWorkflowDef(Collections.singletonList(workflowDef)); } @@ -366,7 +366,7 @@ public void testUpdateWorkflowDefWithJavscriptEvaluator() { tasks.add(workflowTask); workflowDef.setTasks(tasks); - BulkResponse bulkResponse = + BulkResponse bulkResponse = metadataService.updateWorkflowDef(Collections.singletonList(workflowDef)); } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/MetadataResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/MetadataResource.java index 5ffda1ed4..234351883 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/MetadataResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/MetadataResource.java @@ -59,7 +59,7 @@ public void validate(@RequestBody WorkflowDef workflowDef) { @PutMapping("/workflow") @Operation(summary = "Create or update workflow definition") - public BulkResponse update(@RequestBody List workflowDefs) { + public BulkResponse update(@RequestBody List workflowDefs) { return metadataService.updateWorkflowDef(workflowDefs); } diff --git a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java index 9005b9343..832e9d1da 100644 --- a/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java +++ b/rest/src/main/java/com/netflix/conductor/rest/controllers/WorkflowBulkResource.java @@ -23,6 +23,7 @@ import org.springframework.web.bind.annotation.RestController; import com.netflix.conductor.common.model.BulkResponse; +import com.netflix.conductor.model.WorkflowModel; import com.netflix.conductor.service.WorkflowBulkService; import io.swagger.v3.oas.annotations.Operation; @@ -49,7 +50,7 @@ public WorkflowBulkResource(WorkflowBulkService workflowBulkService) { */ @PutMapping("/pause") @Operation(summary = "Pause the list of workflows") - public BulkResponse pauseWorkflow(@RequestBody List workflowIds) { + public BulkResponse pauseWorkflow(@RequestBody List workflowIds) { return workflowBulkService.pauseWorkflow(workflowIds); } @@ -62,7 +63,7 @@ public BulkResponse pauseWorkflow(@RequestBody List workflowIds) { */ @PutMapping("/resume") @Operation(summary = "Resume the list of workflows") - public BulkResponse resumeWorkflow(@RequestBody List workflowIds) { + public BulkResponse resumeWorkflow(@RequestBody List workflowIds) { return workflowBulkService.resumeWorkflow(workflowIds); } @@ -76,7 +77,7 @@ public BulkResponse resumeWorkflow(@RequestBody List workflowIds) { */ @PostMapping("/restart") @Operation(summary = "Restart the list of completed workflow") - public BulkResponse restart( + public BulkResponse restart( @RequestBody List workflowIds, @RequestParam(value = "useLatestDefinitions", defaultValue = "false", required = false) boolean useLatestDefinitions) { @@ -92,7 +93,7 @@ public BulkResponse restart( */ @PostMapping("/retry") @Operation(summary = "Retry the last failed task for each workflow from the list") - public BulkResponse retry(@RequestBody List workflowIds) { + public BulkResponse retry(@RequestBody List workflowIds) { return workflowBulkService.retry(workflowIds); } @@ -107,7 +108,7 @@ public BulkResponse retry(@RequestBody List workflowIds) { */ @PostMapping("/terminate") @Operation(summary = "Terminate workflows execution") - public BulkResponse terminate( + public BulkResponse terminate( @RequestBody List workflowIds, @RequestParam(value = "reason", required = false) String reason) { return workflowBulkService.terminate(workflowIds, reason); @@ -120,7 +121,7 @@ public BulkResponse terminate( * @return bulk reponse object containing a list of successfully deleted workflows */ @DeleteMapping("/remove") - public BulkResponse deleteWorkflow( + public BulkResponse deleteWorkflow( @RequestBody List workflowIds, @RequestParam(value = "archiveWorkflow", defaultValue = "true", required = false) boolean archiveWorkflow) { @@ -134,11 +135,25 @@ public BulkResponse deleteWorkflow( * @return bulk response object containing a list of successfully deleted workflows */ @DeleteMapping("/terminate-remove") - public BulkResponse terminateRemove( + public BulkResponse terminateRemove( @RequestBody List workflowIds, @RequestParam(value = "archiveWorkflow", defaultValue = "true", required = false) boolean archiveWorkflow, @RequestParam(value = "reason", required = false) String reason) { return workflowBulkService.terminateRemove(workflowIds, reason, archiveWorkflow); } + + /** + * Search workflows for given list of workflows. + * + * @param workflowIds - list of workflow Ids to be searched + * @return bulk response object containing a list of workflows + */ + @PostMapping("/search") + public BulkResponse searchWorkflow( + @RequestBody List workflowIds, + @RequestParam(value = "includeTasks", defaultValue = "true", required = false) + boolean includeTasks) { + return workflowBulkService.searchWorkflow(workflowIds, includeTasks); + } }