Skip to content

Commit

Permalink
feat(p2p): metrics for Subscriber
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Oct 11, 2023
1 parent df01474 commit 11aaff5
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 12 deletions.
43 changes: 32 additions & 11 deletions p2p/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -26,9 +27,10 @@ type SubscriberParams struct {
type Subscriber[H header.Header[H]] struct {
pubsubTopicID string

pubsub *pubsub.PubSub
topic *pubsub.Topic
msgID pubsub.MsgIdFunction
metrics *subscriberMetrics
pubsub *pubsub.PubSub
topic *pubsub.Topic
msgID pubsub.MsgIdFunction
}

// WithSubscriberMetrics enables metrics collection for the Subscriber.
Expand Down Expand Up @@ -57,7 +59,13 @@ func NewSubscriber[H header.Header[H]](
opt(&params)
}

var metrics *subscriberMetrics
if params.metrics {
metrics = newSubscriberMetrics()
}

return &Subscriber[H]{
metrics: metrics,
pubsubTopicID: PubsubTopicID(params.networkID),
pubsub: ps,
msgID: msgID,
Expand Down Expand Up @@ -85,13 +93,15 @@ func (s *Subscriber[H]) Stop(context.Context) error {
// SetVerifier set given verification func as Header PubSub topic validator
// Does not punish peers if *header.VerifyError is given with Uncertain set to true.
func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
var lastAccept time.Time
pval := func(ctx context.Context, p peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
hdr := header.New[H]()
err := hdr.UnmarshalBinary(msg.Data)
if err != nil {
log.Errorw("unmarshalling header",
"from", p.ShortString(),
"err", err)
s.metrics.observeReject(ctx)
return pubsub.ValidationReject
}
// ensure header validity
Expand All @@ -100,23 +110,34 @@ func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
log.Errorw("invalid header",
"from", p.ShortString(),
"err", err)
s.metrics.observeReject(ctx)
return pubsub.ValidationReject
}
// keep the valid header in the msg so Subscriptions can access it without
// additional unmarhalling
msg.ValidatorData = hdr

var verErr *header.VerifyError
err = val(ctx, hdr)
switch {
case err == nil:
return pubsub.ValidationAccept
case errors.As(err, &verErr) && verErr.SoftFailure:
return pubsub.ValidationIgnore
default:
s.metrics.observeReject(ctx)
return pubsub.ValidationReject
case errors.As(err, &verErr) && verErr.SoftFailure:
s.metrics.observeIgnore(ctx)
return pubsub.ValidationIgnore
case err == nil:
}

now := time.Now()
if !lastAccept.IsZero() {
s.metrics.observeAccept(ctx, now.Sub(lastAccept), len(msg.Data))
}
lastAccept = now

// keep the valid header in the msg so Subscriptions can access it without
// additional unmarshalling
msg.ValidatorData = hdr
return pubsub.ValidationAccept
}

return s.pubsub.RegisterTopicValidator(s.pubsubTopicID, pval)
}

Expand All @@ -127,7 +148,7 @@ func (s *Subscriber[H]) Subscribe() (header.Subscription[H], error) {
return nil, fmt.Errorf("header topic is not instantiated, service must be started before subscribing")
}

return newSubscription[H](s.topic)
return newSubscription[H](s.topic, s.metrics)
}

// Broadcast broadcasts the given Header to the topic.
Expand Down
90 changes: 90 additions & 0 deletions p2p/subscriber_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package p2p

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

type subscriberMetrics struct {
headerPropagationTime metric.Float64Histogram
headerReceivedNum metric.Int64Counter
subscriptionNum metric.Int64UpDownCounter
}

func newSubscriberMetrics() *subscriberMetrics {
headerPropagationTime, err := meter.Float64Histogram(
"header_p2p_subscriber_propagation_time",
metric.WithDescription("header propagation time"),
)
if err != nil {
panic(err)
}
headerReceivedNum, err := meter.Int64Counter(
"header_p2p_subscriber_received_num",
metric.WithDescription("header received from subscription counter"),
)
if err != nil {
panic(err)
}
subscriptionNum, err := meter.Int64UpDownCounter(
"header_p2p_subscriber_subscription_num",
metric.WithDescription("number of active subscriptions"),
)
if err != nil {
panic(err)
}
return &subscriberMetrics{
headerPropagationTime: headerPropagationTime,
headerReceivedNum: headerReceivedNum,
subscriptionNum: subscriptionNum,
}
}

func (m *subscriberMetrics) observeAccept(ctx context.Context, duration time.Duration, size int) {
m.observe(ctx, func(ctx context.Context) {
m.headerReceivedNum.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
attribute.String("status", "accept"),
attribute.Int64("size", int64(size)),
)))

m.headerPropagationTime.Record(ctx, duration.Seconds())
})
}

func (m *subscriberMetrics) observeIgnore(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.headerReceivedNum.Add(ctx, 1, metric.WithAttributes(
attribute.String("status", "ignore"),
// TODO(@Wondertan): Do we wanna track reason string and sizes?
))
})
}

func (m *subscriberMetrics) observeReject(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.headerReceivedNum.Add(ctx, 1, metric.WithAttributes(
attribute.String("status", "reject"),
// TODO(@Wondertan): Do we wanna track reason string and sizes?
))
})
}

func (m *subscriberMetrics) observeSubscription(ctx context.Context, num int) {
m.observe(ctx, func(ctx context.Context) {
m.subscriptionNum.Add(ctx, int64(num))
})
}

func (m *subscriberMetrics) observe(ctx context.Context, observeFn func(context.Context)) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

observeFn(ctx)
}
6 changes: 5 additions & 1 deletion p2p/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@ import (
type subscription[H header.Header[H]] struct {
topic *pubsub.Topic
subscription *pubsub.Subscription
metrics *subscriberMetrics
}

// newSubscription creates a new Header event subscription
// on the given host.
func newSubscription[H header.Header[H]](topic *pubsub.Topic) (*subscription[H], error) {
func newSubscription[H header.Header[H]](topic *pubsub.Topic, metrics *subscriberMetrics) (*subscription[H], error) {
sub, err := topic.Subscribe()
if err != nil {
return nil, err
}
metrics.observeSubscription(context.TODO(), 1)

return &subscription[H]{
topic: topic,
subscription: sub,
metrics: metrics,
}, nil
}

Expand All @@ -51,4 +54,5 @@ func (s *subscription[H]) NextHeader(ctx context.Context) (H, error) {
// Cancel cancels the subscription to new Headers from the network.
func (s *subscription[H]) Cancel() {
s.subscription.Cancel()
s.metrics.observeSubscription(context.TODO(), -1)
}

0 comments on commit 11aaff5

Please sign in to comment.