From fb2bdd188d80e2bec09652e1dd08c52062481a0a Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Mon, 18 Nov 2024 01:25:59 +0200 Subject: [PATCH 1/4] feat(workflows): adds registry syncer --- .mockery.yaml | 15 + core/services/chainlink/application.go | 2 +- core/services/job/models.go | 3 + core/services/workflows/engine.go | 4 +- core/services/workflows/engine_test.go | 4 +- .../workflows/syncer/contract_reader_mock.go | 148 +++++ core/services/workflows/syncer/handler.go | 72 +++ .../services/workflows/syncer/handler_test.go | 136 +++++ core/services/workflows/syncer/heap.go | 63 ++ core/services/workflows/syncer/mocks/orm.go | 440 ++++++++++++++ core/services/workflows/syncer/orm.go | 139 +++++ core/services/workflows/syncer/orm_test.go | 53 ++ .../workflows/syncer/workflow_registry.go | 572 +++++++++++++++++- .../syncer/workflow_registry_test.go | 104 ++++ .../migrations/0259_add_workflow_secrets.sql | 41 ++ core/utils/crypto/keccak_256.go | 16 + core/utils/matches/matches.go | 21 + core/utils/signalers/signalers.go | 24 + 18 files changed, 1840 insertions(+), 17 deletions(-) create mode 100644 core/services/workflows/syncer/contract_reader_mock.go create mode 100644 core/services/workflows/syncer/handler.go create mode 100644 core/services/workflows/syncer/handler_test.go create mode 100644 core/services/workflows/syncer/heap.go create mode 100644 core/services/workflows/syncer/mocks/orm.go create mode 100644 core/services/workflows/syncer/orm.go create mode 100644 core/services/workflows/syncer/orm_test.go create mode 100644 core/services/workflows/syncer/workflow_registry_test.go create mode 100644 core/store/migrate/migrations/0259_add_workflow_secrets.sql create mode 100644 core/utils/crypto/keccak_256.go create mode 100644 core/utils/matches/matches.go create mode 100644 core/utils/signalers/signalers.go diff --git a/.mockery.yaml b/.mockery.yaml index 711d70f59e9..70b7a9947f6 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -579,6 +579,21 @@ packages: github.com/smartcontractkit/chainlink/v2/core/services/registrysyncer: interfaces: ORM: + github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer: + interfaces: + ORM: + ContractReader: + config: + mockname: "Mock{{ .InterfaceName }}" + filename: contract_reader_mock.go + inpackage: true + dir: "{{ .InterfaceDir }}" + Handler: + config: + mockname: "Mock{{ .InterfaceName }}" + filename: handler_mock.go + inpackage: true + dir: "{{ .InterfaceDir }}" github.com/smartcontractkit/chainlink/v2/core/capabilities/targets: interfaces: ContractValueGetter: \ No newline at end of file diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index abbe9dad9ab..fef741c8c9b 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -215,7 +215,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { // TODO: wire this up to config so we only instantiate it // if a workflow registry address is provided. - workflowRegistrySyncer := syncer.NewWorkflowRegistry() + workflowRegistrySyncer := syncer.NewNullWorkflowRegistrySyncer() srvcs = append(srvcs, workflowRegistrySyncer) var externalPeerWrapper p2ptypes.PeerWrapper diff --git a/core/services/job/models.go b/core/services/job/models.go index 231bf10fda0..84ff2f5d7f1 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -877,6 +877,9 @@ type WorkflowSpec struct { WorkflowID string `toml:"-" db:"workflow_id"` // Derived. Do not modify. the CID of the workflow. WorkflowOwner string `toml:"-" db:"workflow_owner"` // Derived. Do not modify. the owner of the workflow. WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow. + BinaryURL string `db:"binary_url"` + ConfigURL string `db:"config_url"` + SecretsID string `db:"secrets_id"` CreatedAt time.Time `toml:"-"` UpdatedAt time.Time `toml:"-"` SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index b958e171c0c..008b29d0fe8 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -92,7 +92,7 @@ func (sucm *stepUpdateManager) len() int64 { } type secretsFetcher interface { - SecretsFor(workflowOwner, workflowName string) (map[string]string, error) + SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) } // Engine handles the lifecycle of a single workflow and its executions. @@ -849,7 +849,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value // registry (for capability-level configuration). It doesn't perform any caching of the config values, since // the two registries perform their own caching. func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) { - secrets, err := e.secretsFetcher.SecretsFor(e.workflow.owner, e.workflow.name) + secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name) if err != nil { return nil, fmt.Errorf("failed to fetch secrets: %w", err) } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 3a2bc17bc36..290180db834 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -152,7 +152,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string, type mockSecretsFetcher struct{} -func (s mockSecretsFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) { +func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { return map[string]string{}, nil } @@ -1605,7 +1605,7 @@ type mockFetcher struct { retval map[string]string } -func (m *mockFetcher) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) { +func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { return m.retval, nil } diff --git a/core/services/workflows/syncer/contract_reader_mock.go b/core/services/workflows/syncer/contract_reader_mock.go new file mode 100644 index 00000000000..61f59fa4e69 --- /dev/null +++ b/core/services/workflows/syncer/contract_reader_mock.go @@ -0,0 +1,148 @@ +// Code generated by mockery v2.46.3. DO NOT EDIT. + +package syncer + +import ( + context "context" + + query "github.com/smartcontractkit/chainlink-common/pkg/types/query" + mock "github.com/stretchr/testify/mock" + + types "github.com/smartcontractkit/chainlink-common/pkg/types" +) + +// MockContractReader is an autogenerated mock type for the ContractReader type +type MockContractReader struct { + mock.Mock +} + +type MockContractReader_Expecter struct { + mock *mock.Mock +} + +func (_m *MockContractReader) EXPECT() *MockContractReader_Expecter { + return &MockContractReader_Expecter{mock: &_m.Mock} +} + +// Bind provides a mock function with given fields: _a0, _a1 +func (_m *MockContractReader) Bind(_a0 context.Context, _a1 []types.BoundContract) error { + ret := _m.Called(_a0, _a1) + + if len(ret) == 0 { + panic("no return value specified for Bind") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context, []types.BoundContract) error); ok { + r0 = rf(_a0, _a1) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// MockContractReader_Bind_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Bind' +type MockContractReader_Bind_Call struct { + *mock.Call +} + +// Bind is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 []types.BoundContract +func (_e *MockContractReader_Expecter) Bind(_a0 interface{}, _a1 interface{}) *MockContractReader_Bind_Call { + return &MockContractReader_Bind_Call{Call: _e.mock.On("Bind", _a0, _a1)} +} + +func (_c *MockContractReader_Bind_Call) Run(run func(_a0 context.Context, _a1 []types.BoundContract)) *MockContractReader_Bind_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].([]types.BoundContract)) + }) + return _c +} + +func (_c *MockContractReader_Bind_Call) Return(_a0 error) *MockContractReader_Bind_Call { + _c.Call.Return(_a0) + return _c +} + +func (_c *MockContractReader_Bind_Call) RunAndReturn(run func(context.Context, []types.BoundContract) error) *MockContractReader_Bind_Call { + _c.Call.Return(run) + return _c +} + +// QueryKey provides a mock function with given fields: _a0, _a1, _a2, _a3, _a4 +func (_m *MockContractReader) QueryKey(_a0 context.Context, _a1 types.BoundContract, _a2 query.KeyFilter, _a3 query.LimitAndSort, _a4 any) ([]types.Sequence, error) { + ret := _m.Called(_a0, _a1, _a2, _a3, _a4) + + if len(ret) == 0 { + panic("no return value specified for QueryKey") + } + + var r0 []types.Sequence + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error)); ok { + return rf(_a0, _a1, _a2, _a3, _a4) + } + if rf, ok := ret.Get(0).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) []types.Sequence); ok { + r0 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]types.Sequence) + } + } + + if rf, ok := ret.Get(1).(func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) error); ok { + r1 = rf(_a0, _a1, _a2, _a3, _a4) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// MockContractReader_QueryKey_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'QueryKey' +type MockContractReader_QueryKey_Call struct { + *mock.Call +} + +// QueryKey is a helper method to define mock.On call +// - _a0 context.Context +// - _a1 types.BoundContract +// - _a2 query.KeyFilter +// - _a3 query.LimitAndSort +// - _a4 any +func (_e *MockContractReader_Expecter) QueryKey(_a0 interface{}, _a1 interface{}, _a2 interface{}, _a3 interface{}, _a4 interface{}) *MockContractReader_QueryKey_Call { + return &MockContractReader_QueryKey_Call{Call: _e.mock.On("QueryKey", _a0, _a1, _a2, _a3, _a4)} +} + +func (_c *MockContractReader_QueryKey_Call) Run(run func(_a0 context.Context, _a1 types.BoundContract, _a2 query.KeyFilter, _a3 query.LimitAndSort, _a4 any)) *MockContractReader_QueryKey_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(types.BoundContract), args[2].(query.KeyFilter), args[3].(query.LimitAndSort), args[4].(any)) + }) + return _c +} + +func (_c *MockContractReader_QueryKey_Call) Return(_a0 []types.Sequence, _a1 error) *MockContractReader_QueryKey_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *MockContractReader_QueryKey_Call) RunAndReturn(run func(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error)) *MockContractReader_QueryKey_Call { + _c.Call.Return(run) + return _c +} + +// NewMockContractReader creates a new instance of MockContractReader. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockContractReader(t interface { + mock.TestingT + Cleanup(func()) +}) *MockContractReader { + mock := &MockContractReader{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go new file mode 100644 index 00000000000..0ba789b3bd3 --- /dev/null +++ b/core/services/workflows/syncer/handler.go @@ -0,0 +1,72 @@ +package syncer + +import ( + "context" + "encoding/hex" + "fmt" + + "github.com/smartcontractkit/chainlink/v2/core/logger" +) + +// eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding +// method that handles the event. +type eventHandler struct { + lggr logger.Logger + orm ORM + fetcher FetcherFunc +} + +// newEventHandler returns a new eventHandler instance. +func newEventHandler( + lggr logger.Logger, + orm ORM, + gateway FetcherFunc, +) *eventHandler { + return &eventHandler{ + lggr: lggr, + orm: orm, + fetcher: gateway, + } +} + +func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) error { + switch event.EventType { + case ForceUpdateSecretsEvent: + return h.forceUpdateSecretsEvent(ctx, event) + default: + return fmt.Errorf("event type unsupported: %v", event.EventType) + } +} + +// forceUpdateSecretsEvent handles the ForceUpdateSecretsEvent event type. +func (h *eventHandler) forceUpdateSecretsEvent( + ctx context.Context, + event WorkflowRegistryEvent, +) error { + // Get the URL of the secrets file from the event data + data, ok := event.Data.(WorkflowRegistryForceUpdateSecretsRequestedV1) + if !ok { + return fmt.Errorf("invalid data type %T for event", event.Data) + } + + hash := hex.EncodeToString(data.SecretsURLHash) + + url, err := h.orm.GetSecretsURLByHash(ctx, hash) + if err != nil { + h.lggr.Errorf("failed to get URL by hash %s : %s", hash, err) + return err + } + + // Fetch the contents of the secrets file from the url via the fetcher + secrets, err := h.fetcher(ctx, url) + if err != nil { + return err + } + + // Update the secrets in the ORM + if _, err := h.orm.Update(ctx, hash, string(secrets)); err != nil { + return err + } + + return nil +} diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go new file mode 100644 index 00000000000..dcdea28eda4 --- /dev/null +++ b/core/services/workflows/syncer/handler_test.go @@ -0,0 +1,136 @@ +package syncer + +import ( + "context" + "encoding/hex" + "testing" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer/mocks" + "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" + "github.com/smartcontractkit/chainlink/v2/core/utils/matches" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func Test_Handler(t *testing.T) { + lggr := logger.TestLogger(t) + t.Run("success", func(t *testing.T) { + mockORM := mocks.NewORM(t) + ctx := testutils.Context(t) + giveURL := "https://original-url.com" + giveBytes, err := crypto.Keccak256([]byte(giveURL)) + require.NoError(t, err) + + giveHash := hex.EncodeToString(giveBytes) + + giveEvent := WorkflowRegistryEvent{ + EventType: ForceUpdateSecretsEvent, + Data: WorkflowRegistryForceUpdateSecretsRequestedV1{ + SecretsURLHash: giveBytes, + }, + } + + fetcher := func(_ context.Context, _ string) ([]byte, error) { + return []byte("contents"), nil + } + mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) + mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil) + h := newEventHandler(lggr, mockORM, fetcher) + err = h.Handle(ctx, giveEvent) + require.NoError(t, err) + }) + + t.Run("fails with unsupported event type", func(t *testing.T) { + mockORM := mocks.NewORM(t) + ctx := testutils.Context(t) + + giveEvent := WorkflowRegistryEvent{} + fetcher := func(_ context.Context, _ string) ([]byte, error) { + return []byte("contents"), nil + } + + h := newEventHandler(lggr, mockORM, fetcher) + err := h.Handle(ctx, giveEvent) + require.Error(t, err) + require.Contains(t, err.Error(), "event type unsupported") + }) + + t.Run("fails to get secrets url", func(t *testing.T) { + mockORM := mocks.NewORM(t) + ctx := testutils.Context(t) + h := newEventHandler(lggr, mockORM, nil) + giveURL := "https://original-url.com" + giveBytes, err := crypto.Keccak256([]byte(giveURL)) + require.NoError(t, err) + + giveHash := hex.EncodeToString(giveBytes) + + giveEvent := WorkflowRegistryEvent{ + EventType: ForceUpdateSecretsEvent, + Data: WorkflowRegistryForceUpdateSecretsRequestedV1{ + SecretsURLHash: giveBytes, + }, + } + mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return("", assert.AnError) + err = h.Handle(ctx, giveEvent) + require.Error(t, err) + require.ErrorContains(t, err, assert.AnError.Error()) + }) + + t.Run("fails to fetch contents", func(t *testing.T) { + mockORM := mocks.NewORM(t) + ctx := testutils.Context(t) + giveURL := "http://example.com" + + giveBytes, err := crypto.Keccak256([]byte(giveURL)) + require.NoError(t, err) + + giveHash := hex.EncodeToString(giveBytes) + + giveEvent := WorkflowRegistryEvent{ + EventType: ForceUpdateSecretsEvent, + Data: WorkflowRegistryForceUpdateSecretsRequestedV1{ + SecretsURLHash: giveBytes, + }, + } + + fetcher := func(_ context.Context, _ string) ([]byte, error) { + return nil, assert.AnError + } + mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) + h := newEventHandler(lggr, mockORM, fetcher) + err = h.Handle(ctx, giveEvent) + require.Error(t, err) + require.ErrorIs(t, err, assert.AnError) + }) + + t.Run("fails to update secrets", func(t *testing.T) { + mockORM := mocks.NewORM(t) + ctx := testutils.Context(t) + giveURL := "http://example.com" + giveBytes, err := crypto.Keccak256([]byte(giveURL)) + require.NoError(t, err) + + giveHash := hex.EncodeToString(giveBytes) + + giveEvent := WorkflowRegistryEvent{ + EventType: ForceUpdateSecretsEvent, + Data: WorkflowRegistryForceUpdateSecretsRequestedV1{ + SecretsURLHash: giveBytes, + }, + } + + fetcher := func(_ context.Context, _ string) ([]byte, error) { + return []byte("contents"), nil + } + mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) + mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) + h := newEventHandler(lggr, mockORM, fetcher) + err = h.Handle(ctx, giveEvent) + require.Error(t, err) + require.ErrorIs(t, err, assert.AnError) + }) +} diff --git a/core/services/workflows/syncer/heap.go b/core/services/workflows/syncer/heap.go new file mode 100644 index 00000000000..061293928a3 --- /dev/null +++ b/core/services/workflows/syncer/heap.go @@ -0,0 +1,63 @@ +package syncer + +import "container/heap" + +type Heap interface { + // Push adds a new item to the heap. + Push(x WorkflowRegistryEventResponse) + + // Pop removes the smallest item from the heap and returns it. + Pop() WorkflowRegistryEventResponse + + // Len returns the number of items in the heap. + Len() int +} + +// publicHeap is a wrapper around the heap.Interface that exposes the Push and Pop methods. +type publicHeap[T any] struct { + heap heap.Interface +} + +func (h *publicHeap[T]) Push(x T) { + heap.Push(h.heap, x) +} + +func (h *publicHeap[T]) Pop() T { + return heap.Pop(h.heap).(T) +} + +func (h *publicHeap[T]) Len() int { + return h.heap.Len() +} + +// blockHeightHeap is a heap.Interface that sorts WorkflowRegistryEventResponses by block height. +type blockHeightHeap []WorkflowRegistryEventResponse + +// newBlockHeightHeap returns an initialized heap that sorts WorkflowRegistryEventResponses by block height. +func newBlockHeightHeap() Heap { + h := blockHeightHeap(make([]WorkflowRegistryEventResponse, 0)) + heap.Init(&h) + return &publicHeap[WorkflowRegistryEventResponse]{heap: &h} +} + +func (h *blockHeightHeap) Len() int { return len(*h) } + +func (h *blockHeightHeap) Less(i, j int) bool { + return (*h)[i].Event.Head.Height < (*h)[j].Event.Head.Height +} + +func (h *blockHeightHeap) Swap(i, j int) { + (*h)[i], (*h)[j] = (*h)[j], (*h)[i] +} + +func (h *blockHeightHeap) Push(x any) { + *h = append(*h, x.(WorkflowRegistryEventResponse)) +} + +func (h *blockHeightHeap) Pop() any { + old := *h + n := len(old) + x := old[n-1] + *h = old[0 : n-1] + return x +} diff --git a/core/services/workflows/syncer/mocks/orm.go b/core/services/workflows/syncer/mocks/orm.go new file mode 100644 index 00000000000..b3d82c2067d --- /dev/null +++ b/core/services/workflows/syncer/mocks/orm.go @@ -0,0 +1,440 @@ +// Code generated by mockery v2.46.3. DO NOT EDIT. + +package mocks + +import ( + context "context" + + mock "github.com/stretchr/testify/mock" +) + +// ORM is an autogenerated mock type for the ORM type +type ORM struct { + mock.Mock +} + +type ORM_Expecter struct { + mock *mock.Mock +} + +func (_m *ORM) EXPECT() *ORM_Expecter { + return &ORM_Expecter{mock: &_m.Mock} +} + +// Create provides a mock function with given fields: ctx, secretsURL, hash, contents +func (_m *ORM) Create(ctx context.Context, secretsURL string, hash string, contents string) (int64, error) { + ret := _m.Called(ctx, secretsURL, hash, contents) + + if len(ret) == 0 { + panic("no return value specified for Create") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string, string) (int64, error)); ok { + return rf(ctx, secretsURL, hash, contents) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string, string) int64); ok { + r0 = rf(ctx, secretsURL, hash, contents) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string, string) error); ok { + r1 = rf(ctx, secretsURL, hash, contents) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_Create_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Create' +type ORM_Create_Call struct { + *mock.Call +} + +// Create is a helper method to define mock.On call +// - ctx context.Context +// - secretsURL string +// - hash string +// - contents string +func (_e *ORM_Expecter) Create(ctx interface{}, secretsURL interface{}, hash interface{}, contents interface{}) *ORM_Create_Call { + return &ORM_Create_Call{Call: _e.mock.On("Create", ctx, secretsURL, hash, contents)} +} + +func (_c *ORM_Create_Call) Run(run func(ctx context.Context, secretsURL string, hash string, contents string)) *ORM_Create_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string), args[3].(string)) + }) + return _c +} + +func (_c *ORM_Create_Call) Return(_a0 int64, _a1 error) *ORM_Create_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_Create_Call) RunAndReturn(run func(context.Context, string, string, string) (int64, error)) *ORM_Create_Call { + _c.Call.Return(run) + return _c +} + +// GetContents provides a mock function with given fields: ctx, url +func (_m *ORM) GetContents(ctx context.Context, url string) (string, error) { + ret := _m.Called(ctx, url) + + if len(ret) == 0 { + panic("no return value specified for GetContents") + } + + var r0 string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { + return rf(ctx, url) + } + if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { + r0 = rf(ctx, url) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, url) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_GetContents_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetContents' +type ORM_GetContents_Call struct { + *mock.Call +} + +// GetContents is a helper method to define mock.On call +// - ctx context.Context +// - url string +func (_e *ORM_Expecter) GetContents(ctx interface{}, url interface{}) *ORM_GetContents_Call { + return &ORM_GetContents_Call{Call: _e.mock.On("GetContents", ctx, url)} +} + +func (_c *ORM_GetContents_Call) Run(run func(ctx context.Context, url string)) *ORM_GetContents_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *ORM_GetContents_Call) Return(_a0 string, _a1 error) *ORM_GetContents_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_GetContents_Call) RunAndReturn(run func(context.Context, string) (string, error)) *ORM_GetContents_Call { + _c.Call.Return(run) + return _c +} + +// GetContentsByHash provides a mock function with given fields: ctx, hash +func (_m *ORM) GetContentsByHash(ctx context.Context, hash string) (string, error) { + ret := _m.Called(ctx, hash) + + if len(ret) == 0 { + panic("no return value specified for GetContentsByHash") + } + + var r0 string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { + return rf(ctx, hash) + } + if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { + r0 = rf(ctx, hash) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, hash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_GetContentsByHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetContentsByHash' +type ORM_GetContentsByHash_Call struct { + *mock.Call +} + +// GetContentsByHash is a helper method to define mock.On call +// - ctx context.Context +// - hash string +func (_e *ORM_Expecter) GetContentsByHash(ctx interface{}, hash interface{}) *ORM_GetContentsByHash_Call { + return &ORM_GetContentsByHash_Call{Call: _e.mock.On("GetContentsByHash", ctx, hash)} +} + +func (_c *ORM_GetContentsByHash_Call) Run(run func(ctx context.Context, hash string)) *ORM_GetContentsByHash_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *ORM_GetContentsByHash_Call) Return(_a0 string, _a1 error) *ORM_GetContentsByHash_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_GetContentsByHash_Call) RunAndReturn(run func(context.Context, string) (string, error)) *ORM_GetContentsByHash_Call { + _c.Call.Return(run) + return _c +} + +// GetSecretsURLByHash provides a mock function with given fields: ctx, hash +func (_m *ORM) GetSecretsURLByHash(ctx context.Context, hash string) (string, error) { + ret := _m.Called(ctx, hash) + + if len(ret) == 0 { + panic("no return value specified for GetSecretsURLByHash") + } + + var r0 string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string) (string, error)); ok { + return rf(ctx, hash) + } + if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { + r0 = rf(ctx, hash) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) error); ok { + r1 = rf(ctx, hash) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_GetSecretsURLByHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSecretsURLByHash' +type ORM_GetSecretsURLByHash_Call struct { + *mock.Call +} + +// GetSecretsURLByHash is a helper method to define mock.On call +// - ctx context.Context +// - hash string +func (_e *ORM_Expecter) GetSecretsURLByHash(ctx interface{}, hash interface{}) *ORM_GetSecretsURLByHash_Call { + return &ORM_GetSecretsURLByHash_Call{Call: _e.mock.On("GetSecretsURLByHash", ctx, hash)} +} + +func (_c *ORM_GetSecretsURLByHash_Call) Run(run func(ctx context.Context, hash string)) *ORM_GetSecretsURLByHash_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *ORM_GetSecretsURLByHash_Call) Return(_a0 string, _a1 error) *ORM_GetSecretsURLByHash_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_GetSecretsURLByHash_Call) RunAndReturn(run func(context.Context, string) (string, error)) *ORM_GetSecretsURLByHash_Call { + _c.Call.Return(run) + return _c +} + +// GetSecretsURLByID provides a mock function with given fields: ctx, id +func (_m *ORM) GetSecretsURLByID(ctx context.Context, id int64) (string, error) { + ret := _m.Called(ctx, id) + + if len(ret) == 0 { + panic("no return value specified for GetSecretsURLByID") + } + + var r0 string + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, int64) (string, error)); ok { + return rf(ctx, id) + } + if rf, ok := ret.Get(0).(func(context.Context, int64) string); ok { + r0 = rf(ctx, id) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(context.Context, int64) error); ok { + r1 = rf(ctx, id) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_GetSecretsURLByID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSecretsURLByID' +type ORM_GetSecretsURLByID_Call struct { + *mock.Call +} + +// GetSecretsURLByID is a helper method to define mock.On call +// - ctx context.Context +// - id int64 +func (_e *ORM_Expecter) GetSecretsURLByID(ctx interface{}, id interface{}) *ORM_GetSecretsURLByID_Call { + return &ORM_GetSecretsURLByID_Call{Call: _e.mock.On("GetSecretsURLByID", ctx, id)} +} + +func (_c *ORM_GetSecretsURLByID_Call) Run(run func(ctx context.Context, id int64)) *ORM_GetSecretsURLByID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(int64)) + }) + return _c +} + +func (_c *ORM_GetSecretsURLByID_Call) Return(_a0 string, _a1 error) *ORM_GetSecretsURLByID_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_GetSecretsURLByID_Call) RunAndReturn(run func(context.Context, int64) (string, error)) *ORM_GetSecretsURLByID_Call { + _c.Call.Return(run) + return _c +} + +// GetSecretsURLHash provides a mock function with given fields: owner, secretsURL +func (_m *ORM) GetSecretsURLHash(owner []byte, secretsURL []byte) ([]byte, error) { + ret := _m.Called(owner, secretsURL) + + if len(ret) == 0 { + panic("no return value specified for GetSecretsURLHash") + } + + var r0 []byte + var r1 error + if rf, ok := ret.Get(0).(func([]byte, []byte) ([]byte, error)); ok { + return rf(owner, secretsURL) + } + if rf, ok := ret.Get(0).(func([]byte, []byte) []byte); ok { + r0 = rf(owner, secretsURL) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]byte) + } + } + + if rf, ok := ret.Get(1).(func([]byte, []byte) error); ok { + r1 = rf(owner, secretsURL) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_GetSecretsURLHash_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetSecretsURLHash' +type ORM_GetSecretsURLHash_Call struct { + *mock.Call +} + +// GetSecretsURLHash is a helper method to define mock.On call +// - owner []byte +// - secretsURL []byte +func (_e *ORM_Expecter) GetSecretsURLHash(owner interface{}, secretsURL interface{}) *ORM_GetSecretsURLHash_Call { + return &ORM_GetSecretsURLHash_Call{Call: _e.mock.On("GetSecretsURLHash", owner, secretsURL)} +} + +func (_c *ORM_GetSecretsURLHash_Call) Run(run func(owner []byte, secretsURL []byte)) *ORM_GetSecretsURLHash_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].([]byte), args[1].([]byte)) + }) + return _c +} + +func (_c *ORM_GetSecretsURLHash_Call) Return(_a0 []byte, _a1 error) *ORM_GetSecretsURLHash_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_GetSecretsURLHash_Call) RunAndReturn(run func([]byte, []byte) ([]byte, error)) *ORM_GetSecretsURLHash_Call { + _c.Call.Return(run) + return _c +} + +// Update provides a mock function with given fields: ctx, secretsURL, contents +func (_m *ORM) Update(ctx context.Context, secretsURL string, contents string) (int64, error) { + ret := _m.Called(ctx, secretsURL, contents) + + if len(ret) == 0 { + panic("no return value specified for Update") + } + + var r0 int64 + var r1 error + if rf, ok := ret.Get(0).(func(context.Context, string, string) (int64, error)); ok { + return rf(ctx, secretsURL, contents) + } + if rf, ok := ret.Get(0).(func(context.Context, string, string) int64); ok { + r0 = rf(ctx, secretsURL, contents) + } else { + r0 = ret.Get(0).(int64) + } + + if rf, ok := ret.Get(1).(func(context.Context, string, string) error); ok { + r1 = rf(ctx, secretsURL, contents) + } else { + r1 = ret.Error(1) + } + + return r0, r1 +} + +// ORM_Update_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'Update' +type ORM_Update_Call struct { + *mock.Call +} + +// Update is a helper method to define mock.On call +// - ctx context.Context +// - secretsURL string +// - contents string +func (_e *ORM_Expecter) Update(ctx interface{}, secretsURL interface{}, contents interface{}) *ORM_Update_Call { + return &ORM_Update_Call{Call: _e.mock.On("Update", ctx, secretsURL, contents)} +} + +func (_c *ORM_Update_Call) Run(run func(ctx context.Context, secretsURL string, contents string)) *ORM_Update_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string), args[2].(string)) + }) + return _c +} + +func (_c *ORM_Update_Call) Return(_a0 int64, _a1 error) *ORM_Update_Call { + _c.Call.Return(_a0, _a1) + return _c +} + +func (_c *ORM_Update_Call) RunAndReturn(run func(context.Context, string, string) (int64, error)) *ORM_Update_Call { + _c.Call.Return(run) + return _c +} + +// NewORM creates a new instance of ORM. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewORM(t interface { + mock.TestingT + Cleanup(func()) +}) *ORM { + mock := &ORM{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go new file mode 100644 index 00000000000..a10eb708ddf --- /dev/null +++ b/core/services/workflows/syncer/orm.go @@ -0,0 +1,139 @@ +package syncer + +import ( + "context" + + "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" +) + +type ORM interface { + // GetSecretsURLByID returns the secrets URL for the given ID. + GetSecretsURLByID(ctx context.Context, id int64) (string, error) + + // GetSecretsURLByID returns the secrets URL for the given ID. + GetSecretsURLByHash(ctx context.Context, hash string) (string, error) + + // GetContents returns the contents of the secret at the given plain URL. + GetContents(ctx context.Context, url string) (string, error) + + // GetContentsByHash returns the contents of the secret at the given hashed URL. + GetContentsByHash(ctx context.Context, hash string) (string, error) + + // GetSecretsURLHash returns the keccak256 hash of the owner and secrets URL. + GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) + + // Update updates the contents of the secrets at the given plain URL or inserts a new record if not found. + Update(ctx context.Context, secretsURL, contents string) (int64, error) + + Create(ctx context.Context, secretsURL, hash, contents string) (int64, error) +} + +type WorkflowRegistryDS = ORM + +type orm struct { + ds sqlutil.DataSource + lggr logger.Logger +} + +var _ ORM = (*orm)(nil) + +func NewWorkflowRegistryDS(ds sqlutil.DataSource, lggr logger.Logger) *orm { + return &orm{ + ds: ds, + lggr: lggr, + } +} + +func (orm *orm) GetSecretsURLByID(ctx context.Context, id int64) (string, error) { + var secretsURL string + err := orm.ds.GetContext(ctx, &secretsURL, + `SELECT secrets_url FROM workflow_secrets WHERE workflow_secrets.id = $1`, + id, + ) + + return secretsURL, err +} + +func (orm *orm) GetSecretsURLByHash(ctx context.Context, hash string) (string, error) { + var secretsURL string + err := orm.ds.GetContext(ctx, &secretsURL, + `SELECT secrets_url FROM workflow_secrets WHERE workflow_secrets.secrets_url_hash = $1`, + hash, + ) + + return secretsURL, err +} + +func (orm *orm) GetContentsByHash(ctx context.Context, hash string) (string, error) { + var contents string + err := orm.ds.GetContext(ctx, &contents, + `SELECT contents + FROM workflow_secrets + WHERE secrets_url_hash = $1`, + hash, + ) + + if err != nil { + return "", err // Return an empty Artifact struct and the error + } + + return contents, nil // Return the populated Artifact struct +} + +func (orm *orm) GetContents(ctx context.Context, url string) (string, error) { + var contents string + err := orm.ds.GetContext(ctx, &contents, + `SELECT contents + FROM workflow_secrets + WHERE secrets_url = $1`, + url, + ) + + if err != nil { + return "", err // Return an empty Artifact struct and the error + } + + return contents, nil // Return the populated Artifact struct +} + +// Update updates the secrets content at the given hash or inserts a new record if not found. +func (orm *orm) Update(ctx context.Context, hash, contents string) (int64, error) { + var id int64 + err := orm.ds.QueryRowxContext(ctx, + `INSERT INTO workflow_secrets (secrets_url_hash, contents) + VALUES ($1, $2) + ON CONFLICT (secrets_url_hash) DO UPDATE + SET secrets_url_hash = EXCLUDED.secrets_url_hash, contents = EXCLUDED.contents + RETURNING id`, + hash, contents, + ).Scan(&id) + + if err != nil { + return 0, err + } + + return id, nil +} + +// Update updates the secrets content at the given hash or inserts a new record if not found. +func (orm *orm) Create(ctx context.Context, url, hash, contents string) (int64, error) { + var id int64 + err := orm.ds.QueryRowxContext(ctx, + `INSERT INTO workflow_secrets (secrets_url, secrets_url_hash, contents) + VALUES ($1, $2, $3) + RETURNING id`, + url, hash, contents, + ).Scan(&id) + + if err != nil { + return 0, err + } + + return id, nil +} + +func (orm *orm) GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) { + return crypto.Keccak256(append(owner, secretsURL...)) +} diff --git a/core/services/workflows/syncer/orm_test.go b/core/services/workflows/syncer/orm_test.go new file mode 100644 index 00000000000..8b9f685bb52 --- /dev/null +++ b/core/services/workflows/syncer/orm_test.go @@ -0,0 +1,53 @@ +package syncer + +import ( + "encoding/hex" + "testing" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestWorkflowArtifactsORM_GetAndUpdate(t *testing.T) { + db := pgtest.NewSqlxDB(t) + ctx := testutils.Context(t) + lggr := logger.TestLogger(t) + orm := &orm{ds: db, lggr: lggr} + + giveURL := "https://example.com" + giveBytes, err := crypto.Keccak256([]byte(giveURL)) + require.NoError(t, err) + giveHash := hex.EncodeToString(giveBytes) + giveContent := "some contents" + + gotID, err := orm.Create(ctx, giveURL, giveHash, giveContent) + require.NoError(t, err) + + url, err := orm.GetSecretsURLByID(ctx, gotID) + require.NoError(t, err) + assert.Equal(t, giveURL, url) + + contents, err := orm.GetContents(ctx, giveURL) + require.NoError(t, err) + assert.Equal(t, "some contents", contents) + + contents, err = orm.GetContentsByHash(ctx, giveHash) + require.NoError(t, err) + assert.Equal(t, "some contents", contents) + + _, err = orm.Update(ctx, giveHash, "new contents") + require.NoError(t, err) + + contents, err = orm.GetContents(ctx, giveURL) + require.NoError(t, err) + assert.Equal(t, "new contents", contents) + + contents, err = orm.GetContentsByHash(ctx, giveHash) + require.NoError(t, err) + assert.Equal(t, "new contents", contents) +} diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 1d42e9d5deb..1af31336734 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -2,39 +2,587 @@ package syncer import ( "context" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "strconv" + "sync" + "time" "github.com/smartcontractkit/chainlink-common/pkg/services" + types "github.com/smartcontractkit/chainlink-common/pkg/types" + query "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" + "github.com/smartcontractkit/chainlink/v2/core/logger" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/utils/signalers" ) -type WorkflowRegistry struct { +const name = "WorkflowRegistrySyncer" + +var ( + defaultTickInterval = 12 * time.Second + ContractName = "WorkflowRegistry" +) + +// WorkflowRegistryrEventType is the type of event that is emitted by the WorkflowRegistry +type WorkflowRegistryEventType string + +var ( + // ForceUpdateSecretsEvent is emitted when a request to force update a workflows secrets is made + ForceUpdateSecretsEvent WorkflowRegistryEventType = "WorkflowForceUpdateSecretsRequestedV1" +) + +// WorkflowRegistryForceUpdateSecretsRequestedV1 is a chain agnostic definition of the WorkflowRegistry +// ForceUpdateSecretsRequested event. +type WorkflowRegistryForceUpdateSecretsRequestedV1 struct { + SecretsURLHash []byte + Owner []byte + WorkflowName string +} + +type Head struct { + Hash string + Height string + Timestamp uint64 +} + +// WorkflowRegistryEvent is an event emitted by the WorkflowRegistry. Each event is typed +// so that the consumer can determine how to handle the event. +type WorkflowRegistryEvent struct { + Cursor string + Data any + EventType WorkflowRegistryEventType + Head Head +} + +// WorkflowRegistryEventResponse is a response to either parsing a queried event or handling the event. +type WorkflowRegistryEventResponse struct { + Err error + Event *WorkflowRegistryEvent +} + +// ContractEventPollerConfig is the configuration needed to poll for events on a contract. Currently +// requires the ContractEventName. +// +// TODO(mstreet3): Use LookbackBlocks instead of StartBlockNum +type ContractEventPollerConfig struct { + ContractName string + ContractAddress string + StartBlockNum uint64 + QueryCount uint64 +} + +// FetcherFunc is an abstraction for fetching the contents stored at a URL. +type FetcherFunc func(ctx context.Context, url string) ([]byte, error) + +type ContractReaderFactory interface { + NewContractReader(context.Context, []byte) (types.ContractReader, error) +} + +// ContractReader is a subset of types.ContractReader defined locally to enable mocking. +type ContractReader interface { + Bind(context.Context, []types.BoundContract) error + QueryKey(context.Context, types.BoundContract, query.KeyFilter, query.LimitAndSort, any) ([]types.Sequence, error) +} + +// WorkflowRegistrySyncer is the public interface of the package. +type WorkflowRegistrySyncer interface { + services.Service +} + +var _ WorkflowRegistrySyncer = (*workflowRegistry)(nil) + +// workflowRegistry is the implementation of the WorkflowRegistrySyncer interface. +type workflowRegistry struct { services.StateMachine + + // close stopCh to stop the workflowRegistry. + stopCh services.StopChan + + // all goroutines are waited on with wg. + wg sync.WaitGroup + + // ticker is the interval at which the workflowRegistry will poll the contract for events. + ticker <-chan struct{} + + lggr logger.Logger + orm WorkflowRegistryDS + reader ContractReader + gateway FetcherFunc + + // initReader allows the workflowRegistry to initialize a contract reader if one is not provided + // and separates the contract reader initialization from the workflowRegistry start up. + initReader func(context.Context, logger.Logger, ContractReaderFactory, types.BoundContract) (types.ContractReader, error) + relayer ContractReaderFactory + + cfg ContractEventPollerConfig + eventTypes []WorkflowRegistryEventType + + // eventsCh is read by the handler and each event is handled once received. + eventsCh chan WorkflowRegistryEventResponse + handler *eventHandler + + // batchCh is a channel that receives batches of events from the contract query goroutines. + batchCh chan []WorkflowRegistryEventResponse + + // heap is a min heap that merges batches of events from the contract query goroutines. The + // default min heap is sorted by block height. + heap Heap +} + +// WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful +// for overriding the default tick interval. +func WithTicker(ticker <-chan struct{}) func(*workflowRegistry) { + return func(wr *workflowRegistry) { + wr.ticker = ticker + } +} + +func WithReader(reader types.ContractReader) func(*workflowRegistry) { + return func(wr *workflowRegistry) { + wr.reader = reader + } +} + +// NewWorkflowRegistry returns a new workflowRegistry. +// Only queries for WorkflowRegistryForceUpdateSecretsRequestedV1 events. +func NewWorkflowRegistry[T ContractReader]( + lggr logger.Logger, + orm WorkflowRegistryDS, + reader T, + gateway FetcherFunc, + addr string, + opts ...func(*workflowRegistry), +) *workflowRegistry { + ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} + wr := &workflowRegistry{ + lggr: lggr.Named(name), + orm: orm, + reader: reader, + gateway: gateway, + cfg: ContractEventPollerConfig{ + ContractName: ContractName, + ContractAddress: addr, + QueryCount: 20, + StartBlockNum: 0, + }, + initReader: newReader, + heap: newBlockHeightHeap(), + stopCh: make(services.StopChan), + eventTypes: ets, + eventsCh: make(chan WorkflowRegistryEventResponse), + batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), + } + wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway) + for _, opt := range opts { + opt(wr) + } + return wr } -func (w *WorkflowRegistry) Start(ctx context.Context) error { +// Start starts the workflowRegistry. It starts two goroutines, one for querying the contract +// and one for handling the events. +func (w *workflowRegistry) Start(_ context.Context) error { + return w.StartOnce(w.Name(), func() error { + ctx, cancel := w.stopCh.NewCtx() + + w.wg.Add(1) + go func() { + defer w.wg.Done() + defer cancel() + + w.syncEventsLoop(ctx) + }() + + w.wg.Add(1) + go func() { + defer w.wg.Done() + defer cancel() + + w.handlerLoop(ctx) + }() + + return nil + }) +} + +func (w *workflowRegistry) Close() error { + return w.StopOnce(w.Name(), func() error { + close(w.stopCh) + w.wg.Wait() + return nil + }) +} + +func (w *workflowRegistry) Ready() error { return nil } -func (w *WorkflowRegistry) Close() error { +func (w *workflowRegistry) HealthReport() map[string]error { return nil } -func (w *WorkflowRegistry) Ready() error { +func (w *workflowRegistry) Name() string { + return name +} + +func (w *workflowRegistry) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { + return nil, errors.New("not implemented") +} + +// handlerLoop handles the events that are emitted by the contract. +func (w *workflowRegistry) handlerLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case resp, open := <-w.eventsCh: + if !open { + return + } + + if resp.Err != nil || resp.Event == nil { + w.lggr.Errorf("failed to handle event: %+v", resp.Err) + continue + } + + event := resp.Event + w.lggr.Debugf("handling event: %+v", event) + if err := w.handler.Handle(ctx, *event); err != nil { + w.lggr.Errorf("failed to handle event: %+v", event) + continue + } + } + } +} + +// syncEventsLoop polls the contract for events and passes them to a channel for handling. +func (w *workflowRegistry) syncEventsLoop(ctx context.Context) { + var ( + // sendLog is a helper that sends a WorkflowRegistryEventResponse to the eventsCh in a + // blocking way that will send the response or be canceled. + sendLog = func(resp WorkflowRegistryEventResponse) { + select { + case w.eventsCh <- resp: + case <-ctx.Done(): + } + } + + ticker = w.getTicker(ctx) + + signal = make(chan struct{}) + ) + + // critical failure if there is no reader, the loop will exit and the parent context will be + // canceled. + reader, err := w.getContractReader(ctx) + if err != nil { + w.lggr.Criticalf("contract reader unavailable : %s", err) + return + } + + // fan out and query for each event type + for i := 0; i < len(w.eventTypes); i++ { + w.wg.Add(1) + go func() { + defer w.wg.Done() + + queryEvent( + ctx, + signal, + w.lggr, + reader, + w.cfg, + w.eventTypes[i], + w.batchCh, + ) + }() + } + + // Periodically send a signal to all the queryEvent goroutines to query the contract + for { + select { + case <-ctx.Done(): + return + case <-ticker: + // for each event type, send a signal for it to execute a query and produce a new + // batch of event logs + for i := 0; i < len(w.eventTypes); i++ { + select { + case signal <- struct{}{}: + case <-ctx.Done(): + return + } + } + + // block on fan-in until all fetched event logs are sent to the handlers + w.orderAndSend( + ctx, + len(w.eventTypes), + w.batchCh, + sendLog, + ) + } + } +} + +// orderAndSend reads n batches from the batch channel, heapifies all the batches then dequeues +// the min heap via the sendLog function. +func (w *workflowRegistry) orderAndSend( + ctx context.Context, + batchCount int, + batchCh <-chan []WorkflowRegistryEventResponse, + sendLog func(WorkflowRegistryEventResponse), +) { + for { + select { + case <-ctx.Done(): + return + case batch := <-batchCh: + for _, response := range batch { + w.heap.Push(response) + } + batchCount-- + + // If we have received responses for all the events, then we can drain the heap. + if batchCount == 0 { + for w.heap.Len() > 0 { + sendLog(w.heap.Pop()) + } + return + } + } + } +} + +// getTicker returns the ticker that the workflowRegistry will use to poll for events. If the ticker +// is nil, then a default ticker is returned. +func (w *workflowRegistry) getTicker(ctx context.Context) <-chan struct{} { + if w.ticker == nil { + return signalers.MakeTicker(ctx.Done(), defaultTickInterval) + } + + return w.ticker +} + +// getContractReader initializes a contract reader if needed, otherwise returns the existing +// reader. +func (w *workflowRegistry) getContractReader(ctx context.Context) (ContractReader, error) { + c := types.BoundContract{ + Name: w.cfg.ContractName, + Address: w.cfg.ContractAddress, + } + + if w.reader == nil { + reader, err := w.initReader(ctx, w.lggr, w.relayer, c) + if err != nil { + return nil, err + } + + w.reader = reader + } + + return w.reader, nil +} + +// queryEvent queries the contract for events of the given type on each tick from the ticker. +// Sends a batch of event logs to the batch channel. The batch represents all the +// event logs read since the last query. Loops until the context is canceled. +func queryEvent( + ctx context.Context, + ticker <-chan struct{}, + lggr logger.Logger, + reader ContractReader, + cfg ContractEventPollerConfig, + et WorkflowRegistryEventType, + batchCh chan<- []WorkflowRegistryEventResponse, +) { + // create query + var ( + responseBatch []WorkflowRegistryEventResponse + logData values.Value + cursor = "" + limitAndSort = query.LimitAndSort{ + SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, + Limit: query.Limit{Count: cfg.QueryCount}, + } + bc = types.BoundContract{ + Name: cfg.ContractName, + Address: cfg.ContractAddress, + } + ) + + // Loop until canceled + for { + select { + case <-ctx.Done(): + return + case <-ticker: + if cursor != "" { + limitAndSort.Limit = query.CursorLimit(cursor, query.CursorFollowing, cfg.QueryCount) + } + + logs, err := reader.QueryKey( + ctx, + bc, + query.KeyFilter{ + Key: string(et), + Expressions: []query.Expression{ + query.Confidence(primitives.Finalized), + query.Block(strconv.FormatUint(cfg.StartBlockNum, 10), primitives.Gte), + }, + }, + limitAndSort, + &logData, + ) + + if err != nil { + lggr.Errorw("QueryKey failure", "err", err) + continue + } + + // ChainReader QueryKey API provides logs including the cursor value and not + // after the cursor value. If the response only consists of the log corresponding + // to the cursor and no log after it, then we understand that there are no new + // logs + if len(logs) == 1 && logs[0].Cursor == cursor { + lggr.Infow("No new logs since", "cursor", cursor) + continue + } + + for _, log := range logs { + if log.Cursor == cursor { + continue + } + + responseBatch = append(responseBatch, toWorkflowRegistryEventResponse(log, et, lggr)) + cursor = log.Cursor + } + batchCh <- responseBatch + } + } +} + +func newReader( + ctx context.Context, + lggr logger.Logger, + factory ContractReaderFactory, + bc types.BoundContract, +) (types.ContractReader, error) { + contractReaderCfg := evmtypes.ChainReaderConfig{ + Contracts: map[string]evmtypes.ChainContractReader{ + ContractName: { + ContractPollingFilter: evmtypes.ContractPollingFilter{ + GenericEventNames: []string{string(ForceUpdateSecretsEvent)}, + }, + ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, + Configs: map[string]*evmtypes.ChainReaderDefinition{ + string(ForceUpdateSecretsEvent): { + ChainSpecificName: string(ForceUpdateSecretsEvent), + ReadType: evmtypes.Event, + }, + }, + }, + }, + } + + marshalledCfg, err := json.Marshal(contractReaderCfg) + if err != nil { + return nil, err + } + + reader, err := factory.NewContractReader(ctx, marshalledCfg) + if err != nil { + return nil, err + } + + // bind contract to contract reader + if err := reader.Bind(ctx, []types.BoundContract{bc}); err != nil { + return nil, err + } + + return reader, nil +} + +// toWorkflowRegistryEventResponse converts a types.Sequence to a WorkflowRegistryEventResponse. +func toWorkflowRegistryEventResponse( + log types.Sequence, + evt WorkflowRegistryEventType, + lggr logger.Logger, +) WorkflowRegistryEventResponse { + resp := WorkflowRegistryEventResponse{ + Event: &WorkflowRegistryEvent{ + Cursor: log.Cursor, + EventType: evt, + Head: Head{ + Hash: hex.EncodeToString(log.Hash), + Height: log.Height, + Timestamp: log.Timestamp, + }, + }, + } + + dataAsValuesMap, err := values.WrapMap(log.Data) + if err != nil { + return WorkflowRegistryEventResponse{ + Err: err, + } + } + + switch evt { + case ForceUpdateSecretsEvent: + var data WorkflowRegistryForceUpdateSecretsRequestedV1 + if err := dataAsValuesMap.UnwrapTo(&data); err != nil { + lggr.Errorf("failed to unwrap data: %+v", log.Data) + resp.Event = nil + resp.Err = err + return resp + } + resp.Event.Data = data + default: + lggr.Errorf("unknown event type: %s", evt) + resp.Event = nil + resp.Err = fmt.Errorf("unknown event type: %s", evt) + } + + return resp +} + +type nullWorkflowRegistrySyncer struct { + services.Service +} + +func NewNullWorkflowRegistrySyncer() *nullWorkflowRegistrySyncer { + return &nullWorkflowRegistrySyncer{} +} + +// Start +func (u *nullWorkflowRegistrySyncer) Start(context.Context) error { return nil } -func (w *WorkflowRegistry) HealthReport() map[string]error { +// Close +func (u *nullWorkflowRegistrySyncer) Close() error { return nil } -func (w *WorkflowRegistry) Name() string { - return "WorkflowRegistrySyncer" +// SecretsFor +func (u *nullWorkflowRegistrySyncer) SecretsFor(context.Context, string, string) (map[string]string, error) { + return nil, nil +} + +func (u *nullWorkflowRegistrySyncer) Ready() error { + return nil } -func (w *WorkflowRegistry) SecretsFor(workflowOwner, workflowName string) (map[string]string, error) { - // TODO: actually get this from the right place. - return map[string]string{}, nil +func (u *nullWorkflowRegistrySyncer) HealthReport() map[string]error { + return nil } -func NewWorkflowRegistry() *WorkflowRegistry { - return &WorkflowRegistry{} +func (u *nullWorkflowRegistrySyncer) Name() string { + return "Null" + name } diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go new file mode 100644 index 00000000000..0fb1224d432 --- /dev/null +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -0,0 +1,104 @@ +package syncer + +import ( + "context" + "encoding/hex" + "strconv" + "testing" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + types "github.com/smartcontractkit/chainlink-common/pkg/types" + query "github.com/smartcontractkit/chainlink-common/pkg/types/query" + "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" + "github.com/smartcontractkit/chainlink/v2/core/utils/matches" + + "github.com/stretchr/testify/require" +) + +func Test_Workflow_Registry_Syncer(t *testing.T) { + var ( + giveContents = "contents" + wantContents = "updated contents" + giveCfg = ContractEventPollerConfig{ + ContractName: ContractName, + ContractAddress: "0xdeadbeef", + StartBlockNum: 0, + QueryCount: 20, + } + giveURL = "http://example.com" + giveHash, err = crypto.Keccak256([]byte(giveURL)) + + giveLog = types.Sequence{ + Data: map[string]any{ + "SecretsURLHash": giveHash, + "Owner": "0xowneraddr", + }, + Cursor: "cursor", + } + ) + + require.NoError(t, err) + + var ( + lggr = logger.TestLogger(t) + db = pgtest.NewSqlxDB(t) + orm = &orm{ds: db, lggr: lggr} + ctx, cancel = context.WithCancel(testutils.Context(t)) + reader = NewMockContractReader(t) + gateway = func(_ context.Context, _ string) ([]byte, error) { + return []byte(wantContents), nil + } + ticker = make(chan struct{}) + worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress) + ) + + // Cleanup the worker + defer cancel() + + // Override the ticker + worker.ticker = ticker + + // Seed the DB with an original entry + _, err = orm.Create(ctx, giveURL, hex.EncodeToString(giveHash), giveContents) + require.NoError(t, err) + + // Mock out the contract reader query + reader.EXPECT().QueryKey( + matches.AnyContext, + types.BoundContract{ + Name: giveCfg.ContractName, + Address: giveCfg.ContractAddress, + }, + query.KeyFilter{ + Key: string(ForceUpdateSecretsEvent), + Expressions: []query.Expression{ + query.Confidence(primitives.Finalized), + query.Block(strconv.FormatUint(giveCfg.StartBlockNum, 10), primitives.Gte), + }, + }, + query.LimitAndSort{ + SortBy: []query.SortBy{query.NewSortByTimestamp(query.Asc)}, + Limit: query.Limit{Count: giveCfg.QueryCount}, + }, + new(values.Value), + ).Return([]types.Sequence{giveLog}, nil) + + // Go run the worker + servicetest.Run(t, worker) + + // Send a tick to start a query + ticker <- struct{}{} + + // Require the secrets contents to eventually be updated + require.Eventually(t, func() bool { + secrets, err := orm.GetContents(ctx, giveURL) + require.NoError(t, err) + return secrets == wantContents + }, 5*time.Second, time.Second) +} diff --git a/core/store/migrate/migrations/0259_add_workflow_secrets.sql b/core/store/migrate/migrations/0259_add_workflow_secrets.sql new file mode 100644 index 00000000000..d6f82033b41 --- /dev/null +++ b/core/store/migrate/migrations/0259_add_workflow_secrets.sql @@ -0,0 +1,41 @@ +-- +goose Up +-- +goose StatementBegin +-- Create the workflow_artifacts table +CREATE TABLE workflow_secrets ( + id SERIAL PRIMARY KEY, + secrets_url TEXT, + secrets_url_hash TEXT UNIQUE, + contents TEXT +); + +-- Create an index on the secrets_url_hash column +CREATE INDEX idx_secrets_url ON workflow_secrets(secrets_url); + +-- Alter the workflow_specs table +ALTER TABLE workflow_specs +ADD COLUMN binary_url TEXT, +ADD COLUMN config_url TEXT, +ADD COLUMN secrets_id INT UNIQUE REFERENCES workflow_secrets(id) ON DELETE CASCADE; + +-- Alter the config column type +ALTER TABLE workflow_specs +ALTER COLUMN config TYPE TEXT; +-- +goose StatementEnd + +-- +goose Down +-- +goose StatementBegin +ALTER TABLE workflow_specs +DROP COLUMN IF EXISTS secrets_id, +DROP COLUMN IF EXISTS config_url, +DROP COLUMN IF EXISTS binary_url; + +-- Change the config column back to character varying(255) +ALTER TABLE workflow_specs +ALTER COLUMN config TYPE CHARACTER VARYING(255); + +-- Drop the index on the secrets_url_hash column +DROP INDEX IF EXISTS idx_secrets_url_hash; + +-- Drop the workflow_artifacts table +DROP TABLE IF EXISTS workflow_secrets; +-- +goose StatementEnd \ No newline at end of file diff --git a/core/utils/crypto/keccak_256.go b/core/utils/crypto/keccak_256.go new file mode 100644 index 00000000000..b6218d72cf0 --- /dev/null +++ b/core/utils/crypto/keccak_256.go @@ -0,0 +1,16 @@ +package crypto + +import ( + "golang.org/x/crypto/sha3" +) + +func Keccak256(input []byte) ([]byte, error) { + // Create a Keccak-256 hash + hash := sha3.NewLegacyKeccak256() + _, err := hash.Write(input) + if err != nil { + return nil, err + } + + return hash.Sum(nil), nil +} diff --git a/core/utils/matches/matches.go b/core/utils/matches/matches.go new file mode 100644 index 00000000000..90606af57e2 --- /dev/null +++ b/core/utils/matches/matches.go @@ -0,0 +1,21 @@ +package matches + +import ( + "context" + + "github.com/stretchr/testify/mock" +) + +func anyContext(_ context.Context) bool { + return true +} + +func anyString(_ string) bool { + return true +} + +// AnyContext is an argument matcher that matches any argument of type context.Context. +var AnyContext = mock.MatchedBy(anyContext) + +// AnyString is an argument matcher that matches any argument of type string. +var AnyString = mock.MatchedBy(anyString) diff --git a/core/utils/signalers/signalers.go b/core/utils/signalers/signalers.go new file mode 100644 index 00000000000..b05af179251 --- /dev/null +++ b/core/utils/signalers/signalers.go @@ -0,0 +1,24 @@ +package signalers + +import "time" + +func MakeTicker(stop <-chan struct{}, d time.Duration) <-chan struct{} { + ticker := make(chan struct{}) + internalTicker := time.NewTicker(d) + + go func() { + defer close(ticker) + defer internalTicker.Stop() + + for { + select { + case <-stop: + return + case <-internalTicker.C: + ticker <- struct{}{} + } + } + }() + + return ticker +} From e768aa877da851d7e00232bcbf3c1f2aaf048df7 Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Tue, 19 Nov 2024 18:38:09 +0200 Subject: [PATCH 2/4] chore(relay): tests secrets updating --- .../workflows/syncer/workflow_syncer_test.go | 217 ++++++++++++++++++ .../workflows/syncer/workflow_registry.go | 5 +- 2 files changed, 221 insertions(+), 1 deletion(-) create mode 100644 core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go new file mode 100644 index 00000000000..e830b141d68 --- /dev/null +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -0,0 +1,217 @@ +package workflow_registry_syncer_test + +import ( + "context" + "crypto/rand" + "encoding/hex" + "encoding/json" + "testing" + "time" + + "github.com/ethereum/go-ethereum/accounts/abi/bind" + "github.com/ethereum/go-ethereum/common" + + "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" + "github.com/smartcontractkit/chainlink-common/pkg/types" + "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" + coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils" + evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" + "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" + "github.com/smartcontractkit/chainlink/v2/core/utils/signalers" + + "github.com/stretchr/testify/require" +) + +func Test_SecretsWorker(t *testing.T) { + var ( + ctx = coretestutils.Context(t) + lggr = logger.TestLogger(t) + backendTH = testutils.NewEVMBackendTH(t) + db = pgtest.NewSqlxDB(t) + orm = syncer.NewWorkflowRegistryDS(db, lggr) + + giveTicker = signalers.MakeTicker(ctx.Done(), 500*time.Millisecond) + giveSecretsURL = "https://original-url.com" + donID = uint32(1) + giveWorkflow = RegisterWorkflowCMD{ + Name: "test-wf", + DonID: donID, + Status: uint8(1), + SecretsURL: giveSecretsURL, + } + giveContents = "contents" + wantContents = "updated contents" + fetcherFn = func(_ context.Context, _ string) ([]byte, error) { + return []byte(wantContents), nil + } + contractName = syncer.ContractName + forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent) + ) + + // fill ID with randomd data + var giveID [32]byte + _, err := rand.Read((giveID)[:]) + require.NoError(t, err) + giveWorkflow.ID = giveID + + // Deploy a test workflow_registry + wfRegistryAddr, _, wfRegistryC, err := workflow_registry_wrapper.DeployWorkflowRegistry(backendTH.ContractsOwner, backendTH.Backend.Client()) + backendTH.Backend.Commit() + require.NoError(t, err) + + lggr.Infof("deployed workflow registry at %s\n", wfRegistryAddr.Hex()) + + // Build the ContractReader config + contractReaderCfg := evmtypes.ChainReaderConfig{ + Contracts: map[string]evmtypes.ChainContractReader{ + contractName: { + ContractPollingFilter: evmtypes.ContractPollingFilter{ + GenericEventNames: []string{forceUpdateSecretsEvent}, + }, + ContractABI: workflow_registry_wrapper.WorkflowRegistryABI, + Configs: map[string]*evmtypes.ChainReaderDefinition{ + forceUpdateSecretsEvent: { + ChainSpecificName: forceUpdateSecretsEvent, + ReadType: evmtypes.Event, + }, + }, + }, + }, + } + + contractReaderCfgBytes, err := json.Marshal(contractReaderCfg) + require.NoError(t, err) + + contractReader, err := backendTH.NewContractReader(ctx, t, contractReaderCfgBytes) + require.NoError(t, err) + + err = contractReader.Bind(ctx, []types.BoundContract{{Name: contractName, Address: wfRegistryAddr.Hex()}}) + require.NoError(t, err) + + // Seed the DB + hash, err := crypto.Keccak256(append(backendTH.ContractsOwner.From[:], []byte(giveSecretsURL)...)) + require.NoError(t, err) + giveHash := hex.EncodeToString(hash) + + gotID, err := orm.Create(ctx, giveSecretsURL, giveHash, giveContents) + require.NoError(t, err) + + gotSecretsURL, err := orm.GetSecretsURLByID(ctx, gotID) + require.NoError(t, err) + require.Equal(t, giveSecretsURL, gotSecretsURL) + + // verify the DB + contents, err := orm.GetContents(ctx, giveSecretsURL) + require.NoError(t, err) + require.Equal(t, contents, giveContents) + + // Create the worker + worker := syncer.NewWorkflowRegistry( + lggr, + orm, + contractReader, + fetcherFn, + wfRegistryAddr.Hex(), + syncer.WithTicker(giveTicker), + ) + + servicetest.Run(t, worker) + + // setup contract state to allow the secrets to be updated + updateAllowedDONs(t, backendTH, wfRegistryC, []uint32{donID}, true) + updateAuthorizedAddress(t, backendTH, wfRegistryC, []common.Address{backendTH.ContractsOwner.From}, true) + registerWorkflow(t, backendTH, wfRegistryC, giveWorkflow) + + // generate a log event + requestForceUpdateSecrets(t, backendTH, wfRegistryC, giveSecretsURL) + + // Require the secrets contents to eventually be updated + require.Eventually(t, func() bool { + secrets, err := orm.GetContents(ctx, giveSecretsURL) + lggr.Debugf("got secrets %v", secrets) + require.NoError(t, err) + return secrets == wantContents + }, 5*time.Second, time.Second) +} + +func updateAuthorizedAddress( + t *testing.T, + th *testutils.EVMBackendTH, + wfRegC *workflow_registry_wrapper.WorkflowRegistry, + addresses []common.Address, + _ bool, +) { + t.Helper() + _, err := wfRegC.UpdateAuthorizedAddresses(th.ContractsOwner, addresses, true) + require.NoError(t, err, "failed to update authorised addresses") + th.Backend.Commit() + th.Backend.Commit() + th.Backend.Commit() + gotAddresses, err := wfRegC.GetAllAuthorizedAddresses(&bind.CallOpts{ + From: th.ContractsOwner.From, + }) + require.NoError(t, err) + require.ElementsMatch(t, addresses, gotAddresses) +} + +func updateAllowedDONs( + t *testing.T, + th *testutils.EVMBackendTH, + wfRegC *workflow_registry_wrapper.WorkflowRegistry, + donIDs []uint32, + allowed bool, +) { + t.Helper() + _, err := wfRegC.UpdateAllowedDONs(th.ContractsOwner, donIDs, allowed) + require.NoError(t, err, "failed to update DONs") + th.Backend.Commit() + th.Backend.Commit() + th.Backend.Commit() + gotDons, err := wfRegC.GetAllAllowedDONs(&bind.CallOpts{ + From: th.ContractsOwner.From, + }) + require.NoError(t, err) + require.ElementsMatch(t, donIDs, gotDons) +} + +type RegisterWorkflowCMD struct { + Name string + ID [32]byte + DonID uint32 + Status uint8 + BinaryURL string + ConfigURL string + SecretsURL string +} + +func registerWorkflow( + t *testing.T, + th *testutils.EVMBackendTH, + wfRegC *workflow_registry_wrapper.WorkflowRegistry, + input RegisterWorkflowCMD, +) { + t.Helper() + _, err := wfRegC.RegisterWorkflow(th.ContractsOwner, input.Name, input.ID, input.DonID, + input.Status, input.BinaryURL, input.ConfigURL, input.SecretsURL) + require.NoError(t, err, "failed to register workflow") + th.Backend.Commit() + th.Backend.Commit() + th.Backend.Commit() +} + +func requestForceUpdateSecrets( + t *testing.T, + th *testutils.EVMBackendTH, + wfRegC *workflow_registry_wrapper.WorkflowRegistry, + secretsURL string, +) { + _, err := wfRegC.RequestForceUpdateSecrets(th.ContractsOwner, secretsURL) + require.NoError(t, err) + th.Backend.Commit() + th.Backend.Commit() + th.Backend.Commit() +} diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 1af31336734..5e712cb762f 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -274,7 +274,7 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context) { ticker = w.getTicker(ctx) - signal = make(chan struct{}) + signals = make(map[WorkflowRegistryEventType]chan struct{}, 0) ) // critical failure if there is no reader, the loop will exit and the parent context will be @@ -287,6 +287,8 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context) { // fan out and query for each event type for i := 0; i < len(w.eventTypes); i++ { + signal := make(chan struct{}, 1) + signals[w.eventTypes[i]] = signal w.wg.Add(1) go func() { defer w.wg.Done() @@ -312,6 +314,7 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context) { // for each event type, send a signal for it to execute a query and produce a new // batch of event logs for i := 0; i < len(w.eventTypes); i++ { + signal := signals[w.eventTypes[i]] select { case signal <- struct{}{}: case <-ctx.Done(): From 8dc2b5276a0537ae7f7869d23ab19b712501a3e4 Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Wed, 20 Nov 2024 02:13:00 +0200 Subject: [PATCH 3/4] chore(job): adjust db tests --- core/services/job/job_orm_test.go | 42 +++++++++++++++---- core/services/job/models.go | 3 +- core/services/job/orm.go | 4 +- .../migrations/0259_add_workflow_secrets.sql | 4 +- 4 files changed, 39 insertions(+), 14 deletions(-) diff --git a/core/services/job/job_orm_test.go b/core/services/job/job_orm_test.go index 9db99fcd48d..fd54a39d431 100644 --- a/core/services/job/job_orm_test.go +++ b/core/services/job/job_orm_test.go @@ -45,6 +45,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/relay" "github.com/smartcontractkit/chainlink/v2/core/services/vrf/vrfcommon" "github.com/smartcontractkit/chainlink/v2/core/services/webhook" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink/v2/core/testdata/testspecs" "github.com/smartcontractkit/chainlink/v2/core/utils/testutils/heavyweight" ) @@ -1873,6 +1874,7 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) { c.ID = s.ID c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow99", addr1) // insert with mismatched name c.SpecType = job.YamlSpec + c.SecretsID = s.SecretsID return mustInsertWFJob(t, o, &c) }, }, @@ -1892,6 +1894,7 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) { var c job.WorkflowSpec c.ID = s.ID c.Workflow = pkgworkflows.WFYamlSpec(t, "workflow03", addr2) // insert with mismatched owner + c.SecretsID = s.SecretsID return mustInsertWFJob(t, o, &c) }, }, @@ -1899,22 +1902,32 @@ func Test_ORM_FindJobByWorkflow(t *testing.T) { }, } - for _, tt := range tests { + for i, tt := range tests { t.Run(tt.name, func(t *testing.T) { + ctx := testutils.Context(t) ks := cltest.NewKeyStore(t, tt.fields.ds) + + secretsORM := syncer.NewWorkflowRegistryDS(tt.fields.ds, logger.TestLogger(t)) + + sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz") + require.NoError(t, err) + tt.args.spec.SecretsID = sql.NullInt64{Int64: sid, Valid: true} + pipelineORM := pipeline.NewORM(tt.fields.ds, logger.TestLogger(t), configtest.NewTestGeneralConfig(t).JobPipeline().MaxSuccessfulRuns()) bridgesORM := bridges.NewORM(tt.fields.ds) o := NewTestORM(t, tt.fields.ds, pipelineORM, bridgesORM, ks) + var wantJobID int32 if tt.args.before != nil { wantJobID = tt.args.before(t, o, tt.args.spec) } - ctx := testutils.Context(t) + gotJ, err := o.FindJobIDByWorkflow(ctx, *tt.args.spec) if (err != nil) != tt.wantErr { t.Errorf("orm.FindJobByWorkflow() error = %v, wantErr %v", err, tt.wantErr) return } + if err == nil { assert.Equal(t, wantJobID, gotJ, "mismatch job id") } @@ -1936,25 +1949,36 @@ func Test_ORM_FindJobByWorkflow_Multiple(t *testing.T) { bridges.NewORM(db), cltest.NewKeyStore(t, db)) ctx := testutils.Context(t) + secretsORM := syncer.NewWorkflowRegistryDS(db, logger.TestLogger(t)) + + var sids []int64 + for i := 0; i < 3; i++ { + sid, err := secretsORM.Create(ctx, "some-url.com", fmt.Sprintf("some-hash-%d", i), "some-contentz") + require.NoError(t, err) + sids = append(sids, sid) + } wfYaml1 := pkgworkflows.WFYamlSpec(t, "workflow00", addr1) s1 := job.WorkflowSpec{ - Workflow: wfYaml1, - SpecType: job.YamlSpec, + Workflow: wfYaml1, + SpecType: job.YamlSpec, + SecretsID: sql.NullInt64{Int64: sids[0], Valid: true}, } wantJobID1 := mustInsertWFJob(t, o, &s1) wfYaml2 := pkgworkflows.WFYamlSpec(t, "workflow01", addr1) s2 := job.WorkflowSpec{ - Workflow: wfYaml2, - SpecType: job.YamlSpec, + Workflow: wfYaml2, + SpecType: job.YamlSpec, + SecretsID: sql.NullInt64{Int64: sids[1], Valid: true}, } wantJobID2 := mustInsertWFJob(t, o, &s2) wfYaml3 := pkgworkflows.WFYamlSpec(t, "workflow00", addr2) s3 := job.WorkflowSpec{ - Workflow: wfYaml3, - SpecType: job.YamlSpec, + Workflow: wfYaml3, + SpecType: job.YamlSpec, + SecretsID: sql.NullInt64{Int64: sids[2], Valid: true}, } wantJobID3 := mustInsertWFJob(t, o, &s3) @@ -1992,7 +2016,7 @@ func mustInsertWFJob(t *testing.T, orm job.ORM, s *job.WorkflowSpec) int32 { } err = orm.CreateJob(ctx, &j) - require.NoError(t, err, "failed to insert job with wf spec %v %s", s, s.Workflow) + require.NoError(t, err, "failed to insert job with wf spec %+v %s", s, err) return j.ID } diff --git a/core/services/job/models.go b/core/services/job/models.go index 84ff2f5d7f1..423a297c8da 100644 --- a/core/services/job/models.go +++ b/core/services/job/models.go @@ -2,6 +2,7 @@ package job import ( "context" + "database/sql" "database/sql/driver" "encoding/json" "fmt" @@ -879,7 +880,7 @@ type WorkflowSpec struct { WorkflowName string `toml:"-" db:"workflow_name"` // Derived. Do not modify. the name of the workflow. BinaryURL string `db:"binary_url"` ConfigURL string `db:"config_url"` - SecretsID string `db:"secrets_id"` + SecretsID sql.NullInt64 `db:"secrets_id"` CreatedAt time.Time `toml:"-"` UpdatedAt time.Time `toml:"-"` SpecType WorkflowSpecType `toml:"spec_type" db:"spec_type"` diff --git a/core/services/job/orm.go b/core/services/job/orm.go index 5e8b5ce127f..92ec9b2e83c 100644 --- a/core/services/job/orm.go +++ b/core/services/job/orm.go @@ -433,8 +433,8 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error { case Stream: // 'stream' type has no associated spec, nothing to do here case Workflow: - sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, created_at, updated_at, spec_type, config) - VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, NOW(), NOW(), :spec_type, :config) + sql := `INSERT INTO workflow_specs (workflow, workflow_id, workflow_owner, workflow_name, binary_url, config_url, secrets_id, created_at, updated_at, spec_type, config) + VALUES (:workflow, :workflow_id, :workflow_owner, :workflow_name, :binary_url, :config_url, :secrets_id, NOW(), NOW(), :spec_type, :config) RETURNING id;` specID, err := tx.prepareQuerySpecID(ctx, sql, jb.WorkflowSpec) if err != nil { diff --git a/core/store/migrate/migrations/0259_add_workflow_secrets.sql b/core/store/migrate/migrations/0259_add_workflow_secrets.sql index d6f82033b41..fb76d945571 100644 --- a/core/store/migrate/migrations/0259_add_workflow_secrets.sql +++ b/core/store/migrate/migrations/0259_add_workflow_secrets.sql @@ -13,8 +13,8 @@ CREATE INDEX idx_secrets_url ON workflow_secrets(secrets_url); -- Alter the workflow_specs table ALTER TABLE workflow_specs -ADD COLUMN binary_url TEXT, -ADD COLUMN config_url TEXT, +ADD COLUMN binary_url TEXT DEFAULT '', +ADD COLUMN config_url TEXT DEFAULT '', ADD COLUMN secrets_id INT UNIQUE REFERENCES workflow_secrets(id) ON DELETE CASCADE; -- Alter the config column type From 37977a458cfbe7c043ee3987edbfec6e3a2038da Mon Sep 17 00:00:00 2001 From: Michael Street <5597260+MStreet3@users.noreply.github.com> Date: Wed, 20 Nov 2024 16:31:56 +0200 Subject: [PATCH 4/4] chore(syncer): removes signaler ticker --- .../workflows/syncer/workflow_syncer_test.go | 7 +++--- .../workflows/syncer/workflow_registry.go | 11 ++++----- .../syncer/workflow_registry_test.go | 9 +++---- core/utils/signalers/signalers.go | 24 ------------------- 4 files changed, 12 insertions(+), 39 deletions(-) delete mode 100644 core/utils/signalers/signalers.go diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index e830b141d68..802dc427c93 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -21,7 +21,6 @@ import ( evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" - "github.com/smartcontractkit/chainlink/v2/core/utils/signalers" "github.com/stretchr/testify/require" ) @@ -34,7 +33,7 @@ func Test_SecretsWorker(t *testing.T) { db = pgtest.NewSqlxDB(t) orm = syncer.NewWorkflowRegistryDS(db, lggr) - giveTicker = signalers.MakeTicker(ctx.Done(), 500*time.Millisecond) + giveTicker = time.NewTicker(500 * time.Millisecond) giveSecretsURL = "https://original-url.com" donID = uint32(1) giveWorkflow = RegisterWorkflowCMD{ @@ -52,6 +51,8 @@ func Test_SecretsWorker(t *testing.T) { forceUpdateSecretsEvent = string(syncer.ForceUpdateSecretsEvent) ) + defer giveTicker.Stop() + // fill ID with randomd data var giveID [32]byte _, err := rand.Read((giveID)[:]) @@ -116,7 +117,7 @@ func Test_SecretsWorker(t *testing.T) { contractReader, fetcherFn, wfRegistryAddr.Hex(), - syncer.WithTicker(giveTicker), + syncer.WithTicker(giveTicker.C), ) servicetest.Run(t, worker) diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index 5e712cb762f..ff77da9ea6f 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -18,7 +18,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" "github.com/smartcontractkit/chainlink/v2/core/logger" evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" - "github.com/smartcontractkit/chainlink/v2/core/utils/signalers" ) const name = "WorkflowRegistrySyncer" @@ -107,7 +106,7 @@ type workflowRegistry struct { wg sync.WaitGroup // ticker is the interval at which the workflowRegistry will poll the contract for events. - ticker <-chan struct{} + ticker <-chan time.Time lggr logger.Logger orm WorkflowRegistryDS @@ -136,7 +135,7 @@ type workflowRegistry struct { // WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful // for overriding the default tick interval. -func WithTicker(ticker <-chan struct{}) func(*workflowRegistry) { +func WithTicker(ticker <-chan time.Time) func(*workflowRegistry) { return func(wr *workflowRegistry) { wr.ticker = ticker } @@ -272,7 +271,7 @@ func (w *workflowRegistry) syncEventsLoop(ctx context.Context) { } } - ticker = w.getTicker(ctx) + ticker = w.getTicker() signals = make(map[WorkflowRegistryEventType]chan struct{}, 0) ) @@ -364,9 +363,9 @@ func (w *workflowRegistry) orderAndSend( // getTicker returns the ticker that the workflowRegistry will use to poll for events. If the ticker // is nil, then a default ticker is returned. -func (w *workflowRegistry) getTicker(ctx context.Context) <-chan struct{} { +func (w *workflowRegistry) getTicker() <-chan time.Time { if w.ticker == nil { - return signalers.MakeTicker(ctx.Done(), defaultTickInterval) + return time.NewTicker(defaultTickInterval).C } return w.ticker diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index 0fb1224d432..d979437d54d 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -54,16 +54,13 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { gateway = func(_ context.Context, _ string) ([]byte, error) { return []byte(wantContents), nil } - ticker = make(chan struct{}) - worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress) + ticker = make(chan time.Time) + worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, WithTicker(ticker)) ) // Cleanup the worker defer cancel() - // Override the ticker - worker.ticker = ticker - // Seed the DB with an original entry _, err = orm.Create(ctx, giveURL, hex.EncodeToString(giveHash), giveContents) require.NoError(t, err) @@ -93,7 +90,7 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { servicetest.Run(t, worker) // Send a tick to start a query - ticker <- struct{}{} + ticker <- time.Now() // Require the secrets contents to eventually be updated require.Eventually(t, func() bool { diff --git a/core/utils/signalers/signalers.go b/core/utils/signalers/signalers.go deleted file mode 100644 index b05af179251..00000000000 --- a/core/utils/signalers/signalers.go +++ /dev/null @@ -1,24 +0,0 @@ -package signalers - -import "time" - -func MakeTicker(stop <-chan struct{}, d time.Duration) <-chan struct{} { - ticker := make(chan struct{}) - internalTicker := time.NewTicker(d) - - go func() { - defer close(ticker) - defer internalTicker.Stop() - - for { - select { - case <-stop: - return - case <-internalTicker.C: - ticker <- struct{}{} - } - } - }() - - return ticker -}