Skip to content

Commit

Permalink
KS-198: Workflow Spec Approval (#13181)
Browse files Browse the repository at this point in the history
* KS-198: Workflow Spec Approval

* use job.Workflow spec, clarify code

* add test for auto approval happy path

* more tests

* linter

* better workflow spec validation

* linter
  • Loading branch information
krehermann authored May 21, 2024
1 parent 4938ef3 commit c14576a
Show file tree
Hide file tree
Showing 4 changed files with 211 additions and 11 deletions.
5 changes: 5 additions & 0 deletions .changeset/polite-yaks-smell.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

#added workflow spec auto-approval via CLO
65 changes: 60 additions & 5 deletions core/services/feeds/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/ocr"
ocr2 "github.com/smartcontractkit/chainlink/v2/core/services/ocr2/validate"
"github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"
)

Expand All @@ -50,6 +51,21 @@ var (
Help: "Metric to track job proposal requests",
})

promWorkflowRequests = promauto.NewCounter(prometheus.CounterOpts{
Name: "feeds_workflow_requests",
Help: "Metric to track workflow requests",
})

promWorkflowApprovals = promauto.NewCounter(prometheus.CounterOpts{
Name: "feeds_workflow_approvals",
Help: "Metric to track workflow successful auto approvals",
})

promWorkflowFailures = promauto.NewCounter(prometheus.CounterOpts{
Name: "feeds_workflow_rejections",
Help: "Metric to track workflow failed auto approvals",
})

promJobProposalCounts = promauto.NewGaugeVec(prometheus.GaugeOpts{
Name: "feeds_job_proposal_count",
Help: "Number of job proposals for the node partitioned by status.",
Expand Down Expand Up @@ -553,6 +569,7 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64,
}

if exists {
// note: CLO auto-increments the version number on re-proposal, so this should never happen
return 0, errors.New("proposed job spec version already exists")
}
}
Expand Down Expand Up @@ -596,9 +613,21 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64,
if err != nil {
return 0, err
}

// Track the given job proposal request
promJobProposalRequest.Inc()
// auto approve workflow specs
if isWFSpec(logger, args.Spec) {
promWorkflowRequests.Inc()
err = s.ApproveSpec(ctx, id, true)
if err != nil {
promWorkflowFailures.Inc()
logger.Errorw("Failed to auto approve workflow spec", "id", id, "err", err)
return 0, fmt.Errorf("failed to approve workflow spec %d: %w", id, err)
}
logger.Infow("Successful workflow spec auto approval", "id", id)
promWorkflowApprovals.Inc()
} else {
// Track the given job proposal request
promJobProposalRequest.Inc()
}

if err = s.observeJobProposalCounts(ctx); err != nil {
logger.Errorw("Failed to push metrics for propose job", err)
Expand All @@ -607,6 +636,16 @@ func (s *service) ProposeJob(ctx context.Context, args *ProposeJobArgs) (int64,
return id, nil
}

func isWFSpec(lggr logger.Logger, spec string) bool {
jobType, err := job.ValidateSpec(spec)
if err != nil {
// this should not happen in practice
lggr.Errorw("Failed to validate spec while checking for workflow", "err", err)
return false
}
return jobType == job.Workflow
}

// GetJobProposal gets a job proposal by id.
func (s *service) GetJobProposal(ctx context.Context, id int64) (*JobProposal, error) {
return s.orm.GetJobProposal(ctx, id)
Expand Down Expand Up @@ -761,6 +800,15 @@ func (s *service) ApproveSpec(ctx context.Context, id int64, force bool) error {
return errors.Wrap(txerr, "FindOCR2JobIDByAddress failed")
}
}
case job.Workflow:
existingJobID, txerr = findExistingWorkflowJob(ctx, *j.WorkflowSpec, tx.jobORM)
if txerr != nil {
// Return an error if the repository errors. If there is a not found
// error we want to continue with approving the job.
if !errors.Is(txerr, sql.ErrNoRows) {
return fmt.Errorf("failed while checking for existing workflow job: %w", txerr)
}
}
default:
return errors.Errorf("unsupported job type when approving job proposal specs: %s", j.Type)
}
Expand Down Expand Up @@ -1058,6 +1106,11 @@ func (s *service) observeJobProposalCounts(ctx context.Context) error {
return nil
}

// TODO KS-205 implement this. Need to figure out how exactly how we want to handle this.
func findExistingWorkflowJob(ctx context.Context, wfSpec job.WorkflowSpec, tx job.ORM) (int32, error) {
return 0, nil
}

