From 0abb06b0c1fc94dff5b2c8a186159986082a5284 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 11 Oct 2023 12:31:30 +0200 Subject: [PATCH 1/3] feat(p2p)!: parameters for Subscriber * Adds params and options for Subscriber * NetworkID was optional in other places besides Subscriber. * Fixes issue with optional network id where it results in '//' * Unifies optionality for metrics * Before we called InitMetrics on the component, while we could leverage existing options --- p2p/exchange.go | 9 ++++-- p2p/helpers.go | 12 ++++++-- p2p/metrics.go | 21 ++++++-------- p2p/options.go | 20 ++++++++++++-- p2p/subscriber.go | 59 ++++++++++++++++++++++++++++++---------- p2p/subscription_test.go | 4 +-- 6 files changed, 88 insertions(+), 37 deletions(-) diff --git a/p2p/exchange.go b/p2p/exchange.go index 816ff7e8..d2af426c 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -41,10 +41,9 @@ type Exchange[H header.Header[H]] struct { trustedPeers func() peer.IDSlice peerTracker *peerTracker + metrics *metrics Params ClientParameters - - metrics *metrics } func NewExchange[H header.Header[H]]( @@ -63,11 +62,17 @@ func NewExchange[H header.Header[H]]( return nil, err } + var metrics *metrics + if params.metrics { + metrics = newExchangeMetrics() + } + ex := &Exchange[H]{ host: host, protocolID: protocolID(params.networkID), peerTracker: newPeerTracker(host, gater, params.pidstore), Params: params, + metrics: metrics, } ex.trustedPeers = func() peer.IDSlice { diff --git a/p2p/helpers.go b/p2p/helpers.go index fd85a417..1ad0d067 100644 --- a/p2p/helpers.go +++ b/p2p/helpers.go @@ -18,11 +18,19 @@ import ( ) func protocolID(networkID string) protocol.ID { - return protocol.ID(fmt.Sprintf("/%s/header-ex/v0.0.3", networkID)) + base := protocol.ID("/header-ex/v0.0.3") + if networkID != "" { + base = protocol.ID(fmt.Sprintf("/%s/%s", networkID, base)) + } + return base } func PubsubTopicID(networkID string) string { - return fmt.Sprintf("/%s/header-sub/v0.0.1", networkID) + base := "/header-sub/v0.0.1" + if networkID != "" { + base = fmt.Sprintf("/%s/%s", networkID, base) + } + return base } func validateChainID(want, have string) error { diff --git a/p2p/metrics.go b/p2p/metrics.go index 9ce59efb..5566d51d 100644 --- a/p2p/metrics.go +++ b/p2p/metrics.go @@ -8,37 +8,32 @@ import ( "go.opentelemetry.io/otel/metric" ) +var meter = otel.Meter("header/p2p") + type metrics struct { responseSize metric.Float64Histogram responseDuration metric.Float64Histogram } -var ( - meter = otel.Meter("header/p2p") -) - -func (ex *Exchange[H]) InitMetrics() error { +func newExchangeMetrics() *metrics { responseSize, err := meter.Float64Histogram( - "header_p2p_headers_response_size", + "header_p2p_exchange_response_size", metric.WithDescription("Size of get headers response in bytes"), ) if err != nil { - return err + panic(err) } - responseDuration, err := meter.Float64Histogram( - "header_p2p_headers_request_duration", + "header_p2p_exchange_request_duration", metric.WithDescription("Duration of get headers request in seconds"), ) if err != nil { - return err + panic(err) } - - ex.metrics = &metrics{ + return &metrics{ responseSize: responseSize, responseDuration: responseDuration, } - return nil } func (m *metrics) observeResponse(ctx context.Context, size uint64, duration uint64, err error) { diff --git a/p2p/options.go b/p2p/options.go index 7ba1d82d..b1b4b9cf 100644 --- a/p2p/options.go +++ b/p2p/options.go @@ -28,6 +28,8 @@ type ServerParameters struct { // networkID is a network that will be used to create a protocol.ID // Is empty by default networkID string + // metrics is a flag that enables metrics collection. + metrics bool } // DefaultServerParameters returns the default params to configure the store. @@ -53,6 +55,17 @@ func (p *ServerParameters) Validate() error { return nil } +func WithMetrics[T parameters]() Option[T] { + return func(p *T) { + switch t := any(p).(type) { //nolint:gocritic + case *ServerParameters: + t.metrics = true + case *ClientParameters: + t.metrics = true + } + } +} + // WithWriteDeadline is a functional option that configures the // `WriteDeadline` parameter. func WithWriteDeadline[T ServerParameters](deadline time.Duration) Option[T] { @@ -119,7 +132,9 @@ type ClientParameters struct { networkID string // chainID is an identifier of the chain. chainID string - + // metrics is a flag that enables metrics collection. + metrics bool + // pidstore is an optional interface used to periodically dump peers pidstore PeerIDStore } @@ -149,14 +164,13 @@ func (p *ClientParameters) Validate() error { } // WithMaxHeadersPerRangeRequest is a functional option that configures the -// // `MaxRangeRequestSize` parameter. +// `MaxRangeRequestSize` parameter. func WithMaxHeadersPerRangeRequest[T ClientParameters](amount uint64) Option[T] { return func(p *T) { switch t := any(p).(type) { //nolint:gocritic case *ClientParameters: t.MaxHeadersPerRangeRequest = amount } - } } diff --git a/p2p/subscriber.go b/p2p/subscriber.go index 03803041..ab1fb6ca 100644 --- a/p2p/subscriber.go +++ b/p2p/subscriber.go @@ -11,6 +11,16 @@ import ( "github.com/celestiaorg/go-header" ) +// SubscriberOption is a functional option for the Subscriber. +type SubscriberOption func(*SubscriberParams) + +// SubscriberParams defines the parameters for the Subscriber +// configurable with SubscriberOption. +type SubscriberParams struct { + networkID string + metrics bool +} + // Subscriber manages the lifecycle and relationship of header Module // with the "header-sub" gossipsub topic. type Subscriber[H header.Header[H]] struct { @@ -21,15 +31,34 @@ type Subscriber[H header.Header[H]] struct { msgID pubsub.MsgIdFunction } +// WithSubscriberMetrics enables metrics collection for the Subscriber. +func WithSubscriberMetrics() SubscriberOption { + return func(params *SubscriberParams) { + params.metrics = true + } +} + +// WithSubscriberNetworkID sets the network ID for the Subscriber. +func WithSubscriberNetworkID(networkID string) SubscriberOption { + return func(params *SubscriberParams) { + params.networkID = networkID + } +} + // NewSubscriber returns a Subscriber that manages the header Module's // relationship with the "header-sub" gossipsub topic. func NewSubscriber[H header.Header[H]]( ps *pubsub.PubSub, msgID pubsub.MsgIdFunction, - networkID string, + opts ...SubscriberOption, ) *Subscriber[H] { + var params SubscriberParams + for _, opt := range opts { + opt(¶ms) + } + return &Subscriber[H]{ - pubsubTopicID: PubsubTopicID(networkID), + pubsubTopicID: PubsubTopicID(params.networkID), pubsub: ps, msgID: msgID, } @@ -37,25 +66,25 @@ func NewSubscriber[H header.Header[H]]( // Start starts the Subscriber, registering a topic validator for the "header-sub" // topic and joining it. -func (p *Subscriber[H]) Start(context.Context) (err error) { - log.Infow("joining topic", "topic ID", p.pubsubTopicID) - p.topic, err = p.pubsub.Join(p.pubsubTopicID, pubsub.WithTopicMessageIdFn(p.msgID)) +func (s *Subscriber[H]) Start(context.Context) (err error) { + log.Infow("joining topic", "topic ID", s.pubsubTopicID) + s.topic, err = s.pubsub.Join(s.pubsubTopicID, pubsub.WithTopicMessageIdFn(s.msgID)) return err } // Stop closes the topic and unregisters its validator. -func (p *Subscriber[H]) Stop(context.Context) error { - err := p.pubsub.UnregisterTopicValidator(p.pubsubTopicID) +func (s *Subscriber[H]) Stop(context.Context) error { + err := s.pubsub.UnregisterTopicValidator(s.pubsubTopicID) if err != nil { log.Warnf("unregistering validator: %s", err) } - return p.topic.Close() + return s.topic.Close() } // SetVerifier set given verification func as Header PubSub topic validator // Does not punish peers if *header.VerifyError is given with Uncertain set to true. -func (p *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error { +func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error { pval := func(ctx context.Context, p peer.ID, msg *pubsub.Message) pubsub.ValidationResult { hdr := header.New[H]() err := hdr.UnmarshalBinary(msg.Data) @@ -88,24 +117,24 @@ func (p *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error { return pubsub.ValidationReject } } - return p.pubsub.RegisterTopicValidator(p.pubsubTopicID, pval) + return s.pubsub.RegisterTopicValidator(s.pubsubTopicID, pval) } // Subscribe returns a new subscription to the Subscriber's // topic. -func (p *Subscriber[H]) Subscribe() (header.Subscription[H], error) { - if p.topic == nil { +func (s *Subscriber[H]) Subscribe() (header.Subscription[H], error) { + if s.topic == nil { return nil, fmt.Errorf("header topic is not instantiated, service must be started before subscribing") } - return newSubscription[H](p.topic) + return newSubscription[H](s.topic) } // Broadcast broadcasts the given Header to the topic. -func (p *Subscriber[H]) Broadcast(ctx context.Context, header H, opts ...pubsub.PubOpt) error { +func (s *Subscriber[H]) Broadcast(ctx context.Context, header H, opts ...pubsub.PubOpt) error { bin, err := header.MarshalBinary() if err != nil { return err } - return p.topic.Publish(ctx, bin, opts...) + return s.topic.Publish(ctx, bin, opts...) } diff --git a/p2p/subscription_test.go b/p2p/subscription_test.go index d3a783b9..963fa6d1 100644 --- a/p2p/subscription_test.go +++ b/p2p/subscription_test.go @@ -30,7 +30,7 @@ func TestSubscriber(t *testing.T) { require.NoError(t, err) // create sub-service lifecycles for header service 1 - p2pSub1 := NewSubscriber[*headertest.DummyHeader](pubsub1, pubsub.DefaultMsgIdFn, networkID) + p2pSub1 := NewSubscriber[*headertest.DummyHeader](pubsub1, pubsub.DefaultMsgIdFn, WithSubscriberNetworkID(networkID)) err = p2pSub1.Start(context.Background()) require.NoError(t, err) @@ -40,7 +40,7 @@ func TestSubscriber(t *testing.T) { require.NoError(t, err) // create sub-service lifecycles for header service 2 - p2pSub2 := NewSubscriber[*headertest.DummyHeader](pubsub2, pubsub.DefaultMsgIdFn, networkID) + p2pSub2 := NewSubscriber[*headertest.DummyHeader](pubsub2, pubsub.DefaultMsgIdFn, WithSubscriberNetworkID(networkID)) err = p2pSub2.Start(context.Background()) require.NoError(t, err) From 3c03a0221a2f34fdd02fbc9131eea6adef3de951 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 11 Oct 2023 14:48:23 +0200 Subject: [PATCH 2/3] fix breaking todo --- sync/options.go | 13 ++++++------- sync/sync.go | 2 +- 2 files changed, 7 insertions(+), 8 deletions(-) diff --git a/sync/options.go b/sync/options.go index 7264a3b5..6938dc34 100644 --- a/sync/options.go +++ b/sync/options.go @@ -5,10 +5,9 @@ import ( "time" ) -// Options is the functional option that is applied to the Syner instance +// Option is the functional option that is applied to the Syner instance // to configure its parameters. -// TODO(@Wondertan): rename to single Option in some breaking release -type Options func(*Parameters) +type Option func(*Parameters) // Parameters is the set of parameters that must be configured for the syncer. type Parameters struct { @@ -47,7 +46,7 @@ func (p *Parameters) Validate() error { // WithBlockTime is a functional option that configures the // `blockTime` parameter. -func WithBlockTime(duration time.Duration) Options { +func WithBlockTime(duration time.Duration) Option { return func(p *Parameters) { p.blockTime = duration } @@ -55,7 +54,7 @@ func WithBlockTime(duration time.Duration) Options { // WithRecencyThreshold is a functional option that configures the // `recencyThreshold` parameter. -func WithRecencyThreshold(threshold time.Duration) Options { +func WithRecencyThreshold(threshold time.Duration) Option { return func(p *Parameters) { p.recencyThreshold = threshold } @@ -63,14 +62,14 @@ func WithRecencyThreshold(threshold time.Duration) Options { // WithTrustingPeriod is a functional option that configures the // `TrustingPeriod` parameter. -func WithTrustingPeriod(duration time.Duration) Options { +func WithTrustingPeriod(duration time.Duration) Option { return func(p *Parameters) { p.TrustingPeriod = duration } } // WithParams is a functional option that overrides Parameters. -func WithParams(new Parameters) Options { +func WithParams(new Parameters) Option { return func(old *Parameters) { *old = new } diff --git a/sync/sync.go b/sync/sync.go index b0cfa705..9fb97f41 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -59,7 +59,7 @@ func NewSyncer[H header.Header[H]]( getter header.Getter[H], store header.Store[H], sub header.Subscriber[H], - opts ...Options, + opts ...Option, ) (*Syncer[H], error) { params := DefaultParameters() for _, opt := range opts { From 765fd28edff4c1a784f50485d89166c4de2b6dd4 Mon Sep 17 00:00:00 2001 From: Hlib Kanunnikov Date: Wed, 11 Oct 2023 15:06:58 +0200 Subject: [PATCH 3/3] Apply suggestions from code review Co-authored-by: Viacheslav --- p2p/helpers.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/p2p/helpers.go b/p2p/helpers.go index 1ad0d067..1ca110ad 100644 --- a/p2p/helpers.go +++ b/p2p/helpers.go @@ -18,7 +18,7 @@ import ( ) func protocolID(networkID string) protocol.ID { - base := protocol.ID("/header-ex/v0.0.3") + base := protocol.ID("header-ex/v0.0.3") if networkID != "" { base = protocol.ID(fmt.Sprintf("/%s/%s", networkID, base)) } @@ -26,7 +26,7 @@ func protocolID(networkID string) protocol.ID { } func PubsubTopicID(networkID string) string { - base := "/header-sub/v0.0.1" + base := "header-sub/v0.0.1" if networkID != "" { base = fmt.Sprintf("/%s/%s", networkID, base) }