Skip to content

Commit

Permalink
feat(p2p): metrics for Exchange (#125)
Browse files Browse the repository at this point in the history
Includes:
* headRequestTime
* responseSize
* responseTime
* trackerPeersNum 
* disconnectedPeersNum
  • Loading branch information
Wondertan authored Oct 25, 2023
1 parent 09c272d commit a4d3ca0
Show file tree
Hide file tree
Showing 11 changed files with 251 additions and 84 deletions.
42 changes: 32 additions & 10 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ type Exchange[H header.Header[H]] struct {

trustedPeers func() peer.IDSlice
peerTracker *peerTracker
metrics *metrics
metrics *exchangeMetrics

Params ClientParameters
}
Expand All @@ -63,7 +63,7 @@ func NewExchange[H header.Header[H]](
return nil, err
}

var metrics *metrics
var metrics *exchangeMetrics
if params.metrics {
var err error
metrics, err = newExchangeMetrics()
Expand All @@ -75,7 +75,7 @@ func NewExchange[H header.Header[H]](
ex := &Exchange[H]{
host: host,
protocolID: protocolID(params.networkID),
peerTracker: newPeerTracker(host, gater, params.pidstore),
peerTracker: newPeerTracker(host, gater, params.pidstore, metrics),
Params: params,
metrics: metrics,
}
Expand All @@ -102,7 +102,8 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error {
// cancel the session if it exists
ex.cancel()
// stop the peerTracker
return ex.peerTracker.stop(ctx)
err := ex.peerTracker.stop(ctx)
return errors.Join(err, ex.metrics.Close())
}

// Head requests the latest Header from trusted peers.
Expand All @@ -114,14 +115,15 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
log.Debug("requesting head")

reqCtx := ctx
startTime := time.Now()
if deadline, ok := ctx.Deadline(); ok {
// allocate 90% of caller's set deadline for requests
// and give leftover to determine the bestHead from gathered responses
// this avoids DeadlineExceeded error when any of the peers are unresponsive
now := time.Now()
sub := deadline.Sub(now) * 9 / 10

sub := deadline.Sub(startTime) * 9 / 10
var cancel context.CancelFunc
reqCtx, cancel = context.WithDeadline(ctx, now.Add(sub))
reqCtx, cancel = context.WithDeadline(ctx, startTime.Add(sub))
defer cancel()
}

Expand Down Expand Up @@ -184,6 +186,11 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
}(from)
}

headType := headTypeTrusted
if useTrackedPeers {
headType = headTypeUntrusted
}

headers := make([]H, 0, len(peers))
for range peers {
select {
Expand All @@ -192,12 +199,27 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
headers = append(headers, h)
}
case <-ctx.Done():
status := headStatusCanceled
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
status = headStatusTimeout
}

ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, status)
return zero, ctx.Err()
case <-ex.ctx.Done():
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusCanceled)
return zero, ex.ctx.Err()
}
}
return bestHead[H](headers)

head, err := bestHead[H](headers)
if err != nil {
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusNoHeaders)
return zero, err
}

ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusOk)
return head, nil
}

