From 31329aa37cfeacd3eccae86bf9dced7316850c47 Mon Sep 17 00:00:00 2001 From: tamirms Date: Mon, 24 Jun 2024 15:26:47 +0100 Subject: [PATCH] Code review fixes --- services/horizon/cmd/db.go | 7 +++-- .../horizon/internal/db2/history/reap_test.go | 2 +- services/horizon/internal/flags.go | 8 ++++++ services/horizon/internal/httpt_test.go | 2 +- services/horizon/internal/ingest/main.go | 11 +++----- services/horizon/internal/ingest/reap.go | 27 ++++++++++--------- services/horizon/internal/ingest/reap_test.go | 4 +-- services/horizon/internal/init.go | 8 +++--- 8 files changed, 40 insertions(+), 29 deletions(-) diff --git a/services/horizon/cmd/db.go b/services/horizon/cmd/db.go index 4aa333b120..e2589a7385 100644 --- a/services/horizon/cmd/db.go +++ b/services/horizon/cmd/db.go @@ -7,6 +7,7 @@ import ( "go/types" "log" "os" + "os/signal" "strconv" "strings" @@ -235,11 +236,13 @@ var dbReapCmd = &cobra.Command{ reaper := ingest.NewReaper( ingest.ReapConfig{ RetentionCount: uint32(globalConfig.HistoryRetentionCount), - ReapBatchSize: uint32(globalConfig.HistoryRetentionReapCount), + BatchSize: uint32(globalConfig.HistoryRetentionReapCount), }, session, ) - return reaper.DeleteUnretainedHistory(context.Background()) + ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, os.Kill) + defer cancel() + return reaper.DeleteUnretainedHistory(ctx) }, } diff --git a/services/horizon/internal/db2/history/reap_test.go b/services/horizon/internal/db2/history/reap_test.go index bf6b5afd08..508041eb22 100644 --- a/services/horizon/internal/db2/history/reap_test.go +++ b/services/horizon/internal/db2/history/reap_test.go @@ -17,7 +17,7 @@ func TestReapLookupTables(t *testing.T) { reaper := ingest.NewReaper( ingest.ReapConfig{ RetentionCount: 1, - ReapBatchSize: 50, + BatchSize: 50, }, db, ) diff --git a/services/horizon/internal/flags.go b/services/horizon/internal/flags.go index 2a4fd27dd5..7321d07fb2 100644 --- a/services/horizon/internal/flags.go +++ b/services/horizon/internal/flags.go @@ -678,6 +678,14 @@ func Flags() (*Config, support.ConfigOptions) { "A value of 1 implies history is trimmed after every ledger. " + "A value of 2 implies history is trimmed on every second ledger.", UsedInCommands: IngestionCommands, + CustomSetValue: func(opt *support.ConfigOption) error { + val := viper.GetUint(opt.Name) + if val <= 0 { + return fmt.Errorf("flag --reap-frequency must be positive") + } + *(opt.ConfigKey.(*uint)) = val + return nil + }, }, &support.ConfigOption{ Name: "history-stale-threshold", diff --git a/services/horizon/internal/httpt_test.go b/services/horizon/internal/httpt_test.go index aead868a48..8dfae08979 100644 --- a/services/horizon/internal/httpt_test.go +++ b/services/horizon/internal/httpt_test.go @@ -106,7 +106,7 @@ func (ht *HTTPT) ReapHistory(retention uint32) { reaper := ingest.NewReaper( ingest.ReapConfig{ RetentionCount: retention, - ReapBatchSize: 50_000, + BatchSize: 50_000, }, ht.HorizonSession()) ht.Require.NoError(reaper.DeleteUnretainedHistory(context.Background())) diff --git a/services/horizon/internal/ingest/main.go b/services/horizon/internal/ingest/main.go index ade7ffa234..a9610805db 100644 --- a/services/horizon/internal/ingest/main.go +++ b/services/horizon/internal/ingest/main.go @@ -114,9 +114,7 @@ type Config struct { CoreProtocolVersionFn ledgerbackend.CoreProtocolVersionFunc CoreBuildVersionFn ledgerbackend.CoreBuildVersionFunc - ReapFrequency uint - HistoryRetentionCount uint - HistoryRetentionReapCount uint + ReapConfig ReapConfig } const ( @@ -326,10 +324,7 @@ func NewSystem(config Config) (System, error) { ), maxLedgerPerFlush: maxLedgersPerFlush, reaper: NewReaper( - ReapConfig{ - RetentionCount: uint32(config.HistoryRetentionCount), - ReapBatchSize: uint32(config.HistoryRetentionReapCount), - }, + config.ReapConfig, config.HistorySession, ), } @@ -710,7 +705,7 @@ func (s *system) runStateMachine(cur stateMachineNode) error { } func (s *system) maybeReapHistory(lastIngestedLedger uint32) { - if s.config.ReapFrequency == 0 || lastIngestedLedger%uint32(s.config.ReapFrequency) != 0 { + if s.config.ReapConfig.Frequency == 0 || lastIngestedLedger%uint32(s.config.ReapConfig.Frequency) != 0 { return } s.wg.Add(1) diff --git a/services/horizon/internal/ingest/reap.go b/services/horizon/internal/ingest/reap.go index 546f1715cb..07a61a4cde 100644 --- a/services/horizon/internal/ingest/reap.go +++ b/services/horizon/internal/ingest/reap.go @@ -4,7 +4,7 @@ import ( "context" "fmt" "strconv" - "sync" + "sync/atomic" "time" "github.com/prometheus/client_golang/prometheus" @@ -20,6 +20,7 @@ import ( type Reaper struct { historyQ history.IngestionQ reapLockQ history.IngestionQ + pending atomic.Bool config ReapConfig logger *logpkg.Entry @@ -27,13 +28,12 @@ type Reaper struct { totalDeleted *prometheus.SummaryVec deleteBatchDuration prometheus.Summary rowsInBatchDeleted prometheus.Summary - - lock sync.Mutex } type ReapConfig struct { + Frequency uint RetentionCount uint32 - ReapBatchSize uint32 + BatchSize uint32 } // NewReaper creates a new Reaper instance @@ -53,7 +53,7 @@ func newReaper(config ReapConfig, historyQ, reapLockQ history.IngestionQ) *Reape }), rowsInBatchDeleted: prometheus.NewSummary(prometheus.SummaryOpts{ Namespace: "horizon", Subsystem: "reap", Name: "batch_rows_deleted", - Help: "rows deleted during reap batch , sliding window = 10m", + Help: "rows deleted during reap batch, sliding window = 10m", Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, }), totalDuration: prometheus.NewSummaryVec(prometheus.SummaryOpts{ @@ -77,11 +77,12 @@ func (r *Reaper) DeleteUnretainedHistory(ctx context.Context) error { return nil } - if !r.lock.TryLock() { - r.logger.Infof("reap already in progress") + // check if reap is already in progress on this horizon node + if !r.pending.CompareAndSwap(false, true) { + r.logger.Infof("existing reap already in progress, skipping request to start a new one") return nil } - defer r.lock.Unlock() + defer r.pending.Store(false) if err := r.reapLockQ.Begin(ctx); err != nil { return errors.Wrap(err, "error while starting reaper lock transaction") @@ -91,8 +92,9 @@ func (r *Reaper) DeleteUnretainedHistory(ctx context.Context) error { r.logger.WithField("error", err).Error("failed to release reaper lock") } }() + // check if reap is already in progress on another horizon node if acquired, err := r.reapLockQ.TryReaperLock(ctx); err != nil { - return errors.Wrap(err, "error while acquiring reaper lock") + return errors.Wrap(err, "error while acquiring reaper database lock") } else if !acquired { r.logger.Info("reap already in progress on another node") return nil @@ -154,10 +156,11 @@ func (s *Reaper) RegisterMetrics(registry *prometheus.Registry) { ) } -// Work backwards in 50k (by default, otherwise configurable via the CLI) ledger +// Work in 50k (by default, otherwise configurable via the CLI) ledger // blocks to prevent using all the CPU. // -// This runs every hour, so we need to make sure it doesn't run for longer than +// By default, this runs every 720 ledgers (approximately 1 hour), so we +// need to make sure it doesn't run for longer than // an hour. // // Current ledger at 2024-04-04s is 51,092,283, so 50k means 1021 batches. At 1 @@ -166,7 +169,7 @@ func (s *Reaper) RegisterMetrics(registry *prometheus.Registry) { var sleep = 1 * time.Second func (r *Reaper) clearBefore(ctx context.Context, startSeq, endSeq uint32) (int64, error) { - batchSize := r.config.ReapBatchSize + batchSize := r.config.BatchSize var sum int64 if batchSize <= 0 { return sum, fmt.Errorf("invalid batch size for reaping (%d)", batchSize) diff --git a/services/horizon/internal/ingest/reap_test.go b/services/horizon/internal/ingest/reap_test.go index cad1397f5b..6216cc0e4e 100644 --- a/services/horizon/internal/ingest/reap_test.go +++ b/services/horizon/internal/ingest/reap_test.go @@ -22,7 +22,7 @@ func TestDeleteUnretainedHistory(t *testing.T) { reaper := NewReaper(ReapConfig{ RetentionCount: 0, - ReapBatchSize: 50, + BatchSize: 50, }, db) // Disable sleeps for this. @@ -82,7 +82,7 @@ func (t *ReaperTestSuite) SetupTest() { t.reapLockQ = &mockDBQ{} t.reaper = newReaper(ReapConfig{ RetentionCount: 30, - ReapBatchSize: 10, + BatchSize: 10, }, t.historyQ, t.reapLockQ) t.prevSleep = sleep sleep = 0 diff --git a/services/horizon/internal/init.go b/services/horizon/internal/init.go index 3b9d0e37f8..137ff26aff 100644 --- a/services/horizon/internal/init.go +++ b/services/horizon/internal/init.go @@ -103,9 +103,11 @@ func initIngester(app *App) { EnableExtendedLogLedgerStats: app.config.IngestEnableExtendedLogLedgerStats, RoundingSlippageFilter: app.config.RoundingSlippageFilter, SkipTxmeta: app.config.SkipTxmeta, - HistoryRetentionCount: app.config.HistoryRetentionCount, - HistoryRetentionReapCount: app.config.HistoryRetentionReapCount, - ReapFrequency: app.config.ReapFrequency, + ReapConfig: ingest.ReapConfig{ + Frequency: app.config.ReapFrequency, + RetentionCount: uint32(app.config.HistoryRetentionCount), + BatchSize: uint32(app.config.HistoryRetentionReapCount), + }, }) if err != nil {