Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(workflows): adds registry syncer #15277

Merged
merged 4 commits into from
Nov 21, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 15 additions & 0 deletions .mockery.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -579,6 +579,21 @@ packages:
github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer:
interfaces:
ORM:
github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer:
interfaces:
ORM:
ContractReader:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: contract_reader_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
Handler:
config:
mockname: "Mock{{ .InterfaceName }}"
filename: handler_mock.go
inpackage: true
dir: "{{ .InterfaceDir }}"
github.com/smartcontractkit/chainlink/v2/core/capabilities/targets:
interfaces:
ContractValueGetter:
2 changes: 1 addition & 1 deletion core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -215,7 +215,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

// TODO: wire this up to config so we only instantiate it
// if a workflow registry address is provided.
workflowRegistrySyncer := syncer.NewWorkflowRegistry()
workflowRegistrySyncer := syncer.NewNullWorkflowRegistrySyncer()
srvcs = append(srvcs, workflowRegistrySyncer)

var externalPeerWrapper p2ptypes.PeerWrapper
Expand Down
42 changes: 33 additions & 9 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services/relay"
"github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon"
"github.com/smartcontractkit/chainlink/v2/core/services/webhook"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs"
"github.com/smartcontractkit/chainlink/v2/core/utils/testutils/heavyweight"
)
Expand Down Expand Up @@ -1873,6 +1874,7 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) {
c.ID = s.ID
c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow99", addr1) // insert with mismatched name
c.SpecType = job.YamlSpec
c.SecretsID = s.SecretsID
return mustInsertWFJob(t, o, &c)
},
},
Expand All @@ -1892,29 +1894,40 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) {
var c job.WorkflowSpec
c.ID = s.ID
c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow03", addr2) // insert with mismatched owner
c.SecretsID = s.SecretsID
return mustInsertWFJob(t, o, &c)
},
},
wantErr: true,
},
}

for _, tt := range tests {
for i, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
ctx := testutils.Context(t)
ks := cltest.NewKeyStore(t, tt.fields.ds)

secretsORM := syncer.NewWorkflowRegistryDS(tt.fields.ds, logger.TestLogger(t))

sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz")
require.NoError(t, err)
tt.args.spec.SecretsID = sql.NullInt64{Int64: sid, Valid: true}

pipelineORM := pipeline.NewORM(tt.fields.ds, logger.TestLogger(t), configtest.NewTestGeneralConfig(t).JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(tt.fields.ds)
o := NewTestORM(t, tt.fields.ds, pipelineORM, bridgesORM, ks)

var wantJobID int32
if tt.args.before != nil {
wantJobID = tt.args.before(t, o, tt.args.spec)
}
ctx := testutils.Context(t)

gotJ, err := o.FindJobIDByWorkflow(ctx, *tt.args.spec)
if (err != nil) != tt.wantErr {
t.Errorf("orm.FindJobByWorkflow() error = %v, wantErr %v", err, tt.wantErr)
return
}

if err == nil {
assert.Equal(t, wantJobID, gotJ, "mismatch job id")
}
Expand All @@ -1936,25 +1949,36 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) {
bridges.NewORM(db),
cltest.NewKeyStore(t, db))
ctx := testutils.Context(t)
secretsORM := syncer.NewWorkflowRegistryDS(db, logger.TestLogger(t))

var sids []int64
for i := 0; i < 3; i++ {
sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz")
require.NoError(t, err)
sids = append(sids, sid)
}

wfYaml1 := pkgworkflows.WFYamlSpec(t, "workflow00", addr1)
s1 := job.WorkflowSpec{
Workflow: wfYaml1,
SpecType: job.YamlSpec,
Workflow: wfYaml1,
SpecType: job.YamlSpec,
SecretsID: sql.NullInt64{Int64: sids[0], Valid: true},
}
wantJobID1 := mustInsertWFJob(t, o, &s1)

wfYaml2 := pkgworkflows.WFYamlSpec(t, "workflow01", addr1)
s2 := job.WorkflowSpec{
Workflow: wfYaml2,
SpecType: job.YamlSpec,
Workflow: wfYaml2,
SpecType: job.YamlSpec,
SecretsID: sql.NullInt64{Int64: sids[1], Valid: true},
}
wantJobID2 := mustInsertWFJob(t, o, &s2)

wfYaml3 := pkgworkflows.WFYamlSpec(t, "workflow00", addr2)
s3 := job.WorkflowSpec{
Workflow: wfYaml3,
SpecType: job.YamlSpec,
Workflow: wfYaml3,
SpecType: job.YamlSpec,
SecretsID: sql.NullInt64{Int64: sids[2], Valid: true},
}
wantJobID3 := mustInsertWFJob(t, o, &s3)

Expand Down Expand Up @@ -1992,7 +2016,7 @@ func mustInsertWFJob(t *testing.T, orm job.ORM, s *job.WorkflowSpec) int32 {
}

err = orm.CreateJob(ctx, &j)
require.NoError(t, err, "failed to insert job with wf spec %v %s", s, s.Workflow)
require.NoError(t, err, "failed to insert job with wf spec %+v %s", s, err)
return j.ID
}

