diff --git a/sync/metrics.go b/sync/metrics.go
index 1f2c99f7..a0916720 100644
--- a/sync/metrics.go
+++ b/sync/metrics.go
@@ -3,36 +3,123 @@ package sync
 import (
 	"context"
 	"sync/atomic"
+	"time"
 
 	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
 	"go.opentelemetry.io/otel/metric"
 )
 
 var meter = otel.Meter("header/sync")
 
 type metrics struct {
-	totalSynced      atomic.Int64
-	totalSyncedGauge metric.Float64ObservableGauge
+	syncerReg metric.Registration
+
+	subjectiveHeadInst  metric.Int64ObservableGauge
+	syncLoopRunningInst metric.Int64ObservableGauge
+
+	syncLoopStarted       metric.Int64Counter
+	trustedPeersOutOfSync metric.Int64Counter
+	unrecentHeader        metric.Int64Counter
+	subjectiveInit        metric.Int64Counter
+
+	subjectiveHead atomic.Int64
+
+	syncLoopDurationHist metric.Float64Histogram
+	syncLoopActive       atomic.Int64
+	syncStartedTs        time.Time
+
+	requestRangeTimeHist metric.Float64Histogram
+	requestRangeStartTs  time.Time
+
+	blockTime  metric.Float64Histogram
+	prevHeader time.Time
 }
 
 func newMetrics() (*metrics, error) {
-	totalSynced, err := meter.Float64ObservableGauge(
-		"total_synced_headers",
-		metric.WithDescription("total synced headers"),
+	syncLoopStarted, err := meter.Int64Counter(
+		"hdr_sync_loop_started_counter",
+		metric.WithDescription("sync loop started shows that syncing is in progress"),
 	)
 	if err != nil {
 		return nil, err
 	}
 
-	m := &metrics{
-		totalSyncedGauge: totalSynced,
+	trustedPeersOutOfSync, err := meter.Int64Counter(
+		"hdr_sync_trust_peers_out_of_sync_counter",
+		metric.WithDescription("trusted peers out of sync and gave outdated header"),
+	)
+	if err != nil {
+		return nil, err
 	}
 
-	callback := func(ctx context.Context, observer metric.Observer) error {
-		observer.ObserveFloat64(totalSynced, float64(m.totalSynced.Load()))
-		return nil
+	unrecentHeader, err := meter.Int64Counter(
+		"hdr_sync_unrecent_header_counter",
+		metric.WithDescription("tracks every time Syncer returns an unrecent header"),
+	)
+	if err != nil {
+		return nil, err
 	}
-	_, err = meter.RegisterCallback(callback, totalSynced)
+
+	subjectiveInit, err := meter.Int64Counter(
+		"hdr_sync_subjective_init_counter",
+		metric.WithDescription(
+			"tracks how many times the node is initialized",
+		),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	subjectiveHead, err := meter.Int64ObservableGauge(
+		"hdr_sync_subjective_head_gauge",
+		metric.WithDescription("subjective head height"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	syncLoopDurationHist, err := meter.Float64Histogram(
+		"hdr_sync_loop_time_hist",
+		metric.WithDescription("tracks the duration of syncing"))
+	if err != nil {
+		return nil, err
+	}
+
+	requestRangeTimeHist, err := meter.Float64Histogram("hdr_sync_range_request_time_hist",
+		metric.WithDescription("tracks the duration of GetRangeByHeight requests"))
+	if err != nil {
+		return nil, err
+	}
+
+	syncLoopRunningInst, err := meter.Int64ObservableGauge(
+		"hdr_sync_loop_status_gauge",
+		metric.WithDescription("reports whether syncing is active or not"))
+	if err != nil {
+		return nil, err
+	}
+
+	blockTime, err := meter.Float64Histogram(
+		"hdr_sync_actual_blockTime_ts_hist",
+		metric.WithDescription("duration between creation of 2 blocks"),
+	)
+	if err != nil {
+		return nil, err
+	}
+
+	m := &metrics{
+		syncLoopStarted:       syncLoopStarted,
+		trustedPeersOutOfSync: trustedPeersOutOfSync,
+		unrecentHeader:        unrecentHeader,
+		subjectiveInit:        subjectiveInit,
+		syncLoopDurationHist:  syncLoopDurationHist,
+		syncLoopRunningInst:   syncLoopRunningInst,
+		requestRangeTimeHist:  requestRangeTimeHist,
+		blockTime:             blockTime,
+		subjectiveHeadInst:    subjectiveHead,
+	}
+
+	m.syncerReg, err = meter.RegisterCallback(m.observeMetrics, m.subjectiveHeadInst, m.syncLoopRunningInst)
 	if err != nil {
 		return nil, err
 	}
@@ -40,11 +127,92 @@ func newMetrics() (*metrics, error) {
 	return m, nil
 }
 
-// recordTotalSynced records the total amount of synced headers.
-func (m *metrics) recordTotalSynced(totalSynced int) {
+func (m *metrics) observeMetrics(_ context.Context, obs metric.Observer) error {
+	obs.ObserveInt64(m.subjectiveHeadInst, m.subjectiveHead.Load())
+	obs.ObserveInt64(m.syncLoopRunningInst, m.syncLoopActive.Load())
+	return nil
+}
+
+func (m *metrics) syncStarted(ctx context.Context) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.syncStartedTs = time.Now()
+		m.syncLoopStarted.Add(ctx, 1)
+		m.syncLoopActive.Store(1)
+	})
+}
+
+func (m *metrics) syncFinished(ctx context.Context) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.syncLoopActive.Store(0)
+		m.syncLoopDurationHist.Record(ctx, time.Since(m.syncStartedTs).Seconds())
+	})
+}
+
+func (m *metrics) unrecentHead(ctx context.Context) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.unrecentHeader.Add(ctx, 1)
+	})
+}
+
+func (m *metrics) trustedPeersOutOufSync(ctx context.Context) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.trustedPeersOutOfSync.Add(ctx, 1)
+	})
+}
+
+func (m *metrics) subjectiveInitialization(ctx context.Context) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.subjectiveInit.Add(ctx, 1)
+	})
+}
+
+func (m *metrics) updateGetRangeRequestInfo(ctx context.Context, amount int, failed bool) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.requestRangeTimeHist.Record(ctx, time.Since(m.requestRangeStartTs).Seconds(),
+			metric.WithAttributes(
+				attribute.Int("headers amount", amount),
+				attribute.Bool("request failed", failed),
+			))
+	})
+}
+
+func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64, timestamp time.Time) {
+	m.observe(ctx, func(ctx context.Context) {
+		m.subjectiveHead.Store(int64(height))
+
+		if !m.prevHeader.IsZero() {
+			m.blockTime.Record(ctx, timestamp.Sub(m.prevHeader).Seconds())
+		}
+	})
+}
+
+func (m *metrics) rangeRequestStart() {
+	if m == nil {
+		return
+	}
+	m.requestRangeStartTs = time.Now()
+}
+
+func (m *metrics) rangeRequestStop() {
 	if m == nil {
 		return
 	}
+	m.requestRangeStartTs = time.Time{}
+}
 
-	m.totalSynced.Add(int64(totalSynced))
+func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) {
+	if m == nil {
+		return
+	}
+	if ctx.Err() != nil {
+		ctx = context.Background()
+	}
+	observeFn(ctx)
+}
+
+func (m *metrics) Close() error {
+	if m == nil {
+		return nil
+	}
+	return m.syncerReg.Unregister()
 }
diff --git a/sync/sync.go b/sync/sync.go
index 1dc53c27..bfb3bf6f 100644
--- a/sync/sync.go
+++ b/sync/sync.go
@@ -109,7 +109,7 @@ func (s *Syncer[H]) Start(ctx context.Context) error {
 // Stop stops Syncer.
 func (s *Syncer[H]) Stop(context.Context) error {
 	s.cancel()
-	return nil
+	return s.metrics.Close()
 }
 
 // SyncWait blocks until ongoing sync is done.
@@ -158,7 +158,7 @@ func (s *Syncer[H]) State() State {
 
 	head, err := s.store.Head(s.ctx)
 	if err == nil {
-		state.Height = uint64(head.Height())
+		state.Height = head.Height()
 	} else if state.Error == "" {
 		// don't ignore the error if we can show it in the state
 		state.Error = err.Error()
@@ -180,7 +180,9 @@ func (s *Syncer[H]) syncLoop() {
 	for {
 		select {
 		case <-s.triggerSync:
+			s.metrics.syncStarted(s.ctx)
 			s.sync(s.ctx)
+			s.metrics.syncFinished(s.ctx)
 		case <-s.ctx.Done():
 			return
 		}
@@ -239,8 +241,8 @@ func (s *Syncer[H]) sync(ctx context.Context) {
 func (s *Syncer[H]) doSync(ctx context.Context, fromHead, toHead H) (err error) {
 	s.stateLk.Lock()
 	s.state.ID++
-	s.state.FromHeight = uint64(fromHead.Height()) + 1
-	s.state.ToHeight = uint64(toHead.Height())
+	s.state.FromHeight = fromHead.Height() + 1
+	s.state.ToHeight = toHead.Height()
 	s.state.FromHash = fromHead.Hash()
 	s.state.ToHash = toHead.Hash()
 	s.state.Start = time.Now()
@@ -315,7 +317,10 @@ func (s *Syncer[H]) requestHeaders(
 	}
 
 	to := fromHead.Height() + size + 1
+	s.metrics.rangeRequestStart()
 	headers, err := s.getter.GetRangeByHeight(ctx, fromHead, to)
+	s.metrics.updateGetRangeRequestInfo(s.ctx, int(size)/100, err != nil)
+	s.metrics.rangeRequestStop()
 	if err != nil {
 		return err
 	}
@@ -338,7 +343,5 @@ func (s *Syncer[H]) storeHeaders(ctx context.Context, headers ...H) error {
 	if err != nil {
 		return err
 	}
-
-	s.metrics.recordTotalSynced(len(headers))
 	return nil
 }
diff --git a/sync/sync_head.go b/sync/sync_head.go
index 462af91e..ad84344b 100644
--- a/sync/sync_head.go
+++ b/sync/sync_head.go
@@ -39,6 +39,7 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err
 	// if we can't get it - give what we have
 	reqCtx, cancel := context.WithTimeout(ctx, time.Second*2) // TODO(@vgonkivs): make timeout configurable
 	defer cancel()
+	s.metrics.unrecentHead(s.ctx)
 	netHead, err := s.getter.Head(reqCtx, header.WithTrustedHead[H](sbjHead))
 	if err != nil {
 		log.Warnw("failed to get recent head, returning current subjective", "sbjHead", sbjHead.Height(), "err", err)
@@ -85,10 +86,12 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
 		return s.subjectiveHead(ctx)
 	}
 	defer s.getter.Unlock()
+
 	trustHead, err := s.getter.Head(ctx)
 	if err != nil {
 		return trustHead, err
 	}
+	s.metrics.subjectiveInitialization(s.ctx)
 	// and set it as the new subjective head without validation,
 	// or, in other words, do 'automatic subjective initialization'
 	// NOTE: we avoid validation as the head expired to prevent possibility of the Long-Range Attack
@@ -103,6 +106,7 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) {
 		log.Warnw("subjective initialization with an old header", "height", trustHead.Height())
 	}
 	log.Warn("trusted peer is out of sync")
+	s.metrics.trustedPeersOutOufSync(s.ctx)
 	return trustHead, nil
 }
 
@@ -121,6 +125,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) {
 			"hash", netHead.Hash().String(),
 			"err", err)
 	}
+	s.metrics.newSubjectiveHead(s.ctx, netHead.Height(), netHead.Time())
 
 	storeHead, err := s.store.Head(ctx)
 	if err == nil && storeHead.Height() >= netHead.Height() {
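A note on the nil-receiver pattern the diff relies on: metrics methods such as `rangeRequestStart`, `observe`, and `Close` begin with an `if m == nil` guard, so a Syncer running without metrics can carry a nil `*metrics` and every instrumentation call in sync.go and sync_head.go becomes a safe no-op. Below is a minimal, self-contained sketch of that pattern; the `tracker` type and `record` method are illustrative stand-ins, not part of the package.

```go
package main

import "fmt"

// tracker mirrors the nil-receiver style used by sync/metrics.go:
// every method checks the receiver, so callers never need nil checks.
type tracker struct {
	count int64
}

// record is a no-op on a nil receiver, mimicking how disabled metrics behave.
func (t *tracker) record(n int64) {
	if t == nil {
		return
	}
	t.count += n
}

func main() {
	var disabled *tracker // metrics turned off: receiver stays nil
	disabled.record(1)    // safe no-op

	enabled := &tracker{}
	enabled.record(1)
	fmt.Println(enabled.count) // prints 1
}
```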