Skip to content

Commit

Permalink
chore: cover exchange client with additional traces
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Feb 1, 2024
1 parent 8c755bc commit e5afbeb
Show file tree
Hide file tree
Showing 2 changed files with 74 additions and 9 deletions.
75 changes: 70 additions & 5 deletions p2p/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,12 +14,20 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/libp2p/go-libp2p/p2p/net/conngater"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"

"github.com/celestiaorg/go-header"
p2p_pb "github.com/celestiaorg/go-header/p2p/pb"
)

var log = logging.Logger("header/p2p")
var (
log = logging.Logger("header/p2p")

tracerClient = otel.Tracer("header/p2p-client")
)

// minHeadResponses is the minimum number of headers of the same height
// received from peers to determine the network head. If all trusted peers
Expand Down Expand Up @@ -113,6 +121,7 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error {
// and return the highest one.
func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (H, error) {
log.Debug("requesting head")
ctx, span := tracerClient.Start(ctx, "head")

reqCtx := ctx
startTime := time.Now()
Expand Down Expand Up @@ -157,8 +166,20 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
)
for _, from := range peers {
go func(from peer.ID) {
span.AddEvent("sending request to peer",
trace.WithAttributes(
attribute.String("peerID", from.String()),
attribute.Bool("is trusted", reqParams.TrustedHead.IsZero()),
),
)
headers, err := ex.request(reqCtx, from, headerReq)
if err != nil {
span.AddEvent("request failed",
trace.WithAttributes(
attribute.String("peerID", from.String()),
attribute.String("error", err.Error())),
)

log.Errorw("head request to peer failed", "peer", from, "err", err)
headerRespCh <- zero
return
Expand All @@ -171,6 +192,13 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
if errors.As(err, &verErr) && verErr.SoftFailure {
log.Debugw("received head from tracked peer that soft-failed verification",
"tracked peer", from, "err", err)

span.AddEvent("soft-failed verification header received",
trace.WithAttributes(
attribute.String("peerID", from.String()),
attribute.String("error", err.Error())),
)

headerRespCh <- headers[0]
return
}
Expand All @@ -180,10 +208,17 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
}
logF("verifying head received from tracked peer", "tracked peer", from,
"height", headers[0].Height(), "err", err)
span.AddEvent("verifying head received",
trace.WithAttributes(
attribute.String("peerID", from.String()),
attribute.Int64("height", int64(headers[0].Height())),
attribute.String("error", err.Error())),
)
headerRespCh <- zero
return
}
}
span.AddEvent("request succeeded")
// request ensures that the result slice will have at least one Header
headerRespCh <- headers[0]
}(from)
Expand All @@ -206,22 +241,25 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
if errors.Is(ctx.Err(), context.DeadlineExceeded) {
status = headStatusTimeout
}

span.SetStatus(codes.Error, fmt.Sprintf("head request %s", status))
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, status)
return zero, ctx.Err()
case <-ex.ctx.Done():
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusCanceled)
span.SetStatus(codes.Error, "exchange client stopped")
return zero, ex.ctx.Err()
}
}

head, err := bestHead[H](headers)
if err != nil {
ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusNoHeaders)
span.SetStatus(codes.Error, headStatusNoHeaders)
return zero, err
}

ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusOk)
span.SetStatus(codes.Ok, "")
return head, nil
}

Expand All @@ -230,10 +268,16 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) (
// thereafter.
func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error) {
log.Debugw("requesting header", "height", height)
ctx, span := tracerClient.Start(ctx, "get-by-height",
trace.WithAttributes(
attribute.Int64("height", int64(height)),
))
var zero H
// sanity check height
if height == 0 {
return zero, fmt.Errorf("specified request height must be greater than 0")
err := fmt.Errorf("specified request height must be greater than 0")
span.SetStatus(codes.Error, err.Error())
return zero, err
}
// create request
req := &p2p_pb.HeaderRequest{
Expand All @@ -242,8 +286,10 @@ func (ex *Exchange[H]) GetByHeight(ctx context.Context, height uint64) (H, error
}
headers, err := ex.performRequest(ctx, req)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return zero, err
}
span.SetStatus(codes.Ok, "")
return headers[0], nil
}

