From ad08175c548cb0c4dbaf33c7ec0e1da4ecc7db2f Mon Sep 17 00:00:00 2001 From: jun-he Date: Wed, 7 Aug 2024 09:50:14 -0700 Subject: [PATCH 1/2] Surface blocking flag to the public interface to support async actions. --- .../handlers/StepInstanceActionHandler.java | 18 ++++++++---- .../StepInstanceActionHandlerTest.java | 28 +++++++++---------- 2 files changed, 26 insertions(+), 20 deletions(-) diff --git a/maestro-engine/src/main/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandler.java b/maestro-engine/src/main/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandler.java index c841898..e2763c1 100644 --- a/maestro-engine/src/main/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandler.java +++ b/maestro-engine/src/main/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandler.java @@ -49,7 +49,7 @@ public class StepInstanceActionHandler { * the runtime DAG. Only allow existing a single non-terminal step attempt at any time, which must * be the latest one. */ - public RunResponse restart(RunRequest runRequest) { + public RunResponse restart(RunRequest runRequest, boolean blocking) { if (!runRequest.isFreshRun() && runRequest.getCurrentPolicy() != RunPolicy.RESTART_FROM_SPECIFIC) { updateRunRequestForRestartFromInlineRoot(runRequest); @@ -57,7 +57,7 @@ public RunResponse restart(RunRequest runRequest) { RunResponse runResponse = actionHandler.restartRecursively(runRequest); if (runResponse.getStatus() == RunResponse.Status.DELEGATED) { - return restartDirectly(runResponse, runRequest); + return restartDirectly(runResponse, runRequest, blocking); } return runResponse; } @@ -112,8 +112,9 @@ private void updateRunRequestForRestartFromInlineRoot(RunRequest runRequest) { } /** Directly restart a step without going to its ancestors. */ - public RunResponse restartDirectly(RunResponse restartStepInfo, RunRequest runRequest) { - return actionDao.restartDirectly(restartStepInfo, runRequest, true); + public RunResponse restartDirectly( + RunResponse restartStepInfo, RunRequest runRequest, boolean blocking) { + return actionDao.restartDirectly(restartStepInfo, runRequest, blocking); } /** Bypasses the step dependencies. */ @@ -142,7 +143,12 @@ public StepInstanceActionResponse terminate( } public StepInstanceActionResponse skip( - String workflowId, long workflowInstanceId, String stepId, User user, RunRequest runRequest) { + String workflowId, + long workflowInstanceId, + String stepId, + User user, + RunRequest runRequest, + boolean blocking) { WorkflowInstance instance = instanceDao.getWorkflowInstance( workflowId, workflowInstanceId, Constants.LATEST_INSTANCE_RUN, true); @@ -178,7 +184,7 @@ public StepInstanceActionResponse skip( return actionDao.terminate(instance, stepId, user, Actions.StepInstanceAction.SKIP); } - RunResponse runResponse = restart(runRequest); + RunResponse runResponse = restart(runRequest, blocking); return runResponse.toStepInstanceActionResponse(); } } diff --git a/maestro-engine/src/test/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandlerTest.java b/maestro-engine/src/test/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandlerTest.java index b5a536e..ebfc0c5 100644 --- a/maestro-engine/src/test/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandlerTest.java +++ b/maestro-engine/src/test/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandlerTest.java @@ -80,7 +80,7 @@ public void testRestartNewRun() { .restartConfig( RestartConfig.builder().addRestartNode("sample-minimal-wf", 1, "job1").build()) .build(); - RunResponse response = stepActionHandler.restart(runRequest); + RunResponse response = stepActionHandler.restart(runRequest, true); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(RunRequest.class); Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture()); @@ -104,7 +104,7 @@ public void testRestartNewAttempt() { RunResponse runResponse = RunResponse.builder().status(RunResponse.Status.STEP_ATTEMPT_CREATED).build(); when(actionDao.restartDirectly(any(), any(), anyBoolean())).thenReturn(runResponse); - RunResponse response = stepActionHandler.restart(runRequest); + RunResponse response = stepActionHandler.restart(runRequest, true); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(RunRequest.class); Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture()); @@ -158,7 +158,7 @@ public void testRestartFromInlineRootWithinForeach() { .build()) .build(); - RunResponse response = stepActionHandler.restart(runRequest); + RunResponse response = stepActionHandler.restart(runRequest, true); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(RunRequest.class); Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture()); @@ -204,7 +204,7 @@ public void testRestartFromInlineRootWithinNonForeach() { .build()) .build(); - RunResponse response = stepActionHandler.restart(runRequest); + RunResponse response = stepActionHandler.restart(runRequest, true); ArgumentCaptor requestCaptor = ArgumentCaptor.forClass(RunRequest.class); Mockito.verify(actionHandler, Mockito.times(1)).restartRecursively(requestCaptor.capture()); @@ -238,7 +238,7 @@ public void testInvalidRestartFromInlineRoot() { "Cannot restart from inline root for non-terminal root", IllegalArgumentException.class, "instance [null] is in non-terminal state [IN_PROGRESS]", - () -> stepActionHandler.restart(runRequest)); + () -> stepActionHandler.restart(runRequest, true)); when(instance.getStatus()).thenReturn(WorkflowInstance.Status.FAILED); WorkflowInstanceAggregatedInfo aggregatedInfo = mock(WorkflowInstanceAggregatedInfo.class); @@ -250,14 +250,14 @@ public void testInvalidRestartFromInlineRoot() { "Cannot restart from inline root for non-terminal step", IllegalArgumentException.class, "step null[job1] is in non-terminal state [RUNNING]", - () -> stepActionHandler.restart(runRequest)); + () -> stepActionHandler.restart(runRequest, true)); when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.FATALLY_FAILED); AssertHelper.assertThrows( "Cannot restart from inline root for invalid restart path", IllegalArgumentException.class, "restart-path size is not 1", - () -> stepActionHandler.restart(runRequest)); + () -> stepActionHandler.restart(runRequest, true)); } @Test @@ -291,7 +291,7 @@ public void testSkipRunningStepInRunningInstance() { when(aggregatedInfo.getStepAggregatedViews()).thenReturn(singletonMap("job1", aggregatedView)); when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS); when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.RUNNING); - stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null); + stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true); verify(actionDao, times(1)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP); } @@ -321,7 +321,7 @@ public void testSkipFailedStepInRunningInstance() { .thenReturn(RunResponse.builder().status(RunResponse.Status.DELEGATED).build()); when(actionDao.restartDirectly(any(), eq(runRequest), eq(true))) .thenReturn(RunResponse.builder().status(RunResponse.Status.STEP_ATTEMPT_CREATED).build()); - stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest); + stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest, true); verify(actionDao, times(0)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP); verify(actionHandler, times(1)).restartRecursively(runRequest); verify(actionDao, times(1)).restartDirectly(any(), eq(runRequest), eq(true)); @@ -338,7 +338,7 @@ public void testSkipShouldWakeupStepInRunningInstance() { when(aggregatedInfo.getStepAggregatedViews()).thenReturn(singletonMap("job1", aggregatedView)); when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS); when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.PLATFORM_FAILED); - stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null); + stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true); verify(actionDao, times(1)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP); } @@ -367,7 +367,7 @@ public void testSkipStoppedStepInStoppedInstance() { .build(); when(actionHandler.restartRecursively(runRequest)) .thenReturn(RunResponse.builder().status(RunResponse.Status.WORKFLOW_RUN_CREATED).build()); - stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest); + stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest, true); verify(actionDao, times(0)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP); verify(actionHandler, times(1)).restartRecursively(runRequest); verify(actionDao, times(0)).restartDirectly(any(), eq(runRequest), eq(true)); @@ -388,20 +388,20 @@ public void testInvalidSkip() { "Cannot find status in aggregated step views", NullPointerException.class, "Invalid: cannot find the step view of workflow step ", - () -> stepActionHandler.skip("sample-minimal-wf", 1, "job2", user, null)); + () -> stepActionHandler.skip("sample-minimal-wf", 1, "job2", user, null, true)); when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.NOT_CREATED); AssertHelper.assertThrows( "Cannot skip not-created step", MaestroBadRequestException.class, "Cannot skip step [sample-minimal-wf][1][job1] before it is created. Please try it again.", - () -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null)); + () -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true)); when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.CREATED); AssertHelper.assertThrows( "Cannot skip not-created step", MaestroBadRequestException.class, "Cannot skip step [sample-minimal-wf][1][job1] because it is unsupported by the step action map", - () -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null)); + () -> stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true)); } } From 683307dd7e5fb16780f9b31c8085a6df5ed54bfb Mon Sep 17 00:00:00 2001 From: jun-he Date: Wed, 7 Aug 2024 10:00:24 -0700 Subject: [PATCH 2/2] surface the flag for additional actions --- .../dao/MaestroStepInstanceActionDao.java | 12 ---------- .../handlers/StepInstanceActionHandler.java | 11 +++++----- .../dao/MaestroStepInstanceActionDaoTest.java | 4 ++-- .../StepInstanceActionHandlerTest.java | 22 +++++++++++-------- 4 files changed, 21 insertions(+), 28 deletions(-) diff --git a/maestro-engine/src/main/java/com/netflix/maestro/engine/dao/MaestroStepInstanceActionDao.java b/maestro-engine/src/main/java/com/netflix/maestro/engine/dao/MaestroStepInstanceActionDao.java index c5bb0f1..27af1f5 100644 --- a/maestro-engine/src/main/java/com/netflix/maestro/engine/dao/MaestroStepInstanceActionDao.java +++ b/maestro-engine/src/main/java/com/netflix/maestro/engine/dao/MaestroStepInstanceActionDao.java @@ -121,12 +121,6 @@ public RunResponse restartDirectly( /** bypass the signal dependencies. */ public StepInstanceActionResponse bypassStepDependencies( - WorkflowInstance instance, String stepId, User user) { - return bypassStepDependencies(instance, stepId, user, true); - } - - @VisibleForTesting - StepInstanceActionResponse bypassStepDependencies( WorkflowInstance instance, String stepId, User user, boolean blocking) { validateStepId(instance, stepId, Actions.StepInstanceAction.BYPASS_STEP_DEPENDENCIES); StepInstance stepInstance = @@ -399,12 +393,6 @@ int deleteAction(StepInstance stepInstance, Actions.StepInstanceAction action) { * callback will do the cleanup. */ public StepInstanceActionResponse terminate( - WorkflowInstance instance, String stepId, User user, Actions.StepInstanceAction action) { - return terminate(instance, stepId, user, action, true); - } - - @VisibleForTesting - StepInstanceActionResponse terminate( WorkflowInstance instance, String stepId, User user, diff --git a/maestro-engine/src/main/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandler.java b/maestro-engine/src/main/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandler.java index e2763c1..3aa7a57 100644 --- a/maestro-engine/src/main/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandler.java +++ b/maestro-engine/src/main/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandler.java @@ -119,10 +119,10 @@ public RunResponse restartDirectly( /** Bypasses the step dependencies. */ public StepInstanceActionResponse bypassStepDependencies( - String workflowId, long workflowInstanceId, String stepId, User user) { + String workflowId, long workflowInstanceId, String stepId, User user, boolean blocking) { WorkflowInstance instance = instanceDao.getLatestWorkflowInstanceRun(workflowId, workflowInstanceId); - return actionDao.bypassStepDependencies(instance, stepId, user); + return actionDao.bypassStepDependencies(instance, stepId, user, blocking); } /** Terminate a step instance, i.e. the latest workflow instance run's latest attempt. */ @@ -131,7 +131,8 @@ public StepInstanceActionResponse terminate( long workflowInstanceId, String stepId, User user, - Actions.StepInstanceAction action) { + Actions.StepInstanceAction action, + boolean blocking) { WorkflowInstance instance = instanceDao.getLatestWorkflowInstanceRun(workflowId, workflowInstanceId); if (instance.getStatus().isTerminal()) { @@ -139,7 +140,7 @@ public StepInstanceActionResponse terminate( "Cannot manually %s the step [%s] as the workflow instance %s is in a terminal state [%s]", action.name(), stepId, instance.getIdentity(), instance.getStatus()); } - return actionDao.terminate(instance, stepId, user, action); + return actionDao.terminate(instance, stepId, user, action, blocking); } public StepInstanceActionResponse skip( @@ -181,7 +182,7 @@ public StepInstanceActionResponse skip( } if (!instance.getStatus().isTerminal() && view.getStatus().shouldWakeup()) { - return actionDao.terminate(instance, stepId, user, Actions.StepInstanceAction.SKIP); + return actionDao.terminate(instance, stepId, user, Actions.StepInstanceAction.SKIP, blocking); } RunResponse runResponse = restart(runRequest, blocking); diff --git a/maestro-engine/src/test/java/com/netflix/maestro/engine/dao/MaestroStepInstanceActionDaoTest.java b/maestro-engine/src/test/java/com/netflix/maestro/engine/dao/MaestroStepInstanceActionDaoTest.java index b34182d..4150f3b 100644 --- a/maestro-engine/src/test/java/com/netflix/maestro/engine/dao/MaestroStepInstanceActionDaoTest.java +++ b/maestro-engine/src/test/java/com/netflix/maestro/engine/dao/MaestroStepInstanceActionDaoTest.java @@ -518,7 +518,7 @@ public void testInvalidBypassSignalDependencies() { "Cannot manually RESTART the step", MaestroBadRequestException.class, "Cannot manually BYPASS_STEP_DEPENDENCIES the step [not-existing] because the latest workflow run", - () -> actionDao.bypassStepDependencies(instance, "not-existing", user)); + () -> actionDao.bypassStepDependencies(instance, "not-existing", user, true)); stepInstance.getRuntimeState().setStatus(StepInstance.Status.RUNNING); stepInstanceDao.insertOrUpsertStepInstance(stepInstance, true); @@ -527,7 +527,7 @@ public void testInvalidBypassSignalDependencies() { "Cannot manually bypass the step dependencies", MaestroInvalidStatusException.class, "Cannot manually bypass-step-dependencies the step as its status [RUNNING] is not waiting for signals", - () -> actionDao.bypassStepDependencies(instance, "job1", user)); + () -> actionDao.bypassStepDependencies(instance, "job1", user, true)); } @Test diff --git a/maestro-engine/src/test/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandlerTest.java b/maestro-engine/src/test/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandlerTest.java index ebfc0c5..e104a61 100644 --- a/maestro-engine/src/test/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandlerTest.java +++ b/maestro-engine/src/test/java/com/netflix/maestro/engine/handlers/StepInstanceActionHandlerTest.java @@ -263,21 +263,21 @@ public void testInvalidRestartFromInlineRoot() { @Test public void testTerminate() { when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS); - stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP); - verify(actionDao, times(1)).terminate(instance, "job1", user, STOP); + stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP, true); + verify(actionDao, times(1)).terminate(instance, "job1", user, STOP, true); when(instance.getStatus()).thenReturn(WorkflowInstance.Status.FAILED); AssertHelper.assertThrows( "Cannot manually terminate the step", MaestroInvalidStatusException.class, "Cannot manually STOP the step [job1] as the workflow instance", - () -> stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP)); + () -> stepActionHandler.terminate("sample-minimal-wf", 1, "job1", user, STOP, true)); } @Test public void testBypassDependencies() { when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS); - stepActionHandler.bypassStepDependencies("sample-minimal-wf", 1, "job1", user); - verify(actionDao, times(1)).bypassStepDependencies(instance, "job1", user); + stepActionHandler.bypassStepDependencies("sample-minimal-wf", 1, "job1", user, true); + verify(actionDao, times(1)).bypassStepDependencies(instance, "job1", user, true); } @Test @@ -292,7 +292,8 @@ public void testSkipRunningStepInRunningInstance() { when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS); when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.RUNNING); stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true); - verify(actionDao, times(1)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP); + verify(actionDao, times(1)) + .terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true); } @Test @@ -322,7 +323,8 @@ public void testSkipFailedStepInRunningInstance() { when(actionDao.restartDirectly(any(), eq(runRequest), eq(true))) .thenReturn(RunResponse.builder().status(RunResponse.Status.STEP_ATTEMPT_CREATED).build()); stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest, true); - verify(actionDao, times(0)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP); + verify(actionDao, times(0)) + .terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true); verify(actionHandler, times(1)).restartRecursively(runRequest); verify(actionDao, times(1)).restartDirectly(any(), eq(runRequest), eq(true)); } @@ -339,7 +341,8 @@ public void testSkipShouldWakeupStepInRunningInstance() { when(instance.getStatus()).thenReturn(WorkflowInstance.Status.IN_PROGRESS); when(aggregatedView.getStatus()).thenReturn(StepInstance.Status.PLATFORM_FAILED); stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, null, true); - verify(actionDao, times(1)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP); + verify(actionDao, times(1)) + .terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true); } @Test @@ -368,7 +371,8 @@ public void testSkipStoppedStepInStoppedInstance() { when(actionHandler.restartRecursively(runRequest)) .thenReturn(RunResponse.builder().status(RunResponse.Status.WORKFLOW_RUN_CREATED).build()); stepActionHandler.skip("sample-minimal-wf", 1, "job1", user, runRequest, true); - verify(actionDao, times(0)).terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP); + verify(actionDao, times(0)) + .terminate(instance, "job1", user, Actions.StepInstanceAction.SKIP, true); verify(actionHandler, times(1)).restartRecursively(runRequest); verify(actionDao, times(0)).restartDirectly(any(), eq(runRequest), eq(true)); }