From 04cd1d6dd3072f8cb64ed8d393745af0999898f9 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Mon, 8 Jan 2024 12:23:18 +0200 Subject: [PATCH] chore: rework metrics registration and handling --- sync/metrics.go | 20 ++++++++++++-------- sync/sync.go | 2 +- sync/sync_head.go | 2 +- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index bd8019de..9a6c5357 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -14,7 +14,6 @@ var meter = otel.Meter("header/sync") type metrics struct { totalSynced atomic.Int64 totalSyncedInst metric.Int64ObservableGauge - totalSyncedReg metric.Registration syncLoopStarted metric.Int64Counter trustedPeersOutOfSync metric.Int64Counter @@ -27,7 +26,8 @@ type metrics struct { prevHeader time.Time subjectiveHeadInst metric.Int64ObservableGauge - subjectiveHeadReg metric.Registration + + syncReg metric.Registration headersThreshold time.Duration } @@ -91,16 +91,21 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { headersThreshold: headersThreshold, } - m.totalSyncedReg, err = meter.RegisterCallback(m.observeTotalSynced, totalSynced, subjectiveHead) + m.syncReg, err = meter.RegisterCallback(m.observeMetrics, m.totalSyncedInst, m.subjectiveHeadInst) if err != nil { return nil, err } - m.subjectiveHeadReg, err = meter.RegisterCallback(m.observeNewHead, totalSynced, subjectiveHead) + return m, nil +} + +func (m *metrics) observeMetrics(ctx context.Context, obs metric.Observer) error { + err := m.observeTotalSynced(ctx, obs) if err != nil { - return nil, err + return err } - return m, nil + + return m.observeNewHead(ctx, obs) } func (m *metrics) observeTotalSynced(_ context.Context, obs metric.Observer) error { @@ -109,7 +114,7 @@ func (m *metrics) observeTotalSynced(_ context.Context, obs metric.Observer) err } func (m *metrics) observeNewHead(_ context.Context, obs metric.Observer) error { - obs.ObserveInt64(m.subjectiveHeadInst, m.totalSynced.Load()) + obs.ObserveInt64(m.subjectiveHeadInst, m.subjectiveHead.Load()) return nil } @@ -154,6 +159,5 @@ func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) if ctx.Err() != nil { ctx = context.Background() } - observeFn(ctx) } diff --git a/sync/sync.go b/sync/sync.go index 671c3511..46472c28 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -72,7 +72,7 @@ func NewSyncer[H header.Header[H]]( var metrics *metrics if params.metrics { var err error - metrics, err = newMetrics(params.blockTime) + metrics, err = newMetrics(params.recencyThreshold) if err != nil { return nil, err } diff --git a/sync/sync_head.go b/sync/sync_head.go index 5e767692..31a42691 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -121,6 +121,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { "hash", netHead.Hash().String(), "err", err) } + s.metrics.observeNewSubjectiveHead(s.ctx, int64(netHead.Height()), netHead.Time()) storeHead, err := s.store.Head(ctx) if err == nil && storeHead.Height() >= netHead.Height() { @@ -131,7 +132,6 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { s.pending.Add(netHead) s.wantSync() log.Infow("new network head", "height", netHead.Height(), "hash", netHead.Hash()) - s.metrics.observeNewSubjectiveHead(s.ctx, int64(netHead.Height()), netHead.Time()) } // incomingNetworkHead processes new potential network headers.