diff --git a/.changeset/gold-rats-hide.md b/.changeset/gold-rats-hide.md new file mode 100644 index 00000000000..b290847556a --- /dev/null +++ b/.changeset/gold-rats-hide.md @@ -0,0 +1,5 @@ +--- +"chainlink": patch +--- + +External peering core service diff --git a/core/capabilities/syncer.go b/core/capabilities/syncer.go new file mode 100644 index 00000000000..a8cfb2c56f8 --- /dev/null +++ b/core/capabilities/syncer.go @@ -0,0 +1,80 @@ +package capabilities + +import ( + "context" + + "github.com/smartcontractkit/chainlink-common/pkg/services" + "github.com/smartcontractkit/chainlink-common/pkg/types" + + "github.com/smartcontractkit/libocr/ragep2p" + ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + + "github.com/smartcontractkit/chainlink/v2/core/logger" + p2ptypes "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +type registrySyncer struct { + peerWrapper p2ptypes.PeerWrapper + registry types.CapabilitiesRegistry + lggr logger.Logger +} + +var _ services.Service = ®istrySyncer{} + +// RegistrySyncer updates local Registry to match its onchain counterpart +func NewRegistrySyncer(peerWrapper p2ptypes.PeerWrapper, registry types.CapabilitiesRegistry, lggr logger.Logger) *registrySyncer { + return ®istrySyncer{ + peerWrapper: peerWrapper, + registry: registry, + lggr: lggr, + } +} + +func (s *registrySyncer) Start(ctx context.Context) error { + // NOTE: temporary hard-coded values + defaultStreamConfig := p2ptypes.StreamConfig{ + IncomingMessageBufferSize: 1000000, + OutgoingMessageBufferSize: 1000000, + MaxMessageLenBytes: 100000, + MessageRateLimiter: ragep2p.TokenBucketParams{ + Rate: 10.0, + Capacity: 1000, + }, + BytesRateLimiter: ragep2p.TokenBucketParams{ + Rate: 10.0, + Capacity: 1000, + }, + } + peerIDs := []string{ + "12D3KooWF3dVeJ6YoT5HFnYhmwQWWMoEwVFzJQ5kKCMX3ZityxMC", + "12D3KooWQsmok6aD8PZqt3RnJhQRrNzKHLficq7zYFRp7kZ1hHP8", + "12D3KooWJbZLiMuGeKw78s3LM5TNgBTJHcF39DraxLu14bucG9RN", + "12D3KooWGqfSPhHKmQycfhRjgUDE2vg9YWZN27Eue8idb2ZUk6EH", + } + peers := make(map[ragetypes.PeerID]p2ptypes.StreamConfig) + for _, peerID := range peerIDs { + var p ragetypes.PeerID + err := p.UnmarshalText([]byte(peerID)) + if err != nil { + return err + } + peers[p] = defaultStreamConfig + } + return s.peerWrapper.GetPeer().UpdateConnections(peers) +} + +func (s *registrySyncer) Close() error { + return s.peerWrapper.GetPeer().UpdateConnections(map[ragetypes.PeerID]p2ptypes.StreamConfig{}) +} + +func (s *registrySyncer) Ready() error { + return nil +} + +func (s *registrySyncer) HealthReport() map[string]error { + return nil +} + +func (s *registrySyncer) Name() string { + return "RegistrySyncer" +} diff --git a/core/capabilities/syncer_test.go b/core/capabilities/syncer_test.go new file mode 100644 index 00000000000..acfe0f00233 --- /dev/null +++ b/core/capabilities/syncer_test.go @@ -0,0 +1,28 @@ +package capabilities_test + +import ( + "testing" + + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + commonMocks "github.com/smartcontractkit/chainlink-common/pkg/types/mocks" + coreCapabilities "github.com/smartcontractkit/chainlink/v2/core/capabilities" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types/mocks" +) + +func TestSyncer_CleanStartClose(t *testing.T) { + lggr := logger.TestLogger(t) + ctx := testutils.Context(t) + peer := mocks.NewPeer(t) + peer.On("UpdateConnections", mock.Anything).Return(nil) + wrapper := mocks.NewPeerWrapper(t) + wrapper.On("GetPeer").Return(peer) + registry := commonMocks.NewCapabilitiesRegistry(t) + + syncer := coreCapabilities.NewRegistrySyncer(wrapper, registry, lggr) + require.NoError(t, syncer.Start(ctx)) + require.NoError(t, syncer.Close()) +} diff --git a/core/chains/evm/config/mocks/chain_scoped_config.go b/core/chains/evm/config/mocks/chain_scoped_config.go index badba1d69f3..29b6d6f3f3e 100644 --- a/core/chains/evm/config/mocks/chain_scoped_config.go +++ b/core/chains/evm/config/mocks/chain_scoped_config.go @@ -80,6 +80,26 @@ func (_m *ChainScopedConfig) AutoPprof() coreconfig.AutoPprof { return r0 } +// Capabilities provides a mock function with given fields: +func (_m *ChainScopedConfig) Capabilities() coreconfig.Capabilities { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Capabilities") + } + + var r0 coreconfig.Capabilities + if rf, ok := ret.Get(0).(func() coreconfig.Capabilities); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(coreconfig.Capabilities) + } + } + + return r0 +} + // CosmosEnabled provides a mock function with given fields: func (_m *ChainScopedConfig) CosmosEnabled() bool { ret := _m.Called() diff --git a/core/config/app_config.go b/core/config/app_config.go index 290e14dcc45..869477218db 100644 --- a/core/config/app_config.go +++ b/core/config/app_config.go @@ -35,6 +35,7 @@ type AppConfig interface { AuditLogger() AuditLogger AutoPprof() AutoPprof + Capabilities() Capabilities Database() Database Feature() Feature FluxMonitor() FluxMonitor diff --git a/core/config/capabilities_config.go b/core/config/capabilities_config.go new file mode 100644 index 00000000000..8cde986ccb7 --- /dev/null +++ b/core/config/capabilities_config.go @@ -0,0 +1,6 @@ +package config + +type Capabilities interface { + Peering() P2P + // NOTE: RegistrySyncer will need config with relay ID, chain ID and contract address when implemented +} diff --git a/core/config/docs/core.toml b/core/config/docs/core.toml index 95d59cca062..984080ea3f1 100644 --- a/core/config/docs/core.toml +++ b/core/config/docs/core.toml @@ -434,6 +434,47 @@ DeltaReconcile = '1m' # Default # but the host and port must be fully specified and cannot be empty. You can specify `0.0.0.0` (IPv4) or `::` (IPv6) to listen on all interfaces, but that is not recommended. ListenAddresses = ['1.2.3.4:9999', '[a52d:0:a88:1274::abcd]:1337'] # Example +[Capabilities.Peering] +# IncomingMessageBufferSize is the per-remote number of incoming +# messages to buffer. Any additional messages received on top of those +# already in the queue will be dropped. +IncomingMessageBufferSize = 10 # Default +# OutgoingMessageBufferSize is the per-remote number of outgoing +# messages to buffer. Any additional messages send on top of those +# already in the queue will displace the oldest. +# NOTE: OutgoingMessageBufferSize should be comfortably smaller than remote's +# IncomingMessageBufferSize to give the remote enough space to process +# them all in case we regained connection and now send a bunch at once +OutgoingMessageBufferSize = 10 # Default +# PeerID is the default peer ID to use for OCR jobs. If unspecified, uses the first available peer ID. +PeerID = '12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw' # Example +# TraceLogging enables trace level logging. +TraceLogging = false # Default + +[Capabilities.Peering.V2] +# Enabled enables P2P V2. +Enabled = false # Default +# AnnounceAddresses is the addresses the peer will advertise on the network in `host:port` form as accepted by the TCP version of Go’s `net.Dial`. +# The addresses should be reachable by other nodes on the network. When attempting to connect to another node, +# a node will attempt to dial all of the other node’s AnnounceAddresses in round-robin fashion. +AnnounceAddresses = ['1.2.3.4:9999', '[a52d:0:a88:1274::abcd]:1337'] # Example +# DefaultBootstrappers is the default bootstrapper peers for libocr's v2 networking stack. +# +# Oracle nodes typically only know each other’s PeerIDs, but not their hostnames, IP addresses, or ports. +# DefaultBootstrappers are special nodes that help other nodes discover each other’s `AnnounceAddresses` so they can communicate. +# Nodes continuously attempt to connect to bootstrappers configured in here. When a node wants to connect to another node +# (which it knows only by PeerID, but not by address), it discovers the other node’s AnnounceAddresses from communications +# received from its DefaultBootstrappers or other discovered nodes. To facilitate discovery, +# nodes will regularly broadcast signed announcements containing their PeerID and AnnounceAddresses. +DefaultBootstrappers = ['12D3KooWMHMRLQkgPbFSYHwD3NBuwtS1AmxhvKVUrcfyaGDASR4U@1.2.3.4:9999', '12D3KooWM55u5Swtpw9r8aFLQHEtw7HR4t44GdNs654ej5gRs2Dh@example.com:1234'] # Example +# DeltaDial controls how far apart Dial attempts are +DeltaDial = '15s' # Default +# DeltaReconcile controls how often a Reconcile message is sent to every peer. +DeltaReconcile = '1m' # Default +# ListenAddresses is the addresses the peer will listen to on the network in `host:port` form as accepted by `net.Listen()`, +# but the host and port must be fully specified and cannot be empty. You can specify `0.0.0.0` (IPv4) or `::` (IPv6) to listen on all interfaces, but that is not recommended. +ListenAddresses = ['1.2.3.4:9999', '[a52d:0:a88:1274::abcd]:1337'] # Example + [Keeper] # **ADVANCED** # DefaultTransactionQueueDepth controls the queue size for `DropOldestStrategy` in Keeper. Set to 0 to use `SendEvery` strategy instead. diff --git a/core/config/toml/types.go b/core/config/toml/types.go index 08ebf68f59b..68445f5b860 100644 --- a/core/config/toml/types.go +++ b/core/config/toml/types.go @@ -55,6 +55,7 @@ type Core struct { Insecure Insecure `toml:",omitempty"` Tracing Tracing `toml:",omitempty"` Mercury Mercury `toml:",omitempty"` + Capabilities Capabilities `toml:",omitempty"` } // SetFrom updates c with any non-nil values from f. (currently TOML field only!) @@ -84,6 +85,7 @@ func (c *Core) SetFrom(f *Core) { c.P2P.setFrom(&f.P2P) c.Keeper.setFrom(&f.Keeper) c.Mercury.setFrom(&f.Mercury) + c.Capabilities.setFrom(&f.Capabilities) c.AutoPprof.setFrom(&f.AutoPprof) c.Pyroscope.setFrom(&f.Pyroscope) @@ -1386,6 +1388,14 @@ func (m *MercurySecrets) ValidateConfig() (err error) { return err } +type Capabilities struct { + Peering P2P `toml:",omitempty"` +} + +func (c *Capabilities) setFrom(f *Capabilities) { + c.Peering.setFrom(&f.Peering) +} + type ThresholdKeyShareSecrets struct { ThresholdKeyShare *models.Secret } diff --git a/core/services/chainlink/application.go b/core/services/chainlink/application.go index bb6c0030a95..ca8f118b149 100644 --- a/core/services/chainlink/application.go +++ b/core/services/chainlink/application.go @@ -49,6 +49,7 @@ import ( "github.com/smartcontractkit/chainlink/v2/core/services/ocr2" "github.com/smartcontractkit/chainlink/v2/core/services/ocrbootstrap" "github.com/smartcontractkit/chainlink/v2/core/services/ocrcommon" + externalp2p "github.com/smartcontractkit/chainlink/v2/core/services/p2p/wrapper" "github.com/smartcontractkit/chainlink/v2/core/services/periodicbackup" "github.com/smartcontractkit/chainlink/v2/core/services/pg" "github.com/smartcontractkit/chainlink/v2/core/services/pipeline" @@ -191,6 +192,15 @@ func NewApplication(opts ApplicationOpts) (Application, error) { unrestrictedHTTPClient := opts.UnrestrictedHTTPClient registry := capabilities.NewRegistry(globalLogger) + if cfg.Capabilities().Peering().Enabled() { + externalPeerWrapper := externalp2p.NewExternalPeerWrapper(keyStore.P2P(), cfg.Capabilities().Peering(), globalLogger) + srvcs = append(srvcs, externalPeerWrapper) + + // NOTE: RegistrySyncer will depend on a Relayer when fully implemented + registrySyncer := capabilities.NewRegistrySyncer(externalPeerWrapper, registry, globalLogger) + srvcs = append(srvcs, registrySyncer) + } + // LOOPs can be created as options, in the case of LOOP relayers, or // as OCR2 job implementations, in the case of Median today. // We will have a non-nil registry here in LOOP relayers are being used, otherwise diff --git a/core/services/chainlink/config_capabilities.go b/core/services/chainlink/config_capabilities.go new file mode 100644 index 00000000000..d432d31ad18 --- /dev/null +++ b/core/services/chainlink/config_capabilities.go @@ -0,0 +1,16 @@ +package chainlink + +import ( + "github.com/smartcontractkit/chainlink/v2/core/config" + "github.com/smartcontractkit/chainlink/v2/core/config/toml" +) + +var _ config.Capabilities = (*capabilitiesConfig)(nil) + +type capabilitiesConfig struct { + c toml.Capabilities +} + +func (c *capabilitiesConfig) Peering() config.P2P { + return &p2p{c: c.c.Peering} +} diff --git a/core/services/chainlink/config_capabilities_test.go b/core/services/chainlink/config_capabilities_test.go new file mode 100644 index 00000000000..7ff3f3fed08 --- /dev/null +++ b/core/services/chainlink/config_capabilities_test.go @@ -0,0 +1,46 @@ +package chainlink + +import ( + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/libocr/commontypes" +) + +func TestCapabilitiesConfig(t *testing.T) { + opts := GeneralConfigOpts{ + ConfigStrings: []string{fullTOML}, + } + cfg, err := opts.New() + require.NoError(t, err) + + p2p := cfg.Capabilities().Peering() + assert.Equal(t, "p2p_12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw", p2p.PeerID().String()) + assert.Equal(t, 13, p2p.IncomingMessageBufferSize()) + assert.Equal(t, 17, p2p.OutgoingMessageBufferSize()) + assert.True(t, p2p.TraceLogging()) + + v2 := p2p.V2() + assert.False(t, v2.Enabled()) + assert.Equal(t, []string{"a", "b", "c"}, v2.AnnounceAddresses()) + assert.ElementsMatch( + t, + []commontypes.BootstrapperLocator{ + { + PeerID: "12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw", + Addrs: []string{"test:99"}, + }, + { + PeerID: "12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw", + Addrs: []string{"foo:42", "bar:10"}, + }, + }, + v2.DefaultBootstrappers(), + ) + assert.Equal(t, time.Minute, v2.DeltaDial().Duration()) + assert.Equal(t, 2*time.Second, v2.DeltaReconcile().Duration()) + assert.Equal(t, []string{"foo", "bar"}, v2.ListenAddresses()) +} diff --git a/core/services/chainlink/config_general.go b/core/services/chainlink/config_general.go index 97243926973..cae01c01cb7 100644 --- a/core/services/chainlink/config_general.go +++ b/core/services/chainlink/config_general.go @@ -397,6 +397,10 @@ func (g *generalConfig) AutoPprofProfileRoot() string { return s } +func (g *generalConfig) Capabilities() config.Capabilities { + return &capabilitiesConfig{c: g.c.Capabilities} +} + func (g *generalConfig) Database() coreconfig.Database { return &databaseConfig{c: g.c.Database, s: g.secrets.Secrets.Database, logSQL: g.logSQL} } diff --git a/core/services/chainlink/config_test.go b/core/services/chainlink/config_test.go index 4422a743689..6cf9537f065 100644 --- a/core/services/chainlink/config_test.go +++ b/core/services/chainlink/config_test.go @@ -424,6 +424,25 @@ func TestConfig_Marshal(t *testing.T) { ListenAddresses: &[]string{"foo", "bar"}, }, } + full.Capabilities = toml.Capabilities{ + Peering: toml.P2P{ + IncomingMessageBufferSize: ptr[int64](13), + OutgoingMessageBufferSize: ptr[int64](17), + PeerID: mustPeerID("12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw"), + TraceLogging: ptr(true), + V2: toml.P2PV2{ + Enabled: ptr(false), + AnnounceAddresses: &[]string{"a", "b", "c"}, + DefaultBootstrappers: &[]ocrcommontypes.BootstrapperLocator{ + {PeerID: "12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw", Addrs: []string{"foo:42", "bar:10"}}, + {PeerID: "12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw", Addrs: []string{"test:99"}}, + }, + DeltaDial: commoncfg.MustNewDuration(time.Minute), + DeltaReconcile: commoncfg.MustNewDuration(2 * time.Second), + ListenAddresses: &[]string{"foo", "bar"}, + }, + }, + } full.Keeper = toml.Keeper{ DefaultTransactionQueueDepth: ptr[uint32](17), GasPriceBufferPercent: ptr[uint16](12), diff --git a/core/services/chainlink/mocks/general_config.go b/core/services/chainlink/mocks/general_config.go index 1dd85875395..a520a878d3c 100644 --- a/core/services/chainlink/mocks/general_config.go +++ b/core/services/chainlink/mocks/general_config.go @@ -86,6 +86,26 @@ func (_m *GeneralConfig) AutoPprof() config.AutoPprof { return r0 } +// Capabilities provides a mock function with given fields: +func (_m *GeneralConfig) Capabilities() config.Capabilities { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Capabilities") + } + + var r0 config.Capabilities + if rf, ok := ret.Get(0).(func() config.Capabilities); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(config.Capabilities) + } + } + + return r0 +} + // ConfigTOML provides a mock function with given fields: func (_m *GeneralConfig) ConfigTOML() (string, string) { ret := _m.Called() diff --git a/core/services/chainlink/testdata/config-empty-effective.toml b/core/services/chainlink/testdata/config-empty-effective.toml index 148f6b24ff5..8fdb2858cdb 100644 --- a/core/services/chainlink/testdata/config-empty-effective.toml +++ b/core/services/chainlink/testdata/config-empty-effective.toml @@ -228,3 +228,18 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' + +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 10 +OutgoingMessageBufferSize = 10 +PeerID = '' +TraceLogging = false + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = [] +DefaultBootstrappers = [] +DeltaDial = '15s' +DeltaReconcile = '1m0s' +ListenAddresses = [] diff --git a/core/services/chainlink/testdata/config-full.toml b/core/services/chainlink/testdata/config-full.toml index c1606a5b067..cd8a17e538a 100644 --- a/core/services/chainlink/testdata/config-full.toml +++ b/core/services/chainlink/testdata/config-full.toml @@ -239,6 +239,21 @@ LatestReportDeadline = '1m42s' [Mercury.TLS] CertFile = '/path/to/cert.pem' +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 13 +OutgoingMessageBufferSize = 17 +PeerID = '12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw' +TraceLogging = true + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = ['a', 'b', 'c'] +DefaultBootstrappers = ['12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw@foo:42/bar:10', '12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw@test:99'] +DeltaDial = '1m0s' +DeltaReconcile = '2s' +ListenAddresses = ['foo', 'bar'] + [[EVM]] ChainID = '1' Enabled = false diff --git a/core/services/chainlink/testdata/config-multi-chain-effective.toml b/core/services/chainlink/testdata/config-multi-chain-effective.toml index 9f69d4aa909..45d52432ee5 100644 --- a/core/services/chainlink/testdata/config-multi-chain-effective.toml +++ b/core/services/chainlink/testdata/config-multi-chain-effective.toml @@ -229,6 +229,21 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 10 +OutgoingMessageBufferSize = 10 +PeerID = '' +TraceLogging = false + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = [] +DefaultBootstrappers = [] +DeltaDial = '15s' +DeltaReconcile = '1m0s' +ListenAddresses = [] + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/core/services/p2p/types/mocks/peer_wrapper.go b/core/services/p2p/types/mocks/peer_wrapper.go new file mode 100644 index 00000000000..02347cf6b86 --- /dev/null +++ b/core/services/p2p/types/mocks/peer_wrapper.go @@ -0,0 +1,141 @@ +// Code generated by mockery v2.38.0. DO NOT EDIT. + +package mocks + +import ( + context "context" + + types "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" + mock "github.com/stretchr/testify/mock" +) + +// PeerWrapper is an autogenerated mock type for the PeerWrapper type +type PeerWrapper struct { + mock.Mock +} + +// Close provides a mock function with given fields: +func (_m *PeerWrapper) Close() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Close") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// GetPeer provides a mock function with given fields: +func (_m *PeerWrapper) GetPeer() types.Peer { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for GetPeer") + } + + var r0 types.Peer + if rf, ok := ret.Get(0).(func() types.Peer); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(types.Peer) + } + } + + return r0 +} + +// HealthReport provides a mock function with given fields: +func (_m *PeerWrapper) HealthReport() map[string]error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for HealthReport") + } + + var r0 map[string]error + if rf, ok := ret.Get(0).(func() map[string]error); ok { + r0 = rf() + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).(map[string]error) + } + } + + return r0 +} + +// Name provides a mock function with given fields: +func (_m *PeerWrapper) Name() string { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Name") + } + + var r0 string + if rf, ok := ret.Get(0).(func() string); ok { + r0 = rf() + } else { + r0 = ret.Get(0).(string) + } + + return r0 +} + +// Ready provides a mock function with given fields: +func (_m *PeerWrapper) Ready() error { + ret := _m.Called() + + if len(ret) == 0 { + panic("no return value specified for Ready") + } + + var r0 error + if rf, ok := ret.Get(0).(func() error); ok { + r0 = rf() + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// Start provides a mock function with given fields: _a0 +func (_m *PeerWrapper) Start(_a0 context.Context) error { + ret := _m.Called(_a0) + + if len(ret) == 0 { + panic("no return value specified for Start") + } + + var r0 error + if rf, ok := ret.Get(0).(func(context.Context) error); ok { + r0 = rf(_a0) + } else { + r0 = ret.Error(0) + } + + return r0 +} + +// NewPeerWrapper creates a new instance of PeerWrapper. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewPeerWrapper(t interface { + mock.TestingT + Cleanup(func()) +}) *PeerWrapper { + mock := &PeerWrapper{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} diff --git a/core/services/p2p/types/types.go b/core/services/p2p/types/types.go index 5c2e5fa39bb..0f395d75409 100644 --- a/core/services/p2p/types/types.go +++ b/core/services/p2p/types/types.go @@ -15,6 +15,12 @@ type Peer interface { Receive() <-chan Message } +//go:generate mockery --quiet --name PeerWrapper --output ./mocks/ --case=underscore +type PeerWrapper interface { + services.Service + GetPeer() Peer +} + type Message struct { Sender ragetypes.PeerID Payload []byte diff --git a/core/services/p2p/wrapper/wrapper.go b/core/services/p2p/wrapper/wrapper.go new file mode 100644 index 00000000000..138d1ef21fc --- /dev/null +++ b/core/services/p2p/wrapper/wrapper.go @@ -0,0 +1,120 @@ +package wrapper + +import ( + "context" + "fmt" + + "github.com/prometheus/client_golang/prometheus" + + "github.com/smartcontractkit/libocr/commontypes" + ragetypes "github.com/smartcontractkit/libocr/ragep2p/types" + + "github.com/smartcontractkit/chainlink/v2/core/config" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore" + "github.com/smartcontractkit/chainlink/v2/core/services/p2p" + "github.com/smartcontractkit/chainlink/v2/core/services/p2p/types" +) + +type peerWrapper struct { + peer types.Peer + keystoreP2P keystore.P2P + p2pConfig config.P2P + lggr logger.Logger +} + +var _ types.PeerWrapper = &peerWrapper{} + +func NewExternalPeerWrapper(keystoreP2P keystore.P2P, p2pConfig config.P2P, lggr logger.Logger) *peerWrapper { + return &peerWrapper{ + keystoreP2P: keystoreP2P, + p2pConfig: p2pConfig, + lggr: lggr, + } +} + +func (e *peerWrapper) GetPeer() types.Peer { + return e.peer +} + +// convert to "external" P2P PeerConfig, which is independent of OCR +// this has to be done in Start() because keystore is not unlocked at construction time +func convertPeerConfig(keystoreP2P keystore.P2P, p2pConfig config.P2P) (p2p.PeerConfig, error) { + key, err := keystoreP2P.GetOrFirst(p2pConfig.PeerID()) + if err != nil { + return p2p.PeerConfig{}, err + } + + // TODO(KS-106): use real DB + discovererDB := p2p.NewInMemoryDiscovererDatabase() + bootstrappers, err := convertBootstrapperLocators(p2pConfig.V2().DefaultBootstrappers()) + if err != nil { + return p2p.PeerConfig{}, err + } + + peerConfig := p2p.PeerConfig{ + PrivateKey: key.PrivKey, + + ListenAddresses: p2pConfig.V2().ListenAddresses(), + AnnounceAddresses: p2pConfig.V2().AnnounceAddresses(), + Bootstrappers: bootstrappers, + + DeltaReconcile: p2pConfig.V2().DeltaReconcile().Duration(), + DeltaDial: p2pConfig.V2().DeltaDial().Duration(), + DiscovererDatabase: discovererDB, + + MetricsRegisterer: prometheus.DefaultRegisterer, + } + + return peerConfig, nil +} + +func convertBootstrapperLocators(bootstrappers []commontypes.BootstrapperLocator) ([]ragetypes.PeerInfo, error) { + infos := []ragetypes.PeerInfo{} + for _, b := range bootstrappers { + addrs := make([]ragetypes.Address, len(b.Addrs)) + for i, a := range b.Addrs { + addrs[i] = ragetypes.Address(a) + } + var rageID ragetypes.PeerID + err := rageID.UnmarshalText([]byte(b.PeerID)) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal v2 peer ID (%q) from BootstrapperLocator: %w", b.PeerID, err) + } + infos = append(infos, ragetypes.PeerInfo{ + ID: rageID, + Addrs: addrs, + }) + } + return infos, nil +} + +func (e *peerWrapper) Start(ctx context.Context) error { + cfg, err := convertPeerConfig(e.keystoreP2P, e.p2pConfig) + if err != nil { + return err + } + e.lggr.Info("Starting external P2P peer") + peer, err := p2p.NewPeer(cfg, e.lggr) + if err != nil { + return err + } + e.peer = peer + return e.peer.Start(ctx) +} + +func (e *peerWrapper) Close() error { + return e.peer.Close() +} + +func (e *peerWrapper) Ready() error { + return nil +} + +func (e *peerWrapper) HealthReport() map[string]error { + return nil +} + +func (e *peerWrapper) Name() string { + return "PeerWrapper" +} diff --git a/core/services/p2p/wrapper/wrapper_test.go b/core/services/p2p/wrapper/wrapper_test.go new file mode 100644 index 00000000000..dd91ecaee47 --- /dev/null +++ b/core/services/p2p/wrapper/wrapper_test.go @@ -0,0 +1,37 @@ +package wrapper_test + +import ( + "fmt" + "testing" + + "github.com/hashicorp/consul/sdk/freeport" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" + + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils" + "github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest" + "github.com/smartcontractkit/chainlink/v2/core/logger" + "github.com/smartcontractkit/chainlink/v2/core/services/chainlink" + "github.com/smartcontractkit/chainlink/v2/core/services/keystore/keys/p2pkey" + ksmocks "github.com/smartcontractkit/chainlink/v2/core/services/keystore/mocks" + "github.com/smartcontractkit/chainlink/v2/core/services/p2p/wrapper" +) + +func TestPeerWrapper_CleanStartClose(t *testing.T) { + lggr := logger.TestLogger(t) + port := freeport.GetOne(t) + cfg := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) { + enabled := true + c.Capabilities.Peering.V2.Enabled = &enabled + c.Capabilities.Peering.V2.ListenAddresses = &[]string{fmt.Sprintf("127.0.0.1:%d", port)} + }) + keystoreP2P := ksmocks.NewP2P(t) + key, err := p2pkey.NewV2() + require.NoError(t, err) + keystoreP2P.On("GetOrFirst", mock.Anything).Return(key, nil) + + wrapper := wrapper.NewExternalPeerWrapper(keystoreP2P, cfg.Capabilities().Peering(), lggr) + require.NotNil(t, wrapper) + require.NoError(t, wrapper.Start(testutils.Context(t))) + require.NoError(t, wrapper.Close()) +} diff --git a/core/web/resolver/testdata/config-empty-effective.toml b/core/web/resolver/testdata/config-empty-effective.toml index 148f6b24ff5..8fdb2858cdb 100644 --- a/core/web/resolver/testdata/config-empty-effective.toml +++ b/core/web/resolver/testdata/config-empty-effective.toml @@ -228,3 +228,18 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' + +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 10 +OutgoingMessageBufferSize = 10 +PeerID = '' +TraceLogging = false + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = [] +DefaultBootstrappers = [] +DeltaDial = '15s' +DeltaReconcile = '1m0s' +ListenAddresses = [] diff --git a/core/web/resolver/testdata/config-full.toml b/core/web/resolver/testdata/config-full.toml index cdfb85a6f5c..a497428c06a 100644 --- a/core/web/resolver/testdata/config-full.toml +++ b/core/web/resolver/testdata/config-full.toml @@ -239,6 +239,21 @@ LatestReportDeadline = '1m42s' [Mercury.TLS] CertFile = '' +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 13 +OutgoingMessageBufferSize = 17 +PeerID = '12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw' +TraceLogging = true + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = ['a', 'b', 'c'] +DefaultBootstrappers = ['12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw@foo:42/bar:10', '12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw@test:99'] +DeltaDial = '1m0s' +DeltaReconcile = '2s' +ListenAddresses = ['foo', 'bar'] + [[EVM]] ChainID = '1' Enabled = false diff --git a/core/web/resolver/testdata/config-multi-chain-effective.toml b/core/web/resolver/testdata/config-multi-chain-effective.toml index 9f69d4aa909..45d52432ee5 100644 --- a/core/web/resolver/testdata/config-multi-chain-effective.toml +++ b/core/web/resolver/testdata/config-multi-chain-effective.toml @@ -229,6 +229,21 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 10 +OutgoingMessageBufferSize = 10 +PeerID = '' +TraceLogging = false + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = [] +DefaultBootstrappers = [] +DeltaDial = '15s' +DeltaReconcile = '1m0s' +ListenAddresses = [] + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/docs/CONFIG.md b/docs/CONFIG.md index 025995f115b..732ed762be3 100644 --- a/docs/CONFIG.md +++ b/docs/CONFIG.md @@ -1188,6 +1188,105 @@ ListenAddresses = ['1.2.3.4:9999', '[a52d:0:a88:1274::abcd]:1337'] # Example ListenAddresses is the addresses the peer will listen to on the network in `host:port` form as accepted by `net.Listen()`, but the host and port must be fully specified and cannot be empty. You can specify `0.0.0.0` (IPv4) or `::` (IPv6) to listen on all interfaces, but that is not recommended. +## Capabilities.Peering +```toml +[Capabilities.Peering] +IncomingMessageBufferSize = 10 # Default +OutgoingMessageBufferSize = 10 # Default +PeerID = '12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw' # Example +TraceLogging = false # Default +``` + + +### IncomingMessageBufferSize +```toml +IncomingMessageBufferSize = 10 # Default +``` +IncomingMessageBufferSize is the per-remote number of incoming +messages to buffer. Any additional messages received on top of those +already in the queue will be dropped. + +### OutgoingMessageBufferSize +```toml +OutgoingMessageBufferSize = 10 # Default +``` +OutgoingMessageBufferSize is the per-remote number of outgoing +messages to buffer. Any additional messages send on top of those +already in the queue will displace the oldest. +NOTE: OutgoingMessageBufferSize should be comfortably smaller than remote's +IncomingMessageBufferSize to give the remote enough space to process +them all in case we regained connection and now send a bunch at once + +### PeerID +```toml +PeerID = '12D3KooWMoejJznyDuEk5aX6GvbjaG12UzeornPCBNzMRqdwrFJw' # Example +``` +PeerID is the default peer ID to use for OCR jobs. If unspecified, uses the first available peer ID. + +### TraceLogging +```toml +TraceLogging = false # Default +``` +TraceLogging enables trace level logging. + +## Capabilities.Peering.V2 +```toml +[Capabilities.Peering.V2] +Enabled = false # Default +AnnounceAddresses = ['1.2.3.4:9999', '[a52d:0:a88:1274::abcd]:1337'] # Example +DefaultBootstrappers = ['12D3KooWMHMRLQkgPbFSYHwD3NBuwtS1AmxhvKVUrcfyaGDASR4U@1.2.3.4:9999', '12D3KooWM55u5Swtpw9r8aFLQHEtw7HR4t44GdNs654ej5gRs2Dh@example.com:1234'] # Example +DeltaDial = '15s' # Default +DeltaReconcile = '1m' # Default +ListenAddresses = ['1.2.3.4:9999', '[a52d:0:a88:1274::abcd]:1337'] # Example +``` + + +### Enabled +```toml +Enabled = false # Default +``` +Enabled enables P2P V2. + +### AnnounceAddresses +```toml +AnnounceAddresses = ['1.2.3.4:9999', '[a52d:0:a88:1274::abcd]:1337'] # Example +``` +AnnounceAddresses is the addresses the peer will advertise on the network in `host:port` form as accepted by the TCP version of Go’s `net.Dial`. +The addresses should be reachable by other nodes on the network. When attempting to connect to another node, +a node will attempt to dial all of the other node’s AnnounceAddresses in round-robin fashion. + +### DefaultBootstrappers +```toml +DefaultBootstrappers = ['12D3KooWMHMRLQkgPbFSYHwD3NBuwtS1AmxhvKVUrcfyaGDASR4U@1.2.3.4:9999', '12D3KooWM55u5Swtpw9r8aFLQHEtw7HR4t44GdNs654ej5gRs2Dh@example.com:1234'] # Example +``` +DefaultBootstrappers is the default bootstrapper peers for libocr's v2 networking stack. + +Oracle nodes typically only know each other’s PeerIDs, but not their hostnames, IP addresses, or ports. +DefaultBootstrappers are special nodes that help other nodes discover each other’s `AnnounceAddresses` so they can communicate. +Nodes continuously attempt to connect to bootstrappers configured in here. When a node wants to connect to another node +(which it knows only by PeerID, but not by address), it discovers the other node’s AnnounceAddresses from communications +received from its DefaultBootstrappers or other discovered nodes. To facilitate discovery, +nodes will regularly broadcast signed announcements containing their PeerID and AnnounceAddresses. + +### DeltaDial +```toml +DeltaDial = '15s' # Default +``` +DeltaDial controls how far apart Dial attempts are + +### DeltaReconcile +```toml +DeltaReconcile = '1m' # Default +``` +DeltaReconcile controls how often a Reconcile message is sent to every peer. + +### ListenAddresses +```toml +ListenAddresses = ['1.2.3.4:9999', '[a52d:0:a88:1274::abcd]:1337'] # Example +``` +ListenAddresses is the addresses the peer will listen to on the network in `host:port` form as accepted by `net.Listen()`, +but the host and port must be fully specified and cannot be empty. You can specify `0.0.0.0` (IPv4) or `::` (IPv6) to listen on all interfaces, but that is not recommended. + ## Keeper ```toml [Keeper] diff --git a/testdata/scripts/node/validate/default.txtar b/testdata/scripts/node/validate/default.txtar index 8a3c99ee8da..dcf9c4dc154 100644 --- a/testdata/scripts/node/validate/default.txtar +++ b/testdata/scripts/node/validate/default.txtar @@ -241,6 +241,21 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 10 +OutgoingMessageBufferSize = 10 +PeerID = '' +TraceLogging = false + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = [] +DefaultBootstrappers = [] +DeltaDial = '15s' +DeltaReconcile = '1m0s' +ListenAddresses = [] + Invalid configuration: invalid secrets: 2 errors: - Database.URL: empty: must be provided and non-empty - Password.Keystore: empty: must be provided and non-empty diff --git a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar index 873b9e91bc1..1f3ccefe51e 100644 --- a/testdata/scripts/node/validate/disk-based-logging-disabled.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-disabled.txtar @@ -285,6 +285,21 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 10 +OutgoingMessageBufferSize = 10 +PeerID = '' +TraceLogging = false + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = [] +DefaultBootstrappers = [] +DeltaDial = '15s' +DeltaReconcile = '1m0s' +ListenAddresses = [] + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar index 0c00fbb7adc..1b72a05a311 100644 --- a/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar +++ b/testdata/scripts/node/validate/disk-based-logging-no-dir.txtar @@ -285,6 +285,21 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 10 +OutgoingMessageBufferSize = 10 +PeerID = '' +TraceLogging = false + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = [] +DefaultBootstrappers = [] +DeltaDial = '15s' +DeltaReconcile = '1m0s' +ListenAddresses = [] + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/disk-based-logging.txtar b/testdata/scripts/node/validate/disk-based-logging.txtar index 0bbddd6f40f..0110db3f373 100644 --- a/testdata/scripts/node/validate/disk-based-logging.txtar +++ b/testdata/scripts/node/validate/disk-based-logging.txtar @@ -285,6 +285,21 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 10 +OutgoingMessageBufferSize = 10 +PeerID = '' +TraceLogging = false + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = [] +DefaultBootstrappers = [] +DeltaDial = '15s' +DeltaReconcile = '1m0s' +ListenAddresses = [] + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar index 7f109b654d9..438d94be93b 100644 --- a/testdata/scripts/node/validate/invalid-ocr-p2p.txtar +++ b/testdata/scripts/node/validate/invalid-ocr-p2p.txtar @@ -270,7 +270,22 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 10 +OutgoingMessageBufferSize = 10 +PeerID = '' +TraceLogging = false + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = [] +DefaultBootstrappers = [] +DeltaDial = '15s' +DeltaReconcile = '1m0s' +ListenAddresses = [] + Invalid configuration: invalid configuration: P2P.V2.Enabled: invalid value (false): P2P required for OCR or OCR2. Please enable P2P or disable OCR/OCR2. -- err.txt -- -invalid configuration \ No newline at end of file +invalid configuration diff --git a/testdata/scripts/node/validate/invalid.txtar b/testdata/scripts/node/validate/invalid.txtar index 011298fcde7..3c6b514de90 100644 --- a/testdata/scripts/node/validate/invalid.txtar +++ b/testdata/scripts/node/validate/invalid.txtar @@ -275,6 +275,21 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 10 +OutgoingMessageBufferSize = 10 +PeerID = '' +TraceLogging = false + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = [] +DefaultBootstrappers = [] +DeltaDial = '15s' +DeltaReconcile = '1m0s' +ListenAddresses = [] + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/valid.txtar b/testdata/scripts/node/validate/valid.txtar index e0bd015a184..07bf48bb084 100644 --- a/testdata/scripts/node/validate/valid.txtar +++ b/testdata/scripts/node/validate/valid.txtar @@ -282,6 +282,21 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 10 +OutgoingMessageBufferSize = 10 +PeerID = '' +TraceLogging = false + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = [] +DefaultBootstrappers = [] +DeltaDial = '15s' +DeltaReconcile = '1m0s' +ListenAddresses = [] + [[EVM]] ChainID = '1' AutoCreateKey = true diff --git a/testdata/scripts/node/validate/warnings.txtar b/testdata/scripts/node/validate/warnings.txtar index 01968ffd65d..bd84ced5f82 100644 --- a/testdata/scripts/node/validate/warnings.txtar +++ b/testdata/scripts/node/validate/warnings.txtar @@ -264,6 +264,21 @@ LatestReportDeadline = '5s' [Mercury.TLS] CertFile = '' +[Capabilities] +[Capabilities.Peering] +IncomingMessageBufferSize = 10 +OutgoingMessageBufferSize = 10 +PeerID = '' +TraceLogging = false + +[Capabilities.Peering.V2] +Enabled = false +AnnounceAddresses = [] +DefaultBootstrappers = [] +DeltaDial = '15s' +DeltaReconcile = '1m0s' +ListenAddresses = [] + # Configuration warning: Tracing.TLSCertPath: invalid value (something): must be empty when Tracing.Mode is 'unencrypted' Valid configuration.