From abbfc2201d35b622c337ced7c63c2c8a1ee54445 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 13 Dec 2023 12:55:41 +0200 Subject: [PATCH 01/21] chore(syncer/metrics): add metrics for syncer --- sync/metrics.go | 91 ++++++++++++++++++++++++++++++++++++++++++++--- sync/sync.go | 1 + sync/sync_head.go | 4 +++ 3 files changed, 91 insertions(+), 5 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index 1f2c99f7..53781144 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -3,6 +3,7 @@ package sync import ( "context" "sync/atomic" + "time" "go.opentelemetry.io/otel" "go.opentelemetry.io/otel/metric" @@ -11,8 +12,19 @@ import ( var meter = otel.Meter("header/sync") type metrics struct { + ctx context.Context + totalSynced atomic.Int64 totalSyncedGauge metric.Float64ObservableGauge + + syncLoopStarted metric.Int64Counter + trustedPeersOutOfSync metric.Int64Counter + laggingHeadersStart metric.Int64Counter + + subjectiveHead atomic.Int64 + headerTimestamp metric.Float64Histogram + + headerReceived time.Time } func newMetrics() (*metrics, error) { @@ -24,27 +36,96 @@ func newMetrics() (*metrics, error) { return nil, err } + syncLoopStarted, err := meter.Int64Counter("sync_loop_started", metric.WithDescription("sync loop started")) + if err != nil { + return nil, err + } + + trustedPeersOutOfSync, err := meter.Int64Counter("tr_peers_out_of_sync", metric.WithDescription("trusted peers out of sync")) + if err != nil { + return nil, err + } + + laggingHeadersStart, err := meter.Int64Counter("sync_lagging_hdr_start", metric.WithDescription("lagging header start")) + if err != nil { + return nil, err + } + + subjectiveHead, err := meter.Int64ObservableGauge("sync_subjective_head", metric.WithDescription("subjective head height")) + if err != nil { + return nil, err + } + + headerTimestamp, err := meter.Float64Histogram("sync_subjective_head_ts", + metric.WithDescription("subjective_head_timestamp")) + if err != nil { + return nil, err + } + m := &metrics{ - totalSyncedGauge: totalSynced, + ctx: context.Background(), + totalSyncedGauge: totalSynced, + syncLoopStarted: syncLoopStarted, + trustedPeersOutOfSync: trustedPeersOutOfSync, + laggingHeadersStart: laggingHeadersStart, + headerTimestamp: headerTimestamp, } callback := func(ctx context.Context, observer metric.Observer) error { observer.ObserveFloat64(totalSynced, float64(m.totalSynced.Load())) + observer.ObserveInt64(subjectiveHead, m.subjectiveHead.Load()) return nil } - _, err = meter.RegisterCallback(callback, totalSynced) + + _, err = meter.RegisterCallback(callback, totalSynced, subjectiveHead) if err != nil { return nil, err } - return m, nil } -// recordTotalSynced records the total amount of synced headers. func (m *metrics) recordTotalSynced(totalSynced int) { if m == nil { return } - m.totalSynced.Add(int64(totalSynced)) } + +func (m *metrics) recordSyncLoopStarted() { + if m == nil { + return + } + m.syncLoopStarted.Add(m.ctx, 1) +} + +func (m *metrics) recordTrustedPeersOutOfSync() { + if m == nil { + return + } + m.trustedPeersOutOfSync.Add(m.ctx, 1) +} + +func (m *metrics) observeNewHead(height int64) { + if m == nil { + return + } + m.subjectiveHead.Store(height) +} + +func (m *metrics) observeLaggingHeader(threshold time.Duration, receivedAt time.Time) { + if m == nil { + return + } + if !m.headerReceived.IsZero() && + float64(receivedAt.Second()-m.headerReceived.Second()) > threshold.Seconds() { + m.laggingHeadersStart.Add(m.ctx, 1) + } + m.headerReceived = receivedAt +} + +func (m *metrics) observeHeaderTimestamp(timestamp time.Time) { + if m == nil { + return + } + m.headerTimestamp.Record(m.ctx, float64(timestamp.Second())) +} diff --git a/sync/sync.go b/sync/sync.go index 1dc53c27..5689a94c 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -180,6 +180,7 @@ func (s *Syncer[H]) syncLoop() { for { select { case <-s.triggerSync: + s.metrics.recordSyncLoopStarted() s.sync(s.ctx) case <-s.ctx.Done(): return diff --git a/sync/sync_head.go b/sync/sync_head.go index 462af91e..b3097696 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -103,6 +103,7 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { log.Warnw("subjective initialization with an old header", "height", trustHead.Height()) } log.Warn("trusted peer is out of sync") + s.metrics.recordTrustedPeersOutOfSync() return trustHead, nil } @@ -131,6 +132,9 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { s.pending.Add(netHead) s.wantSync() log.Infow("new network head", "height", netHead.Height(), "hash", netHead.Hash()) + s.metrics.observeNewHead(int64(netHead.Height())) + s.metrics.observeHeaderTimestamp(netHead.Time()) + s.metrics.observeLaggingHeader(s.Params.blockTime, time.Now()) } // incomingNetworkHead processes new potential network headers. From 2ea18ac581715ca94208ce9666df0b5eac5f9c3e Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Fri, 15 Dec 2023 12:48:40 +0200 Subject: [PATCH 02/21] chore: add fixes --- sync/metrics.go | 18 ++++++++++++------ sync/sync.go | 2 +- sync/sync_head.go | 2 +- 3 files changed, 14 insertions(+), 8 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index 53781144..42317e26 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -25,9 +25,12 @@ type metrics struct { headerTimestamp metric.Float64Histogram headerReceived time.Time + prevHeader time.Time + + headersThreshHold time.Duration } -func newMetrics() (*metrics, error) { +func newMetrics(headersThreshHold time.Duration) (*metrics, error) { totalSynced, err := meter.Float64ObservableGauge( "total_synced_headers", metric.WithDescription("total synced headers"), @@ -69,6 +72,7 @@ func newMetrics() (*metrics, error) { trustedPeersOutOfSync: trustedPeersOutOfSync, laggingHeadersStart: laggingHeadersStart, headerTimestamp: headerTimestamp, + headersThreshHold: headersThreshHold, } callback := func(ctx context.Context, observer metric.Observer) error { @@ -112,20 +116,22 @@ func (m *metrics) observeNewHead(height int64) { m.subjectiveHead.Store(height) } -func (m *metrics) observeLaggingHeader(threshold time.Duration, receivedAt time.Time) { +func (m *metrics) observeLaggingHeader() { if m == nil { return } - if !m.headerReceived.IsZero() && - float64(receivedAt.Second()-m.headerReceived.Second()) > threshold.Seconds() { + if time.Since(m.headerReceived) > m.headersThreshHold { m.laggingHeadersStart.Add(m.ctx, 1) } - m.headerReceived = receivedAt + m.headerReceived = time.Now() } func (m *metrics) observeHeaderTimestamp(timestamp time.Time) { if m == nil { return } - m.headerTimestamp.Record(m.ctx, float64(timestamp.Second())) + if !m.prevHeader.IsZero() { + m.headerTimestamp.Record(m.ctx, timestamp.Sub(m.prevHeader).Seconds()) + } + m.prevHeader = timestamp } diff --git a/sync/sync.go b/sync/sync.go index 5689a94c..b23463d5 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -72,7 +72,7 @@ func NewSyncer[H header.Header[H]]( var metrics *metrics if params.metrics { var err error - metrics, err = newMetrics() + metrics, err = newMetrics(params.blockTime) if err != nil { return nil, err } diff --git a/sync/sync_head.go b/sync/sync_head.go index b3097696..6825a6d5 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -134,7 +134,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { log.Infow("new network head", "height", netHead.Height(), "hash", netHead.Hash()) s.metrics.observeNewHead(int64(netHead.Height())) s.metrics.observeHeaderTimestamp(netHead.Time()) - s.metrics.observeLaggingHeader(s.Params.blockTime, time.Now()) + s.metrics.observeLaggingHeader() } // incomingNetworkHead processes new potential network headers. From ac7513a521bdf010849dce270bc868ff2181913c Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Fri, 15 Dec 2023 13:39:27 +0200 Subject: [PATCH 03/21] apply suggestions --- sync/metrics.go | 70 ++++++++++++++++++++++++----------------------- sync/sync_head.go | 4 +-- 2 files changed, 37 insertions(+), 37 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index 42317e26..392e3060 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -12,8 +12,6 @@ import ( var meter = otel.Meter("header/sync") type metrics struct { - ctx context.Context - totalSynced atomic.Int64 totalSyncedGauge metric.Float64ObservableGauge @@ -21,16 +19,16 @@ type metrics struct { trustedPeersOutOfSync metric.Int64Counter laggingHeadersStart metric.Int64Counter - subjectiveHead atomic.Int64 - headerTimestamp metric.Float64Histogram + subjectiveHead atomic.Int64 + blockTime metric.Float64Histogram headerReceived time.Time prevHeader time.Time - headersThreshHold time.Duration + headersThreshold time.Duration } -func newMetrics(headersThreshHold time.Duration) (*metrics, error) { +func newMetrics(headersThreshold time.Duration) (*metrics, error) { totalSynced, err := meter.Float64ObservableGauge( "total_synced_headers", metric.WithDescription("total synced headers"), @@ -39,40 +37,53 @@ func newMetrics(headersThreshHold time.Duration) (*metrics, error) { return nil, err } - syncLoopStarted, err := meter.Int64Counter("sync_loop_started", metric.WithDescription("sync loop started")) + syncLoopStarted, err := meter.Int64Counter( + "sync_loop_started", + metric.WithDescription("sync loop started"), + ) if err != nil { return nil, err } - trustedPeersOutOfSync, err := meter.Int64Counter("tr_peers_out_of_sync", metric.WithDescription("trusted peers out of sync")) + trustedPeersOutOfSync, err := meter.Int64Counter( + "tr_peers_out_of_sync", + metric.WithDescription("trusted peers out of sync"), + ) if err != nil { return nil, err } - laggingHeadersStart, err := meter.Int64Counter("sync_lagging_hdr_start", metric.WithDescription("lagging header start")) + laggingHeadersStart, err := meter.Int64Counter( + "sync_lagging_hdr_start", + metric.WithDescription("lagging header start"), + ) if err != nil { return nil, err } - subjectiveHead, err := meter.Int64ObservableGauge("sync_subjective_head", metric.WithDescription("subjective head height")) + subjectiveHead, err := meter.Int64ObservableGauge( + "sync_subjective_head", + metric.WithDescription("subjective head height"), + ) if err != nil { return nil, err } - headerTimestamp, err := meter.Float64Histogram("sync_subjective_head_ts", - metric.WithDescription("subjective_head_timestamp")) + blockTime, err := meter.Float64Histogram( + "sync_actual_blockTime_ts", + metric.WithDescription("duration between creation of 2 blocks"), + ) if err != nil { return nil, err } m := &metrics{ - ctx: context.Background(), totalSyncedGauge: totalSynced, syncLoopStarted: syncLoopStarted, trustedPeersOutOfSync: trustedPeersOutOfSync, laggingHeadersStart: laggingHeadersStart, - headerTimestamp: headerTimestamp, - headersThreshHold: headersThreshHold, + blockTime: blockTime, + headersThreshold: headersThreshold, } callback := func(ctx context.Context, observer metric.Observer) error { @@ -99,39 +110,30 @@ func (m *metrics) recordSyncLoopStarted() { if m == nil { return } - m.syncLoopStarted.Add(m.ctx, 1) + m.syncLoopStarted.Add(context.Background(), 1) } func (m *metrics) recordTrustedPeersOutOfSync() { if m == nil { return } - m.trustedPeersOutOfSync.Add(m.ctx, 1) + m.trustedPeersOutOfSync.Add(context.Background(), 1) } -func (m *metrics) observeNewHead(height int64) { +func (m *metrics) observeNewSubjectiveHead(height int64, timestamp time.Time) { if m == nil { return } m.subjectiveHead.Store(height) -} - -func (m *metrics) observeLaggingHeader() { - if m == nil { - return - } - if time.Since(m.headerReceived) > m.headersThreshHold { - m.laggingHeadersStart.Add(m.ctx, 1) - } - m.headerReceived = time.Now() -} -func (m *metrics) observeHeaderTimestamp(timestamp time.Time) { - if m == nil { - return - } + ctx := context.Background() if !m.prevHeader.IsZero() { - m.headerTimestamp.Record(m.ctx, timestamp.Sub(m.prevHeader).Seconds()) + m.blockTime.Record(ctx, timestamp.Sub(m.prevHeader).Seconds()) } m.prevHeader = timestamp + + if time.Since(m.headerReceived) > m.headersThreshold { + m.laggingHeadersStart.Add(ctx, 1) + } + m.headerReceived = time.Now() } diff --git a/sync/sync_head.go b/sync/sync_head.go index 6825a6d5..dc8f8cc0 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -132,9 +132,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { s.pending.Add(netHead) s.wantSync() log.Infow("new network head", "height", netHead.Height(), "hash", netHead.Hash()) - s.metrics.observeNewHead(int64(netHead.Height())) - s.metrics.observeHeaderTimestamp(netHead.Time()) - s.metrics.observeLaggingHeader() + s.metrics.observeNewSubjectiveHead(int64(netHead.Height()), netHead.Time()) } // incomingNetworkHead processes new potential network headers. From 4924ba7e19aeea581e305cd5b0f89cf8b0613683 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Fri, 15 Dec 2023 16:33:22 +0200 Subject: [PATCH 04/21] chore: rename metrics --- sync/metrics.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index 392e3060..7928c01b 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -30,7 +30,7 @@ type metrics struct { func newMetrics(headersThreshold time.Duration) (*metrics, error) { totalSynced, err := meter.Float64ObservableGauge( - "total_synced_headers", + "hdr_total_synced_headers", metric.WithDescription("total synced headers"), ) if err != nil { @@ -38,7 +38,7 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { } syncLoopStarted, err := meter.Int64Counter( - "sync_loop_started", + "hdr_sync_loop_started", metric.WithDescription("sync loop started"), ) if err != nil { @@ -46,7 +46,7 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { } trustedPeersOutOfSync, err := meter.Int64Counter( - "tr_peers_out_of_sync", + "hdr_tr_peers_out_of_sync", metric.WithDescription("trusted peers out of sync"), ) if err != nil { @@ -54,7 +54,7 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { } laggingHeadersStart, err := meter.Int64Counter( - "sync_lagging_hdr_start", + "hdr_sync_lagging_hdr_start", metric.WithDescription("lagging header start"), ) if err != nil { @@ -62,7 +62,7 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { } subjectiveHead, err := meter.Int64ObservableGauge( - "sync_subjective_head", + "hdr_sync_subjective_head", metric.WithDescription("subjective head height"), ) if err != nil { @@ -70,7 +70,7 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { } blockTime, err := meter.Float64Histogram( - "sync_actual_blockTime_ts", + "hdr_sync_actual_blockTime_ts", metric.WithDescription("duration between creation of 2 blocks"), ) if err != nil { From b79c70360e37f610d23bf5c16a1a6ebf59f56c69 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Fri, 15 Dec 2023 16:36:08 +0200 Subject: [PATCH 05/21] fix: pass ctx where it is needed --- sync/metrics.go | 11 +++++------ sync/sync.go | 2 +- sync/sync_head.go | 4 ++-- 3 files changed, 8 insertions(+), 9 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index 7928c01b..b05c03a8 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -106,27 +106,26 @@ func (m *metrics) recordTotalSynced(totalSynced int) { m.totalSynced.Add(int64(totalSynced)) } -func (m *metrics) recordSyncLoopStarted() { +func (m *metrics) recordSyncLoopStarted(ctx context.Context) { if m == nil { return } - m.syncLoopStarted.Add(context.Background(), 1) + m.syncLoopStarted.Add(ctx, 1) } -func (m *metrics) recordTrustedPeersOutOfSync() { +func (m *metrics) recordTrustedPeersOutOfSync(ctx context.Context) { if m == nil { return } - m.trustedPeersOutOfSync.Add(context.Background(), 1) + m.trustedPeersOutOfSync.Add(ctx, 1) } -func (m *metrics) observeNewSubjectiveHead(height int64, timestamp time.Time) { +func (m *metrics) observeNewSubjectiveHead(ctx context.Context, height int64, timestamp time.Time) { if m == nil { return } m.subjectiveHead.Store(height) - ctx := context.Background() if !m.prevHeader.IsZero() { m.blockTime.Record(ctx, timestamp.Sub(m.prevHeader).Seconds()) } diff --git a/sync/sync.go b/sync/sync.go index b23463d5..671c3511 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -180,7 +180,7 @@ func (s *Syncer[H]) syncLoop() { for { select { case <-s.triggerSync: - s.metrics.recordSyncLoopStarted() + s.metrics.recordSyncLoopStarted(s.ctx) s.sync(s.ctx) case <-s.ctx.Done(): return diff --git a/sync/sync_head.go b/sync/sync_head.go index dc8f8cc0..3874cbed 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -103,7 +103,7 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { log.Warnw("subjective initialization with an old header", "height", trustHead.Height()) } log.Warn("trusted peer is out of sync") - s.metrics.recordTrustedPeersOutOfSync() + s.metrics.recordTrustedPeersOutOfSync(s.ctx) return trustHead, nil } @@ -132,7 +132,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { s.pending.Add(netHead) s.wantSync() log.Infow("new network head", "height", netHead.Height(), "hash", netHead.Hash()) - s.metrics.observeNewSubjectiveHead(int64(netHead.Height()), netHead.Time()) + s.metrics.observeNewSubjectiveHead(s.ctx, int64(netHead.Height()), netHead.Time()) } // incomingNetworkHead processes new potential network headers. From ee1502e524c20566eaf9505da13b35ea8154577b Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Fri, 15 Dec 2023 16:49:44 +0200 Subject: [PATCH 06/21] chore: rework --- sync/metrics.go | 81 +++++++++++++++++++++++++++++++------------------ 1 file changed, 51 insertions(+), 30 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index b05c03a8..bd8019de 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -12,8 +12,9 @@ import ( var meter = otel.Meter("header/sync") type metrics struct { - totalSynced atomic.Int64 - totalSyncedGauge metric.Float64ObservableGauge + totalSynced atomic.Int64 + totalSyncedInst metric.Int64ObservableGauge + totalSyncedReg metric.Registration syncLoopStarted metric.Int64Counter trustedPeersOutOfSync metric.Int64Counter @@ -25,11 +26,14 @@ type metrics struct { headerReceived time.Time prevHeader time.Time + subjectiveHeadInst metric.Int64ObservableGauge + subjectiveHeadReg metric.Registration + headersThreshold time.Duration } func newMetrics(headersThreshold time.Duration) (*metrics, error) { - totalSynced, err := meter.Float64ObservableGauge( + totalSynced, err := meter.Int64ObservableGauge( "hdr_total_synced_headers", metric.WithDescription("total synced headers"), ) @@ -78,61 +82,78 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { } m := &metrics{ - totalSyncedGauge: totalSynced, + totalSyncedInst: totalSynced, syncLoopStarted: syncLoopStarted, trustedPeersOutOfSync: trustedPeersOutOfSync, laggingHeadersStart: laggingHeadersStart, blockTime: blockTime, + subjectiveHeadInst: subjectiveHead, headersThreshold: headersThreshold, } - callback := func(ctx context.Context, observer metric.Observer) error { - observer.ObserveFloat64(totalSynced, float64(m.totalSynced.Load())) - observer.ObserveInt64(subjectiveHead, m.subjectiveHead.Load()) - return nil + m.totalSyncedReg, err = meter.RegisterCallback(m.observeTotalSynced, totalSynced, subjectiveHead) + if err != nil { + return nil, err } - _, err = meter.RegisterCallback(callback, totalSynced, subjectiveHead) + m.subjectiveHeadReg, err = meter.RegisterCallback(m.observeNewHead, totalSynced, subjectiveHead) if err != nil { return nil, err } return m, nil } +func (m *metrics) observeTotalSynced(_ context.Context, obs metric.Observer) error { + obs.ObserveInt64(m.totalSyncedInst, m.totalSynced.Load()) + return nil +} + +func (m *metrics) observeNewHead(_ context.Context, obs metric.Observer) error { + obs.ObserveInt64(m.subjectiveHeadInst, m.totalSynced.Load()) + return nil +} + func (m *metrics) recordTotalSynced(totalSynced int) { - if m == nil { - return - } - m.totalSynced.Add(int64(totalSynced)) + m.observe(context.Background(), func(_ context.Context) { + m.totalSynced.Add(int64(totalSynced)) + }) } func (m *metrics) recordSyncLoopStarted(ctx context.Context) { - if m == nil { - return - } - m.syncLoopStarted.Add(ctx, 1) + m.observe(ctx, func(ctx context.Context) { + m.syncLoopStarted.Add(ctx, 1) + }) } func (m *metrics) recordTrustedPeersOutOfSync(ctx context.Context) { - if m == nil { - return - } - m.trustedPeersOutOfSync.Add(ctx, 1) + m.observe(ctx, func(ctx context.Context) { + m.trustedPeersOutOfSync.Add(ctx, 1) + }) } func (m *metrics) observeNewSubjectiveHead(ctx context.Context, height int64, timestamp time.Time) { + m.observe(ctx, func(ctx context.Context) { + m.subjectiveHead.Store(height) + + if !m.prevHeader.IsZero() { + m.blockTime.Record(ctx, timestamp.Sub(m.prevHeader).Seconds()) + } + + if time.Since(m.headerReceived) > m.headersThreshold { + m.laggingHeadersStart.Add(ctx, 1) + } + }) + m.prevHeader = timestamp + m.headerReceived = time.Now() +} + +func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) { if m == nil { return } - m.subjectiveHead.Store(height) - - if !m.prevHeader.IsZero() { - m.blockTime.Record(ctx, timestamp.Sub(m.prevHeader).Seconds()) + if ctx.Err() != nil { + ctx = context.Background() } - m.prevHeader = timestamp - if time.Since(m.headerReceived) > m.headersThreshold { - m.laggingHeadersStart.Add(ctx, 1) - } - m.headerReceived = time.Now() + observeFn(ctx) } From 9ac9d8c4ea9b8adb56da5789afd2c5b41696ef1c Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Mon, 8 Jan 2024 12:23:18 +0200 Subject: [PATCH 07/21] chore: rework metrics registration and handling --- sync/metrics.go | 20 ++++++++++++-------- sync/sync.go | 2 +- sync/sync_head.go | 2 +- 3 files changed, 14 insertions(+), 10 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index bd8019de..9a6c5357 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -14,7 +14,6 @@ var meter = otel.Meter("header/sync") type metrics struct { totalSynced atomic.Int64 totalSyncedInst metric.Int64ObservableGauge - totalSyncedReg metric.Registration syncLoopStarted metric.Int64Counter trustedPeersOutOfSync metric.Int64Counter @@ -27,7 +26,8 @@ type metrics struct { prevHeader time.Time subjectiveHeadInst metric.Int64ObservableGauge - subjectiveHeadReg metric.Registration + + syncReg metric.Registration headersThreshold time.Duration } @@ -91,16 +91,21 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { headersThreshold: headersThreshold, } - m.totalSyncedReg, err = meter.RegisterCallback(m.observeTotalSynced, totalSynced, subjectiveHead) + m.syncReg, err = meter.RegisterCallback(m.observeMetrics, m.totalSyncedInst, m.subjectiveHeadInst) if err != nil { return nil, err } - m.subjectiveHeadReg, err = meter.RegisterCallback(m.observeNewHead, totalSynced, subjectiveHead) + return m, nil +} + +func (m *metrics) observeMetrics(ctx context.Context, obs metric.Observer) error { + err := m.observeTotalSynced(ctx, obs) if err != nil { - return nil, err + return err } - return m, nil + + return m.observeNewHead(ctx, obs) } func (m *metrics) observeTotalSynced(_ context.Context, obs metric.Observer) error { @@ -109,7 +114,7 @@ func (m *metrics) observeTotalSynced(_ context.Context, obs metric.Observer) err } func (m *metrics) observeNewHead(_ context.Context, obs metric.Observer) error { - obs.ObserveInt64(m.subjectiveHeadInst, m.totalSynced.Load()) + obs.ObserveInt64(m.subjectiveHeadInst, m.subjectiveHead.Load()) return nil } @@ -154,6 +159,5 @@ func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) if ctx.Err() != nil { ctx = context.Background() } - observeFn(ctx) } diff --git a/sync/sync.go b/sync/sync.go index 671c3511..46472c28 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -72,7 +72,7 @@ func NewSyncer[H header.Header[H]]( var metrics *metrics if params.metrics { var err error - metrics, err = newMetrics(params.blockTime) + metrics, err = newMetrics(params.recencyThreshold) if err != nil { return nil, err } diff --git a/sync/sync_head.go b/sync/sync_head.go index 3874cbed..5d8233db 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -122,6 +122,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { "hash", netHead.Hash().String(), "err", err) } + s.metrics.observeNewSubjectiveHead(s.ctx, int64(netHead.Height()), netHead.Time()) storeHead, err := s.store.Head(ctx) if err == nil && storeHead.Height() >= netHead.Height() { @@ -132,7 +133,6 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { s.pending.Add(netHead) s.wantSync() log.Infow("new network head", "height", netHead.Height(), "hash", netHead.Hash()) - s.metrics.observeNewSubjectiveHead(s.ctx, int64(netHead.Height()), netHead.Time()) } // incomingNetworkHead processes new potential network headers. From 11e3993145742e66fc48b216313ce511499bd35b Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Mon, 8 Jan 2024 12:35:42 +0200 Subject: [PATCH 08/21] fix: fix panic in test --- sync/metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index 9a6c5357..d09883a1 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -147,9 +147,9 @@ func (m *metrics) observeNewSubjectiveHead(ctx context.Context, height int64, ti if time.Since(m.headerReceived) > m.headersThreshold { m.laggingHeadersStart.Add(ctx, 1) } + m.prevHeader = timestamp + m.headerReceived = time.Now() }) - m.prevHeader = timestamp - m.headerReceived = time.Now() } func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) { From 0903a7ac0ca6fa318b8b40f6bff76ca26f6bcf43 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 10 Jan 2024 13:06:11 +0200 Subject: [PATCH 09/21] chore: add description --- sync/metrics.go | 18 ++++++++---------- 1 file changed, 8 insertions(+), 10 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index d09883a1..32b29f12 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -12,6 +12,8 @@ import ( var meter = otel.Meter("header/sync") type metrics struct { + syncReg metric.Registration + totalSynced atomic.Int64 totalSyncedInst metric.Int64ObservableGauge @@ -19,15 +21,11 @@ type metrics struct { trustedPeersOutOfSync metric.Int64Counter laggingHeadersStart metric.Int64Counter - subjectiveHead atomic.Int64 - blockTime metric.Float64Histogram - - headerReceived time.Time - prevHeader time.Time - + subjectiveHead atomic.Int64 subjectiveHeadInst metric.Int64ObservableGauge - - syncReg metric.Registration + blockTime metric.Float64Histogram + headerReceived time.Time + prevHeader time.Time headersThreshold time.Duration } @@ -35,7 +33,7 @@ type metrics struct { func newMetrics(headersThreshold time.Duration) (*metrics, error) { totalSynced, err := meter.Int64ObservableGauge( "hdr_total_synced_headers", - metric.WithDescription("total synced headers"), + metric.WithDescription("total synced headers shows how many headers have been synced"), ) if err != nil { return nil, err @@ -43,7 +41,7 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { syncLoopStarted, err := meter.Int64Counter( "hdr_sync_loop_started", - metric.WithDescription("sync loop started"), + metric.WithDescription("sync loop started shows that syncing is in progress"), ) if err != nil { return nil, err From 6619634d10c48acbbd6bea84ec4666f0df40d254 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 10 Jan 2024 13:39:06 +0200 Subject: [PATCH 10/21] chore: unregister metrics on stop --- sync/metrics.go | 11 +++++++++-- sync/sync.go | 4 ++-- sync/sync_head.go | 2 +- 3 files changed, 12 insertions(+), 5 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index 32b29f12..e6f01984 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -122,13 +122,13 @@ func (m *metrics) recordTotalSynced(totalSynced int) { }) } -func (m *metrics) recordSyncLoopStarted(ctx context.Context) { +func (m *metrics) syncingStarted(ctx context.Context) { m.observe(ctx, func(ctx context.Context) { m.syncLoopStarted.Add(ctx, 1) }) } -func (m *metrics) recordTrustedPeersOutOfSync(ctx context.Context) { +func (m *metrics) peersOutOufSync(ctx context.Context) { m.observe(ctx, func(ctx context.Context) { m.trustedPeersOutOfSync.Add(ctx, 1) }) @@ -159,3 +159,10 @@ func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) } observeFn(ctx) } + +func (m *metrics) Close() error { + if m == nil { + return nil + } + return m.syncReg.Unregister() +} diff --git a/sync/sync.go b/sync/sync.go index 46472c28..e41a304b 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -109,7 +109,7 @@ func (s *Syncer[H]) Start(ctx context.Context) error { // Stop stops Syncer. func (s *Syncer[H]) Stop(context.Context) error { s.cancel() - return nil + return s.metrics.Close() } // SyncWait blocks until ongoing sync is done. @@ -180,7 +180,7 @@ func (s *Syncer[H]) syncLoop() { for { select { case <-s.triggerSync: - s.metrics.recordSyncLoopStarted(s.ctx) + s.metrics.syncingStarted(s.ctx) s.sync(s.ctx) case <-s.ctx.Done(): return diff --git a/sync/sync_head.go b/sync/sync_head.go index 5d8233db..18b42eb1 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -103,7 +103,7 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { log.Warnw("subjective initialization with an old header", "height", trustHead.Height()) } log.Warn("trusted peer is out of sync") - s.metrics.recordTrustedPeersOutOfSync(s.ctx) + s.metrics.peersOutOufSync(s.ctx) return trustHead, nil } From 19f693fb3c0c89f98efc0b1145e4104663e0803e Mon Sep 17 00:00:00 2001 From: Viacheslav Date: Wed, 10 Jan 2024 16:31:03 +0200 Subject: [PATCH 11/21] apply suggestions Co-authored-by: rene <41963722+renaynay@users.noreply.github.com> --- sync/metrics.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index e6f01984..dd7c6302 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -33,7 +33,7 @@ type metrics struct { func newMetrics(headersThreshold time.Duration) (*metrics, error) { totalSynced, err := meter.Int64ObservableGauge( "hdr_total_synced_headers", - metric.WithDescription("total synced headers shows how many headers have been synced"), +metric.WithDescription("total synced headers shows how many headers have been synced since runtime"), ) if err != nil { return nil, err @@ -41,7 +41,7 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { syncLoopStarted, err := meter.Int64Counter( "hdr_sync_loop_started", - metric.WithDescription("sync loop started shows that syncing is in progress"), +metric.WithDescription("sync loop started records timestamp of a new sync job"), ) if err != nil { return nil, err From a07837a39501e5080d47940a04490bb46bdb97de Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 10 Jan 2024 16:33:14 +0200 Subject: [PATCH 12/21] chore: change positioning in struct --- sync/metrics.go | 21 ++++++++++----------- 1 file changed, 10 insertions(+), 11 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index dd7c6302..847af443 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -12,20 +12,19 @@ import ( var meter = otel.Meter("header/sync") type metrics struct { - syncReg metric.Registration - - totalSynced atomic.Int64 - totalSyncedInst metric.Int64ObservableGauge + syncReg metric.Registration + subjectiveHeadInst metric.Int64ObservableGauge + totalSyncedInst metric.Int64ObservableGauge + totalSynced atomic.Int64 syncLoopStarted metric.Int64Counter trustedPeersOutOfSync metric.Int64Counter laggingHeadersStart metric.Int64Counter - subjectiveHead atomic.Int64 - subjectiveHeadInst metric.Int64ObservableGauge - blockTime metric.Float64Histogram - headerReceived time.Time - prevHeader time.Time + subjectiveHead atomic.Int64 + blockTime metric.Float64Histogram + headerReceived time.Time + prevHeader time.Time headersThreshold time.Duration } @@ -33,7 +32,7 @@ type metrics struct { func newMetrics(headersThreshold time.Duration) (*metrics, error) { totalSynced, err := meter.Int64ObservableGauge( "hdr_total_synced_headers", -metric.WithDescription("total synced headers shows how many headers have been synced since runtime"), + metric.WithDescription("total synced headers shows how many headers have been synced"), ) if err != nil { return nil, err @@ -41,7 +40,7 @@ metric.WithDescription("total synced headers shows how many headers have been sy syncLoopStarted, err := meter.Int64Counter( "hdr_sync_loop_started", -metric.WithDescription("sync loop started records timestamp of a new sync job"), + metric.WithDescription("sync loop started shows that syncing is in progress"), ) if err != nil { return nil, err From a332743be457993aaac04078ae049aa37d7da3da Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 10 Jan 2024 17:48:11 +0200 Subject: [PATCH 13/21] chore: add metric --- sync/metrics.go | 28 ++++++++++++++++++++++++---- sync/sync_head.go | 4 ++++ 2 files changed, 28 insertions(+), 4 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index 847af443..fc6ac585 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -20,6 +20,7 @@ type metrics struct { syncLoopStarted metric.Int64Counter trustedPeersOutOfSync metric.Int64Counter laggingHeadersStart metric.Int64Counter + readHeader metric.Int64Counter subjectiveHead atomic.Int64 blockTime metric.Float64Histogram @@ -62,6 +63,16 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { return nil, err } + readHeader, err := meter.Int64Counter( + "hdr_sync_getter_read", + metric.WithDescription( + "sync getter used to get the header instead of receiving it through the subscription", + ), + ) + if err != nil { + return nil, err + } + subjectiveHead, err := meter.Int64ObservableGauge( "hdr_sync_subjective_head", metric.WithDescription("subjective head height"), @@ -83,6 +94,7 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { syncLoopStarted: syncLoopStarted, trustedPeersOutOfSync: trustedPeersOutOfSync, laggingHeadersStart: laggingHeadersStart, + readHeader: readHeader, blockTime: blockTime, subjectiveHeadInst: subjectiveHead, headersThreshold: headersThreshold, @@ -127,12 +139,24 @@ func (m *metrics) syncingStarted(ctx context.Context) { }) } +func (m *metrics) laggingNetworkHead(ctx context.Context) { + m.observe(ctx, func(ctx context.Context) { + m.laggingHeadersStart.Add(ctx, 1) + }) +} + func (m *metrics) peersOutOufSync(ctx context.Context) { m.observe(ctx, func(ctx context.Context) { m.trustedPeersOutOfSync.Add(ctx, 1) }) } +func (m *metrics) readHeaderGetter(ctx context.Context) { + m.observe(ctx, func(ctx context.Context) { + m.readHeader.Add(ctx, 1) + }) +} + func (m *metrics) observeNewSubjectiveHead(ctx context.Context, height int64, timestamp time.Time) { m.observe(ctx, func(ctx context.Context) { m.subjectiveHead.Store(height) @@ -140,10 +164,6 @@ func (m *metrics) observeNewSubjectiveHead(ctx context.Context, height int64, ti if !m.prevHeader.IsZero() { m.blockTime.Record(ctx, timestamp.Sub(m.prevHeader).Seconds()) } - - if time.Since(m.headerReceived) > m.headersThreshold { - m.laggingHeadersStart.Add(ctx, 1) - } m.prevHeader = timestamp m.headerReceived = time.Now() }) diff --git a/sync/sync_head.go b/sync/sync_head.go index 18b42eb1..56283fe8 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -24,6 +24,7 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) { return sbjHead, nil } + s.metrics.laggingNetworkHead(s.ctx) // otherwise, request head from the network // TODO: Besides requesting we should listen for new gossiped headers and cancel request if so // @@ -39,6 +40,7 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err // if we can't get it - give what we have reqCtx, cancel := context.WithTimeout(ctx, time.Second*2) // TODO(@vgonkivs): make timeout configurable defer cancel() + s.metrics.readHeaderGetter(s.ctx) netHead, err := s.getter.Head(reqCtx, header.WithTrustedHead[H](sbjHead)) if err != nil { log.Warnw("failed to get recent head, returning current subjective", "sbjHead", sbjHead.Height(), "err", err) @@ -85,6 +87,7 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { return s.subjectiveHead(ctx) } defer s.getter.Unlock() + s.metrics.readHeaderGetter(s.ctx) trustHead, err := s.getter.Head(ctx) if err != nil { return trustHead, err @@ -100,6 +103,7 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { case isExpired(trustHead, s.Params.TrustingPeriod): log.Warnw("subjective initialization with an expired header", "height", trustHead.Height()) case !isRecent(trustHead, s.Params.blockTime, s.Params.recencyThreshold): + s.metrics.laggingNetworkHead(s.ctx) log.Warnw("subjective initialization with an old header", "height", trustHead.Height()) } log.Warn("trusted peer is out of sync") From 4901b85c63d9a0a28d738f0509ee61d588dd8035 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 11 Jan 2024 16:18:48 +0200 Subject: [PATCH 14/21] chore: remove redundant param --- sync/metrics.go | 5 +---- sync/sync.go | 2 +- 2 files changed, 2 insertions(+), 5 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index fc6ac585..a17bcf8c 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -26,11 +26,9 @@ type metrics struct { blockTime metric.Float64Histogram headerReceived time.Time prevHeader time.Time - - headersThreshold time.Duration } -func newMetrics(headersThreshold time.Duration) (*metrics, error) { +func newMetrics() (*metrics, error) { totalSynced, err := meter.Int64ObservableGauge( "hdr_total_synced_headers", metric.WithDescription("total synced headers shows how many headers have been synced"), @@ -97,7 +95,6 @@ func newMetrics(headersThreshold time.Duration) (*metrics, error) { readHeader: readHeader, blockTime: blockTime, subjectiveHeadInst: subjectiveHead, - headersThreshold: headersThreshold, } m.syncReg, err = meter.RegisterCallback(m.observeMetrics, m.totalSyncedInst, m.subjectiveHeadInst) diff --git a/sync/sync.go b/sync/sync.go index e41a304b..2831daf3 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -72,7 +72,7 @@ func NewSyncer[H header.Header[H]]( var metrics *metrics if params.metrics { var err error - metrics, err = newMetrics(params.recencyThreshold) + metrics, err = newMetrics() if err != nil { return nil, err } From 1787306fd4503f12ade2a1496c92f27b27b38b36 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Mon, 29 Jan 2024 13:10:23 +0200 Subject: [PATCH 15/21] chore: refactoring --- sync/metrics.go | 53 +++++++++-------------------------------------- sync/sync.go | 8 +++---- sync/sync_head.go | 2 +- 3 files changed, 14 insertions(+), 49 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index a17bcf8c..c0fe7e4e 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -12,31 +12,21 @@ import ( var meter = otel.Meter("header/sync") type metrics struct { - syncReg metric.Registration subjectiveHeadInst metric.Int64ObservableGauge - totalSyncedInst metric.Int64ObservableGauge - totalSynced atomic.Int64 syncLoopStarted metric.Int64Counter trustedPeersOutOfSync metric.Int64Counter laggingHeadersStart metric.Int64Counter readHeader metric.Int64Counter - subjectiveHead atomic.Int64 - blockTime metric.Float64Histogram - headerReceived time.Time - prevHeader time.Time + subjectiveHead atomic.Int64 + subjectiveHeadReg metric.Registration + + blockTime metric.Float64Histogram + prevHeader time.Time } func newMetrics() (*metrics, error) { - totalSynced, err := meter.Int64ObservableGauge( - "hdr_total_synced_headers", - metric.WithDescription("total synced headers shows how many headers have been synced"), - ) - if err != nil { - return nil, err - } - syncLoopStarted, err := meter.Int64Counter( "hdr_sync_loop_started", metric.WithDescription("sync loop started shows that syncing is in progress"), @@ -88,7 +78,6 @@ func newMetrics() (*metrics, error) { } m := &metrics{ - totalSyncedInst: totalSynced, syncLoopStarted: syncLoopStarted, trustedPeersOutOfSync: trustedPeersOutOfSync, laggingHeadersStart: laggingHeadersStart, @@ -97,7 +86,7 @@ func newMetrics() (*metrics, error) { subjectiveHeadInst: subjectiveHead, } - m.syncReg, err = meter.RegisterCallback(m.observeMetrics, m.totalSyncedInst, m.subjectiveHeadInst) + m.subjectiveHeadReg, err = meter.RegisterCallback(m.newHead, m.subjectiveHeadInst) if err != nil { return nil, err } @@ -105,31 +94,11 @@ func newMetrics() (*metrics, error) { return m, nil } -func (m *metrics) observeMetrics(ctx context.Context, obs metric.Observer) error { - err := m.observeTotalSynced(ctx, obs) - if err != nil { - return err - } - - return m.observeNewHead(ctx, obs) -} - -func (m *metrics) observeTotalSynced(_ context.Context, obs metric.Observer) error { - obs.ObserveInt64(m.totalSyncedInst, m.totalSynced.Load()) - return nil -} - -func (m *metrics) observeNewHead(_ context.Context, obs metric.Observer) error { +func (m *metrics) newHead(_ context.Context, obs metric.Observer) error { obs.ObserveInt64(m.subjectiveHeadInst, m.subjectiveHead.Load()) return nil } -func (m *metrics) recordTotalSynced(totalSynced int) { - m.observe(context.Background(), func(_ context.Context) { - m.totalSynced.Add(int64(totalSynced)) - }) -} - func (m *metrics) syncingStarted(ctx context.Context) { m.observe(ctx, func(ctx context.Context) { m.syncLoopStarted.Add(ctx, 1) @@ -154,15 +123,13 @@ func (m *metrics) readHeaderGetter(ctx context.Context) { }) } -func (m *metrics) observeNewSubjectiveHead(ctx context.Context, height int64, timestamp time.Time) { +func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64, timestamp time.Time) { m.observe(ctx, func(ctx context.Context) { - m.subjectiveHead.Store(height) + m.subjectiveHead.Store(int64(height)) if !m.prevHeader.IsZero() { m.blockTime.Record(ctx, timestamp.Sub(m.prevHeader).Seconds()) } - m.prevHeader = timestamp - m.headerReceived = time.Now() }) } @@ -180,5 +147,5 @@ func (m *metrics) Close() error { if m == nil { return nil } - return m.syncReg.Unregister() + return m.subjectiveHeadReg.Unregister() } diff --git a/sync/sync.go b/sync/sync.go index 2831daf3..d58be346 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -158,7 +158,7 @@ func (s *Syncer[H]) State() State { head, err := s.store.Head(s.ctx) if err == nil { - state.Height = uint64(head.Height()) + state.Height = head.Height() } else if state.Error == "" { // don't ignore the error if we can show it in the state state.Error = err.Error() @@ -240,8 +240,8 @@ func (s *Syncer[H]) sync(ctx context.Context) { func (s *Syncer[H]) doSync(ctx context.Context, fromHead, toHead H) (err error) { s.stateLk.Lock() s.state.ID++ - s.state.FromHeight = uint64(fromHead.Height()) + 1 - s.state.ToHeight = uint64(toHead.Height()) + s.state.FromHeight = fromHead.Height() + 1 + s.state.ToHeight = toHead.Height() s.state.FromHash = fromHead.Hash() s.state.ToHash = toHead.Hash() s.state.Start = time.Now() @@ -339,7 +339,5 @@ func (s *Syncer[H]) storeHeaders(ctx context.Context, headers ...H) error { if err != nil { return err } - - s.metrics.recordTotalSynced(len(headers)) return nil } diff --git a/sync/sync_head.go b/sync/sync_head.go index 56283fe8..7e2c47b4 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -126,7 +126,7 @@ func (s *Syncer[H]) setSubjectiveHead(ctx context.Context, netHead H) { "hash", netHead.Hash().String(), "err", err) } - s.metrics.observeNewSubjectiveHead(s.ctx, int64(netHead.Height()), netHead.Time()) + s.metrics.newSubjectiveHead(s.ctx, netHead.Height(), netHead.Time()) storeHead, err := s.store.Head(ctx) if err == nil && storeHead.Height() >= netHead.Height() { From a31ea9c8a8036e80f6737b94f350e7e8763cbb35 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Tue, 30 Jan 2024 18:38:03 +0200 Subject: [PATCH 16/21] fix: fix issues found by @Wondertan --- sync/metrics.go | 32 ++++++++++++++++---------------- sync/sync_head.go | 9 ++++----- 2 files changed, 20 insertions(+), 21 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index c0fe7e4e..5dc00248 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -16,8 +16,8 @@ type metrics struct { syncLoopStarted metric.Int64Counter trustedPeersOutOfSync metric.Int64Counter - laggingHeadersStart metric.Int64Counter - readHeader metric.Int64Counter + unrecentHeader metric.Int64Counter + subjectiveInit metric.Int64Counter subjectiveHead atomic.Int64 subjectiveHeadReg metric.Registration @@ -36,25 +36,25 @@ func newMetrics() (*metrics, error) { } trustedPeersOutOfSync, err := meter.Int64Counter( - "hdr_tr_peers_out_of_sync", - metric.WithDescription("trusted peers out of sync"), + "hdr_sync_trust_peers_out_of_sync", + metric.WithDescription("trusted peers out of sync and gave outdated header"), ) if err != nil { return nil, err } - laggingHeadersStart, err := meter.Int64Counter( - "hdr_sync_lagging_hdr_start", - metric.WithDescription("lagging header start"), + unrecentHeader, err := meter.Int64Counter( + "hdr_sync_unrecent_header", + metric.WithDescription("tracks every time Syncer returns an unrecent header"), ) if err != nil { return nil, err } - readHeader, err := meter.Int64Counter( - "hdr_sync_getter_read", + subjectiveInit, err := meter.Int64Counter( + "hdr_sync_subjective_init", metric.WithDescription( - "sync getter used to get the header instead of receiving it through the subscription", + "tracks how many times is the node initialized ", ), ) if err != nil { @@ -80,8 +80,8 @@ func newMetrics() (*metrics, error) { m := &metrics{ syncLoopStarted: syncLoopStarted, trustedPeersOutOfSync: trustedPeersOutOfSync, - laggingHeadersStart: laggingHeadersStart, - readHeader: readHeader, + unrecentHeader: unrecentHeader, + subjectiveInit: subjectiveInit, blockTime: blockTime, subjectiveHeadInst: subjectiveHead, } @@ -107,19 +107,19 @@ func (m *metrics) syncingStarted(ctx context.Context) { func (m *metrics) laggingNetworkHead(ctx context.Context) { m.observe(ctx, func(ctx context.Context) { - m.laggingHeadersStart.Add(ctx, 1) + m.unrecentHeader.Add(ctx, 1) }) } -func (m *metrics) peersOutOufSync(ctx context.Context) { +func (m *metrics) trustedPeersOutOufSync(ctx context.Context) { m.observe(ctx, func(ctx context.Context) { m.trustedPeersOutOfSync.Add(ctx, 1) }) } -func (m *metrics) readHeaderGetter(ctx context.Context) { +func (m *metrics) subjectiveInitialization(ctx context.Context) { m.observe(ctx, func(ctx context.Context) { - m.readHeader.Add(ctx, 1) + m.subjectiveInit.Add(ctx, 1) }) } diff --git a/sync/sync_head.go b/sync/sync_head.go index 7e2c47b4..1b6231ce 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -24,7 +24,6 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err if isRecent(sbjHead, s.Params.blockTime, s.Params.recencyThreshold) { return sbjHead, nil } - s.metrics.laggingNetworkHead(s.ctx) // otherwise, request head from the network // TODO: Besides requesting we should listen for new gossiped headers and cancel request if so // @@ -35,12 +34,12 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err // so just recursively get it return s.Head(ctx) } + s.metrics.laggingNetworkHead(s.ctx) defer s.getter.Unlock() // limit time to get a recent header // if we can't get it - give what we have reqCtx, cancel := context.WithTimeout(ctx, time.Second*2) // TODO(@vgonkivs): make timeout configurable defer cancel() - s.metrics.readHeaderGetter(s.ctx) netHead, err := s.getter.Head(reqCtx, header.WithTrustedHead[H](sbjHead)) if err != nil { log.Warnw("failed to get recent head, returning current subjective", "sbjHead", sbjHead.Height(), "err", err) @@ -87,11 +86,12 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { return s.subjectiveHead(ctx) } defer s.getter.Unlock() - s.metrics.readHeaderGetter(s.ctx) + trustHead, err := s.getter.Head(ctx) if err != nil { return trustHead, err } + s.metrics.subjectiveInitialization(s.ctx) // and set it as the new subjective head without validation, // or, in other words, do 'automatic subjective initialization' // NOTE: we avoid validation as the head expired to prevent possibility of the Long-Range Attack @@ -103,11 +103,10 @@ func (s *Syncer[H]) subjectiveHead(ctx context.Context) (H, error) { case isExpired(trustHead, s.Params.TrustingPeriod): log.Warnw("subjective initialization with an expired header", "height", trustHead.Height()) case !isRecent(trustHead, s.Params.blockTime, s.Params.recencyThreshold): - s.metrics.laggingNetworkHead(s.ctx) log.Warnw("subjective initialization with an old header", "height", trustHead.Height()) } log.Warn("trusted peer is out of sync") - s.metrics.peersOutOufSync(s.ctx) + s.metrics.trustedPeersOutOufSync(s.ctx) return trustHead, nil } From 796ce001e9610d22582bcf7fdaf938f93879a7a7 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 31 Jan 2024 13:21:50 +0200 Subject: [PATCH 17/21] fix nits --- sync/metrics.go | 14 +++++++------- sync/sync_head.go | 2 +- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index 5dc00248..a068ca7f 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -28,7 +28,7 @@ type metrics struct { func newMetrics() (*metrics, error) { syncLoopStarted, err := meter.Int64Counter( - "hdr_sync_loop_started", + "hdr_sync_loop_started_counter", metric.WithDescription("sync loop started shows that syncing is in progress"), ) if err != nil { @@ -36,7 +36,7 @@ func newMetrics() (*metrics, error) { } trustedPeersOutOfSync, err := meter.Int64Counter( - "hdr_sync_trust_peers_out_of_sync", + "hdr_sync_trust_peers_out_of_sync_counter", metric.WithDescription("trusted peers out of sync and gave outdated header"), ) if err != nil { @@ -44,7 +44,7 @@ func newMetrics() (*metrics, error) { } unrecentHeader, err := meter.Int64Counter( - "hdr_sync_unrecent_header", + "hdr_sync_unrecent_header_counter", metric.WithDescription("tracks every time Syncer returns an unrecent header"), ) if err != nil { @@ -52,7 +52,7 @@ func newMetrics() (*metrics, error) { } subjectiveInit, err := meter.Int64Counter( - "hdr_sync_subjective_init", + "hdr_sync_subjective_init_counter", metric.WithDescription( "tracks how many times is the node initialized ", ), @@ -62,7 +62,7 @@ func newMetrics() (*metrics, error) { } subjectiveHead, err := meter.Int64ObservableGauge( - "hdr_sync_subjective_head", + "hdr_sync_subjective_head_gauge", metric.WithDescription("subjective head height"), ) if err != nil { @@ -70,7 +70,7 @@ func newMetrics() (*metrics, error) { } blockTime, err := meter.Float64Histogram( - "hdr_sync_actual_blockTime_ts", + "hdr_sync_actual_blockTime_ts_hist", metric.WithDescription("duration between creation of 2 blocks"), ) if err != nil { @@ -105,7 +105,7 @@ func (m *metrics) syncingStarted(ctx context.Context) { }) } -func (m *metrics) laggingNetworkHead(ctx context.Context) { +func (m *metrics) unrecentHead(ctx context.Context) { m.observe(ctx, func(ctx context.Context) { m.unrecentHeader.Add(ctx, 1) }) diff --git a/sync/sync_head.go b/sync/sync_head.go index 1b6231ce..ad84344b 100644 --- a/sync/sync_head.go +++ b/sync/sync_head.go @@ -34,12 +34,12 @@ func (s *Syncer[H]) Head(ctx context.Context, _ ...header.HeadOption[H]) (H, err // so just recursively get it return s.Head(ctx) } - s.metrics.laggingNetworkHead(s.ctx) defer s.getter.Unlock() // limit time to get a recent header // if we can't get it - give what we have reqCtx, cancel := context.WithTimeout(ctx, time.Second*2) // TODO(@vgonkivs): make timeout configurable defer cancel() + s.metrics.unrecentHead(s.ctx) netHead, err := s.getter.Head(reqCtx, header.WithTrustedHead[H](sbjHead)) if err != nil { log.Warnw("failed to get recent head, returning current subjective", "sbjHead", sbjHead.Height(), "err", err) From f3272177ce277bc0bb666305d2eeb25be2f84170 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 31 Jan 2024 18:00:13 +0200 Subject: [PATCH 18/21] chore: add additional metrics + refactoring --- sync/metrics.go | 64 +++++++++++++++++++++++++++++++++++++++++++------ sync/sync.go | 5 +++- 2 files changed, 61 insertions(+), 8 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index a068ca7f..eca2e9c0 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -6,21 +6,30 @@ import ( "time" "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" "go.opentelemetry.io/otel/metric" ) var meter = otel.Meter("header/sync") type metrics struct { - subjectiveHeadInst metric.Int64ObservableGauge + syncerReg metric.Registration + + subjectiveHeadInst metric.Int64ObservableGauge + syncLoopRunningInst metric.Int64ObservableGauge syncLoopStarted metric.Int64Counter trustedPeersOutOfSync metric.Int64Counter unrecentHeader metric.Int64Counter subjectiveInit metric.Int64Counter - subjectiveHead atomic.Int64 - subjectiveHeadReg metric.Registration + subjectiveHead atomic.Int64 + + syncLoopDurationHist metric.Float64Histogram + syncLoopActive atomic.Int64 + syncStartedTs time.Time + + requestRangeTimeHist metric.Float64Histogram blockTime metric.Float64Histogram prevHeader time.Time @@ -69,6 +78,26 @@ func newMetrics() (*metrics, error) { return nil, err } + syncLoopDurationHist, err := meter.Float64Histogram( + "hdr_sync_loop_time_hist", + metric.WithDescription("tracks the duration of syncing")) + if err != nil { + return nil, err + } + + requestRangeTimeHist, err := meter.Float64Histogram("hdr_sync_range_request_time_hist", + metric.WithDescription("tracks the duration of GetRangeByHeight requests")) + if err != nil { + return nil, err + } + + syncLoopRunningInst, err := meter.Int64ObservableGauge( + "hdr_sync_loop_status_gauge", + metric.WithDescription("reports whether syncing is active or not")) + if err != nil { + return nil, err + } + blockTime, err := meter.Float64Histogram( "hdr_sync_actual_blockTime_ts_hist", metric.WithDescription("duration between creation of 2 blocks"), @@ -82,11 +111,14 @@ func newMetrics() (*metrics, error) { trustedPeersOutOfSync: trustedPeersOutOfSync, unrecentHeader: unrecentHeader, subjectiveInit: subjectiveInit, + syncLoopDurationHist: syncLoopDurationHist, + syncLoopRunningInst: syncLoopRunningInst, + requestRangeTimeHist: requestRangeTimeHist, blockTime: blockTime, subjectiveHeadInst: subjectiveHead, } - m.subjectiveHeadReg, err = meter.RegisterCallback(m.newHead, m.subjectiveHeadInst) + m.syncerReg, err = meter.RegisterCallback(m.observeMetrics, m.subjectiveHeadInst, m.syncLoopRunningInst) if err != nil { return nil, err } @@ -94,14 +126,24 @@ func newMetrics() (*metrics, error) { return m, nil } -func (m *metrics) newHead(_ context.Context, obs metric.Observer) error { +func (m *metrics) observeMetrics(_ context.Context, obs metric.Observer) error { obs.ObserveInt64(m.subjectiveHeadInst, m.subjectiveHead.Load()) + obs.ObserveInt64(m.syncLoopRunningInst, m.syncLoopActive.Load()) return nil } -func (m *metrics) syncingStarted(ctx context.Context) { +func (m *metrics) syncStarted(ctx context.Context) { m.observe(ctx, func(ctx context.Context) { + m.syncStartedTs = time.Now() m.syncLoopStarted.Add(ctx, 1) + m.syncLoopActive.Store(1) + }) +} + +func (m *metrics) syncFinished(ctx context.Context) { + m.observe(ctx, func(ctx context.Context) { + m.syncLoopActive.Store(0) + m.syncLoopDurationHist.Record(ctx, time.Since(m.syncStartedTs).Seconds()) }) } @@ -123,6 +165,14 @@ func (m *metrics) subjectiveInitialization(ctx context.Context) { }) } +func (m *metrics) getRangeRequestTime(ctx context.Context, duration time.Duration, amount int, failed bool) { + m.requestRangeTimeHist.Record(ctx, duration.Seconds(), + metric.WithAttributes( + attribute.Int("headers amount", amount), + attribute.Bool("request failed", failed), + )) +} + func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64, timestamp time.Time) { m.observe(ctx, func(ctx context.Context) { m.subjectiveHead.Store(int64(height)) @@ -147,5 +197,5 @@ func (m *metrics) Close() error { if m == nil { return nil } - return m.subjectiveHeadReg.Unregister() + return m.syncerReg.Unregister() } diff --git a/sync/sync.go b/sync/sync.go index d58be346..2aa910bc 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -180,8 +180,9 @@ func (s *Syncer[H]) syncLoop() { for { select { case <-s.triggerSync: - s.metrics.syncingStarted(s.ctx) + s.metrics.syncStarted(s.ctx) s.sync(s.ctx) + s.metrics.syncFinished(s.ctx) case <-s.ctx.Done(): return } @@ -315,8 +316,10 @@ func (s *Syncer[H]) requestHeaders( size = amount } + now := time.Now() to := fromHead.Height() + size + 1 headers, err := s.getter.GetRangeByHeight(ctx, fromHead, to) + s.metrics.getRangeRequestTime(s.ctx, time.Since(now), int(size), err != nil) if err != nil { return err } From 167e3062cee9f1db8c53c040cbe0e56311fdb4ea Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 31 Jan 2024 18:19:43 +0200 Subject: [PATCH 19/21] fix: divide headers amount by 100 --- sync/sync.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/sync/sync.go b/sync/sync.go index 2aa910bc..ffd0d5f9 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -319,7 +319,7 @@ func (s *Syncer[H]) requestHeaders( now := time.Now() to := fromHead.Height() + size + 1 headers, err := s.getter.GetRangeByHeight(ctx, fromHead, to) - s.metrics.getRangeRequestTime(s.ctx, time.Since(now), int(size), err != nil) + s.metrics.getRangeRequestTime(s.ctx, time.Since(now), int(size)/100, err != nil) if err != nil { return err } From 3e7e45e787a09b2b4f934a97aa1e1ce094d218b0 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Wed, 31 Jan 2024 18:43:57 +0200 Subject: [PATCH 20/21] fix test --- sync/metrics.go | 12 +++++++----- 1 file changed, 7 insertions(+), 5 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index eca2e9c0..e2d75024 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -166,11 +166,13 @@ func (m *metrics) subjectiveInitialization(ctx context.Context) { } func (m *metrics) getRangeRequestTime(ctx context.Context, duration time.Duration, amount int, failed bool) { - m.requestRangeTimeHist.Record(ctx, duration.Seconds(), - metric.WithAttributes( - attribute.Int("headers amount", amount), - attribute.Bool("request failed", failed), - )) + m.observe(ctx, func(ctx context.Context) { + m.requestRangeTimeHist.Record(ctx, duration.Seconds(), + metric.WithAttributes( + attribute.Int("headers amount", amount), + attribute.Bool("request failed", failed), + )) + }) } func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64, timestamp time.Time) { From b3f571978050849a62d910bf8d38b20c34a0f701 Mon Sep 17 00:00:00 2001 From: Viacheslav Gonkivskyi Date: Thu, 1 Feb 2024 12:02:44 +0200 Subject: [PATCH 21/21] chore: avoid calling expensive time.Now if metrics are not enabled --- sync/metrics.go | 19 +++++++++++++++++-- sync/sync.go | 5 +++-- 2 files changed, 20 insertions(+), 4 deletions(-) diff --git a/sync/metrics.go b/sync/metrics.go index e2d75024..a0916720 100644 --- a/sync/metrics.go +++ b/sync/metrics.go @@ -30,6 +30,7 @@ type metrics struct { syncStartedTs time.Time requestRangeTimeHist metric.Float64Histogram + requestRangeStartTs time.Time blockTime metric.Float64Histogram prevHeader time.Time @@ -165,9 +166,9 @@ func (m *metrics) subjectiveInitialization(ctx context.Context) { }) } -func (m *metrics) getRangeRequestTime(ctx context.Context, duration time.Duration, amount int, failed bool) { +func (m *metrics) updateGetRangeRequestInfo(ctx context.Context, amount int, failed bool) { m.observe(ctx, func(ctx context.Context) { - m.requestRangeTimeHist.Record(ctx, duration.Seconds(), + m.requestRangeTimeHist.Record(ctx, time.Since(m.requestRangeStartTs).Seconds(), metric.WithAttributes( attribute.Int("headers amount", amount), attribute.Bool("request failed", failed), @@ -185,6 +186,20 @@ func (m *metrics) newSubjectiveHead(ctx context.Context, height uint64, timestam }) } +func (m *metrics) rangeRequestStart() { + if m == nil { + return + } + m.requestRangeStartTs = time.Now() +} + +func (m *metrics) rangeRequestStop() { + if m == nil { + return + } + m.requestRangeStartTs = time.Time{} +} + func (m *metrics) observe(ctx context.Context, observeFn func(context.Context)) { if m == nil { return diff --git a/sync/sync.go b/sync/sync.go index ffd0d5f9..bfb3bf6f 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -316,10 +316,11 @@ func (s *Syncer[H]) requestHeaders( size = amount } - now := time.Now() to := fromHead.Height() + size + 1 + s.metrics.rangeRequestStart() headers, err := s.getter.GetRangeByHeight(ctx, fromHead, to) - s.metrics.getRangeRequestTime(s.ctx, time.Since(now), int(size)/100, err != nil) + s.metrics.updateGetRangeRequestInfo(s.ctx, int(size)/100, err != nil) + s.metrics.rangeRequestStop() if err != nil { return err }