diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index f986d1753af..ede9362995e 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -510,6 +510,33 @@ func (s *service) DeleteJob(ctx context.Context, args *DeleteJobArgs) (int64, er logger.Errorw("Failed to push metrics for job proposal deletion", "err", err) } + // auto-cancellation for Workflow specs + if !proposal.ExternalJobID.Valid { + logger.Infow("ExternalJobID is null", "id", proposal.ID, "name", proposal.Name) + return proposal.ID, nil + } + job, err := s.jobORM.FindJobByExternalJobID(ctx, proposal.ExternalJobID.UUID) + if err != nil { + // NOTE: at this stage, we don't know if this job is of Workflow type + // so we don't want to return an error + logger.Infow("FindJobByExternalJobID failed", "id", proposal.ID, "externalJobID", proposal.ExternalJobID.UUID, "name", proposal.Name) + return proposal.ID, nil + } + if job.WorkflowSpecID != nil { // this is a Workflow job + jobSpecID := int64(*job.WorkflowSpecID) + jpSpec, err2 := s.orm.GetApprovedSpec(ctx, proposal.ID) + if err2 != nil { + logger.Errorw("GetApprovedSpec failed - no approved specs to cancel?", "id", proposal.ID, "err", err2, "name", job.Name) + // return success if there are no approved specs to cancel + return proposal.ID, nil + } + if err := s.CancelSpec(ctx, jpSpec.ID); err != nil { + logger.Errorw("Failed to auto-cancel workflow spec", "jobProposalID", proposal.ID, "jobProposalSpecID", jpSpec.ID, "jobSpecID", jobSpecID, "err", err, "name", job.Name) + return 0, fmt.Errorf("failed to auto-cancel workflow spec (job proposal spec ID: %d): %w", jpSpec.ID, err) + } + logger.Infow("Successfully auto-cancelled a workflow spec", "jobProposalID", proposal.ID, "jobProposalSpecID", jpSpec.ID, "jobSpecID", jobSpecID, "name", job.Name) + } + return proposal.ID, nil } diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index 115695d8514..68af051ef0d 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -1269,12 +1269,25 @@ func Test_Service_DeleteJob(t *testing.T) { } approved = feeds.JobProposal{ - ID: 1, + ID: 321, FeedsManagerID: 1, RemoteUUID: remoteUUID, + ExternalJobID: uuid.NullUUID{UUID: uuid.New(), Valid: true}, Status: feeds.JobProposalStatusApproved, } + wfSpecID = int32(4321) + workflowJob = job.Job{ + ID: 1, + WorkflowSpecID: &wfSpecID, + } + jobProposalSpec = &feeds.JobProposalSpec{ + ID: 20, + Status: feeds.SpecStatusApproved, + JobProposalID: approved.ID, + Version: 1, + } + httpTimeout = *commonconfig.MustNewDuration(1 * time.Second) ) @@ -1291,6 +1304,7 @@ func Test_Service_DeleteJob(t *testing.T) { svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, approved.RemoteUUID).Return(&approved, nil) svc.orm.On("DeleteProposal", mock.Anything, approved.ID).Return(nil) svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(job.Job{}, sql.ErrNoRows) }, args: args, wantID: approved.ID, @@ -1334,6 +1348,38 @@ func Test_Service_DeleteJob(t *testing.T) { args: args, wantErr: "DeleteProposal failed", }, + { + name: "Delete workflow-spec with auto-cancellation", + before: func(svc *TestService) { + svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, approved.RemoteUUID).Return(&approved, nil) + svc.orm.On("DeleteProposal", mock.Anything, approved.ID).Return(nil) + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(workflowJob, nil) + svc.orm.On("GetApprovedSpec", mock.Anything, approved.ID).Return(jobProposalSpec, nil) + + // mocks for CancelSpec() + svc.orm.On("GetSpec", mock.Anything, jobProposalSpec.ID).Return(jobProposalSpec, nil) + svc.orm.On("GetJobProposal", mock.Anything, approved.ID).Return(&approved, nil) + svc.connMgr.On("GetClient", mock.Anything).Return(svc.fmsClient, nil) + + svc.orm.On("CancelSpec", mock.Anything, jobProposalSpec.ID).Return(nil) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(workflowJob, nil) + svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, workflowJob.ID).Return(nil) + + svc.fmsClient.On("CancelledJob", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + &proto.CancelledJobRequest{ + Uuid: approved.RemoteUUID.String(), + Version: int64(jobProposalSpec.Version), + }, + ).Return(&proto.CancelledJobResponse{}, nil) + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + }, + args: args, + wantID: approved.ID, + }, } for _, tc := range testCases {