Skip to content

Commit

Permalink
Fix target wrapper init problems (#13406)
Browse files Browse the repository at this point in the history
See comments on #13396
Registry can't easily access PeerID when it is being created.
This fix moves desired wrapping to the Engine.
  • Loading branch information
bolekk authored Jun 4, 2024
1 parent a2441ba commit a63569c
Show file tree
Hide file tree
Showing 11 changed files with 53 additions and 113 deletions.
5 changes: 5 additions & 0 deletions .changeset/tricky-panthers-rush.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": patch
---

#internal #bugfix Fix target wrapper init problems
30 changes: 6 additions & 24 deletions core/capabilities/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,20 +6,15 @@ import (
"sync"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

// Registry is a struct for the registry of capabilities.
// Registry is safe for concurrent use.
type Registry struct {
lggr logger.Logger
peerID p2ptypes.PeerID
don capabilities.DON

m map[string]capabilities.BaseCapability
mu sync.RWMutex
lggr logger.Logger
m map[string]capabilities.BaseCapability
mu sync.RWMutex
}

// Get gets a capability from the registry.
Expand Down Expand Up @@ -139,17 +134,6 @@ func (r *Registry) Add(ctx context.Context, c capabilities.BaseCapability) error
if !ok {
return fmt.Errorf("target capability does not satisfy TargetCapability interface")
}

capInfo, err := c.Info(ctx)
if err != nil {
return fmt.Errorf("failed to get info of target capability: %w", err)
}

// If the DON is nil this is a local capability and requires wrapping in a local target transmission capability
if capInfo.DON == nil {
c = transmission.NewLocalTargetCapability(r.lggr, r.peerID, r.don, c.(capabilities.TargetCapability))
}

default:
return fmt.Errorf("unknown capability type: %s", info.CapabilityType)
}
Expand All @@ -166,11 +150,9 @@ func (r *Registry) Add(ctx context.Context, c capabilities.BaseCapability) error
}

// NewRegistry returns a new Registry.
func NewRegistry(lggr logger.Logger, peerID p2ptypes.PeerID, don capabilities.DON) *Registry {
func NewRegistry(lggr logger.Logger) *Registry {
return &Registry{
m: map[string]capabilities.BaseCapability{},
lggr: lggr.Named("CapabilityRegistry"),
peerID: peerID,
don: don,
m: map[string]capabilities.BaseCapability{},
lggr: lggr.Named("CapabilityRegistry"),
}
}
53 changes: 3 additions & 50 deletions core/capabilities/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,12 +10,9 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers"
"github.com/smartcontractkit/chainlink-common/pkg/values"
coreCapabilities "github.com/smartcontractkit/chainlink/v2/core/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
)

