Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p)!: parameters for Subscriber #118

Merged
merged 3 commits into from
Oct 11, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,9 @@ type Exchange[H header.Header[H]] struct {

trustedPeers func() peer.IDSlice
peerTracker *peerTracker
metrics *metrics

Params ClientParameters

metrics *metrics
}

func NewExchange[H header.Header[H]](
Expand All @@ -63,11 +62,17 @@ func NewExchange[H header.Header[H]](
return nil, err
}

var metrics *metrics
if params.metrics {
metrics = newExchangeMetrics()
}

ex := &Exchange[H]{
host: host,
protocolID: protocolID(params.networkID),
peerTracker: newPeerTracker(host, gater, params.pidstore),
Params: params,
metrics: metrics,
}

ex.trustedPeers = func() peer.IDSlice {
Expand Down
12 changes: 10 additions & 2 deletions p2p/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,11 +18,19 @@ import (
)

func protocolID(networkID string) protocol.ID {
return protocol.ID(fmt.Sprintf("/%s/header-ex/v0.0.3", networkID))
base := protocol.ID("header-ex/v0.0.3")
if networkID != "" {
base = protocol.ID(fmt.Sprintf("/%s/%s", networkID, base))
}
return base
}

func PubsubTopicID(networkID string) string {
return fmt.Sprintf("/%s/header-sub/v0.0.1", networkID)
base := "header-sub/v0.0.1"
if networkID != "" {
base = fmt.Sprintf("/%s/%s", networkID, base)
}
return base
}

func validateChainID(want, have string) error {
Expand Down
21 changes: 8 additions & 13 deletions p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,37 +8,32 @@ import (
"go.opentelemetry.io/otel/metric"
)

var meter = otel.Meter("header/p2p")

type metrics struct {
responseSize metric.Float64Histogram
responseDuration metric.Float64Histogram
}

var (
meter = otel.Meter("header/p2p")
)

func (ex *Exchange[H]) InitMetrics() error {
func newExchangeMetrics() *metrics {
responseSize, err := meter.Float64Histogram(
"header_p2p_headers_response_size",
"header_p2p_exchange_response_size",
metric.WithDescription("Size of get headers response in bytes"),
)
if err != nil {
return err
panic(err)
}

responseDuration, err := meter.Float64Histogram(
"header_p2p_headers_request_duration",
"header_p2p_exchange_request_duration",
metric.WithDescription("Duration of get headers request in seconds"),
)
if err != nil {
return err
panic(err)
}

ex.metrics = &metrics{
return &metrics{
responseSize: responseSize,
responseDuration: responseDuration,
}
return nil
}

func (m *metrics) observeResponse(ctx context.Context, size uint64, duration uint64, err error) {
Expand Down
20 changes: 17 additions & 3 deletions p2p/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,8 @@ type ServerParameters struct {
// networkID is a network that will be used to create a protocol.ID
// Is empty by default
networkID string
// metrics is a flag that enables metrics collection.
metrics bool
}

// DefaultServerParameters returns the default params to configure the store.
Expand All @@ -53,6 +55,17 @@ func (p *ServerParameters) Validate() error {
return nil
}

func WithMetrics[T parameters]() Option[T] {
return func(p *T) {
switch t := any(p).(type) { //nolint:gocritic
case *ServerParameters:
t.metrics = true
case *ClientParameters:
t.metrics = true
}
}
}

// WithWriteDeadline is a functional option that configures the
// `WriteDeadline` parameter.
func WithWriteDeadline[T ServerParameters](deadline time.Duration) Option[T] {
Expand Down Expand Up @@ -119,7 +132,9 @@ type ClientParameters struct {
networkID string
// chainID is an identifier of the chain.
chainID string

// metrics is a flag that enables metrics collection.
metrics bool
// pidstore is an optional interface used to periodically dump peers
pidstore PeerIDStore
}

Expand Down Expand Up @@ -149,14 +164,13 @@ func (p *ClientParameters) Validate() error {
}

// WithMaxHeadersPerRangeRequest is a functional option that configures the
// // `MaxRangeRequestSize` parameter.
// `MaxRangeRequestSize` parameter.
func WithMaxHeadersPerRangeRequest[T ClientParameters](amount uint64) Option[T] {
return func(p *T) {
switch t := any(p).(type) { //nolint:gocritic
case *ClientParameters:
t.MaxHeadersPerRangeRequest = amount
}

}
}

Expand Down
59 changes: 44 additions & 15 deletions p2p/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,16 @@ import (
"github.com/celestiaorg/go-header"
)

// SubscriberOption is a functional option for the Subscriber.
type SubscriberOption func(*SubscriberParams)

// SubscriberParams defines the parameters for the Subscriber
// configurable with SubscriberOption.
type SubscriberParams struct {
networkID string
metrics bool
}

// Subscriber manages the lifecycle and relationship of header Module
// with the "header-sub" gossipsub topic.
type Subscriber[H header.Header[H]] struct {
Expand All @@ -21,41 +31,60 @@ type Subscriber[H header.Header[H]] struct {
msgID pubsub.MsgIdFunction
}

// WithSubscriberMetrics enables metrics collection for the Subscriber.
func WithSubscriberMetrics() SubscriberOption {
return func(params *SubscriberParams) {
params.metrics = true
}
}

// WithSubscriberNetworkID sets the network ID for the Subscriber.
func WithSubscriberNetworkID(networkID string) SubscriberOption {
return func(params *SubscriberParams) {
params.networkID = networkID
}
}

// NewSubscriber returns a Subscriber that manages the header Module's
// relationship with the "header-sub" gossipsub topic.
func NewSubscriber[H header.Header[H]](
ps *pubsub.PubSub,
msgID pubsub.MsgIdFunction,
networkID string,
opts ...SubscriberOption,
) *Subscriber[H] {
var params SubscriberParams
for _, opt := range opts {
opt(&params)
}

return &Subscriber[H]{
pubsubTopicID: PubsubTopicID(networkID),
pubsubTopicID: PubsubTopicID(params.networkID),
pubsub: ps,
msgID: msgID,
}
}

// Start starts the Subscriber, registering a topic validator for the "header-sub"
// topic and joining it.
func (p *Subscriber[H]) Start(context.Context) (err error) {
log.Infow("joining topic", "topic ID", p.pubsubTopicID)
p.topic, err = p.pubsub.Join(p.pubsubTopicID, pubsub.WithTopicMessageIdFn(p.msgID))
func (s *Subscriber[H]) Start(context.Context) (err error) {
log.Infow("joining topic", "topic ID", s.pubsubTopicID)
s.topic, err = s.pubsub.Join(s.pubsubTopicID, pubsub.WithTopicMessageIdFn(s.msgID))
return err
}

// Stop closes the topic and unregisters its validator.
func (p *Subscriber[H]) Stop(context.Context) error {
err := p.pubsub.UnregisterTopicValidator(p.pubsubTopicID)
func (s *Subscriber[H]) Stop(context.Context) error {
err := s.pubsub.UnregisterTopicValidator(s.pubsubTopicID)
if err != nil {
log.Warnf("unregistering validator: %s", err)
}

return p.topic.Close()
return s.topic.Close()
}

// SetVerifier set given verification func as Header PubSub topic validator
// Does not punish peers if *header.VerifyError is given with Uncertain set to true.
func (p *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
pval := func(ctx context.Context, p peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
hdr := header.New[H]()
err := hdr.UnmarshalBinary(msg.Data)
Expand Down Expand Up @@ -88,24 +117,24 @@ func (p *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
return pubsub.ValidationReject
}
}
return p.pubsub.RegisterTopicValidator(p.pubsubTopicID, pval)
return s.pubsub.RegisterTopicValidator(s.pubsubTopicID, pval)
}

// Subscribe returns a new subscription to the Subscriber's
// topic.
func (p *Subscriber[H]) Subscribe() (header.Subscription[H], error) {
if p.topic == nil {
func (s *Subscriber[H]) Subscribe() (header.Subscription[H], error) {
if s.topic == nil {
return nil, fmt.Errorf("header topic is not instantiated, service must be started before subscribing")
}

return newSubscription[H](p.topic)
return newSubscription[H](s.topic)
}

// Broadcast broadcasts the given Header to the topic.
func (p *Subscriber[H]) Broadcast(ctx context.Context, header H, opts ...pubsub.PubOpt) error {
func (s *Subscriber[H]) Broadcast(ctx context.Context, header H, opts ...pubsub.PubOpt) error {
bin, err := header.MarshalBinary()
if err != nil {
return err
}
return p.topic.Publish(ctx, bin, opts...)
return s.topic.Publish(ctx, bin, opts...)
}
4 changes: 2 additions & 2 deletions p2p/subscription_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func TestSubscriber(t *testing.T) {
require.NoError(t, err)

// create sub-service lifecycles for header service 1
p2pSub1 := NewSubscriber[*headertest.DummyHeader](pubsub1, pubsub.DefaultMsgIdFn, networkID)
p2pSub1 := NewSubscriber[*headertest.DummyHeader](pubsub1, pubsub.DefaultMsgIdFn, WithSubscriberNetworkID(networkID))
err = p2pSub1.Start(context.Background())
require.NoError(t, err)

Expand All @@ -40,7 +40,7 @@ func TestSubscriber(t *testing.T) {
require.NoError(t, err)

// create sub-service lifecycles for header service 2
p2pSub2 := NewSubscriber[*headertest.DummyHeader](pubsub2, pubsub.DefaultMsgIdFn, networkID)
p2pSub2 := NewSubscriber[*headertest.DummyHeader](pubsub2, pubsub.DefaultMsgIdFn, WithSubscriberNetworkID(networkID))
err = p2pSub2.Start(context.Background())
require.NoError(t, err)

Expand Down
13 changes: 6 additions & 7 deletions sync/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,9 @@ import (
"time"
)

// Options is the functional option that is applied to the Syner instance
// Option is the functional option that is applied to the Syner instance
// to configure its parameters.
// TODO(@Wondertan): rename to single Option in some breaking release
type Options func(*Parameters)
type Option func(*Parameters)

// Parameters is the set of parameters that must be configured for the syncer.
type Parameters struct {
Expand Down Expand Up @@ -47,30 +46,30 @@ func (p *Parameters) Validate() error {

// WithBlockTime is a functional option that configures the
// `blockTime` parameter.
func WithBlockTime(duration time.Duration) Options {
func WithBlockTime(duration time.Duration) Option {
return func(p *Parameters) {
p.blockTime = duration
}
}

// WithRecencyThreshold is a functional option that configures the
// `recencyThreshold` parameter.
func WithRecencyThreshold(threshold time.Duration) Options {
func WithRecencyThreshold(threshold time.Duration) Option {
return func(p *Parameters) {
p.recencyThreshold = threshold
}
}

// WithTrustingPeriod is a functional option that configures the
// `TrustingPeriod` parameter.
func WithTrustingPeriod(duration time.Duration) Options {
func WithTrustingPeriod(duration time.Duration) Option {
return func(p *Parameters) {
p.TrustingPeriod = duration
}
}

// WithParams is a functional option that overrides Parameters.
func WithParams(new Parameters) Options {
func WithParams(new Parameters) Option {
return func(old *Parameters) {
*old = new
}
Expand Down
2 changes: 1 addition & 1 deletion sync/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ func NewSyncer[H header.Header[H]](
getter header.Getter[H],
store header.Store[H],
sub header.Subscriber[H],
opts ...Options,
opts ...Option,
) (*Syncer[H], error) {
params := DefaultParameters()
for _, opt := range opts {
Expand Down