diff --git a/core/capabilities/reader.go b/core/capabilities/reader.go index 1c86cb1e28b..0e664a73947 100644 --- a/core/capabilities/reader.go +++ b/core/capabilities/reader.go @@ -3,7 +3,11 @@ package capabilities import ( "context" "encoding/json" + "errors" + "fmt" + "github.com/smartcontractkit/chainlink-common/pkg/capabilities" + "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/types" kcr "github.com/smartcontractkit/chainlink/v2/core/gethwrappers/keystone/generated/keystone_capability_registry" p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" @@ -11,7 +15,9 @@ import ( ) type remoteRegistryReader struct { - r types.ContractReader + r types.ContractReader + peerWrapper p2ptypes.PeerWrapper + lggr logger.Logger } var _ reader = (*remoteRegistryReader)(nil) @@ -25,6 +31,43 @@ type state struct { IDsToCapabilities map[hashedCapabilityID]kcr.CapabilityRegistryCapability } +func (r *remoteRegistryReader) LocalNode(ctx context.Context) (capabilities.Node, error) { + if r.peerWrapper.GetPeer() == nil { + return capabilities.Node{}, errors.New("unable to get peer: peerWrapper hasn't started yet") + } + + pid := r.peerWrapper.GetPeer().ID() + + readerState, err := r.state(ctx) + if err != nil { + return capabilities.Node{}, fmt.Errorf("failed to get state from registry to determine don ownership: %w", err) + } + + var workflowDON capabilities.DON + capabilityDONs := []capabilities.DON{} + for _, d := range readerState.IDsToDONs { + for _, p := range d.NodeP2PIds { + if p == pid { + if d.AcceptsWorkflows { + if workflowDON.ID == "" { + workflowDON = *toDONInfo(d) + } else { + r.lggr.Errorf("Configuration error: node %s belongs to more than one workflowDON", pid) + } + } + + capabilityDONs = append(capabilityDONs, *toDONInfo(d)) + } + } + } + + return capabilities.Node{ + PeerID: &pid, + WorkflowDON: workflowDON, + CapabilityDONs: capabilityDONs, + }, nil +} + func (r *remoteRegistryReader) state(ctx context.Context) (state, error) { dons := []kcr.CapabilityRegistryDONInfo{} err := r.r.GetLatestValue(ctx, "capabilityRegistry", "getDONs", nil, &dons) @@ -66,7 +109,7 @@ type contractReaderFactory interface { NewContractReader(context.Context, []byte) (types.ContractReader, error) } -func newRemoteRegistryReader(ctx context.Context, relayer contractReaderFactory, remoteRegistryAddress string) (*remoteRegistryReader, error) { +func newRemoteRegistryReader(ctx context.Context, lggr logger.Logger, peerWrapper p2ptypes.PeerWrapper, relayer contractReaderFactory, remoteRegistryAddress string) (*remoteRegistryReader, error) { contractReaderConfig := evmrelaytypes.ChainReaderConfig{ Contracts: map[string]evmrelaytypes.ChainContractReader{ "capabilityRegistry": { @@ -106,5 +149,9 @@ func newRemoteRegistryReader(ctx context.Context, relayer contractReaderFactory, return nil, err } - return &remoteRegistryReader{r: cr}, err + return &remoteRegistryReader{ + r: cr, + peerWrapper: peerWrapper, + lggr: lggr, + }, err } diff --git a/core/capabilities/reader_test.go b/core/capabilities/reader_test.go index 3407ec43a43..395a08c1e19 100644 --- a/core/capabilities/reader_test.go +++ b/core/capabilities/reader_test.go @@ -17,6 +17,7 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" evmclient "github.com/smartcontractkit/chainlink/v2/core/chains/evm/client" "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller" @@ -110,6 +111,24 @@ func randomWord() [32]byte { return [32]byte(word) } +type mockWrapper struct { + services.Service + peer p2ptypes.Peer +} + +func (m mockWrapper) GetPeer() p2ptypes.Peer { + return m.peer +} + +type mockPeer struct { + p2ptypes.Peer + peerID p2ptypes.PeerID +} + +func (m mockPeer) ID() p2ptypes.PeerID { + return m.peerID +} + func TestReader_Integration(t *testing.T) { ctx := testutils.Context(t) reg, regAddress, owner, sim := startNewChainWithRegistry(t) @@ -180,7 +199,12 @@ func TestReader_Integration(t *testing.T) { require.NoError(t, err) factory := newContractReaderFactory(t, sim) - reader, err := newRemoteRegistryReader(ctx, factory, regAddress.Hex()) + pw := mockWrapper{ + peer: mockPeer{ + peerID: nodeSet[0], + }, + } + reader, err := newRemoteRegistryReader(ctx, logger.TestLogger(t), pw, factory, regAddress.Hex()) require.NoError(t, err) s, err := reader.state(ctx) @@ -207,4 +231,10 @@ func TestReader_Integration(t *testing.T) { nodeSet[1]: nodes[1], nodeSet[2]: nodes[2], }, s.IDsToNodes) + + node, err := reader.LocalNode(ctx) + require.NoError(t, err) + + assert.Equal(t, p2ptypes.PeerID(nodeSet[0]), *node.PeerID) + assert.Equal(t, fmt.Sprint(1), node.WorkflowDON.ID) } diff --git a/core/capabilities/syncer.go b/core/capabilities/syncer.go index ca55fe1ba8d..50dece12c01 100644 --- a/core/capabilities/syncer.go +++ b/core/capabilities/syncer.go @@ -32,6 +32,7 @@ import ( type reader interface { state(ctx context.Context) (state, error) + LocalNode(ctx context.Context) (capabilities.Node, error) } type registrySyncer struct { @@ -41,7 +42,7 @@ type registrySyncer struct { stopCh services.StopChan subServices []services.Service networkSetup HardcodedDonNetworkSetup - reader reader + reader wg sync.WaitGroup lggr logger.Logger @@ -79,7 +80,7 @@ func NewRegistrySyncer( ) (*registrySyncer, error) { stopCh := make(services.StopChan) ctx, _ := stopCh.NewCtx() - reader, err := newRemoteRegistryReader(ctx, relayer, registryAddress) + reader, err := newRemoteRegistryReader(ctx, lggr, peerWrapper, relayer, registryAddress) if err != nil { return nil, err } @@ -116,8 +117,8 @@ func newRegistrySyncer( } func (s *registrySyncer) Start(ctx context.Context) error { - // NOTE: Decrease wg.Add and uncomment line 124 below - // this for a hardcoded syncer + // NOTE: Decrease wg.Add and uncomment the line below + // `go s.launch()` to enable the hardcoded syncer. s.wg.Add(1) // go s.launch() go s.syncLoop() diff --git a/core/capabilities/syncer_test.go b/core/capabilities/syncer_test.go index c0eaa26a5b8..f3050167ecf 100644 --- a/core/capabilities/syncer_test.go +++ b/core/capabilities/syncer_test.go @@ -55,6 +55,10 @@ func (m mockReader) state(ctx context.Context) (state, error) { return m.s, m.err } +func (m mockReader) LocalNode(ctx context.Context) (capabilities.Node, error) { + return capabilities.Node{}, nil +} + type mockTrigger struct { capabilities.CapabilityInfo } diff --git a/core/capabilities/transmission/local_target_capability.go b/core/capabilities/transmission/local_target_capability.go index 23a9e8f0bf8..c040e6ed432 100644 --- a/core/capabilities/transmission/local_target_capability.go +++ b/core/capabilities/transmission/local_target_capability.go @@ -7,7 +7,6 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink/v2/core/logger" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" ) // LocalTargetCapability handles the transmission protocol required for a target capability that exists in the same don as @@ -15,31 +14,34 @@ import ( type LocalTargetCapability struct { lggr logger.Logger capabilities.TargetCapability - peerID p2ptypes.PeerID - don capabilities.DON + localNode capabilities.Node } -func NewLocalTargetCapability(lggr logger.Logger, peerID p2ptypes.PeerID, don capabilities.DON, underlying capabilities.TargetCapability) *LocalTargetCapability { +func NewLocalTargetCapability(lggr logger.Logger, localDON capabilities.Node, underlying capabilities.TargetCapability) *LocalTargetCapability { return &LocalTargetCapability{ TargetCapability: underlying, lggr: lggr, - peerID: peerID, - don: don, + localNode: localDON, } } func (l *LocalTargetCapability) Execute(ctx context.Context, req capabilities.CapabilityRequest) (<-chan capabilities.CapabilityResponse, error) { + if l.localNode.PeerID == nil || l.localNode.WorkflowDON.ID == "" { + l.lggr.Debugf("empty DON info, executing immediately") + return l.TargetCapability.Execute(ctx, req) + } + if req.Config == nil || req.Config.Underlying["schedule"] == nil { l.lggr.Debug("no schedule found, executing immediately") return l.TargetCapability.Execute(ctx, req) } - peerIDToTransmissionDelay, err := GetPeerIDToTransmissionDelay(l.don.Members, req) + peerIDToTransmissionDelay, err := GetPeerIDToTransmissionDelay(l.localNode.WorkflowDON.Members, req) if err != nil { return nil, fmt.Errorf("failed to get peer ID to transmission delay map: %w", err) } - delay, existsForPeerID := peerIDToTransmissionDelay[l.peerID] + delay, existsForPeerID := peerIDToTransmissionDelay[*l.localNode.PeerID] if !existsForPeerID { return nil, nil } diff --git a/core/capabilities/transmission/local_target_capability_test.go b/core/capabilities/transmission/local_target_capability_test.go index ef3e6ce5832..ded4b5c30f1 100644 --- a/core/capabilities/transmission/local_target_capability_test.go +++ b/core/capabilities/transmission/local_target_capability_test.go @@ -132,11 +132,14 @@ func TestScheduledExecutionStrategy_LocalDON(t *testing.T) { randKey(), randKey(), } - don := capabilities.DON{ - Members: ids, + localDON := capabilities.Node{ + WorkflowDON: capabilities.DON{ + ID: "1", + Members: ids, + }, + PeerID: &ids[tc.position], } - peerID := ids[tc.position] - localTargetCapability := NewLocalTargetCapability(log, peerID, don, mt) + localTargetCapability := NewLocalTargetCapability(log, localDON, mt) _, err = localTargetCapability.Execute(tests.Context(t), req) diff --git a/core/scripts/go.mod b/core/scripts/go.mod index 9f276e94a36..a4c4744da88 100644 --- a/core/scripts/go.mod +++ b/core/scripts/go.mod @@ -24,7 +24,7 @@ require ( github.com/prometheus/client_golang v1.17.0 github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458 github.com/smartcontractkit/chainlink-vrf v0.0.0-20240222010609-cd67d123c772 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c diff --git a/core/scripts/go.sum b/core/scripts/go.sum index 6d03530dd78..4c1230e1f41 100644 --- a/core/scripts/go.sum +++ b/core/scripts/go.sum @@ -1212,8 +1212,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb h1:R4OkRLPz6mZm8k7JFfLpQ9Ib/e1n1qcxg+hVxc0pKOk= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458 h1:+7LQmbMNaLXej+0ajbTxUfTt4w/ILODpmrOETQ5rTCI= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index 2d5eac01670..c885a3b2104 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -17,6 +17,7 @@ import ( "go.uber.org/multierr" "go.uber.org/zap/zapcore" + pkgcapabilities "github.com/smartcontractkit/chainlink-common/pkg/capabilities" "github.com/smartcontractkit/chainlink-common/pkg/loop" commonservices "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/sqlutil" @@ -205,6 +206,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { } var externalPeerWrapper p2ptypes.PeerWrapper + var getLocalNode func(ctx context.Context) (pkgcapabilities.Node, error) if cfg.Capabilities().Peering().Enabled() { externalPeer := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), opts.DS, globalLogger) signer := externalPeer @@ -239,6 +241,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { return nil, fmt.Errorf("could not configure syncer: %w", err) } + getLocalNode = registrySyncer.LocalNode srvcs = append(srvcs, dispatcher, registrySyncer) } @@ -430,14 +433,7 @@ func NewApplication(opts ApplicationOpts) (Application, error) { globalLogger, opts.CapabilitiesRegistry, workflowORM, - func() *p2ptypes.PeerID { - if externalPeerWrapper == nil { - return nil - } - - peerID := externalPeerWrapper.GetPeer().ID() - return &peerID - }, + getLocalNode, ) // Flux monitor requires ethereum just to boot, silence errors with a null delegate diff --git a/core/services/workflows/delegate.go b/core/services/workflows/delegate.go index b9be4c5677f..b937e0b580c 100644 --- a/core/services/workflows/delegate.go +++ b/core/services/workflows/delegate.go @@ -11,15 +11,14 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/types/core" "github.com/smartcontractkit/chainlink/v2/core/logger" "github.com/smartcontractkit/chainlink/v2/core/services/job" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) type Delegate struct { - registry core.CapabilitiesRegistry - logger logger.Logger - peerID func() *p2ptypes.PeerID - store store.Store + registry core.CapabilitiesRegistry + logger logger.Logger + getLocalNode func(ctx context.Context) (capabilities.Node, error) + store store.Store } var _ job.Delegate = (*Delegate)(nil) @@ -38,11 +37,6 @@ func (d *Delegate) OnDeleteJob(context.Context, job.Job) error { return nil } // ServicesForSpec satisfies the job.Delegate interface. func (d *Delegate) ServicesForSpec(_ context.Context, spec job.Job) ([]job.ServiceCtx, error) { - dinfo, err := initializeDONInfo() - if err != nil { - d.logger.Errorw("could not add initialize don info", err) - } - cfg := Config{ Lggr: d.logger, Spec: spec.WorkflowSpec.Workflow, @@ -50,8 +44,7 @@ func (d *Delegate) ServicesForSpec(_ context.Context, spec job.Job) ([]job.Servi WorkflowOwner: spec.WorkflowSpec.WorkflowOwner, WorkflowName: spec.WorkflowSpec.WorkflowName, Registry: d.registry, - DONInfo: dinfo, - PeerID: d.peerID, + GetLocalNode: d.getLocalNode, Store: d.store, } engine, err := NewEngine(cfg) @@ -61,36 +54,13 @@ func (d *Delegate) ServicesForSpec(_ context.Context, spec job.Job) ([]job.Servi return []job.ServiceCtx{engine}, nil } -func initializeDONInfo() (*capabilities.DON, error) { - p2pStrings := []string{ - "12D3KooWBCF1XT5Wi8FzfgNCqRL76Swv8TRU3TiD4QiJm8NMNX7N", - "12D3KooWG1AyvwmCpZ93J8pBQUE1SuzrjDXnT4BeouncHR3jWLCG", - "12D3KooWGeUKZBRMbx27FUTgBwZa9Ap9Ym92mywwpuqkEtz8XWyv", - "12D3KooW9zYWQv3STmDeNDidyzxsJSTxoCTLicafgfeEz9nhwhC4", - "12D3KooWG1AeBnSJH2mdcDusXQVye2jqodZ6pftTH98HH6xvrE97", - "12D3KooWBf3PrkhNoPEmp7iV291YnPuuTsgEDHTscLajxoDvwHGA", - "12D3KooWP3FrMTFXXRU2tBC8aYvEBgUX6qhcH9q2JZCUi9Wvc2GX", - } - - p2pIDs := []p2ptypes.PeerID{} - for _, p := range p2pStrings { - pid := p2ptypes.PeerID{} - err := pid.UnmarshalText([]byte(p)) - if err != nil { - return nil, err - } - - p2pIDs = append(p2pIDs, pid) - } - - return &capabilities.DON{ - ID: "00010203", - Members: p2pIDs, - }, nil -} - -func NewDelegate(logger logger.Logger, registry core.CapabilitiesRegistry, store store.Store, peerID func() *p2ptypes.PeerID) *Delegate { - return &Delegate{logger: logger, registry: registry, store: store, peerID: peerID} +func NewDelegate( + logger logger.Logger, + registry core.CapabilitiesRegistry, + store store.Store, + getLocalNode func(ctx context.Context) (capabilities.Node, error), +) *Delegate { + return &Delegate{logger: logger, registry: registry, store: store, getLocalNode: getLocalNode} } func ValidatedWorkflowJobSpec(tomlString string) (job.Job, error) { diff --git a/core/services/workflows/engine.go b/core/services/workflows/engine.go index 67746b12d17..28f37cd7c43 100644 --- a/core/services/workflows/engine.go +++ b/core/services/workflows/engine.go @@ -18,15 +18,9 @@ import ( "github.com/smartcontractkit/chainlink-common/pkg/workflows" "github.com/smartcontractkit/chainlink/v2/core/capabilities/transmission" "github.com/smartcontractkit/chainlink/v2/core/logger" - p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" "github.com/smartcontractkit/chainlink/v2/core/services/workflows/store" ) -type donInfo struct { - *capabilities.DON - PeerID func() *p2ptypes.PeerID -} - type stepRequest struct { stepRef string state store.WorkflowExecution @@ -38,7 +32,8 @@ type Engine struct { logger logger.Logger registry core.CapabilitiesRegistry workflow *workflow - donInfo donInfo + getLocalNode func(ctx context.Context) (capabilities.Node, error) + localNode capabilities.Node executionStates store.Store pendingStepRequests chan stepRequest triggerEvents chan capabilities.CapabilityResponse @@ -147,8 +142,7 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error { e.logger.Debugf("wrapping capability %s in local transmission protocol", info.ID) cp = transmission.NewLocalTargetCapability( e.logger, - *e.donInfo.PeerID(), - *e.donInfo.DON, + e.localNode, cp.(capabilities.TargetCapability), ) } @@ -186,16 +180,25 @@ func (e *Engine) initializeCapability(ctx context.Context, step *step) error { // init does the following: // -// 1. Resolves the underlying capability for each trigger -// 2. Registers each step's capability to this workflow -// 3. Registers for trigger events now that all capabilities are resolved +// 1. Resolves the LocalDON information +// 2. Resolves the underlying capability for each trigger +// 3. Registers each step's capability to this workflow +// 4. Registers for trigger events now that all capabilities are resolved // -// Steps 1 and 2 are retried every 5 seconds until successful. +// Steps 1-3 are retried every 5 seconds until successful. func (e *Engine) init(ctx context.Context) { defer e.wg.Done() retryErr := retryable(ctx, e.logger, e.retryMs, e.maxRetries, func() error { - err := e.resolveWorkflowCapabilities(ctx) + // first wait for localDON to return a non-error response; this depends + // on the underlying peerWrapper returning the PeerID. + node, err := e.getLocalNode(ctx) + if err != nil { + return fmt.Errorf("failed to get donInfo: %w", err) + } + e.localNode = node + + err = e.resolveWorkflowCapabilities(ctx) if err != nil { return fmt.Errorf("failed to resolve workflow: %s", err) } @@ -300,7 +303,7 @@ func (e *Engine) registerTrigger(ctx context.Context, t *triggerCapability, trig triggerRegRequest := capabilities.CapabilityRequest{ Metadata: capabilities.RequestMetadata{ WorkflowID: e.workflow.id, - WorkflowDonID: e.donInfo.ID, + WorkflowDonID: e.localNode.WorkflowDON.ID, WorkflowName: e.workflow.name, WorkflowOwner: e.workflow.owner, }, @@ -639,7 +642,7 @@ func (e *Engine) executeStep(ctx context.Context, l logger.Logger, msg stepReque WorkflowExecutionID: msg.state.ExecutionID, WorkflowOwner: e.workflow.owner, WorkflowName: e.workflow.name, - WorkflowDonID: e.donInfo.ID, + WorkflowDonID: e.localNode.WorkflowDON.ID, }, } @@ -663,7 +666,7 @@ func (e *Engine) deregisterTrigger(ctx context.Context, t *triggerCapability, tr deregRequest := capabilities.CapabilityRequest{ Metadata: capabilities.RequestMetadata{ WorkflowID: e.workflow.id, - WorkflowDonID: e.donInfo.ID, + WorkflowDonID: e.localNode.WorkflowDON.ID, WorkflowName: e.workflow.name, WorkflowOwner: e.workflow.owner, }, @@ -744,8 +747,7 @@ type Config struct { QueueSize int NewWorkerTimeout time.Duration MaxExecutionDuration time.Duration - DONInfo *capabilities.DON - PeerID func() *p2ptypes.PeerID + GetLocalNode func(ctx context.Context) (capabilities.Node, error) Store store.Store // For testing purposes only @@ -784,6 +786,12 @@ func NewEngine(cfg Config) (engine *Engine, err error) { cfg.MaxExecutionDuration = defaultMaxExecutionDuration } + if cfg.GetLocalNode == nil { + cfg.GetLocalNode = func(ctx context.Context) (capabilities.Node, error) { + return capabilities.Node{}, nil + } + } + if cfg.retryMs == 0 { cfg.retryMs = 5000 } @@ -824,13 +832,10 @@ func NewEngine(cfg Config) (engine *Engine, err error) { } engine = &Engine{ - logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID), - registry: cfg.Registry, - workflow: workflow, - donInfo: donInfo{ - DON: cfg.DONInfo, - PeerID: cfg.PeerID, - }, + logger: cfg.Lggr.Named("WorkflowEngine").With("workflowID", cfg.WorkflowID), + registry: cfg.Registry, + workflow: workflow, + getLocalNode: cfg.GetLocalNode, executionStates: cfg.Store, pendingStepRequests: make(chan stepRequest, cfg.QueueSize), newWorkerCh: newWorkerCh, diff --git a/core/services/workflows/engine_test.go b/core/services/workflows/engine_test.go index dff7662cd15..09af6d63580 100644 --- a/core/services/workflows/engine_test.go +++ b/core/services/workflows/engine_test.go @@ -89,10 +89,14 @@ func newTestEngine(t *testing.T, reg *coreCap.Registry, spec string, opts ...fun Lggr: logger.TestLogger(t), Registry: reg, Spec: spec, - DONInfo: &capabilities.DON{ - ID: "00010203", + GetLocalNode: func(ctx context.Context) (capabilities.Node, error) { + return capabilities.Node{ + WorkflowDON: capabilities.DON{ + ID: "00010203", + }, + PeerID: &peerID, + }, nil }, - PeerID: func() *p2ptypes.PeerID { return &peerID }, maxRetries: 1, retryMs: 100, afterInit: func(success bool) { @@ -737,3 +741,53 @@ func TestEngine_WrapsTargets(t *testing.T) { }) require.NoError(t, err) } + +func TestEngine_GetsNodeInfoDuringInitialization(t *testing.T) { + t.Parallel() + ctx := testutils.Context(t) + reg := coreCap.NewRegistry(logger.TestLogger(t)) + + trigger, _ := mockTrigger(t) + + require.NoError(t, reg.Add(ctx, trigger)) + require.NoError(t, reg.Add(ctx, mockConsensus())) + require.NoError(t, reg.Add(ctx, mockTarget())) + + clock := clockwork.NewFakeClock() + dbstore := store.NewDBStore(pgtest.NewSqlxDB(t), clock) + + var peerID p2ptypes.PeerID + node := capabilities.Node{ + PeerID: &peerID, + WorkflowDON: capabilities.DON{ + ID: "1", + }, + } + retryCount := 0 + eng, hooks := newTestEngine( + t, + reg, + delayedWorkflow, + func(c *Config) { + c.Store = dbstore + c.clock = clock + c.maxRetries = 2 + c.retryMs = 0 + c.GetLocalNode = func(ctx context.Context) (capabilities.Node, error) { + n := capabilities.Node{} + err := errors.New("peer not initialized") + if retryCount > 0 { + n = node + err = nil + } + retryCount++ + return n, err + } + }, + ) + servicetest.Run(t, eng) + + <-hooks.initSuccessful + + assert.Equal(t, node, eng.localNode) +} diff --git a/go.mod b/go.mod index f2b42196690..0ddf7caf26d 100644 --- a/go.mod +++ b/go.mod @@ -72,7 +72,7 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/smartcontractkit/chain-selectors v1.0.10 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458 github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 github.com/smartcontractkit/chainlink-feeds v0.0.0-20240522213638-159fb2d99917 diff --git a/go.sum b/go.sum index bcbc9b2d27b..e9dc9d81303 100644 --- a/go.sum +++ b/go.sum @@ -1171,8 +1171,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb h1:R4OkRLPz6mZm8k7JFfLpQ9Ib/e1n1qcxg+hVxc0pKOk= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458 h1:+7LQmbMNaLXej+0ajbTxUfTt4w/ILODpmrOETQ5rTCI= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= diff --git a/integration-tests/go.mod b/integration-tests/go.mod index a6fa0288ef6..f0991c1f715 100644 --- a/integration-tests/go.mod +++ b/integration-tests/go.mod @@ -27,8 +27,8 @@ require ( github.com/shopspring/decimal v1.3.1 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb - github.com/smartcontractkit/chainlink-testing-framework v1.30.4 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458 + github.com/smartcontractkit/chainlink-testing-framework v1.30.5 github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 github.com/smartcontractkit/chainlink/v2 v2.0.0-00010101000000-000000000000 github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c diff --git a/integration-tests/go.sum b/integration-tests/go.sum index a643f859642..016fca4c4a9 100644 --- a/integration-tests/go.sum +++ b/integration-tests/go.sum @@ -1512,8 +1512,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb h1:R4OkRLPz6mZm8k7JFfLpQ9Ib/e1n1qcxg+hVxc0pKOk= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458 h1:+7LQmbMNaLXej+0ajbTxUfTt4w/ILODpmrOETQ5rTCI= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= @@ -1524,8 +1524,8 @@ github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240605170242-555ff582f36 github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240605170242-555ff582f36a/go.mod h1:QqcZSwLgEIn7YraAIRmomnBMAuVFephiHrIWVlkWbFI= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240531021326-99118e47f696 h1:h1E87+z+JcUEfvbJVF56SnZA/YUFE5ewUE61MaR/Ewg= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240531021326-99118e47f696/go.mod h1:OiWUTrrpSLLTMh7FINWjEh6mmDJCVPaC4yEsDCVaWdU= -github.com/smartcontractkit/chainlink-testing-framework v1.30.4 h1:kf6zRL6v5D047gynYNNqXGl9QBvnQSa4LMs1iHLRu64= -github.com/smartcontractkit/chainlink-testing-framework v1.30.4/go.mod h1:E6uNEZhZZid9PHv6/Kq5Vn63GlO61ZcKB+/f0DKo3Q4= +github.com/smartcontractkit/chainlink-testing-framework v1.30.5 h1:RBeQkaUH095L/hOH6JbfScAo4jkI0osBp8kgULnGwos= +github.com/smartcontractkit/chainlink-testing-framework v1.30.5/go.mod h1:E6uNEZhZZid9PHv6/Kq5Vn63GlO61ZcKB+/f0DKo3Q4= github.com/smartcontractkit/chainlink-testing-framework/grafana v0.0.0-20240328204215-ac91f55f1449 h1:fX/xmGm1GBsD1ZZnooNT+eWA0hiTAqFlHzOC5CY4dy8= github.com/smartcontractkit/chainlink-testing-framework/grafana v0.0.0-20240328204215-ac91f55f1449/go.mod h1:DC8sQMyTlI/44UCTL8QWFwb0bYNoXCfjwCv2hMivYZU= github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 h1:FFdvEzlYwcuVHkdZ8YnZR/XomeMGbz5E2F2HZI3I3w8= diff --git a/integration-tests/load/go.mod b/integration-tests/load/go.mod index 854f7902c19..7fb29e4060b 100644 --- a/integration-tests/load/go.mod +++ b/integration-tests/load/go.mod @@ -16,8 +16,8 @@ require ( github.com/rs/zerolog v1.30.0 github.com/slack-go/slack v0.12.2 github.com/smartcontractkit/chainlink-automation v1.0.4 - github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb - github.com/smartcontractkit/chainlink-testing-framework v1.30.4 + github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458 + github.com/smartcontractkit/chainlink-testing-framework v1.30.5 github.com/smartcontractkit/chainlink/integration-tests v0.0.0-20240214231432-4ad5eb95178c github.com/smartcontractkit/chainlink/v2 v2.9.0-beta0.0.20240216210048-da02459ddad8 github.com/smartcontractkit/libocr v0.0.0-20240419185742-fd3cab206b2c diff --git a/integration-tests/load/go.sum b/integration-tests/load/go.sum index fd31a8e8a2d..5dd5443f52f 100644 --- a/integration-tests/load/go.sum +++ b/integration-tests/load/go.sum @@ -1502,8 +1502,8 @@ github.com/smartcontractkit/chain-selectors v1.0.10 h1:t9kJeE6B6G+hKD0GYR4kGJSCq github.com/smartcontractkit/chain-selectors v1.0.10/go.mod h1:d4Hi+E1zqjy9HqMkjBE5q1vcG9VGgxf5VxiRHfzi2kE= github.com/smartcontractkit/chainlink-automation v1.0.4 h1:iyW181JjKHLNMnDleI8umfIfVVlwC7+n5izbLSFgjw8= github.com/smartcontractkit/chainlink-automation v1.0.4/go.mod h1:u4NbPZKJ5XiayfKHD/v3z3iflQWqvtdhj13jVZXj/cM= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb h1:R4OkRLPz6mZm8k7JFfLpQ9Ib/e1n1qcxg+hVxc0pKOk= -github.com/smartcontractkit/chainlink-common v0.1.7-0.20240613201342-a855825f87bb/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458 h1:+7LQmbMNaLXej+0ajbTxUfTt4w/ILODpmrOETQ5rTCI= +github.com/smartcontractkit/chainlink-common v0.1.7-0.20240614120734-7fa0ab584458/go.mod h1:L32xvCpk84Nglit64OhySPMP1tM3TTBK7Tw0qZl7Sd4= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d h1:5tgMC5Gi2UAOKZ+m28W8ubjLeR0pQCAcrz6eQ0rW510= github.com/smartcontractkit/chainlink-cosmos v0.4.1-0.20240524214833-c362c2ebbd2d/go.mod h1:0UNuO3nDt9MFsZPaHJBEUolxVkN0iC69j1ccDp95e8k= github.com/smartcontractkit/chainlink-data-streams v0.0.0-20240220203239-09be0ea34540 h1:xFSv8561jsLtF6gYZr/zW2z5qUUAkcFkApin2mnbYTo= @@ -1514,8 +1514,8 @@ github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240605170242-555ff582f36 github.com/smartcontractkit/chainlink-solana v1.0.3-0.20240605170242-555ff582f36a/go.mod h1:QqcZSwLgEIn7YraAIRmomnBMAuVFephiHrIWVlkWbFI= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240531021326-99118e47f696 h1:h1E87+z+JcUEfvbJVF56SnZA/YUFE5ewUE61MaR/Ewg= github.com/smartcontractkit/chainlink-starknet/relayer v0.0.1-beta-test.0.20240531021326-99118e47f696/go.mod h1:OiWUTrrpSLLTMh7FINWjEh6mmDJCVPaC4yEsDCVaWdU= -github.com/smartcontractkit/chainlink-testing-framework v1.30.4 h1:kf6zRL6v5D047gynYNNqXGl9QBvnQSa4LMs1iHLRu64= -github.com/smartcontractkit/chainlink-testing-framework v1.30.4/go.mod h1:E6uNEZhZZid9PHv6/Kq5Vn63GlO61ZcKB+/f0DKo3Q4= +github.com/smartcontractkit/chainlink-testing-framework v1.30.5 h1:RBeQkaUH095L/hOH6JbfScAo4jkI0osBp8kgULnGwos= +github.com/smartcontractkit/chainlink-testing-framework v1.30.5/go.mod h1:E6uNEZhZZid9PHv6/Kq5Vn63GlO61ZcKB+/f0DKo3Q4= github.com/smartcontractkit/chainlink-testing-framework/grafana v0.0.0-20240328204215-ac91f55f1449 h1:fX/xmGm1GBsD1ZZnooNT+eWA0hiTAqFlHzOC5CY4dy8= github.com/smartcontractkit/chainlink-testing-framework/grafana v0.0.0-20240328204215-ac91f55f1449/go.mod h1:DC8sQMyTlI/44UCTL8QWFwb0bYNoXCfjwCv2hMivYZU= github.com/smartcontractkit/chainlink-vrf v0.0.0-20231120191722-fef03814f868 h1:FFdvEzlYwcuVHkdZ8YnZR/XomeMGbz5E2F2HZI3I3w8=