Skip to content

Commit

Permalink
feat(traces/session): cover session with traces
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Jan 29, 2024
1 parent 446648d commit 8c755bc
Showing 1 changed file with 41 additions and 8 deletions.
49 changes: 41 additions & 8 deletions p2p/session.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,11 +9,19 @@ import (

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/protocol"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/go-header"
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
)

var (
tracerSession = otel.Tracer("header/p2p-session")
)

// errEmptyResponse means that server side closes the connection without sending at least 1
// response.
var errEmptyResponse = errors.New("empty response")
Expand Down Expand Up @@ -77,9 +85,15 @@ func newSession[H header.Header[H]](
func (s *session[H]) getRangeByHeight(
ctx context.Context,
from, amount, headersPerPeer uint64,
) ([]H, error) {
) (_ []H, err error) {
log.Debugw("requesting headers", "from", from, "to", from+amount-1) // -1 need to exclude to+1 height

ctx, span := tracerSession.Start(ctx, "get-range-by-height", trace.WithAttributes(
attribute.Int64("from", int64(from)),
attribute.Int64("to", int64(from+amount-1)),
))
defer span.End()

requests := prepareRequests(from, amount, headersPerPeer)
result := make(chan []H, len(requests))
s.reqCh = make(chan *p2p_pb.HeaderRequest, len(requests))
Expand All @@ -94,8 +108,11 @@ LOOP:
for {
select {
case <-s.ctx.Done():
return nil, errors.New("header/p2p: exchange is closed")
err = errors.New("header/p2p: exchange is closed")
span.SetStatus(codes.Error, err.Error())
return nil, err
case <-ctx.Done():
span.SetStatus(codes.Error, ctx.Err().Error())
return nil, ctx.Err()
case res := <-result:
headers = append(headers, res...)
Expand All @@ -113,6 +130,7 @@ LOOP:
"from", headers[0].Height(),
"to", headers[len(headers)-1].Height(),
)
span.SetStatus(codes.Ok, "")
return headers, nil
}

Expand Down Expand Up @@ -152,19 +170,32 @@ func (s *session[H]) doRequest(
req *p2p_pb.HeaderRequest,
headers chan []H,
) {
ctx, span := tracerSession.Start(ctx, "request-headers-from-peer", trace.WithAttributes(
attribute.String("peerID", stat.peerID.String()),
attribute.Int64("from", int64(req.GetOrigin())),
attribute.Int64("amount", int64(req.Amount)),
))

defer span.SetStatus(codes.Ok, "")
defer span.End()

ctx, cancel := context.WithTimeout(ctx, s.requestTimeout)
defer cancel()

r, size, duration, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req)
s.metrics.response(ctx, size, duration, err)
if err != nil {
span.AddEvent("error during range fetching", trace.WithAttributes(
attribute.String("error", err.Error())))
// we should not punish peer at this point and should try to parse responses, despite that error
// was received.
log.Debugw("requesting headers from peer failed", "peer", stat.peerID, "err", err)
}

h, err := s.processResponses(r)
if err != nil {
span.AddEvent("processing response failed", trace.WithAttributes(
attribute.String("error", err.Error())))
logFn := log.Errorw

switch err {
Expand Down Expand Up @@ -195,21 +226,23 @@ func (s *session[H]) doRequest(
"requestedAmount", req.Amount,
)

remainingHeaders := req.Amount - uint64(len(h))

span.AddEvent("request succeed", trace.WithAttributes(
attribute.Int64("remaining headers", int64(remainingHeaders))))

// update peer stats
stat.updateStats(size, duration)

responseLn := uint64(len(h))
// ensure that we received the correct amount of headers.
if responseLn < req.Amount {
from := h[responseLn-1].Height()
amount := req.Amount - responseLn

if remainingHeaders > 0 {
from := h[uint64(len(h))-1].Height()
select {
case <-s.ctx.Done():
return
// create a new request with the remaining headers.
// prepareRequests will return a slice with 1 element at this point
case s.reqCh <- prepareRequests(from+1, amount, req.Amount)[0]:
case s.reqCh <- prepareRequests(from+1, remainingHeaders, req.Amount)[0]:
log.Debugw("sending additional request to get remaining headers")
}
}
Expand Down

0 comments on commit 8c755bc

Please sign in to comment.