diff --git a/nodebuilder/share/constructors.go b/nodebuilder/share/constructors.go index d45fe6371f..2ef0596254 100644 --- a/nodebuilder/share/constructors.go +++ b/nodebuilder/share/constructors.go @@ -21,18 +21,16 @@ import ( ) func newDiscovery(cfg *disc.Parameters, -) func(routing.ContentRouting, host.Host, *peers.Manager) *disc.Discovery { +) func(routing.ContentRouting, host.Host, *peers.Manager) (*disc.Discovery, error) { return func( r routing.ContentRouting, h host.Host, manager *peers.Manager, - ) *disc.Discovery { + ) (*disc.Discovery, error) { return disc.NewDiscovery( + cfg, h, routingdisc.NewRoutingDiscovery(r), - disc.WithPeersLimit(cfg.PeersLimit), - disc.WithAdvertiseInterval(cfg.AdvertiseInterval), - disc.WithOnPeersUpdate(manager.UpdatedFullNodes), ) } } diff --git a/share/availability/full/testing.go b/share/availability/full/testing.go index a636b26ea6..deca8741c9 100644 --- a/share/availability/full/testing.go +++ b/share/availability/full/testing.go @@ -41,12 +41,15 @@ func Node(dn *availability_test.TestDagNet) *availability_test.TestNode { } func TestAvailability(t *testing.T, getter share.Getter) *ShareAvailability { - disc := discovery.NewDiscovery( + params := discovery.DefaultParameters() + params.AdvertiseInterval = time.Second + params.PeersLimit = 10 + disc, err := discovery.NewDiscovery( + params, nil, routing.NewRoutingDiscovery(routinghelpers.Null{}), - discovery.WithAdvertiseInterval(time.Second), - discovery.WithPeersLimit(10), ) + require.NoError(t, err) store, err := eds.NewStore(eds.DefaultParameters(), t.TempDir(), datastore.NewMapDatastore()) require.NoError(t, err) err = store.Start(context.Background()) diff --git a/share/p2p/discovery/discovery.go b/share/p2p/discovery/discovery.go index 9d430fc115..1b67cc4bf5 100644 --- a/share/p2p/discovery/discovery.go +++ b/share/p2p/discovery/discovery.go @@ -70,26 +70,26 @@ func (f OnUpdatedPeers) add(next OnUpdatedPeers) OnUpdatedPeers { // NewDiscovery constructs a new discovery. func NewDiscovery( + params *Parameters, h host.Host, d discovery.Discovery, opts ...Option, -) *Discovery { - params := DefaultParameters() - - for _, opt := range opts { - opt(params) +) (*Discovery, error) { + if err := params.Validate(); err != nil { + return nil, err } + o := newOptions(opts...) return &Discovery{ tag: params.Tag, set: newLimitedSet(params.PeersLimit), host: h, disc: d, connector: newBackoffConnector(h, defaultBackoffFactory), - onUpdatedPeers: params.onUpdatedPeers, + onUpdatedPeers: o.onUpdatedPeers, params: params, triggerDisc: make(chan struct{}), - } + }, nil } func (d *Discovery) Start(context.Context) error { @@ -158,7 +158,6 @@ func (d *Discovery) Advertise(ctx context.Context) { timer := time.NewTimer(d.params.AdvertiseInterval) defer timer.Stop() for { - fmt.Println(d.tag) _, err := d.disc.Advertise(ctx, d.tag) d.metrics.observeAdvertise(ctx, err) if err != nil { diff --git a/share/p2p/discovery/discovery_test.go b/share/p2p/discovery/discovery_test.go index 3720a0c22d..b9db19dd2f 100644 --- a/share/p2p/discovery/discovery_test.go +++ b/share/p2p/discovery/discovery_test.go @@ -37,19 +37,24 @@ func TestDiscovery(t *testing.T) { } host, routingDisc := tn.peer() - peerA := tn.discovery( - host, routingDisc, - WithPeersLimit(nodes), - WithAdvertiseInterval(-1), + params := DefaultParameters() + params.PeersLimit = nodes + params.AdvertiseInterval = -1 + params.Tag = "full" + + peerA := tn.discovery(params, host, routingDisc, WithOnPeersUpdate(submit), ) + params = &Parameters{ + PeersLimit: 0, + AdvertiseInterval: time.Millisecond * 100, + Tag: "full", + } discs := make([]*Discovery, nodes) for i := range discs { host, routingDisc := tn.peer() - discs[i] = tn.discovery(host, routingDisc, - WithPeersLimit(0), - WithAdvertiseInterval(time.Millisecond*100)) + discs[i] = tn.discovery(params, host, routingDisc) select { case res := <-updateCh: @@ -92,23 +97,25 @@ func TestDiscoveryTagged(t *testing.T) { // sub will discover both peers, but on different tags sub, routingDisc := tn.peer() + params := DefaultParameters() + // create 2 discovery services for sub, each with a different tag + params.Tag = "tag1" done1 := make(chan struct{}) - tn.discovery(sub, routingDisc, - WithTag("tag1"), + tn.discovery(params, sub, routingDisc, WithOnPeersUpdate(checkPeer(t, adv1.ID(), done1))) + params.Tag = "tag2" done2 := make(chan struct{}) - tn.discovery(sub, routingDisc, - WithTag("tag2"), + tn.discovery(params, sub, routingDisc, WithOnPeersUpdate(checkPeer(t, adv2.ID(), done2))) // run discovery services for advertisers - tn.discovery(adv1, routingDisc1, - WithTag("tag1")) + params.Tag = "tag1" + tn.discovery(params, adv1, routingDisc1) - tn.discovery(adv2, routingDisc2, - WithTag("tag2")) + params.Tag = "tag2" + tn.discovery(params, adv2, routingDisc2) // wait for discovery services to discover each other on different tags select { @@ -148,9 +155,15 @@ func newTestnet(ctx context.Context, t *testing.T) *testnet { return &testnet{ctx: ctx, T: t, bootstrapper: *host.InfoFromHost(hst)} } -func (t *testnet) discovery(hst host.Host, routingDisc discovery.Discovery, opts ...Option) *Discovery { - disc := NewDiscovery(hst, routingDisc, opts...) - err := disc.Start(t.ctx) +func (t *testnet) discovery( + params *Parameters, + hst host.Host, + routingDisc discovery.Discovery, + opts ...Option, +) *Discovery { + disc, err := NewDiscovery(params, hst, routingDisc, opts...) + require.NoError(t.T, err) + err = disc.Start(t.ctx) require.NoError(t.T, err) t.T.Cleanup(func() { err := disc.Stop(t.ctx) diff --git a/share/p2p/discovery/options.go b/share/p2p/discovery/options.go index 11189338a9..886f1850d2 100644 --- a/share/p2p/discovery/options.go +++ b/share/p2p/discovery/options.go @@ -21,14 +21,18 @@ type Parameters struct { // Set -1 to disable. // NOTE: only full and bridge can advertise themselves. AdvertiseInterval time.Duration - // onUpdatedPeers will be called on peer set changes - onUpdatedPeers OnUpdatedPeers // Tag is used as rondezvous point for discovery service Tag string } +// options is the set of options that can be configured for the Discovery module +type options struct { + // onUpdatedPeers will be called on peer set changes + onUpdatedPeers OnUpdatedPeers +} + // Option is a function that configures Discovery Parameters -type Option func(*Parameters) +type Option func(*options) // DefaultParameters returns the default Parameters' configuration values // for the Discovery module @@ -37,29 +41,12 @@ func DefaultParameters() *Parameters { PeersLimit: 5, AdvertiseInterval: time.Hour, //TODO: remove fullNodesTag default value once multiple tags are supported - Tag: fullNodesTag, - onUpdatedPeers: func(peer.ID, bool) {}, + Tag: fullNodesTag, } } // Validate validates the values in Parameters func (p *Parameters) Validate() error { - if p.AdvertiseInterval <= 0 { - return fmt.Errorf( - "discovery: invalid option: value AdvertiseInterval %s, %s", - "is 0 or negative.", - "value must be positive", - ) - } - - if p.PeersLimit <= 0 { - return fmt.Errorf( - "discovery: invalid option: value PeersLimit %s, %s", - "is negative.", - "value must be positive", - ) - } - if p.Tag == "" { return fmt.Errorf( "discovery: invalid option: value Tag %s, %s", @@ -70,32 +57,20 @@ func (p *Parameters) Validate() error { return nil } -// WithPeersLimit is a functional option that Discovery -// uses to set the PeersLimit configuration param -func WithPeersLimit(peersLimit uint) Option { - return func(p *Parameters) { - p.PeersLimit = peersLimit - } -} - -// WithAdvertiseInterval is a functional option that Discovery -// uses to set the AdvertiseInterval configuration param -func WithAdvertiseInterval(advInterval time.Duration) Option { - return func(p *Parameters) { - p.AdvertiseInterval = advInterval - } -} - // WithOnPeersUpdate chains OnPeersUpdate callbacks on every update of discovered peers list. func WithOnPeersUpdate(f OnUpdatedPeers) Option { - return func(p *Parameters) { + return func(p *options) { p.onUpdatedPeers = p.onUpdatedPeers.add(f) } } -// WithTag is a functional option that sets the Tag for the discovery service -func WithTag(tag string) Option { - return func(p *Parameters) { - p.Tag = tag +func newOptions(opts ...Option) *options { + defaults := &options{ + onUpdatedPeers: func(peer.ID, bool) {}, + } + + for _, opt := range opts { + opt(defaults) } + return defaults } diff --git a/share/p2p/peers/manager_test.go b/share/p2p/peers/manager_test.go index 31496481ed..a461534ac8 100644 --- a/share/p2p/peers/manager_test.go +++ b/share/p2p/peers/manager_test.go @@ -389,12 +389,17 @@ func TestIntegration(t *testing.T) { bnHost := nw.Hosts()[1] bnRouter, err := dht.New(ctx, bnHost, opts...) require.NoError(t, err) - bnDisc := discovery.NewDiscovery( + + params := discovery.DefaultParameters() + params.AdvertiseInterval = time.Second + params.PeersLimit = 0 + + bnDisc, err := discovery.NewDiscovery( + params, bnHost, routingdisc.NewRoutingDiscovery(bnRouter), - discovery.WithPeersLimit(0), - discovery.WithAdvertiseInterval(time.Second), ) + require.NoError(t, err) // set up full node / receiver node fnHost := nw.Hosts()[2] @@ -419,11 +424,14 @@ func TestIntegration(t *testing.T) { } // set up discovery for full node with hook to peer manager and check discovered peer - fnDisc := discovery.NewDiscovery( + params = discovery.DefaultParameters() + params.AdvertiseInterval = time.Second + params.PeersLimit = 10 + + fnDisc, err := discovery.NewDiscovery( + params, fnHost, routingdisc.NewRoutingDiscovery(fnRouter), - discovery.WithPeersLimit(10), - discovery.WithAdvertiseInterval(time.Second), discovery.WithOnPeersUpdate(fnPeerManager.UpdatedFullNodes), discovery.WithOnPeersUpdate(checkDiscoveredPeer), )