Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(p2p): metrics for Subscriber #119

Merged
merged 4 commits into from
Oct 19, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 31 additions & 11 deletions p2p/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,10 @@ type SubscriberParams struct {
type Subscriber[H header.Header[H]] struct {
pubsubTopicID string

pubsub *pubsub.PubSub
topic *pubsub.Topic
msgID pubsub.MsgIdFunction
metrics *subscriberMetrics
pubsub *pubsub.PubSub
topic *pubsub.Topic
msgID pubsub.MsgIdFunction
}

// WithSubscriberMetrics enables metrics collection for the Subscriber.
Expand Down Expand Up @@ -57,7 +58,17 @@ func NewSubscriber[H header.Header[H]](
opt(&params)
}

var metrics *subscriberMetrics
if params.metrics {
var err error
metrics, err = newSubscriberMetrics()
if err != nil {
return nil, err
}
}

return &Subscriber[H]{
metrics: metrics,
pubsubTopicID: PubsubTopicID(params.networkID),
pubsub: ps,
msgID: msgID,
Expand All @@ -79,7 +90,9 @@ func (s *Subscriber[H]) Stop(context.Context) error {
log.Warnf("unregistering validator: %s", err)
}

return s.topic.Close()
err = errors.Join(err, s.topic.Close())
err = errors.Join(err, s.metrics.Close())
return err
}

// SetVerifier set given verification func as Header PubSub topic validator
Expand All @@ -92,6 +105,7 @@ func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
log.Errorw("unmarshalling header",
"from", p.ShortString(),
"err", err)
s.metrics.reject(ctx)
walldiss marked this conversation as resolved.
Show resolved Hide resolved
return pubsub.ValidationReject
}
// ensure header validity
Expand All @@ -100,23 +114,29 @@ func (s *Subscriber[H]) SetVerifier(val func(context.Context, H) error) error {
log.Errorw("invalid header",
"from", p.ShortString(),
"err", err)
s.metrics.reject(ctx)
return pubsub.ValidationReject
}
// keep the valid header in the msg so Subscriptions can access it without
// additional unmarhalling
msg.ValidatorData = hdr

var verErr *header.VerifyError
err = val(ctx, hdr)
switch {
case err == nil:
return pubsub.ValidationAccept
case errors.As(err, &verErr) && verErr.SoftFailure:
s.metrics.ignore(ctx)
return pubsub.ValidationIgnore
default:
case err != nil:
s.metrics.reject(ctx)
return pubsub.ValidationReject
default:
}

// keep the valid header in the msg so Subscriptions can access it without
// additional unmarshalling
msg.ValidatorData = hdr
s.metrics.accept(ctx, len(msg.Data))
return pubsub.ValidationAccept
}

return s.pubsub.RegisterTopicValidator(s.pubsubTopicID, pval)
}

Expand All @@ -127,7 +147,7 @@ func (s *Subscriber[H]) Subscribe() (header.Subscription[H], error) {
return nil, fmt.Errorf("header topic is not instantiated, service must be started before subscribing")
}

return newSubscription[H](s.topic)
return newSubscription[H](s.topic, s.metrics)
}

// Broadcast broadcasts the given Header to the topic.
Expand Down
127 changes: 127 additions & 0 deletions p2p/subscriber_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,127 @@
package p2p