Expand Down
4 changes: 4 additions & 0 deletions core/services/job/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package job

import (
"context"
"database/sql"
"database/sql/driver"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -877,6 +878,9 @@ type WorkflowSpec struct {
WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow.
WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow.
WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow.
BinaryURL string `db:"binary_url"`
ConfigURL string `db:"config_url"`
SecretsID sql.NullInt64 `db:"secrets_id"`
CreatedAt time.Time `toml:"-"`
UpdatedAt time.Time `toml:"-"`
SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"`
Expand Down
4 changes: 2 additions & 2 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -433,8 +433,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
case Stream:
// 'stream' type has no associated spec, nothing to do here
case Workflow:
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at, spec_type, config)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW(), :spec_type, :config)
sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, binary_url, config_url, secrets_id, created_at, updated_at, spec_type, config)
VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, :binary_url, :config_url, :secrets_id, NOW(), NOW(), :spec_type, :config)
RETURNING id;`
specID, err := tx.prepareQuerySpecID(ctx, sql, jb.WorkflowSpec)
if err != nil {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,218 @@
package workflow_registry_syncer_test

import (
"context"
"crypto/rand"
"encoding/hex"
"encoding/json"
"testing"
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"

"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper"
coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer"
"github.com/smartcontractkit/chainlink/v2/core/utils/crypto"

"github.com/stretchr/testify/require"
)

func Test_SecretsWorker(t *testing.T) {
var (
ctx = coretestutils.Context(t)
lggr = logger.TestLogger(t)
backendTH = testutils.NewEVMBackendTH(t)
db = pgtest.NewSqlxDB(t)
orm = syncer.NewWorkflowRegistryDS(db, lggr)

giveTicker = time.NewTicker(500 * time.Millisecond)
giveSecretsURL = "https://original-url.com"
donID = uint32(1)
giveWorkflow = RegisterWorkflowCMD{
Name: "test-wf",
DonID: donID,
Status: uint8(1),
SecretsURL: giveSecretsURL,
}
giveContents = "contents"
wantContents = "updated contents"
fetcherFn = func(_ context.Context, _ string) ([]byte, error) {
return []byte(wantContents), nil
}
contractName = syncer.ContractName
forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent)
)

defer giveTicker.Stop()

// fill ID with randomd data
var giveID [32]byte
_, err := rand.Read((giveID)[:])
require.NoError(t, err)
giveWorkflow.ID = giveID

// Deploy a test workflow_registry
wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client())
backendTH.Backend.Commit()
require.NoError(t, err)

lggr.Infof("deployed workflow registry at %s\n", wfRegistryAddr.Hex())

// Build the ContractReader config
contractReaderCfg := evmtypes.ChainReaderConfig{
Contracts: map[string]evmtypes.ChainContractReader{
contractName: {
ContractPollingFilter: evmtypes.ContractPollingFilter{
GenericEventNames: []string{forceUpdateSecretsEvent},
},
ContractABI: workflow_registry_wrapper.WorkflowRegistryABI,
Configs: map[string]*evmtypes.ChainReaderDefinition{
forceUpdateSecretsEvent: {
ChainSpecificName: forceUpdateSecretsEvent,
ReadType: evmtypes.Event,
},
},
},
},
}

contractReaderCfgBytes, err := json.Marshal(contractReaderCfg)
require.NoError(t, err)

contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes)
require.NoError(t, err)

err = contractReader.Bind(ctx, []types.BoundContract{{Name: contractName, Address: wfRegistryAddr.Hex()}})
require.NoError(t, err)

// Seed the DB
hash, err := crypto.Keccak256(append(backendTH.ContractsOwner.From[:], []byte(giveSecretsURL)...))
require.NoError(t, err)
giveHash := hex.EncodeToString(hash)

gotID, err := orm.Create(ctx, giveSecretsURL, giveHash, giveContents)
require.NoError(t, err)

gotSecretsURL, err := orm.GetSecretsURLByID(ctx, gotID)
require.NoError(t, err)
require.Equal(t, giveSecretsURL, gotSecretsURL)

// verify the DB
contents, err := orm.GetContents(ctx, giveSecretsURL)
require.NoError(t, err)
require.Equal(t, contents, giveContents)

// Create the worker
worker := syncer.NewWorkflowRegistry(
lggr,
orm,
contractReader,
fetcherFn,
wfRegistryAddr.Hex(),
syncer.WithTicker(giveTicker.C),
)

servicetest.Run(t, worker)

// setup contract state to allow the secrets to be updated
updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true)
updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true)
registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow)

// generate a log event
requestForceUpdateSecrets(t, backendTH, wfRegistryC, giveSecretsURL)

// Require the secrets contents to eventually be updated
require.Eventually(t, func() bool {
secrets, err := orm.GetContents(ctx, giveSecretsURL)
lggr.Debugf("got secrets %v", secrets)
require.NoError(t, err)
return secrets == wantContents
}, 5*time.Second, time.Second)
}

func updateAuthorizedAddress(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
addresses []common.Address,
_ bool,
) {
t.Helper()
_, err := wfRegC.UpdateAuthorizedAddresses(th.ContractsOwner, addresses, true)
require.NoError(t, err, "failed to update authorised addresses")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
gotAddresses, err := wfRegC.GetAllAuthorizedAddresses(&bind.CallOpts{
From: th.ContractsOwner.From,
})
require.NoError(t, err)
require.ElementsMatch(t, addresses, gotAddresses)
}

func updateAllowedDONs(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
donIDs []uint32,
allowed bool,
) {
t.Helper()
_, err := wfRegC.UpdateAllowedDONs(th.ContractsOwner, donIDs, allowed)
require.NoError(t, err, "failed to update DONs")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
gotDons, err := wfRegC.GetAllAllowedDONs(&bind.CallOpts{
From: th.ContractsOwner.From,
})
require.NoError(t, err)
require.ElementsMatch(t, donIDs, gotDons)
}

type RegisterWorkflowCMD struct {
Name string
ID [32]byte
DonID uint32
Status uint8
BinaryURL string
ConfigURL string
SecretsURL string
}

func registerWorkflow(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
input RegisterWorkflowCMD,
) {
t.Helper()
_, err := wfRegC.RegisterWorkflow(th.ContractsOwner, input.Name, input.ID, input.DonID,
input.Status, input.BinaryURL, input.ConfigURL, input.SecretsURL)
require.NoError(t, err, "failed to register workflow")
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}

func requestForceUpdateSecrets(
t *testing.T,
th *testutils.EVMBackendTH,
wfRegC *workflow_registry_wrapper.WorkflowRegistry,
secretsURL string,
) {
_, err := wfRegC.RequestForceUpdateSecrets(th.ContractsOwner, secretsURL)
require.NoError(t, err)
th.Backend.Commit()
th.Backend.Commit()
th.Backend.Commit()
}
Loading
Loading