From 5ff32bde2575d2b2ef626a69d54b9f6eed0d21c5 Mon Sep 17 00:00:00 2001 From: Matthew Pendrey Date: Mon, 3 Jun 2024 17:45:59 +0100 Subject: [PATCH] ks-265 wrap local target in capability registry with transmission logic (#13396) * wrap local target in capability registry with transmission logic * review comments * review comments * test fix --- .changeset/fast-kings-compete.md | 5 + core/capabilities/registry.go | 30 ++- core/capabilities/registry_test.go | 53 ++++- core/capabilities/syncer.go | 182 ++++++++++-------- core/capabilities/syncer_test.go | 4 +- .../transmission/local_target_capability.go | 59 ++++++ .../local_target_capability_test.go} | 56 +++++- .../capabilities/transmission/transmission.go | 2 - core/cmd/shell.go | 4 +- core/services/chainlink/application.go | 28 ++- core/services/job/spawner_test.go | 6 +- core/services/relay/evm/write_target_test.go | 3 +- core/services/workflows/engine.go | 77 ++------ core/services/workflows/engine_test.go | 10 +- core/services/workflows/execution_strategy.go | 95 --------- core/services/workflows/models.go | 5 +- 16 files changed, 351 insertions(+), 268 deletions(-) create mode 100644 .changeset/fast-kings-compete.md create mode 100644 core/capabilities/transmission/local_target_capability.go rename core/{services/workflows/execution_strategy_test.go => capabilities/transmission/local_target_capability_test.go} (73%) delete mode 100644 core/services/workflows/execution_strategy.go diff --git a/.changeset/fast-kings-compete.md b/.changeset/fast-kings-compete.md new file mode 100644 index 00000000000..941e8a802c3 --- /dev/null +++ b/.changeset/fast-kings-compete.md @@ -0,0 +1,5 @@ +--- +"chainlink": minor +--- + +#internal keystone: handle local target transmission logic in capability wrapper diff --git a/core/capabilities/registry.go b/core/capabilities/registry.go index 3c7bdf2c971..4e3877f0c5f 100644 --- a/core/capabilities/registry.go +++ b/core/capabilities/registry.go @@ -6,15 +6,20 @@ import ( "sync" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) // Registry is a struct for the registry of capabilities. // Registry is safe for concurrent use. type Registry struct { - m map[string]capabilities.BaseCapability - mu sync.RWMutex - lggr logger.Logger + lggr logger.Logger + peerID p2ptypes.PeerID + don capabilities.DON + + m map[string]capabilities.BaseCapability + mu sync.RWMutex } // Get gets a capability from the registry. @@ -134,6 +139,17 @@ func (r *Registry) Add(ctx context.Context, c capabilities.BaseCapability) error if !ok { return fmt.Errorf("target capability does not satisfy TargetCapability interface") } + + capInfo, err := c.Info(ctx) + if err != nil { + return fmt.Errorf("failed to get info of target capability: %w", err) + } + + // If the DON is nil this is a local capability and requires wrapping in a local target transmission capability + if capInfo.DON == nil { + c = transmission.NewLocalTargetCapability(r.lggr, r.peerID, r.don, c.(capabilities.TargetCapability)) + } + default: return fmt.Errorf("unknown capability type: %s", info.CapabilityType) } @@ -150,9 +166,11 @@ func (r *Registry) Add(ctx context.Context, c capabilities.BaseCapability) error } // NewRegistry returns a new Registry. -func NewRegistry(lggr logger.Logger) *Registry { +func NewRegistry(lggr logger.Logger, peerID p2ptypes.PeerID, don capabilities.DON) *Registry { return &Registry{ - m: map[string]capabilities.BaseCapability{}, - lggr: lggr.Named("CapabilityRegistry"), + m: map[string]capabilities.BaseCapability{}, + lggr: lggr.Named("CapabilityRegistry"), + peerID: peerID, + don: don, } } diff --git a/core/capabilities/registry_test.go b/core/capabilities/registry_test.go index 3bed31a957a..db366bf41ec 100644 --- a/core/capabilities/registry_test.go +++ b/core/capabilities/registry_test.go @@ -10,9 +10,12 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" + "github.com/smartcontractkit/chainlink-common/pkg/values" coreCapabilities "github.com/smartcontractkit/chainlink/v2/core/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) type mockCapability struct { @@ -34,7 +37,7 @@ func (m *mockCapability) UnregisterFromWorkflow(ctx context.Context, request cap func TestRegistry(t *testing.T) { ctx := testutils.Context(t) - r := coreCapabilities.NewRegistry(logger.TestLogger(t)) + r := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) id := "capability-1" ci, err := capabilities.NewCapabilityInfo( @@ -62,7 +65,7 @@ func TestRegistry(t *testing.T) { func TestRegistry_NoDuplicateIDs(t *testing.T) { ctx := testutils.Context(t) - r := coreCapabilities.NewRegistry(logger.TestLogger(t)) + r := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) id := "capability-1" ci, err := capabilities.NewCapabilityInfo( @@ -173,7 +176,7 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) { } ctx := testutils.Context(t) - reg := coreCapabilities.NewRegistry(logger.TestLogger(t)) + reg := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) for _, tc := range tcs { t.Run(tc.name, func(t *testing.T) { id, err := tc.newCapability(ctx, reg) @@ -184,3 +187,47 @@ func TestRegistry_ChecksExecutionAPIByType(t *testing.T) { }) } } + +func TestRegistry_ReturnsLocalTargetCapabilityForLocalTargets(t *testing.T) { + ctx := testutils.Context(t) + r := coreCapabilities.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) + + id := "capability-1" + ci, err := capabilities.NewRemoteCapabilityInfo( + id, + capabilities.CapabilityTypeTarget, + "capability-1-description", + "v1.0.0", + nil, + ) + require.NoError(t, err) + + c := &mockCapability{CapabilityInfo: ci} + err = r.Add(ctx, c) + require.NoError(t, err) + + targetCapability, err := r.GetTarget(ctx, id) + require.NoError(t, err) + + duffTransmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_AllAtOnce, + "deltaStage": "10banana", + }) + require.NoError(t, err) + + _, err = targetCapability.Execute(ctx, capabilities.CapabilityRequest{ + Config: duffTransmissionSchedule, + }) + assert.NotNil(t, err) + + validTransmissionSchedule, err := values.NewMap(map[string]any{ + "schedule": transmission.Schedule_OneAtATime, + "deltaStage": "10ms", + }) + require.NoError(t, err) + + _, err = targetCapability.Execute(ctx, capabilities.CapabilityRequest{ + Config: validTransmissionSchedule, + }) + assert.NoError(t, err) +} diff --git a/core/capabilities/syncer.go b/core/capabilities/syncer.go index 02a21043d3a..0227bfe1b46 100644 --- a/core/capabilities/syncer.go +++ b/core/capabilities/syncer.go @@ -3,6 +3,7 @@ package capabilities import ( "context" "encoding/hex" + "fmt" "math/big" "slices" "sync" @@ -25,12 +26,14 @@ import ( ) type registrySyncer struct { - peerWrapper p2ptypes.PeerWrapper - registry core.CapabilitiesRegistry - dispatcher remotetypes.Dispatcher - subServices []services.Service - wg sync.WaitGroup - lggr logger.Logger + peerWrapper p2ptypes.PeerWrapper + registry core.CapabilitiesRegistry + dispatcher remotetypes.Dispatcher + subServices []services.Service + networkSetup HardcodedDonNetworkSetup + + wg sync.WaitGroup + lggr logger.Logger } var _ services.Service = ®istrySyncer{} @@ -52,12 +55,14 @@ var defaultStreamConfig = p2ptypes.StreamConfig{ const maxRetryCount = 60 // RegistrySyncer updates local Registry to match its onchain counterpart -func NewRegistrySyncer(peerWrapper p2ptypes.PeerWrapper, registry core.CapabilitiesRegistry, dispatcher remotetypes.Dispatcher, lggr logger.Logger) *registrySyncer { +func NewRegistrySyncer(peerWrapper p2ptypes.PeerWrapper, registry core.CapabilitiesRegistry, dispatcher remotetypes.Dispatcher, lggr logger.Logger, + networkSetup HardcodedDonNetworkSetup) *registrySyncer { return ®istrySyncer{ - peerWrapper: peerWrapper, - registry: registry, - dispatcher: dispatcher, - lggr: lggr, + peerWrapper: peerWrapper, + registry: registry, + dispatcher: dispatcher, + networkSetup: networkSetup, + lggr: lggr, } } @@ -71,97 +76,43 @@ func (s *registrySyncer) Start(ctx context.Context) error { // that reads the configuration from chain (KS-117). func (s *registrySyncer) launch(ctx context.Context) { defer s.wg.Done() - // NOTE: temporary hard-coded DONs - workflowDONPeers := []string{ - "12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N", - "12D3KooWG1AyvwmCpZ93J8pBQUE1SuzrjDXnT4BeouncHR3jWLCG", - "12D3KooWGeUKZBRMbx27FUTgBwZa9Ap9Ym92mywwpuqkEtz8XWyv", - "12D3KooW9zYWQv3STmDeNDidyzxsJSTxoCTLicafgfeEz9nhwhC4", - } - triggerDONPeers := []string{ - "12D3KooWBaiTbbRwwt2fbNifiL7Ew9tn3vds9AJE3Nf3eaVBX36m", - "12D3KooWS7JSY9fzSfWgbCE1S3W2LNY6ZVpRuun74moVBkKj6utE", - "12D3KooWMMTDXcWhpVnwrdAer1jnVARTmnr3RyT3v7Djg8ZuoBh9", - "12D3KooWGzVXsKxXsF4zLgxSDM8Gzx1ywq2pZef4PrHMKuVg4K3P", - "12D3KooWSyjmmzjVtCzwN7bXzZQFmWiJRuVcKBerNjVgL7HdLJBW", - "12D3KooWLGz9gzhrNsvyM6XnXS3JRkZoQdEzuAvysovnSChNK5ZK", - "12D3KooWAvZnvknFAfSiUYjATyhzEJLTeKvAzpcLELHi4ogM3GET", - } - triggerDONSigners := []string{ - "0x9CcE7293a4Cc2621b61193135A95928735e4795F", - "0x3c775F20bCB2108C1A818741Ce332Bb5fe0dB925", - "0x50314239e2CF05555ceeD53E7F47eB2A8Eab0dbB", - "0xd76A4f98898c3b9A72b244476d7337b50D54BCd8", - "0x656A873f6895b8a03Fb112dE927d43FA54B2c92A", - "0x5d1e87d87bF2e0cD4Ea64F381a2dbF45e5f0a553", - "0x91d9b0062265514f012Eb8fABA59372fD9520f56", - } - allPeers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig) - addPeersToDONInfo := func(peers []string, donInfo *capabilities.DON) error { - for _, peerID := range peers { - var p ragetypes.PeerID - err := p.UnmarshalText([]byte(peerID)) - if err != nil { - return err - } - allPeers[p] = defaultStreamConfig - donInfo.Members = append(donInfo.Members, p) - } - return nil - } - workflowDonInfo := capabilities.DON{ID: "workflowDon1", F: 1} - if err := addPeersToDONInfo(workflowDONPeers, &workflowDonInfo); err != nil { - s.lggr.Errorw("failed to add peers to workflow DON info", "error", err) - return - } - triggerCapabilityDonInfo := capabilities.DON{ID: "capabilityDon1", F: 1} // NOTE: misconfiguration - should be 2 - if err := addPeersToDONInfo(triggerDONPeers, &triggerCapabilityDonInfo); err != nil { - s.lggr.Errorw("failed to add peers to trigger DON info", "error", err) - return - } - err := s.peerWrapper.GetPeer().UpdateConnections(allPeers) - if err != nil { - s.lggr.Errorw("failed to update connections", "error", err) - return - } - // NOTE: temporary hard-coded capabilities capId := "streams-trigger" triggerInfo, err := capabilities.NewRemoteCapabilityInfo( capId, capabilities.CapabilityTypeTrigger, "Remote Trigger", "v0.0.1", - &triggerCapabilityDonInfo, + &s.networkSetup.TriggerCapabilityDonInfo, ) if err != nil { s.lggr.Errorw("failed to create capability info for streams-trigger", "error", err) return } - myId := s.peerWrapper.GetPeer().ID().String() + myId := s.peerWrapper.GetPeer().ID() config := remotetypes.RemoteTriggerConfig{ RegistrationRefreshMs: 20000, RegistrationExpiryMs: 60000, - MinResponsesToAggregate: uint32(triggerCapabilityDonInfo.F) + 1, + MinResponsesToAggregate: uint32(s.networkSetup.TriggerCapabilityDonInfo.F) + 1, } - if slices.Contains(workflowDONPeers, myId) { + if s.networkSetup.IsWorkflowDon(myId) { s.lggr.Info("member of a workflow DON - starting remote subscribers") codec := streams.NewCodec(s.lggr) - aggregator := triggers.NewMercuryRemoteAggregator(codec, hexStringsToBytes(triggerDONSigners), int(triggerCapabilityDonInfo.F+1), s.lggr) - triggerCap := remote.NewTriggerSubscriber(config, triggerInfo, triggerCapabilityDonInfo, workflowDonInfo, s.dispatcher, aggregator, s.lggr) + aggregator := triggers.NewMercuryRemoteAggregator(codec, hexStringsToBytes(s.networkSetup.triggerDonSigners), int(s.networkSetup.TriggerCapabilityDonInfo.F+1), s.lggr) + triggerCap := remote.NewTriggerSubscriber(config, triggerInfo, s.networkSetup.TriggerCapabilityDonInfo, s.networkSetup.WorkflowsDonInfo, s.dispatcher, aggregator, s.lggr) err = s.registry.Add(ctx, triggerCap) if err != nil { s.lggr.Errorw("failed to add remote target capability to registry", "error", err) return } - err = s.dispatcher.SetReceiver(capId, triggerCapabilityDonInfo.ID, triggerCap) + err = s.dispatcher.SetReceiver(capId, s.networkSetup.TriggerCapabilityDonInfo.ID, triggerCap) if err != nil { - s.lggr.Errorw("workflow DON failed to set receiver", "capabilityId", capId, "donId", triggerCapabilityDonInfo.ID, "error", err) + s.lggr.Errorw("workflow DON failed to set receiver", "capabilityId", capId, "donId", s.networkSetup.TriggerCapabilityDonInfo.ID, "error", err) return } s.subServices = append(s.subServices, triggerCap) } - if slices.Contains(triggerDONPeers, myId) { + if s.networkSetup.IsTriggerDon(myId) { s.lggr.Info("member of a capability DON - starting remote publishers") /*{ @@ -195,12 +146,12 @@ func (s *registrySyncer) launch(ctx context.Context) { continue } workflowDONs := map[string]capabilities.DON{ - workflowDonInfo.ID: workflowDonInfo, + s.networkSetup.WorkflowsDonInfo.ID: s.networkSetup.WorkflowsDonInfo, } - triggerCap := remote.NewTriggerPublisher(config, underlying, triggerInfo, triggerCapabilityDonInfo, workflowDONs, s.dispatcher, s.lggr) - err = s.dispatcher.SetReceiver(capId, triggerCapabilityDonInfo.ID, triggerCap) + triggerCap := remote.NewTriggerPublisher(config, underlying, triggerInfo, s.networkSetup.TriggerCapabilityDonInfo, workflowDONs, s.dispatcher, s.lggr) + err = s.dispatcher.SetReceiver(capId, s.networkSetup.TriggerCapabilityDonInfo.ID, triggerCap) if err != nil { - s.lggr.Errorw("capability DON failed to set receiver", "capabilityId", capId, "donId", triggerCapabilityDonInfo.ID, "error", err) + s.lggr.Errorw("capability DON failed to set receiver", "capabilityId", capId, "donId", s.networkSetup.TriggerCapabilityDonInfo.ID, "error", err) return } s.subServices = append(s.subServices, triggerCap) @@ -241,6 +192,81 @@ func (s *registrySyncer) Name() string { return "RegistrySyncer" } +// HardcodedDonNetworkSetup is a temporary setup for testing purposes +type HardcodedDonNetworkSetup struct { + workflowDonPeers []string + triggerDonPeers []string + triggerDonSigners []string + + WorkflowsDonInfo capabilities.DON + TriggerCapabilityDonInfo capabilities.DON +} + +func NewHardcodedDonNetworkSetup(peerWrapper p2ptypes.PeerWrapper) (HardcodedDonNetworkSetup, error) { + result := HardcodedDonNetworkSetup{} + + result.workflowDonPeers = []string{ + "12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N", + "12D3KooWG1AyvwmCpZ93J8pBQUE1SuzrjDXnT4BeouncHR3jWLCG", + "12D3KooWGeUKZBRMbx27FUTgBwZa9Ap9Ym92mywwpuqkEtz8XWyv", + "12D3KooW9zYWQv3STmDeNDidyzxsJSTxoCTLicafgfeEz9nhwhC4", + } + result.triggerDonPeers = []string{ + "12D3KooWBaiTbbRwwt2fbNifiL7Ew9tn3vds9AJE3Nf3eaVBX36m", + "12D3KooWS7JSY9fzSfWgbCE1S3W2LNY6ZVpRuun74moVBkKj6utE", + "12D3KooWMMTDXcWhpVnwrdAer1jnVARTmnr3RyT3v7Djg8ZuoBh9", + "12D3KooWGzVXsKxXsF4zLgxSDM8Gzx1ywq2pZef4PrHMKuVg4K3P", + "12D3KooWSyjmmzjVtCzwN7bXzZQFmWiJRuVcKBerNjVgL7HdLJBW", + "12D3KooWLGz9gzhrNsvyM6XnXS3JRkZoQdEzuAvysovnSChNK5ZK", + "12D3KooWAvZnvknFAfSiUYjATyhzEJLTeKvAzpcLELHi4ogM3GET", + } + result.triggerDonSigners = []string{ + "0x9CcE7293a4Cc2621b61193135A95928735e4795F", + "0x3c775F20bCB2108C1A818741Ce332Bb5fe0dB925", + "0x50314239e2CF05555ceeD53E7F47eB2A8Eab0dbB", + "0xd76A4f98898c3b9A72b244476d7337b50D54BCd8", + "0x656A873f6895b8a03Fb112dE927d43FA54B2c92A", + "0x5d1e87d87bF2e0cD4Ea64F381a2dbF45e5f0a553", + "0x91d9b0062265514f012Eb8fABA59372fD9520f56", + } + + allPeers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig) + addPeersToDONInfo := func(peers []string, donInfo *capabilities.DON) error { + for _, peerID := range peers { + var p ragetypes.PeerID + err := p.UnmarshalText([]byte(peerID)) + if err != nil { + return err + } + allPeers[p] = defaultStreamConfig + donInfo.Members = append(donInfo.Members, p) + } + return nil + } + result.WorkflowsDonInfo = capabilities.DON{ID: "workflowDon1", F: 1} + if err := addPeersToDONInfo(result.workflowDonPeers, &result.WorkflowsDonInfo); err != nil { + return HardcodedDonNetworkSetup{}, fmt.Errorf("failed to add peers to workflow DON info: %w", err) + } + result.TriggerCapabilityDonInfo = capabilities.DON{ID: "capabilityDon1", F: 1} // NOTE: misconfiguration - should be 2 + if err := addPeersToDONInfo(result.triggerDonPeers, &result.TriggerCapabilityDonInfo); err != nil { + return HardcodedDonNetworkSetup{}, fmt.Errorf("failed to add peers to trigger DON info: %w", err) + } + err := peerWrapper.GetPeer().UpdateConnections(allPeers) + if err != nil { + return HardcodedDonNetworkSetup{}, fmt.Errorf("failed to update connections: %w", err) + } + + return result, nil +} + +func (h HardcodedDonNetworkSetup) IsWorkflowDon(id p2ptypes.PeerID) bool { + return slices.Contains(h.workflowDonPeers, id.String()) +} + +func (h HardcodedDonNetworkSetup) IsTriggerDon(id p2ptypes.PeerID) bool { + return slices.Contains(h.triggerDonPeers, id.String()) +} + type mockMercuryDataProducer struct { trigger *triggers.MercuryTriggerService wg sync.WaitGroup diff --git a/core/capabilities/syncer_test.go b/core/capabilities/syncer_test.go index 757135635d8..0dfa49eeeb9 100644 --- a/core/capabilities/syncer_test.go +++ b/core/capabilities/syncer_test.go @@ -32,7 +32,9 @@ func TestSyncer_CleanStartClose(t *testing.T) { dispatcher := remoteMocks.NewDispatcher(t) dispatcher.On("SetReceiver", mock.Anything, mock.Anything, mock.Anything).Return(nil) - syncer := coreCapabilities.NewRegistrySyncer(wrapper, registry, dispatcher, lggr) + networkSetup, err := coreCapabilities.NewHardcodedDonNetworkSetup(wrapper) + require.NoError(t, err) + syncer := coreCapabilities.NewRegistrySyncer(wrapper, registry, dispatcher, lggr, networkSetup) require.NoError(t, syncer.Start(ctx)) require.NoError(t, syncer.Close()) } diff --git a/core/capabilities/transmission/local_target_capability.go b/core/capabilities/transmission/local_target_capability.go new file mode 100644 index 00000000000..4fddd93d403 --- /dev/null +++ b/core/capabilities/transmission/local_target_capability.go @@ -0,0 +1,59 @@ +package transmission + +import ( + "context" + "fmt" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +// LocalTargetCapability handles the transmission protocol required for a target capability that exists in the same don as +// the caller. +type LocalTargetCapability struct { + lggr logger.Logger + capabilities.TargetCapability + peerID p2ptypes.PeerID + don capabilities.DON +} + +func NewLocalTargetCapability(lggr logger.Logger, peerID p2ptypes.PeerID, don capabilities.DON, underlying capabilities.TargetCapability) *LocalTargetCapability { + return &LocalTargetCapability{ + TargetCapability: underlying, + lggr: lggr, + peerID: peerID, + don: don, + } +} + +func (l *LocalTargetCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) { + if req.Config == nil || req.Config.Underlying["schedule"] == nil { + l.lggr.Debug("no schedule found, executing immediately") + return l.TargetCapability.Execute(ctx, req) + } + + tc, err := ExtractTransmissionConfig(req.Config) + if err != nil { + return nil, fmt.Errorf("failed to extract transmission config from request config: %w", err) + } + + peerIDToTransmissionDelay, err := GetPeerIDToTransmissionDelay(l.don.Members, l.don.Config.SharedSecret, + req.Metadata.WorkflowID+req.Metadata.WorkflowExecutionID, tc) + if err != nil { + return nil, fmt.Errorf("failed to get peer ID to transmission delay map: %w", err) + } + + delay, existsForPeerID := peerIDToTransmissionDelay[l.peerID] + if !existsForPeerID { + return nil, nil + } + + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(delay): + return l.TargetCapability.Execute(ctx, req) + } +} diff --git a/core/services/workflows/execution_strategy_test.go b/core/capabilities/transmission/local_target_capability_test.go similarity index 73% rename from core/services/workflows/execution_strategy_test.go rename to core/capabilities/transmission/local_target_capability_test.go index 917ea84c72c..19d51b492ff 100644 --- a/core/services/workflows/execution_strategy_test.go +++ b/core/capabilities/transmission/local_target_capability_test.go @@ -1,6 +1,7 @@ -package workflows +package transmission import ( + "context" "crypto/rand" "encoding/hex" "testing" @@ -25,6 +26,8 @@ func TestScheduledExecutionStrategy_LocalDON(t *testing.T) { var gotTime time.Time var called bool + log := logger.TestLogger(t) + // Our capability has DONInfo == nil, so we'll treat it as a local // capability and use the local DON Info to determine the transmission // schedule. @@ -42,8 +45,6 @@ func TestScheduledExecutionStrategy_LocalDON(t *testing.T) { }, ) - l := logger.TestLogger(t) - // The combination of this key and the metadata above // will yield the permutation [3, 2, 0, 1] key, err := hex.DecodeString("fb13ca015a9ec60089c7141e9522de79") @@ -138,19 +139,17 @@ func TestScheduledExecutionStrategy_LocalDON(t *testing.T) { randKey(), randKey(), } - don := &capabilities.DON{ + don := capabilities.DON{ Members: ids, Config: capabilities.DONConfig{ SharedSecret: [16]byte(key), }, } peerID := ids[tc.position] - de := scheduledExecution{ - DON: don, - PeerID: &peerID, - Position: tc.position, - } - _, err = de.Apply(tests.Context(t), l, mt, req) + localTargetCapability := NewLocalTargetCapability(log, peerID, don, mt) + + _, err = localTargetCapability.Execute(tests.Context(t), req) + require.NoError(t, err) require.True(t, called) @@ -167,3 +166,40 @@ func randKey() [32]byte { } return [32]byte(key) } + +type mockCapability struct { + capabilities.CapabilityInfo + capabilities.CallbackExecutable + response chan capabilities.CapabilityResponse + transform func(capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) +} + +func newMockCapability(info capabilities.CapabilityInfo, transform func(capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error)) *mockCapability { + return &mockCapability{ + transform: transform, + CapabilityInfo: info, + response: make(chan capabilities.CapabilityResponse, 10), + } +} + +func (m *mockCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) { + cr, err := m.transform(req) + if err != nil { + return nil, err + } + + ch := make(chan capabilities.CapabilityResponse, 10) + + m.response <- cr + ch <- cr + close(ch) + return ch, nil +} + +func (m *mockCapability) RegisterToWorkflow(ctx context.Context, request capabilities.RegisterToWorkflowRequest) error { + return nil +} + +func (m *mockCapability) UnregisterFromWorkflow(ctx context.Context, request capabilities.UnregisterFromWorkflowRequest) error { + return nil +} diff --git a/core/capabilities/transmission/transmission.go b/core/capabilities/transmission/transmission.go index 0129d600265..5121a9bf9f3 100644 --- a/core/capabilities/transmission/transmission.go +++ b/core/capabilities/transmission/transmission.go @@ -13,8 +13,6 @@ import ( p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) -// TODO determine location for this code - var ( // S = [N] Schedule_AllAtOnce = "allAtOnce" diff --git a/core/cmd/shell.go b/core/cmd/shell.go index d6f99955e10..0ababfca58d 100644 --- a/core/cmd/shell.go +++ b/core/cmd/shell.go @@ -31,6 +31,7 @@ import ( "github.com/jmoiron/sqlx" + commoncapabilities "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox" @@ -43,6 +44,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services" "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" "github.com/smartcontractkit/chainlink/v2/core/services/keystore" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm/mercury/wsrpc/cache" @@ -166,7 +168,7 @@ func (n ChainlinkAppFactory) NewApplication(ctx context.Context, cfg chainlink.G LatestReportDeadline: cfg.Mercury().Cache().LatestReportDeadline(), }) - capabilitiesRegistry := capabilities.NewRegistry(appLggr) + capabilitiesRegistry := capabilities.NewRegistry(appLggr, p2ptypes.PeerID{}, commoncapabilities.DON{}) // create the relayer-chain interoperators from application configuration relayerFactory := chainlink.RelayerFactory{ diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 3eeaaa880ed..d796faa00c4 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -17,6 +17,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap/zapcore" + commoncapabilities "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/loop" commonservices "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -200,10 +201,6 @@ func NewApplication(opts ApplicationOpts) (Application, error) { restrictedHTTPClient := opts.RestrictedHTTPClient unrestrictedHTTPClient := opts.UnrestrictedHTTPClient - if opts.CapabilitiesRegistry == nil { - opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger) - } - var externalPeerWrapper p2ptypes.PeerWrapper if cfg.Capabilities().Peering().Enabled() { externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), globalLogger) @@ -212,10 +209,31 @@ func NewApplication(opts ApplicationOpts) (Application, error) { srvcs = append(srvcs, externalPeerWrapper) + networkSetup, err := capabilities.NewHardcodedDonNetworkSetup(externalPeerWrapper) + if err != nil { + return nil, fmt.Errorf("failed to create hardcoded Don network setup: %w", err) + } + + if opts.CapabilitiesRegistry == nil { + peerID := externalPeerWrapper.GetPeer().ID() + if networkSetup.IsWorkflowDon(peerID) { + opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger, peerID, networkSetup.WorkflowsDonInfo) + } else if networkSetup.IsTriggerDon(peerID) { + opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger, peerID, networkSetup.TriggerCapabilityDonInfo) + } else { + return nil, fmt.Errorf("peer %s is not a member of any known DON", peerID) + } + } + // NOTE: RegistrySyncer will depend on a Relayer when fully implemented dispatcher := remote.NewDispatcher(externalPeerWrapper, signer, opts.CapabilitiesRegistry, globalLogger) - registrySyncer := capabilities.NewRegistrySyncer(externalPeerWrapper, opts.CapabilitiesRegistry, dispatcher, globalLogger) + registrySyncer := capabilities.NewRegistrySyncer(externalPeerWrapper, opts.CapabilitiesRegistry, dispatcher, globalLogger, networkSetup) + srvcs = append(srvcs, dispatcher, registrySyncer) + } else { + if opts.CapabilitiesRegistry == nil { + opts.CapabilitiesRegistry = capabilities.NewRegistry(globalLogger, p2ptypes.PeerID{}, commoncapabilities.DON{}) + } } // LOOPs can be created as options, in the case of LOOP relayers, or diff --git a/core/services/job/spawner_test.go b/core/services/job/spawner_test.go index 8ed08a1cb8a..8b98fd219fb 100644 --- a/core/services/job/spawner_test.go +++ b/core/services/job/spawner_test.go @@ -12,6 +12,7 @@ import ( "github.com/jmoiron/sqlx" + commoncapabilities "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/loop" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/services/servicetest" @@ -19,6 +20,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink-common/pkg/utils/mailbox/mailboxtest" "github.com/smartcontractkit/chainlink/v2/core/capabilities" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/bridges" mocklp "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks" @@ -296,7 +298,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { evmRelayer, err := evmrelayer.NewRelayer(lggr, chain, evmrelayer.RelayerOpts{ DS: db, CSAETHKeystore: keyStore, - CapabilitiesRegistry: capabilities.NewRegistry(lggr), + CapabilitiesRegistry: capabilities.NewRegistry(lggr, p2ptypes.PeerID{}, commoncapabilities.DON{}), }) assert.NoError(t, err) @@ -314,7 +316,7 @@ func TestSpawner_CreateJobDeleteJob(t *testing.T) { ocr2DelegateConfig := ocr2.NewDelegateConfig(config.OCR2(), config.Mercury(), config.Threshold(), config.Insecure(), config.JobPipeline(), processConfig) d := ocr2.NewDelegate(nil, orm, nil, nil, nil, nil, nil, monitoringEndpoint, legacyChains, lggr, ocr2DelegateConfig, - keyStore.OCR2(), keyStore.DKGSign(), keyStore.DKGEncrypt(), ethKeyStore, testRelayGetter, mailMon, capabilities.NewRegistry(lggr)) + keyStore.OCR2(), keyStore.DKGSign(), keyStore.DKGEncrypt(), ethKeyStore, testRelayGetter, mailMon, capabilities.NewRegistry(lggr, p2ptypes.PeerID{}, commoncapabilities.DON{})) delegateOCR2 := &delegate{jobOCR2VRF.Type, []job.ServiceCtx{}, 0, nil, d} spawner := job.NewSpawner(orm, config.Database(), noopChecker{}, map[job.Type]job.Delegate{ diff --git a/core/services/relay/evm/write_target_test.go b/core/services/relay/evm/write_target_test.go index 76060dce990..b7a1199b5ed 100644 --- a/core/services/relay/evm/write_target_test.go +++ b/core/services/relay/evm/write_target_test.go @@ -8,6 +8,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/internal/cltest" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/pgtest" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/services/relay/evm" "github.com/smartcontractkit/chainlink-common/pkg/capabilities" @@ -71,7 +72,7 @@ func TestEvmWrite(t *testing.T) { relayer, err := relayevm.NewRelayer(lggr, chain, relayevm.RelayerOpts{ DS: db, CSAETHKeystore: keyStore, - CapabilitiesRegistry: evmcapabilities.NewRegistry(lggr), + CapabilitiesRegistry: evmcapabilities.NewRegistry(lggr, p2ptypes.PeerID{}, capabilities.DON{}), }) require.NoError(t, err) diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index edb1203c954..cfa15905f29 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -119,10 +119,10 @@ func (e *Engine) resolveWorkflowCapabilities(ctx context.Context) error { err := e.initializeCapability(ctx, s) if err != nil { - return err + return fmt.Errorf("failed to initialize capability for step %s: %w", s.Ref, err) } - return e.initializeExecutionStrategy(s) + return nil }) return capabilityRegistrationErr @@ -261,59 +261,6 @@ func (e *Engine) resumeInProgressExecutions(ctx context.Context) error { return nil } -// initializeExecutionStrategy for `step`. -// Broadly speaking, we'll use `immediateExecution` for non-target steps -// and `scheduledExecution` for targets. If we don't have the necessary -// config to initialize a scheduledExecution for a target, we'll fallback to -// using `immediateExecution`. -func (e *Engine) initializeExecutionStrategy(s *step) error { - if s.executionStrategy != nil { - return nil - } - - // If donInfo has no peerID, then the peer wrapper hasn't been initialized. - // Let's error and try again next time around. - if e.donInfo.PeerID() == nil { - return fmt.Errorf("failed to initialize execution strategy: peer ID %s has not been initialized", e.donInfo.PeerID()) - } - - ie := immediateExecution{} - if s.CapabilityType != capabilities.CapabilityTypeTarget { - e.logger.Debugf("initializing step %+v with immediate execution strategy: not a target", s) - s.executionStrategy = ie - return nil - } - - dinfo := e.donInfo - if dinfo.DON == nil { - e.logger.Debugf("initializing target step with immediate execution strategy: donInfo %+v", e.donInfo) - s.executionStrategy = ie - return nil - } - - var position *int - for i, w := range dinfo.Members { - if w == *dinfo.PeerID() { - idx := i - position = &idx - } - } - - if position == nil { - e.logger.Debugf("initializing step %+v with immediate execution strategy: position not found in donInfo %+v", s, e.donInfo) - s.executionStrategy = ie - return nil - } - - s.executionStrategy = scheduledExecution{ - DON: e.donInfo.DON, - Position: *position, - PeerID: e.donInfo.PeerID(), - } - e.logger.Debugf("initializing step %+v with scheduled execution strategy", s) - return nil -} - // registerTrigger is used during the initialization phase to bind a trigger to this workflow func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) error { triggerInputs, err := values.NewMap( @@ -660,7 +607,7 @@ func (e *Engine) executeStep(ctx context.Context, l logger.Logger, msg stepReque }, } - output, err := step.executionStrategy.Apply(ctx, l, step.capability, tr) + output, err := executeSyncAndUnwrapSingleValue(ctx, step.capability, tr) if err != nil { return inputs, nil, err } @@ -865,3 +812,21 @@ func NewEngine(cfg Config) (engine *Engine, err error) { } return engine, nil } + +// ExecuteSyncAndUnwrapSingleValue is a convenience method that executes a capability synchronously and unwraps the +// result if it is a single value otherwise returns the list. +func executeSyncAndUnwrapSingleValue(ctx context.Context, cap capabilities.CallbackCapability, req capabilities.CapabilityRequest) (values.Value, error) { + l, err := capabilities.ExecuteSync(ctx, cap, req) + if err != nil { + return nil, err + } + + // `ExecuteSync` returns a `values.List` even if there was + // just one return value. If that is the case, let's unwrap the + // single value to make it easier to use in -- for example -- variable interpolation. + if len(l.Underlying) > 1 { + return l, nil + } + + return l.Underlying[0], nil +} diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 808e0b2555d..aebb5c2066f 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -199,7 +199,7 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) { for _, tc := range testCases { t.Run(tc.name, func(t *testing.T) { ctx := testutils.Context(t) - reg := coreCap.NewRegistry(logger.TestLogger(t)) + reg := coreCap.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) trigger, cr := mockTrigger(t) @@ -386,7 +386,7 @@ func mockTarget() *mockCapability { func TestEngine_ErrorsTheWorkflowIfAStepErrors(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - reg := coreCap.NewRegistry(logger.TestLogger(t)) + reg := coreCap.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) trigger, _ := mockTrigger(t) @@ -480,7 +480,7 @@ func mockAction() (*mockCapability, values.Value) { func TestEngine_MultiStepDependencies(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - reg := coreCap.NewRegistry(logger.TestLogger(t)) + reg := coreCap.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) trigger, cr := mockTrigger(t) @@ -523,7 +523,7 @@ func TestEngine_MultiStepDependencies(t *testing.T) { func TestEngine_ResumesPendingExecutions(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - reg := coreCap.NewRegistry(logger.TestLogger(t)) + reg := coreCap.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) trigger := mockNoopTrigger(t) resp, err := values.NewMap(map[string]any{ @@ -577,7 +577,7 @@ func TestEngine_ResumesPendingExecutions(t *testing.T) { func TestEngine_TimesOutOldExecutions(t *testing.T) { t.Parallel() ctx := testutils.Context(t) - reg := coreCap.NewRegistry(logger.TestLogger(t)) + reg := coreCap.NewRegistry(logger.TestLogger(t), p2ptypes.PeerID{}, capabilities.DON{}) trigger := mockNoopTrigger(t) resp, err := values.NewMap(map[string]any{ diff --git a/core/services/workflows/execution_strategy.go b/core/services/workflows/execution_strategy.go deleted file mode 100644 index 5cc8164c4f7..00000000000 --- a/core/services/workflows/execution_strategy.go +++ /dev/null @@ -1,95 +0,0 @@ -package workflows - -import ( - "context" - "fmt" - "time" - - "github.com/smartcontractkit/chainlink-common/pkg/capabilities" - "github.com/smartcontractkit/chainlink-common/pkg/values" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" - "github.com/smartcontractkit/chainlink/v2/core/logger" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" -) - -type executionStrategy interface { - Apply(ctx context.Context, l logger.Logger, cap capabilities.CallbackCapability, req capabilities.CapabilityRequest) (values.Value, error) -} - -var _ executionStrategy = immediateExecution{} - -type immediateExecution struct{} - -func (i immediateExecution) Apply(ctx context.Context, lggr logger.Logger, cap capabilities.CallbackCapability, req capabilities.CapabilityRequest) (values.Value, error) { - l, err := capabilities.ExecuteSync(ctx, cap, req) - if err != nil { - return nil, err - } - - // `ExecuteSync` returns a `values.List` even if there was - // just one return value. If that is the case, let's unwrap the - // single value to make it easier to use in -- for example -- variable interpolation. - if len(l.Underlying) > 1 { - return l, nil - } - - return l.Underlying[0], nil -} - -var _ executionStrategy = scheduledExecution{} - -type scheduledExecution struct { - DON *capabilities.DON - PeerID *p2ptypes.PeerID - Position int -} - -// scheduledExecution generates a pseudo-random transmission schedule, -// and delays execution until a node is required to transmit. -func (d scheduledExecution) Apply(ctx context.Context, lggr logger.Logger, cap capabilities.CallbackCapability, req capabilities.CapabilityRequest) (values.Value, error) { - tc, err := transmission.ExtractTransmissionConfig(req.Config) - if err != nil { - return nil, fmt.Errorf("failed to extract transmission config from request config: %w", err) - } - - info, err := cap.Info(ctx) - if err != nil { - return nil, err - } - - switch { - // Case 1: Local DON - case info.DON == nil: - - // The transmission ID is created using the workflow ID and the workflow execution ID which nodes don't know - // ahead of time and ensures a malicious node cannot game the schedule. - peerIDToTransmissionDelay, err := transmission.GetPeerIDToTransmissionDelay(d.DON.Members, d.DON.Config.SharedSecret, - req.Metadata.WorkflowID+req.Metadata.WorkflowExecutionID, tc) - if err != nil { - return nil, fmt.Errorf("failed to get peer ID to transmission delay map: %w", err) - } - - delay, existsForPeerID := peerIDToTransmissionDelay[*d.PeerID] - if !existsForPeerID { - lggr.Debugw("skipping transmission: node is not included in schedule") - return nil, nil - } - - lggr.Debugf("execution delayed by %+v", delay) - select { - case <-ctx.Done(): - return nil, ctx.Err() - case <-time.After(delay): - lggr.Debugw("executing delayed execution") - return immediateExecution{}.Apply(ctx, lggr, cap, req) - } - // Case 2: Remote DON - default: - - // In this case just execute immediately on the capability and the shims will handle the scheduling and f+1 aggregation - - // TODO: fill in the remote DON case once consensus has been reach on what to do. - lggr.Debugw("remote DON transmission not implemented: using immediate execution") - return immediateExecution{}.Apply(ctx, lggr, cap, req) - } -} diff --git a/core/services/workflows/models.go b/core/services/workflows/models.go index 14e75c6e5d9..d55212a30ca 100644 --- a/core/services/workflows/models.go +++ b/core/services/workflows/models.go @@ -78,9 +78,8 @@ func (w *workflow) dependents(start string) ([]*step, error) { // step wraps a Vertex with additional context for execution that is mutated by the engine type step struct { workflows.Vertex - capability capabilities.CallbackCapability - config *values.Map - executionStrategy executionStrategy + capability capabilities.CallbackCapability + config *values.Map } type triggerCapability struct {