From b2427a714a0e5cb0614beb454d685e9fc269ebb3 Mon Sep 17 00:00:00 2001 From: Bolek <1416262+bolekk@users.noreply.github.com> Date: Mon, 2 Dec 2024 10:29:00 -0800 Subject: [PATCH 1/2] [KS-590] Auto-approval for workflow spec deletion (#15414) --- core/services/feeds/service.go | 21 +++++++++++++ core/services/feeds/service_test.go | 47 ++++++++++++++++++++++++++++- 2 files changed, 67 insertions(+), 1 deletion(-) diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index f986d1753af..6c527408883 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -510,6 +510,27 @@ func (s *service) DeleteJob(ctx context.Context, args *DeleteJobArgs) (int64, er logger.Errorw("Failed to push metrics for job proposal deletion", "err", err) } + // auto-cancellation for Workflow specs + if !proposal.ExternalJobID.Valid { + logger.Infow("ExternalJobID is null", "id", proposal.ID, "name", proposal.Name) + return proposal.ID, nil + } + job, err := s.jobORM.FindJobByExternalJobID(ctx, proposal.ExternalJobID.UUID) + if err != nil { + // NOTE: at this stage, we don't know if this job is of Workflow type + // so we don't want to return an error + logger.Infow("FindJobByExternalJobID failed", "id", proposal.ID, "externalJobID", proposal.ExternalJobID.UUID, "name", proposal.Name) + return proposal.ID, nil + } + if job.WorkflowSpecID != nil { // this is a Workflow job + specID := int64(*job.WorkflowSpecID) + if err := s.CancelSpec(ctx, proposal.ID); err != nil { + logger.Errorw("Failed to auto-cancel workflow spec", "id", specID, "err", err, "name", job.Name) + return 0, fmt.Errorf("failed to auto-cancel workflow spec %d: %w", specID, err) + } + logger.Infow("Successfully auto-cancelled a workflow spec", "id", specID) + } + return proposal.ID, nil } diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index 115695d8514..09a3ac4d705 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -1269,12 +1269,25 @@ func Test_Service_DeleteJob(t *testing.T) { } approved = feeds.JobProposal{ - ID: 1, + ID: 321, FeedsManagerID: 1, RemoteUUID: remoteUUID, + ExternalJobID: uuid.NullUUID{UUID: uuid.New(), Valid: true}, Status: feeds.JobProposalStatusApproved, } + wfSpecID = int32(4321) + workflowJob = job.Job{ + ID: 1, + WorkflowSpecID: &wfSpecID, + } + spec = &feeds.JobProposalSpec{ + ID: 20, + Status: feeds.SpecStatusApproved, + JobProposalID: approved.ID, + Version: 1, + } + httpTimeout = *commonconfig.MustNewDuration(1 * time.Second) ) @@ -1291,6 +1304,7 @@ func Test_Service_DeleteJob(t *testing.T) { svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, approved.RemoteUUID).Return(&approved, nil) svc.orm.On("DeleteProposal", mock.Anything, approved.ID).Return(nil) svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(job.Job{}, sql.ErrNoRows) }, args: args, wantID: approved.ID, @@ -1334,6 +1348,37 @@ func Test_Service_DeleteJob(t *testing.T) { args: args, wantErr: "DeleteProposal failed", }, + { + name: "Delete workflow-spec with auto-cancellation", + before: func(svc *TestService) { + svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, approved.RemoteUUID).Return(&approved, nil) + svc.orm.On("DeleteProposal", mock.Anything, approved.ID).Return(nil) + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(workflowJob, nil) + + // mocks for CancelSpec() + svc.orm.On("GetSpec", mock.Anything, approved.ID).Return(spec, nil) + svc.orm.On("GetJobProposal", mock.Anything, approved.ID).Return(&approved, nil) + svc.connMgr.On("GetClient", mock.Anything).Return(svc.fmsClient, nil) + + svc.orm.On("CancelSpec", mock.Anything, approved.ID).Return(nil) + svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(workflowJob, nil) + svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, workflowJob.ID).Return(nil) + + svc.fmsClient.On("CancelledJob", + mock.MatchedBy(func(ctx context.Context) bool { return true }), + &proto.CancelledJobRequest{ + Uuid: approved.RemoteUUID.String(), + Version: int64(spec.Version), + }, + ).Return(&proto.CancelledJobResponse{}, nil) + svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) + svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm)) + svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM)) + }, + args: args, + wantID: approved.ID, + }, } for _, tc := range testCases { From 34367295e371b189f0670a97be7b81a0ff60bc33 Mon Sep 17 00:00:00 2001 From: Bolek <1416262+bolekk@users.noreply.github.com> Date: Wed, 4 Dec 2024 06:36:04 -0800 Subject: [PATCH 2/2] [KS-590][bugfix] Auto-approval of workflow deletion - use correct ID (#15499) --- core/services/feeds/service.go | 16 +++++++++++----- core/services/feeds/service_test.go | 9 +++++---- 2 files changed, 16 insertions(+), 9 deletions(-) diff --git a/core/services/feeds/service.go b/core/services/feeds/service.go index 6c527408883..ede9362995e 100644 --- a/core/services/feeds/service.go +++ b/core/services/feeds/service.go @@ -523,12 +523,18 @@ func (s *service) DeleteJob(ctx context.Context, args *DeleteJobArgs) (int64, er return proposal.ID, nil } if job.WorkflowSpecID != nil { // this is a Workflow job - specID := int64(*job.WorkflowSpecID) - if err := s.CancelSpec(ctx, proposal.ID); err != nil { - logger.Errorw("Failed to auto-cancel workflow spec", "id", specID, "err", err, "name", job.Name) - return 0, fmt.Errorf("failed to auto-cancel workflow spec %d: %w", specID, err) + jobSpecID := int64(*job.WorkflowSpecID) + jpSpec, err2 := s.orm.GetApprovedSpec(ctx, proposal.ID) + if err2 != nil { + logger.Errorw("GetApprovedSpec failed - no approved specs to cancel?", "id", proposal.ID, "err", err2, "name", job.Name) + // return success if there are no approved specs to cancel + return proposal.ID, nil } - logger.Infow("Successfully auto-cancelled a workflow spec", "id", specID) + if err := s.CancelSpec(ctx, jpSpec.ID); err != nil { + logger.Errorw("Failed to auto-cancel workflow spec", "jobProposalID", proposal.ID, "jobProposalSpecID", jpSpec.ID, "jobSpecID", jobSpecID, "err", err, "name", job.Name) + return 0, fmt.Errorf("failed to auto-cancel workflow spec (job proposal spec ID: %d): %w", jpSpec.ID, err) + } + logger.Infow("Successfully auto-cancelled a workflow spec", "jobProposalID", proposal.ID, "jobProposalSpecID", jpSpec.ID, "jobSpecID", jobSpecID, "name", job.Name) } return proposal.ID, nil diff --git a/core/services/feeds/service_test.go b/core/services/feeds/service_test.go index 09a3ac4d705..68af051ef0d 100644 --- a/core/services/feeds/service_test.go +++ b/core/services/feeds/service_test.go @@ -1281,7 +1281,7 @@ func Test_Service_DeleteJob(t *testing.T) { ID: 1, WorkflowSpecID: &wfSpecID, } - spec = &feeds.JobProposalSpec{ + jobProposalSpec = &feeds.JobProposalSpec{ ID: 20, Status: feeds.SpecStatusApproved, JobProposalID: approved.ID, @@ -1355,13 +1355,14 @@ func Test_Service_DeleteJob(t *testing.T) { svc.orm.On("DeleteProposal", mock.Anything, approved.ID).Return(nil) svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(workflowJob, nil) + svc.orm.On("GetApprovedSpec", mock.Anything, approved.ID).Return(jobProposalSpec, nil) // mocks for CancelSpec() - svc.orm.On("GetSpec", mock.Anything, approved.ID).Return(spec, nil) + svc.orm.On("GetSpec", mock.Anything, jobProposalSpec.ID).Return(jobProposalSpec, nil) svc.orm.On("GetJobProposal", mock.Anything, approved.ID).Return(&approved, nil) svc.connMgr.On("GetClient", mock.Anything).Return(svc.fmsClient, nil) - svc.orm.On("CancelSpec", mock.Anything, approved.ID).Return(nil) + svc.orm.On("CancelSpec", mock.Anything, jobProposalSpec.ID).Return(nil) svc.jobORM.On("FindJobByExternalJobID", mock.Anything, approved.ExternalJobID.UUID).Return(workflowJob, nil) svc.spawner.On("DeleteJob", mock.Anything, mock.Anything, workflowJob.ID).Return(nil) @@ -1369,7 +1370,7 @@ func Test_Service_DeleteJob(t *testing.T) { mock.MatchedBy(func(ctx context.Context) bool { return true }), &proto.CancelledJobRequest{ Uuid: approved.RemoteUUID.String(), - Version: int64(spec.Version), + Version: int64(jobProposalSpec.Version), }, ).Return(&proto.CancelledJobResponse{}, nil) svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)