Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Surface blocking flag to the public interface to support async actions. #69

Merged
merged 2 commits into from
Aug 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,6 @@ public RunResponse restartDirectly(

/** bypass the signal dependencies. */
public StepInstanceActionResponse bypassStepDependencies(
WorkflowInstance instance, String stepId, User user) {
return bypassStepDependencies(instance, stepId, user, true);
}

@VisibleForTesting
StepInstanceActionResponse bypassStepDependencies(
WorkflowInstance instance, String stepId, User user, boolean blocking) {
validateStepId(instance, stepId, Actions.StepInstanceAction.BYPASS_STEP_DEPENDENCIES);
StepInstance stepInstance =
Expand Down Expand Up @@ -399,12 +393,6 @@ int deleteAction(StepInstance stepInstance, Actions.StepInstanceAction action) {
* callback will do the cleanup.
*/
public StepInstanceActionResponse terminate(
WorkflowInstance instance, String stepId, User user, Actions.StepInstanceAction action) {
return terminate(instance, stepId, user, action, true);
}

@VisibleForTesting
StepInstanceActionResponse terminate(
WorkflowInstance instance,
String stepId,
User user,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,15 +49,15 @@ public class StepInstanceActionHandler {
* the runtime DAG. Only allow existing a single non-terminal step attempt at any time, which must
* be the latest one.
*/
public RunResponse restart(RunRequest runRequest) {
public RunResponse restart(RunRequest runRequest, boolean blocking) {
if (!runRequest.isFreshRun()
&& runRequest.getCurrentPolicy() != RunPolicy.RESTART_FROM_SPECIFIC) {
updateRunRequestForRestartFromInlineRoot(runRequest);
}

RunResponse runResponse = actionHandler.restartRecursively(runRequest);
if (runResponse.getStatus() == RunResponse.Status.DELEGATED) {
return restartDirectly(runResponse, runRequest);
return restartDirectly(runResponse, runRequest, blocking);
}
return runResponse;
}
Expand Down Expand Up @@ -112,16 +112,17 @@ private void updateRunRequestForRestartFromInlineRoot(RunRequest runRequest) {
}

/** Directly restart a step without going to its ancestors. */
public RunResponse restartDirectly(RunResponse restartStepInfo, RunRequest runRequest) {
return actionDao.restartDirectly(restartStepInfo, runRequest, true);
public RunResponse restartDirectly(
RunResponse restartStepInfo, RunRequest runRequest, boolean blocking) {
return actionDao.restartDirectly(restartStepInfo, runRequest, blocking);
}

/** Bypasses the step dependencies. */
public StepInstanceActionResponse bypassStepDependencies(
String workflowId, long workflowInstanceId, String stepId, User user) {
String workflowId, long workflowInstanceId, String stepId, User user, boolean blocking) {
WorkflowInstance instance =
instanceDao.getLatestWorkflowInstanceRun(workflowId, workflowInstanceId);
return actionDao.bypassStepDependencies(instance, stepId, user);
return actionDao.bypassStepDependencies(instance, stepId, user, blocking);
}

/** Terminate a step instance, i.e. the latest workflow instance run's latest attempt. */
Expand All @@ -130,19 +131,25 @@ public StepInstanceActionResponse terminate(
long workflowInstanceId,
String stepId,
User user,
Actions.StepInstanceAction action) {
Actions.StepInstanceAction action,
boolean blocking) {
WorkflowInstance instance =
instanceDao.getLatestWorkflowInstanceRun(workflowId, workflowInstanceId);
if (instance.getStatus().isTerminal()) {
throw new MaestroInvalidStatusException(
"Cannot manually %s the step [%s] as the workflow instance %s is in a terminal state [%s]",
action.name(), stepId, instance.getIdentity(), instance.getStatus());
}
return actionDao.terminate(instance, stepId, user, action);
return actionDao.terminate(instance, stepId, user, action, blocking);
}

public StepInstanceActionResponse skip(
String workflowId, long workflowInstanceId, String stepId, User user, RunRequest runRequest) {
String workflowId,
long workflowInstanceId,
String stepId,
User user,
RunRequest runRequest,
boolean blocking) {
WorkflowInstance instance =
instanceDao.getWorkflowInstance(
workflowId, workflowInstanceId, Constants.LATEST_INSTANCE_RUN, true);
Expand Down Expand Up @@ -175,10 +182,10 @@ public StepInstanceActionResponse skip(
}

if (!instance.getStatus().isTerminal() && view.getStatus().shouldWakeup()) {
return actionDao.terminate(instance, stepId, user, Actions.StepInstanceAction.SKIP);
return actionDao.terminate(instance, stepId, user, Actions.StepInstanceAction.SKIP, blocking);
}

RunResponse runResponse = restart(runRequest);
RunResponse runResponse = restart(runRequest, blocking);
return runResponse.toStepInstanceActionResponse();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ public void testInvalidBypassSignalDependencies() {
"Cannot manually RESTART the step",
MaestroBadRequestException.class,
"Cannot manually BYPASS_STEP_DEPENDENCIES the step [not-existing] because the latest workflow run",
() -> actionDao.bypassStepDependencies(instance, "not-existing", user));
() -> actionDao.bypassStepDependencies(instance, "not-existing", user, true));

stepInstance.getRuntimeState().setStatus(StepInstance.Status.RUNNING);
stepInstanceDao.insertOrUpsertStepInstance(stepInstance, true);
Expand All @@ -527,7 +527,7 @@ public void testInvalidBypassSignalDependencies() {
"Cannot manually bypass the step dependencies",
MaestroInvalidStatusException.class,
"Cannot manually bypass-step-dependencies the step as its status [RUNNING] is not waiting for signals",
() -> actionDao.bypassStepDependencies(instance, "job1", user));
() -> actionDao.bypassStepDependencies(instance, "job1", user, true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ public void testRestartNewRun() {
.restartConfig(
RestartConfig.builder().addRestartNode("sample-minimal-wf", 1, "job1").build())
.build();
RunResponse response = stepActionHandler.restart(runRequest);
RunResponse response = stepActionHandler.restart(runRequest, true);

ArgumentCaptor<RunRequest> requestCaptor = ArgumentCaptor.forClass(RunRequest.class);
Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture());
Expand All @@ -104,7 +104,7 @@ public void testRestartNewAttempt() {
RunResponse runResponse =
RunResponse.builder().status(RunResponse.Status.STEP_ATTEMPT_CREATED).build();
when(actionDao.restartDirectly(any(), any(), anyBoolean())).thenReturn(runResponse);
RunResponse response = stepActionHandler.restart(runRequest);
RunResponse response = stepActionHandler.restart(runRequest, true);

ArgumentCaptor<RunRequest> requestCaptor = ArgumentCaptor.forClass(RunRequest.class);
Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture());
Expand Down Expand Up @@ -158,7 +158,7 @@ public void testRestartFromInlineRootWithinForeach() {
.build())
.build();

RunResponse response = stepActionHandler.restart(runRequest);
RunResponse response = stepActionHandler.restart(runRequest, true);

ArgumentCaptor<RunRequest> requestCaptor = ArgumentCaptor.forClass(RunRequest.class);
Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture());
Expand Down Expand Up @@ -204,7 +204,7 @@ public void testRestartFromInlineRootWithinNonForeach() {
.build())
.build();

RunResponse response = stepActionHandler.restart(runRequest);
RunResponse response = stepActionHandler.restart(runRequest, true);

ArgumentCaptor<RunRequest> requestCaptor = ArgumentCaptor.forClass(RunRequest.class);
Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture());
Expand Down Expand Up @@ -238,7 +238,7 @@ public void testInvalidRestartFromInlineRoot() {
"Cannot restart from inline root for non-terminal root",
IllegalArgumentException.class,
"instance [null] is in non-terminal state [IN_PROGRESS]",
() -> stepActionHandler.restart(runRequest));
() -> stepActionHandler.restart(runRequest, true));

when(instance.getStatus()).thenReturn(WorkflowInstance.Status.FAILED);
WorkflowInstanceAggregatedInfo aggregatedInfo = mock(WorkflowInstanceAggregatedInfo.class);
Expand All @@ -250,34 +250,34 @@ public void testInvalidRestartFromInlineRoot() {
"Cannot restart from inline root for non-terminal step",
IllegalArgumentException.class,
"step null[job1] is in non-terminal state [RUNNING]",
() -> stepActionHandler.restart(runRequest));
() -> stepActionHandler.restart(runRequest, true));

when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.FATALLY_FAILED);
AssertHelper.assertThrows(
"Cannot restart from inline root for invalid restart path",
IllegalArgumentException.class,
"restart-path size is not 1",
() -> stepActionHandler.restart(runRequest));
() -> stepActionHandler.restart(runRequest, true));
}

@Test
public void testTerminate() {
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP);
verify(actionDao, times(1)).terminate(instance, "job1", user, STOP);
stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP, true);
verify(actionDao, times(1)).terminate(instance, "job1", user, STOP, true);
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.FAILED);
AssertHelper.assertThrows(
"Cannot manually terminate the step",
MaestroInvalidStatusException.class,
"Cannot manually STOP the step [job1] as the workflow instance",
() -> stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP));
() -> stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP, true));
}

