diff --git a/p2p/server.go b/p2p/server.go
index a08c1ac6..327b9158 100644
--- a/p2p/server.go
+++ b/p2p/server.go
@@ -28,8 +28,9 @@ var (
 type ExchangeServer[H header.Header[H]] struct {
 	protocolID protocol.ID
 
-	host  host.Host
-	store header.Store[H]
+	host    host.Host
+	store   header.Store[H]
+	metrics *serverMetrics
 
 	ctx    context.Context
 	cancel context.CancelFunc
@@ -52,10 +53,20 @@ func NewExchangeServer[H header.Header[H]](
 		return nil, err
 	}
 
+	var metrics *serverMetrics
+	if params.metrics {
+		var err error
+		metrics, err = newServerMetrics()
+		if err != nil {
+			return nil, err
+		}
+	}
+
 	return &ExchangeServer[H]{
 		protocolID: protocolID(params.networkID),
 		host:       host,
 		store:      store,
+		metrics:    metrics,
 		Params:     params,
 	}, nil
 }
@@ -158,6 +169,7 @@ func (serv *ExchangeServer[H]) requestHandler(stream network.Stream) {
 // handleRequestByHash returns the Header at the given hash
 // if it exists.
 func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) {
+	startTime := time.Now()
 	log.Debugw("server: handling header request", "hash", header.Hash(hash).String())
 	ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
 	defer cancel()
@@ -170,6 +182,7 @@ func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) {
 	if err != nil {
 		log.Errorw("server: getting header by hash", "hash", header.Hash(hash).String(), "err", err)
 		span.SetStatus(codes.Error, err.Error())
+		serv.metrics.getServed(ctx, time.Since(startTime), true)
 		return nil, err
 	}
 
@@ -178,6 +191,8 @@ func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) {
 		attribute.Int64("height", int64(h.Height()))),
 	)
 	span.SetStatus(codes.Ok, "")
+
+	serv.metrics.getServed(ctx, time.Since(startTime), false)
 	return []H{h}, nil
 }
 
@@ -188,6 +203,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
 		return serv.handleHeadRequest()
 	}
 
+	startTime := time.Now()
 	ctx, span := tracer.Start(serv.ctx, "request-range", trace.WithAttributes(
 		attribute.Int64("from", int64(from)),
 		attribute.Int64("to", int64(to))))
@@ -196,6 +212,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
 	if to-from > header.MaxRangeRequestSize {
 		log.Errorw("server: skip request for too many headers.", "amount", to-from)
 		span.SetStatus(codes.Error, header.ErrHeadersLimitExceeded.Error())
+		serv.metrics.rangeServed(ctx, time.Since(startTime), int(to-from), true)
 		return nil, header.ErrHeadersLimitExceeded
 	}
 
@@ -206,6 +223,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
 	if err != nil {
 		span.SetStatus(codes.Error, err.Error())
 		log.Debugw("server: could not get current head", "err", err)
+		serv.metrics.rangeServed(ctx, time.Since(startTime), int(to-from), true)
 		return nil, err
 	}
 
@@ -218,6 +236,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
 			"currentHead", head.Height(),
 		)
 
+		serv.metrics.rangeServed(ctx, time.Since(startTime), int(to-from), true)
 		return nil, header.ErrNotFound
 	}
 
@@ -237,17 +256,20 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
 			return nil, header.ErrNotFound
 		}
 		log.Errorw("server: getting headers", "from", from, "to", to, "err", err)
+		serv.metrics.rangeServed(ctx, time.Since(startTime), int(to-from), true)
 		return nil, err
 	}
 
 	span.AddEvent("fetched-range-of-headers", trace.WithAttributes(
 		attribute.Int("amount", len(headersByRange))))
 	span.SetStatus(codes.Ok, "")
+	serv.metrics.rangeServed(ctx, time.Since(startTime), len(headersByRange), false)
 	return headersByRange, nil
 }
 
 // handleHeadRequest returns the latest stored head.
 func (serv *ExchangeServer[H]) handleHeadRequest() ([]H, error) {
+	startTime := time.Now()
 	log.Debug("server: handling head request")
 	ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
 	defer cancel()
@@ -258,6 +280,7 @@ func (serv *ExchangeServer[H]) handleHeadRequest() ([]H, error) {
 	if err != nil {
 		log.Errorw("server: getting head", "err", err)
 		span.SetStatus(codes.Error, err.Error())
+		serv.metrics.headServed(ctx, time.Since(startTime), true)
 		return nil, err
 	}
 
@@ -266,5 +289,6 @@ func (serv *ExchangeServer[H]) handleHeadRequest() ([]H, error) {
 		attribute.Int64("height", int64(head.Height()))),
 	)
 	span.SetStatus(codes.Ok, "")
+	serv.metrics.headServed(ctx, time.Since(startTime), false)
 	return []H{head}, nil
 }
