diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index 4104f9b1..7d3d0adb 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -293,6 +293,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, cfg.NetworkPassphrase, + d.db, ) readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout) @@ -313,21 +314,9 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows // Combine the ledger range for fees, events and transactions ledgerSeqRange := feeStatsRange.Merge(applicableRange) - // Start a common db transaction for the entire migration duration - err = d.db.Begin(readTxMetaCtx) - if err != nil { - d.logger.WithError(err).Fatal("could not commit database transaction: ", d.db.Rollback()) - } - defer func() { - err = d.db.Commit() - if err != nil { - d.logger.WithError(err).Fatal("could not commit database transaction: ", d.db.Rollback()) - } - }() - dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange) if err != nil { - d.logger.WithError(err).Fatal("could not build migrations: ", d.db.Rollback()) + d.logger.WithError(err).Fatal("could not build migrations") } // Apply migration for events, transactions and fee stats @@ -349,19 +338,19 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows } if err = feeWindows.IngestFees(txMeta); err != nil { - d.logger.WithError(err).Fatal("could not initialize fee stats: ", d.db.Rollback()) + d.logger.WithError(err).Fatal("could not initialize fee stats") } if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil { - d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq, ": ", d.db.Rollback()) + d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq) } return nil }) if err != nil { - d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database: ", d.db.Rollback()) + d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") } if err := dataMigrations.Commit(readTxMetaCtx); err != nil { - d.logger.WithError(err).Fatal("could not commit data migrations ", d.db.Rollback()) + d.logger.WithError(err).Fatal("could not commit data migrations") } if currentSeq != 0 { diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index 1f586989..69df9de2 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -67,11 +67,14 @@ type Migration interface { Commit(ctx context.Context) error } -type MultiMigration []Migration +type MultiMigration struct { + migrations []Migration + db *DB +} func (mm MultiMigration) ApplicableRange() *LedgerSeqRange { var result *LedgerSeqRange - for _, m := range mm { + for _, m := range mm.migrations { result = m.ApplicableRange().Merge(result) } return result @@ -79,14 +82,14 @@ func (mm MultiMigration) ApplicableRange() *LedgerSeqRange { func (mm MultiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { var err error - for _, m := range mm { + for _, m := range mm.migrations { ledgerSeq := meta.LedgerSequence() if !m.ApplicableRange().IsLedgerIncluded(ledgerSeq) { // The range of a sub-migration can be smaller than the global range. continue } if localErr := m.Apply(ctx, meta); localErr != nil { - err = errors.Join(err, localErr) + err = errors.Join(err, localErr, mm.db.Rollback()) } } return err @@ -94,12 +97,12 @@ func (mm MultiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) er func (mm MultiMigration) Commit(ctx context.Context) error { var err error - for _, m := range mm { + for _, m := range mm.migrations { if localErr := m.Commit(ctx); localErr != nil { - err = errors.Join(err, localErr) + err = errors.Join(err, localErr, mm.db.Rollback()) } } - return err + return mm.db.Commit() } // guardedMigration is a db data migration whose application is guarded by a boolean in the meta table @@ -180,12 +183,18 @@ func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32 func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, ledgerSeqRange *LedgerSeqRange, ) (MultiMigration, error) { + // Start a common db transaction for the entire migration duration + err := db.Begin(ctx) + if err != nil { + return MultiMigration{}, errors.Join(err, db.Rollback()) + } + migrationNameToFunc := map[string]migrationApplierF{ transactionsMigrationName: newTransactionTableMigration, eventsMigrationName: newEventTableMigration, } - migrations := make(MultiMigration, 0, len(migrationNameToFunc)) + migrations := make([]Migration, 0, len(migrationNameToFunc)) for migrationName, migrationFunc := range migrationNameToFunc { migrationLogger := logger.WithField("migration", migrationName) @@ -198,9 +207,13 @@ func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPass guardedM, err := newGuardedDataMigration(ctx, migrationName, migrationLogger, factory, db) if err != nil { - return nil, fmt.Errorf("could not create guarded migration for %s: %w", migrationName, err) + return MultiMigration{}, errors.Join(fmt.Errorf( + "could not create guarded migration for %s: %w", migrationName, err), db.Rollback()) } migrations = append(migrations, guardedM) } - return migrations, nil + return MultiMigration{ + migrations: migrations, + db: db, + }, nil } diff --git a/cmd/soroban-rpc/internal/feewindow/feewindow.go b/cmd/soroban-rpc/internal/feewindow/feewindow.go index 3d662fbc..b1d40bee 100644 --- a/cmd/soroban-rpc/internal/feewindow/feewindow.go +++ b/cmd/soroban-rpc/internal/feewindow/feewindow.go @@ -2,6 +2,7 @@ package feewindow import ( + "errors" "io" "slices" "sync" @@ -9,6 +10,7 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" ) @@ -128,20 +130,22 @@ type FeeWindows struct { SorobanInclusionFeeWindow *FeeWindow ClassicFeeWindow *FeeWindow networkPassPhrase string + db *db.DB } -func NewFeeWindows(classicRetention uint32, sorobanRetetion uint32, networkPassPhrase string) *FeeWindows { +func NewFeeWindows(classicRetention uint32, sorobanRetetion uint32, networkPassPhrase string, db *db.DB) *FeeWindows { return &FeeWindows{ SorobanInclusionFeeWindow: NewFeeWindow(sorobanRetetion), ClassicFeeWindow: NewFeeWindow(classicRetention), networkPassPhrase: networkPassPhrase, + db: db, } } func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(fw.networkPassPhrase, meta) if err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } var sorobanInclusionFees []uint64 var classicFees []uint64 @@ -151,7 +155,7 @@ func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { break } if err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } feeCharged := uint64(tx.Result.Result.FeeCharged) ops := tx.Envelope.Operations() @@ -182,11 +186,11 @@ func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { BucketContent: classicFees, } if err := fw.ClassicFeeWindow.AppendLedgerFees(bucket); err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } bucket.BucketContent = sorobanInclusionFees if err := fw.SorobanInclusionFeeWindow.AppendLedgerFees(bucket); err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } return nil }