Skip to content

Commit

Permalink
Revert "feat(workflows): adds registry syncer (#15277)"
Browse files Browse the repository at this point in the history
This reverts commit 64aeef9.
  • Loading branch information
justinkaseman committed Nov 21, 2024
1 parent 93b05ee commit 1608708
Show file tree
Hide file tree
Showing 19 changed files with 25 additions and 1,935 deletions.
15 changes: 0 additions & 15 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -579,21 +579,6 @@ packages:
github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer:
interfaces:
ORM:
github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer:
interfaces:
ORM:
ContractReader:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: contract_reader_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
Handler:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: handler_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
github.com/smartcontractkit/chainlink/v2/core/capabilities/targets:
interfaces:
ContractValueGetter:
42 changes: 9 additions & 33 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/relay"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs"
"github.com/smartcontractkit/chainlink/v2/core/utils/testutils/heavyweight"
)
Expand Down Expand Up @@ -1874,7 +1873,6 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) {
c.ID = s.ID
c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow99", addr1) // insert with mismatched name
c.SpecType = job.YamlSpec
c.SecretsID = s.SecretsID
return mustInsertWFJob(t, o, &c)
},
},
Expand All @@ -1894,40 +1892,29 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) {
var c job.WorkflowSpec
c.ID = s.ID
c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow03", addr2) // insert with mismatched owner
c.SecretsID = s.SecretsID
return mustInsertWFJob(t, o, &c)
},
},
wantErr: true,
},
}

for i, tt := range tests {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := testutils.Context(t)
ks := cltest.NewKeyStore(t, tt.fields.ds)

secretsORM := syncer.NewWorkflowRegistryDS(tt.fields.ds, logger.TestLogger(t))

sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz")
require.NoError(t, err)
tt.args.spec.SecretsID = sql.NullInt64{Int64: sid, Valid: true}

pipelineORM := pipeline.NewORM(tt.fields.ds, logger.TestLogger(t), configtest.NewTestGeneralConfig(t).JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(tt.fields.ds)
o := NewTestORM(t, tt.fields.ds, pipelineORM, bridgesORM, ks)

var wantJobID int32
if tt.args.before != nil {
wantJobID = tt.args.before(t, o, tt.args.spec)
}

ctx := testutils.Context(t)
gotJ, err := o.FindJobIDByWorkflow(ctx, *tt.args.spec)
if (err != nil) != tt.wantErr {
t.Errorf("orm.FindJobByWorkflow() error = %v, wantErr %v", err, tt.wantErr)
return
}

if err == nil {
assert.Equal(t, wantJobID, gotJ, "mismatch job id")
}
Expand All @@ -1949,36 +1936,25 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) {
bridges.NewORM(db),
cltest.NewKeyStore(t, db))
ctx := testutils.Context(t)
secretsORM := syncer.NewWorkflowRegistryDS(db, logger.TestLogger(t))

var sids []int64
for i := 0; i < 3; i++ {
sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz")
require.NoError(t, err)
sids = append(sids, sid)
}

wfYaml1 := pkgworkflows.WFYamlSpec(t, "workflow00", addr1)
s1 := job.WorkflowSpec{
Workflow: wfYaml1,
SpecType: job.YamlSpec,
SecretsID: sql.NullInt64{Int64: sids[0], Valid: true},
Workflow: wfYaml1,
SpecType: job.YamlSpec,
}
wantJobID1 := mustInsertWFJob(t, o, &s1)

wfYaml2 := pkgworkflows.WFYamlSpec(t, "workflow01", addr1)
s2 := job.WorkflowSpec{
Workflow: wfYaml2,
SpecType: job.YamlSpec,
SecretsID: sql.NullInt64{Int64: sids[1], Valid: true},
Workflow: wfYaml2,
SpecType: job.YamlSpec,
}
wantJobID2 := mustInsertWFJob(t, o, &s2)

wfYaml3 := pkgworkflows.WFYamlSpec(t, "workflow00", addr2)
s3 := job.WorkflowSpec{
Workflow: wfYaml3,
SpecType: job.YamlSpec,
SecretsID: sql.NullInt64{Int64: sids[2], Valid: true},
Workflow: wfYaml3,
SpecType: job.YamlSpec,
}
wantJobID3 := mustInsertWFJob(t, o, &s3)

Expand Down Expand Up @@ -2016,7 +1992,7 @@ func mustInsertWFJob(t *testing.T, orm job.ORM, s *job.WorkflowSpec) int32 {
}

err = orm.CreateJob(ctx, &j)
require.NoError(t, err, "failed to insert job with wf spec %+v %s", s, err)
require.NoError(t, err, "failed to insert job with wf spec %v %s", s, s.Workflow)
return j.ID
}

Expand Down
4 changes: 0 additions & 4 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package job

import (
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -878,9 +877,6 @@ type WorkflowSpec struct {
WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow.
WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow.
WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow.
BinaryURL string `db:"binary_url"`
ConfigURL string `db:"config_url"`
SecretsID sql.NullInt64 `db:"secrets_id"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
Expand Down
4 changes: 2 additions & 2 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
case Stream:
// 'stream' type has no associated spec, nothing to do here
case Workflow:
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, binary_url, config_url, secrets_id, created_at, updated_at, spec_type, config)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, :binary_url, :config_url, :secrets_id, NOW(), NOW(), :spec_type, :config)
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at, spec_type, config)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW(), :spec_type, :config)
RETURNING id;`
specID, err := tx.prepareQuerySpecID(ctx, sql, jb.WorkflowSpec)
if err != nil {
Expand Down

This file was deleted.

4 changes: 2 additions & 2 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ func (sucm *stepUpdateManager) len() int64 {
}

type secretsFetcher interface {
SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error)
SecretsFor(workflowOwner, workflowName string) (map[string]string, error)
}

// Engine handles the lifecycle of a single workflow and its executions.
Expand Down Expand Up @@ -850,7 +850,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value
// registry (for capability-level configuration). It doesn't perform any caching of the config values, since
// the two registries perform their own caching.
func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) {
secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name)
secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.name)
if err != nil {
return nil, fmt.Errorf("failed to fetch secrets: %w", err)
}
Expand Down
Loading

0 comments on commit 1608708

Please sign in to comment.