From 11aaff54b817ea30fcc287a4b3955dc07da73403 Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 11 Oct 2023 15:28:25 +0200 Subject: [PATCH] feat(p2p): metrics for Subscriber --- p2p/subscriber.go | 43 ++++++++++++++----- p2p/subscriber_metrics.go | 90 +++++++++++++++++++++++++++++++++++++++ p2p/subscription.go | 6 ++- 3 files changed, 127 insertions(+), 12 deletions(-) create mode 100644 p2p/subscriber_metrics.go diff --git a/p2p/subscriber.go b/p2p/subscriber.go index ab1fb6ca..f1057d00 100644 --- a/p2p/subscriber.go +++ b/p2p/subscriber.go @@ -4,6 +4,7 @@ import ( "context" "errors" "fmt" + "time" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" @@ -26,9 +27,10 @@ type SubscriberParams struct { type Subscriber[H header.Header[H]] struct { pubsubTopicID string - pubsub *pubsub.PubSub - topic *pubsub.Topic - msgID pubsub.MsgIdFunction + metrics *subscriberMetrics + pubsub *pubsub.PubSub + topic *pubsub.Topic + msgID pubsub.MsgIdFunction } // WithSubscriberMetrics enables metrics collection for the Subscriber. @@ -57,7 +59,13 @@ func NewSubscriber[H header.Header[H]]( opt(¶ms) } + var metrics *subscriberMetrics + if params.metrics { + metrics = newSubscriberMetrics() + } + return &Subscriber[H]{ + metrics: metrics, pubsubTopicID: PubsubTopicID(params.networkID), pubsub: ps, msgID: msgID, @@ -85,6 +93,7 @@ func (s *Subscriber[H]) Stop(context.Context) error { // SetVerifier set given verification func as Header PubSub topic validator // Does not punish peers if *header.VerifyError is given with Uncertain set to true. func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error { + var lastAccept time.Time pval := func(ctx context.Context, p peer.ID, msg *pubsub.Message) pubsub.ValidationResult { hdr := header.New[H]() err := hdr.UnmarshalBinary(msg.Data) @@ -92,6 +101,7 @@ func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error { log.Errorw("unmarshalling header", "from", p.ShortString(), "err", err) + s.metrics.observeReject(ctx) return pubsub.ValidationReject } // ensure header validity @@ -100,23 +110,34 @@ func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error { log.Errorw("invalid header", "from", p.ShortString(), "err", err) + s.metrics.observeReject(ctx) return pubsub.ValidationReject } - // keep the valid header in the msg so Subscriptions can access it without - // additional unmarhalling - msg.ValidatorData = hdr var verErr *header.VerifyError err = val(ctx, hdr) switch { - case err == nil: - return pubsub.ValidationAccept - case errors.As(err, &verErr) && verErr.SoftFailure: - return pubsub.ValidationIgnore default: + s.metrics.observeReject(ctx) return pubsub.ValidationReject + case errors.As(err, &verErr) && verErr.SoftFailure: + s.metrics.observeIgnore(ctx) + return pubsub.ValidationIgnore + case err == nil: + } + + now := time.Now() + if !lastAccept.IsZero() { + s.metrics.observeAccept(ctx, now.Sub(lastAccept), len(msg.Data)) } + lastAccept = now + + // keep the valid header in the msg so Subscriptions can access it without + // additional unmarshalling + msg.ValidatorData = hdr + return pubsub.ValidationAccept } + return s.pubsub.RegisterTopicValidator(s.pubsubTopicID, pval) } @@ -127,7 +148,7 @@ func (s *Subscriber[H]) Subscribe() (header.Subscription[H], error) { return nil, fmt.Errorf("header topic is not instantiated, service must be started before subscribing") } - return newSubscription[H](s.topic) + return newSubscription[H](s.topic, s.metrics) } // Broadcast broadcasts the given Header to the topic. diff --git a/p2p/subscriber_metrics.go b/p2p/subscriber_metrics.go new file mode 100644 index 00000000..45dff4f9 --- /dev/null +++ b/p2p/subscriber_metrics.go @@ -0,0 +1,90 @@ +package p2p + +import ( + "context" + "time" + + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +type subscriberMetrics struct { + headerPropagationTime metric.Float64Histogram + headerReceivedNum metric.Int64Counter + subscriptionNum metric.Int64UpDownCounter +} + +func newSubscriberMetrics() *subscriberMetrics { + headerPropagationTime, err := meter.Float64Histogram( + "header_p2p_subscriber_propagation_time", + metric.WithDescription("header propagation time"), + ) + if err != nil { + panic(err) + } + headerReceivedNum, err := meter.Int64Counter( + "header_p2p_subscriber_received_num", + metric.WithDescription("header received from subscription counter"), + ) + if err != nil { + panic(err) + } + subscriptionNum, err := meter.Int64UpDownCounter( + "header_p2p_subscriber_subscription_num", + metric.WithDescription("number of active subscriptions"), + ) + if err != nil { + panic(err) + } + return &subscriberMetrics{ + headerPropagationTime: headerPropagationTime, + headerReceivedNum: headerReceivedNum, + subscriptionNum: subscriptionNum, + } +} + +func (m *subscriberMetrics) observeAccept(ctx context.Context, duration time.Duration, size int) { + m.observe(ctx, func(ctx context.Context) { + m.headerReceivedNum.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet( + attribute.String("status", "accept"), + attribute.Int64("size", int64(size)), + ))) + + m.headerPropagationTime.Record(ctx, duration.Seconds()) + }) +} + +func (m *subscriberMetrics) observeIgnore(ctx context.Context) { + m.observe(ctx, func(ctx context.Context) { + m.headerReceivedNum.Add(ctx, 1, metric.WithAttributes( + attribute.String("status", "ignore"), + // TODO(@Wondertan): Do we wanna track reason string and sizes? + )) + }) +} + +func (m *subscriberMetrics) observeReject(ctx context.Context) { + m.observe(ctx, func(ctx context.Context) { + m.headerReceivedNum.Add(ctx, 1, metric.WithAttributes( + attribute.String("status", "reject"), + // TODO(@Wondertan): Do we wanna track reason string and sizes? + )) + }) +} + +func (m *subscriberMetrics) observeSubscription(ctx context.Context, num int) { + m.observe(ctx, func(ctx context.Context) { + m.subscriptionNum.Add(ctx, int64(num)) + }) +} + +func (m *subscriberMetrics) observe(ctx context.Context, observeFn func(context.Context)) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + + observeFn(ctx) +} diff --git a/p2p/subscription.go b/p2p/subscription.go index 1f3a608a..b79001c7 100644 --- a/p2p/subscription.go +++ b/p2p/subscription.go @@ -14,19 +14,22 @@ import ( type subscription[H header.Header[H]] struct { topic *pubsub.Topic subscription *pubsub.Subscription + metrics *subscriberMetrics } // newSubscription creates a new Header event subscription // on the given host. -func newSubscription[H header.Header[H]](topic *pubsub.Topic) (*subscription[H], error) { +func newSubscription[H header.Header[H]](topic *pubsub.Topic, metrics *subscriberMetrics) (*subscription[H], error) { sub, err := topic.Subscribe() if err != nil { return nil, err } + metrics.observeSubscription(context.TODO(), 1) return &subscription[H]{ topic: topic, subscription: sub, + metrics: metrics, }, nil } @@ -51,4 +54,5 @@ func (s *subscription[H]) NextHeader(ctx context.Context) (H, error) { // Cancel cancels the subscription to new Headers from the network. func (s *subscription[H]) Cancel() { s.subscription.Cancel() + s.metrics.observeSubscription(context.TODO(), -1) }