From 71bea10236753720d3da6e74feb9fc6852a1075c Mon Sep 17 00:00:00 2001 From: Wondertan Date: Wed, 18 Oct 2023 15:43:56 +0200 Subject: [PATCH] address all the feedback and apply new knowledge about metrics --- p2p/subscriber.go | 27 +++++---- p2p/subscriber_metrics.go | 115 ++++++++++++++++++++++++-------------- p2p/subscription.go | 4 +- 3 files changed, 89 insertions(+), 57 deletions(-) diff --git a/p2p/subscriber.go b/p2p/subscriber.go index 07cf87e5..c5f70dcc 100644 --- a/p2p/subscriber.go +++ b/p2p/subscriber.go @@ -4,7 +4,6 @@ import ( "context" "errors" "fmt" - "time" pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/peer" @@ -61,7 +60,11 @@ func NewSubscriber[H header.Header[H]]( var metrics *subscriberMetrics if params.metrics { - metrics = newSubscriberMetrics() + var err error + metrics, err = newSubscriberMetrics() + if err != nil { + return nil, err + } } return &Subscriber[H]{ @@ -87,13 +90,14 @@ func (s *Subscriber[H]) Stop(context.Context) error { log.Warnf("unregistering validator: %s", err) } - return s.topic.Close() + err = errors.Join(err, s.topic.Close()) + err = errors.Join(err, s.metrics.Close()) + return err } // SetVerifier set given verification func as Header PubSub topic validator // Does not punish peers if *header.VerifyError is given with Uncertain set to true. func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error { - var lastAccept time.Time pval := func(ctx context.Context, p peer.ID, msg *pubsub.Message) pubsub.ValidationResult { hdr := header.New[H]() err := hdr.UnmarshalBinary(msg.Data) @@ -101,7 +105,7 @@ func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error { log.Errorw("unmarshalling header", "from", p.ShortString(), "err", err) - s.metrics.reject(ctx, err) + s.metrics.reject(ctx) return pubsub.ValidationReject } // ensure header validity @@ -110,7 +114,7 @@ func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error { log.Errorw("invalid header", "from", p.ShortString(), "err", err) - s.metrics.reject(ctx, err) + s.metrics.reject(ctx) return pubsub.ValidationReject } @@ -118,23 +122,18 @@ func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error { err = val(ctx, hdr) switch { case errors.As(err, &verErr) && verErr.SoftFailure: - s.metrics.ignore(ctx, len(msg.Data), err) + s.metrics.ignore(ctx) return pubsub.ValidationIgnore case err != nil: - s.metrics.reject(ctx, err) + s.metrics.reject(ctx) return pubsub.ValidationReject default: } - now := time.Now() - if !lastAccept.IsZero() { - s.metrics.accept(ctx, now.Sub(lastAccept), len(msg.Data)) - } - lastAccept = now - // keep the valid header in the msg so Subscriptions can access it without // additional unmarshalling msg.ValidatorData = hdr + s.metrics.accept(ctx, len(msg.Data)) return pubsub.ValidationAccept } diff --git a/p2p/subscriber_metrics.go b/p2p/subscriber_metrics.go index 61b3ff8f..bbbfdbac 100644 --- a/p2p/subscriber_metrics.go +++ b/p2p/subscriber_metrics.go @@ -2,6 +2,8 @@ package p2p import ( "context" + "sync" + "sync/atomic" "time" "go.opentelemetry.io/otel/attribute" @@ -9,82 +11,106 @@ import ( ) const ( - statusKey = "status" - sizeKey = "size" - reasonKey = "reason" + statusKey = "status" + statusAccept = "accept" + statusIgnore = "ignore" + statusReject = "reject" ) type subscriberMetrics struct { - headerPropagationTime metric.Float64Histogram - headerReceivedNum metric.Int64Counter - subscriptionNum metric.Int64UpDownCounter + messageNumInst metric.Int64Counter + messageSizeInst metric.Int64Histogram + + messageTimeLastMu sync.Mutex + messageTimeLast time.Time + messageTimeInst metric.Float64Histogram + + subscriptionNum atomic.Int64 + subscriptionNumInst metric.Int64ObservableGauge + subscriptionNumReg metric.Registration } -func newSubscriberMetrics() *subscriberMetrics { - headerPropagationTime, err := meter.Float64Histogram( - "header_p2p_subscriber_propagation_time", - metric.WithDescription("header propagation time"), +func newSubscriberMetrics() (m *subscriberMetrics, err error) { + m = new(subscriberMetrics) + m.messageNumInst, err = meter.Int64Counter( + "hdr_p2p_sub_msg_num", + metric.WithDescription("header message count"), ) if err != nil { - panic(err) + return nil, err } - headerReceivedNum, err := meter.Int64Counter( - "header_p2p_subscriber_received_num", - metric.WithDescription("header received from subscription counter"), + m.messageSizeInst, err = meter.Int64Histogram( + "hdr_p2p_sub_msg_size", + metric.WithDescription("valid header message size"), ) if err != nil { - panic(err) + return nil, err } - subscriptionNum, err := meter.Int64UpDownCounter( - "header_p2p_subscriber_subscription_num", - metric.WithDescription("number of active subscriptions"), + m.messageTimeInst, err = meter.Float64Histogram( + "hdr_p2p_sub_msg_time", + metric.WithDescription("valid header message propagation time"), ) if err != nil { - panic(err) + return nil, err } - return &subscriberMetrics{ - headerPropagationTime: headerPropagationTime, - headerReceivedNum: headerReceivedNum, - subscriptionNum: subscriptionNum, + m.subscriptionNumInst, err = meter.Int64ObservableGauge( + "hdr_p2p_sub_num", + metric.WithDescription("number of active header message subscriptions"), + ) + if err != nil { + return nil, err } + m.subscriptionNumReg, err = meter.RegisterCallback(m.subscriptionCallback, m.subscriptionNumInst) + if err != nil { + return nil, err + } + return m, nil } -func (m *subscriberMetrics) accept(ctx context.Context, duration time.Duration, size int) { +func (m *subscriberMetrics) accept(ctx context.Context, size int) { m.observe(ctx, func(ctx context.Context) { - m.headerReceivedNum.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet( - attribute.String(statusKey, "accept"), - attribute.Int64(sizeKey, int64(size)), - ))) + m.messageNumInst.Add(ctx, 1, metric.WithAttributes( + attribute.String(statusKey, statusAccept), + )) + m.messageSizeInst.Record(ctx, int64(size)) - m.headerPropagationTime.Record(ctx, duration.Seconds()) + now := time.Now() + m.messageTimeLastMu.Lock() + if !m.messageTimeLast.IsZero() { + m.messageTimeInst.Record(ctx, now.Sub(m.messageTimeLast).Seconds()) + } + m.messageTimeLast = now + m.messageTimeLastMu.Unlock() }) } -func (m *subscriberMetrics) ignore(ctx context.Context, size int, reason error) { +func (m *subscriberMetrics) ignore(ctx context.Context) { m.observe(ctx, func(ctx context.Context) { - m.headerReceivedNum.Add(ctx, 1, metric.WithAttributes( - attribute.String(statusKey, "ignore"), - attribute.Int64(sizeKey, int64(size)), - attribute.String(reasonKey, reason.Error()), + m.messageNumInst.Add(ctx, 1, metric.WithAttributes( + attribute.String(statusKey, statusIgnore), )) }) } -func (m *subscriberMetrics) reject(ctx context.Context, reason error) { +func (m *subscriberMetrics) reject(ctx context.Context) { m.observe(ctx, func(ctx context.Context) { - m.headerReceivedNum.Add(ctx, 1, metric.WithAttributes( - attribute.String(statusKey, "reject"), - attribute.String(reasonKey, reason.Error()), + m.messageNumInst.Add(ctx, 1, metric.WithAttributes( + attribute.String(statusKey, statusReject), )) }) } -func (m *subscriberMetrics) subscription(ctx context.Context, num int) { - m.observe(ctx, func(ctx context.Context) { - m.subscriptionNum.Add(ctx, int64(num)) +func (m *subscriberMetrics) subscription(num int) { + m.observe(context.Background(), func(ctx context.Context) { + m.subscriptionNum.Add(int64(num)) }) } +func (m *subscriberMetrics) subscriptionCallback(_ context.Context, obs metric.Observer) error { + obs.ObserveInt64(m.subscriptionNumInst, m.subscriptionNum.Load()) + return nil +} + func (m *subscriberMetrics) observe(ctx context.Context, observeFn func(context.Context)) { if m == nil { return @@ -95,3 +121,10 @@ func (m *subscriberMetrics) observe(ctx context.Context, observeFn func(context. observeFn(ctx) } + +func (m *subscriberMetrics) Close() error { + if m == nil { + return nil + } + return m.subscriptionNumReg.Unregister() +} diff --git a/p2p/subscription.go b/p2p/subscription.go index c2e7918f..73dedd94 100644 --- a/p2p/subscription.go +++ b/p2p/subscription.go @@ -24,7 +24,7 @@ func newSubscription[H header.Header[H]](topic *pubsub.Topic, metrics *subscribe if err != nil { return nil, err } - metrics.subscription(context.TODO(), 1) + metrics.subscription(1) return &subscription[H]{ topic: topic, @@ -54,5 +54,5 @@ func (s *subscription[H]) NextHeader(ctx context.Context) (H, error) { // Cancel cancels the subscription to new Headers from the network. func (s *subscription[H]) Cancel() { s.subscription.Cancel() - s.metrics.subscription(context.TODO(), -1) + s.metrics.subscription(-1) }