diff --git a/p2p/exchange.go b/p2p/exchange.go index 3f6a6572..df199e10 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -384,8 +384,11 @@ func (ex *Exchange[H]) request( req *p2p_pb.HeaderRequest, ) ([]H, error) { log.Debugw("requesting peer", "peer", to) - responses, size, duration, err := sendMessage(ctx, ex.host, to, ex.protocolID, req) - ex.metrics.response(ctx, size, duration, err) + start := time.Now() + responses, size, err := sendMessage(ctx, ex.host, to, ex.protocolID, req) + took := time.Since(start) + + ex.metrics.response(ctx, size, took, err) if err != nil { log.Debugw("err sending request", "peer", to, "err", err) return nil, err diff --git a/p2p/helpers.go b/p2p/helpers.go index c8d3b21c..2e4df655 100644 --- a/p2p/helpers.go +++ b/p2p/helpers.go @@ -5,7 +5,6 @@ import ( "fmt" "io" "strings" - "time" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/peer" @@ -51,11 +50,10 @@ func sendMessage( to peer.ID, protocol protocol.ID, req *p2p_pb.HeaderRequest, -) ([]*p2p_pb.HeaderResponse, uint64, time.Duration, error) { - startTime := time.Now() +) ([]*p2p_pb.HeaderResponse, uint64, error) { stream, err := host.NewStream(ctx, to, protocol) if err != nil { - return nil, 0, 0, fmt.Errorf("header/p2p: failed to open a new stream: %w", err) + return nil, 0, fmt.Errorf("header/p2p: failed to open a new stream: %w", err) } // set stream deadline from the context deadline. @@ -71,12 +69,12 @@ func sendMessage( _, err = serde.Write(stream, req) if err != nil { stream.Reset() //nolint:errcheck - return nil, 0, 0, fmt.Errorf("header/p2p: failed to write a request: %w", err) + return nil, 0, fmt.Errorf("header/p2p: failed to write a request: %w", err) } err = stream.CloseWrite() if err != nil { - return nil, 0, 0, err + return nil, 0, err } headers := make([]*p2p_pb.HeaderResponse, 0) @@ -112,7 +110,7 @@ func sendMessage( // reset stream in case of an error stream.Reset() //nolint:errcheck } - return headers, totalRespLn, time.Since(startTime), err + return headers, totalRespLn, err } // convertStatusCodeToError converts passed status code into an error. diff --git a/p2p/session.go b/p2p/session.go index 4fa92c07..8962603e 100644 --- a/p2p/session.go +++ b/p2p/session.go @@ -180,8 +180,11 @@ func (s *session[H]) doRequest( ctx, cancel := context.WithTimeout(ctx, s.requestTimeout) defer cancel() - r, size, duration, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req) - s.metrics.response(ctx, size, duration, err) + start := time.Now() + r, size, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req) + took := time.Since(start) + + s.metrics.response(ctx, size, took, err) if err != nil { span.SetStatus(codes.Error, err.Error()) // we should not punish peer at this point and should try to parse responses, despite that error @@ -233,7 +236,7 @@ func (s *session[H]) doRequest( span.SetStatus(codes.Ok, "") // update peer stats - stat.updateStats(size, duration) + stat.updateStats(size, took) // ensure that we received the correct amount of headers. if remainingHeaders > 0 { @@ -338,7 +341,7 @@ func processResponses[H header.Header[H]](resps []*p2p_pb.HeaderResponse) ([]H, return nil, errEmptyResponse } - hdrs := make([]H, 0) + hdrs := make([]H, 0, len(resps)) for _, resp := range resps { err := convertStatusCodeToError(resp.StatusCode) if err != nil { @@ -358,9 +361,5 @@ func processResponses[H header.Header[H]](resps []*p2p_pb.HeaderResponse) ([]H, hdrs = append(hdrs, hdr) } - - if len(hdrs) == 0 { - return nil, header.ErrNotFound - } return hdrs, nil }