From e1107328b9a17e69ad4c57fb8922f1767d5f87cb Mon Sep 17 00:00:00 2001 From: George Date: Wed, 4 Sep 2024 12:27:53 -0700 Subject: [PATCH] Improve startup by eliminating unnecessary migration ranges (#282) Speed up startup time in "already migrated" cases by eliminating unnecessary ledger range traversals. Specifically, this includes the following changes: - Fee stats windows cannot exceed the history retention window, as this doesn't make sense. - Migrations that are already applied are not added to the list of "multi-migrations". - Fee stats window building conforms to the migration interface to simplify code. - LedgerSeqRange has been refactored to always use a value reference. As a result, if all migrations have occurred, the traversal only occurs over the fee window. --- cmd/soroban-rpc/internal/daemon/daemon.go | 102 +++++++++++++----- cmd/soroban-rpc/internal/db/event.go | 6 +- cmd/soroban-rpc/internal/db/migration.go | 76 ++++++++----- cmd/soroban-rpc/internal/db/transaction.go | 6 +- .../internal/feewindow/feewindow.go | 37 ++++++- 5 files changed, 165 insertions(+), 62 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index dfb8e84b..07ca7992 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -34,11 +34,15 @@ import ( ) const ( - prometheusNamespace = "soroban_rpc" - maxLedgerEntryWriteBatchSize = 150 - defaultReadTimeout = 5 * time.Second - defaultShutdownGracePeriod = 10 * time.Second - inMemoryInitializationLedgerLogPeriod = 1_000_000 + prometheusNamespace = "soroban_rpc" + maxLedgerEntryWriteBatchSize = 150 + defaultReadTimeout = 5 * time.Second + defaultShutdownGracePeriod = 10 * time.Second + + // Since our default retention window will be 7 days (7*17,280 ledgers), + // choose a random 5-digit prime to have irregular logging intervals at each + // halfish-day of processing + inMemoryInitializationLedgerLogPeriod = 10_099 ) type Daemon struct { @@ -289,6 +293,29 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { // mustInitializeStorage initializes the storage using what was on the DB func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows { + // + // There's some complex "ledger window math" here so we should clarify it + // beforehand. + // + // There are two windows in play here: + // - the ledger retention window, which describes the range of txmeta + // to keep relative to the latest "ledger tip" of the network + // - the fee stats window, which describes a *subset* of the prior + // ledger retention window on which to perform fee analysis + // + // If the fee window *exceeds* the retention window, this doesn't make any + // sense since it implies the user wants to store N amount of actual + // historical data and M > N amount of ledgers just for fee processing, + // which is nonsense from a performance standpoint. We prevent this: + maxFeeRetentionWindow := max( + cfg.ClassicFeeStatsLedgerRetentionWindow, + cfg.SorobanFeeStatsLedgerRetentionWindow) + if maxFeeRetentionWindow > cfg.HistoryRetentionWindow { + d.logger.Fatalf( + "Fee stat analysis window (%d) cannot exceed history retention window (%d).", + maxFeeRetentionWindow, cfg.HistoryRetentionWindow) + } + feeWindows := feewindow.NewFeeWindows( cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, @@ -299,27 +326,42 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout) defer cancelReadTxMeta() - var initialSeq, currentSeq uint32 - applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow) + // To combine these windows, we launch as follows: + // + // 1. First, identify the ledger range for database migrations based on the + // ledger retention window. Since we don't do "partial" migrations (all or + // nothing), this represents the entire range of ledger metas we store. + // + retentionRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow) if err != nil { d.logger.WithError(err).Fatal("could not get ledger range for migration") } - maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow) - feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow) + dataMigrations, err := db.BuildMigrations( + readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, retentionRange) if err != nil { - d.logger.WithError(err).Fatal("could not get ledger range for fee stats") + d.logger.WithError(err).Fatal("could not build migrations") } - // Combine the ledger range for fees, events and transactions - ledgerSeqRange := feeStatsRange.Merge(applicableRange) - - dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange) + // 2. Then, incorporate the fee analysis window. If there are migrations to + // do, this has no effect, since migration windows are larger than the fee + // window. In the absence of migrations, though, this means the ingestion + // range is just the fee stat range. + // + feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow) if err != nil { - d.logger.WithError(err).Fatal("could not build migrations") + d.logger.WithError(err).Fatal("could not get ledger range for fee stats") } - // Apply migration for events, transactions and fee stats + // Additionally, by treating the fee window *as if* it's a migration, we can + // make the interface here really clean. + dataMigrations.Append(feeWindows.AsMigration(feeStatsRange)) + ledgerSeqRange := dataMigrations.ApplicableRange() + + // + // 3. Apply all migrations, including fee stat analysis. + // + var initialSeq, currentSeq uint32 err = db.NewLedgerReader(d.db).StreamLedgerRange( readTxMetaCtx, ledgerSeqRange.First, @@ -328,32 +370,36 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows currentSeq = txMeta.LedgerSequence() if initialSeq == 0 { initialSeq = currentSeq - d.logger.WithField("seq", currentSeq). - Info("initializing in-memory store") + d.logger. + WithField("first", initialSeq). + WithField("last", ledgerSeqRange.Last). + Info("Initializing in-memory store") } else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 { - d.logger.WithField("seq", currentSeq). - Debug("still initializing in-memory store") - } - - if err = feeWindows.IngestFees(txMeta); err != nil { - d.logger.WithError(err).Fatal("could not initialize fee stats") + d.logger. + WithField("seq", currentSeq). + WithField("last", ledgerSeqRange.Last). + Debug("Still initializing in-memory store") } if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil { d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq) } + return nil }) if err != nil { - d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") + d.logger.WithError(err).Fatal("Could not obtain txmeta cache from the database") } + if err := dataMigrations.Commit(readTxMetaCtx); err != nil { - d.logger.WithError(err).Fatal("could not commit data migrations") + d.logger.WithError(err).Fatal("Could not commit data migrations") } if currentSeq != 0 { - d.logger.WithField("seq", currentSeq). - Info("finished initializing in-memory store and applying DB data migrations") + d.logger. + WithField("first", retentionRange.First). + WithField("last", retentionRange.Last). + Info("Finished initializing in-memory store and applying DB data migrations") } return feeWindows diff --git a/cmd/soroban-rpc/internal/db/event.go b/cmd/soroban-rpc/internal/db/event.go index bf956219..faeae767 100644 --- a/cmd/soroban-rpc/internal/db/event.go +++ b/cmd/soroban-rpc/internal/db/event.go @@ -311,8 +311,8 @@ type eventTableMigration struct { writer EventWriter } -func (e *eventTableMigration) ApplicableRange() *LedgerSeqRange { - return &LedgerSeqRange{ +func (e *eventTableMigration) ApplicableRange() LedgerSeqRange { + return LedgerSeqRange{ First: e.firstLedger, Last: e.lastLedger, } @@ -326,7 +326,7 @@ func newEventTableMigration( _ context.Context, logger *log.Entry, passphrase string, - ledgerSeqRange *LedgerSeqRange, + ledgerSeqRange LedgerSeqRange, ) migrationApplierFactory { return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) { migration := eventTableMigration{ diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index 9f7aabd9..8ab14e77 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -19,38 +19,40 @@ type LedgerSeqRange struct { Last uint32 } -func (mlr *LedgerSeqRange) IsLedgerIncluded(ledgerSeq uint32) bool { - if mlr == nil { - return false - } +func (mlr LedgerSeqRange) IsLedgerIncluded(ledgerSeq uint32) bool { return ledgerSeq >= mlr.First && ledgerSeq <= mlr.Last } -func (mlr *LedgerSeqRange) Merge(other *LedgerSeqRange) *LedgerSeqRange { - if mlr == nil { +func (mlr LedgerSeqRange) Merge(other LedgerSeqRange) LedgerSeqRange { + if mlr.Empty() { return other } - if other == nil { + if other.Empty() { return mlr } + // TODO: using min/max can result in a much larger range than needed, // as an optimization, we should probably use a sequence of ranges instead. - return &LedgerSeqRange{ + return LedgerSeqRange{ First: min(mlr.First, other.First), Last: max(mlr.Last, other.Last), } } +func (mlr LedgerSeqRange) Empty() bool { + return mlr.First == 0 && mlr.Last == 0 +} + type MigrationApplier interface { // ApplicableRange returns the closed ledger sequence interval, - // where Apply() should be called. A null result indicates the empty range - ApplicableRange() *LedgerSeqRange + // where Apply() should be called. + ApplicableRange() LedgerSeqRange // Apply applies the migration on a ledger. It should never be applied // in ledgers outside the ApplicableRange() Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error } -type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory +type migrationApplierF func(context.Context, *log.Entry, string, LedgerSeqRange) migrationApplierFactory type migrationApplierFactory interface { New(db *DB) (MigrationApplier, error) @@ -72,8 +74,15 @@ type MultiMigration struct { db *DB } -func (mm MultiMigration) ApplicableRange() *LedgerSeqRange { - var result *LedgerSeqRange +func (mm *MultiMigration) Append(m Migration) { + r := m.ApplicableRange() + if !r.Empty() { + mm.migrations = append(mm.migrations, m) + } +} + +func (mm MultiMigration) ApplicableRange() LedgerSeqRange { + var result LedgerSeqRange for _, m := range mm.migrations { result = m.ApplicableRange().Merge(result) } @@ -117,13 +126,18 @@ type guardedMigration struct { } func newGuardedDataMigration( - ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB, + ctx context.Context, uniqueMigrationName string, + logger *log.Entry, factory migrationApplierFactory, db *DB, ) (Migration, error) { metaKey := "Migration" + uniqueMigrationName + "Done" previouslyMigrated, err := getMetaBool(ctx, db, metaKey) if err != nil && !errors.Is(err, ErrEmptyDB) { return nil, err } + if previouslyMigrated { + //nolint:nilnil // a sentinel value here would be stupid + return nil, nil + } applier, err := factory.New(db) if err != nil { return nil, err @@ -145,15 +159,15 @@ func (g *guardedMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) return nil } if !g.applyLogged { - g.logger.WithField("ledger", meta.LedgerSequence()).Info("applying migration") + g.logger.WithField("ledger", meta.LedgerSequence()).Info("Applying migration") g.applyLogged = true } return g.migration.Apply(ctx, meta) } -func (g *guardedMigration) ApplicableRange() *LedgerSeqRange { +func (g *guardedMigration) ApplicableRange() LedgerSeqRange { if g.alreadyMigrated { - return nil + return LedgerSeqRange{} } return g.migration.ApplicableRange() } @@ -165,23 +179,24 @@ func (g *guardedMigration) Commit(ctx context.Context) error { return setMetaBool(ctx, g.db, g.guardMetaKey, true) } -func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (*LedgerSeqRange, error) { +func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (LedgerSeqRange, error) { firstLedgerToMigrate := firstLedger latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx) if err != nil && !errors.Is(err, ErrEmptyDB) { - return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err) + return LedgerSeqRange{}, fmt.Errorf("failed to get latest ledger sequence: %w", err) } if latestLedger > retentionWindow { firstLedgerToMigrate = latestLedger - retentionWindow } - return &LedgerSeqRange{ + return LedgerSeqRange{ First: firstLedgerToMigrate, Last: latestLedger, }, nil } -func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, - ledgerSeqRange *LedgerSeqRange, +func BuildMigrations( + ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, + ledgerSeqRange LedgerSeqRange, ) (MultiMigration, error) { // Start a common db transaction for the entire migration duration err := db.Begin(ctx) @@ -189,14 +204,16 @@ func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPass return MultiMigration{}, errors.Join(err, db.Rollback()) } - migrationNameToFunc := map[string]migrationApplierF{ + // + // Add new DB migrations here: + // + currentMigrations := map[string]migrationApplierF{ transactionsMigrationName: newTransactionTableMigration, eventsMigrationName: newEventTableMigration, } - migrations := make([]Migration, 0, len(migrationNameToFunc)) - - for migrationName, migrationFunc := range migrationNameToFunc { + migrations := make([]Migration, 0, len(currentMigrations)) + for migrationName, migrationFunc := range currentMigrations { migrationLogger := logger.WithField("migration", migrationName) factory := migrationFunc( ctx, @@ -210,8 +227,15 @@ func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPass return MultiMigration{}, errors.Join(err, fmt.Errorf( "could not create guarded migration for %s", migrationName), db.Rollback()) } + + if guardedM == nil { + logger.Infof("Skipping completed migration %s", migrationName) + continue + } + migrations = append(migrations, guardedM) } + return MultiMigration{ migrations: migrations, db: db, diff --git a/cmd/soroban-rpc/internal/db/transaction.go b/cmd/soroban-rpc/internal/db/transaction.go index 361bc6be..af836455 100644 --- a/cmd/soroban-rpc/internal/db/transaction.go +++ b/cmd/soroban-rpc/internal/db/transaction.go @@ -255,8 +255,8 @@ type transactionTableMigration struct { writer TransactionWriter } -func (t *transactionTableMigration) ApplicableRange() *LedgerSeqRange { - return &LedgerSeqRange{ +func (t *transactionTableMigration) ApplicableRange() LedgerSeqRange { + return LedgerSeqRange{ First: t.firstLedger, Last: t.lastLedger, } @@ -270,7 +270,7 @@ func newTransactionTableMigration( ctx context.Context, logger *log.Entry, passphrase string, - ledgerSeqRange *LedgerSeqRange, + ledgerSeqRange LedgerSeqRange, ) migrationApplierFactory { return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) { // Truncate the table, since it may contain data, causing insert conflicts later on. diff --git a/cmd/soroban-rpc/internal/feewindow/feewindow.go b/cmd/soroban-rpc/internal/feewindow/feewindow.go index b1d40bee..08a84ab4 100644 --- a/cmd/soroban-rpc/internal/feewindow/feewindow.go +++ b/cmd/soroban-rpc/internal/feewindow/feewindow.go @@ -2,6 +2,7 @@ package feewindow import ( + "context" "errors" "io" "slices" @@ -133,9 +134,9 @@ type FeeWindows struct { db *db.DB } -func NewFeeWindows(classicRetention uint32, sorobanRetetion uint32, networkPassPhrase string, db *db.DB) *FeeWindows { +func NewFeeWindows(classicRetention uint32, sorobanRetention uint32, networkPassPhrase string, db *db.DB) *FeeWindows { return &FeeWindows{ - SorobanInclusionFeeWindow: NewFeeWindow(sorobanRetetion), + SorobanInclusionFeeWindow: NewFeeWindow(sorobanRetention), ClassicFeeWindow: NewFeeWindow(classicRetention), networkPassPhrase: networkPassPhrase, db: db, @@ -194,3 +195,35 @@ func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { } return nil } + +func (fw *FeeWindows) AsMigration(seqRange db.LedgerSeqRange) db.Migration { + return &feeWindowMigration{ + firstLedger: seqRange.First, + lastLedger: seqRange.Last, + windows: fw, + } +} + +type feeWindowMigration struct { + firstLedger uint32 + lastLedger uint32 + windows *FeeWindows +} + +func (fw *feeWindowMigration) ApplicableRange() db.LedgerSeqRange { + return db.LedgerSeqRange{ + First: fw.firstLedger, + Last: fw.lastLedger, + } +} + +func (fw *feeWindowMigration) Apply(_ context.Context, meta xdr.LedgerCloseMeta) error { + return fw.windows.IngestFees(meta) +} + +func (fw *feeWindowMigration) Commit(_ context.Context) error { + return nil // no-op +} + +// ensure we conform to the migration interface +var _ db.Migration = &feeWindowMigration{}