From b98e0c60c7625d3b747b50e2ec34e8cdc04e4bcf Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Wed, 31 Jul 2024 15:20:05 -0400 Subject: [PATCH 01/24] Fix migrations - 1 --- cmd/soroban-rpc/internal/daemon/daemon.go | 7 +++++-- cmd/soroban-rpc/internal/db/migration.go | 4 ++-- 2 files changed, 7 insertions(+), 4 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index b1983cb8..b59861f1 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -306,12 +306,15 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows // Merge migrations range and fee stats range to get the applicable range latestLedger, err := db.NewLedgerEntryReader(d.db).GetLatestLedgerSequence(readTxMetaCtx) - if err != nil { + if err != nil && !errors.Is(err, db.ErrEmptyDB) { d.logger.WithError(err).Fatal("failed to get latest ledger sequence: %w", err) } maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow) - ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: latestLedger - maxFeeRetentionWindow, LastLedgerSeq: latestLedger} + ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: 0, LastLedgerSeq: latestLedger} + if latestLedger > maxFeeRetentionWindow { + ledgerSeqRange.FirstLedgerSeq = latestLedger - maxFeeRetentionWindow + } applicableRange := dataMigrations.ApplicableRange() ledgerSeqRange = ledgerSeqRange.Merge(applicableRange) diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index 4c7c9205..dd011863 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -204,7 +204,7 @@ func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config m1, err := newGuardedDataMigration(ctx, migrationName, logger, factory, db) if err != nil { - return nil, fmt.Errorf("creating guarded transaction migration: %w", err) + return nil, fmt.Errorf("could not create guarded transaction migration: %w", err) } migrations = append(migrations, m1) @@ -216,7 +216,7 @@ func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config ) m2, err := newGuardedDataMigration(ctx, eventMigrationName, logger, eventFactory, db) if err != nil { - return nil, fmt.Errorf("creating guarded transaction migration: %w", err) + return nil, fmt.Errorf("could not create guarded event migration: %w", err) } migrations = append(migrations, m2) From 2277d08394c13d1569336f87f7b9b769febce088 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Thu, 8 Aug 2024 11:49:05 -0400 Subject: [PATCH 02/24] Make migrations sequential - 1 --- cmd/soroban-rpc/internal/daemon/daemon.go | 10 +++++++--- cmd/soroban-rpc/internal/db/migration.go | 15 +++++++++++++++ 2 files changed, 22 insertions(+), 3 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index b59861f1..7ab695b0 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -299,9 +299,9 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows defer cancelReadTxMeta() var initialSeq uint32 var currentSeq uint32 - dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg) + applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow) if err != nil { - d.logger.WithError(err).Fatal("could not build migrations") + d.logger.WithError(err).Fatal("could not get ledger range for migration") } // Merge migrations range and fee stats range to get the applicable range @@ -315,9 +315,13 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows if latestLedger > maxFeeRetentionWindow { ledgerSeqRange.FirstLedgerSeq = latestLedger - maxFeeRetentionWindow } - applicableRange := dataMigrations.ApplicableRange() ledgerSeqRange = ledgerSeqRange.Merge(applicableRange) + dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg) + if err != nil { + d.logger.WithError(err).Fatal("could not build migrations") + } + err = db.NewLedgerReader(d.db).StreamLedgerRange( readTxMetaCtx, ledgerSeqRange.FirstLedgerSeq, diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index dd011863..140d04f4 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -190,6 +190,21 @@ func (g *guardedMigration) Rollback(_ context.Context) error { return g.db.Rollback() } +func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (*LedgerSeqRange, error) { + firstLedgerToMigrate := firstLedger + latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx) + if err != nil && !errors.Is(err, ErrEmptyDB) { + return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err) + } + if latestLedger > retentionWindow { + firstLedgerToMigrate = latestLedger - retentionWindow + } + return &LedgerSeqRange{ + FirstLedgerSeq: firstLedgerToMigrate, + LastLedgerSeq: latestLedger, + }, nil +} + func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config.Config) (Migration, error) { var migrations []Migration From 78a9e23918f90c11a02e174634ab257af1cd4ef3 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Thu, 8 Aug 2024 13:56:29 -0400 Subject: [PATCH 03/24] Make migrations sequential - 2 --- cmd/soroban-rpc/internal/daemon/daemon.go | 78 +++++++++------- cmd/soroban-rpc/internal/db/event.go | 10 +- cmd/soroban-rpc/internal/db/migration.go | 103 +++++++-------------- cmd/soroban-rpc/internal/db/transaction.go | 15 ++- 4 files changed, 90 insertions(+), 116 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 7ab695b0..2ed63e05 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -317,44 +317,60 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows } ledgerSeqRange = ledgerSeqRange.Merge(applicableRange) - dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg) + dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange) if err != nil { d.logger.WithError(err).Fatal("could not build migrations") } - err = db.NewLedgerReader(d.db).StreamLedgerRange( - readTxMetaCtx, - ledgerSeqRange.FirstLedgerSeq, - ledgerSeqRange.LastLedgerSeq, - func(txmeta xdr.LedgerCloseMeta) error { - currentSeq = txmeta.LedgerSequence() - if initialSeq == 0 { - initialSeq = currentSeq - d.logger.WithFields(supportlog.F{ - "seq": currentSeq, - }).Info("initializing in-memory store") - } else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 { - d.logger.WithFields(supportlog.F{ - "seq": currentSeq, - }).Debug("still initializing in-memory store") - } + for _, migrationFactory := range dataMigrations { + // create guarded migration + guardedMigration, err := db.NewGuardedDataMigration( + readTxMetaCtx, + migrationFactory.MigrationName, + migrationFactory.Logger, + migrationFactory.Factory, + migrationFactory.DB, + ) + if err != nil { + d.logger.WithError(err).Fatal("could not create guarded migration for: %s", + migrationFactory.MigrationName) + } - if err := feewindows.IngestFees(txmeta); err != nil { - d.logger.WithError(err).Fatal("could not initialize fee stats") - } + err = db.NewLedgerReader(d.db).StreamLedgerRange( + readTxMetaCtx, + ledgerSeqRange.FirstLedgerSeq, + ledgerSeqRange.LastLedgerSeq, + func(txmeta xdr.LedgerCloseMeta) error { + currentSeq = txmeta.LedgerSequence() + if initialSeq == 0 { + initialSeq = currentSeq + d.logger.WithFields(supportlog.F{ + "seq": currentSeq, + }).Info("initializing in-memory store") + } else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 { + d.logger.WithFields(supportlog.F{ + "seq": currentSeq, + }).Debug("still initializing in-memory store") + } - if applicableRange.IsLedgerIncluded(currentSeq) { - if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil { - d.logger.WithError(err).Fatal("could not run migrations") + if err := feewindows.IngestFees(txmeta); err != nil { + d.logger.WithError(err).Fatal("could not initialize fee stats") } - } - return nil - }) - if err != nil { - d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") - } - if err := dataMigrations.Commit(readTxMetaCtx); err != nil { - d.logger.WithError(err).Fatal("could not commit data migrations") + + if applicableRange.IsLedgerIncluded(currentSeq) { + if err := guardedMigration.Apply(readTxMetaCtx, txmeta); err != nil { + d.logger.WithError(err).Fatal("could not run migrations") + } + } + return nil + }) + if err != nil { + d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") + } + if err := guardedMigration.Commit(readTxMetaCtx); err != nil { + d.logger.WithError(err).Fatal("could not commit migration for: %s", + migrationFactory.MigrationName) + } } if currentSeq != 0 { diff --git a/cmd/soroban-rpc/internal/db/event.go b/cmd/soroban-rpc/internal/db/event.go index fd6c5a7e..7525aa15 100644 --- a/cmd/soroban-rpc/internal/db/event.go +++ b/cmd/soroban-rpc/internal/db/event.go @@ -245,24 +245,20 @@ func (e *eventTableMigration) Apply(_ context.Context, meta xdr.LedgerCloseMeta) func newEventTableMigration( logger *log.Entry, - retentionWindow uint32, passphrase string, + ledgerSeqRange *LedgerSeqRange, ) migrationApplierFactory { return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) { - firstLedgerToMigrate := firstLedger writer := &eventHandler{ log: logger, db: db, stmtCache: sq.NewStmtCache(db.GetTx()), passphrase: passphrase, } - if latestLedger > retentionWindow { - firstLedgerToMigrate = latestLedger - retentionWindow - } migration := eventTableMigration{ - firstLedger: firstLedgerToMigrate, - lastLedger: latestLedger, + firstLedger: ledgerSeqRange.FirstLedgerSeq, + lastLedger: ledgerSeqRange.LastLedgerSeq, writer: writer, } return &migration, nil diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index 140d04f4..53048beb 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -7,8 +7,11 @@ import ( "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" +) - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config" +const ( + TransactionsMigrationName = "TransactionsTable" + EventsMigrationName = "EventsTable" ) type LedgerSeqRange struct { @@ -63,49 +66,11 @@ type Migration interface { Rollback(ctx context.Context) error } -type multiMigration []Migration - -func (mm multiMigration) ApplicableRange() *LedgerSeqRange { - var result *LedgerSeqRange - for _, m := range mm { - result = m.ApplicableRange().Merge(result) - } - return result -} - -func (mm multiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { - var err error - for _, m := range mm { - ledgerSeq := meta.LedgerSequence() - if !m.ApplicableRange().IsLedgerIncluded(ledgerSeq) { - // The range of a sub-migration can be smaller than the global range. - continue - } - if localErr := m.Apply(ctx, meta); localErr != nil { - err = errors.Join(err, localErr) - } - } - return err -} - -func (mm multiMigration) Commit(ctx context.Context) error { - var err error - for _, m := range mm { - if localErr := m.Commit(ctx); localErr != nil { - err = errors.Join(err, localErr) - } - } - return err -} - -func (mm multiMigration) Rollback(ctx context.Context) error { - var err error - for _, m := range mm { - if localErr := m.Rollback(ctx); localErr != nil { - err = errors.Join(err, localErr) - } - } - return err +type MigrationFactory struct { + Factory migrationApplierFactory + DB *DB + Logger *log.Entry + MigrationName string } // guardedMigration is a db data migration whose application is guarded by a boolean in the meta table @@ -119,7 +84,7 @@ type guardedMigration struct { applyLogged bool } -func newGuardedDataMigration( +func NewGuardedDataMigration( ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB, ) (Migration, error) { migrationDB := &DB{ @@ -205,35 +170,33 @@ func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32 }, nil } -func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config.Config) (Migration, error) { - var migrations []Migration +func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, ledgerSeqRange *LedgerSeqRange) ([]MigrationFactory, error) { + var migrations []MigrationFactory - migrationName := "TransactionsTable" - logger = logger.WithField("migration", migrationName) - factory := newTransactionTableMigration( + transactionFactory := newTransactionTableMigration( ctx, - logger, - cfg.HistoryRetentionWindow, - cfg.NetworkPassphrase, + logger.WithField("migration", TransactionsMigrationName), + networkPassphrase, + ledgerSeqRange, ) + migrations = append(migrations, MigrationFactory{ + Factory: transactionFactory, + DB: db, + Logger: logger, + MigrationName: TransactionsMigrationName, + }) - m1, err := newGuardedDataMigration(ctx, migrationName, logger, factory, db) - if err != nil { - return nil, fmt.Errorf("could not create guarded transaction migration: %w", err) - } - migrations = append(migrations, m1) - - eventMigrationName := "EventsTable" eventFactory := newEventTableMigration( - logger.WithField("migration", eventMigrationName), - cfg.HistoryRetentionWindow, - cfg.NetworkPassphrase, + logger.WithField("migration", EventsMigrationName), + networkPassphrase, + ledgerSeqRange, ) - m2, err := newGuardedDataMigration(ctx, eventMigrationName, logger, eventFactory, db) - if err != nil { - return nil, fmt.Errorf("could not create guarded event migration: %w", err) - } - migrations = append(migrations, m2) - - return multiMigration(migrations), nil + migrations = append(migrations, MigrationFactory{ + Factory: eventFactory, + DB: db, + Logger: logger, + MigrationName: EventsMigrationName, + }) + + return migrations, nil } diff --git a/cmd/soroban-rpc/internal/db/transaction.go b/cmd/soroban-rpc/internal/db/transaction.go index 1ef6818b..cafb392a 100644 --- a/cmd/soroban-rpc/internal/db/transaction.go +++ b/cmd/soroban-rpc/internal/db/transaction.go @@ -266,20 +266,19 @@ func (t *transactionTableMigration) Apply(_ context.Context, meta xdr.LedgerClos return t.writer.InsertTransactions(meta) } -func newTransactionTableMigration(ctx context.Context, logger *log.Entry, - retentionWindow uint32, passphrase string, +func newTransactionTableMigration( + ctx context.Context, + logger *log.Entry, + passphrase string, + ledgerSeqRange *LedgerSeqRange, ) migrationApplierFactory { return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) { - firstLedgerToMigrate := uint32(2) //nolint:mnd writer := &transactionHandler{ log: logger, db: db, stmtCache: sq.NewStmtCache(db.GetTx()), passphrase: passphrase, } - if latestLedger > retentionWindow { - firstLedgerToMigrate = latestLedger - retentionWindow - } // Truncate the table, since it may contain data, causing insert conflicts later on. // (the migration was shipped after the actual transactions table change) _, err := db.Exec(ctx, sq.Delete(transactionTableName)) @@ -287,8 +286,8 @@ func newTransactionTableMigration(ctx context.Context, logger *log.Entry, return nil, fmt.Errorf("couldn't delete table %q: %w", transactionTableName, err) } migration := transactionTableMigration{ - firstLedger: firstLedgerToMigrate, - lastLedger: latestLedger, + firstLedger: ledgerSeqRange.FirstLedgerSeq, + lastLedger: ledgerSeqRange.LastLedgerSeq, writer: writer, } return &migration, nil From 72d26ae27a40c49975bf5f8dd95cad84f65fa84b Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Thu, 8 Aug 2024 14:16:59 -0400 Subject: [PATCH 04/24] Fix failing unittest - 1 --- cmd/soroban-rpc/internal/daemon/daemon.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 2ed63e05..ebfd29ac 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -332,7 +332,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows migrationFactory.DB, ) if err != nil { - d.logger.WithError(err).Fatal("could not create guarded migration for: %s", + d.logger.WithError(err).Fatal("could not create guarded migration for: %w", migrationFactory.MigrationName) } @@ -368,7 +368,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") } if err := guardedMigration.Commit(readTxMetaCtx); err != nil { - d.logger.WithError(err).Fatal("could not commit migration for: %s", + d.logger.WithError(err).Fatal("could not commit migration for: %w", migrationFactory.MigrationName) } } From 43e5ec57db6a447d8f033b0d8329188755b61c9f Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Thu, 8 Aug 2024 14:40:20 -0400 Subject: [PATCH 05/24] Fix linting errors - 1 --- cmd/soroban-rpc/internal/db/migration.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index 53048beb..ddf5282f 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -170,7 +170,9 @@ func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32 }, nil } -func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, ledgerSeqRange *LedgerSeqRange) ([]MigrationFactory, error) { +func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, + ledgerSeqRange *LedgerSeqRange, +) ([]MigrationFactory, error) { var migrations []MigrationFactory transactionFactory := newTransactionTableMigration( From 1088de8c067bc5eb78e69175703f82b522416663 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Thu, 8 Aug 2024 17:12:07 -0400 Subject: [PATCH 06/24] Fix failing integration test - 1 --- cmd/soroban-rpc/internal/daemon/daemon.go | 56 ++++++++++++++--------- 1 file changed, 34 insertions(+), 22 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index ebfd29ac..c1bbffa0 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -289,7 +289,7 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { // mustInitializeStorage initializes the storage using what was on the DB func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows { - feewindows := feewindow.NewFeeWindows( + feeWindows := feewindow.NewFeeWindows( cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, cfg.NetworkPassphrase, @@ -311,7 +311,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows } maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow) - ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: 0, LastLedgerSeq: latestLedger} + ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: 2, LastLedgerSeq: latestLedger} if latestLedger > maxFeeRetentionWindow { ledgerSeqRange.FirstLedgerSeq = latestLedger - maxFeeRetentionWindow } @@ -322,8 +322,35 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows d.logger.WithError(err).Fatal("could not build migrations") } + // Apply migration for fee stats in-memory store + err = db.NewLedgerReader(d.db).StreamLedgerRange( + readTxMetaCtx, + ledgerSeqRange.FirstLedgerSeq, + ledgerSeqRange.LastLedgerSeq, + func(txMeta xdr.LedgerCloseMeta) error { + currentSeq = txMeta.LedgerSequence() + if initialSeq == 0 { + initialSeq = currentSeq + d.logger.WithFields(supportlog.F{ + "seq": currentSeq, + }).Info("initializing in-memory store") + } else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 { + d.logger.WithFields(supportlog.F{ + "seq": currentSeq, + }).Debug("still initializing in-memory store") + } + + if err = feeWindows.IngestFees(txMeta); err != nil { + d.logger.WithError(err).Fatal("could not initialize fee stats") + } + return nil + }) + if err != nil { + d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") + } + + // Apply migration for events and transactions tables for _, migrationFactory := range dataMigrations { - // create guarded migration guardedMigration, err := db.NewGuardedDataMigration( readTxMetaCtx, migrationFactory.MigrationName, @@ -340,25 +367,10 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows readTxMetaCtx, ledgerSeqRange.FirstLedgerSeq, ledgerSeqRange.LastLedgerSeq, - func(txmeta xdr.LedgerCloseMeta) error { - currentSeq = txmeta.LedgerSequence() - if initialSeq == 0 { - initialSeq = currentSeq - d.logger.WithFields(supportlog.F{ - "seq": currentSeq, - }).Info("initializing in-memory store") - } else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 { - d.logger.WithFields(supportlog.F{ - "seq": currentSeq, - }).Debug("still initializing in-memory store") - } - - if err := feewindows.IngestFees(txmeta); err != nil { - d.logger.WithError(err).Fatal("could not initialize fee stats") - } - + func(txMeta xdr.LedgerCloseMeta) error { + currentSeq = txMeta.LedgerSequence() if applicableRange.IsLedgerIncluded(currentSeq) { - if err := guardedMigration.Apply(readTxMetaCtx, txmeta); err != nil { + if err := guardedMigration.Apply(readTxMetaCtx, txMeta); err != nil { d.logger.WithError(err).Fatal("could not run migrations") } } @@ -379,7 +391,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows }).Info("finished initializing in-memory store and applying DB data migrations") } - return feewindows + return feeWindows } func (d *Daemon) Run() { From 9cabba6ee0bbdc3314cdcbb7cacf0fc530f06a0b Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 9 Aug 2024 09:57:02 -0400 Subject: [PATCH 07/24] Remove %w from Fatal strings --- cmd/soroban-rpc/internal/daemon/daemon.go | 17 ++++++++--------- 1 file changed, 8 insertions(+), 9 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index c1bbffa0..2a226d66 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -359,7 +359,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows migrationFactory.DB, ) if err != nil { - d.logger.WithError(err).Fatal("could not create guarded migration for: %w", + d.logger.WithError(err).Fatal("could not create guarded migration for: ", migrationFactory.MigrationName) } @@ -369,20 +369,19 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows ledgerSeqRange.LastLedgerSeq, func(txMeta xdr.LedgerCloseMeta) error { currentSeq = txMeta.LedgerSequence() - if applicableRange.IsLedgerIncluded(currentSeq) { - if err := guardedMigration.Apply(readTxMetaCtx, txMeta); err != nil { - d.logger.WithError(err).Fatal("could not run migrations") - } + if err := guardedMigration.Apply(readTxMetaCtx, txMeta); err != nil { + d.logger.WithError(err).Fatal("could not run migrations for: ", + migrationFactory.MigrationName) } return nil }) if err != nil { - d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") - } - if err := guardedMigration.Commit(readTxMetaCtx); err != nil { - d.logger.WithError(err).Fatal("could not commit migration for: %w", + d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database for: ", migrationFactory.MigrationName) } + if err = guardedMigration.Commit(readTxMetaCtx); err != nil { + d.logger.WithError(err).Fatal("could not commit migration for: ", migrationFactory.MigrationName) + } } if currentSeq != 0 { From 3ad93889ea1b30ef07efb43df3e91abeebf74b9c Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 9 Aug 2024 10:38:17 -0400 Subject: [PATCH 08/24] refactor migrationApplierFactoryF --- cmd/soroban-rpc/internal/db/event.go | 16 +++++++--------- cmd/soroban-rpc/internal/db/migration.go | 15 +++++---------- cmd/soroban-rpc/internal/db/transaction.go | 15 +++++++-------- 3 files changed, 19 insertions(+), 27 deletions(-) diff --git a/cmd/soroban-rpc/internal/db/event.go b/cmd/soroban-rpc/internal/db/event.go index 6ffb104f..5ad9f0e1 100644 --- a/cmd/soroban-rpc/internal/db/event.go +++ b/cmd/soroban-rpc/internal/db/event.go @@ -324,18 +324,16 @@ func newEventTableMigration( passphrase string, ledgerSeqRange *LedgerSeqRange, ) migrationApplierFactory { - return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) { - writer := &eventHandler{ - log: logger, - db: db, - stmtCache: sq.NewStmtCache(db.GetTx()), - passphrase: passphrase, - } - + return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) { migration := eventTableMigration{ firstLedger: ledgerSeqRange.FirstLedgerSeq, lastLedger: ledgerSeqRange.LastLedgerSeq, - writer: writer, + writer: &eventHandler{ + log: logger, + db: db, + stmtCache: sq.NewStmtCache(db.GetTx()), + passphrase: passphrase, + }, } return &migration, nil }) diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index ddf5282f..c1c18dff 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -51,13 +51,13 @@ type MigrationApplier interface { } type migrationApplierFactory interface { - New(db *DB, latestLedger uint32) (MigrationApplier, error) + New(db *DB) (MigrationApplier, error) } -type migrationApplierFactoryF func(db *DB, latestLedger uint32) (MigrationApplier, error) +type migrationApplierFactoryF func(db *DB) (MigrationApplier, error) -func (m migrationApplierFactoryF) New(db *DB, latestLedger uint32) (MigrationApplier, error) { - return m(db, latestLedger) +func (m migrationApplierFactoryF) New(db *DB) (MigrationApplier, error) { + return m(db) } type Migration interface { @@ -100,12 +100,7 @@ func NewGuardedDataMigration( err = errors.Join(err, migrationDB.Rollback()) return nil, err } - latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx) - if err != nil && !errors.Is(err, ErrEmptyDB) { - err = errors.Join(err, migrationDB.Rollback()) - return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err) - } - applier, err := factory.New(migrationDB, latestLedger) + applier, err := factory.New(migrationDB) if err != nil { err = errors.Join(err, migrationDB.Rollback()) return nil, err diff --git a/cmd/soroban-rpc/internal/db/transaction.go b/cmd/soroban-rpc/internal/db/transaction.go index cafb392a..931d6715 100644 --- a/cmd/soroban-rpc/internal/db/transaction.go +++ b/cmd/soroban-rpc/internal/db/transaction.go @@ -272,13 +272,7 @@ func newTransactionTableMigration( passphrase string, ledgerSeqRange *LedgerSeqRange, ) migrationApplierFactory { - return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) { - writer := &transactionHandler{ - log: logger, - db: db, - stmtCache: sq.NewStmtCache(db.GetTx()), - passphrase: passphrase, - } + return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) { // Truncate the table, since it may contain data, causing insert conflicts later on. // (the migration was shipped after the actual transactions table change) _, err := db.Exec(ctx, sq.Delete(transactionTableName)) @@ -288,7 +282,12 @@ func newTransactionTableMigration( migration := transactionTableMigration{ firstLedger: ledgerSeqRange.FirstLedgerSeq, lastLedger: ledgerSeqRange.LastLedgerSeq, - writer: writer, + writer: &transactionHandler{ + log: logger, + db: db, + stmtCache: sq.NewStmtCache(db.GetTx()), + passphrase: passphrase, + }, } return &migration, nil }) From 110b1581f3ffe508355df2d70e1d37144fa09470 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 9 Aug 2024 12:12:17 -0400 Subject: [PATCH 09/24] Add ledger seq to fatal error string --- cmd/soroban-rpc/internal/daemon/daemon.go | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 2a226d66..5ec6116a 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -317,11 +317,6 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows } ledgerSeqRange = ledgerSeqRange.Merge(applicableRange) - dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange) - if err != nil { - d.logger.WithError(err).Fatal("could not build migrations") - } - // Apply migration for fee stats in-memory store err = db.NewLedgerReader(d.db).StreamLedgerRange( readTxMetaCtx, @@ -349,6 +344,11 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") } + dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange) + if err != nil { + d.logger.WithError(err).Fatal("could not build migrations") + } + // Apply migration for events and transactions tables for _, migrationFactory := range dataMigrations { guardedMigration, err := db.NewGuardedDataMigration( @@ -370,8 +370,8 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows func(txMeta xdr.LedgerCloseMeta) error { currentSeq = txMeta.LedgerSequence() if err := guardedMigration.Apply(readTxMetaCtx, txMeta); err != nil { - d.logger.WithError(err).Fatal("could not run migrations for: ", - migrationFactory.MigrationName) + d.logger.WithError(err).Fatal("could not apply migration for ledger: ", + currentSeq, " and table: ", migrationFactory.MigrationName) } return nil }) From de1b40daaa024463be8eb6ddc71e0171f04ed69f Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 9 Aug 2024 14:04:57 -0400 Subject: [PATCH 10/24] Add comments - 1 --- cmd/soroban-rpc/internal/daemon/daemon.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 5ec6116a..5bcab9cf 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -315,6 +315,8 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows if latestLedger > maxFeeRetentionWindow { ledgerSeqRange.FirstLedgerSeq = latestLedger - maxFeeRetentionWindow } + + // Combine the ledger range for fees, events and transactions ledgerSeqRange = ledgerSeqRange.Merge(applicableRange) // Apply migration for fee stats in-memory store From 234fc21e1aaa02d784dbba73f86b8f01116b64b5 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 9 Aug 2024 15:02:11 -0400 Subject: [PATCH 11/24] Fix - 1 --- cmd/soroban-rpc/internal/db/migration.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index c1c18dff..d726ccc8 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -137,8 +137,9 @@ func (g *guardedMigration) ApplicableRange() *LedgerSeqRange { func (g *guardedMigration) Commit(ctx context.Context) error { if g.alreadyMigrated { - return nil + return g.Rollback(ctx) } + err := setMetaBool(ctx, g.db, g.guardMetaKey, true) if err != nil { return errors.Join(err, g.Rollback(ctx)) From 949f9ef38d95028594a7066c26c744ddb0e436db Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 9 Aug 2024 17:26:24 -0400 Subject: [PATCH 12/24] Optimise migrations - 1 --- cmd/soroban-rpc/internal/daemon/daemon.go | 87 ++++++++-------- cmd/soroban-rpc/internal/db/event.go | 1 + cmd/soroban-rpc/internal/db/migration.go | 120 +++++++++++++--------- 3 files changed, 121 insertions(+), 87 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 5bcab9cf..7b2c0573 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -346,53 +346,58 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") } - dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange) + err = d.db.Begin(readTxMetaCtx) if err != nil { - d.logger.WithError(err).Fatal("could not build migrations") + d.logger.WithError(err).Fatal("could not begin db transaction") } - - // Apply migration for events and transactions tables - for _, migrationFactory := range dataMigrations { - guardedMigration, err := db.NewGuardedDataMigration( - readTxMetaCtx, - migrationFactory.MigrationName, - migrationFactory.Logger, - migrationFactory.Factory, - migrationFactory.DB, - ) - if err != nil { - d.logger.WithError(err).Fatal("could not create guarded migration for: ", - migrationFactory.MigrationName) + defer func() { + commitErr := d.db.Commit() + if commitErr != nil { + d.logger.WithError(commitErr).Fatal("could not commit migrations") } + }() - err = db.NewLedgerReader(d.db).StreamLedgerRange( - readTxMetaCtx, - ledgerSeqRange.FirstLedgerSeq, - ledgerSeqRange.LastLedgerSeq, - func(txMeta xdr.LedgerCloseMeta) error { - currentSeq = txMeta.LedgerSequence() - if err := guardedMigration.Apply(readTxMetaCtx, txMeta); err != nil { - d.logger.WithError(err).Fatal("could not apply migration for ledger: ", - currentSeq, " and table: ", migrationFactory.MigrationName) - } - return nil - }) - if err != nil { - d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database for: ", - migrationFactory.MigrationName) - } - if err = guardedMigration.Commit(readTxMetaCtx); err != nil { - d.logger.WithError(err).Fatal("could not commit migration for: ", migrationFactory.MigrationName) - } - } - - if currentSeq != 0 { - d.logger.WithFields(supportlog.F{ - "seq": currentSeq, - }).Info("finished initializing in-memory store and applying DB data migrations") + dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange) + if err != nil { + d.logger.WithError(err).Fatal("could not build migrations") } - return feeWindows + // Apply migration for events and transactions tables + //guardedMigration, err := db.NewGuardedDataMigration( + // readTxMetaCtx, + // migrationFactory.MigrationName, + // migrationFactory.Logger, + // migrationFactory.Factory, + // migrationFactory.DB, + //) + //if err != nil { + // d.logger.WithError(err).Fatal("could not create guarded migration for: ", + // migrationFactory.MigrationName) + //} + // + //err = db.NewLedgerReader(d.db).StreamLedgerRange( + // readTxMetaCtx, + // ledgerSeqRange.FirstLedgerSeq, + // ledgerSeqRange.LastLedgerSeq, + // func(txMeta xdr.LedgerCloseMeta) error { + // currentSeq = txMeta.LedgerSequence() + // if err := guardedMigration.Apply(readTxMetaCtx, txMeta); err != nil { + // d.logger.WithError(err).Fatal("could not apply migration for ledger: ", + // currentSeq, " and table: ", migrationFactory.MigrationName) + // } + // return nil + // }) + //if err != nil { + // d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") + //} + // + //if currentSeq != 0 { + // d.logger.WithFields(supportlog.F{ + // "seq": currentSeq, + // }).Info("finished initializing in-memory store and applying DB data migrations") + //} + // + //return feeWindows } func (d *Daemon) Run() { diff --git a/cmd/soroban-rpc/internal/db/event.go b/cmd/soroban-rpc/internal/db/event.go index 673b7b2f..35979d5a 100644 --- a/cmd/soroban-rpc/internal/db/event.go +++ b/cmd/soroban-rpc/internal/db/event.go @@ -319,6 +319,7 @@ func (e *eventTableMigration) Apply(_ context.Context, meta xdr.LedgerCloseMeta) } func newEventTableMigration( + _ context.Context, logger *log.Entry, passphrase string, ledgerSeqRange *LedgerSeqRange, diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index d726ccc8..ac0686e5 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -14,6 +14,11 @@ const ( EventsMigrationName = "EventsTable" ) +var migrationNameToFunc = map[string]migrationApplierF{ + TransactionsMigrationName: newTransactionTableMigration, + EventsMigrationName: newEventTableMigration, +} + type LedgerSeqRange struct { FirstLedgerSeq uint32 LastLedgerSeq uint32 @@ -56,6 +61,8 @@ type migrationApplierFactory interface { type migrationApplierFactoryF func(db *DB) (MigrationApplier, error) +type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory + func (m migrationApplierFactoryF) New(db *DB) (MigrationApplier, error) { return m(db) } @@ -66,11 +73,49 @@ type Migration interface { Rollback(ctx context.Context) error } -type MigrationFactory struct { - Factory migrationApplierFactory - DB *DB - Logger *log.Entry - MigrationName string +type multiMigration []Migration + +func (mm multiMigration) ApplicableRange() *LedgerSeqRange { + var result *LedgerSeqRange + for _, m := range mm { + result = m.ApplicableRange().Merge(result) + } + return result +} + +func (mm multiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { + var err error + for _, m := range mm { + ledgerSeq := meta.LedgerSequence() + if !m.ApplicableRange().IsLedgerIncluded(ledgerSeq) { + // The range of a sub-migration can be smaller than the global range. + continue + } + if localErr := m.Apply(ctx, meta); localErr != nil { + err = errors.Join(err, localErr) + } + } + return err +} + +func (mm multiMigration) Commit(ctx context.Context) error { + var err error + for _, m := range mm { + if localErr := m.Commit(ctx); localErr != nil { + err = errors.Join(err, localErr) + } + } + return err +} + +func (mm multiMigration) Rollback(ctx context.Context) error { + var err error + for _, m := range mm { + if localErr := m.Rollback(ctx); localErr != nil { + err = errors.Join(err, localErr) + } + } + return err } // guardedMigration is a db data migration whose application is guarded by a boolean in the meta table @@ -84,30 +129,23 @@ type guardedMigration struct { applyLogged bool } -func NewGuardedDataMigration( +func newGuardedDataMigration( ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB, ) (Migration, error) { - migrationDB := &DB{ - cache: db.cache, - SessionInterface: db.SessionInterface.Clone(), - } - if err := migrationDB.Begin(ctx); err != nil { - return nil, err - } metaKey := "Migration" + uniqueMigrationName + "Done" - previouslyMigrated, err := getMetaBool(ctx, migrationDB, metaKey) + previouslyMigrated, err := getMetaBool(ctx, db, metaKey) if err != nil && !errors.Is(err, ErrEmptyDB) { - err = errors.Join(err, migrationDB.Rollback()) + err = errors.Join(err, db.Rollback()) return nil, err } - applier, err := factory.New(migrationDB) + applier, err := factory.New(db) if err != nil { - err = errors.Join(err, migrationDB.Rollback()) + err = errors.Join(err, db.Rollback()) return nil, err } guardedMigration := &guardedMigration{ guardMetaKey: metaKey, - db: migrationDB, + db: db, migration: applier, alreadyMigrated: previouslyMigrated, logger: logger, @@ -168,33 +206,23 @@ func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32 func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, ledgerSeqRange *LedgerSeqRange, -) ([]MigrationFactory, error) { - var migrations []MigrationFactory - - transactionFactory := newTransactionTableMigration( - ctx, - logger.WithField("migration", TransactionsMigrationName), - networkPassphrase, - ledgerSeqRange, - ) - migrations = append(migrations, MigrationFactory{ - Factory: transactionFactory, - DB: db, - Logger: logger, - MigrationName: TransactionsMigrationName, - }) - - eventFactory := newEventTableMigration( - logger.WithField("migration", EventsMigrationName), - networkPassphrase, - ledgerSeqRange, - ) - migrations = append(migrations, MigrationFactory{ - Factory: eventFactory, - DB: db, - Logger: logger, - MigrationName: EventsMigrationName, - }) - +) ([]Migration, error) { + var migrations []Migration + + for _, migrationName := range []string{TransactionsMigrationName, EventsMigrationName} { + migrationLogger := logger.WithField("migration", migrationName) + factory := migrationNameToFunc[migrationName]( + ctx, + migrationLogger, + networkPassphrase, + ledgerSeqRange, + ) + + guardedM, err := newGuardedDataMigration(ctx, migrationName, migrationLogger, factory, db) + if err != nil { + return nil, fmt.Errorf("could not create guarded migration for %s: %w", migrationName, err) + } + migrations = append(migrations, guardedM) + } return migrations, nil } From 54f002f9b0563d4ec583940338fceb0e3adfe0b9 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 9 Aug 2024 18:11:34 -0400 Subject: [PATCH 13/24] Optimise migrations - 2 --- cmd/soroban-rpc/internal/daemon/daemon.go | 72 +++++++++++------------ cmd/soroban-rpc/internal/db/migration.go | 23 +++----- 2 files changed, 42 insertions(+), 53 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 7b2c0573..a15f8f11 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -346,14 +346,15 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") } + // Start a common db transaction for the entire migration duration err = d.db.Begin(readTxMetaCtx) if err != nil { - d.logger.WithError(err).Fatal("could not begin db transaction") + d.logger.WithError(err).Fatal("could not commit database transaction: ", d.db.Rollback()) } defer func() { - commitErr := d.db.Commit() - if commitErr != nil { - d.logger.WithError(commitErr).Fatal("could not commit migrations") + err = d.db.Commit() + if err != nil { + d.logger.WithError(err).Fatal("could not commit database transaction") } }() @@ -363,41 +364,34 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows } // Apply migration for events and transactions tables - //guardedMigration, err := db.NewGuardedDataMigration( - // readTxMetaCtx, - // migrationFactory.MigrationName, - // migrationFactory.Logger, - // migrationFactory.Factory, - // migrationFactory.DB, - //) - //if err != nil { - // d.logger.WithError(err).Fatal("could not create guarded migration for: ", - // migrationFactory.MigrationName) - //} - // - //err = db.NewLedgerReader(d.db).StreamLedgerRange( - // readTxMetaCtx, - // ledgerSeqRange.FirstLedgerSeq, - // ledgerSeqRange.LastLedgerSeq, - // func(txMeta xdr.LedgerCloseMeta) error { - // currentSeq = txMeta.LedgerSequence() - // if err := guardedMigration.Apply(readTxMetaCtx, txMeta); err != nil { - // d.logger.WithError(err).Fatal("could not apply migration for ledger: ", - // currentSeq, " and table: ", migrationFactory.MigrationName) - // } - // return nil - // }) - //if err != nil { - // d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") - //} - // - //if currentSeq != 0 { - // d.logger.WithFields(supportlog.F{ - // "seq": currentSeq, - // }).Info("finished initializing in-memory store and applying DB data migrations") - //} - // - //return feeWindows + err = db.NewLedgerReader(d.db).StreamLedgerRange( + readTxMetaCtx, + ledgerSeqRange.FirstLedgerSeq, + ledgerSeqRange.LastLedgerSeq, + func(txMeta xdr.LedgerCloseMeta) error { + if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil { + dbErr := dataMigrations.Rollback(readTxMetaCtx) + if dbErr != nil { + d.logger.WithError(dbErr).Fatal("could not rollback migration for ledger: ", txMeta.LedgerSequence()) + } + d.logger.WithError(err).Fatal("could not apply migration for ledger: ", txMeta.LedgerSequence()) + } + return nil + }) + if err != nil { + d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") + } + if err := dataMigrations.Commit(readTxMetaCtx); err != nil { + d.logger.WithError(err).Fatal("could not commit data migrations") + } + + if currentSeq != 0 { + d.logger.WithFields(supportlog.F{ + "seq": currentSeq, + }).Info("finished initializing in-memory store and applying DB data migrations") + } + + return feeWindows } func (d *Daemon) Run() { diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index ac0686e5..ec7c1e15 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -73,9 +73,9 @@ type Migration interface { Rollback(ctx context.Context) error } -type multiMigration []Migration +type MultiMigration []Migration -func (mm multiMigration) ApplicableRange() *LedgerSeqRange { +func (mm MultiMigration) ApplicableRange() *LedgerSeqRange { var result *LedgerSeqRange for _, m := range mm { result = m.ApplicableRange().Merge(result) @@ -83,7 +83,7 @@ func (mm multiMigration) ApplicableRange() *LedgerSeqRange { return result } -func (mm multiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { +func (mm MultiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { var err error for _, m := range mm { ledgerSeq := meta.LedgerSequence() @@ -98,7 +98,7 @@ func (mm multiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) er return err } -func (mm multiMigration) Commit(ctx context.Context) error { +func (mm MultiMigration) Commit(ctx context.Context) error { var err error for _, m := range mm { if localErr := m.Commit(ctx); localErr != nil { @@ -108,7 +108,7 @@ func (mm multiMigration) Commit(ctx context.Context) error { return err } -func (mm multiMigration) Rollback(ctx context.Context) error { +func (mm MultiMigration) Rollback(ctx context.Context) error { var err error for _, m := range mm { if localErr := m.Rollback(ctx); localErr != nil { @@ -175,14 +175,9 @@ func (g *guardedMigration) ApplicableRange() *LedgerSeqRange { func (g *guardedMigration) Commit(ctx context.Context) error { if g.alreadyMigrated { - return g.Rollback(ctx) - } - - err := setMetaBool(ctx, g.db, g.guardMetaKey, true) - if err != nil { - return errors.Join(err, g.Rollback(ctx)) + return nil } - return g.db.Commit() + return setMetaBool(ctx, g.db, g.guardMetaKey, true) } func (g *guardedMigration) Rollback(_ context.Context) error { @@ -206,8 +201,8 @@ func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32 func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, ledgerSeqRange *LedgerSeqRange, -) ([]Migration, error) { - var migrations []Migration +) (MultiMigration, error) { + var migrations MultiMigration for _, migrationName := range []string{TransactionsMigrationName, EventsMigrationName} { migrationLogger := logger.WithField("migration", migrationName) From 0caef161f3001669fff43ace89c1b03b268bb807 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 9 Aug 2024 18:14:57 -0400 Subject: [PATCH 14/24] Optimise migrations - 3 --- cmd/soroban-rpc/internal/daemon/daemon.go | 50 +++++++++-------------- 1 file changed, 19 insertions(+), 31 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index a15f8f11..026c419d 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -309,7 +309,6 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows if err != nil && !errors.Is(err, db.ErrEmptyDB) { d.logger.WithError(err).Fatal("failed to get latest ledger sequence: %w", err) } - maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow) ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: 2, LastLedgerSeq: latestLedger} if latestLedger > maxFeeRetentionWindow { @@ -319,33 +318,6 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows // Combine the ledger range for fees, events and transactions ledgerSeqRange = ledgerSeqRange.Merge(applicableRange) - // Apply migration for fee stats in-memory store - err = db.NewLedgerReader(d.db).StreamLedgerRange( - readTxMetaCtx, - ledgerSeqRange.FirstLedgerSeq, - ledgerSeqRange.LastLedgerSeq, - func(txMeta xdr.LedgerCloseMeta) error { - currentSeq = txMeta.LedgerSequence() - if initialSeq == 0 { - initialSeq = currentSeq - d.logger.WithFields(supportlog.F{ - "seq": currentSeq, - }).Info("initializing in-memory store") - } else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 { - d.logger.WithFields(supportlog.F{ - "seq": currentSeq, - }).Debug("still initializing in-memory store") - } - - if err = feeWindows.IngestFees(txMeta); err != nil { - d.logger.WithError(err).Fatal("could not initialize fee stats") - } - return nil - }) - if err != nil { - d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") - } - // Start a common db transaction for the entire migration duration err = d.db.Begin(readTxMetaCtx) if err != nil { @@ -363,18 +335,34 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows d.logger.WithError(err).Fatal("could not build migrations") } - // Apply migration for events and transactions tables + // Apply migration for events, transactions and fee stats err = db.NewLedgerReader(d.db).StreamLedgerRange( readTxMetaCtx, ledgerSeqRange.FirstLedgerSeq, ledgerSeqRange.LastLedgerSeq, func(txMeta xdr.LedgerCloseMeta) error { + currentSeq = txMeta.LedgerSequence() + if initialSeq == 0 { + initialSeq = currentSeq + d.logger.WithFields(supportlog.F{ + "seq": currentSeq, + }).Info("initializing in-memory store") + } else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 { + d.logger.WithFields(supportlog.F{ + "seq": currentSeq, + }).Debug("still initializing in-memory store") + } + + if err = feeWindows.IngestFees(txMeta); err != nil { + d.logger.WithError(err).Fatal("could not initialize fee stats") + } + if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil { dbErr := dataMigrations.Rollback(readTxMetaCtx) if dbErr != nil { - d.logger.WithError(dbErr).Fatal("could not rollback migration for ledger: ", txMeta.LedgerSequence()) + d.logger.WithError(dbErr).Fatal("could not rollback migration for ledger: ", currentSeq) } - d.logger.WithError(err).Fatal("could not apply migration for ledger: ", txMeta.LedgerSequence()) + d.logger.WithError(err).Fatal("could not apply migration for ledger: ", currentSeq) } return nil }) From 1daaee0b766eeca4240f0002717ddc82a4d0a7a0 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 9 Aug 2024 18:23:52 -0400 Subject: [PATCH 15/24] Fix linting - 1 --- cmd/soroban-rpc/internal/daemon/daemon.go | 3 ++- cmd/soroban-rpc/internal/db/migration.go | 20 ++++++++++---------- 2 files changed, 12 insertions(+), 11 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 026c419d..1ebb7c21 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -39,6 +39,7 @@ const ( defaultReadTimeout = 5 * time.Second defaultShutdownGracePeriod = 10 * time.Second inMemoryInitializationLedgerLogPeriod = 1_000_000 + firstLedger = 2 ) type Daemon struct { @@ -310,7 +311,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows d.logger.WithError(err).Fatal("failed to get latest ledger sequence: %w", err) } maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow) - ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: 2, LastLedgerSeq: latestLedger} + ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: firstLedger, LastLedgerSeq: latestLedger} if latestLedger > maxFeeRetentionWindow { ledgerSeqRange.FirstLedgerSeq = latestLedger - maxFeeRetentionWindow } diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index ec7c1e15..cfb5f74f 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -10,15 +10,10 @@ import ( ) const ( - TransactionsMigrationName = "TransactionsTable" - EventsMigrationName = "EventsTable" + transactionsMigrationName = "TransactionsTable" + eventsMigrationName = "EventsTable" ) -var migrationNameToFunc = map[string]migrationApplierF{ - TransactionsMigrationName: newTransactionTableMigration, - EventsMigrationName: newEventTableMigration, -} - type LedgerSeqRange struct { FirstLedgerSeq uint32 LastLedgerSeq uint32 @@ -202,11 +197,16 @@ func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32 func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, ledgerSeqRange *LedgerSeqRange, ) (MultiMigration, error) { - var migrations MultiMigration + migrations := make(MultiMigration, 0, 2) + + migrationNameToFunc := map[string]migrationApplierF{ + transactionsMigrationName: newTransactionTableMigration, + eventsMigrationName: newEventTableMigration, + } - for _, migrationName := range []string{TransactionsMigrationName, EventsMigrationName} { + for migrationName, migrationFunc := range migrationNameToFunc { migrationLogger := logger.WithField("migration", migrationName) - factory := migrationNameToFunc[migrationName]( + factory := migrationFunc( ctx, migrationLogger, networkPassphrase, From 007c9e00ad4f30e22cbde30e7882e7dc489014b0 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Fri, 9 Aug 2024 18:25:42 -0400 Subject: [PATCH 16/24] Fix linting - 2 --- cmd/soroban-rpc/internal/db/migration.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index cfb5f74f..cc4e84c9 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -12,6 +12,7 @@ import ( const ( transactionsMigrationName = "TransactionsTable" eventsMigrationName = "EventsTable" + numMigrations = 2 ) type LedgerSeqRange struct { @@ -197,7 +198,7 @@ func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32 func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, ledgerSeqRange *LedgerSeqRange, ) (MultiMigration, error) { - migrations := make(MultiMigration, 0, 2) + migrations := make(MultiMigration, 0, numMigrations) migrationNameToFunc := map[string]migrationApplierF{ transactionsMigrationName: newTransactionTableMigration, From 314790f37db33e763fc96db0a2e32ddb073f5f32 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 12 Aug 2024 14:56:56 -0400 Subject: [PATCH 17/24] Remove dupicate latest ledger fetch code --- cmd/soroban-rpc/internal/daemon/daemon.go | 13 ++++--------- 1 file changed, 4 insertions(+), 9 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 1ebb7c21..56a5e9c8 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -305,19 +305,14 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows d.logger.WithError(err).Fatal("could not get ledger range for migration") } - // Merge migrations range and fee stats range to get the applicable range - latestLedger, err := db.NewLedgerEntryReader(d.db).GetLatestLedgerSequence(readTxMetaCtx) - if err != nil && !errors.Is(err, db.ErrEmptyDB) { - d.logger.WithError(err).Fatal("failed to get latest ledger sequence: %w", err) - } maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow) - ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: firstLedger, LastLedgerSeq: latestLedger} - if latestLedger > maxFeeRetentionWindow { - ledgerSeqRange.FirstLedgerSeq = latestLedger - maxFeeRetentionWindow + feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow) + if err != nil { + d.logger.WithError(err).Fatal("could not get ledger range for fee stats") } // Combine the ledger range for fees, events and transactions - ledgerSeqRange = ledgerSeqRange.Merge(applicableRange) + ledgerSeqRange := feeStatsRange.Merge(applicableRange) // Start a common db transaction for the entire migration duration err = d.db.Begin(readTxMetaCtx) From 1cdafffcf895bdd7ba114b507148328fa32dd32e Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 12 Aug 2024 15:45:56 -0400 Subject: [PATCH 18/24] Rollback db in daemon instead of migration --- cmd/soroban-rpc/internal/daemon/daemon.go | 14 +++++--------- cmd/soroban-rpc/internal/db/migration.go | 17 ----------------- 2 files changed, 5 insertions(+), 26 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 56a5e9c8..fc097e87 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -328,7 +328,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange) if err != nil { - d.logger.WithError(err).Fatal("could not build migrations") + d.logger.WithError(err).Fatal("could not build migrations: ", d.db.Rollback()) } // Apply migration for events, transactions and fee stats @@ -350,23 +350,19 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows } if err = feeWindows.IngestFees(txMeta); err != nil { - d.logger.WithError(err).Fatal("could not initialize fee stats") + d.logger.WithError(err).Fatal("could not initialize fee stats: ", d.db.Rollback()) } if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil { - dbErr := dataMigrations.Rollback(readTxMetaCtx) - if dbErr != nil { - d.logger.WithError(dbErr).Fatal("could not rollback migration for ledger: ", currentSeq) - } - d.logger.WithError(err).Fatal("could not apply migration for ledger: ", currentSeq) + d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq, ": ", d.db.Rollback()) } return nil }) if err != nil { - d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") + d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database: ", d.db.Rollback()) } if err := dataMigrations.Commit(readTxMetaCtx); err != nil { - d.logger.WithError(err).Fatal("could not commit data migrations") + d.logger.WithError(err).Fatal("could not commit data migrations ", d.db.Rollback()) } if currentSeq != 0 { diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index cc4e84c9..8d45271f 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -66,7 +66,6 @@ func (m migrationApplierFactoryF) New(db *DB) (MigrationApplier, error) { type Migration interface { MigrationApplier Commit(ctx context.Context) error - Rollback(ctx context.Context) error } type MultiMigration []Migration @@ -104,16 +103,6 @@ func (mm MultiMigration) Commit(ctx context.Context) error { return err } -func (mm MultiMigration) Rollback(ctx context.Context) error { - var err error - for _, m := range mm { - if localErr := m.Rollback(ctx); localErr != nil { - err = errors.Join(err, localErr) - } - } - return err -} - // guardedMigration is a db data migration whose application is guarded by a boolean in the meta table // (after the migration is applied the boolean is set to true, so that the migration is not applied again) type guardedMigration struct { @@ -131,12 +120,10 @@ func newGuardedDataMigration( metaKey := "Migration" + uniqueMigrationName + "Done" previouslyMigrated, err := getMetaBool(ctx, db, metaKey) if err != nil && !errors.Is(err, ErrEmptyDB) { - err = errors.Join(err, db.Rollback()) return nil, err } applier, err := factory.New(db) if err != nil { - err = errors.Join(err, db.Rollback()) return nil, err } guardedMigration := &guardedMigration{ @@ -176,10 +163,6 @@ func (g *guardedMigration) Commit(ctx context.Context) error { return setMetaBool(ctx, g.db, g.guardMetaKey, true) } -func (g *guardedMigration) Rollback(_ context.Context) error { - return g.db.Rollback() -} - func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (*LedgerSeqRange, error) { firstLedgerToMigrate := firstLedger latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx) From 48d386cade3a7554029b3705e219d50538447ffa Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 12 Aug 2024 17:02:50 -0400 Subject: [PATCH 19/24] Remove unused constant --- cmd/soroban-rpc/internal/daemon/daemon.go | 1 - 1 file changed, 1 deletion(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index fc097e87..1a290b36 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -39,7 +39,6 @@ const ( defaultReadTimeout = 5 * time.Second defaultShutdownGracePeriod = 10 * time.Second inMemoryInitializationLedgerLogPeriod = 1_000_000 - firstLedger = 2 ) type Daemon struct { From d8252d233a358464ad6361067a52099ef9cd2f7f Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Mon, 12 Aug 2024 17:29:25 -0400 Subject: [PATCH 20/24] Remove unused constant - 2 --- cmd/soroban-rpc/internal/db/migration.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index 8d45271f..6e9b0806 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -12,7 +12,6 @@ import ( const ( transactionsMigrationName = "TransactionsTable" eventsMigrationName = "EventsTable" - numMigrations = 2 ) type LedgerSeqRange struct { @@ -181,13 +180,13 @@ func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32 func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, ledgerSeqRange *LedgerSeqRange, ) (MultiMigration, error) { - migrations := make(MultiMigration, 0, numMigrations) - migrationNameToFunc := map[string]migrationApplierF{ transactionsMigrationName: newTransactionTableMigration, eventsMigrationName: newEventTableMigration, } + migrations := make(MultiMigration, 0, len(migrationNameToFunc)) + for migrationName, migrationFunc := range migrationNameToFunc { migrationLogger := logger.WithField("migration", migrationName) factory := migrationFunc( From 4a84f7754203e56a07f20af6d7ede2600c941bde Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Tue, 13 Aug 2024 13:53:51 -0400 Subject: [PATCH 21/24] Add rollback() statement --- cmd/soroban-rpc/internal/daemon/daemon.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 1a290b36..4104f9b1 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -321,7 +321,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows defer func() { err = d.db.Commit() if err != nil { - d.logger.WithError(err).Fatal("could not commit database transaction") + d.logger.WithError(err).Fatal("could not commit database transaction: ", d.db.Rollback()) } }() From 44eb37543c00b5d3c690c6a2007ef1a1e5c576af Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Tue, 13 Aug 2024 13:54:26 -0400 Subject: [PATCH 22/24] Small change --- cmd/soroban-rpc/internal/db/migration.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index 6e9b0806..1f586989 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -50,14 +50,14 @@ type MigrationApplier interface { Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error } +type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory + type migrationApplierFactory interface { New(db *DB) (MigrationApplier, error) } type migrationApplierFactoryF func(db *DB) (MigrationApplier, error) -type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory - func (m migrationApplierFactoryF) New(db *DB) (MigrationApplier, error) { return m(db) } From fb738bf0bd53930aaca8641edd29b3c1a27d6a41 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Wed, 14 Aug 2024 16:44:24 -0400 Subject: [PATCH 23/24] Abstract transaction and rollback management inside migration code --- cmd/soroban-rpc/internal/daemon/daemon.go | 23 ++++--------- cmd/soroban-rpc/internal/db/migration.go | 33 +++++++++++++------ .../internal/feewindow/feewindow.go | 14 +++++--- 3 files changed, 38 insertions(+), 32 deletions(-) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 4104f9b1..7d3d0adb 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -293,6 +293,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, cfg.NetworkPassphrase, + d.db, ) readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout) @@ -313,21 +314,9 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows // Combine the ledger range for fees, events and transactions ledgerSeqRange := feeStatsRange.Merge(applicableRange) - // Start a common db transaction for the entire migration duration - err = d.db.Begin(readTxMetaCtx) - if err != nil { - d.logger.WithError(err).Fatal("could not commit database transaction: ", d.db.Rollback()) - } - defer func() { - err = d.db.Commit() - if err != nil { - d.logger.WithError(err).Fatal("could not commit database transaction: ", d.db.Rollback()) - } - }() - dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange) if err != nil { - d.logger.WithError(err).Fatal("could not build migrations: ", d.db.Rollback()) + d.logger.WithError(err).Fatal("could not build migrations") } // Apply migration for events, transactions and fee stats @@ -349,19 +338,19 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows } if err = feeWindows.IngestFees(txMeta); err != nil { - d.logger.WithError(err).Fatal("could not initialize fee stats: ", d.db.Rollback()) + d.logger.WithError(err).Fatal("could not initialize fee stats") } if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil { - d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq, ": ", d.db.Rollback()) + d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq) } return nil }) if err != nil { - d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database: ", d.db.Rollback()) + d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") } if err := dataMigrations.Commit(readTxMetaCtx); err != nil { - d.logger.WithError(err).Fatal("could not commit data migrations ", d.db.Rollback()) + d.logger.WithError(err).Fatal("could not commit data migrations") } if currentSeq != 0 { diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index 1f586989..69df9de2 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -67,11 +67,14 @@ type Migration interface { Commit(ctx context.Context) error } -type MultiMigration []Migration +type MultiMigration struct { + migrations []Migration + db *DB +} func (mm MultiMigration) ApplicableRange() *LedgerSeqRange { var result *LedgerSeqRange - for _, m := range mm { + for _, m := range mm.migrations { result = m.ApplicableRange().Merge(result) } return result @@ -79,14 +82,14 @@ func (mm MultiMigration) ApplicableRange() *LedgerSeqRange { func (mm MultiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { var err error - for _, m := range mm { + for _, m := range mm.migrations { ledgerSeq := meta.LedgerSequence() if !m.ApplicableRange().IsLedgerIncluded(ledgerSeq) { // The range of a sub-migration can be smaller than the global range. continue } if localErr := m.Apply(ctx, meta); localErr != nil { - err = errors.Join(err, localErr) + err = errors.Join(err, localErr, mm.db.Rollback()) } } return err @@ -94,12 +97,12 @@ func (mm MultiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) er func (mm MultiMigration) Commit(ctx context.Context) error { var err error - for _, m := range mm { + for _, m := range mm.migrations { if localErr := m.Commit(ctx); localErr != nil { - err = errors.Join(err, localErr) + err = errors.Join(err, localErr, mm.db.Rollback()) } } - return err + return mm.db.Commit() } // guardedMigration is a db data migration whose application is guarded by a boolean in the meta table @@ -180,12 +183,18 @@ func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32 func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, ledgerSeqRange *LedgerSeqRange, ) (MultiMigration, error) { + // Start a common db transaction for the entire migration duration + err := db.Begin(ctx) + if err != nil { + return MultiMigration{}, errors.Join(err, db.Rollback()) + } + migrationNameToFunc := map[string]migrationApplierF{ transactionsMigrationName: newTransactionTableMigration, eventsMigrationName: newEventTableMigration, } - migrations := make(MultiMigration, 0, len(migrationNameToFunc)) + migrations := make([]Migration, 0, len(migrationNameToFunc)) for migrationName, migrationFunc := range migrationNameToFunc { migrationLogger := logger.WithField("migration", migrationName) @@ -198,9 +207,13 @@ func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPass guardedM, err := newGuardedDataMigration(ctx, migrationName, migrationLogger, factory, db) if err != nil { - return nil, fmt.Errorf("could not create guarded migration for %s: %w", migrationName, err) + return MultiMigration{}, errors.Join(fmt.Errorf( + "could not create guarded migration for %s: %w", migrationName, err), db.Rollback()) } migrations = append(migrations, guardedM) } - return migrations, nil + return MultiMigration{ + migrations: migrations, + db: db, + }, nil } diff --git a/cmd/soroban-rpc/internal/feewindow/feewindow.go b/cmd/soroban-rpc/internal/feewindow/feewindow.go index 3d662fbc..b1d40bee 100644 --- a/cmd/soroban-rpc/internal/feewindow/feewindow.go +++ b/cmd/soroban-rpc/internal/feewindow/feewindow.go @@ -2,6 +2,7 @@ package feewindow import ( + "errors" "io" "slices" "sync" @@ -9,6 +10,7 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" ) @@ -128,20 +130,22 @@ type FeeWindows struct { SorobanInclusionFeeWindow *FeeWindow ClassicFeeWindow *FeeWindow networkPassPhrase string + db *db.DB } -func NewFeeWindows(classicRetention uint32, sorobanRetetion uint32, networkPassPhrase string) *FeeWindows { +func NewFeeWindows(classicRetention uint32, sorobanRetetion uint32, networkPassPhrase string, db *db.DB) *FeeWindows { return &FeeWindows{ SorobanInclusionFeeWindow: NewFeeWindow(sorobanRetetion), ClassicFeeWindow: NewFeeWindow(classicRetention), networkPassPhrase: networkPassPhrase, + db: db, } } func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(fw.networkPassPhrase, meta) if err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } var sorobanInclusionFees []uint64 var classicFees []uint64 @@ -151,7 +155,7 @@ func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { break } if err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } feeCharged := uint64(tx.Result.Result.FeeCharged) ops := tx.Envelope.Operations() @@ -182,11 +186,11 @@ func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { BucketContent: classicFees, } if err := fw.ClassicFeeWindow.AppendLedgerFees(bucket); err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } bucket.BucketContent = sorobanInclusionFees if err := fw.SorobanInclusionFeeWindow.AppendLedgerFees(bucket); err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } return nil } From 9a2fa6fed34f66896bcd2ae19e710cbe9c2865ee Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Wed, 14 Aug 2024 17:30:57 -0400 Subject: [PATCH 24/24] Fix failing unittest --- cmd/soroban-rpc/internal/ingest/service_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/soroban-rpc/internal/ingest/service_test.go b/cmd/soroban-rpc/internal/ingest/service_test.go index 2eeaf89f..f3f2d523 100644 --- a/cmd/soroban-rpc/internal/ingest/service_test.go +++ b/cmd/soroban-rpc/internal/ingest/service_test.go @@ -67,7 +67,7 @@ func TestIngestion(t *testing.T) { config := Config{ Logger: supportlog.New(), DB: mockDB, - FeeWindows: feewindow.NewFeeWindows(1, 1, network.TestNetworkPassphrase), + FeeWindows: feewindow.NewFeeWindows(1, 1, network.TestNetworkPassphrase, nil), LedgerBackend: mockLedgerBackend, Daemon: daemon, NetworkPassPhrase: network.TestNetworkPassphrase,