Skip to content

Commit

Permalink
surface the flag for additional actions
Browse files Browse the repository at this point in the history
  • Loading branch information
jun-he committed Aug 7, 2024
1 parent ad08175 commit 683307d
Show file tree
Hide file tree
Showing 4 changed files with 21 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -121,12 +121,6 @@ public RunResponse restartDirectly(

/** bypass the signal dependencies. */
public StepInstanceActionResponse bypassStepDependencies(
WorkflowInstance instance, String stepId, User user) {
return bypassStepDependencies(instance, stepId, user, true);
}

@VisibleForTesting
StepInstanceActionResponse bypassStepDependencies(
WorkflowInstance instance, String stepId, User user, boolean blocking) {
validateStepId(instance, stepId, Actions.StepInstanceAction.BYPASS_STEP_DEPENDENCIES);
StepInstance stepInstance =
Expand Down Expand Up @@ -399,12 +393,6 @@ int deleteAction(StepInstance stepInstance, Actions.StepInstanceAction action) {
* callback will do the cleanup.
*/
public StepInstanceActionResponse terminate(
WorkflowInstance instance, String stepId, User user, Actions.StepInstanceAction action) {
return terminate(instance, stepId, user, action, true);
}

@VisibleForTesting
StepInstanceActionResponse terminate(
WorkflowInstance instance,
String stepId,
User user,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -119,10 +119,10 @@ public RunResponse restartDirectly(

/** Bypasses the step dependencies. */
public StepInstanceActionResponse bypassStepDependencies(
String workflowId, long workflowInstanceId, String stepId, User user) {
String workflowId, long workflowInstanceId, String stepId, User user, boolean blocking) {
WorkflowInstance instance =
instanceDao.getLatestWorkflowInstanceRun(workflowId, workflowInstanceId);
return actionDao.bypassStepDependencies(instance, stepId, user);
return actionDao.bypassStepDependencies(instance, stepId, user, blocking);
}

/** Terminate a step instance, i.e. the latest workflow instance run's latest attempt. */
Expand All @@ -131,15 +131,16 @@ public StepInstanceActionResponse terminate(
long workflowInstanceId,
String stepId,
User user,
Actions.StepInstanceAction action) {
Actions.StepInstanceAction action,
boolean blocking) {
WorkflowInstance instance =
instanceDao.getLatestWorkflowInstanceRun(workflowId, workflowInstanceId);
if (instance.getStatus().isTerminal()) {
throw new MaestroInvalidStatusException(
"Cannot manually %s the step [%s] as the workflow instance %s is in a terminal state [%s]",
action.name(), stepId, instance.getIdentity(), instance.getStatus());
}
return actionDao.terminate(instance, stepId, user, action);
return actionDao.terminate(instance, stepId, user, action, blocking);
}

public StepInstanceActionResponse skip(
Expand Down Expand Up @@ -181,7 +182,7 @@ public StepInstanceActionResponse skip(
}

if (!instance.getStatus().isTerminal() && view.getStatus().shouldWakeup()) {
return actionDao.terminate(instance, stepId, user, Actions.StepInstanceAction.SKIP);
return actionDao.terminate(instance, stepId, user, Actions.StepInstanceAction.SKIP, blocking);
}

RunResponse runResponse = restart(runRequest, blocking);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ public void testInvalidBypassSignalDependencies() {
"Cannot manually RESTART the step",
MaestroBadRequestException.class,
"Cannot manually BYPASS_STEP_DEPENDENCIES the step [not-existing] because the latest workflow run",
() -> actionDao.bypassStepDependencies(instance, "not-existing", user));
() -> actionDao.bypassStepDependencies(instance, "not-existing", user, true));

stepInstance.getRuntimeState().setStatus(StepInstance.Status.RUNNING);
stepInstanceDao.insertOrUpsertStepInstance(stepInstance, true);
Expand All @@ -527,7 +527,7 @@ public void testInvalidBypassSignalDependencies() {
"Cannot manually bypass the step dependencies",
MaestroInvalidStatusException.class,
"Cannot manually bypass-step-dependencies the step as its status [RUNNING] is not waiting for signals",
() -> actionDao.bypassStepDependencies(instance, "job1", user));
() -> actionDao.bypassStepDependencies(instance, "job1", user, true));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -263,21 +263,21 @@ public void testInvalidRestartFromInlineRoot() {
@Test
public void testTerminate() {
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP);
verify(actionDao, times(1)).terminate(instance, "job1", user, STOP);
stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP, true);
verify(actionDao, times(1)).terminate(instance, "job1", user, STOP, true);
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.FAILED);
AssertHelper.assertThrows(
"Cannot manually terminate the step",
MaestroInvalidStatusException.class,
"Cannot manually STOP the step [job1] as the workflow instance",
() -> stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP));
() -> stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP, true));
}

@Test
public void testBypassDependencies() {
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
stepActionHandler.bypassStepDependencies("sample-minimal-wf", 1, "job1", user);
verify(actionDao, times(1)).bypassStepDependencies(instance, "job1", user);
stepActionHandler.bypassStepDependencies("sample-minimal-wf", 1, "job1", user, true);
verify(actionDao, times(1)).bypassStepDependencies(instance, "job1", user, true);
}

@Test
Expand All @@ -292,7 +292,8 @@ public void testSkipRunningStepInRunningInstance() {
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.RUNNING);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true);
verify(actionDao, times(1)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
verify(actionDao, times(1))
.terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true);
}

@Test
Expand Down Expand Up @@ -322,7 +323,8 @@ public void testSkipFailedStepInRunningInstance() {
when(actionDao.restartDirectly(any(), eq(runRequest), eq(true)))
.thenReturn(RunResponse.builder().status(RunResponse.Status.STEP_ATTEMPT_CREATED).build());
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest, true);
verify(actionDao, times(0)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
verify(actionDao, times(0))
.terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true);
verify(actionHandler, times(1)).restartRecursively(runRequest);
verify(actionDao, times(1)).restartDirectly(any(), eq(runRequest), eq(true));
}
Expand All @@ -339,7 +341,8 @@ public void testSkipShouldWakeupStepInRunningInstance() {
when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS);
when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.PLATFORM_FAILED);
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true);
verify(actionDao, times(1)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
verify(actionDao, times(1))
.terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true);
}

@Test
Expand Down Expand Up @@ -368,7 +371,8 @@ public void testSkipStoppedStepInStoppedInstance() {
when(actionHandler.restartRecursively(runRequest))
.thenReturn(RunResponse.builder().status(RunResponse.Status.WORKFLOW_RUN_CREATED).build());
stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest, true);
verify(actionDao, times(0)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP);
verify(actionDao, times(0))
.terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true);
verify(actionHandler, times(1)).restartRecursively(runRequest);
verify(actionDao, times(0)).restartDirectly(any(), eq(runRequest), eq(true));
}
Expand Down

0 comments on commit 683307d

Please sign in to comment.