@Test
public void testBypassDependencies() {
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
stepActionHandler.bypassStepDependencies("sample-minimal-wf", 1, "job1", user);
verify(actionDao, times(1)).bypassStepDependencies(instance, "job1", user);
stepActionHandler.bypassStepDependencies("sample-minimal-wf", 1, "job1", user, true);
verify(actionDao, times(1)).bypassStepDependencies(instance, "job1", user, true);
}

@Test
Expand All @@ -291,8 +291,9 @@ public void testSkipRunningStepInRunningInstance() {
when(aggregatedInfo.getStepAggregatedViews()).thenReturn(singletonMap("job1", aggregatedView));
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.RUNNING);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null);
verify(actionDao, times(1)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true);
verify(actionDao, times(1))
.terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true);
}

@Test
Expand Down Expand Up @@ -321,8 +322,9 @@ public void testSkipFailedStepInRunningInstance() {
.thenReturn(RunResponse.builder().status(RunResponse.Status.DELEGATED).build());
when(actionDao.restartDirectly(any(), eq(runRequest), eq(true)))
.thenReturn(RunResponse.builder().status(RunResponse.Status.STEP_ATTEMPT_CREATED).build());
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest);
verify(actionDao, times(0)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest, true);
verify(actionDao, times(0))
.terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true);
verify(actionHandler, times(1)).restartRecursively(runRequest);
verify(actionDao, times(1)).restartDirectly(any(), eq(runRequest), eq(true));
}
Expand All @@ -338,8 +340,9 @@ public void testSkipShouldWakeupStepInRunningInstance() {
when(aggregatedInfo.getStepAggregatedViews()).thenReturn(singletonMap("job1", aggregatedView));
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.PLATFORM_FAILED);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null);
verify(actionDao, times(1)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true);
verify(actionDao, times(1))
.terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true);
}

