Skip to content

Commit

Permalink
address all the feedback and apply new knowledge about metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
Wondertan committed Oct 18, 2023
1 parent e8cb367 commit 296cf4e
Show file tree
Hide file tree
Showing 3 changed files with 83 additions and 57 deletions.
26 changes: 12 additions & 14 deletions p2p/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
Expand Down Expand Up @@ -61,7 +60,11 @@ func NewSubscriber[H header.Header[H]](

var metrics *subscriberMetrics
if params.metrics {
metrics = newSubscriberMetrics()
var err error
metrics, err = newSubscriberMetrics()
if err != nil {
return nil, err
}
}

return &Subscriber[H]{
Expand All @@ -87,21 +90,22 @@ func (s *Subscriber[H]) Stop(context.Context) error {
log.Warnf("unregistering validator: %s", err)
}

return s.topic.Close()
err = errors.Join(err, s.topic.Close())
err = errors.Join(err, s.metrics.Close())
return err
}

// SetVerifier set given verification func as Header PubSub topic validator
// Does not punish peers if *header.VerifyError is given with Uncertain set to true.
func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
var lastAccept time.Time
pval := func(ctx context.Context, p peer.ID, msg *pubsub.Message) pubsub.ValidationResult {
hdr := header.New[H]()
err := hdr.UnmarshalBinary(msg.Data)
if err != nil {
log.Errorw("unmarshalling header",
"from", p.ShortString(),
"err", err)
s.metrics.reject(ctx, err)
s.metrics.reject(ctx)
return pubsub.ValidationReject
}
// ensure header validity
Expand All @@ -110,28 +114,22 @@ func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
log.Errorw("invalid header",
"from", p.ShortString(),
"err", err)
s.metrics.reject(ctx, err)
s.metrics.reject(ctx)
return pubsub.ValidationReject
}

var verErr *header.VerifyError
err = val(ctx, hdr)
switch {
case errors.As(err, &verErr) && verErr.SoftFailure:
s.metrics.ignore(ctx, len(msg.Data), err)
s.metrics.ignore(ctx)
return pubsub.ValidationIgnore
case err != nil:
s.metrics.reject(ctx, err)
s.metrics.reject(ctx)
return pubsub.ValidationReject
default:
}

now := time.Now()
if !lastAccept.IsZero() {
s.metrics.accept(ctx, now.Sub(lastAccept), len(msg.Data))
}
lastAccept = now

// keep the valid header in the msg so Subscriptions can access it without
// additional unmarshalling
msg.ValidatorData = hdr
Expand Down
110 changes: 69 additions & 41 deletions p2p/subscriber_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package p2p

import (
"context"
"sync"
"sync/atomic"
"time"

"go.opentelemetry.io/otel/attribute"
Expand All @@ -10,79 +12,101 @@ import (

const (
statusKey = "status"
sizeKey = "size"
reasonKey = "reason"
statusAccept = "accept"
statusIgnore = "ignore"
statusReject = "reject"
)

type subscriberMetrics struct {
headerPropagationTime metric.Float64Histogram
headerReceivedNum metric.Int64Counter
subscriptionNum metric.Int64UpDownCounter
messageNumInst metric.Int64Counter
messageSizeInst metric.Int64Histogram

messageTimeLastLk sync.Mutex
messageTimeLast time.Time
messageTimeInst metric.Float64Histogram

subscriptionNum atomic.Int64
subscriptionNumInst metric.Int64ObservableGauge
subscriptionNumReg metric.Registration
}

func newSubscriberMetrics() *subscriberMetrics {
headerPropagationTime, err := meter.Float64Histogram(
"header_p2p_subscriber_propagation_time",
metric.WithDescription("header propagation time"),
func newSubscriberMetrics() (m *subscriberMetrics, err error) {
m = new(subscriberMetrics)
m.messageNumInst, err = meter.Int64Counter(
"hdr_p2p_sub_msg_num",
metric.WithDescription("header message count"),
)
if err != nil {
panic(err)
return nil, err
}
headerReceivedNum, err := meter.Int64Counter(
"header_p2p_subscriber_received_num",
metric.WithDescription("header received from subscription counter"),
m.messageSizeInst, err = meter.Int64Histogram(
"hdr_p2p_sub_msg_size",
metric.WithDescription("valid header message size"),
)
if err != nil {
panic(err)
return nil, err
}
subscriptionNum, err := meter.Int64UpDownCounter(
"header_p2p_subscriber_subscription_num",
metric.WithDescription("number of active subscriptions"),
m.messageTimeInst, err = meter.Float64Histogram(
"hdr_p2p_sub_msg_time",
metric.WithDescription("valid header message propagation time"),
)
if err != nil {
panic(err)
return nil, err
}
return &subscriberMetrics{
headerPropagationTime: headerPropagationTime,
headerReceivedNum: headerReceivedNum,
subscriptionNum: subscriptionNum,
m.subscriptionNumInst, err = meter.Int64ObservableGauge(
"hdr_p2p_sub_num",
metric.WithDescription("number of active header message subscriptions"),
)
if err != nil {
return nil, err
}
m.subscriptionNumReg, err = meter.RegisterCallback(m.subscriptionCallback, m.subscriptionNumInst)
if err != nil {
return nil, err
}
return m, nil
}

func (m *subscriberMetrics) accept(ctx context.Context, duration time.Duration, size int) {
func (m *subscriberMetrics) accept(ctx context.Context, size int) {
m.observe(ctx, func(ctx context.Context) {
m.headerReceivedNum.Add(ctx, 1, metric.WithAttributeSet(attribute.NewSet(
attribute.String(statusKey, "accept"),
attribute.Int64(sizeKey, int64(size)),
)))
m.messageNumInst.Add(ctx, 1, metric.WithAttributes(
attribute.String(statusKey, statusAccept),
))
m.messageSizeInst.Record(ctx, int64(size))

m.headerPropagationTime.Record(ctx, duration.Seconds())
now := time.Now()
m.messageTimeLastLk.Lock()
if !m.messageTimeLast.IsZero() {
m.messageTimeInst.Record(ctx, now.Sub(m.messageTimeLast).Seconds())
}
m.messageTimeLast = now
m.messageTimeLastLk.Unlock()
})
}

func (m *subscriberMetrics) ignore(ctx context.Context, size int, reason error) {
func (m *subscriberMetrics) ignore(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.headerReceivedNum.Add(ctx, 1, metric.WithAttributes(
attribute.String(statusKey, "ignore"),
attribute.Int64(sizeKey, int64(size)),
attribute.String(reasonKey, reason.Error()),
m.messageNumInst.Add(ctx, 1, metric.WithAttributes(
attribute.String(statusKey, statusIgnore),
))
})
}

func (m *subscriberMetrics) reject(ctx context.Context, reason error) {
func (m *subscriberMetrics) reject(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.headerReceivedNum.Add(ctx, 1, metric.WithAttributes(
attribute.String(statusKey, "reject"),
attribute.String(reasonKey, reason.Error()),
m.messageNumInst.Add(ctx, 1, metric.WithAttributes(
attribute.String(statusKey, statusReject),
))
})
}

func (m *subscriberMetrics) subscription(ctx context.Context, num int) {
m.observe(ctx, func(ctx context.Context) {
m.subscriptionNum.Add(ctx, int64(num))
})
func (m *subscriberMetrics) subscription(num int) {
m.subscriptionNum.Add(int64(num))
}

func (m *subscriberMetrics) subscriptionCallback(_ context.Context, obs metric.Observer) error {
obs.ObserveInt64(m.subscriptionNumInst, m.subscriptionNum.Load())
return nil
}

func (m *subscriberMetrics) observe(ctx context.Context, observeFn func(context.Context)) {
Expand All @@ -95,3 +119,7 @@ func (m *subscriberMetrics) observe(ctx context.Context, observeFn func(context.

observeFn(ctx)
}

func (m *subscriberMetrics) Close() error {
return m.subscriptionNumReg.Unregister()
}
4 changes: 2 additions & 2 deletions p2p/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func newSubscription[H header.Header[H]](topic *pubsub.Topic, metrics *subscribe
if err != nil {
return nil, err
}
metrics.subscription(context.TODO(), 1)
metrics.subscription( 1)

return &subscription[H]{
topic: topic,
Expand Down Expand Up @@ -54,5 +54,5 @@ func (s *subscription[H]) NextHeader(ctx context.Context) (H, error) {
// Cancel cancels the subscription to new Headers from the network.
func (s *subscription[H]) Cancel() {
s.subscription.Cancel()
s.metrics.subscription(context.TODO(), -1)
s.metrics.subscription(-1)
}

0 comments on commit 296cf4e

Please sign in to comment.