// GetByHeight performs a request for the Header at the given
Expand Down Expand Up @@ -230,7 +252,7 @@ func (ex *Exchange[H]) GetRangeByHeight(
to uint64,
) ([]H, error) {
session := newSession[H](
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, withValidation(from),
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, ex.metrics, withValidation(from),
)
defer session.close()
// we request the next header height that we don't have: `fromHead`+1
Expand Down Expand Up @@ -307,7 +329,7 @@ func (ex *Exchange[H]) request(
) ([]H, error) {
log.Debugw("requesting peer", "peer", to)
responses, size, duration, err := sendMessage(ctx, ex.host, to, ex.protocolID, req)
ex.metrics.observeResponse(ctx, size, duration, err)
ex.metrics.response(ctx, size, duration, err)
if err != nil {
log.Debugw("err sending request", "peer", to, "err", err)
return nil, err
Expand Down
184 changes: 184 additions & 0 deletions p2p/exchange_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,184 @@
package p2p

import (
"context"
"errors"
"sync/atomic"
"time"

"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

var meter = otel.Meter("header/p2p")

const (
failedKey = "failed"
headerReceivedKey = "num_headers_received"
headTypeKey = "request_type"
headTypeTrusted = "trusted_request"
headTypeUntrusted = "untrusted_request"
headStatusKey = "status"
headStatusOk = "ok"
headStatusTimeout = "timeout"
headStatusCanceled = "canceled"
headStatusNoHeaders = "no_headers"
)

type exchangeMetrics struct {
headRequestTimeInst metric.Float64Histogram
responseSizeInst metric.Int64Histogram
responseTimeInst metric.Float64Histogram

trackerPeersNum atomic.Int64
trackedPeersNumInst metric.Int64ObservableGauge
trackedPeersNumReg metric.Registration

disconnectedPeersNum atomic.Int64
disconnectedPeersNumInst metric.Int64ObservableGauge
disconnectedPeersNumReg metric.Registration

blockedPeersNum atomic.Int64
blockedPeersNumInst metric.Int64ObservableGauge
blockedPeersNumReg metric.Registration
}

func newExchangeMetrics() (m *exchangeMetrics, err error) {
m = new(exchangeMetrics)
m.headRequestTimeInst, err = meter.Float64Histogram(
"hdr_p2p_exch_clnt_head_time_hist",
metric.WithDescription("exchange client head request time in seconds"),
)
if err != nil {
return nil, err
}
m.responseSizeInst, err = meter.Int64Histogram(
"hdr_p2p_exch_clnt_resp_size_hist",
metric.WithDescription("exchange client header response size in bytes"),
)
if err != nil {
return nil, err
}
m.responseTimeInst, err = meter.Float64Histogram(
"hdr_p2p_exch_clnt_resp_time_hist",
metric.WithDescription("exchange client response time in seconds"),
)
if err != nil {
return nil, err
}
m.trackedPeersNumInst, err = meter.Int64ObservableGauge(
"hdr_p2p_exch_clnt_trck_peer_num_gauge",
metric.WithDescription("exchange client tracked peers number"),
)
if err != nil {
return nil, err
}
m.trackedPeersNumReg, err = meter.RegisterCallback(m.observeTrackedPeers, m.trackedPeersNumInst)
if err != nil {
return nil, err
}
m.disconnectedPeersNumInst, err = meter.Int64ObservableGauge(
"hdr_p2p_exch_clnt_disconn_peer_num_gauge",
metric.WithDescription("exchange client tracked disconnected peers number"),
)
if err != nil {
return nil, err
}
m.disconnectedPeersNumReg, err = meter.RegisterCallback(m.observeDisconnectedPeers, m.disconnectedPeersNumInst)
if err != nil {
return nil, err
}
m.blockedPeersNumInst, err = meter.Int64ObservableGauge(
"hdr_p2p_exch_clnt_block_peer_num_gauge",
metric.WithDescription("exchange client blocked peers number"),
)
if err != nil {
return nil, err
}
m.blockedPeersNumReg, err = meter.RegisterCallback(m.observeBlockedPeers, m.blockedPeersNumInst)
if err != nil {
return nil, err
}
return m, nil
}

func (m *exchangeMetrics) head(ctx context.Context, duration time.Duration, headersReceived int, tp, status string) {
m.observe(ctx, func(ctx context.Context) {
m.headRequestTimeInst.Record(ctx,
duration.Seconds(),
metric.WithAttributes(
attribute.Int(headerReceivedKey, headersReceived),
attribute.String(headTypeKey, tp),
attribute.String(headStatusKey, status),
),
)
})
}

func (m *exchangeMetrics) response(ctx context.Context, size uint64, duration time.Duration, err error) {
m.observe(ctx, func(ctx context.Context) {
m.responseSizeInst.Record(ctx,
int64(size),
metric.WithAttributes(attribute.Bool(failedKey, err != nil)),
)
m.responseTimeInst.Record(ctx,
duration.Seconds(),
metric.WithAttributes(attribute.Bool(failedKey, err != nil)),
)
})
}

func (m *exchangeMetrics) peersTracked(num int) {
m.observe(context.Background(), func(context.Context) {
m.trackerPeersNum.Add(int64(num))
})
}

func (m *exchangeMetrics) peersDisconnected(num int) {
m.observe(context.Background(), func(context.Context) {
m.disconnectedPeersNum.Add(int64(num))
})
}

func (m *exchangeMetrics) peerBlocked() {
m.observe(context.Background(), func(ctx context.Context) {
m.blockedPeersNum.Add(1)
})
}

func (m *exchangeMetrics) observeTrackedPeers(_ context.Context, obs metric.Observer) error {
obs.ObserveInt64(m.trackedPeersNumInst, m.trackerPeersNum.Load())
return nil
}

func (m *exchangeMetrics) observeDisconnectedPeers(_ context.Context, obs metric.Observer) error {
obs.ObserveInt64(m.disconnectedPeersNumInst, m.disconnectedPeersNum.Load())
return nil
}

func (m *exchangeMetrics) observeBlockedPeers(_ context.Context, obs metric.Observer) error {
obs.ObserveInt64(m.blockedPeersNumInst, m.blockedPeersNum.Load())
return nil
}

func (m *exchangeMetrics) observe(ctx context.Context, observeFn func(context.Context)) {
if m == nil {
return
}
if ctx.Err() != nil {
ctx = context.Background()
}

observeFn(ctx)
}

func (m *exchangeMetrics) Close() (err error) {
if m == nil {
return nil
}

err = errors.Join(err, m.trackedPeersNumReg.Unregister())
err = errors.Join(err, m.disconnectedPeersNumReg.Unregister())
return err
}
6 changes: 2 additions & 4 deletions p2p/helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func sendMessage(
to peer.ID,
protocol protocol.ID,
req *p2p_pb.HeaderRequest,
) ([]*p2p_pb.HeaderResponse, uint64, uint64, error) {
) ([]*p2p_pb.HeaderResponse, uint64, time.Duration, error) {
startTime := time.Now()
stream, err := host.NewStream(ctx, to, protocol)
if err != nil {
Expand Down Expand Up @@ -94,8 +94,6 @@ func sendMessage(
headers = append(headers, resp)
}

duration := time.Since(startTime).Milliseconds()

// we allow the server side to explicitly close the connection
// if it does not have the requested range.
// In this case, server side will send us a response with ErrNotFound status code inside
Expand All @@ -114,7 +112,7 @@ func sendMessage(
// reset stream in case of an error
stream.Reset() //nolint:errcheck
}
return headers, totalRespLn, uint64(duration), err
return headers, totalRespLn, time.Since(startTime), err
}

// convertStatusCodeToError converts passed status code into an error.
Expand Down
49 changes: 0 additions & 49 deletions p2p/metrics.go

This file was deleted.

6 changes: 3 additions & 3 deletions p2p/peer_stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@ type peerStat struct {
// by dividing the amount by time, so the result score will represent how many bytes
// were retrieved in 1 millisecond. This value will then be averaged relative to the
// previous peerScore.
func (p *peerStat) updateStats(amount uint64, time uint64) {
func (p *peerStat) updateStats(amount uint64, duration time.Duration) {
p.Lock()
defer p.Unlock()
averageSpeed := float32(amount)
if time != 0 {
averageSpeed /= float32(time)
if duration != 0 {
averageSpeed /= float32(duration.Milliseconds())
}
if p.peerScore == 0.0 {
p.peerScore = averageSpeed
Expand Down
Loading

0 comments on commit a4d3ca0

Please sign in to comment.