// findExistingJobForOCR2 looks for existing job for OCR2
func findExistingJobForOCR2(ctx context.Context, j *job.Job, tx job.ORM) (int32, error) {
var contractID string
Expand All @@ -1073,7 +1126,7 @@ func findExistingJobForOCR2(ctx context.Context, j *job.Job, tx job.ORM) (int32,
feedID = j.BootstrapSpec.FeedID
}
case job.FluxMonitor, job.OffchainReporting:
return 0, errors.Errorf("contradID and feedID not applicable for job type: %s", j.Type)
return 0, errors.Errorf("contractID and feedID not applicable for job type: %s", j.Type)
default:
return 0, errors.Errorf("unsupported job type: %s", j.Type)
}
Expand Down Expand Up @@ -1106,7 +1159,7 @@ func findExistingJobForOCRFlux(ctx context.Context, j *job.Job, tx job.ORM) (int
func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error) {
jobType, err := job.ValidateSpec(spec)
if err != nil {
return nil, errors.Wrap(err, "failed to parse job spec TOML")
return nil, fmt.Errorf("failed to parse job spec TOML'%s': %w", spec, err)
}

var js job.Job
Expand All @@ -1128,6 +1181,8 @@ func (s *service) generateJob(ctx context.Context, spec string) (*job.Job, error
js, err = ocrbootstrap.ValidatedBootstrapSpecToml(spec)
case job.FluxMonitor:
js, err = fluxmonitorv2.ValidatedFluxMonitorSpec(s.jobCfg, spec)
case job.Workflow:
js, err = workflows.ValidatedWorkflowSpec(spec)
default:
return nil, errors.Errorf("unknown job type: %s", jobType)
}
Expand Down
133 changes: 133 additions & 0 deletions core/services/feeds/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/pipeline"
evmrelay "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"
"github.com/smartcontractkit/chainlink/v2/core/services/versioning"
"github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"
)

Expand Down Expand Up @@ -638,6 +639,54 @@ func Test_Service_ProposeJob(t *testing.T) {
}

httpTimeout = *commonconfig.MustNewDuration(1 * time.Second)

// variables for workflow spec
wfID = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef"
wfOwner = "00000000000000000000000000000000000000aa"
specYaml = `
triggers:
- id: "a-trigger"
actions:
- id: "an-action"
ref: "an-action"
inputs:
trigger_output: $(trigger.outputs)
consensus:
- id: "a-consensus"
ref: "a-consensus"
inputs:
trigger_output: $(trigger.outputs)
an-action_output: $(an-action.outputs)
targets:
- id: "a-target"
ref: "a-target"
inputs:
consensus_output: $(a-consensus.outputs)
`
wfSpec = testspecs.GenerateWorkflowSpec(wfID, wfOwner, specYaml).Toml()
proposalIDWF = int64(11)
remoteUUIDWF = uuid.New()
argsWF = &feeds.ProposeJobArgs{
FeedsManagerID: 1,
RemoteUUID: remoteUUIDWF,
Spec: wfSpec,
Version: 1,
}
jpWF = feeds.JobProposal{
FeedsManagerID: 1,
Name: null.StringFrom("test-spec"),
RemoteUUID: remoteUUIDWF,
Status: feeds.JobProposalStatusPending,
}
proposalSpecWF = feeds.JobProposalSpec{
Definition: wfSpec,
Status: feeds.SpecStatusPending,
Version: 1,
JobProposalID: proposalIDWF,
}
)

