Skip to content

Commit

Permalink
Merge pull request #311 from shaileshpadave/BulkResponseParameterized
Browse files Browse the repository at this point in the history
Make BulkResponse parametrised
  • Loading branch information
v1r3n authored Nov 18, 2024
2 parents e5021ae + 4f6427c commit c83ce9b
Show file tree
Hide file tree
Showing 10 changed files with 108 additions and 51 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,30 +21,32 @@
/**
* Response object to return a list of succeeded entities and a map of failed ones, including error
* message, for the bulk request.
*
* @param <T> the type of entities included in the successful results
*/
public class BulkResponse {
public class BulkResponse<T> {

/** Key - entityId Value - error message processing this entity */
private final Map<String, String> bulkErrorResults;

private final List<String> bulkSuccessfulResults;
private final List<T> bulkSuccessfulResults;
private final String message = "Bulk Request has been processed.";

public BulkResponse() {
this.bulkSuccessfulResults = new ArrayList<>();
this.bulkErrorResults = new HashMap<>();
}

public List<String> getBulkSuccessfulResults() {
public List<T> getBulkSuccessfulResults() {
return bulkSuccessfulResults;
}

public Map<String, String> getBulkErrorResults() {
return bulkErrorResults;
}

public void appendSuccessResponse(String id) {
bulkSuccessfulResults.add(id);
public void appendSuccessResponse(T result) {
bulkSuccessfulResults.add(result);
}

public void appendFailedResponse(String id, String errorMessage) {
Expand All @@ -56,10 +58,9 @@ public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof BulkResponse)) {
if (!(o instanceof BulkResponse that)) {
return false;
}
BulkResponse that = (BulkResponse) o;
return Objects.equals(bulkSuccessfulResults, that.bulkSuccessfulResults)
&& Objects.equals(bulkErrorResults, that.bulkErrorResults);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,17 @@
/**
* Response object to return a list of succeeded entities and a map of failed ones, including error
* message, for the bulk request.
*
* @param <T> the type of entities included in the successful results
*/
public class BulkResponse {
public class BulkResponse<T> {

/**
* Key - entityId Value - error message processing this entity
*/
private final Map<String, String> bulkErrorResults;

private final List<String> bulkSuccessfulResults;
private final List<T> bulkSuccessfulResults;

private final String message = "Bulk Request has been processed.";

Expand All @@ -38,16 +40,16 @@ public BulkResponse() {
this.bulkErrorResults = new HashMap<>();
}

public List<String> getBulkSuccessfulResults() {
public List<T> getBulkSuccessfulResults() {
return bulkSuccessfulResults;
}

public Map<String, String> getBulkErrorResults() {
return bulkErrorResults;
}

public void appendSuccessResponse(String id) {
bulkSuccessfulResults.add(id);
public void appendSuccessResponse(T result) {
bulkSuccessfulResults.add(result);
}

public void appendFailedResponse(String id, String errorMessage) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@ class WorkflowBulkResource {
this.client = client;
}

BulkResponse pauseWorkflows(List<String> workflowIds) {
BulkResponse<String> pauseWorkflows(List<String> workflowIds) {
ConductorClientRequest request = ConductorClientRequest.builder()
.method(Method.PUT)
.path("/workflow/bulk/pause")
Expand All @@ -44,7 +44,7 @@ BulkResponse pauseWorkflows(List<String> workflowIds) {
return resp.getData();
}

BulkResponse restartWorkflows(List<String> workflowIds, Boolean useLatestDefinitions) {
BulkResponse<String> restartWorkflows(List<String> workflowIds, Boolean useLatestDefinitions) {
ConductorClientRequest request = ConductorClientRequest.builder()
.method(Method.POST)
.path("/workflow/bulk/restart")
Expand All @@ -58,7 +58,7 @@ BulkResponse restartWorkflows(List<String> workflowIds, Boolean useLatestDefinit
return resp.getData();
}

BulkResponse resumeWorkflows(List<String> workflowIds) {
BulkResponse<String> resumeWorkflows(List<String> workflowIds) {
ConductorClientRequest request = ConductorClientRequest.builder()
.method(Method.PUT)
.path("/workflow/bulk/resume")
Expand All @@ -71,7 +71,7 @@ BulkResponse resumeWorkflows(List<String> workflowIds) {
return resp.getData();
}

BulkResponse retryWorkflows(List<String> workflowIds) {
BulkResponse<String> retryWorkflows(List<String> workflowIds) {
ConductorClientRequest request = ConductorClientRequest.builder()
.method(Method.POST)
.path("/workflow/bulk/retry")
Expand All @@ -84,7 +84,7 @@ BulkResponse retryWorkflows(List<String> workflowIds) {
return resp.getData();
}

public BulkResponse terminateWorkflows(List<String> workflowIds, String reason, boolean triggerFailureWorkflow) {
public BulkResponse<String> terminateWorkflows(List<String> workflowIds, String reason, boolean triggerFailureWorkflow) {
ConductorClientRequest request = ConductorClientRequest.builder()
.method(Method.POST)
.path("/workflow/bulk/terminate")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ void registerTaskDef(
/**
* @param workflowDefList Workflow definitions to be updated.
*/
BulkResponse updateWorkflowDef(
BulkResponse<String> updateWorkflowDef(
@NotNull(message = "WorkflowDef list name cannot be null or empty")
@Size(min = 1, message = "WorkflowDefList is empty")
List<@NotNull(message = "WorkflowDef cannot be null") @Valid WorkflowDef>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,8 @@ public void updateWorkflowDef(WorkflowDef workflowDef) {
/**
* @param workflowDefList Workflow definitions to be updated.
*/
public BulkResponse updateWorkflowDef(List<WorkflowDef> workflowDefList) {
BulkResponse bulkResponse = new BulkResponse();
public BulkResponse<String> updateWorkflowDef(List<WorkflowDef> workflowDefList) {
BulkResponse<String> bulkResponse = new BulkResponse<>();
for (WorkflowDef workflowDef : workflowDefList) {
try {
updateWorkflowDef(workflowDef);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.springframework.validation.annotation.Validated;

import com.netflix.conductor.common.model.BulkResponse;
import com.netflix.conductor.model.WorkflowModel;

import jakarta.validation.constraints.NotEmpty;
import jakarta.validation.constraints.Size;
Expand All @@ -26,23 +27,23 @@ public interface WorkflowBulkService {

int MAX_REQUEST_ITEMS = 1000;

BulkResponse pauseWorkflow(
BulkResponse<String> pauseWorkflow(
@NotEmpty(message = "WorkflowIds list cannot be null.")
@Size(
max = MAX_REQUEST_ITEMS,
message =
"Cannot process more than {max} workflows. Please use multiple requests.")
List<String> workflowIds);

BulkResponse resumeWorkflow(
BulkResponse<String> resumeWorkflow(
@NotEmpty(message = "WorkflowIds list cannot be null.")
@Size(
max = MAX_REQUEST_ITEMS,
message =
"Cannot process more than {max} workflows. Please use multiple requests.")
List<String> workflowIds);

BulkResponse restart(
BulkResponse<String> restart(
@NotEmpty(message = "WorkflowIds list cannot be null.")
@Size(
max = MAX_REQUEST_ITEMS,
Expand All @@ -51,15 +52,15 @@ BulkResponse restart(
List<String> workflowIds,
boolean useLatestDefinitions);

BulkResponse retry(
BulkResponse<String> retry(
@NotEmpty(message = "WorkflowIds list cannot be null.")
@Size(
max = MAX_REQUEST_ITEMS,
message =
"Cannot process more than {max} workflows. Please use multiple requests.")
List<String> workflowIds);

BulkResponse terminate(
BulkResponse<String> terminate(
@NotEmpty(message = "WorkflowIds list cannot be null.")
@Size(
max = MAX_REQUEST_ITEMS,
Expand All @@ -68,7 +69,7 @@ BulkResponse terminate(
List<String> workflowIds,
String reason);

BulkResponse deleteWorkflow(
BulkResponse<String> deleteWorkflow(
@NotEmpty(message = "WorkflowIds list cannot be null.")
@Size(
max = MAX_REQUEST_ITEMS,
Expand All @@ -77,7 +78,7 @@ BulkResponse deleteWorkflow(
List<String> workflowIds,
boolean archiveWorkflow);

BulkResponse terminateRemove(
BulkResponse<String> terminateRemove(
@NotEmpty(message = "WorkflowIds list cannot be null.")
@Size(
max = MAX_REQUEST_ITEMS,
Expand All @@ -86,4 +87,13 @@ BulkResponse terminateRemove(
List<String> workflowIds,
String reason,
boolean archiveWorkflow);

BulkResponse<WorkflowModel> searchWorkflow(
@NotEmpty(message = "WorkflowIds list cannot be null.")
@Size(
max = MAX_REQUEST_ITEMS,
message =
"Cannot process more than {max} workflows. Please use multiple requests.")
List<String> workflowIds,
boolean includeTasks);
}
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.netflix.conductor.annotations.Trace;
import com.netflix.conductor.common.model.BulkResponse;
import com.netflix.conductor.core.execution.WorkflowExecutor;
import com.netflix.conductor.model.WorkflowModel;

@Audit
@Trace
Expand All @@ -45,9 +46,9 @@ public WorkflowBulkServiceImpl(
* @return bulk response object containing a list of succeeded workflows and a list of failed
* ones with errors
*/
public BulkResponse pauseWorkflow(List<String> workflowIds) {
public BulkResponse<String> pauseWorkflow(List<String> workflowIds) {

BulkResponse bulkResponse = new BulkResponse();
BulkResponse<String> bulkResponse = new BulkResponse<>();
for (String workflowId : workflowIds) {
try {
workflowExecutor.pauseWorkflow(workflowId);
Expand All @@ -72,8 +73,8 @@ public BulkResponse pauseWorkflow(List<String> workflowIds) {
* @return bulk response object containing a list of succeeded workflows and a list of failed
* ones with errors
*/
public BulkResponse resumeWorkflow(List<String> workflowIds) {
BulkResponse bulkResponse = new BulkResponse();
public BulkResponse<String> resumeWorkflow(List<String> workflowIds) {
BulkResponse<String> bulkResponse = new BulkResponse<>();
for (String workflowId : workflowIds) {
try {
workflowExecutor.resumeWorkflow(workflowId);
Expand All @@ -98,8 +99,8 @@ public BulkResponse resumeWorkflow(List<String> workflowIds) {
* @return bulk response object containing a list of succeeded workflows and a list of failed
* ones with errors
*/
public BulkResponse restart(List<String> workflowIds, boolean useLatestDefinitions) {
BulkResponse bulkResponse = new BulkResponse();
public BulkResponse<String> restart(List<String> workflowIds, boolean useLatestDefinitions) {
BulkResponse<String> bulkResponse = new BulkResponse<>();
for (String workflowId : workflowIds) {
try {
workflowExecutor.restart(workflowId, useLatestDefinitions);
Expand All @@ -123,8 +124,8 @@ public BulkResponse restart(List<String> workflowIds, boolean useLatestDefinitio
* @return bulk response object containing a list of succeeded workflows and a list of failed
* ones with errors
*/
public BulkResponse retry(List<String> workflowIds) {
BulkResponse bulkResponse = new BulkResponse();
public BulkResponse<String> retry(List<String> workflowIds) {
BulkResponse<String> bulkResponse = new BulkResponse<>();
for (String workflowId : workflowIds) {
try {
workflowExecutor.retry(workflowId, false);
Expand All @@ -150,8 +151,8 @@ public BulkResponse retry(List<String> workflowIds) {
* @return bulk response object containing a list of succeeded workflows and a list of failed
* ones with errors
*/
public BulkResponse terminate(List<String> workflowIds, String reason) {
BulkResponse bulkResponse = new BulkResponse();
public BulkResponse<String> terminate(List<String> workflowIds, String reason) {
BulkResponse<String> bulkResponse = new BulkResponse<>();
for (String workflowId : workflowIds) {
try {
workflowExecutor.terminateWorkflow(workflowId, reason);
Expand All @@ -174,8 +175,8 @@ public BulkResponse terminate(List<String> workflowIds, String reason) {
* @param workflowIds List of WorkflowIDs of the workflows you want to remove from system.
* @param archiveWorkflow Archives the workflow and associated tasks instead of removing them.
*/
public BulkResponse deleteWorkflow(List<String> workflowIds, boolean archiveWorkflow) {
BulkResponse bulkResponse = new BulkResponse();
public BulkResponse<String> deleteWorkflow(List<String> workflowIds, boolean archiveWorkflow) {
BulkResponse<String> bulkResponse = new BulkResponse<>();
for (String workflowId : workflowIds) {
try {
workflowService.deleteWorkflow(
Expand Down Expand Up @@ -203,9 +204,9 @@ public BulkResponse deleteWorkflow(List<String> workflowIds, boolean archiveWork
* @return bulk response object containing a list of succeeded workflows and a list of failed
* ones with errors
*/
public BulkResponse terminateRemove(
public BulkResponse<String> terminateRemove(
List<String> workflowIds, String reason, boolean archiveWorkflow) {
BulkResponse bulkResponse = new BulkResponse();
BulkResponse<String> bulkResponse = new BulkResponse<>();
for (String workflowId : workflowIds) {
try {
workflowExecutor.terminateWorkflow(workflowId, reason);
Expand Down Expand Up @@ -233,4 +234,32 @@ public BulkResponse terminateRemove(
}
return bulkResponse;
}

/**
* Fetch workflow details for given workflowIds.
*
* @param workflowIds List of workflow IDs to terminate and delete.
* @param includeTasks includes tasks from workflow
* @return bulk response object containing a list of workflow details
*/
@Override
public BulkResponse<WorkflowModel> searchWorkflow(
List<String> workflowIds, boolean includeTasks) {
BulkResponse<WorkflowModel> bulkResponse = new BulkResponse<>();
for (String workflowId : workflowIds) {
try {
WorkflowModel workflowModel =
workflowExecutor.getWorkflow(workflowId, includeTasks);
bulkResponse.appendSuccessResponse(workflowModel);
} catch (Exception e) {
LOGGER.error(
"bulk search exception, workflowId {}, message: {} ",
workflowId,
e.getMessage(),
e);
bulkResponse.appendFailedResponse(workflowId, e.getMessage());
}
}
return bulkResponse;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -336,7 +336,7 @@ public void testUpdateWorkflowDefWithCaseExpression() {
tasks.add(workflowTask);
workflowDef.setTasks(tasks);

BulkResponse bulkResponse =
BulkResponse<String> bulkResponse =
metadataService.updateWorkflowDef(Collections.singletonList(workflowDef));
}

Expand Down Expand Up @@ -366,7 +366,7 @@ public void testUpdateWorkflowDefWithJavscriptEvaluator() {
tasks.add(workflowTask);
workflowDef.setTasks(tasks);

BulkResponse bulkResponse =
BulkResponse<String> bulkResponse =
metadataService.updateWorkflowDef(Collections.singletonList(workflowDef));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ public void validate(@RequestBody WorkflowDef workflowDef) {

@PutMapping("/workflow")
@Operation(summary = "Create or update workflow definition")
public BulkResponse update(@RequestBody List<WorkflowDef> workflowDefs) {
public BulkResponse<String> update(@RequestBody List<WorkflowDef> workflowDefs) {
return metadataService.updateWorkflowDef(workflowDefs);
}

Expand Down
Loading

0 comments on commit c83ce9b

Please sign in to comment.