import (
"context"
"sync/atomic"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

const (
statusKey = "status"
statusAccept = "accept"
statusIgnore = "ignore"
statusReject = "reject"
)

type subscriberMetrics struct {
messageNumInst metric.Int64Counter
messageSizeInst metric.Int64Histogram

messageTimeLast atomic.Pointer[time.Time]
messageTimeInst metric.Float64Histogram

subscriptionNum atomic.Int64
subscriptionNumInst metric.Int64ObservableGauge
subscriptionNumReg metric.Registration
Wondertan marked this conversation as resolved.
Show resolved Hide resolved
walldiss marked this conversation as resolved.
Show resolved Hide resolved
}

func newSubscriberMetrics() (m *subscriberMetrics, err error) {
m = new(subscriberMetrics)
m.messageNumInst, err = meter.Int64Counter(
"hdr_p2p_sub_msg_num_counter",
metric.WithDescription("header message count"),
)
if err != nil {
return nil, err
}
m.messageSizeInst, err = meter.Int64Histogram(
"hdr_p2p_sub_msg_size_hist",
metric.WithDescription("valid header message size"),
)
if err != nil {
return nil, err
}
m.messageTimeInst, err = meter.Float64Histogram(
"hdr_p2p_sub_msg_time_hist",
metric.WithDescription("valid header message propagation time"),
)
if err != nil {
return nil, err
}
m.subscriptionNumInst, err = meter.Int64ObservableGauge(
"hdr_p2p_sub_num_gauge",
metric.WithDescription("number of active header message subscriptions"),
)
if err != nil {
return nil, err
}
m.subscriptionNumReg, err = meter.RegisterCallback(m.subscriptionCallback, m.subscriptionNumInst)
if err != nil {
return nil, err
}
return m, nil
}

func (m *subscriberMetrics) accept(ctx context.Context, size int) {
m.observe(ctx, func(ctx context.Context) {
m.messageNumInst.Add(ctx, 1, metric.WithAttributes(
attribute.String(statusKey, statusAccept),
))
m.messageSizeInst.Record(ctx, int64(size))

now := time.Now()
lastTime := m.messageTimeLast.Swap(&now)
if lastTime == nil || lastTime.IsZero() {
return
}
m.messageTimeInst.Record(ctx, now.Sub(*lastTime).Seconds())
})
}

func (m *subscriberMetrics) ignore(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.messageNumInst.Add(ctx, 1, metric.WithAttributes(
attribute.String(statusKey, statusIgnore),
))
})
}

func (m *subscriberMetrics) reject(ctx context.Context) {
m.observe(ctx, func(ctx context.Context) {
m.messageNumInst.Add(ctx, 1, metric.WithAttributes(
attribute.String(statusKey, statusReject),
))
})
}
walldiss marked this conversation as resolved.
Show resolved Hide resolved

func (m *subscriberMetrics) subscription(num int) {
m.observe(context.Background(), func(ctx context.Context) {
m.subscriptionNum.Add(int64(num))
})
}
walldiss marked this conversation as resolved.
Show resolved Hide resolved

func (m *subscriberMetrics) subscriptionCallback(_ context.Context, obs metric.Observer) error {
obs.ObserveInt64(m.subscriptionNumInst, m.subscriptionNum.Load())
return nil
}

func (m *subscriberMetrics) observe(ctx context.Context, observeFn func(context.Context)) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

observeFn(ctx)
}

func (m *subscriberMetrics) Close() error {
if m == nil {
return nil
}
return m.subscriptionNumReg.Unregister()
}
6 changes: 5 additions & 1 deletion p2p/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,19 +14,22 @@ import (
type subscription[H header.Header[H]] struct {
topic *pubsub.Topic
subscription *pubsub.Subscription
metrics *subscriberMetrics
}

// newSubscription creates a new Header event subscription
// on the given host.
func newSubscription[H header.Header[H]](topic *pubsub.Topic) (*subscription[H], error) {
func newSubscription[H header.Header[H]](topic *pubsub.Topic, metrics *subscriberMetrics) (*subscription[H], error) {
sub, err := topic.Subscribe()
if err != nil {
return nil, err
}
metrics.subscription(1)

return &subscription[H]{
topic: topic,
subscription: sub,
metrics: metrics,
}, nil
}

Expand All @@ -51,4 +54,5 @@ func (s *subscription[H]) NextHeader(ctx context.Context) (H, error) {
// Cancel cancels the subscription to new Headers from the network.
func (s *subscription[H]) Cancel() {
s.subscription.Cancel()
s.metrics.subscription(-1)
}