diff --git a/p2p/server_metrics.go b/p2p/server_metrics.go
new file mode 100644
index 00000000..a4a9b1c9
--- /dev/null
+++ b/p2p/server_metrics.go
@@ -0,0 +1,126 @@
+package p2p
+
+import (
+	"context"
+	"time"
+
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/metric"
+)
+
+// Attribute keys attached to the measurements recorded by serverMetrics.
+const (
+	headersServedKey = "num_headers_served"
+	failedRequestKey = "failed_request"
+)
+
+// serverMetrics holds the instruments used to report on requests served by
+// the exchange server. headServed, rangeServed, getServed and observe are
+// no-ops when called on a nil *serverMetrics.
+type serverMetrics struct {
+	headersServedInst  metric.Int64Counter
+	headServeTimeInst  metric.Float64Histogram
+	rangeServeTimeInst metric.Float64Histogram
+	getServeTimeInst   metric.Float64Histogram
+}
+
+// newServerMetrics creates all instruments on the package meter. It returns
+// a nil *serverMetrics together with the error of the first instrument that
+// could not be created.
+func newServerMetrics() (m *serverMetrics, err error) {
+	m = new(serverMetrics)
+	m.headersServedInst, err = meter.Int64Counter(
+		"hdr_p2p_exch_srvr_headers_served_counter",
+		metric.WithDescription("number of headers served"),
+	)
+	if err != nil {
+		return nil, err
+	}
+	m.headServeTimeInst, err = meter.Float64Histogram(
+		"hdr_p2p_exch_srvr_head_serve_time_hist",
+		metric.WithDescription("exchange server head serve time in seconds"),
+	)
+	if err != nil {
+		return nil, err
+	}
+	m.rangeServeTimeInst, err = meter.Float64Histogram(
+		"hdr_p2p_exch_srvr_range_serve_time_hist",
+		metric.WithDescription("exchange server range serve time in seconds"),
+	)
+	if err != nil {
+		return nil, err
+	}
+	m.getServeTimeInst, err = meter.Float64Histogram(
+		"hdr_p2p_exch_srvr_get_serve_time_hist",
+		metric.WithDescription("exchange server get serve time in seconds"),
+	)
+	if err != nil {
+		return nil, err
+	}
+	return m, nil
+}
+
+// headServed adds 1 to the headers-served counter and records duration (in
+// seconds) in the head serve-time histogram, both tagged with failed.
+func (m *serverMetrics) headServed(ctx context.Context, duration time.Duration, failed bool) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.headersServedInst.Add(ctx,
+			1,
+			metric.WithAttributes(attribute.Bool(failedRequestKey, failed)),
+		)
+		m.headServeTimeInst.Record(ctx,
+			duration.Seconds(),
+			metric.WithAttributes(attribute.Bool(failedRequestKey, failed)),
+		)
+	})
+}
+
+// rangeServed adds headersServed to the headers-served counter and records
+// duration (in seconds) in the range serve-time histogram. The histogram
+// measurement is tagged with failed and with headersServed/100.
+func (m *serverMetrics) rangeServed(ctx context.Context, duration time.Duration, headersServed int, failed bool) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.headersServedInst.Add(ctx,
+			int64(headersServed),
+			metric.WithAttributes(attribute.Bool(failedRequestKey, failed)),
+		)
+		m.rangeServeTimeInst.Record(ctx,
+			duration.Seconds(),
+			// a single WithAttributes call, so no merge of attribute sets is needed
+			metric.WithAttributes(
+				attribute.Int(headersServedKey, headersServed/100), // divide by 100 to reduce cardinality
+				attribute.Bool(failedRequestKey, failed),
+			),
+		)
+	})
+}
+
+// getServed adds 1 to the headers-served counter and records duration (in
+// seconds) in the get serve-time histogram, both tagged with failed.
+func (m *serverMetrics) getServed(ctx context.Context, duration time.Duration, failed bool) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.headersServedInst.Add(ctx,
+			1,
+			metric.WithAttributes(attribute.Bool(failedRequestKey, failed)),
+		)
+		m.getServeTimeInst.Record(ctx,
+			duration.Seconds(),
+			metric.WithAttributes(attribute.Bool(failedRequestKey, failed)),
+		)
+	})
+}
+
+// observe runs f unless m is nil. If ctx already carries an error (canceled
+// or timed out), f receives context.Background() instead of the expired
+// context.
+func (m *serverMetrics) observe(ctx context.Context, f func(context.Context)) {
+	if m == nil {
+		return
+	}
+
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+
+	f(ctx)
+}