diff --git a/p2p/exchange.go b/p2p/exchange.go index 0b5a9315..607bfb0f 100644 --- a/p2p/exchange.go +++ b/p2p/exchange.go @@ -42,7 +42,7 @@ type Exchange[H header.Header[H]] struct { trustedPeers func() peer.IDSlice peerTracker *peerTracker - metrics *metrics + metrics *exchangeMetrics Params ClientParameters } @@ -63,7 +63,7 @@ func NewExchange[H header.Header[H]]( return nil, err } - var metrics *metrics + var metrics *exchangeMetrics if params.metrics { var err error metrics, err = newExchangeMetrics() @@ -75,7 +75,7 @@ func NewExchange[H header.Header[H]]( ex := &Exchange[H]{ host: host, protocolID: protocolID(params.networkID), - peerTracker: newPeerTracker(host, gater, params.pidstore), + peerTracker: newPeerTracker(host, gater, params.pidstore, metrics), Params: params, metrics: metrics, } @@ -102,7 +102,8 @@ func (ex *Exchange[H]) Stop(ctx context.Context) error { // cancel the session if it exists ex.cancel() // stop the peerTracker - return ex.peerTracker.stop(ctx) + err := ex.peerTracker.stop(ctx) + return errors.Join(err, ex.metrics.Close()) } // Head requests the latest Header from trusted peers. @@ -114,14 +115,15 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( log.Debug("requesting head") reqCtx := ctx + startTime := time.Now() if deadline, ok := ctx.Deadline(); ok { // allocate 90% of caller's set deadline for requests // and give leftover to determine the bestHead from gathered responses // this avoids DeadlineExceeded error when any of the peers are unresponsive - now := time.Now() - sub := deadline.Sub(now) * 9 / 10 + + sub := deadline.Sub(startTime) * 9 / 10 var cancel context.CancelFunc - reqCtx, cancel = context.WithDeadline(ctx, now.Add(sub)) + reqCtx, cancel = context.WithDeadline(ctx, startTime.Add(sub)) defer cancel() } @@ -184,6 +186,11 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( }(from) } + headType := headTypeTrusted + if useTrackedPeers { + headType = headTypeUntrusted + } + headers := make([]H, 0, len(peers)) for range peers { select { @@ -192,12 +199,27 @@ func (ex *Exchange[H]) Head(ctx context.Context, opts ...header.HeadOption[H]) ( headers = append(headers, h) } case <-ctx.Done(): + status := headStatusCanceled + if errors.Is(ctx.Err(), context.DeadlineExceeded) { + status = headStatusTimeout + } + + ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, status) return zero, ctx.Err() case <-ex.ctx.Done(): + ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusCanceled) return zero, ex.ctx.Err() } } - return bestHead[H](headers) + + head, err := bestHead[H](headers) + if err != nil { + ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusNoHeaders) + return zero, err + } + + ex.metrics.head(ctx, time.Since(startTime), len(headers), headType, headStatusOk) + return head, nil } // GetByHeight performs a request for the Header at the given @@ -230,7 +252,7 @@ func (ex *Exchange[H]) GetRangeByHeight( to uint64, ) ([]H, error) { session := newSession[H]( - ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, withValidation(from), + ex.ctx, ex.host, ex.peerTracker, ex.protocolID, ex.Params.RangeRequestTimeout, ex.metrics, withValidation(from), ) defer session.close() // we request the next header height that we don't have: `fromHead`+1 @@ -307,7 +329,7 @@ func (ex *Exchange[H]) request( ) ([]H, error) { log.Debugw("requesting peer", "peer", to) responses, size, duration, err := sendMessage(ctx, ex.host, to, ex.protocolID, req) - ex.metrics.observeResponse(ctx, size, duration, err) + ex.metrics.response(ctx, size, duration, err) if err != nil { log.Debugw("err sending request", "peer", to, "err", err) return nil, err diff --git a/p2p/exchange_metrics.go b/p2p/exchange_metrics.go new file mode 100644 index 00000000..82456c2f --- /dev/null +++ b/p2p/exchange_metrics.go @@ -0,0 +1,184 @@ +package p2p + +import ( + "context" + "errors" + "sync/atomic" + "time" + + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" +) + +var meter = otel.Meter("header/p2p") + +const ( + failedKey = "failed" + headerReceivedKey = "num_headers_received" + headTypeKey = "request_type" + headTypeTrusted = "trusted_request" + headTypeUntrusted = "untrusted_request" + headStatusKey = "status" + headStatusOk = "ok" + headStatusTimeout = "timeout" + headStatusCanceled = "canceled" + headStatusNoHeaders = "no_headers" +) + +type exchangeMetrics struct { + headRequestTimeInst metric.Float64Histogram + responseSizeInst metric.Int64Histogram + responseTimeInst metric.Float64Histogram + + trackerPeersNum atomic.Int64 + trackedPeersNumInst metric.Int64ObservableGauge + trackedPeersNumReg metric.Registration + + disconnectedPeersNum atomic.Int64 + disconnectedPeersNumInst metric.Int64ObservableGauge + disconnectedPeersNumReg metric.Registration + + blockedPeersNum atomic.Int64 + blockedPeersNumInst metric.Int64ObservableGauge + blockedPeersNumReg metric.Registration +} + +func newExchangeMetrics() (m *exchangeMetrics, err error) { + m = new(exchangeMetrics) + m.headRequestTimeInst, err = meter.Float64Histogram( + "hdr_p2p_exch_clnt_head_time_hist", + metric.WithDescription("exchange client head request time in seconds"), + ) + if err != nil { + return nil, err + } + m.responseSizeInst, err = meter.Int64Histogram( + "hdr_p2p_exch_clnt_resp_size_hist", + metric.WithDescription("exchange client header response size in bytes"), + ) + if err != nil { + return nil, err + } + m.responseTimeInst, err = meter.Float64Histogram( + "hdr_p2p_exch_clnt_resp_time_hist", + metric.WithDescription("exchange client response time in seconds"), + ) + if err != nil { + return nil, err + } + m.trackedPeersNumInst, err = meter.Int64ObservableGauge( + "hdr_p2p_exch_clnt_trck_peer_num_gauge", + metric.WithDescription("exchange client tracked peers number"), + ) + if err != nil { + return nil, err + } + m.trackedPeersNumReg, err = meter.RegisterCallback(m.observeTrackedPeers, m.trackedPeersNumInst) + if err != nil { + return nil, err + } + m.disconnectedPeersNumInst, err = meter.Int64ObservableGauge( + "hdr_p2p_exch_clnt_disconn_peer_num_gauge", + metric.WithDescription("exchange client tracked disconnected peers number"), + ) + if err != nil { + return nil, err + } + m.disconnectedPeersNumReg, err = meter.RegisterCallback(m.observeDisconnectedPeers, m.disconnectedPeersNumInst) + if err != nil { + return nil, err + } + m.blockedPeersNumInst, err = meter.Int64ObservableGauge( + "hdr_p2p_exch_clnt_block_peer_num_gauge", + metric.WithDescription("exchange client blocked peers number"), + ) + if err != nil { + return nil, err + } + m.blockedPeersNumReg, err = meter.RegisterCallback(m.observeBlockedPeers, m.blockedPeersNumInst) + if err != nil { + return nil, err + } + return m, nil +} + +func (m *exchangeMetrics) head(ctx context.Context, duration time.Duration, headersReceived int, tp, status string) { + m.observe(ctx, func(ctx context.Context) { + m.headRequestTimeInst.Record(ctx, + duration.Seconds(), + metric.WithAttributes( + attribute.Int(headerReceivedKey, headersReceived), + attribute.String(headTypeKey, tp), + attribute.String(headStatusKey, status), + ), + ) + }) +} + +func (m *exchangeMetrics) response(ctx context.Context, size uint64, duration time.Duration, err error) { + m.observe(ctx, func(ctx context.Context) { + m.responseSizeInst.Record(ctx, + int64(size), + metric.WithAttributes(attribute.Bool(failedKey, err != nil)), + ) + m.responseTimeInst.Record(ctx, + duration.Seconds(), + metric.WithAttributes(attribute.Bool(failedKey, err != nil)), + ) + }) +} + +func (m *exchangeMetrics) peersTracked(num int) { + m.observe(context.Background(), func(context.Context) { + m.trackerPeersNum.Add(int64(num)) + }) +} + +func (m *exchangeMetrics) peersDisconnected(num int) { + m.observe(context.Background(), func(context.Context) { + m.disconnectedPeersNum.Add(int64(num)) + }) +} + +func (m *exchangeMetrics) peerBlocked() { + m.observe(context.Background(), func(ctx context.Context) { + m.blockedPeersNum.Add(1) + }) +} + +func (m *exchangeMetrics) observeTrackedPeers(_ context.Context, obs metric.Observer) error { + obs.ObserveInt64(m.trackedPeersNumInst, m.trackerPeersNum.Load()) + return nil +} + +func (m *exchangeMetrics) observeDisconnectedPeers(_ context.Context, obs metric.Observer) error { + obs.ObserveInt64(m.disconnectedPeersNumInst, m.disconnectedPeersNum.Load()) + return nil +} + +func (m *exchangeMetrics) observeBlockedPeers(_ context.Context, obs metric.Observer) error { + obs.ObserveInt64(m.blockedPeersNumInst, m.blockedPeersNum.Load()) + return nil +} + +func (m *exchangeMetrics) observe(ctx context.Context, observeFn func(context.Context)) { + if m == nil { + return + } + if ctx.Err() != nil { + ctx = context.Background() + } + + observeFn(ctx) +} + +func (m *exchangeMetrics) Close() (err error) { + if m == nil { + return nil + } + + err = errors.Join(err, m.trackedPeersNumReg.Unregister()) + err = errors.Join(err, m.disconnectedPeersNumReg.Unregister()) + return err +} diff --git a/p2p/helpers.go b/p2p/helpers.go index 1ca110ad..c8d3b21c 100644 --- a/p2p/helpers.go +++ b/p2p/helpers.go @@ -51,7 +51,7 @@ func sendMessage( to peer.ID, protocol protocol.ID, req *p2p_pb.HeaderRequest, -) ([]*p2p_pb.HeaderResponse, uint64, uint64, error) { +) ([]*p2p_pb.HeaderResponse, uint64, time.Duration, error) { startTime := time.Now() stream, err := host.NewStream(ctx, to, protocol) if err != nil { @@ -94,8 +94,6 @@ func sendMessage( headers = append(headers, resp) } - duration := time.Since(startTime).Milliseconds() - // we allow the server side to explicitly close the connection // if it does not have the requested range. // In this case, server side will send us a response with ErrNotFound status code inside @@ -114,7 +112,7 @@ func sendMessage( // reset stream in case of an error stream.Reset() //nolint:errcheck } - return headers, totalRespLn, uint64(duration), err + return headers, totalRespLn, time.Since(startTime), err } // convertStatusCodeToError converts passed status code into an error. diff --git a/p2p/metrics.go b/p2p/metrics.go deleted file mode 100644 index fbb56eb9..00000000 --- a/p2p/metrics.go +++ /dev/null @@ -1,49 +0,0 @@ -package p2p - -import ( - "context" - - "go.opentelemetry.io/otel" - "go.opentelemetry.io/otel/attribute" - "go.opentelemetry.io/otel/metric" -) - -var meter = otel.Meter("header/p2p") - -type metrics struct { - responseSize metric.Float64Histogram - responseDuration metric.Float64Histogram -} - -func newExchangeMetrics() (*metrics, error) { - responseSize, err := meter.Float64Histogram( - "header_p2p_exchange_response_size", - metric.WithDescription("Size of get headers response in bytes"), - ) - if err != nil { - return nil, err - } - responseDuration, err := meter.Float64Histogram( - "header_p2p_exchange_request_duration", - metric.WithDescription("Duration of get headers request in seconds"), - ) - if err != nil { - return nil, err - } - return &metrics{ - responseSize: responseSize, - responseDuration: responseDuration, - }, nil -} - -func (m *metrics) observeResponse(ctx context.Context, size uint64, duration uint64, err error) { - if m == nil { - return - } - m.responseSize.Record(ctx, float64(size), - metric.WithAttributes(attribute.Bool("failed", err != nil)), - ) - m.responseDuration.Record(ctx, float64(duration), - metric.WithAttributes(attribute.Bool("failed", err != nil)), - ) -} diff --git a/p2p/peer_stats.go b/p2p/peer_stats.go index ced05929..0277fcd4 100644 --- a/p2p/peer_stats.go +++ b/p2p/peer_stats.go @@ -26,12 +26,12 @@ type peerStat struct { // by dividing the amount by time, so the result score will represent how many bytes // were retrieved in 1 millisecond. This value will then be averaged relative to the // previous peerScore. -func (p *peerStat) updateStats(amount uint64, time uint64) { +func (p *peerStat) updateStats(amount uint64, duration time.Duration) { p.Lock() defer p.Unlock() averageSpeed := float32(amount) - if time != 0 { - averageSpeed /= float32(time) + if duration != 0 { + averageSpeed /= float32(duration.Milliseconds()) } if p.peerScore == 0.0 { p.peerScore = averageSpeed diff --git a/p2p/peer_stats_test.go b/p2p/peer_stats_test.go index d5a875d1..1d2c5cfb 100644 --- a/p2p/peer_stats_test.go +++ b/p2p/peer_stats_test.go @@ -4,6 +4,7 @@ import ( "container/heap" "context" "testing" + "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/stretchr/testify/require" @@ -66,13 +67,13 @@ func Test_StatsUpdateStats(t *testing.T) { stat := &peerStat{peerID: "peerID", peerScore: 0} heap.Push(&pQueue.stats, stat) testCases := []struct { - inputTime uint64 + inputTime time.Duration inputBytes uint64 resultScore float32 }{ // common case, where time and bytes is not equal to 0 { - inputTime: 16, + inputTime: time.Millisecond * 16, inputBytes: 4, resultScore: 4, }, @@ -80,7 +81,7 @@ func Test_StatsUpdateStats(t *testing.T) { // then the request was failed and previous score will be // decreased { - inputTime: 10, + inputTime: time.Millisecond * 10, inputBytes: 0, resultScore: 2, }, diff --git a/p2p/peer_tracker.go b/p2p/peer_tracker.go index 8981fd7f..33fffc91 100644 --- a/p2p/peer_tracker.go +++ b/p2p/peer_tracker.go @@ -30,6 +30,7 @@ var ( type peerTracker struct { host host.Host connGater *conngater.BasicConnectionGater + metrics *exchangeMetrics peerLk sync.RWMutex // trackedPeers contains active peers that we can request to. @@ -55,11 +56,13 @@ func newPeerTracker( h host.Host, connGater *conngater.BasicConnectionGater, pidstore PeerIDStore, + metrics *exchangeMetrics, ) *peerTracker { ctx, cancel := context.WithCancel(context.Background()) return &peerTracker{ host: h, connGater: connGater, + metrics: metrics, trackedPeers: make(map[libpeer.ID]*peerStat), disconnectedPeers: make(map[libpeer.ID]*peerStat), pidstore: pidstore, @@ -196,6 +199,8 @@ func (p *peerTracker) connected(pID libpeer.ID) { delete(p.disconnectedPeers, pID) } p.trackedPeers[pID] = stats + + p.metrics.peersTracked(1) } func (p *peerTracker) disconnected(pID libpeer.ID) { @@ -208,6 +213,9 @@ func (p *peerTracker) disconnected(pID libpeer.ID) { stats.pruneDeadline = time.Now().Add(maxAwaitingTime) p.disconnectedPeers[pID] = stats delete(p.trackedPeers, pID) + + p.metrics.peersTracked(-1) + p.metrics.peersDisconnected(1) } func (p *peerTracker) peers() []*peerStat { @@ -235,20 +243,25 @@ func (p *peerTracker) gc() { p.peerLk.Lock() now := time.Now() - + var deletedDisconnectedNum int for id, peer := range p.disconnectedPeers { if peer.pruneDeadline.Before(now) { delete(p.disconnectedPeers, id) + deletedDisconnectedNum++ } } + var deletedTrackedNum int for id, peer := range p.trackedPeers { if peer.peerScore <= defaultScore { delete(p.trackedPeers, id) + deletedTrackedNum++ } } p.peerLk.Unlock() + p.metrics.peersDisconnected(-deletedDisconnectedNum) + p.metrics.peersTracked(-deletedTrackedNum) p.dumpPeers(p.ctx) } } @@ -294,7 +307,6 @@ func (p *peerTracker) stop(ctx context.Context) error { // dump remaining tracked peers p.dumpPeers(ctx) - return nil } @@ -312,10 +324,5 @@ func (p *peerTracker) blockPeer(pID libpeer.ID, reason error) { } log.Warnw("header/p2p: blocked peer", "pID", pID, "reason", reason) - - p.peerLk.Lock() - defer p.peerLk.Unlock() - // remove peer from cache. - delete(p.trackedPeers, pID) - delete(p.disconnectedPeers, pID) + p.metrics.peerBlocked() } diff --git a/p2p/peer_tracker_test.go b/p2p/peer_tracker_test.go index ff5eaf31..782d3710 100644 --- a/p2p/peer_tracker_test.go +++ b/p2p/peer_tracker_test.go @@ -29,7 +29,7 @@ func TestPeerTracker_GC(t *testing.T) { require.NoError(t, err) pidstore := newDummyPIDStore() - p := newPeerTracker(h[0], connGater, pidstore) + p := newPeerTracker(h[0], connGater, pidstore, nil) maxAwaitingTime = time.Millisecond @@ -67,7 +67,7 @@ func TestPeerTracker_BlockPeer(t *testing.T) { h := createMocknet(t, 2) connGater, err := conngater.NewBasicConnectionGater(sync.MutexWrap(datastore.NewMapDatastore())) require.NoError(t, err) - p := newPeerTracker(h[0], connGater, nil) + p := newPeerTracker(h[0], connGater, nil, nil) maxAwaitingTime = time.Millisecond p.blockPeer(h[1].ID(), errors.New("test")) require.Len(t, connGater.ListBlockedPeers(), 1) @@ -100,7 +100,7 @@ func TestPeerTracker_Bootstrap(t *testing.T) { err = pidstore.Put(ctx, prevSeen[2:]) require.NoError(t, err) - tracker := newPeerTracker(mn.Hosts()[0], connGater, pidstore) + tracker := newPeerTracker(mn.Hosts()[0], connGater, pidstore, nil) go tracker.track() diff --git a/p2p/session.go b/p2p/session.go index ac750714..37f1e258 100644 --- a/p2p/session.go +++ b/p2p/session.go @@ -34,6 +34,7 @@ type session[H header.Header[H]] struct { queue *peerQueue // peerTracker contains discovered peers with records that describes their activity. peerTracker *peerTracker + metrics *exchangeMetrics // Otherwise, it will be nil. // `from` is set when additional validation for range is needed. @@ -51,6 +52,7 @@ func newSession[H header.Header[H]]( peerTracker *peerTracker, protocolID protocol.ID, requestTimeout time.Duration, + metrics *exchangeMetrics, options ...option[H], ) *session[H] { ctx, cancel := context.WithCancel(ctx) @@ -62,6 +64,7 @@ func newSession[H header.Header[H]]( queue: newPeerQueue(ctx, peerTracker.peers()), peerTracker: peerTracker, requestTimeout: requestTimeout, + metrics: metrics, } for _, opt := range options { @@ -153,6 +156,7 @@ func (s *session[H]) doRequest( defer cancel() r, size, duration, err := sendMessage(ctx, s.host, stat.peerID, s.protocolID, req) + s.metrics.response(ctx, size, duration, err) if err != nil { // we should not punish peer at this point and should try to parse responses, despite that error // was received. diff --git a/p2p/session_test.go b/p2p/session_test.go index da121521..d7044961 100644 --- a/p2p/session_test.go +++ b/p2p/session_test.go @@ -29,7 +29,7 @@ func Test_Validate(t *testing.T) { context.Background(), nil, &peerTracker{trackedPeers: make(map[peer.ID]*peerStat)}, - "", time.Second, + "", time.Second, nil, withValidation(head), ) @@ -46,7 +46,7 @@ func Test_ValidateFails(t *testing.T) { context.Background(), nil, &peerTracker{trackedPeers: make(map[peer.ID]*peerStat)}, - "", time.Second, + "", time.Second, nil, withValidation(head), ) diff --git a/p2p/subscriber_metrics.go b/p2p/subscriber_metrics.go index b3b6e7f8..164ff371 100644 --- a/p2p/subscriber_metrics.go +++ b/p2p/subscriber_metrics.go @@ -20,8 +20,8 @@ type subscriberMetrics struct { messageNumInst metric.Int64Counter messageSizeInst metric.Int64Histogram - messageTimeLast atomic.Pointer[time.Time] - messageTimeInst metric.Float64Histogram + messageTimeLast atomic.Pointer[time.Time] + messageTimeInst metric.Float64Histogram subscriptionNum atomic.Int64 subscriptionNumInst metric.Int64ObservableGauge