diff --git a/.github/workflows/solidity.yml b/.github/workflows/solidity.yml index 5940313fc65..b2dc3d4153c 100644 --- a/.github/workflows/solidity.yml +++ b/.github/workflows/solidity.yml @@ -258,10 +258,7 @@ jobs: uses: smartcontractkit/.github/actions/ci-publish-npm@4b0ab756abcb1760cb82e1e87b94ff431905bffc # ci-publish-npm@0.4.0 with: npm-token: ${{ secrets.NPM_TOKEN }} - github-token: ${{ secrets.GITHUB_TOKEN }} - github-release-tag-name: ${{ github.ref_name }} - github-release-changelog-path: "contracts/CHANGELOG.md" - create-github-release: true + create-github-release: false publish-command: "pnpm publish-prod --no-git-checks" package-json-directory: contracts diff --git a/core/capabilities/remote/target.go b/core/capabilities/remote/target.go index 92b0724512a..655f4f84abb 100644 --- a/core/capabilities/remote/target.go +++ b/core/capabilities/remote/target.go @@ -4,6 +4,7 @@ import ( "context" "errors" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -12,7 +13,7 @@ import ( // remoteTargetCaller/Receiver are shims translating between capability API calls and network messages type remoteTargetCaller struct { capInfo commoncap.CapabilityInfo - donInfo *types.DON + donInfo *capabilities.DON dispatcher types.Dispatcher lggr logger.Logger } @@ -22,14 +23,14 @@ var _ types.Receiver = &remoteTargetCaller{} type remoteTargetReceiver struct { capInfo commoncap.CapabilityInfo - donInfo *types.DON + donInfo *capabilities.DON dispatcher types.Dispatcher lggr logger.Logger } var _ types.Receiver = &remoteTargetReceiver{} -func NewRemoteTargetCaller(capInfo commoncap.CapabilityInfo, donInfo *types.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetCaller { +func NewRemoteTargetCaller(capInfo commoncap.CapabilityInfo, donInfo *capabilities.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetCaller { return &remoteTargetCaller{ capInfo: capInfo, donInfo: donInfo, @@ -72,7 +73,7 @@ func (c *remoteTargetCaller) Receive(msg *types.MessageBody) { c.lggr.Debugw("not implemented - received message", "capabilityId", c.capInfo.ID, "payload", msg.Payload) } -func NewRemoteTargetReceiver(capInfo commoncap.CapabilityInfo, donInfo *types.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetReceiver { +func NewRemoteTargetReceiver(capInfo commoncap.CapabilityInfo, donInfo *capabilities.DON, dispatcher types.Dispatcher, lggr logger.Logger) *remoteTargetReceiver { return &remoteTargetReceiver{ capInfo: capInfo, donInfo: donInfo, diff --git a/core/capabilities/remote/target_test.go b/core/capabilities/remote/target_test.go index a9e72d778df..0f9bad51f67 100644 --- a/core/capabilities/remote/target_test.go +++ b/core/capabilities/remote/target_test.go @@ -6,9 +6,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote" - "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types" remoteMocks "github.com/smartcontractkit/chainlink/v2/core/capabilities/remote/types/mocks" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" @@ -18,7 +18,7 @@ import ( func TestTarget_Placeholder(t *testing.T) { lggr := logger.TestLogger(t) ctx := testutils.Context(t) - donInfo := &types.DON{ + donInfo := &capabilities.DON{ Members: []p2ptypes.PeerID{{}}, } dispatcher := remoteMocks.NewDispatcher(t) diff --git a/core/capabilities/remote/trigger_publisher.go b/core/capabilities/remote/trigger_publisher.go index d06254657c7..12d539a2dc0 100644 --- a/core/capabilities/remote/trigger_publisher.go +++ b/core/capabilities/remote/trigger_publisher.go @@ -24,8 +24,8 @@ type triggerPublisher struct { config types.RemoteTriggerConfig underlying commoncap.TriggerCapability capInfo commoncap.CapabilityInfo - capDonInfo types.DON - workflowDONs map[string]types.DON + capDonInfo commoncap.DON + workflowDONs map[string]commoncap.DON dispatcher types.Dispatcher messageCache *messageCache[registrationKey, p2ptypes.PeerID] registrations map[registrationKey]*pubRegState @@ -48,7 +48,7 @@ type pubRegState struct { var _ types.Receiver = &triggerPublisher{} var _ services.Service = &triggerPublisher{} -func NewTriggerPublisher(config types.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, workflowDONs map[string]types.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher { +func NewTriggerPublisher(config types.RemoteTriggerConfig, underlying commoncap.TriggerCapability, capInfo commoncap.CapabilityInfo, capDonInfo commoncap.DON, workflowDONs map[string]commoncap.DON, dispatcher types.Dispatcher, lggr logger.Logger) *triggerPublisher { config.ApplyDefaults() return &triggerPublisher{ config: config, diff --git a/core/capabilities/remote/trigger_publisher_test.go b/core/capabilities/remote/trigger_publisher_test.go index dd107e12e61..71a5174c07f 100644 --- a/core/capabilities/remote/trigger_publisher_test.go +++ b/core/capabilities/remote/trigger_publisher_test.go @@ -29,12 +29,12 @@ func TestTriggerPublisher_Register(t *testing.T) { require.NoError(t, p1.UnmarshalText([]byte(peerID1))) p2 := p2ptypes.PeerID{} require.NoError(t, p2.UnmarshalText([]byte(peerID2))) - capDonInfo := remotetypes.DON{ + capDonInfo := commoncap.DON{ ID: "capability-don", Members: []p2ptypes.PeerID{p1}, F: 0, } - workflowDonInfo := remotetypes.DON{ + workflowDonInfo := commoncap.DON{ ID: "workflow-don", Members: []p2ptypes.PeerID{p2}, F: 0, @@ -47,7 +47,7 @@ func TestTriggerPublisher_Register(t *testing.T) { MinResponsesToAggregate: 1, MessageExpiryMs: 100_000, } - workflowDONs := map[string]remotetypes.DON{ + workflowDONs := map[string]commoncap.DON{ workflowDonInfo.ID: workflowDonInfo, } underlying := &testTrigger{ diff --git a/core/capabilities/remote/trigger_subscriber.go b/core/capabilities/remote/trigger_subscriber.go index a7cb58c008b..e9704bf1865 100644 --- a/core/capabilities/remote/trigger_subscriber.go +++ b/core/capabilities/remote/trigger_subscriber.go @@ -6,6 +6,7 @@ import ( sync "sync" "time" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/pb" "github.com/smartcontractkit/chainlink-common/pkg/services" @@ -24,9 +25,9 @@ import ( type triggerSubscriber struct { config types.RemoteTriggerConfig capInfo commoncap.CapabilityInfo - capDonInfo types.DON + capDonInfo capabilities.DON capDonMembers map[p2ptypes.PeerID]struct{} - localDonInfo types.DON + localDonInfo capabilities.DON dispatcher types.Dispatcher aggregator types.Aggregator messageCache *messageCache[triggerEventKey, p2ptypes.PeerID] @@ -54,8 +55,7 @@ var _ services.Service = &triggerSubscriber{} // TODO makes this configurable with a default const defaultSendChannelBufferSize = 1000 -func NewTriggerSubscriber(config types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo types.DON, localDonInfo types.DON, - dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber { +func NewTriggerSubscriber(config types.RemoteTriggerConfig, capInfo commoncap.CapabilityInfo, capDonInfo capabilities.DON, localDonInfo capabilities.DON, dispatcher types.Dispatcher, aggregator types.Aggregator, lggr logger.Logger) *triggerSubscriber { if aggregator == nil { lggr.Warnw("no aggregator provided, using default MODE aggregator", "capabilityId", capInfo.ID) aggregator = NewDefaultModeAggregator(uint32(capDonInfo.F + 1)) diff --git a/core/capabilities/remote/trigger_subscriber_test.go b/core/capabilities/remote/trigger_subscriber_test.go index df04306e2b0..4d251d49dc8 100644 --- a/core/capabilities/remote/trigger_subscriber_test.go +++ b/core/capabilities/remote/trigger_subscriber_test.go @@ -38,12 +38,12 @@ func TestTriggerSubscriber_RegisterAndReceive(t *testing.T) { require.NoError(t, p1.UnmarshalText([]byte(peerID1))) p2 := p2ptypes.PeerID{} require.NoError(t, p2.UnmarshalText([]byte(peerID2))) - capDonInfo := remotetypes.DON{ + capDonInfo := commoncap.DON{ ID: "capability-don", Members: []p2ptypes.PeerID{p1}, F: 0, } - workflowDonInfo := remotetypes.DON{ + workflowDonInfo := commoncap.DON{ ID: "workflow-don", Members: []p2ptypes.PeerID{p2}, F: 0, diff --git a/core/capabilities/syncer.go b/core/capabilities/syncer.go index dc9126dba36..67d388a8221 100644 --- a/core/capabilities/syncer.go +++ b/core/capabilities/syncer.go @@ -6,6 +6,7 @@ import ( "sync" "time" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" commoncap "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/mercury" "github.com/smartcontractkit/chainlink-common/pkg/capabilities/triggers" @@ -70,7 +71,7 @@ func (s *registrySyncer) Start(ctx context.Context) error { "12D3KooWN2hztiXNNS1jMQTTvvPRYcarK1C7T3Mdqk4x4gwyo5WS", } allPeers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig) - addPeersToDONInfo := func(peers []string, donInfo *remotetypes.DON) error { + addPeersToDONInfo := func(peers []string, donInfo *capabilities.DON) error { for _, peerID := range peers { var p ragetypes.PeerID err := p.UnmarshalText([]byte(peerID)) @@ -82,11 +83,11 @@ func (s *registrySyncer) Start(ctx context.Context) error { } return nil } - workflowDonInfo := remotetypes.DON{ID: "workflowDon1", F: 1} + workflowDonInfo := capabilities.DON{ID: "workflowDon1", F: 1} if err := addPeersToDONInfo(workflowDONPeers, &workflowDonInfo); err != nil { return err } - triggerCapabilityDonInfo := remotetypes.DON{ID: "capabilityDon1", F: 1} + triggerCapabilityDonInfo := capabilities.DON{ID: "capabilityDon1", F: 1} if err := addPeersToDONInfo(triggerDONPeers, &triggerCapabilityDonInfo); err != nil { return err } @@ -101,6 +102,7 @@ func (s *registrySyncer) Start(ctx context.Context) error { CapabilityType: commoncap.CapabilityTypeTrigger, Description: "Remote Trigger", Version: "0.0.1", + DON: &triggerCapabilityDonInfo, } myId := s.peerWrapper.GetPeer().ID().String() config := remotetypes.RemoteTriggerConfig{ @@ -125,7 +127,7 @@ func (s *registrySyncer) Start(ctx context.Context) error { } if slices.Contains(triggerDONPeers, myId) { s.lggr.Info("member of a capability DON - starting remote publishers") - workflowDONs := map[string]remotetypes.DON{ + workflowDONs := map[string]capabilities.DON{ workflowDonInfo.ID: workflowDonInfo, } underlying := triggers.NewMercuryTriggerService(1000, s.lggr) diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 0ac6555aecc..2aebef3f8f7 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -49,6 +49,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2" "github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" externalp2p "github.com/smartcontractkit/chainlink/v2/core/services/p2p/wrapper" "github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" @@ -189,9 +190,12 @@ func NewApplication(opts ApplicationOpts) (Application, error) { unrestrictedHTTPClient := opts.UnrestrictedHTTPClient registry := capabilities.NewRegistry(globalLogger) + var externalPeerWrapper p2ptypes.PeerWrapper if cfg.Capabilities().Peering().Enabled() { - externalPeerWrapper := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), globalLogger) - signer := externalPeerWrapper + externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), globalLogger) + signer := externalPeer + externalPeerWrapper = externalPeer + srvcs = append(srvcs, externalPeerWrapper) // NOTE: RegistrySyncer will depend on a Relayer when fully implemented @@ -367,16 +371,26 @@ func NewApplication(opts ApplicationOpts) (Application, error) { globalLogger, streamRegistry, pipelineRunner, - cfg.JobPipeline()), - job.Workflow: workflows.NewDelegate( - globalLogger, - registry, - legacyEVMChains, + cfg.JobPipeline(), ), } webhookJobRunner = delegates[job.Webhook].(*webhook.Delegate).WebhookJobRunner() ) + delegates[job.Workflow] = workflows.NewDelegate( + globalLogger, + registry, + legacyEVMChains, + func() *p2ptypes.PeerID { + if externalPeerWrapper == nil { + return nil + } + + peerID := externalPeerWrapper.GetPeer().ID() + return &peerID + }, + ) + // Flux monitor requires ethereum just to boot, silence errors with a null delegate if !cfg.EVMRPCEnabled() { delegates[job.FluxMonitor] = &job.NullDelegate{Type: job.FluxMonitor} diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index e22a78212d2..8dc440da477 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -2,22 +2,26 @@ package workflows import ( "context" + "encoding/hex" "fmt" "github.com/google/uuid" "github.com/pelletier/go-toml" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink/v2/core/capabilities/targets" "github.com/smartcontractkit/chainlink/v2/core/chains/legacyevm" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) type Delegate struct { registry core.CapabilitiesRegistry logger logger.Logger legacyEVMChains legacyevm.LegacyChainContainer + peerID func() *p2ptypes.PeerID } var _ job.Delegate = (*Delegate)(nil) @@ -42,11 +46,18 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser d.logger.Errorw("could not initialize writes", err) } + dinfo, err := initializeDONInfo(d.logger) + if err != nil { + d.logger.Errorw("could not add initialize don info", err) + } + cfg := Config{ Lggr: d.logger, Spec: spec.WorkflowSpec.Workflow, WorkflowID: spec.WorkflowSpec.WorkflowID, Registry: d.registry, + DONInfo: dinfo, + PeerID: d.peerID, } engine, err := NewEngine(cfg) if err != nil { @@ -55,8 +66,45 @@ func (d *Delegate) ServicesForSpec(ctx context.Context, spec job.Job) ([]job.Ser return []job.ServiceCtx{engine}, nil } -func NewDelegate(logger logger.Logger, registry core.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer) *Delegate { - return &Delegate{logger: logger, registry: registry, legacyEVMChains: legacyEVMChains} +func initializeDONInfo(lggr logger.Logger) (*capabilities.DON, error) { + var key [16]byte + + // TODO: fetch the key and DONInfo from the registry + keyString := "44fb5c1ee8ee48846c808a383da3aba3" + k, err := hex.DecodeString(keyString) + if err != nil { + lggr.Errorf("could not decode key %s: %w", keyString, err) + } + key = [16]byte(k) + + p2pStrings := []string{ + "12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N", + "12D3KooWG1AyvwmCpZ93J8pBQUE1SuzrjDXnT4BeouncHR3jWLCG", + "12D3KooWGeUKZBRMbx27FUTgBwZa9Ap9Ym92mywwpuqkEtz8XWyv", + "12D3KooW9zYWQv3STmDeNDidyzxsJSTxoCTLicafgfeEz9nhwhC4", + } + + p2pIDs := []p2ptypes.PeerID{} + for _, p := range p2pStrings { + pid := p2ptypes.PeerID{} + err := pid.UnmarshalText([]byte(p)) + if err != nil { + return nil, err + } + + p2pIDs = append(p2pIDs, pid) + } + + return &capabilities.DON{ + Members: p2pIDs, + Config: capabilities.DONConfig{ + SharedSecret: key, + }, + }, nil +} + +func NewDelegate(logger logger.Logger, registry core.CapabilitiesRegistry, legacyEVMChains legacyevm.LegacyChainContainer, peerID func() *p2ptypes.PeerID) *Delegate { + return &Delegate{logger: logger, registry: registry, legacyEVMChains: legacyEVMChains, peerID: peerID} } func ValidatedWorkflowSpec(tomlString string) (job.Job, error) { diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 052c2c86647..0ecc311acac 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -13,6 +13,7 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink-common/pkg/values" "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) const ( @@ -21,12 +22,18 @@ const ( mockedWorkflowID = "15c631d295ef5e32deb99a10ee6804bc4af1385568f9b3363f6552ac6dbb2cef" ) +type donInfo struct { + *capabilities.DON + PeerID func() *p2ptypes.PeerID +} + // Engine handles the lifecycle of a single workflow and its executions. type Engine struct { services.StateMachine logger logger.Logger registry core.CapabilitiesRegistry workflow *workflow + donInfo donInfo executionStates *inMemoryStore pendingStepRequests chan stepRequest triggerEvents chan capabilities.CapabilityResponse @@ -88,7 +95,11 @@ LOOP: continue } - // Walk the graph and register each step's capability to this workflow + // Walk the graph and initialize each step. + // This means: + // - fetching the capability + // - register the capability to this workflow + // - initializing the step's executionStrategy err := e.workflow.walkDo(keywordTrigger, func(s *step) error { // The graph contains a dummy step for triggers, but // we handle triggers separately since there might be more than one. @@ -96,45 +107,12 @@ LOOP: return nil } - // If the capability already exists, that means we've already registered it - if s.capability != nil { - return nil - } - - cp, innerErr := e.registry.Get(ctx, s.Type) - if innerErr != nil { - return fmt.Errorf("failed to get capability with ref %s: %s, retrying in %d seconds", s.Type, innerErr, retrySec) - } - - // We only need to configure actions, consensus and targets here, and - // they all satisfy the `CallbackExecutable` interface - cc, ok := cp.(capabilities.CallbackExecutable) - if !ok { - return fmt.Errorf("could not coerce capability %s to CallbackExecutable", s.Type) - } - - if s.config == nil { - configMap, ierr := values.NewMap(s.Config) - if ierr != nil { - return fmt.Errorf("failed to convert config to values.Map: %s", ierr) - } - s.config = configMap - } - - reg := capabilities.RegisterToWorkflowRequest{ - Metadata: capabilities.RegistrationMetadata{ - WorkflowID: e.workflow.id, - }, - Config: s.config, - } - - innerErr = cc.RegisterToWorkflow(ctx, reg) - if innerErr != nil { - return fmt.Errorf("failed to register to workflow (%+v): %w", reg, innerErr) + err := e.initializeCapability(ctx, s, retrySec) + if err != nil { + return err } - s.capability = cc - return nil + return e.initializeExecutionStrategy(s) }) if err != nil { initSuccessful = false @@ -158,6 +136,101 @@ LOOP: e.logger.Info("engine initialized") } +func (e *Engine) initializeCapability(ctx context.Context, s *step, retrySec int) error { + // If the capability already exists, that means we've already registered it + if s.capability != nil { + return nil + } + + cp, innerErr := e.registry.Get(ctx, s.Type) + if innerErr != nil { + return fmt.Errorf("failed to get capability with ref %s: %s, retrying in %d seconds", s.Type, innerErr, retrySec) + } + + // We only need to configure actions, consensus and targets here, and + // they all satisfy the `CallbackCapability` interface + cc, ok := cp.(capabilities.CallbackCapability) + if !ok { + return fmt.Errorf("could not coerce capability %s to CallbackCapability", s.Type) + } + + if s.config == nil { + configMap, ierr := values.NewMap(s.Config) + if ierr != nil { + return fmt.Errorf("failed to convert config to values.Map: %s", ierr) + } + s.config = configMap + } + + reg := capabilities.RegisterToWorkflowRequest{ + Metadata: capabilities.RegistrationMetadata{ + WorkflowID: e.workflow.id, + }, + Config: s.config, + } + + innerErr = cc.RegisterToWorkflow(ctx, reg) + if innerErr != nil { + return fmt.Errorf("failed to register to workflow (%+v): %w", reg, innerErr) + } + + s.capability = cc + return nil +} + +// initializeExecutionStrategy for `step`. +// Broadly speaking, we'll use `immediateExecution` for non-target steps +// and `scheduledExecution` for targets. If we don't have the necessary +// config to initialize a scheduledExecution for a target, we'll fallback to +// using `immediateExecution`. +func (e *Engine) initializeExecutionStrategy(step *step) error { + if step.executionStrategy != nil { + return nil + } + + // If donInfo has no peerID, then the peer wrapper hasn't been initialized. + // Let's error and try again next time around. + if e.donInfo.PeerID() == nil { + return fmt.Errorf("failed to initialize execution strategy: peer ID %s has not been initialized", e.donInfo.PeerID()) + } + + ie := immediateExecution{} + if step.CapabilityType != capabilities.CapabilityTypeTarget { + e.logger.Debugf("initializing step %+v with immediate execution strategy: not a target", step) + step.executionStrategy = ie + return nil + } + + dinfo := e.donInfo + if dinfo.DON == nil { + e.logger.Debugf("initializing target step with immediate execution strategy: donInfo %+v", e.donInfo) + step.executionStrategy = ie + return nil + } + + var position *int + for i, w := range dinfo.Members { + if w == *dinfo.PeerID() { + idx := i + position = &idx + } + } + + if position == nil { + e.logger.Debugf("initializing step %+v with immediate execution strategy: position not found in donInfo %+v", step, e.donInfo) + step.executionStrategy = ie + return nil + } + + step.executionStrategy = scheduledExecution{ + DON: e.donInfo.DON, + Position: *position, + PeerID: e.donInfo.PeerID(), + } + e.logger.Debugf("initializing step %+v with scheduled execution strategy", step) + return nil +} + // registerTrigger is used during the initialization phase to bind a trigger to this workflow func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability) error { triggerInputs, err := values.NewMap( @@ -436,20 +509,24 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { defer func() { e.newWorkerCh <- struct{}{} }() defer e.wg.Done() - e.logger.Debugw("executing on a step event", "stepRef", msg.stepRef, "executionID", msg.state.executionID) + // Instantiate a child logger; in addition to the WorkflowID field the workflow + // logger will already have, this adds the `stepRef` and `executionID` + l := e.logger.With("stepRef", msg.stepRef, "executionID", msg.state.executionID) + + l.Debugw("executing on a step event") stepState := &stepState{ outputs: &stepOutput{}, executionID: msg.state.executionID, ref: msg.stepRef, } - inputs, outputs, err := e.executeStep(ctx, msg) + inputs, outputs, err := e.executeStep(ctx, l, msg) if err != nil { - e.logger.Errorf("error executing step request: %s", err, "executionID", msg.state.executionID, "stepRef", msg.stepRef) + l.Errorf("error executing step request: %s", err) stepState.outputs.err = err stepState.status = statusErrored } else { - e.logger.Infow("step executed successfully", "executionID", msg.state.executionID, "stepRef", msg.stepRef, "outputs", outputs) + l.Infow("step executed successfully", "outputs", outputs) stepState.outputs.value = outputs stepState.status = statusCompleted } @@ -464,13 +541,13 @@ func (e *Engine) workerForStepRequest(ctx context.Context, msg stepRequest) { // like this one will get picked up again and will be reprocessed. select { case <-ctx.Done(): - e.logger.Errorf("context canceled before step update could be issued", err, "executionID", msg.state.executionID, "stepRef", msg.stepRef) + l.Errorf("context canceled before step update could be issued", err) case e.stepUpdateCh <- *stepState: } } // executeStep executes the referenced capability within a step and returns the result. -func (e *Engine) executeStep(ctx context.Context, msg stepRequest) (*values.Map, values.Value, error) { +func (e *Engine) executeStep(ctx context.Context, l logger.Logger, msg stepRequest) (*values.Map, values.Value, error) { step, err := e.workflow.Vertex(msg.stepRef) if err != nil { return nil, nil, err @@ -495,18 +572,12 @@ func (e *Engine) executeStep(ctx context.Context, msg stepRequest) (*values.Map, }, } - resp, err := capabilities.ExecuteSync(ctx, step.capability, tr) + output, err := step.executionStrategy.Apply(ctx, l, step.capability, tr) if err != nil { return inputs, nil, err } - // `ExecuteSync` returns a `values.List` even if there was - // just one return value. If that is the case, let's unwrap the - // single value to make it easier to use in -- for example -- variable interpolation. - if len(resp.Underlying) > 1 { - return inputs, resp, err - } - return inputs, resp.Underlying[0], err + return inputs, output, err } func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability) error { @@ -595,6 +666,8 @@ type Config struct { MaxWorkerLimit int QueueSize int NewWorkerTimeout time.Duration + DONInfo *capabilities.DON + PeerID func() *p2ptypes.PeerID } const ( @@ -638,9 +711,13 @@ func NewEngine(cfg Config) (engine *Engine, err error) { } engine = &Engine{ - logger: cfg.Lggr.Named("WorkflowEngine"), - registry: cfg.Registry, - workflow: workflow, + logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID), + registry: cfg.Registry, + workflow: workflow, + donInfo: donInfo{ + DON: cfg.DONInfo, + PeerID: cfg.PeerID, + }, executionStates: newInMemoryStore(), pendingStepRequests: make(chan stepRequest, cfg.QueueSize), newWorkerCh: newWorkerCh, diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index 448ff13ec79..d82c9d4b7d2 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -14,6 +14,7 @@ import ( coreCap "github.com/smartcontractkit/chainlink/v2/core/capabilities" "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) const hardcodedWorkflow = ` @@ -148,10 +149,13 @@ func TestEngineWithHardcodedWorkflow(t *testing.T) { require.NoError(t, reg.Add(ctx, target2)) lggr := logger.TestLogger(t) + peerID := p2ptypes.PeerID{} cfg := Config{ Lggr: lggr, Registry: reg, Spec: hardcodedWorkflow, + DONInfo: nil, + PeerID: func() *p2ptypes.PeerID { return &peerID }, } eng, err := NewEngine(cfg) require.NoError(t, err) @@ -308,10 +312,13 @@ func TestEngine_ErrorsTheWorkflowIfAStepErrors(t *testing.T) { require.NoError(t, reg.Add(ctx, mockFailingConsensus())) require.NoError(t, reg.Add(ctx, mockTarget())) + peerID := p2ptypes.PeerID{} cfg := Config{ Lggr: logger.TestLogger(t), Registry: reg, Spec: simpleWorkflow, + DONInfo: nil, + PeerID: func() *p2ptypes.PeerID { return &peerID }, } eng, err := NewEngine(cfg) require.NoError(t, err) @@ -413,10 +420,13 @@ func TestEngine_MultiStepDependencies(t *testing.T) { action, out := mockAction() require.NoError(t, reg.Add(ctx, action)) + peerID := p2ptypes.PeerID{} cfg := Config{ Lggr: logger.TestLogger(t), Registry: reg, Spec: multiStepWorkflow, + DONInfo: nil, + PeerID: func() *p2ptypes.PeerID { return &peerID }, } eng, err := NewEngine(cfg) require.NoError(t, err) diff --git a/core/services/workflows/execution_strategy.go b/core/services/workflows/execution_strategy.go new file mode 100644 index 00000000000..f5da8bca4be --- /dev/null +++ b/core/services/workflows/execution_strategy.go @@ -0,0 +1,171 @@ +package workflows + +import ( + "context" + "fmt" + "time" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + + "github.com/smartcontractkit/libocr/permutation" + + "golang.org/x/crypto/sha3" +) + +type executionStrategy interface { + Apply(ctx context.Context, l logger.Logger, cap capabilities.CallbackCapability, req capabilities.CapabilityRequest) (values.Value, error) +} + +var _ executionStrategy = immediateExecution{} + +type immediateExecution struct{} + +func (i immediateExecution) Apply(ctx context.Context, lggr logger.Logger, cap capabilities.CallbackCapability, req capabilities.CapabilityRequest) (values.Value, error) { + l, err := capabilities.ExecuteSync(ctx, cap, req) + if err != nil { + return nil, err + } + + // `ExecuteSync` returns a `values.List` even if there was + // just one return value. If that is the case, let's unwrap the + // single value to make it easier to use in -- for example -- variable interpolation. + if len(l.Underlying) > 1 { + return l, nil + } + + return l.Underlying[0], nil +} + +var _ executionStrategy = scheduledExecution{} + +type scheduledExecution struct { + DON *capabilities.DON + PeerID *p2ptypes.PeerID + Position int +} + +var ( + // S = [N] + Schedule_AllAtOnce = "allAtOnce" + // S = [1 * N] + Schedule_OneAtATime = "oneAtATime" +) + +// scheduledExecution generates a pseudo-random transmission schedule, +// and delays execution until a node is required to transmit. +func (d scheduledExecution) Apply(ctx context.Context, lggr logger.Logger, cap capabilities.CallbackCapability, req capabilities.CapabilityRequest) (values.Value, error) { + tc, err := d.transmissionConfig(req.Config) + if err != nil { + return nil, err + } + + info, err := cap.Info(ctx) + if err != nil { + return nil, err + } + + switch { + // Case 1: Local DON + case info.DON == nil: + n := len(d.DON.Members) + key := d.key(d.DON.Config.SharedSecret, req.Metadata.WorkflowID, req.Metadata.WorkflowExecutionID) + sched, err := schedule(tc.Schedule, n) + if err != nil { + return nil, err + } + + picked := permutation.Permutation(n, key) + delay := d.delayFor(d.Position, sched, picked, tc.DeltaStage) + if delay == nil { + lggr.Debugw("skipping transmission: node is not included in schedule") + return nil, nil + } + + lggr.Debugf("execution delayed by %+v", *delay) + select { + case <-ctx.Done(): + return nil, ctx.Err() + case <-time.After(*delay): + lggr.Debugw("executing delayed execution") + return immediateExecution{}.Apply(ctx, lggr, cap, req) + } + // Case 2: Remote DON + default: + // TODO: fill in the remote DON case once consensus has been reach on what to do. + lggr.Debugw("remote DON transmission not implemented: using immediate execution") + return immediateExecution{}.Apply(ctx, lggr, cap, req) + } +} + +// `key` uses a shared secret, combined with a workflowID and a workflowExecutionID to generate +// a secret that can later be used to pseudo-randomly determine a schedule for a set of nodes in a DON. +// The addition of the workflowExecutionID -- which nodes don't know ahead of time -- additionally guarantees +// that a malicious coalition of nodes can't "game" the schedule. +// IMPORTANT: changing this function should happen carefully to maintain the guarantee that all nodes +// arrive at the same secret. +func (d scheduledExecution) key(sharedSecret [16]byte, workflowID, workflowExecutionID string) [16]byte { + hash := sha3.NewLegacyKeccak256() + hash.Write(sharedSecret[:]) + hash.Write([]byte(workflowID)) + hash.Write([]byte(workflowExecutionID)) + + var key [16]byte + copy(key[:], hash.Sum(nil)) + return key +} + +type transmissionConfig struct { + Schedule string + DeltaStage time.Duration +} + +func (d scheduledExecution) transmissionConfig(config *values.Map) (transmissionConfig, error) { + var tc struct { + DeltaStage string + Schedule string + } + err := config.UnwrapTo(&tc) + if err != nil { + return transmissionConfig{}, err + } + + duration, err := time.ParseDuration(tc.DeltaStage) + if err != nil { + return transmissionConfig{}, fmt.Errorf("failed to parse DeltaStage %s as duration: %w", tc.DeltaStage, err) + } + + return transmissionConfig{ + Schedule: tc.Schedule, + DeltaStage: duration, + }, nil +} + +func (d scheduledExecution) delayFor(position int, schedule []int, permutation []int, deltaStage time.Duration) *time.Duration { + sum := 0 + for i, s := range schedule { + sum += s + if permutation[position] < sum { + result := time.Duration(i) * deltaStage + return &result + } + } + + return nil +} + +func schedule(sched string, N int) ([]int, error) { + switch sched { + case Schedule_AllAtOnce: + return []int{N}, nil + case Schedule_OneAtATime: + sch := []int{} + for i := 0; i < N; i++ { + sch = append(sch, 1) + } + return sch, nil + } + return nil, fmt.Errorf("unknown schedule %s", sched) +} diff --git a/core/services/workflows/execution_strategy_test.go b/core/services/workflows/execution_strategy_test.go new file mode 100644 index 00000000000..bdf782c87b9 --- /dev/null +++ b/core/services/workflows/execution_strategy_test.go @@ -0,0 +1,170 @@ +package workflows + +import ( + "crypto/rand" + "encoding/hex" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/utils/tests" + "github.com/smartcontractkit/chainlink-common/pkg/values" + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +func assertBetween(t *testing.T, got time.Duration, low time.Duration, high time.Duration) { + assert.GreaterOrEqual(t, got, low) + assert.LessOrEqual(t, got, high) +} + +func TestScheduledExecutionStrategy_LocalDON(t *testing.T) { + var gotTime time.Time + var called bool + + // Our capability has DONInfo == nil, so we'll treat it as a local + // capability and use the local DON Info to determine the transmission + // schedule. + mt := newMockCapability( + capabilities.MustNewCapabilityInfo( + "write_polygon-testnet-mumbai", + capabilities.CapabilityTypeTarget, + "a write capability targeting polygon mumbai testnet", + "v1.0.0", + nil, + ), + func(req capabilities.CapabilityRequest) (capabilities.CapabilityResponse, error) { + gotTime = time.Now() + called = true + return capabilities.CapabilityResponse{}, nil + }, + ) + + l := logger.TestLogger(t) + + // The combination of this key and the metadata above + // will yield the permutation [3, 2, 0, 1] + key, err := hex.DecodeString("fb13ca015a9ec60089c7141e9522de79") + require.NoError(t, err) + + testCases := []struct { + name string + position int + schedule string + low time.Duration + high time.Duration + }{ + { + name: "position 0; oneAtATime", + position: 0, + schedule: "oneAtATime", + low: 300 * time.Millisecond, + high: 400 * time.Millisecond, + }, + { + name: "position 1; oneAtATime", + position: 1, + schedule: "oneAtATime", + low: 200 * time.Millisecond, + high: 300 * time.Millisecond, + }, + { + name: "position 2; oneAtATime", + position: 2, + schedule: "oneAtATime", + low: 0 * time.Millisecond, + high: 100 * time.Millisecond, + }, + { + name: "position 3; oneAtATime", + position: 3, + schedule: "oneAtATime", + low: 100 * time.Millisecond, + high: 200 * time.Millisecond, + }, + { + name: "position 0; allAtOnce", + position: 0, + schedule: "allAtOnce", + low: 0 * time.Millisecond, + high: 100 * time.Millisecond, + }, + { + name: "position 1; allAtOnce", + position: 1, + schedule: "allAtOnce", + low: 0 * time.Millisecond, + high: 100 * time.Millisecond, + }, + { + name: "position 2; allAtOnce", + position: 2, + schedule: "allAtOnce", + low: 0 * time.Millisecond, + high: 100 * time.Millisecond, + }, + { + name: "position 3; allAtOnce", + position: 3, + schedule: "allAtOnce", + low: 0 * time.Millisecond, + high: 100 * time.Millisecond, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + startTime := time.Now() + + m, err := values.NewMap(map[string]any{ + "schedule": tc.schedule, + "deltaStage": "100ms", + }) + require.NoError(t, err) + + req := capabilities.CapabilityRequest{ + Config: m, + Metadata: capabilities.RequestMetadata{ + WorkflowID: "mock-workflow-id", + WorkflowExecutionID: "mock-execution-id", + }, + } + + ids := []p2ptypes.PeerID{ + randKey(), + randKey(), + randKey(), + randKey(), + } + don := &capabilities.DON{ + Members: ids, + Config: capabilities.DONConfig{ + SharedSecret: [16]byte(key), + }, + } + peerID := ids[tc.position] + de := scheduledExecution{ + DON: don, + PeerID: &peerID, + Position: tc.position, + } + _, err = de.Apply(tests.Context(t), l, mt, req) + require.NoError(t, err) + require.True(t, called) + + assertBetween(t, gotTime.Sub(startTime), tc.low, tc.high) + }) + } +} + +func randKey() [32]byte { + key := make([]byte, 32) + _, err := rand.Read(key) + if err != nil { + panic(err) + } + return [32]byte(key) +} diff --git a/core/services/workflows/models.go b/core/services/workflows/models.go index e6c92a641e4..92abf36d2c0 100644 --- a/core/services/workflows/models.go +++ b/core/services/workflows/models.go @@ -19,10 +19,13 @@ type stepRequest struct { // // Within the workflow spec, they are called "Capability Properties". type stepDefinition struct { + // TODO: Rename this, type here refers to the capability ID, not its type. Type string `json:"type" jsonschema:"required"` Ref string `json:"ref,omitempty" jsonschema:"pattern=^[a-z0-9_]+$"` Inputs map[string]any `json:"inputs,omitempty"` Config map[string]any `json:"config" jsonschema:"required"` + + CapabilityType capabilities.CapabilityType `json:"-"` } // workflowSpec is the parsed representation of a workflow. @@ -106,9 +109,10 @@ func (w *workflow) dependents(start string) ([]*step, error) { // step wraps a stepDefinition with additional context for dependencies and execution type step struct { stepDefinition - dependencies []string - capability capabilities.CallbackExecutable - config *values.Map + dependencies []string + capability capabilities.CallbackCapability + config *values.Map + executionStrategy executionStrategy } type triggerCapability struct { diff --git a/core/services/workflows/models_yaml.go b/core/services/workflows/models_yaml.go index 396811c3729..3d65e24a814 100644 --- a/core/services/workflows/models_yaml.go +++ b/core/services/workflows/models_yaml.go @@ -10,6 +10,8 @@ import ( "github.com/invopop/jsonschema" "github.com/shopspring/decimal" "sigs.k8s.io/yaml" + + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" ) func GenerateJsonSchema() ([]byte, error) { @@ -47,22 +49,30 @@ type workflowSpecYaml struct { func (w workflowSpecYaml) toWorkflowSpec() workflowSpec { triggers := make([]stepDefinition, 0, len(w.Triggers)) for _, t := range w.Triggers { - triggers = append(triggers, t.toStepDefinition()) + sd := t.toStepDefinition() + sd.CapabilityType = capabilities.CapabilityTypeTrigger + triggers = append(triggers, sd) } actions := make([]stepDefinition, 0, len(w.Actions)) for _, a := range w.Actions { - actions = append(actions, a.toStepDefinition()) + sd := a.toStepDefinition() + sd.CapabilityType = capabilities.CapabilityTypeAction + actions = append(actions, sd) } consensus := make([]stepDefinition, 0, len(w.Consensus)) for _, c := range w.Consensus { - consensus = append(consensus, c.toStepDefinition()) + sd := c.toStepDefinition() + sd.CapabilityType = capabilities.CapabilityTypeConsensus + consensus = append(consensus, sd) } targets := make([]stepDefinition, 0, len(w.Targets)) for _, t := range w.Targets { - targets = append(targets, t.toStepDefinition()) + sd := t.toStepDefinition() + sd.CapabilityType = capabilities.CapabilityTypeTarget + targets = append(targets, sd) } return workflowSpec{ diff --git a/core/services/workflows/testdata/fixtures/workflows/marshalling/workflow_2_spec.json b/core/services/workflows/testdata/fixtures/workflows/marshalling/workflow_2_spec.json index dfa13449a48..f4024e24267 100644 --- a/core/services/workflows/testdata/fixtures/workflows/marshalling/workflow_2_spec.json +++ b/core/services/workflows/testdata/fixtures/workflows/marshalling/workflow_2_spec.json @@ -28,4 +28,4 @@ "config": {} } ] -} \ No newline at end of file +}