type mockCapability struct {
Expand All @@ -37,7 +34,7 @@ func (m *mockCapability) UnregisterFromWorkflow(ctx context.Context, request cap
func TestRegistry(t *testing.T) {
ctx := testutils.Context(t)

r := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{})
r := coreCapabilities.NewRegistry(logger.TestLogger(t))

id := "capability-1"
ci, err := capabilities.NewCapabilityInfo(
Expand Down Expand Up @@ -65,7 +62,7 @@ func TestRegistry(t *testing.T) {

func TestRegistry_NoDuplicateIDs(t *testing.T) {
ctx := testutils.Context(t)
r := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{})
r := coreCapabilities.NewRegistry(logger.TestLogger(t))

id := "capability-1"
ci, err := capabilities.NewCapabilityInfo(
Expand Down Expand Up @@ -176,7 +173,7 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
}

ctx := testutils.Context(t)
reg := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{})
reg := coreCapabilities.NewRegistry(logger.TestLogger(t))
for _, tc := range tcs {
t.Run(tc.name, func(t *testing.T) {
id, err := tc.newCapability(ctx, reg)
Expand All @@ -187,47 +184,3 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) {
})
}
}

func TestRegistry_ReturnsLocalTargetCapabilityForLocalTargets(t *testing.T) {
ctx := testutils.Context(t)
r := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{})

id := "capability-1"
ci, err := capabilities.NewRemoteCapabilityInfo(
id,
capabilities.CapabilityTypeTarget,
"capability-1-description",
"v1.0.0",
nil,
)
require.NoError(t, err)

c := &mockCapability{CapabilityInfo: ci}
err = r.Add(ctx, c)
require.NoError(t, err)

targetCapability, err := r.GetTarget(ctx, id)
require.NoError(t, err)

duffTransmissionSchedule, err := values.NewMap(map[string]any{
"schedule": transmission.Schedule_AllAtOnce,
"deltaStage": "10banana",
})
require.NoError(t, err)

_, err = targetCapability.Execute(ctx, capabilities.CapabilityRequest{
Config: duffTransmissionSchedule,
})
assert.NotNil(t, err)

validTransmissionSchedule, err := values.NewMap(map[string]any{
"schedule": transmission.Schedule_OneAtATime,
"deltaStage": "10ms",
})
require.NoError(t, err)

_, err = targetCapability.Execute(ctx, capabilities.CapabilityRequest{
Config: validTransmissionSchedule,
})
assert.NoError(t, err)
}
16 changes: 9 additions & 7 deletions core/capabilities/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,11 @@ func (s *registrySyncer) launch(ctx context.Context) {
RegistrationExpiryMs: 60000,
MinResponsesToAggregate: uint32(s.networkSetup.TriggerCapabilityDonInfo.F) + 1,
}
err = s.peerWrapper.GetPeer().UpdateConnections(s.networkSetup.allPeers)
if err != nil {
s.lggr.Errorw("failed to update connections", "error", err)
return
}
if s.networkSetup.IsWorkflowDon(myId) {
s.lggr.Info("member of a workflow DON - starting remote subscribers")
codec := streams.NewCodec(s.lggr)
Expand Down Expand Up @@ -197,12 +202,13 @@ type HardcodedDonNetworkSetup struct {
workflowDonPeers []string
triggerDonPeers []string
triggerDonSigners []string
allPeers map[ragetypes.PeerID]p2ptypes.StreamConfig

WorkflowsDonInfo capabilities.DON
TriggerCapabilityDonInfo capabilities.DON
}

func NewHardcodedDonNetworkSetup(peerWrapper p2ptypes.PeerWrapper) (HardcodedDonNetworkSetup, error) {
func NewHardcodedDonNetworkSetup() (HardcodedDonNetworkSetup, error) {
result := HardcodedDonNetworkSetup{}

result.workflowDonPeers = []string{
Expand Down Expand Up @@ -230,15 +236,15 @@ func NewHardcodedDonNetworkSetup(peerWrapper p2ptypes.PeerWrapper) (HardcodedDon
"0x91d9b0062265514f012Eb8fABA59372fD9520f56",
}

allPeers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig)
result.allPeers = make(map[ragetypes.PeerID]p2ptypes.StreamConfig)
addPeersToDONInfo := func(peers []string, donInfo *capabilities.DON) error {
for _, peerID := range peers {
var p ragetypes.PeerID
err := p.UnmarshalText([]byte(peerID))
if err != nil {
return err
}
allPeers[p] = defaultStreamConfig
result.allPeers[p] = defaultStreamConfig
donInfo.Members = append(donInfo.Members, p)
}
return nil
Expand All @@ -251,10 +257,6 @@ func NewHardcodedDonNetworkSetup(peerWrapper p2ptypes.PeerWrapper) (HardcodedDon
if err := addPeersToDONInfo(result.triggerDonPeers, &result.TriggerCapabilityDonInfo); err != nil {
return HardcodedDonNetworkSetup{}, fmt.Errorf("failed to add peers to trigger DON info: %w", err)
}
err := peerWrapper.GetPeer().UpdateConnections(allPeers)
if err != nil {
return HardcodedDonNetworkSetup{}, fmt.Errorf("failed to update connections: %w", err)
}

return result, nil
}
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/syncer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ func TestSyncer_CleanStartClose(t *testing.T) {
dispatcher := remoteMocks.NewDispatcher(t)
dispatcher.On("SetReceiver", mock.Anything, mock.Anything, mock.Anything).Return(nil)

networkSetup, err := coreCapabilities.NewHardcodedDonNetworkSetup(wrapper)
networkSetup, err := coreCapabilities.NewHardcodedDonNetworkSetup()
require.NoError(t, err)
syncer := coreCapabilities.NewRegistrySyncer(wrapper, registry, dispatcher, lggr, networkSetup)
require.NoError(t, syncer.Start(ctx))
Expand Down
4 changes: 1 addition & 3 deletions core/cmd/shell.go
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,6 @@ import (

"github.com/jmoiron/sqlx"

commoncapabilities "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox"
Expand All @@ -44,7 +43,6 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/services"
"github.com/smartcontractkit/chainlink/v2/core/services/chainlink"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache"
Expand Down Expand Up @@ -168,7 +166,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G
LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(),
})

capabilitiesRegistry := capabilities.NewRegistry(appLggr, p2ptypes.PeerID{}, commoncapabilities.DON{})
capabilitiesRegistry := capabilities.NewRegistry(appLggr)

// create the relayer-chain interoperators from application configuration
relayerFactory := chainlink.RelayerFactory{
Expand Down
22 changes: 5 additions & 17 deletions core/services/chainlink/application.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import (
"go.uber.org/multierr"
"go.uber.org/zap/zapcore"

commoncapabilities "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
commonservices "github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/sqlutil"
Expand Down Expand Up @@ -201,6 +200,10 @@ func NewApplication(opts ApplicationOpts) (Application, error) {
restrictedHTTPClient := opts.RestrictedHTTPClient
unrestrictedHTTPClient := opts.UnrestrictedHTTPClient

if opts.CapabilitiesRegistry == nil { // for tests only, in prod Registry is always set at this point
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger)
}

var externalPeerWrapper p2ptypes.PeerWrapper
if cfg.Capabilities().Peering().Enabled() {
externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), globalLogger)
Expand All @@ -209,31 +212,16 @@ func NewApplication(opts ApplicationOpts) (Application, error) {

srvcs = append(srvcs, externalPeerWrapper)

networkSetup, err := capabilities.NewHardcodedDonNetworkSetup(externalPeerWrapper)
networkSetup, err := capabilities.NewHardcodedDonNetworkSetup()
if err != nil {
return nil, fmt.Errorf("failed to create hardcoded Don network setup: %w", err)
}

if opts.CapabilitiesRegistry == nil {
peerID := externalPeerWrapper.GetPeer().ID()
if networkSetup.IsWorkflowDon(peerID) {
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger, peerID, networkSetup.WorkflowsDonInfo)
} else if networkSetup.IsTriggerDon(peerID) {
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger, peerID, networkSetup.TriggerCapabilityDonInfo)
} else {
return nil, fmt.Errorf("peer %s is not a member of any known DON", peerID)
}
}

// NOTE: RegistrySyncer will depend on a Relayer when fully implemented
dispatcher := remote.NewDispatcher(externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger)
registrySyncer := capabilities.NewRegistrySyncer(externalPeerWrapper, opts.CapabilitiesRegistry, dispatcher, globalLogger, networkSetup)

srvcs = append(srvcs, dispatcher, registrySyncer)
} else {
if opts.CapabilitiesRegistry == nil {
opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger, p2ptypes.PeerID{}, commoncapabilities.DON{})
}
}

// LOOPs can be created as options, in the case of LOOP relayers, or
Expand Down
6 changes: 2 additions & 4 deletions core/services/job/spawner_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,15 +12,13 @@ import (

"github.com/jmoiron/sqlx"

commoncapabilities "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/loop"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/services/servicetest"
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils"
"github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox/mailboxtest"
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
mocklp "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks"
Expand Down Expand Up @@ -298,7 +296,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) {
evmRelayer, err := evmrelayer.NewRelayer(lggr, chain, evmrelayer.RelayerOpts{
DS: db,
CSAETHKeystore: keyStore,
CapabilitiesRegistry: capabilities.NewRegistry(lggr, p2ptypes.PeerID{}, commoncapabilities.DON{}),
CapabilitiesRegistry: capabilities.NewRegistry(lggr),
})
assert.NoError(t, err)

Expand All @@ -316,7 +314,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) {
ocr2DelegateConfig := ocr2.NewDelegateConfig(config.OCR2(), config.Mercury(), config.Threshold(), config.Insecure(), config.JobPipeline(), processConfig)

d := ocr2.NewDelegate(nil, orm, nil, nil, nil, nil, nil, monitoringEndpoint, legacyChains, lggr, ocr2DelegateConfig,
keyStore.OCR2(), keyStore.DKGSign(), keyStore.DKGEncrypt(), ethKeyStore, testRelayGetter, mailMon, capabilities.NewRegistry(lggr, p2ptypes.PeerID{}, commoncapabilities.DON{}))
keyStore.OCR2(), keyStore.DKGSign(), keyStore.DKGEncrypt(), ethKeyStore, testRelayGetter, mailMon, capabilities.NewRegistry(lggr))
delegateOCR2 := &delegate{jobOCR2VRF.Type, []job.ServiceCtx{}, 0, nil, d}

spawner := job.NewSpawner(orm, config.Database(), noopChecker{}, map[job.Type]job.Delegate{
Expand Down
3 changes: 1 addition & 2 deletions core/services/relay/evm/write_target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/relay/evm"

"github.com/smartcontractkit/chainlink-common/pkg/capabilities"
Expand Down Expand Up @@ -72,7 +71,7 @@ func TestEvmWrite(t *testing.T) {
relayer, err := relayevm.NewRelayer(lggr, chain, relayevm.RelayerOpts{
DS: db,
CSAETHKeystore: keyStore,
CapabilitiesRegistry: evmcapabilities.NewRegistry(lggr, p2ptypes.PeerID{}, capabilities.DON{}),
CapabilitiesRegistry: evmcapabilities.NewRegistry(lggr),
})
require.NoError(t, err)

Expand Down
15 changes: 15 additions & 0 deletions core/services/workflows/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types/core"
"github.com/smartcontractkit/chainlink-common/pkg/values"
"github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission"
"github.com/smartcontractkit/chainlink/v2/core/logger"
p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types"
"github.com/smartcontractkit/chainlink/v2/core/services/workflows/store"
Expand Down Expand Up @@ -139,6 +140,20 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error {
return fmt.Errorf("failed to get capability with ref %s: %s", step.ID, err)
}

// Special treatment for local targets - wrap into a transmission capability
target, isTarget := cp.(capabilities.TargetCapability)
if isTarget {
capInfo, err2 := target.Info(ctx)
if err2 != nil {
return fmt.Errorf("failed to get info of target capability: %w", err2)
}

// If the DON is nil this is a local target
if capInfo.DON == nil {
cp = transmission.NewLocalTargetCapability(e.logger, *e.donInfo.PeerID(), *e.donInfo.DON, target)
}
}

// We configure actions, consensus and targets here, and
// they all satisfy the `CallbackCapability` interface
cc, ok := cp.(capabilities.CallbackCapability)
Expand Down
Loading

0 comments on commit a63569c

Please sign in to comment.