diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index b1983cb8..7d3d0adb 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -289,38 +289,43 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { // mustInitializeStorage initializes the storage using what was on the DB func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows { - feewindows := feewindow.NewFeeWindows( + feeWindows := feewindow.NewFeeWindows( cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, cfg.NetworkPassphrase, + d.db, ) readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout) defer cancelReadTxMeta() var initialSeq uint32 var currentSeq uint32 - dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg) + applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow) if err != nil { - d.logger.WithError(err).Fatal("could not build migrations") + d.logger.WithError(err).Fatal("could not get ledger range for migration") } - // Merge migrations range and fee stats range to get the applicable range - latestLedger, err := db.NewLedgerEntryReader(d.db).GetLatestLedgerSequence(readTxMetaCtx) + maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow) + feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow) if err != nil { - d.logger.WithError(err).Fatal("failed to get latest ledger sequence: %w", err) + d.logger.WithError(err).Fatal("could not get ledger range for fee stats") } - maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow) - ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: latestLedger - maxFeeRetentionWindow, LastLedgerSeq: latestLedger} - applicableRange := dataMigrations.ApplicableRange() - ledgerSeqRange = ledgerSeqRange.Merge(applicableRange) + // Combine the ledger range for fees, events and transactions + ledgerSeqRange := feeStatsRange.Merge(applicableRange) + + dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange) + if err != nil { + d.logger.WithError(err).Fatal("could not build migrations") + } + // Apply migration for events, transactions and fee stats err = db.NewLedgerReader(d.db).StreamLedgerRange( readTxMetaCtx, ledgerSeqRange.FirstLedgerSeq, ledgerSeqRange.LastLedgerSeq, - func(txmeta xdr.LedgerCloseMeta) error { - currentSeq = txmeta.LedgerSequence() + func(txMeta xdr.LedgerCloseMeta) error { + currentSeq = txMeta.LedgerSequence() if initialSeq == 0 { initialSeq = currentSeq d.logger.WithFields(supportlog.F{ @@ -332,14 +337,12 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows }).Debug("still initializing in-memory store") } - if err := feewindows.IngestFees(txmeta); err != nil { + if err = feeWindows.IngestFees(txMeta); err != nil { d.logger.WithError(err).Fatal("could not initialize fee stats") } - if applicableRange.IsLedgerIncluded(currentSeq) { - if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil { - d.logger.WithError(err).Fatal("could not run migrations") - } + if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil { + d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq) } return nil }) @@ -356,7 +359,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows }).Info("finished initializing in-memory store and applying DB data migrations") } - return feewindows + return feeWindows } func (d *Daemon) Run() { diff --git a/cmd/soroban-rpc/internal/db/event.go b/cmd/soroban-rpc/internal/db/event.go index 6f62d9ff..01a32192 100644 --- a/cmd/soroban-rpc/internal/db/event.go +++ b/cmd/soroban-rpc/internal/db/event.go @@ -322,26 +322,21 @@ func (e *eventTableMigration) Apply(_ context.Context, meta xdr.LedgerCloseMeta) } func newEventTableMigration( + _ context.Context, logger *log.Entry, - retentionWindow uint32, passphrase string, + ledgerSeqRange *LedgerSeqRange, ) migrationApplierFactory { - return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) { - firstLedgerToMigrate := firstLedger - writer := &eventHandler{ - log: logger, - db: db, - stmtCache: sq.NewStmtCache(db.GetTx()), - passphrase: passphrase, - } - if latestLedger > retentionWindow { - firstLedgerToMigrate = latestLedger - retentionWindow - } - + return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) { migration := eventTableMigration{ - firstLedger: firstLedgerToMigrate, - lastLedger: latestLedger, - writer: writer, + firstLedger: ledgerSeqRange.FirstLedgerSeq, + lastLedger: ledgerSeqRange.LastLedgerSeq, + writer: &eventHandler{ + log: logger, + db: db, + stmtCache: sq.NewStmtCache(db.GetTx()), + passphrase: passphrase, + }, } return &migration, nil }) diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index 4c7c9205..69df9de2 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -7,8 +7,11 @@ import ( "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" +) - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config" +const ( + transactionsMigrationName = "TransactionsTable" + eventsMigrationName = "EventsTable" ) type LedgerSeqRange struct { @@ -47,65 +50,59 @@ type MigrationApplier interface { Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error } +type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory + type migrationApplierFactory interface { - New(db *DB, latestLedger uint32) (MigrationApplier, error) + New(db *DB) (MigrationApplier, error) } -type migrationApplierFactoryF func(db *DB, latestLedger uint32) (MigrationApplier, error) +type migrationApplierFactoryF func(db *DB) (MigrationApplier, error) -func (m migrationApplierFactoryF) New(db *DB, latestLedger uint32) (MigrationApplier, error) { - return m(db, latestLedger) +func (m migrationApplierFactoryF) New(db *DB) (MigrationApplier, error) { + return m(db) } type Migration interface { MigrationApplier Commit(ctx context.Context) error - Rollback(ctx context.Context) error } -type multiMigration []Migration +type MultiMigration struct { + migrations []Migration + db *DB +} -func (mm multiMigration) ApplicableRange() *LedgerSeqRange { +func (mm MultiMigration) ApplicableRange() *LedgerSeqRange { var result *LedgerSeqRange - for _, m := range mm { + for _, m := range mm.migrations { result = m.ApplicableRange().Merge(result) } return result } -func (mm multiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { +func (mm MultiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { var err error - for _, m := range mm { + for _, m := range mm.migrations { ledgerSeq := meta.LedgerSequence() if !m.ApplicableRange().IsLedgerIncluded(ledgerSeq) { // The range of a sub-migration can be smaller than the global range. continue } if localErr := m.Apply(ctx, meta); localErr != nil { - err = errors.Join(err, localErr) + err = errors.Join(err, localErr, mm.db.Rollback()) } } return err } -func (mm multiMigration) Commit(ctx context.Context) error { +func (mm MultiMigration) Commit(ctx context.Context) error { var err error - for _, m := range mm { + for _, m := range mm.migrations { if localErr := m.Commit(ctx); localErr != nil { - err = errors.Join(err, localErr) - } - } - return err -} - -func (mm multiMigration) Rollback(ctx context.Context) error { - var err error - for _, m := range mm { - if localErr := m.Rollback(ctx); localErr != nil { - err = errors.Join(err, localErr) + err = errors.Join(err, localErr, mm.db.Rollback()) } } - return err + return mm.db.Commit() } // guardedMigration is a db data migration whose application is guarded by a boolean in the meta table @@ -122,32 +119,18 @@ type guardedMigration struct { func newGuardedDataMigration( ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB, ) (Migration, error) { - migrationDB := &DB{ - cache: db.cache, - SessionInterface: db.SessionInterface.Clone(), - } - if err := migrationDB.Begin(ctx); err != nil { - return nil, err - } metaKey := "Migration" + uniqueMigrationName + "Done" - previouslyMigrated, err := getMetaBool(ctx, migrationDB, metaKey) + previouslyMigrated, err := getMetaBool(ctx, db, metaKey) if err != nil && !errors.Is(err, ErrEmptyDB) { - err = errors.Join(err, migrationDB.Rollback()) return nil, err } - latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx) - if err != nil && !errors.Is(err, ErrEmptyDB) { - err = errors.Join(err, migrationDB.Rollback()) - return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err) - } - applier, err := factory.New(migrationDB, latestLedger) + applier, err := factory.New(db) if err != nil { - err = errors.Join(err, migrationDB.Rollback()) return nil, err } guardedMigration := &guardedMigration{ guardMetaKey: metaKey, - db: migrationDB, + db: db, migration: applier, alreadyMigrated: previouslyMigrated, logger: logger, @@ -179,46 +162,58 @@ func (g *guardedMigration) Commit(ctx context.Context) error { if g.alreadyMigrated { return nil } - err := setMetaBool(ctx, g.db, g.guardMetaKey, true) - if err != nil { - return errors.Join(err, g.Rollback(ctx)) - } - return g.db.Commit() + return setMetaBool(ctx, g.db, g.guardMetaKey, true) } -func (g *guardedMigration) Rollback(_ context.Context) error { - return g.db.Rollback() +func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (*LedgerSeqRange, error) { + firstLedgerToMigrate := firstLedger + latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx) + if err != nil && !errors.Is(err, ErrEmptyDB) { + return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err) + } + if latestLedger > retentionWindow { + firstLedgerToMigrate = latestLedger - retentionWindow + } + return &LedgerSeqRange{ + FirstLedgerSeq: firstLedgerToMigrate, + LastLedgerSeq: latestLedger, + }, nil } -func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config.Config) (Migration, error) { - var migrations []Migration - - migrationName := "TransactionsTable" - logger = logger.WithField("migration", migrationName) - factory := newTransactionTableMigration( - ctx, - logger, - cfg.HistoryRetentionWindow, - cfg.NetworkPassphrase, - ) - - m1, err := newGuardedDataMigration(ctx, migrationName, logger, factory, db) +func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, + ledgerSeqRange *LedgerSeqRange, +) (MultiMigration, error) { + // Start a common db transaction for the entire migration duration + err := db.Begin(ctx) if err != nil { - return nil, fmt.Errorf("creating guarded transaction migration: %w", err) + return MultiMigration{}, errors.Join(err, db.Rollback()) } - migrations = append(migrations, m1) - eventMigrationName := "EventsTable" - eventFactory := newEventTableMigration( - logger.WithField("migration", eventMigrationName), - cfg.HistoryRetentionWindow, - cfg.NetworkPassphrase, - ) - m2, err := newGuardedDataMigration(ctx, eventMigrationName, logger, eventFactory, db) - if err != nil { - return nil, fmt.Errorf("creating guarded transaction migration: %w", err) + migrationNameToFunc := map[string]migrationApplierF{ + transactionsMigrationName: newTransactionTableMigration, + eventsMigrationName: newEventTableMigration, } - migrations = append(migrations, m2) - return multiMigration(migrations), nil + migrations := make([]Migration, 0, len(migrationNameToFunc)) + + for migrationName, migrationFunc := range migrationNameToFunc { + migrationLogger := logger.WithField("migration", migrationName) + factory := migrationFunc( + ctx, + migrationLogger, + networkPassphrase, + ledgerSeqRange, + ) + + guardedM, err := newGuardedDataMigration(ctx, migrationName, migrationLogger, factory, db) + if err != nil { + return MultiMigration{}, errors.Join(fmt.Errorf( + "could not create guarded migration for %s: %w", migrationName, err), db.Rollback()) + } + migrations = append(migrations, guardedM) + } + return MultiMigration{ + migrations: migrations, + db: db, + }, nil } diff --git a/cmd/soroban-rpc/internal/db/transaction.go b/cmd/soroban-rpc/internal/db/transaction.go index 1ef6818b..931d6715 100644 --- a/cmd/soroban-rpc/internal/db/transaction.go +++ b/cmd/soroban-rpc/internal/db/transaction.go @@ -266,20 +266,13 @@ func (t *transactionTableMigration) Apply(_ context.Context, meta xdr.LedgerClos return t.writer.InsertTransactions(meta) } -func newTransactionTableMigration(ctx context.Context, logger *log.Entry, - retentionWindow uint32, passphrase string, +func newTransactionTableMigration( + ctx context.Context, + logger *log.Entry, + passphrase string, + ledgerSeqRange *LedgerSeqRange, ) migrationApplierFactory { - return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) { - firstLedgerToMigrate := uint32(2) //nolint:mnd - writer := &transactionHandler{ - log: logger, - db: db, - stmtCache: sq.NewStmtCache(db.GetTx()), - passphrase: passphrase, - } - if latestLedger > retentionWindow { - firstLedgerToMigrate = latestLedger - retentionWindow - } + return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) { // Truncate the table, since it may contain data, causing insert conflicts later on. // (the migration was shipped after the actual transactions table change) _, err := db.Exec(ctx, sq.Delete(transactionTableName)) @@ -287,9 +280,14 @@ func newTransactionTableMigration(ctx context.Context, logger *log.Entry, return nil, fmt.Errorf("couldn't delete table %q: %w", transactionTableName, err) } migration := transactionTableMigration{ - firstLedger: firstLedgerToMigrate, - lastLedger: latestLedger, - writer: writer, + firstLedger: ledgerSeqRange.FirstLedgerSeq, + lastLedger: ledgerSeqRange.LastLedgerSeq, + writer: &transactionHandler{ + log: logger, + db: db, + stmtCache: sq.NewStmtCache(db.GetTx()), + passphrase: passphrase, + }, } return &migration, nil }) diff --git a/cmd/soroban-rpc/internal/feewindow/feewindow.go b/cmd/soroban-rpc/internal/feewindow/feewindow.go index 3d662fbc..b1d40bee 100644 --- a/cmd/soroban-rpc/internal/feewindow/feewindow.go +++ b/cmd/soroban-rpc/internal/feewindow/feewindow.go @@ -2,6 +2,7 @@ package feewindow import ( + "errors" "io" "slices" "sync" @@ -9,6 +10,7 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" ) @@ -128,20 +130,22 @@ type FeeWindows struct { SorobanInclusionFeeWindow *FeeWindow ClassicFeeWindow *FeeWindow networkPassPhrase string + db *db.DB } -func NewFeeWindows(classicRetention uint32, sorobanRetetion uint32, networkPassPhrase string) *FeeWindows { +func NewFeeWindows(classicRetention uint32, sorobanRetetion uint32, networkPassPhrase string, db *db.DB) *FeeWindows { return &FeeWindows{ SorobanInclusionFeeWindow: NewFeeWindow(sorobanRetetion), ClassicFeeWindow: NewFeeWindow(classicRetention), networkPassPhrase: networkPassPhrase, + db: db, } } func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(fw.networkPassPhrase, meta) if err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } var sorobanInclusionFees []uint64 var classicFees []uint64 @@ -151,7 +155,7 @@ func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { break } if err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } feeCharged := uint64(tx.Result.Result.FeeCharged) ops := tx.Envelope.Operations() @@ -182,11 +186,11 @@ func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { BucketContent: classicFees, } if err := fw.ClassicFeeWindow.AppendLedgerFees(bucket); err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } bucket.BucketContent = sorobanInclusionFees if err := fw.SorobanInclusionFeeWindow.AppendLedgerFees(bucket); err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } return nil } diff --git a/cmd/soroban-rpc/internal/ingest/service_test.go b/cmd/soroban-rpc/internal/ingest/service_test.go index 2eeaf89f..f3f2d523 100644 --- a/cmd/soroban-rpc/internal/ingest/service_test.go +++ b/cmd/soroban-rpc/internal/ingest/service_test.go @@ -67,7 +67,7 @@ func TestIngestion(t *testing.T) { config := Config{ Logger: supportlog.New(), DB: mockDB, - FeeWindows: feewindow.NewFeeWindows(1, 1, network.TestNetworkPassphrase), + FeeWindows: feewindow.NewFeeWindows(1, 1, network.TestNetworkPassphrase, nil), LedgerBackend: mockLedgerBackend, Daemon: daemon, NetworkPassPhrase: network.TestNetworkPassphrase,