diff --git a/.changeset/tricky-panthers-rush.md b/.changeset/tricky-panthers-rush.md new file mode 100644 index 00000000000..52b35eaec97 --- /dev/null +++ b/.changeset/tricky-panthers-rush.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +#internal #bugfix Fix target wrapper init problems diff --git a/core/capabilities/registry.go b/core/capabilities/registry.go index 4e3877f0c5f..042e5dfca90 100644 --- a/core/capabilities/registry.go +++ b/core/capabilities/registry.go @@ -6,20 +6,15 @@ import ( "sync" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" "github.com/smartcontractkit/chainlink/v2/core/logger" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) // Registry is a struct for the registry of capabilities. // Registry is safe for concurrent use. type Registry struct { - lggr logger.Logger - peerID p2ptypes.PeerID - don capabilities.DON - - m map[string]capabilities.BaseCapability - mu sync.RWMutex + lggr logger.Logger + m map[string]capabilities.BaseCapability + mu sync.RWMutex } // Get gets a capability from the registry. @@ -139,17 +134,6 @@ func (r *Registry) Add(ctx context.Context, c capabilities.BaseCapability) error if !ok { return fmt.Errorf("target capability does not satisfy TargetCapability interface") } - - capInfo, err := c.Info(ctx) - if err != nil { - return fmt.Errorf("failed to get info of target capability: %w", err) - } - - // If the DON is nil this is a local capability and requires wrapping in a local target transmission capability - if capInfo.DON == nil { - c = transmission.NewLocalTargetCapability(r.lggr, r.peerID, r.don, c.(capabilities.TargetCapability)) - } - default: return fmt.Errorf("unknown capability type: %s", info.CapabilityType) } @@ -166,11 +150,9 @@ func (r *Registry) Add(ctx context.Context, c capabilities.BaseCapability) error } // NewRegistry returns a new Registry. -func NewRegistry(lggr logger.Logger, peerID p2ptypes.PeerID, don capabilities.DON) *Registry { +func NewRegistry(lggr logger.Logger) *Registry { return &Registry{ - m: map[string]capabilities.BaseCapability{}, - lggr: lggr.Named("CapabilityRegistry"), - peerID: peerID, - don: don, + m: map[string]capabilities.BaseCapability{}, + lggr: lggr.Named("CapabilityRegistry"), } } diff --git a/core/capabilities/registry_test.go b/core/capabilities/registry_test.go index db366bf41ec..3bed31a957a 100644 --- a/core/capabilities/registry_test.go +++ b/core/capabilities/registry_test.go @@ -10,12 +10,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" - "github.com/smartcontractkit/chainlink-common/pkg/values" coreCapabilities "github.com/smartcontractkit/chainlink/v2/core/capabilities" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) type mockCapability struct { @@ -37,7 +34,7 @@ func (m *mockCapability) UnregisterFromWorkflow(ctx context.Context, request cap func TestRegistry(t *testing.T) { ctx := testutils.Context(t) - r := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) + r := coreCapabilities.NewRegistry(logger.TestLogger(t)) id := "capability-1" ci, err := capabilities.NewCapabilityInfo( @@ -65,7 +62,7 @@ func TestRegistry(t *testing.T) { func TestRegistry_NoDuplicateIDs(t *testing.T) { ctx := testutils.Context(t) - r := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) + r := coreCapabilities.NewRegistry(logger.TestLogger(t)) id := "capability-1" ci, err := capabilities.NewCapabilityInfo( @@ -176,7 +173,7 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) { } ctx := testutils.Context(t) - reg := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) + reg := coreCapabilities.NewRegistry(logger.TestLogger(t)) for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { id, err := tc.newCapability(ctx, reg) @@ -187,47 +184,3 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) { }) } } - -func TestRegistry_ReturnsLocalTargetCapabilityForLocalTargets(t *testing.T) { - ctx := testutils.Context(t) - r := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) - - id := "capability-1" - ci, err := capabilities.NewRemoteCapabilityInfo( - id, - capabilities.CapabilityTypeTarget, - "capability-1-description", - "v1.0.0", - nil, - ) - require.NoError(t, err) - - c := &mockCapability{CapabilityInfo: ci} - err = r.Add(ctx, c) - require.NoError(t, err) - - targetCapability, err := r.GetTarget(ctx, id) - require.NoError(t, err) - - duffTransmissionSchedule, err := values.NewMap(map[string]any{ - "schedule": transmission.Schedule_AllAtOnce, - "deltaStage": "10banana", - }) - require.NoError(t, err) - - _, err = targetCapability.Execute(ctx, capabilities.CapabilityRequest{ - Config: duffTransmissionSchedule, - }) - assert.NotNil(t, err) - - validTransmissionSchedule, err := values.NewMap(map[string]any{ - "schedule": transmission.Schedule_OneAtATime, - "deltaStage": "10ms", - }) - require.NoError(t, err) - - _, err = targetCapability.Execute(ctx, capabilities.CapabilityRequest{ - Config: validTransmissionSchedule, - }) - assert.NoError(t, err) -} diff --git a/core/capabilities/syncer.go b/core/capabilities/syncer.go index 0227bfe1b46..1c3bbab8fe8 100644 --- a/core/capabilities/syncer.go +++ b/core/capabilities/syncer.go @@ -95,6 +95,11 @@ func (s *registrySyncer) launch(ctx context.Context) { RegistrationExpiryMs: 60000, MinResponsesToAggregate: uint32(s.networkSetup.TriggerCapabilityDonInfo.F) + 1, } + err = s.peerWrapper.GetPeer().UpdateConnections(s.networkSetup.allPeers) + if err != nil { + s.lggr.Errorw("failed to update connections", "error", err) + return + } if s.networkSetup.IsWorkflowDon(myId) { s.lggr.Info("member of a workflow DON - starting remote subscribers") codec := streams.NewCodec(s.lggr) @@ -197,12 +202,13 @@ type HardcodedDonNetworkSetup struct { workflowDonPeers []string triggerDonPeers []string triggerDonSigners []string + allPeers map[ragetypes.PeerID]p2ptypes.StreamConfig WorkflowsDonInfo capabilities.DON TriggerCapabilityDonInfo capabilities.DON } -func NewHardcodedDonNetworkSetup(peerWrapper p2ptypes.PeerWrapper) (HardcodedDonNetworkSetup, error) { +func NewHardcodedDonNetworkSetup() (HardcodedDonNetworkSetup, error) { result := HardcodedDonNetworkSetup{} result.workflowDonPeers = []string{ @@ -230,7 +236,7 @@ func NewHardcodedDonNetworkSetup(peerWrapper p2ptypes.PeerWrapper) (HardcodedDon "0x91d9b0062265514f012Eb8fABA59372fD9520f56", } - allPeers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig) + result.allPeers = make(map[ragetypes.PeerID]p2ptypes.StreamConfig) addPeersToDONInfo := func(peers []string, donInfo *capabilities.DON) error { for _, peerID := range peers { var p ragetypes.PeerID @@ -238,7 +244,7 @@ func NewHardcodedDonNetworkSetup(peerWrapper p2ptypes.PeerWrapper) (HardcodedDon if err != nil { return err } - allPeers[p] = defaultStreamConfig + result.allPeers[p] = defaultStreamConfig donInfo.Members = append(donInfo.Members, p) } return nil @@ -251,10 +257,6 @@ func NewHardcodedDonNetworkSetup(peerWrapper p2ptypes.PeerWrapper) (HardcodedDon if err := addPeersToDONInfo(result.triggerDonPeers, &result.TriggerCapabilityDonInfo); err != nil { return HardcodedDonNetworkSetup{}, fmt.Errorf("failed to add peers to trigger DON info: %w", err) } - err := peerWrapper.GetPeer().UpdateConnections(allPeers) - if err != nil { - return HardcodedDonNetworkSetup{}, fmt.Errorf("failed to update connections: %w", err) - } return result, nil } diff --git a/core/capabilities/syncer_test.go b/core/capabilities/syncer_test.go index 0dfa49eeeb9..a654f303a95 100644 --- a/core/capabilities/syncer_test.go +++ b/core/capabilities/syncer_test.go @@ -32,7 +32,7 @@ func TestSyncer_CleanStartClose(t *testing.T) { dispatcher := remoteMocks.NewDispatcher(t) dispatcher.On("SetReceiver", mock.Anything, mock.Anything, mock.Anything).Return(nil) - networkSetup, err := coreCapabilities.NewHardcodedDonNetworkSetup(wrapper) + networkSetup, err := coreCapabilities.NewHardcodedDonNetworkSetup() require.NoError(t, err) syncer := coreCapabilities.NewRegistrySyncer(wrapper, registry, dispatcher, lggr, networkSetup) require.NoError(t, syncer.Start(ctx)) diff --git a/core/cmd/shell.go b/core/cmd/shell.go index 0ababfca58d..d6f99955e10 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -31,7 +31,6 @@ import ( "github.com/jmoiron/sqlx" - commoncapabilities "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" @@ -44,7 +43,6 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" @@ -168,7 +166,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(), }) - capabilitiesRegistry := capabilities.NewRegistry(appLggr, p2ptypes.PeerID{}, commoncapabilities.DON{}) + capabilitiesRegistry := capabilities.NewRegistry(appLggr) // create the relayer-chain interoperators from application configuration relayerFactory := chainlink.RelayerFactory{ diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index d796faa00c4..76e1472192f 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -17,7 +17,6 @@ import ( "go.uber.org/multierr" "go.uber.org/zap/zapcore" - commoncapabilities "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/loop" commonservices "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -201,6 +200,10 @@ func NewApplication(opts ApplicationOpts) (Application, error) { restrictedHTTPClient := opts.RestrictedHTTPClient unrestrictedHTTPClient := opts.UnrestrictedHTTPClient + if opts.CapabilitiesRegistry == nil { // for tests only, in prod Registry is always set at this point + opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger) + } + var externalPeerWrapper p2ptypes.PeerWrapper if cfg.Capabilities().Peering().Enabled() { externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), globalLogger) @@ -209,31 +212,16 @@ func NewApplication(opts ApplicationOpts) (Application, error) { srvcs = append(srvcs, externalPeerWrapper) - networkSetup, err := capabilities.NewHardcodedDonNetworkSetup(externalPeerWrapper) + networkSetup, err := capabilities.NewHardcodedDonNetworkSetup() if err != nil { return nil, fmt.Errorf("failed to create hardcoded Don network setup: %w", err) } - if opts.CapabilitiesRegistry == nil { - peerID := externalPeerWrapper.GetPeer().ID() - if networkSetup.IsWorkflowDon(peerID) { - opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger, peerID, networkSetup.WorkflowsDonInfo) - } else if networkSetup.IsTriggerDon(peerID) { - opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger, peerID, networkSetup.TriggerCapabilityDonInfo) - } else { - return nil, fmt.Errorf("peer %s is not a member of any known DON", peerID) - } - } - // NOTE: RegistrySyncer will depend on a Relayer when fully implemented dispatcher := remote.NewDispatcher(externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger) registrySyncer := capabilities.NewRegistrySyncer(externalPeerWrapper, opts.CapabilitiesRegistry, dispatcher, globalLogger, networkSetup) srvcs = append(srvcs, dispatcher, registrySyncer) - } else { - if opts.CapabilitiesRegistry == nil { - opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger, p2ptypes.PeerID{}, commoncapabilities.DON{}) - } } // LOOPs can be created as options, in the case of LOOP relayers, or diff --git a/core/services/job/spawner_test.go b/core/services/job/spawner_test.go index 8b98fd219fb..8ed08a1cb8a 100644 --- a/core/services/job/spawner_test.go +++ b/core/services/job/spawner_test.go @@ -12,7 +12,6 @@ import ( "github.com/jmoiron/sqlx" - commoncapabilities "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" @@ -20,7 +19,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox/mailboxtest" "github.com/smartcontractkit/chainlink/v2/core/capabilities" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/bridges" mocklp "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" @@ -298,7 +296,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { evmRelayer, err := evmrelayer.NewRelayer(lggr, chain, evmrelayer.RelayerOpts{ DS: db, CSAETHKeystore: keyStore, - CapabilitiesRegistry: capabilities.NewRegistry(lggr, p2ptypes.PeerID{}, commoncapabilities.DON{}), + CapabilitiesRegistry: capabilities.NewRegistry(lggr), }) assert.NoError(t, err) @@ -316,7 +314,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { ocr2DelegateConfig := ocr2.NewDelegateConfig(config.OCR2(), config.Mercury(), config.Threshold(), config.Insecure(), config.JobPipeline(), processConfig) d := ocr2.NewDelegate(nil, orm, nil, nil, nil, nil, nil, monitoringEndpoint, legacyChains, lggr, ocr2DelegateConfig, - keyStore.OCR2(), keyStore.DKGSign(), keyStore.DKGEncrypt(), ethKeyStore, testRelayGetter, mailMon, capabilities.NewRegistry(lggr, p2ptypes.PeerID{}, commoncapabilities.DON{})) + keyStore.OCR2(), keyStore.DKGSign(), keyStore.DKGEncrypt(), ethKeyStore, testRelayGetter, mailMon, capabilities.NewRegistry(lggr)) delegateOCR2 := &delegate{jobOCR2VRF.Type, []job.ServiceCtx{}, 0, nil, d} spawner := job.NewSpawner(orm, config.Database(), noopChecker{}, map[job.Type]job.Delegate{ diff --git a/core/services/relay/evm/write_target_test.go b/core/services/relay/evm/write_target_test.go index b7a1199b5ed..76060dce990 100644 --- a/core/services/relay/evm/write_target_test.go +++ b/core/services/relay/evm/write_target_test.go @@ -8,7 +8,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" @@ -72,7 +71,7 @@ func TestEvmWrite(t *testing.T) { relayer, err := relayevm.NewRelayer(lggr, chain, relayevm.RelayerOpts{ DS: db, CSAETHKeystore: keyStore, - CapabilitiesRegistry: evmcapabilities.NewRegistry(lggr, p2ptypes.PeerID{}, capabilities.DON{}), + CapabilitiesRegistry: evmcapabilities.NewRegistry(lggr), }) require.NoError(t, err) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index cfa15905f29..6f616235632 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -15,6 +15,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink-common/pkg/workflows" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" "github.com/smartcontractkit/chainlink/v2/core/logger" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" @@ -139,6 +140,20 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error { return fmt.Errorf("failed to get capability with ref %s: %s", step.ID, err) } + // Special treatment for local targets - wrap into a transmission capability + target, isTarget := cp.(capabilities.TargetCapability) + if isTarget { + capInfo, err2 := target.Info(ctx) + if err2 != nil { + return fmt.Errorf("failed to get info of target capability: %w", err2) + } + + // If the DON is nil this is a local target + if capInfo.DON == nil { + cp = transmission.NewLocalTargetCapability(e.logger, *e.donInfo.PeerID(), *e.donInfo.DON, target) + } + } + // We configure actions, consensus and targets here, and // they all satisfy the `CallbackCapability` interface cc, ok := cp.(capabilities.CallbackCapability) diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index aebb5c2066f..808e0b2555d 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -199,7 +199,7 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ctx := testutils.Context(t) - reg := coreCap.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) + reg := coreCap.NewRegistry(logger.TestLogger(t)) trigger, cr := mockTrigger(t) @@ -386,7 +386,7 @@ func mockTarget() *mockCapability { func TestEngine_ErrorsTheWorkflowIfAStepErrors(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - reg := coreCap.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) + reg := coreCap.NewRegistry(logger.TestLogger(t)) trigger, _ := mockTrigger(t) @@ -480,7 +480,7 @@ func mockAction() (*mockCapability, values.Value) { func TestEngine_MultiStepDependencies(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - reg := coreCap.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) + reg := coreCap.NewRegistry(logger.TestLogger(t)) trigger, cr := mockTrigger(t) @@ -523,7 +523,7 @@ func TestEngine_MultiStepDependencies(t *testing.T) { func TestEngine_ResumesPendingExecutions(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - reg := coreCap.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) + reg := coreCap.NewRegistry(logger.TestLogger(t)) trigger := mockNoopTrigger(t) resp, err := values.NewMap(map[string]any{ @@ -577,7 +577,7 @@ func TestEngine_ResumesPendingExecutions(t *testing.T) { func TestEngine_TimesOutOldExecutions(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - reg := coreCap.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) + reg := coreCap.NewRegistry(logger.TestLogger(t)) trigger := mockNoopTrigger(t) resp, err := values.NewMap(map[string]any{