diff --git a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go index ba29e98526e..da7c3af64ef 100644 --- a/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go +++ b/core/services/relay/evm/capabilities/workflows/syncer/workflow_syncer_test.go @@ -18,6 +18,7 @@ import ( coretestutils "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/capabilities/testutils" evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/syncer" @@ -122,6 +123,7 @@ func Test_SecretsWorker(t *testing.T) { nil, nil, emitter, + workflowkey.Key{}, syncer.WithTicker(giveTicker.C), ) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 69655b5b39c..ff7968e35af 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -96,7 +96,7 @@ func (sucm *stepUpdateManager) len() int64 { } type secretsFetcher interface { - SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) + SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) } // Engine handles the lifecycle of a single workflow and its executions. @@ -850,7 +850,7 @@ func (e *Engine) interpolateEnvVars(config map[string]any, env exec.Env) (*value // registry (for capability-level configuration). It doesn't perform any caching of the config values, since // the two registries perform their own caching. func (e *Engine) configForStep(ctx context.Context, lggr logger.Logger, step *step) (*values.Map, error) { - secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name) + secrets, err := e.secretsFetcher.SecretsFor(ctx, e.workflow.owner, e.workflow.name, e.workflow.id) if err != nil { return nil, fmt.Errorf("failed to fetch secrets: %w", err) } diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 70216ac8c78..95ac74f0c76 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -153,7 +153,7 @@ func newTestEngineWithYAMLSpec(t *testing.T, reg *coreCap.Registry, spec string, type mockSecretsFetcher struct{} -func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { +func (s mockSecretsFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) { return map[string]string{}, nil } @@ -1606,7 +1606,7 @@ type mockFetcher struct { retval map[string]string } -func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { +func (m *mockFetcher) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) { return m.retval, nil } diff --git a/core/services/workflows/syncer/handler.go b/core/services/workflows/syncer/handler.go index 5ccb3f5e180..1137f663e4c 100644 --- a/core/services/workflows/syncer/handler.go +++ b/core/services/workflows/syncer/handler.go @@ -7,6 +7,8 @@ import ( "errors" "fmt" + "github.com/jonboulle/clockwork" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink-common/pkg/workflows/wasm/host" @@ -94,27 +96,22 @@ type WorkflowRegistryWorkflowDeletedV1 struct { } type secretsFetcher interface { - SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) -} - -// secretsFetcherFunc implements the secretsFetcher interface for a function. -type secretsFetcherFunc func(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) - -func (f secretsFetcherFunc) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { - return f(ctx, workflowOwner, workflowName) + SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) } // eventHandler is a handler for WorkflowRegistryEvent events. Each event type has a corresponding // method that handles the event. type eventHandler struct { - lggr logger.Logger - orm WorkflowRegistryDS - fetcher FetcherFunc - workflowStore store.Store - capRegistry core.CapabilitiesRegistry - engineRegistry *engineRegistry - emitter custmsg.MessageEmitter - secretsFetcher secretsFetcher + lggr logger.Logger + orm WorkflowRegistryDS + fetcher FetcherFunc + workflowStore store.Store + capRegistry core.CapabilitiesRegistry + engineRegistry *engineRegistry + emitter custmsg.MessageEmitter + secretsFetcher secretsFetcher + lastFetchedAtMap *lastFetchedAtMap + clock clockwork.Clock } // newEventHandler returns a new eventHandler instance. @@ -127,16 +124,20 @@ func newEventHandler( engineRegistry *engineRegistry, emitter custmsg.MessageEmitter, secretsFetcher secretsFetcher, + lastFetchedAtMap *lastFetchedAtMap, + clock clockwork.Clock, ) *eventHandler { return &eventHandler{ - lggr: lggr, - orm: orm, - fetcher: gateway, - workflowStore: workflowStore, - capRegistry: capRegistry, - engineRegistry: engineRegistry, - emitter: emitter, - secretsFetcher: secretsFetcher, + lggr: lggr, + orm: orm, + fetcher: gateway, + workflowStore: workflowStore, + capRegistry: capRegistry, + engineRegistry: engineRegistry, + emitter: emitter, + secretsFetcher: secretsFetcher, + lastFetchedAtMap: lastFetchedAtMap, + clock: clock, } } @@ -153,7 +154,7 @@ func (h *eventHandler) Handle(ctx context.Context, event WorkflowRegistryEvent) platform.KeyWorkflowOwner, hex.EncodeToString(payload.Owner), ) - if err := h.forceUpdateSecretsEvent(ctx, payload); err != nil { + if _, err := h.forceUpdateSecretsEvent(ctx, payload); err != nil { logCustMsg(ctx, cma, fmt.Sprintf("failed to handle force update secrets event: %v", err), h.lggr) return err } @@ -263,7 +264,7 @@ func (h *eventHandler) workflowRegisteredEvent( Lggr: h.lggr, Workflow: *sdkSpec, WorkflowID: wfID, - WorkflowOwner: hex.EncodeToString(payload.WorkflowOwner), + WorkflowOwner: string(payload.WorkflowOwner), // this gets hex encoded in the engine. WorkflowName: payload.WorkflowName, Registry: h.capRegistry, Store: h.workflowStore, @@ -312,27 +313,29 @@ func (h *eventHandler) workflowActivatedEvent( func (h *eventHandler) forceUpdateSecretsEvent( ctx context.Context, payload WorkflowRegistryForceUpdateSecretsRequestedV1, -) error { +) (string, error) { // Get the URL of the secrets file from the event data hash := hex.EncodeToString(payload.SecretsURLHash) url, err := h.orm.GetSecretsURLByHash(ctx, hash) if err != nil { - return fmt.Errorf("failed to get URL by hash %s : %w", hash, err) + return "", fmt.Errorf("failed to get URL by hash %s : %w", hash, err) } // Fetch the contents of the secrets file from the url via the fetcher secrets, err := h.fetcher(ctx, url) if err != nil { - return fmt.Errorf("failed to fetch secrets from url %s : %w", url, err) + return "", err } + h.lastFetchedAtMap.Set(hash, h.clock.Now()) + // Update the secrets in the ORM if _, err := h.orm.Update(ctx, hash, string(secrets)); err != nil { - return fmt.Errorf("failed to update secrets: %w", err) + return "", fmt.Errorf("failed to update secrets: %w", err) } - return nil + return string(secrets), nil } // workflowID returns a hex encoded sha256 hash of the wasm, config and secretsURL. diff --git a/core/services/workflows/syncer/handler_test.go b/core/services/workflows/syncer/handler_test.go index 42da3e8de9d..77d7a350dc6 100644 --- a/core/services/workflows/syncer/handler_test.go +++ b/core/services/workflows/syncer/handler_test.go @@ -63,7 +63,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(int64(1), nil) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil, newLastFetchedAtMap(), clockwork.NewFakeClock()) err = h.Handle(ctx, giveEvent) require.NoError(t, err) }) @@ -77,7 +77,7 @@ func Test_Handler(t *testing.T) { return []byte("contents"), nil } - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil, newLastFetchedAtMap(), clockwork.NewFakeClock()) err := h.Handle(ctx, giveEvent) require.Error(t, err) require.Contains(t, err.Error(), "event type unsupported") @@ -86,7 +86,7 @@ func Test_Handler(t *testing.T) { t.Run("fails to get secrets url", func(t *testing.T) { mockORM := mocks.NewORM(t) ctx := testutils.Context(t) - h := newEventHandler(lggr, mockORM, nil, nil, nil, nil, emitter, nil) + h := newEventHandler(lggr, mockORM, nil, nil, nil, nil, emitter, nil, newLastFetchedAtMap(), clockwork.NewFakeClock()) giveURL := "https://original-url.com" giveBytes, err := crypto.Keccak256([]byte(giveURL)) require.NoError(t, err) @@ -126,7 +126,7 @@ func Test_Handler(t *testing.T) { return nil, assert.AnError } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil, newLastFetchedAtMap(), clockwork.NewFakeClock()) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) @@ -153,7 +153,7 @@ func Test_Handler(t *testing.T) { } mockORM.EXPECT().GetSecretsURLByHash(matches.AnyContext, giveHash).Return(giveURL, nil) mockORM.EXPECT().Update(matches.AnyContext, giveHash, "contents").Return(0, assert.AnError) - h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil) + h := newEventHandler(lggr, mockORM, fetcher, nil, nil, nil, emitter, nil, newLastFetchedAtMap(), clockwork.NewFakeClock()) err = h.Handle(ctx, giveEvent) require.Error(t, err) require.ErrorIs(t, err, assert.AnError) diff --git a/core/services/workflows/syncer/mocks/orm.go b/core/services/workflows/syncer/mocks/orm.go index 128100ea907..da96f422361 100644 --- a/core/services/workflows/syncer/mocks/orm.go +++ b/core/services/workflows/syncer/mocks/orm.go @@ -243,6 +243,70 @@ func (_c *ORM_GetContentsByHash_Call) RunAndReturn(run func(context.Context, str return _c } +// GetContentsByWorkflowID provides a mock function with given fields: ctx, workflowID +func (_m *ORM) GetContentsByWorkflowID(ctx context.Context, workflowID string) (string, string, error) { + ret := _m.Called(ctx, workflowID) + + if len(ret) == 0 { + panic("no return value specified for GetContentsByWorkflowID") + } + + var r0 string + var r1 string + var r2 error + if rf, ok := ret.Get(0).(func(context.Context, string) (string, string, error)); ok { + return rf(ctx, workflowID) + } + if rf, ok := ret.Get(0).(func(context.Context, string) string); ok { + r0 = rf(ctx, workflowID) + } else { + r0 = ret.Get(0).(string) + } + + if rf, ok := ret.Get(1).(func(context.Context, string) string); ok { + r1 = rf(ctx, workflowID) + } else { + r1 = ret.Get(1).(string) + } + + if rf, ok := ret.Get(2).(func(context.Context, string) error); ok { + r2 = rf(ctx, workflowID) + } else { + r2 = ret.Error(2) + } + + return r0, r1, r2 +} + +// ORM_GetContentsByWorkflowID_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'GetContentsByWorkflowID' +type ORM_GetContentsByWorkflowID_Call struct { + *mock.Call +} + +// GetContentsByWorkflowID is a helper method to define mock.On call +// - ctx context.Context +// - workflowID string +func (_e *ORM_Expecter) GetContentsByWorkflowID(ctx interface{}, workflowID interface{}) *ORM_GetContentsByWorkflowID_Call { + return &ORM_GetContentsByWorkflowID_Call{Call: _e.mock.On("GetContentsByWorkflowID", ctx, workflowID)} +} + +func (_c *ORM_GetContentsByWorkflowID_Call) Run(run func(ctx context.Context, workflowID string)) *ORM_GetContentsByWorkflowID_Call { + _c.Call.Run(func(args mock.Arguments) { + run(args[0].(context.Context), args[1].(string)) + }) + return _c +} + +func (_c *ORM_GetContentsByWorkflowID_Call) Return(_a0 string, _a1 string, _a2 error) *ORM_GetContentsByWorkflowID_Call { + _c.Call.Return(_a0, _a1, _a2) + return _c +} + +func (_c *ORM_GetContentsByWorkflowID_Call) RunAndReturn(run func(context.Context, string) (string, string, error)) *ORM_GetContentsByWorkflowID_Call { + _c.Call.Return(run) + return _c +} + // GetSecretsURLByHash provides a mock function with given fields: ctx, hash func (_m *ORM) GetSecretsURLByHash(ctx context.Context, hash string) (string, error) { ret := _m.Called(ctx, hash) diff --git a/core/services/workflows/syncer/orm.go b/core/services/workflows/syncer/orm.go index 16612b9a9c6..1755fb1eb21 100644 --- a/core/services/workflows/syncer/orm.go +++ b/core/services/workflows/syncer/orm.go @@ -3,6 +3,7 @@ package syncer import ( "context" "database/sql" + "errors" "fmt" "time" @@ -25,6 +26,9 @@ type WorkflowSecretsDS interface { // GetContentsByHash returns the contents of the secret at the given hashed URL. GetContentsByHash(ctx context.Context, hash string) (string, error) + // GetContentsByWorkflowID returns the contents and secrets_url of the secret for the given workflow. + GetContentsByWorkflowID(ctx context.Context, workflowID string) (string, string, error) + // GetSecretsURLHash returns the keccak256 hash of the owner and secrets URL. GetSecretsURLHash(owner, secretsURL []byte) ([]byte, error) @@ -123,6 +127,43 @@ func (orm *orm) GetContents(ctx context.Context, url string) (string, error) { return contents, nil // Return the populated Artifact struct } +type Int struct { + sql.NullInt64 +} + +type joinRecord struct { + SecretsID sql.NullString `db:"wspec_secrets_id"` + SecretsURLHash sql.NullString `db:"wsec_secrets_url_hash"` + Contents sql.NullString `db:"wsec_contents"` +} + +var EmptySecrets = errors.New("secrets field is empty") + +// GetContentsByWorkflowID joins the workflow_secrets on the workflow_specs table and gets +// the associated secrets contents. +func (orm *orm) GetContentsByWorkflowID(ctx context.Context, workflowID string) (string, string, error) { + var jr joinRecord + err := orm.ds.GetContext( + ctx, + &jr, + `SELECT wsec.secrets_url_hash AS wsec_secrets_url_hash, wsec.contents AS wsec_contents, wspec.secrets_id AS wspec_secrets_id + FROM workflow_specs AS wspec + LEFT JOIN + workflow_secrets AS wsec ON wspec.secrets_id = wsec.id + WHERE wspec.workflow_id = $1`, + workflowID, + ) + if err != nil { + return "", "", err + } + + if !jr.SecretsID.Valid { + return "", "", EmptySecrets + } + + return jr.SecretsURLHash.String, jr.Contents.String, nil +} + // Update updates the secrets content at the given hash or inserts a new record if not found. func (orm *orm) Update(ctx context.Context, hash, contents string) (int64, error) { var id int64 diff --git a/core/services/workflows/syncer/orm_test.go b/core/services/workflows/syncer/orm_test.go index 1be4e54f472..495a1a3831a 100644 --- a/core/services/workflows/syncer/orm_test.go +++ b/core/services/workflows/syncer/orm_test.go @@ -196,3 +196,63 @@ func Test_GetWorkflowSpec(t *testing.T) { require.Nil(t, dbSpec) }) } + +func Test_GetContentsByWorkflowID(t *testing.T) { + db := pgtest.NewSqlxDB(t) + ctx := testutils.Context(t) + lggr := logger.TestLogger(t) + orm := &orm{ds: db, lggr: lggr} + + // workflow_id is missing + _, _, err := orm.GetContentsByWorkflowID(ctx, "doesnt-exist") + assert.ErrorContains(t, err, "no rows in result set") + + // secrets_id is nil; should return EmptySecrets + workflowID := "aWorkflowID" + _, err = orm.UpsertWorkflowSpec(ctx, &job.WorkflowSpec{ + Workflow: "", + Config: "", + WorkflowID: workflowID, + WorkflowOwner: "aWorkflowOwner", + WorkflowName: "aWorkflowName", + BinaryURL: "", + ConfigURL: "", + CreatedAt: time.Now(), + SpecType: job.DefaultSpecType, + }) + require.NoError(t, err) + + _, _, err = orm.GetContentsByWorkflowID(ctx, workflowID) + assert.ErrorIs(t, err, EmptySecrets) + + // retrieves the artifact if provided + giveURL := "https://example.com" + giveBytes, err := crypto.Keccak256([]byte(giveURL)) + require.NoError(t, err) + giveHash := hex.EncodeToString(giveBytes) + giveContent := "some contents" + + secretsID, err := orm.Create(ctx, giveURL, giveHash, giveContent) + require.NoError(t, err) + + _, err = orm.UpsertWorkflowSpec(ctx, &job.WorkflowSpec{ + Workflow: "", + Config: "", + SecretsID: sql.NullInt64{Int64: secretsID, Valid: true}, + WorkflowID: workflowID, + WorkflowOwner: "aWorkflowOwner", + WorkflowName: "aWorkflowName", + BinaryURL: "", + ConfigURL: "", + CreatedAt: time.Now(), + SpecType: job.DefaultSpecType, + }) + require.NoError(t, err) + _, err = orm.GetWorkflowSpec(ctx, "aWorkflowOwner", "aWorkflowName") + require.NoError(t, err) + + gotHash, gotContent, err := orm.GetContentsByWorkflowID(ctx, workflowID) + require.NoError(t, err) + assert.Equal(t, giveHash, gotHash) + assert.Equal(t, giveContent, gotContent) +} diff --git a/core/services/workflows/syncer/workflow_registry.go b/core/services/workflows/syncer/workflow_registry.go index cdd0c71acc0..a9e1130af95 100644 --- a/core/services/workflows/syncer/workflow_registry.go +++ b/core/services/workflows/syncer/workflow_registry.go @@ -10,6 +10,8 @@ import ( "sync" "time" + "github.com/jonboulle/clockwork" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services" types "github.com/smartcontractkit/chainlink-common/pkg/types" @@ -17,8 +19,10 @@ import ( query "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/secrets" "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/workflow/generated/workflow_registry_wrapper" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey" evmtypes "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/types" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) @@ -82,6 +86,30 @@ type WorkflowRegistrySyncer interface { var _ WorkflowRegistrySyncer = (*workflowRegistry)(nil) +type lastFetchedAtMap struct { + m map[string]time.Time + sync.RWMutex +} + +func (l *lastFetchedAtMap) Set(url string, at time.Time) { + l.Lock() + defer l.Unlock() + l.m[url] = at +} + +func (l *lastFetchedAtMap) Get(url string) (time.Time, bool) { + l.RLock() + defer l.RUnlock() + got, ok := l.m[url] + return got, ok +} + +func newLastFetchedAtMap() *lastFetchedAtMap { + return &lastFetchedAtMap{ + m: map[string]time.Time{}, + } +} + // workflowRegistry is the implementation of the WorkflowRegistrySyncer interface. type workflowRegistry struct { services.StateMachine @@ -123,6 +151,11 @@ type workflowRegistry struct { workflowStore store.Store capRegistry core.CapabilitiesRegistry engineRegistry *engineRegistry + + encryptionKey workflowkey.Key + secretsFreshnessDuration time.Duration + urlHashToFetchedAt *lastFetchedAtMap + clock clockwork.Clock } // WithTicker allows external callers to provide a ticker to the workflowRegistry. This is useful @@ -139,20 +172,36 @@ func WithReader(reader types.ContractReader) func(*workflowRegistry) { } } +func WithSecretsFreshness(duration time.Duration) func(*workflowRegistry) { + return func(wr *workflowRegistry) { + wr.secretsFreshnessDuration = duration + } +} + +func WithClock(clock clockwork.Clock) func(*workflowRegistry) { + return func(wr *workflowRegistry) { + wr.clock = clock + } +} + +var defaultSecretsFreshnessDuration = 24 * time.Hour + // NewWorkflowRegistry returns a new workflowRegistry. // Only queries for WorkflowRegistryForceUpdateSecretsRequestedV1 events. -func NewWorkflowRegistry[T ContractReader]( +func NewWorkflowRegistry( lggr logger.Logger, orm WorkflowRegistryDS, - reader T, + reader ContractReader, gateway FetcherFunc, addr string, workflowStore store.Store, capRegistry core.CapabilitiesRegistry, emitter custmsg.Labeler, + encryptionKey workflowkey.Key, opts ...func(*workflowRegistry), ) *workflowRegistry { ets := []WorkflowRegistryEventType{ForceUpdateSecretsEvent} + m := newLastFetchedAtMap() wr := &workflowRegistry{ lggr: lggr.Named(name), emitter: emitter, @@ -168,19 +217,23 @@ func NewWorkflowRegistry[T ContractReader]( QueryCount: 20, StartBlockNum: 0, }, - initReader: newReader, - heap: newBlockHeightHeap(), - stopCh: make(services.StopChan), - eventTypes: ets, - eventsCh: make(chan WorkflowRegistryEventResponse), - batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), + initReader: newReader, + heap: newBlockHeightHeap(), + stopCh: make(services.StopChan), + eventTypes: ets, + eventsCh: make(chan WorkflowRegistryEventResponse), + batchCh: make(chan []WorkflowRegistryEventResponse, len(ets)), + encryptionKey: encryptionKey, + urlHashToFetchedAt: m, + secretsFreshnessDuration: defaultSecretsFreshnessDuration, + clock: clockwork.NewRealClock(), } - wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway, wr.workflowStore, wr.capRegistry, - wr.engineRegistry, wr.emitter, secretsFetcherFunc(wr.SecretsFor), - ) for _, opt := range opts { opt(wr) } + wr.handler = newEventHandler(wr.lggr, wr.orm, wr.gateway, wr.workflowStore, wr.capRegistry, + wr.engineRegistry, wr.emitter, wr, m, wr.clock, + ) return wr } @@ -230,8 +283,66 @@ func (w *workflowRegistry) Name() string { return name } -func (w *workflowRegistry) SecretsFor(ctx context.Context, workflowOwner, workflowName string) (map[string]string, error) { - return nil, errors.New("not implemented") +func (w *workflowRegistry) refreshSecrets(ctx context.Context, workflowOwner, workflowName, workflowID, secretsURLHash string) (string, error) { + owner, err := hex.DecodeString(workflowOwner) + if err != nil { + return "", err + } + + decodedHash, err := hex.DecodeString(secretsURLHash) + if err != nil { + return "", err + } + + updatedSecrets, err := w.handler.forceUpdateSecretsEvent( + ctx, + WorkflowRegistryForceUpdateSecretsRequestedV1{ + SecretsURLHash: decodedHash, + Owner: owner, + WorkflowName: name, + }, + ) + if err != nil { + return "", err + } + + return updatedSecrets, nil +} + +func (w *workflowRegistry) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) { + secretsURLHash, secretsPayload, err := w.orm.GetContentsByWorkflowID(ctx, workflowID) + if err != nil { + // The workflow record was found, but secrets_id was empty. + // Let's just stub out the response. + if errors.Is(err, EmptySecrets) { + return map[string]string{}, nil + } + + return nil, fmt.Errorf("failed to fetch secrets by workflow ID: %w", err) + } + + lastFetchedAt, ok := w.urlHashToFetchedAt.Get(secretsURLHash) + if !ok || w.clock.Now().Sub(lastFetchedAt) > w.secretsFreshnessDuration { + updatedSecrets, err := w.refreshSecrets(ctx, workflowOwner, workflowName, workflowID, secretsURLHash) + if err != nil { + w.lggr.Errorf("could not refresh secrets: proceeding with stale secrets: %w", err) + // TODO: log custom message + } else { + secretsPayload = updatedSecrets + } + } + + res := secrets.EncryptedSecretsResult{} + err = json.Unmarshal([]byte(secretsPayload), &res) + if err != nil { + return nil, fmt.Errorf("could not unmarshal secrets: %w", err) + } + + return secrets.DecryptSecretsForNode( + res, + w.encryptionKey, + workflowOwner, + ) } // handlerLoop handles the events that are emitted by the contract. @@ -574,7 +685,7 @@ func (u *nullWorkflowRegistrySyncer) Close() error { } // SecretsFor -func (u *nullWorkflowRegistrySyncer) SecretsFor(context.Context, string, string) (map[string]string, error) { +func (u *nullWorkflowRegistrySyncer) SecretsFor(ctx context.Context, workflowOwner, workflowName, workflowID string) (map[string]string, error) { return nil, nil } diff --git a/core/services/workflows/syncer/workflow_registry_test.go b/core/services/workflows/syncer/workflow_registry_test.go index 58dcbed1022..213929a02b6 100644 --- a/core/services/workflows/syncer/workflow_registry_test.go +++ b/core/services/workflows/syncer/workflow_registry_test.go @@ -2,23 +2,34 @@ package syncer import ( "context" + "database/sql" "encoding/hex" + "encoding/json" + "errors" "strconv" "testing" "time" + "github.com/jonboulle/clockwork" + "github.com/smartcontractkit/chainlink-common/pkg/custmsg" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" types "github.com/smartcontractkit/chainlink-common/pkg/types" query "github.com/smartcontractkit/chainlink-common/pkg/types/query" "github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives" "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink-common/pkg/workflows/secrets" + "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/job" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/workflowkey" + "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" "github.com/smartcontractkit/chainlink/v2/core/utils/crypto" "github.com/smartcontractkit/chainlink/v2/core/utils/matches" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -57,7 +68,7 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { return []byte(wantContents), nil } ticker = make(chan time.Time) - worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, emitter, WithTicker(ticker)) + worker = NewWorkflowRegistry(lggr, orm, reader, gateway, giveCfg.ContractAddress, nil, nil, emitter, workflowkey.Key{}, WithTicker(ticker)) ) // Cleanup the worker @@ -101,3 +112,226 @@ func Test_Workflow_Registry_Syncer(t *testing.T) { return secrets == wantContents }, 5*time.Second, time.Second) } + +func generateSecrets(workflowOwner string, secretsMap map[string][]string, encryptionKey workflowkey.Key) ([]byte, error) { + sm, secretsEnvVars, err := secrets.EncryptSecretsForNodes( + workflowOwner, + secretsMap, + map[string][32]byte{ + "p2pId": encryptionKey.PublicKey(), + }, + secrets.SecretsConfig{}, + ) + if err != nil { + return nil, err + } + return json.Marshal(secrets.EncryptedSecretsResult{ + EncryptedSecrets: sm, + Metadata: secrets.Metadata{ + WorkflowOwner: workflowOwner, + EnvVarsAssignedToNodes: secretsEnvVars, + NodePublicEncryptionKeys: map[string]string{ + "p2pId": encryptionKey.PublicKeyString(), + }, + }, + }) +} + +func Test_WorkflowRegistry_SecretsFor(t *testing.T) { + lggr := logger.TestLogger(t) + db := pgtest.NewSqlxDB(t) + orm := &orm{ds: db, lggr: lggr} + + workflowOwner := hex.EncodeToString([]byte("anOwner")) + workflowName := "aName" + workflowID := "anID" + encryptionKey, err := workflowkey.New() + require.NoError(t, err) + + url := "http://example.com" + hash := hex.EncodeToString([]byte(url)) + secretsPayload, err := generateSecrets(workflowOwner, map[string][]string{"Foo": []string{"Bar"}}, encryptionKey) + require.NoError(t, err) + secretsID, err := orm.Create(testutils.Context(t), url, hash, string(secretsPayload)) + require.NoError(t, err) + + _, err = orm.UpsertWorkflowSpec(testutils.Context(t), &job.WorkflowSpec{ + Workflow: "", + Config: "", + SecretsID: sql.NullInt64{Int64: secretsID, Valid: true}, + WorkflowID: workflowID, + WorkflowOwner: workflowOwner, + WorkflowName: workflowName, + BinaryURL: "", + ConfigURL: "", + CreatedAt: time.Now(), + SpecType: job.DefaultSpecType, + }) + require.NoError(t, err) + + fetcher := &mockFetcher{ + responseMap: map[string]mockFetchResp{ + url: mockFetchResp{Err: errors.New("could not fetch")}, + }, + } + registry := NewWorkflowRegistry( + lggr, + orm, + nil, // not needed + fetcher.Fetch, + "0x0", // not needed + store.NewDBStore(db, lggr, clockwork.NewFakeClock()), + capabilities.NewRegistry(lggr), + custmsg.NewLabeler(), + encryptionKey, + ) + + gotSecrets, err := registry.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID) + require.NoError(t, err) + + expectedSecrets := map[string]string{ + "Foo": "Bar", + } + assert.Equal(t, gotSecrets, expectedSecrets) +} + +func Test_WorkflowRegistry_SecretsFor_RefreshesSecrets(t *testing.T) { + lggr := logger.TestLogger(t) + db := pgtest.NewSqlxDB(t) + orm := &orm{ds: db, lggr: lggr} + + workflowOwner := hex.EncodeToString([]byte("anOwner")) + workflowName := "aName" + workflowID := "anID" + encryptionKey, err := workflowkey.New() + require.NoError(t, err) + + secretsPayload, err := generateSecrets(workflowOwner, map[string][]string{"Foo": []string{"Bar"}}, encryptionKey) + require.NoError(t, err) + + url := "http://example.com" + hash := hex.EncodeToString([]byte(url)) + + secretsID, err := orm.Create(testutils.Context(t), url, hash, string(secretsPayload)) + require.NoError(t, err) + + _, err = orm.UpsertWorkflowSpec(testutils.Context(t), &job.WorkflowSpec{ + Workflow: "", + Config: "", + SecretsID: sql.NullInt64{Int64: secretsID, Valid: true}, + WorkflowID: workflowID, + WorkflowOwner: workflowOwner, + WorkflowName: workflowName, + BinaryURL: "", + ConfigURL: "", + CreatedAt: time.Now(), + SpecType: job.DefaultSpecType, + }) + require.NoError(t, err) + + secretsPayload, err = generateSecrets(workflowOwner, map[string][]string{"Baz": []string{"Bar"}}, encryptionKey) + require.NoError(t, err) + fetcher := &mockFetcher{ + responseMap: map[string]mockFetchResp{ + url: mockFetchResp{Body: secretsPayload}, + }, + } + registry := NewWorkflowRegistry( + lggr, + orm, + nil, // not needed + fetcher.Fetch, + "0x0", // not needed + store.NewDBStore(db, lggr, clockwork.NewFakeClock()), + capabilities.NewRegistry(lggr), + custmsg.NewLabeler(), + encryptionKey, + ) + + gotSecrets, err := registry.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID) + require.NoError(t, err) + + expectedSecrets := map[string]string{ + "Baz": "Bar", + } + assert.Equal(t, gotSecrets, expectedSecrets) +} + +func Test_WorkflowRegistry_SecretsFor_RefreshLogic(t *testing.T) { + lggr := logger.TestLogger(t) + db := pgtest.NewSqlxDB(t) + orm := &orm{ds: db, lggr: lggr} + + workflowOwner := hex.EncodeToString([]byte("anOwner")) + workflowName := "aName" + workflowID := "anID" + encryptionKey, err := workflowkey.New() + require.NoError(t, err) + + secretsPayload, err := generateSecrets(workflowOwner, map[string][]string{"Foo": []string{"Bar"}}, encryptionKey) + require.NoError(t, err) + + url := "http://example.com" + hash := hex.EncodeToString([]byte(url)) + + secretsID, err := orm.Create(testutils.Context(t), url, hash, string(secretsPayload)) + require.NoError(t, err) + + _, err = orm.UpsertWorkflowSpec(testutils.Context(t), &job.WorkflowSpec{ + Workflow: "", + Config: "", + SecretsID: sql.NullInt64{Int64: secretsID, Valid: true}, + WorkflowID: workflowID, + WorkflowOwner: workflowOwner, + WorkflowName: workflowName, + BinaryURL: "", + ConfigURL: "", + CreatedAt: time.Now(), + SpecType: job.DefaultSpecType, + }) + require.NoError(t, err) + + fetcher := &mockFetcher{ + responseMap: map[string]mockFetchResp{ + url: mockFetchResp{ + Body: secretsPayload, + }, + }, + } + clock := clockwork.NewFakeClock() + registry := NewWorkflowRegistry( + lggr, + orm, + nil, // not needed + fetcher.Fetch, + "0x0", // not needed + store.NewDBStore(db, lggr, clockwork.NewFakeClock()), + capabilities.NewRegistry(lggr), + custmsg.NewLabeler(), + encryptionKey, + WithClock(clock), + ) + + gotSecrets, err := registry.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID) + require.NoError(t, err) + + expectedSecrets := map[string]string{ + "Foo": "Bar", + } + assert.Equal(t, gotSecrets, expectedSecrets) + + // Now stub out an unparseable response, since we already fetched it recently above, we shouldn't need to refetch + // SecretsFor should still succeed. + fetcher.responseMap[url] = mockFetchResp{} + + gotSecrets, err = registry.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID) + require.NoError(t, err) + + assert.Equal(t, gotSecrets, expectedSecrets) + + // Now advance so that we hit the freshness limit + clock.Advance(48 * time.Hour) + + gotSecrets, err = registry.SecretsFor(testutils.Context(t), workflowOwner, workflowName, workflowID) + assert.ErrorContains(t, err, "unexpected end of JSON input") +}