Expand All @@ -254,19 +300,34 @@ func (ex *Exchange[H]) GetRangeByHeight(
from H,
to uint64,
) ([]H, error) {
ctx, span := tracerClient.Start(ctx, "get-range-by-height",
trace.WithAttributes(
attribute.Int64("from", int64(from.Height())),
attribute.Int64("to", int64(to)),
))
session := newSession[H](
ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, ex.metrics, withValidation(from),
)
defer session.close()
// we request the next header height that we don't have: `fromHead`+1
amount := to - (from.Height() + 1)
return session.getRangeByHeight(ctx, from.Height()+1, amount, ex.Params.MaxHeadersPerRangeRequest)
result, err := session.getRangeByHeight(ctx, from.Height()+1, amount, ex.Params.MaxHeadersPerRangeRequest)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return nil, err
}
span.SetStatus(codes.Ok, "")
return result, nil
}

// Get performs a request for the Header by the given hash corresponding
// to the RawHeader. Note that the Header must be verified thereafter.
func (ex *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
log.Debugw("requesting header", "hash", hash.String())
ctx, span := tracerClient.Start(ctx, "get-by-hash",
trace.WithAttributes(
attribute.String("hash", hash.String()),
))
var zero H
// create request
req := &p2p_pb.HeaderRequest{
Expand All @@ -275,12 +336,16 @@ func (ex *Exchange[H]) Get(ctx context.Context, hash header.Hash) (H, error) {
}
headers, err := ex.performRequest(ctx, req)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return zero, err
}

if !bytes.Equal(headers[0].Hash(), hash) {
return zero, fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, headers[0].Hash())
err = fmt.Errorf("incorrect hash in header: expected %x, got %x", hash, headers[0].Hash())
span.SetStatus(codes.Error, err.Error())
return zero, err
}
span.SetStatus(codes.Ok, "")
return headers[0], nil
}

Expand Down
8 changes: 4 additions & 4 deletions p2p/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ import (
)

var (
tracer = otel.Tracer("header/server")
tracerServ = otel.Tracer("header/server")
)

// ExchangeServer represents the server-side component for
Expand Down Expand Up @@ -173,7 +173,7 @@ func (serv *ExchangeServer[H]) handleRequestByHash(hash []byte) ([]H, error) {
log.Debugw("server: handling header request", "hash", header.Hash(hash).String())
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
defer cancel()
ctx, span := tracer.Start(ctx, "request-by-hash", trace.WithAttributes(
ctx, span := tracerServ.Start(ctx, "request-by-hash", trace.WithAttributes(
attribute.String("hash", header.Hash(hash).String()),
))
defer span.End()
Expand Down Expand Up @@ -204,7 +204,7 @@ func (serv *ExchangeServer[H]) handleRequest(from, to uint64) ([]H, error) {
}

startTime := time.Now()
ctx, span := tracer.Start(serv.ctx, "request-range", trace.WithAttributes(
ctx, span := tracerServ.Start(serv.ctx, "request-range", trace.WithAttributes(
attribute.Int64("from", int64(from)),
attribute.Int64("to", int64(to))))
defer span.End()
Expand Down Expand Up @@ -273,7 +273,7 @@ func (serv *ExchangeServer[H]) handleHeadRequest() ([]H, error) {
log.Debug("server: handling head request")
ctx, cancel := context.WithTimeout(serv.ctx, serv.Params.RangeRequestTimeout)
defer cancel()
ctx, span := tracer.Start(ctx, "request-head")
ctx, span := tracerServ.Start(ctx, "request-head")
defer span.End()

head, err := serv.store.Head(ctx)
Expand Down

0 comments on commit e5afbeb

Please sign in to comment.