Skip to content

Commit

Permalink
feat(p2p): metrics for ExchangeServer (#126)
Browse files Browse the repository at this point in the history
Includes:
* headersServed
* headServeTime
* rangeServeTime
* getServeTime
  • Loading branch information
Wondertan authored Oct 26, 2023
1 parent a4d3ca0 commit a8ce731
Show file tree
Hide file tree
Showing 2 changed files with 132 additions and 2 deletions.
28 changes: 26 additions & 2 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,8 +28,9 @@ var (
type ExchangeServer[H header.Header[H]] struct {
protocolID protocol.ID

host host.Host
store header.Store[H]
host host.Host
store header.Store[H]
metrics *serverMetrics

ctx context.Context
cancel context.CancelFunc
Expand All @@ -52,10 +53,20 @@ func NewExchangeServer[H header.Header[H]](
return nil, err
}

var metrics *serverMetrics
if params.metrics {
var err error
metrics, err = newServerMetrics()
if err != nil {
return nil, err
}
}

return &ExchangeServer[H]{
protocolID: protocolID(params.networkID),
host: host,
store: store,
metrics: metrics,
Params: params,
}, nil
}
Expand Down Expand Up @@ -158,6 +169,7 @@ func (serv *ExchangeServer[H]) requestHandler(stream network.Stream) {
// handleRequestByHash returns the Header at the given hash
// if it exists.
func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) {
startTime := time.Now()
log.Debugw("server: handling header request", "hash", header.Hash(hash).String())
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
defer cancel()
Expand All @@ -170,6 +182,7 @@ func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) {
if err != nil {
log.Errorw("server: getting header by hash", "hash", header.Hash(hash).String(), "err", err)
span.SetStatus(codes.Error, err.Error())
serv.metrics.getServed(ctx, time.Since(startTime), true)
return nil, err
}

Expand All @@ -178,6 +191,8 @@ func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) {
attribute.Int64("height", int64(h.Height()))),
)
span.SetStatus(codes.Ok, "")

serv.metrics.getServed(ctx, time.Since(startTime), false)
return []H{h}, nil
}

Expand All @@ -188,6 +203,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
return serv.handleHeadRequest()
}

startTime := time.Now()
ctx, span := tracer.Start(serv.ctx, "request-range", trace.WithAttributes(
attribute.Int64("from", int64(from)),
attribute.Int64("to", int64(to))))
Expand All @@ -196,6 +212,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
if to-from > header.MaxRangeRequestSize {
log.Errorw("server: skip request for too many headers.", "amount", to-from)
span.SetStatus(codes.Error, header.ErrHeadersLimitExceeded.Error())
serv.metrics.rangeServed(ctx, time.Since(startTime), int(to-from), true)
return nil, header.ErrHeadersLimitExceeded
}

Expand All @@ -206,6 +223,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
if err != nil {
span.SetStatus(codes.Error, err.Error())
log.Debugw("server: could not get current head", "err", err)
serv.metrics.rangeServed(ctx, time.Since(startTime), int(to-from), true)
return nil, err
}

Expand All @@ -218,6 +236,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
"currentHead",
head.Height(),
)
serv.metrics.rangeServed(ctx, time.Since(startTime), int(to-from), true)
return nil, header.ErrNotFound
}

Expand All @@ -237,17 +256,20 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
return nil, header.ErrNotFound
}
log.Errorw("server: getting headers", "from", from, "to", to, "err", err)
serv.metrics.rangeServed(ctx, time.Since(startTime), int(to-from), true)
return nil, err
}

span.AddEvent("fetched-range-of-headers", trace.WithAttributes(
attribute.Int("amount", len(headersByRange))))
span.SetStatus(codes.Ok, "")
serv.metrics.rangeServed(ctx, time.Since(startTime), len(headersByRange), false)
return headersByRange, nil
}

// handleHeadRequest returns the latest stored head.
func (serv *ExchangeServer[H]) handleHeadRequest() ([]H, error) {
startTime := time.Now()
log.Debug("server: handling head request")
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
defer cancel()
Expand All @@ -258,6 +280,7 @@ func (serv *ExchangeServer[H]) handleHeadRequest() ([]H, error) {
if err != nil {
log.Errorw("server: getting head", "err", err)
span.SetStatus(codes.Error, err.Error())
serv.metrics.headServed(ctx, time.Since(startTime), true)
return nil, err
}

Expand All @@ -266,5 +289,6 @@ func (serv *ExchangeServer[H]) handleHeadRequest() ([]H, error) {
attribute.Int64("height", int64(head.Height()))),
)
span.SetStatus(codes.Ok, "")
serv.metrics.headServed(ctx, time.Since(startTime), false)
return []H{head}, nil
}
106 changes: 106 additions & 0 deletions p2p/server_metrics.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,106 @@
package p2p

import (
"context"
"time"

"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/metric"
)

// Attribute keys attached to the exchange server measurements.
const (
// headersServedKey labels range serve-time samples with the number of
// headers in the request, bucketed by hundreds (see rangeServed).
headersServedKey = "num_headers_served"
// failedRequestKey labels a measurement with whether the request failed.
failedRequestKey = "failed_request"
)

