Skip to content

Commit

Permalink
Abstract transaction and rollback management inside migration code
Browse files Browse the repository at this point in the history
  • Loading branch information
aditya1702 committed Aug 14, 2024
1 parent d7966a4 commit fb738bf
Show file tree
Hide file tree
Showing 3 changed files with 38 additions and 32 deletions.
23 changes: 6 additions & 17 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -293,6 +293,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
cfg.NetworkPassphrase,
d.db,
)

readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
Expand All @@ -313,21 +314,9 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
// Combine the ledger range for fees, events and transactions
ledgerSeqRange := feeStatsRange.Merge(applicableRange)

// Start a common db transaction for the entire migration duration
err = d.db.Begin(readTxMetaCtx)
if err != nil {
d.logger.WithError(err).Fatal("could not commit database transaction: ", d.db.Rollback())
}
defer func() {
err = d.db.Commit()
if err != nil {
d.logger.WithError(err).Fatal("could not commit database transaction: ", d.db.Rollback())
}
}()

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations: ", d.db.Rollback())
d.logger.WithError(err).Fatal("could not build migrations")
}

// Apply migration for events, transactions and fee stats
Expand All @@ -349,19 +338,19 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
}

if err = feeWindows.IngestFees(txMeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats: ", d.db.Rollback())
d.logger.WithError(err).Fatal("could not initialize fee stats")
}

if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil {
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq, ": ", d.db.Rollback())
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq)
}
return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database: ", d.db.Rollback())
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
if err := dataMigrations.Commit(readTxMetaCtx); err != nil {
d.logger.WithError(err).Fatal("could not commit data migrations ", d.db.Rollback())
d.logger.WithError(err).Fatal("could not commit data migrations")
}

if currentSeq != 0 {
Expand Down
33 changes: 23 additions & 10 deletions cmd/soroban-rpc/internal/db/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -67,39 +67,42 @@ type Migration interface {
Commit(ctx context.Context) error
}

type MultiMigration []Migration
type MultiMigration struct {
migrations []Migration
db *DB
}

func (mm MultiMigration) ApplicableRange() *LedgerSeqRange {
var result *LedgerSeqRange
for _, m := range mm {
for _, m := range mm.migrations {
result = m.ApplicableRange().Merge(result)
}
return result
}

func (mm MultiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error {
var err error
for _, m := range mm {
for _, m := range mm.migrations {
ledgerSeq := meta.LedgerSequence()
if !m.ApplicableRange().IsLedgerIncluded(ledgerSeq) {
// The range of a sub-migration can be smaller than the global range.
continue
}
if localErr := m.Apply(ctx, meta); localErr != nil {
err = errors.Join(err, localErr)
err = errors.Join(err, localErr, mm.db.Rollback())
}
}
return err
}

func (mm MultiMigration) Commit(ctx context.Context) error {
var err error
for _, m := range mm {
for _, m := range mm.migrations {
if localErr := m.Commit(ctx); localErr != nil {
err = errors.Join(err, localErr)
err = errors.Join(err, localErr, mm.db.Rollback())
}
}
return err
return mm.db.Commit()
}

// guardedMigration is a db data migration whose application is guarded by a boolean in the meta table
Expand Down Expand Up @@ -180,12 +183,18 @@ func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32
func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange *LedgerSeqRange,
) (MultiMigration, error) {
// Start a common db transaction for the entire migration duration
err := db.Begin(ctx)
if err != nil {
return MultiMigration{}, errors.Join(err, db.Rollback())
}

migrationNameToFunc := map[string]migrationApplierF{
transactionsMigrationName: newTransactionTableMigration,
eventsMigrationName: newEventTableMigration,
}

migrations := make(MultiMigration, 0, len(migrationNameToFunc))
migrations := make([]Migration, 0, len(migrationNameToFunc))

for migrationName, migrationFunc := range migrationNameToFunc {
migrationLogger := logger.WithField("migration", migrationName)
Expand All @@ -198,9 +207,13 @@ func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPass

guardedM, err := newGuardedDataMigration(ctx, migrationName, migrationLogger, factory, db)
if err != nil {
return nil, fmt.Errorf("could not create guarded migration for %s: %w", migrationName, err)
return MultiMigration{}, errors.Join(fmt.Errorf(
"could not create guarded migration for %s: %w", migrationName, err), db.Rollback())
}
migrations = append(migrations, guardedM)
}
return migrations, nil
return MultiMigration{
migrations: migrations,
db: db,
}, nil
}
14 changes: 9 additions & 5 deletions cmd/soroban-rpc/internal/feewindow/feewindow.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,15 @@
package feewindow

import (
"errors"
"io"
"slices"
"sync"

"github.com/stellar/go/ingest"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
)

Expand Down Expand Up @@ -128,20 +130,22 @@ type FeeWindows struct {
SorobanInclusionFeeWindow *FeeWindow
ClassicFeeWindow *FeeWindow
networkPassPhrase string
db *db.DB
}

func NewFeeWindows(classicRetention uint32, sorobanRetetion uint32, networkPassPhrase string) *FeeWindows {
func NewFeeWindows(classicRetention uint32, sorobanRetetion uint32, networkPassPhrase string, db *db.DB) *FeeWindows {
return &FeeWindows{
SorobanInclusionFeeWindow: NewFeeWindow(sorobanRetetion),
ClassicFeeWindow: NewFeeWindow(classicRetention),
networkPassPhrase: networkPassPhrase,
db: db,
}
}

func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error {
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(fw.networkPassPhrase, meta)
if err != nil {
return err
return errors.Join(err, fw.db.Rollback())
}
var sorobanInclusionFees []uint64
var classicFees []uint64
Expand All @@ -151,7 +155,7 @@ func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error {
break
}
if err != nil {
return err
return errors.Join(err, fw.db.Rollback())
}
feeCharged := uint64(tx.Result.Result.FeeCharged)
ops := tx.Envelope.Operations()
Expand Down Expand Up @@ -182,11 +186,11 @@ func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error {
BucketContent: classicFees,
}
if err := fw.ClassicFeeWindow.AppendLedgerFees(bucket); err != nil {
return err
return errors.Join(err, fw.db.Rollback())
}
bucket.BucketContent = sorobanInclusionFees
if err := fw.SorobanInclusionFeeWindow.AppendLedgerFees(bucket); err != nil {
return err
return errors.Join(err, fw.db.Rollback())
}
return nil
}

0 comments on commit fb738bf

Please sign in to comment.