diff --git a/sync/metrics.go b/sync/metrics.go index 1f2c99f7..53781144 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -3,6 +3,7 @@ package sync import ( "context" "sync/atomic" + "time" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" @@ -11,8 +12,19 @@ import ( var meter = otel.Meter("header/sync") type metrics struct { + ctx context.Context + totalSynced atomic.Int64 totalSyncedGauge metric.Float64ObservableGauge + + syncLoopStarted metric.Int64Counter + trustedPeersOutOfSync metric.Int64Counter + laggingHeadersStart metric.Int64Counter + + subjectiveHead atomic.Int64 + headerTimestamp metric.Float64Histogram + + headerReceived time.Time } func newMetrics() (*metrics, error) { @@ -24,27 +36,96 @@ func newMetrics() (*metrics, error) { return nil, err } + syncLoopStarted, err := meter.Int64Counter("sync_loop_started", metric.WithDescription("sync loop started")) + if err != nil { + return nil, err + } + + trustedPeersOutOfSync, err := meter.Int64Counter("tr_peers_out_of_sync", metric.WithDescription("trusted peers out of sync")) + if err != nil { + return nil, err + } + + laggingHeadersStart, err := meter.Int64Counter("sync_lagging_hdr_start", metric.WithDescription("lagging header start")) + if err != nil { + return nil, err + } + + subjectiveHead, err := meter.Int64ObservableGauge("sync_subjective_head", metric.WithDescription("subjective head height")) + if err != nil { + return nil, err + } + + headerTimestamp, err := meter.Float64Histogram("sync_subjective_head_ts", + metric.WithDescription("subjective_head_timestamp")) + if err != nil { + return nil, err + } + m := &metrics{ - totalSyncedGauge: totalSynced, + ctx: context.Background(), + totalSyncedGauge: totalSynced, + syncLoopStarted: syncLoopStarted, + trustedPeersOutOfSync: trustedPeersOutOfSync, + laggingHeadersStart: laggingHeadersStart, + headerTimestamp: headerTimestamp, } callback := func(ctx context.Context, observer metric.Observer) error { observer.ObserveFloat64(totalSynced, float64(m.totalSynced.Load())) + observer.ObserveInt64(subjectiveHead, m.subjectiveHead.Load()) return nil } - _, err = meter.RegisterCallback(callback, totalSynced) + + _, err = meter.RegisterCallback(callback, totalSynced, subjectiveHead) if err != nil { return nil, err } - return m, nil } -// recordTotalSynced records the total amount of synced headers. func (m *metrics) recordTotalSynced(totalSynced int) { if m == nil { return } - m.totalSynced.Add(int64(totalSynced)) } + +func (m *metrics) recordSyncLoopStarted() { + if m == nil { + return + } + m.syncLoopStarted.Add(m.ctx, 1) +} + +func (m *metrics) recordTrustedPeersOutOfSync() { + if m == nil { + return + } + m.trustedPeersOutOfSync.Add(m.ctx, 1) +} + +func (m *metrics) observeNewHead(height int64) { + if m == nil { + return + } + m.subjectiveHead.Store(height) +} + +func (m *metrics) observeLaggingHeader(threshold time.Duration, receivedAt time.Time) { + if m == nil { + return + } + if !m.headerReceived.IsZero() && + float64(receivedAt.Second()-m.headerReceived.Second()) > threshold.Seconds() { + m.laggingHeadersStart.Add(m.ctx, 1) + } + m.headerReceived = receivedAt +} + +func (m *metrics) observeHeaderTimestamp(timestamp time.Time) { + if m == nil { + return + } + m.headerTimestamp.Record(m.ctx, float64(timestamp.Second())) +} diff --git a/sync/sync.go b/sync/sync.go index 1dc53c27..5689a94c 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -180,6 +180,7 @@ func (s *Syncer[H]) syncLoop() { for { select { case <-s.triggerSync: + s.metrics.recordSyncLoopStarted() s.sync(s.ctx) case <-s.ctx.Done(): return diff --git a/sync/sync_head.go b/sync/sync_head.go index 66a497f6..38908756 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -102,6 +102,7 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { log.Warnw("subjective initialization with an old header", "height", trustHead.Height()) } log.Warn("trusted peer is out of sync") + s.metrics.recordTrustedPeersOutOfSync() return trustHead, nil } @@ -130,6 +131,9 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { s.pending.Add(netHead) s.wantSync() log.Infow("new network head", "height", netHead.Height(), "hash", netHead.Hash()) + s.metrics.observeNewHead(int64(netHead.Height())) + s.metrics.observeHeaderTimestamp(netHead.Time()) + s.metrics.observeLaggingHeader(s.Params.blockTime, time.Now()) } // incomingNetworkHead processes new potential network headers.