// serverMetrics groups the instruments used to report exchange server
// activity. Its recording methods go through observe, which returns early on
// a nil receiver, so a nil *serverMetrics can be used when metrics are
// disabled.
type serverMetrics struct {
// headersServedInst counts headers served; head and get requests add 1,
// range requests add the number of headers passed by the caller.
headersServedInst metric.Int64Counter
// headServeTimeInst records how long a head request took, in seconds.
headServeTimeInst metric.Float64Histogram
// rangeServeTimeInst records how long a range request took, in seconds.
rangeServeTimeInst metric.Float64Histogram
// getServeTimeInst records how long a get request took, in seconds.
getServeTimeInst metric.Float64Histogram
}

// newServerMetrics creates every instrument used by serverMetrics on the
// meter defined elsewhere in this package.
//
// Instruments are created in a fixed order: the headers-served counter first,
// then the head, range and get serve-time histograms. The first creation
// error aborts construction and is returned together with a nil result.
func newServerMetrics() (m *serverMetrics, err error) {
	headersServed, err := meter.Int64Counter(
		"hdr_p2p_exch_srvr_headers_served_counter",
		metric.WithDescription("number of headers served"),
	)
	if err != nil {
		return nil, err
	}

	headServeTime, err := meter.Float64Histogram(
		"hdr_p2p_exch_srvr_head_serve_time_hist",
		metric.WithDescription("exchange server head serve time in seconds"),
	)
	if err != nil {
		return nil, err
	}

	rangeServeTime, err := meter.Float64Histogram(
		"hdr_p2p_exch_srvr_range_serve_time_hist",
		metric.WithDescription("exchange server range serve time in seconds"),
	)
	if err != nil {
		return nil, err
	}

	getServeTime, err := meter.Float64Histogram(
		"hdr_p2p_exch_srvr_get_serve_time_hist",
		metric.WithDescription("exchange server get serve time in seconds"),
	)
	if err != nil {
		return nil, err
	}

	return &serverMetrics{
		headersServedInst:  headersServed,
		headServeTimeInst:  headServeTime,
		rangeServeTimeInst: rangeServeTime,
		getServeTimeInst:   getServeTime,
	}, nil
}

// headServed reports one handled head request.
//
// It adds 1 to the headers-served counter and records duration (in seconds)
// on the head serve-time histogram; both measurements carry the
// failed_request attribute set to failed. The work is routed through observe,
// so nothing is recorded when m is nil.
func (m *serverMetrics) headServed(ctx context.Context, duration time.Duration, failed bool) {
	record := func(ctx context.Context) {
		// The same attribute option is valid for both Add and Record.
		failedAttr := metric.WithAttributes(attribute.Bool(failedRequestKey, failed))

		m.headersServedInst.Add(ctx, 1, failedAttr)
		m.headServeTimeInst.Record(ctx, duration.Seconds(), failedAttr)
	}
	m.observe(ctx, record)
}

// rangeServed reports one handled range request.
//
// It adds headersServed to the headers-served counter (tagged with
// failed_request) and records duration (in seconds) on the range serve-time
// histogram, tagged with both the bucketed header count and failed_request.
// The work is routed through observe, so nothing is recorded when m is nil.
func (m *serverMetrics) rangeServed(ctx context.Context, duration time.Duration, headersServed int, failed bool) {
	record := func(ctx context.Context) {
		failedAttr := metric.WithAttributes(attribute.Bool(failedRequestKey, failed))
		// divide by 100 to reduce cardinality
		sizeAttr := metric.WithAttributes(attribute.Int(headersServedKey, headersServed/100))

		m.headersServedInst.Add(ctx, int64(headersServed), failedAttr)
		m.rangeServeTimeInst.Record(ctx, duration.Seconds(), sizeAttr, failedAttr)
	}
	m.observe(ctx, record)
}

// getServed reports one handled get request.
//
// It adds 1 to the headers-served counter and records duration (in seconds)
// on the get serve-time histogram; both measurements carry the
// failed_request attribute set to failed. The work is routed through observe,
// so nothing is recorded when m is nil.
func (m *serverMetrics) getServed(ctx context.Context, duration time.Duration, failed bool) {
	record := func(ctx context.Context) {
		// The same attribute option is valid for both Add and Record.
		failedAttr := metric.WithAttributes(attribute.Bool(failedRequestKey, failed))

		m.headersServedInst.Add(ctx, 1, failedAttr)
		m.getServeTimeInst.Record(ctx, duration.Seconds(), failedAttr)
	}
	m.observe(ctx, record)
}

// observe runs f unless metrics are disabled.
//
// A nil receiver means metrics are off and f is never called. Otherwise f is
// invoked with ctx, or with context.Background() when ctx already reports an
// error (cancelled or past its deadline).
// NOTE(review): the substitution presumably keeps measurements from being
// dropped for finished request contexts — confirm against the metrics SDK.
func (m *serverMetrics) observe(ctx context.Context, f func(context.Context)) {
	if m == nil {
		return
	}

	recordCtx := ctx
	if ctx.Err() != nil {
		recordCtx = context.Background()
	}
	f(recordCtx)
}

0 comments on commit a8ce731

Please sign in to comment.