Skip to content

Commit

Permalink
feat(p2p)!: parameters for Subscriber (#118)
Browse files Browse the repository at this point in the history
* Adds params and options for Subscriber
* NetworkID was optional in other places besides Subscriber.
* Fixes issue with optional network id where it results in '//'
* Unifies optionality for metrics
	* Before we called InitMetrics on the component, while we could leverage existing options
	* Thus it's breaking. We wanted to break Getter interface anyway.
* Fixed old breaking TODO

Co-authored-by: Viacheslav <[email protected]>
  • Loading branch information
Wondertan and vgonkivs authored Oct 11, 2023
1 parent 30117cf commit df01474
Show file tree
Hide file tree
Showing 8 changed files with 95 additions and 45 deletions.
9 changes: 7 additions & 2 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,10 +42,9 @@ type Exchange[H header.Header[H]] struct {

trustedPeers func() peer.IDSlice
peerTracker *peerTracker
metrics *metrics

Params ClientParameters

metrics *metrics
}

func NewExchange[H header.Header[H]](
Expand All @@ -64,11 +63,17 @@ func NewExchange[H header.Header[H]](
return nil, err
}

var metrics *metrics
if params.metrics {
metrics = newExchangeMetrics()
}

ex := &Exchange[H]{
host: host,
protocolID: protocolID(params.networkID),
peerTracker: newPeerTracker(host, gater, params.pidstore),
Params: params,
metrics: metrics,
}

ex.trustedPeers = func() peer.IDSlice {
Expand Down
12 changes: 10 additions & 2 deletions p2p/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@ import (
)

func protocolID(networkID string) protocol.ID {
return protocol.ID(fmt.Sprintf("/%s/header-ex/v0.0.3", networkID))
base := protocol.ID("header-ex/v0.0.3")
if networkID != "" {
base = protocol.ID(fmt.Sprintf("/%s/%s", networkID, base))
}
return base
}

func PubsubTopicID(networkID string) string {
return fmt.Sprintf("/%s/header-sub/v0.0.1", networkID)
base := "header-sub/v0.0.1"
if networkID != "" {
base = fmt.Sprintf("/%s/%s", networkID, base)
}
return base
}

func validateChainID(want, have string) error {
Expand Down
21 changes: 8 additions & 13 deletions p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,32 @@ import (
"go.opentelemetry.io/otel/metric"
)

var meter = otel.Meter("header/p2p")

type metrics struct {
responseSize metric.Float64Histogram
responseDuration metric.Float64Histogram
}

var (
meter = otel.Meter("header/p2p")
)

func (ex *Exchange[H]) InitMetrics() error {
func newExchangeMetrics() *metrics {
responseSize, err := meter.Float64Histogram(
"header_p2p_headers_response_size",
"header_p2p_exchange_response_size",
metric.WithDescription("Size of get headers response in bytes"),
)
if err != nil {
return err
panic(err)
}

responseDuration, err := meter.Float64Histogram(
"header_p2p_headers_request_duration",
"header_p2p_exchange_request_duration",
metric.WithDescription("Duration of get headers request in seconds"),
)
if err != nil {
return err
panic(err)
}

ex.metrics = &metrics{
return &metrics{
responseSize: responseSize,
responseDuration: responseDuration,
}
return nil
}

func (m *metrics) observeResponse(ctx context.Context, size uint64, duration uint64, err error) {
Expand Down
20 changes: 17 additions & 3 deletions p2p/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type ServerParameters struct {
// networkID is a network that will be used to create a protocol.ID
// Is empty by default
networkID string
// metrics is a flag that enables metrics collection.
metrics bool
}

// DefaultServerParameters returns the default params to configure the store.
Expand All @@ -53,6 +55,17 @@ func (p *ServerParameters) Validate() error {
return nil
}

func WithMetrics[T parameters]() Option[T] {
return func(p *T) {
switch t := any(p).(type) { //nolint:gocritic
case *ServerParameters:
t.metrics = true
case *ClientParameters:
t.metrics = true
}
}
}

// WithWriteDeadline is a functional option that configures the
// `WriteDeadline` parameter.
func WithWriteDeadline[T ServerParameters](deadline time.Duration) Option[T] {
Expand Down Expand Up @@ -119,7 +132,9 @@ type ClientParameters struct {
networkID string
// chainID is an identifier of the chain.
chainID string

// metrics is a flag that enables metrics collection.
metrics bool
// pidstore is an optional interface used to periodically dump peers
pidstore PeerIDStore
}

Expand Down Expand Up @@ -149,14 +164,13 @@ func (p *ClientParameters) Validate() error {
}

// WithMaxHeadersPerRangeRequest is a functional option that configures the
// // `MaxRangeRequestSize` parameter.
// `MaxRangeRequestSize` parameter.
func WithMaxHeadersPerRangeRequest[T ClientParameters](amount uint64) Option[T] {
return func(p *T) {
switch t := any(p).(type) { //nolint:gocritic
case *ClientParameters:
t.MaxHeadersPerRangeRequest = amount
}

}
}

Expand Down
59 changes: 44 additions & 15 deletions p2p/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ import (
"github.com/celestiaorg/go-header"
)

// SubscriberOption is a functional option for the Subscriber.
type SubscriberOption func(*SubscriberParams)

// SubscriberParams defines the parameters for the Subscriber
// configurable with SubscriberOption.
type SubscriberParams struct {
networkID string
metrics bool
}

// Subscriber manages the lifecycle and relationship of header Module
// with the "header-sub" gossipsub topic.
type Subscriber[H header.Header[H]] struct {
Expand All @@ -21,41 +31,60 @@ type Subscriber[H header.Header[H]] struct {
msgID pubsub.MsgIdFunction
}

// WithSubscriberMetrics enables metrics collection for the Subscriber.
func WithSubscriberMetrics() SubscriberOption {
return func(params *SubscriberParams) {
params.metrics = true
}
}

// WithSubscriberNetworkID sets the network ID for the Subscriber.
func WithSubscriberNetworkID(networkID string) SubscriberOption {
return func(params *SubscriberParams) {
params.networkID = networkID
}
}

// NewSubscriber returns a Subscriber that manages the header Module's
// relationship with the "header-sub" gossipsub topic.
func NewSubscriber[H header.Header[H]](
ps *pubsub.PubSub,
msgID pubsub.MsgIdFunction,
networkID string,
opts ...SubscriberOption,
) *Subscriber[H] {
var params SubscriberParams
for _, opt := range opts {
opt(&params)
}

return &Subscriber[H]{
pubsubTopicID: PubsubTopicID(networkID),
pubsubTopicID: PubsubTopicID(params.networkID),
pubsub: ps,
msgID: msgID,
}
}

// Start starts the Subscriber, registering a topic validator for the "header-sub"
// topic and joining it.
func (p *Subscriber[H]) Start(context.Context) (err error) {
log.Infow("joining topic", "topic ID", p.pubsubTopicID)
p.topic, err = p.pubsub.Join(p.pubsubTopicID, pubsub.WithTopicMessageIdFn(p.msgID))
func (s *Subscriber[H]) Start(context.Context) (err error) {
log.Infow("joining topic", "topic ID", s.pubsubTopicID)
s.topic, err = s.pubsub.Join(s.pubsubTopicID, pubsub.WithTopicMessageIdFn(s.msgID))
return err
}

// Stop closes the topic and unregisters its validator.
func (p *Subscriber[H]) Stop(context.Context) error {
err := p.pubsub.UnregisterTopicValidator(p.pubsubTopicID)
func (s *Subscriber[H]) Stop(context.Context) error {
err := s.pubsub.UnregisterTopicValidator(s.pubsubTopicID)
if err != nil {
log.Warnf("unregistering validator: %s", err)
}

return p.topic.Close()
return s.topic.Close()
}

// SetVerifier set given verification func as Header PubSub topic validator
// Does not punish peers if *header.VerifyError is given with Uncertain set to true.
func (p *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
pval := func(ctx context.Context, p peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
hdr := header.New[H]()
err := hdr.UnmarshalBinary(msg.Data)
Expand Down Expand Up @@ -88,24 +117,24 @@ func (p *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
return pubsub.ValidationReject
}
}
return p.pubsub.RegisterTopicValidator(p.pubsubTopicID, pval)
return s.pubsub.RegisterTopicValidator(s.pubsubTopicID, pval)
}

// Subscribe returns a new subscription to the Subscriber's
// topic.
func (p *Subscriber[H]) Subscribe() (header.Subscription[H], error) {
if p.topic == nil {
func (s *Subscriber[H]) Subscribe() (header.Subscription[H], error) {
if s.topic == nil {
return nil, fmt.Errorf("header topic is not instantiated, service must be started before subscribing")
}

return newSubscription[H](p.topic)
return newSubscription[H](s.topic)
}

// Broadcast broadcasts the given Header to the topic.
func (p *Subscriber[H]) Broadcast(ctx context.Context, header H, opts ...pubsub.PubOpt) error {
func (s *Subscriber[H]) Broadcast(ctx context.Context, header H, opts ...pubsub.PubOpt) error {
bin, err := header.MarshalBinary()
if err != nil {
return err
}
return p.topic.Publish(ctx, bin, opts...)
return s.topic.Publish(ctx, bin, opts...)
}
4 changes: 2 additions & 2 deletions p2p/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestSubscriber(t *testing.T) {
require.NoError(t, err)

// create sub-service lifecycles for header service 1
p2pSub1 := NewSubscriber[*headertest.DummyHeader](pubsub1, pubsub.DefaultMsgIdFn, networkID)
p2pSub1 := NewSubscriber[*headertest.DummyHeader](pubsub1, pubsub.DefaultMsgIdFn, WithSubscriberNetworkID(networkID))
err = p2pSub1.Start(context.Background())
require.NoError(t, err)

Expand All @@ -40,7 +40,7 @@ func TestSubscriber(t *testing.T) {
require.NoError(t, err)

// create sub-service lifecycles for header service 2
p2pSub2 := NewSubscriber[*headertest.DummyHeader](pubsub2, pubsub.DefaultMsgIdFn, networkID)
p2pSub2 := NewSubscriber[*headertest.DummyHeader](pubsub2, pubsub.DefaultMsgIdFn, WithSubscriberNetworkID(networkID))
err = p2pSub2.Start(context.Background())
require.NoError(t, err)

Expand Down
13 changes: 6 additions & 7 deletions sync/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (
"time"
)

// Options is the functional option that is applied to the Syner instance
// Option is the functional option that is applied to the Syner instance
// to configure its parameters.
// TODO(@Wondertan): rename to single Option in some breaking release
type Options func(*Parameters)
type Option func(*Parameters)

// Parameters is the set of parameters that must be configured for the syncer.
type Parameters struct {
Expand Down Expand Up @@ -47,30 +46,30 @@ func (p *Parameters) Validate() error {

// WithBlockTime is a functional option that configures the
// `blockTime` parameter.
func WithBlockTime(duration time.Duration) Options {
func WithBlockTime(duration time.Duration) Option {
return func(p *Parameters) {
p.blockTime = duration
}
}

// WithRecencyThreshold is a functional option that configures the
// `recencyThreshold` parameter.
func WithRecencyThreshold(threshold time.Duration) Options {
func WithRecencyThreshold(threshold time.Duration) Option {
return func(p *Parameters) {
p.recencyThreshold = threshold
}
}

// WithTrustingPeriod is a functional option that configures the
// `TrustingPeriod` parameter.
func WithTrustingPeriod(duration time.Duration) Options {
func WithTrustingPeriod(duration time.Duration) Option {
return func(p *Parameters) {
p.TrustingPeriod = duration
}
}

// WithParams is a functional option that overrides Parameters.
func WithParams(new Parameters) Options {
func WithParams(new Parameters) Option {
return func(old *Parameters) {
*old = new
}
Expand Down
2 changes: 1 addition & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewSyncer[H header.Header[H]](
getter header.Getter[H],
store header.Store[H],
sub header.Subscriber[H],
opts ...Options,
opts ...Option,
) (*Syncer[H], error) {
params := DefaultParameters()
for _, opt := range opts {
Expand Down

0 comments on commit df01474

Please sign in to comment.