@Test
Expand Down Expand Up @@ -367,8 +370,9 @@ public void testSkipStoppedStepInStoppedInstance() {
.build();
when(actionHandler.restartRecursively(runRequest))
.thenReturn(RunResponse.builder().status(RunResponse.Status.WORKFLOW_RUN_CREATED).build());
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest);
verify(actionDao, times(0)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest, true);
verify(actionDao, times(0))
.terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true);
verify(actionHandler, times(1)).restartRecursively(runRequest);
verify(actionDao, times(0)).restartDirectly(any(), eq(runRequest), eq(true));
}
Expand All @@ -388,20 +392,20 @@ public void testInvalidSkip() {
"Cannot find status in aggregated step views",
NullPointerException.class,
"Invalid: cannot find the step view of workflow step ",
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job2", user, null));
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job2", user, null, true));

when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.NOT_CREATED);
AssertHelper.assertThrows(
"Cannot skip not-created step",
MaestroBadRequestException.class,
"Cannot skip step [sample-minimal-wf][1][job1] before it is created. Please try it again.",
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null));
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true));

when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.CREATED);
AssertHelper.assertThrows(
"Cannot skip not-created step",
MaestroBadRequestException.class,
"Cannot skip step [sample-minimal-wf][1][job1] because it is unsupported by the step action map",
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null));
() -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true));
}
}