From 8c755bc9d7122b45c42e04dd74ca911798845509 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Mon, 29 Jan 2024 13:21:53 +0200 Subject: [PATCH] feat(traces/session): cover session with traces --- p2p/session.go | 49 +++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 41 insertions(+), 8 deletions(-) diff --git a/p2p/session.go b/p2p/session.go index 37f1e258..13074d32 100644 --- a/p2p/session.go +++ b/p2p/session.go @@ -9,11 +9,19 @@ import ( "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/protocol" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/codes" + "go.opentelemetry.io/otel/trace" "github.com/celestiaorg/go-header" p2p_pb "github.com/celestiaorg/go-header/p2p/pb" ) +var ( + tracerSession = otel.Tracer("header/p2p-session") +) + // errEmptyResponse means that server side closes the connection without sending at least 1 // response. var errEmptyResponse = errors.New("empty response") @@ -77,9 +85,15 @@ func newSession[H header.Header[H]]( func (s *session[H]) getRangeByHeight( ctx context.Context, from, amount, headersPerPeer uint64, -) ([]H, error) { +) (_ []H, err error) { log.Debugw("requesting headers", "from", from, "to", from+amount-1) // -1 need to exclude to+1 height + ctx, span := tracerSession.Start(ctx, "get-range-by-height", trace.WithAttributes( + attribute.Int64("from", int64(from)), + attribute.Int64("to", int64(from+amount-1)), + )) + defer span.End() + requests := prepareRequests(from, amount, headersPerPeer) result := make(chan []H, len(requests)) s.reqCh = make(chan *p2p_pb.HeaderRequest, len(requests)) @@ -94,8 +108,11 @@ LOOP: for { select { case <-s.ctx.Done(): - return nil, errors.New("header/p2p: exchange is closed") + err = errors.New("header/p2p: exchange is closed") + span.SetStatus(codes.Error, err.Error()) + return nil, err case <-ctx.Done(): + span.SetStatus(codes.Error, ctx.Err().Error()) return nil, ctx.Err() case res := <-result: headers = append(headers, res...) @@ -113,6 +130,7 @@ LOOP: "from", headers[0].Height(), "to", headers[len(headers)-1].Height(), ) + span.SetStatus(codes.Ok, "") return headers, nil } @@ -152,12 +170,23 @@ func (s *session[H]) doRequest( req *p2p_pb.HeaderRequest, headers chan []H, ) { + ctx, span := tracerSession.Start(ctx, "request-headers-from-peer", trace.WithAttributes( + attribute.String("peerID", stat.peerID.String()), + attribute.Int64("from", int64(req.GetOrigin())), + attribute.Int64("amount", int64(req.Amount)), + )) + + defer span.SetStatus(codes.Ok, "") + defer span.End() + ctx, cancel := context.WithTimeout(ctx, s.requestTimeout) defer cancel() r, size, duration, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req) s.metrics.response(ctx, size, duration, err) if err != nil { + span.AddEvent("error during range fetching", trace.WithAttributes( + attribute.String("error", err.Error()))) // we should not punish peer at this point and should try to parse responses, despite that error // was received. log.Debugw("requesting headers from peer failed", "peer", stat.peerID, "err", err) @@ -165,6 +194,8 @@ func (s *session[H]) doRequest( h, err := s.processResponses(r) if err != nil { + span.AddEvent("processing response failed", trace.WithAttributes( + attribute.String("error", err.Error()))) logFn := log.Errorw switch err { @@ -195,21 +226,23 @@ func (s *session[H]) doRequest( "requestedAmount", req.Amount, ) + remainingHeaders := req.Amount - uint64(len(h)) + + span.AddEvent("request succeed", trace.WithAttributes( + attribute.Int64("remaining headers", int64(remainingHeaders)))) + // update peer stats stat.updateStats(size, duration) - responseLn := uint64(len(h)) // ensure that we received the correct amount of headers. - if responseLn < req.Amount { - from := h[responseLn-1].Height() - amount := req.Amount - responseLn - + if remainingHeaders > 0 { + from := h[uint64(len(h))-1].Height() select { case <-s.ctx.Done(): return // create a new request with the remaining headers. // prepareRequests will return a slice with 1 element at this point - case s.reqCh <- prepareRequests(from+1, amount, req.Amount)[0]: + case s.reqCh <- prepareRequests(from+1, remainingHeaders, req.Amount)[0]: log.Debugw("sending additional request to get remaining headers") } }