diff --git a/handler.go b/handler.go index 511c3693..7c12024d 100644 --- a/handler.go +++ b/handler.go @@ -360,31 +360,60 @@ func (h Handler) ResumeWorkflowByID(ctx context.Context, input *models.ResumeWor if !resources.WorkflowIsDone(&workflow) { return &models.Workflow{}, fmt.Errorf("Workflow %s active: %s", workflow.ID, workflow.Status) } + areDescendantsDone, err := areDescendantWorkflowsDone(ctx, h.store, workflow) + if err != nil { + return &models.Workflow{}, fmt.Errorf("Failed to check descendant workflows: %s", err) + } + if !areDescendantsDone { + return &models.Workflow{}, fmt.Errorf("Cannot resume workflow %s because it has active descendants", workflow.ID) + } if _, ok := workflow.WorkflowDefinition.StateMachine.States[input.Overrides.StartAt]; !ok { return &models.Workflow{}, fmt.Errorf("Invalid StartAt state %s", input.Overrides.StartAt) } - // find the input to the StartAt state - effectiveInput := "" - for _, job := range workflow.Jobs { - if job.State == input.Overrides.StartAt { - // if job was never started then we should probably not trust the input - if job.Status == models.JobStatusAbortedDepsFailed || - job.Status == models.JobStatusQueued || - job.Status == models.JobStatusWaitingForDeps || - job.Status == models.JobStatusCreated { - - return &models.Workflow{}, - fmt.Errorf("Job %s for StartAt %s was not started for Workflow: %s. Could not infer input", - job.ID, job.State, workflow.ID) + // Find the job that ran for the StartAt state so that its input can be used for the new workflow. + // Try to use lastJob if it's available and if we want to start the new workflow from that state, + // otherwise load the full execution history and search through it to find the desired input. + // Note: getting the workflow's execution history involves making expensive API calls to the + // stepfunctions GetExecutionHistory endpoint. GetWorkflowByID can be called beforehand to avoid + // the workflow.Jobs == nil path. + // Clients may wish to increase the timeout from the global default to handle workflows with long + // execution histories. + var originalJob *models.Job + if workflow.LastJob != nil && input.Overrides.StartAt == workflow.LastJob.State { + originalJob = workflow.LastJob + } else { + if workflow.Jobs == nil { + if err := h.manager.UpdateWorkflowHistory(ctx, &workflow); err != nil { + return &models.Workflow{}, err + } + updatedWorkflow, err := h.store.GetWorkflowByID(ctx, input.WorkflowID) + if err != nil { + return &models.Workflow{}, err } + workflow = updatedWorkflow + } - effectiveInput = job.Input - break + for _, job := range workflow.Jobs { + if job.State == input.Overrides.StartAt { + originalJob = job + break + } } } - return h.manager.RetryWorkflow(ctx, workflow, input.Overrides.StartAt, effectiveInput) + if originalJob == nil { + return &models.Workflow{}, fmt.Errorf("No job found for StartAt %s", input.Overrides.StartAt) + } + + // if job was never started then we should probably not trust the input + if !hasJobStarted(originalJob.Status) { + return &models.Workflow{}, + fmt.Errorf("Job %s for StartAt %s was not started for Workflow: %s. Could not infer input", + originalJob.ID, originalJob.State, workflow.ID) + } + + return h.manager.RetryWorkflow(ctx, workflow, input.Overrides.StartAt, originalJob.Input) } // ResolveWorkflowByID sets a workflow's ResolvedByUser to true if it is currently false. @@ -430,6 +459,13 @@ func newWorkflowDefinitionFromRequest(req models.NewWorkflowDefinitionRequest) ( return resources.NewWorkflowDefinition(req.Name, req.Manager, req.StateMachine, req.DefaultTags) } +func hasJobStarted(status models.JobStatus) bool { + return !(status == models.JobStatusAbortedDepsFailed || + status == models.JobStatusQueued || + status == models.JobStatusWaitingForDeps || + status == models.JobStatusCreated) +} + // validateTagsMap ensures that all tags values are strings func validateTagsMap(apiTags map[string]interface{}) error { for _, val := range apiTags { @@ -443,3 +479,25 @@ func validateTagsMap(apiTags map[string]interface{}) error { func epochMillis(t time.Time) int { return int(t.UnixNano() / int64(time.Millisecond)) } + +// areDescendantWorkflowsDone does a depth first search on the workflow's retries and returns true +// if none of the descendant workflows are active. +func areDescendantWorkflowsDone(ctx context.Context, s store.Store, workflow models.Workflow) (bool, error) { + for _, childID := range workflow.Retries { + childWorkflow, err := s.GetWorkflowByID(ctx, childID) + if err != nil { + return false, err + } + if !resources.WorkflowIsDone(&childWorkflow) { + return false, nil + } + areDescendantsDone, err := areDescendantWorkflowsDone(ctx, s, childWorkflow) + if err != nil { + return false, err + } + if !areDescendantsDone { + return false, nil + } + } + return true, nil +} diff --git a/handler_test.go b/handler_test.go index 6f0f69f8..930222e7 100644 --- a/handler_test.go +++ b/handler_test.go @@ -13,6 +13,36 @@ import ( "github.com/stretchr/testify/require" ) +type handlerTestController struct { + mockController *gomock.Controller + mockWFM *mocks.MockWorkflowManager + mockStore *mocks.MockStore + handler Handler + t *testing.T +} + +func newSFNManagerTestController(t *testing.T) *handlerTestController { + mockController := gomock.NewController(t) + mockWFM := mocks.NewMockWorkflowManager(mockController) + mockStore := mocks.NewMockStore(mockController) + handler := Handler{ + manager: mockWFM, + store: mockStore, + } + + return &handlerTestController{ + mockController: mockController, + mockWFM: mockWFM, + mockStore: mockStore, + handler: handler, + t: t, + } +} + +func (c *handlerTestController) tearDown() { + c.mockController.Finish() +} + // TestNewWorkflowDefinitionFromRequest tests the newWorkflowFromRequest helper func TestNewWorkflowDefinitionFromRequest(t *testing.T) { workflowReq := models.NewWorkflowDefinitionRequest{ @@ -83,3 +113,351 @@ func TestStartWorkflow(t *testing.T) { assert.NoError(t, err) } } + +func TestResumeWorkflowByID(t *testing.T) { + workflowDefinition := &models.WorkflowDefinition{ + StateMachine: &models.SLStateMachine{ + StartAt: "monkey-state", + States: map[string]models.SLState{ + "monkey-state": models.SLState{ + Type: models.SLStateTypeTask, + Next: "gorilla-state", + Resource: "resource-name", + }, + "gorilla-state": models.SLState{ + Type: models.SLStateTypeTask, + Resource: "lambda:resource-name", + End: true, + }, + }, + }, + } + + specs := []struct { + desc string + isError bool + startAt string + wf *models.Workflow + newWF *models.Workflow + }{ + { + desc: "uses the input from lastJob if StartAt == lastJob.State" + + " regardless of the existance of the jobs array", + isError: false, + startAt: "gorilla-state", + wf: &models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + ID: "workflow-id", + Status: models.WorkflowStatusFailed, + WorkflowDefinition: workflowDefinition, + LastJob: &models.Job{ + State: "gorilla-state", + Input: `{"snack":"plum"}`, + Status: models.JobStatusFailed, + }, + }, + Jobs: nil, + }, + newWF: &models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + ID: "new-workflow-id", + Status: models.WorkflowStatusQueued, + WorkflowDefinition: workflowDefinition, + Input: `{"snack":"plum"}`, + }, + }, + }, + { + desc: "finds the input from jobs array when StartAt != lastJob.State", + isError: false, + startAt: "monkey-state", + wf: &models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + ID: "workflow-id", + Status: models.WorkflowStatusFailed, + WorkflowDefinition: workflowDefinition, + LastJob: &models.Job{ + State: "gorilla-state", + Input: `{"snack":"plum"}`, + Status: models.JobStatusFailed, + }, + }, + Jobs: []*models.Job{ + { + State: "monkey-state", + Input: `{"snack":"banana"}`, + Status: models.JobStatusSucceeded, + }, + { + State: "gorilla-state", + Input: `{"snack":"plum"}`, + Status: models.JobStatusFailed, + }, + }, + }, + newWF: &models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + ID: "new-workflow-id", + Status: models.WorkflowStatusQueued, + WorkflowDefinition: workflowDefinition, + Input: `{"snack":"banana"}`, + }, + }, + }, + { + desc: "fails if there are no previous attempted jobs at StartAt state", + isError: true, + startAt: "gorilla-state", + wf: &models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + ID: "workflow-id", + Status: models.WorkflowStatusFailed, + WorkflowDefinition: workflowDefinition, + LastJob: &models.Job{ + State: "monkey-state", + Input: `{"snack":"banana"}`, + Status: models.JobStatusFailed, + }, + }, + Jobs: []*models.Job{ + { + State: "monkey-state", + Input: `{"snack":"banana"}`, + Status: models.JobStatusFailed, + }, + }, + }, + newWF: &models.Workflow{}, + }, + { + desc: "fails if StartAt state doesn't exist in the workflow definition", + isError: true, + startAt: "invalid-state", + wf: &models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + ID: "workflow-id", + Status: models.WorkflowStatusFailed, + WorkflowDefinition: workflowDefinition, + LastJob: &models.Job{ + State: "monkey-state", + Input: `{"snack":"banana"}`, + Status: models.JobStatusFailed, + }, + }, + Jobs: []*models.Job{ + { + State: "monkey-state", + Input: `{"snack":"banana"}`, + Status: models.JobStatusFailed, + }, + }, + }, + newWF: &models.Workflow{}, + }, + } + + for _, spec := range specs { + t.Run(spec.desc, func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := newSFNManagerTestController(t) + + c.mockStore.EXPECT(). + GetWorkflowByID(ctx, gomock.Eq(spec.wf.ID)). + Return(*spec.wf, nil). + Times(1) + + if !spec.isError { + c.mockWFM.EXPECT(). + RetryWorkflow(ctx, *spec.wf, spec.startAt, spec.newWF.Input). + Return(spec.newWF, nil). + Times(1) + } + + resumedWorkflow, err := c.handler.ResumeWorkflowByID( + ctx, + &models.ResumeWorkflowByIDInput{ + WorkflowID: spec.wf.ID, + Overrides: &models.WorkflowDefinitionOverrides{StartAt: spec.startAt}, + }, + ) + + if spec.isError { + require.Error(t, err) + } else { + require.NoError(t, err) + } + assert.Equal(t, spec.newWF, resumedWorkflow) + }) + } + + t.Run("loads the jobs array when it's missing and StartAt != lastJob.State", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := newSFNManagerTestController(t) + + startAt := "monkey-state" + + wf := &models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + ID: "workflow-id", + Status: models.WorkflowStatusFailed, + WorkflowDefinition: workflowDefinition, + LastJob: &models.Job{ + State: "gorilla-state", + Input: `{"snack":"plum"}`, + Status: models.JobStatusFailed, + }, + }, + Jobs: nil, + } + + jobsToLoad := []*models.Job{ + { + State: "monkey-state", + Input: `{"snack":"banana"}`, + Status: models.JobStatusSucceeded, + }, + { + State: "gorilla-state", + Input: `{"snack":"plum"}`, + Status: models.JobStatusFailed, + }, + } + + newWF := &models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + ID: "new-workflow-id", + Status: models.WorkflowStatusQueued, + WorkflowDefinition: workflowDefinition, + Input: `{"snack":"banana"}`, + }, + } + + c.mockStore.EXPECT(). + GetWorkflowByID(ctx, gomock.Eq(wf.ID)). + Return(*wf, nil). + Times(1) + + wfWithoutJobs := *wf + c.mockWFM.EXPECT(). + UpdateWorkflowHistory(ctx, &wfWithoutJobs). + Return(nil). + Times(1) + + wf.Jobs = jobsToLoad + c.mockStore.EXPECT(). + GetWorkflowByID(ctx, gomock.Eq(wf.ID)). + Return(*wf, nil). + Times(1) + + c.mockWFM.EXPECT(). + RetryWorkflow(ctx, *wf, startAt, newWF.Input). + Return(newWF, nil). + Times(1) + + resumedWorkflow, err := c.handler.ResumeWorkflowByID( + ctx, + &models.ResumeWorkflowByIDInput{ + WorkflowID: wf.ID, + Overrides: &models.WorkflowDefinitionOverrides{StartAt: startAt}, + }, + ) + + require.NoError(t, err) + assert.Equal(t, newWF, resumedWorkflow) + }) + + t.Run("fails if ResumeWorkflowByID is called on a running workflow", func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := newSFNManagerTestController(t) + + wf := &models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + ID: "workflow-id", + Status: models.WorkflowStatusRunning, + WorkflowDefinition: workflowDefinition, + }, + Jobs: []*models.Job{ + { + State: "monkey-state", + Input: `{"snack":"banana"}`, + Status: models.JobStatusRunning, + }, + }, + } + + c.mockStore.EXPECT(). + GetWorkflowByID(ctx, gomock.Eq(wf.ID)). + Return(*wf, nil). + Times(1) + + _, err := c.handler.ResumeWorkflowByID( + ctx, + &models.ResumeWorkflowByIDInput{ + WorkflowID: wf.ID, + Overrides: &models.WorkflowDefinitionOverrides{StartAt: "monkey-state"}, + }, + ) + require.Error(t, err) + }) + + t.Run( + "fails if ResumeWorkflowByID is called on a workflow with active retries", + func(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + c := newSFNManagerTestController(t) + + wfRetry := &models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + ID: "retry-workflow-id", + Status: models.WorkflowStatusRunning, + WorkflowDefinition: workflowDefinition, + }, + Jobs: []*models.Job{ + { + State: "monkey-state", + Input: `{"snack":"banana"}`, + Status: models.JobStatusRunning, + }, + }, + } + + wf := &models.Workflow{ + WorkflowSummary: models.WorkflowSummary{ + ID: "workflow-id", + Status: models.WorkflowStatusFailed, + WorkflowDefinition: workflowDefinition, + Retries: []string{wfRetry.ID}, + }, + Jobs: []*models.Job{ + { + State: "monkey-state", + Input: `{"snack":"banana"}`, + Status: models.JobStatusFailed, + }, + }, + } + + c.mockStore.EXPECT(). + GetWorkflowByID(ctx, gomock.Eq(wf.ID)). + Return(*wf, nil). + Times(1) + + c.mockStore.EXPECT(). + GetWorkflowByID(ctx, gomock.Eq(wfRetry.ID)). + Return(*wfRetry, nil). + Times(1) + + _, err := c.handler.ResumeWorkflowByID( + ctx, + &models.ResumeWorkflowByIDInput{ + WorkflowID: wf.ID, + Overrides: &models.WorkflowDefinitionOverrides{StartAt: "monkey-state"}, + }, + ) + require.Error(t, err) + }) +}