testCases := []struct {
Expand All @@ -647,6 +696,90 @@ func Test_Service_ProposeJob(t *testing.T) {
wantID int64
wantErr string
}{
{
name: "Auto approve WF spec",
before: func(svc *TestService) {
svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, argsWF.RemoteUUID).Return(new(feeds.JobProposal), sql.ErrNoRows)
svc.orm.On("UpsertJobProposal", mock.Anything, &jpWF).Return(proposalIDWF, nil)
svc.orm.On("CreateSpec", mock.Anything, proposalSpecWF).Return(int64(100), nil)
svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
transactCall := svc.orm.On("Transact", mock.Anything, mock.Anything)
transactCall.Run(func(args mock.Arguments) {
fn := args[1].(func(orm feeds.ORM) error)
transactCall.ReturnArguments = mock.Arguments{fn(svc.orm)}
})
// Auto approve is really a call to ApproveJobProposal and so we have to mock that as well
svc.connMgr.On("GetClient", argsWF.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.EXPECT().GetSpec(mock.Anything, proposalIDWF).Return(&proposalSpecWF, nil)
svc.orm.EXPECT().GetJobProposal(mock.Anything, proposalSpecWF.JobProposalID).Return(&jpWF, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)

svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows) // TODO fix the external job id in wf spec generation
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
svc.spawner.
On("CreateJob",
mock.Anything,
mock.Anything,
mock.MatchedBy(func(j *job.Job) bool {
return j.WorkflowSpec.WorkflowOwner == wfOwner
}),
).
Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }).
Return(nil)
svc.orm.On("ApproveSpec",
mock.Anything,
proposalSpecWF.JobProposalID,
mock.IsType(uuid.UUID{}),
).Return(nil)
svc.fmsClient.On("ApprovedJob",
mock.MatchedBy(func(ctx context.Context) bool { return true }),
&proto.ApprovedJobRequest{
Uuid: jpWF.RemoteUUID.String(),
Version: int64(proposalSpecWF.Version),
},
).Return(&proto.ApprovedJobResponse{}, nil)
},
args: argsWF,
wantID: proposalIDWF,
},
{
name: "Auto approve WF spec: error creating job",
before: func(svc *TestService) {
svc.orm.On("GetJobProposalByRemoteUUID", mock.Anything, argsWF.RemoteUUID).Return(new(feeds.JobProposal), sql.ErrNoRows)
svc.orm.On("UpsertJobProposal", mock.Anything, &jpWF).Return(proposalIDWF, nil)
svc.orm.On("CreateSpec", mock.Anything, proposalSpecWF).Return(int64(100), nil)
// svc.orm.On("CountJobProposalsByStatus", mock.Anything).Return(&feeds.JobProposalCounts{}, nil)
transactCall := svc.orm.On("Transact", mock.Anything, mock.Anything)
transactCall.Run(func(args mock.Arguments) {
fn := args[1].(func(orm feeds.ORM) error)
transactCall.ReturnArguments = mock.Arguments{fn(svc.orm)}
})
// Auto approve is really a call to ApproveJobProposal and so we have to mock that as well
svc.connMgr.On("GetClient", argsWF.FeedsManagerID).Return(svc.fmsClient, nil)
svc.orm.EXPECT().GetSpec(mock.Anything, proposalIDWF).Return(&proposalSpecWF, nil)
svc.orm.EXPECT().GetJobProposal(mock.Anything, proposalSpecWF.JobProposalID).Return(&jpWF, nil)
svc.jobORM.On("AssertBridgesExist", mock.Anything, mock.IsType(pipeline.Pipeline{})).Return(nil)

svc.jobORM.On("FindJobByExternalJobID", mock.Anything, mock.Anything).Return(job.Job{}, sql.ErrNoRows) // TODO fix the external job id in wf spec generation
svc.orm.On("WithDataSource", mock.Anything).Return(feeds.ORM(svc.orm))
svc.jobORM.On("WithDataSource", mock.Anything).Return(job.ORM(svc.jobORM))
svc.spawner.
On("CreateJob",
mock.Anything,
mock.Anything,
mock.MatchedBy(func(j *job.Job) bool {
return j.WorkflowSpec.WorkflowOwner == wfOwner
}),
).
Run(func(args mock.Arguments) { (args.Get(2).(*job.Job)).ID = 1 }).
Return(fmt.Errorf("error creating job"))
},
args: argsWF,
wantID: 0,
wantErr: "error creating job",
},

{
name: "Create success (Flux Monitor)",
before: func(svc *TestService) {
Expand Down
19 changes: 13 additions & 6 deletions core/services/workflows/delegate.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,21 +122,28 @@ func ValidatedWorkflowSpec(tomlString string) (job.Job, error) {
if err != nil {
return jb, fmt.Errorf("toml unmarshal error on spec: %w", err)
}
if jb.Type != job.Workflow {
return jb, fmt.Errorf("unsupported type %s, expected %s", jb.Type, job.Workflow)
}

var spec job.WorkflowSpec
err = tree.Unmarshal(&spec)
if err != nil {
return jb, fmt.Errorf("toml unmarshal error on job: %w", err)
return jb, fmt.Errorf("toml unmarshal error on workflow spec: %w", err)
}

if err := spec.Validate(); err != nil {
return jb, err
err = spec.Validate()
if err != nil {
return jb, fmt.Errorf("invalid WorkflowSpec: %w", err)
}

jb.WorkflowSpec = &spec
if jb.Type != job.Workflow {
return jb, fmt.Errorf("unsupported type %s", jb.Type)
// ensure the embedded workflow graph is valid
_, err = Parse(spec.Workflow)
if err != nil {
return jb, fmt.Errorf("failed to parse workflow graph: %w", err)
}
jb.WorkflowSpec = &spec
jb.WorkflowSpecID = &spec.ID

return jb, nil
}

0 comments on commit c14576a

Please sign in to comment.