Skip to content

Commit

Permalink
Add events table migration (#262)
Browse files Browse the repository at this point in the history
* Fix migrations - 1

* Make migrations sequential - 1

* Make migrations sequential - 2

* Fix failing unittest - 1

* Fix linting errors - 1

* Fix failing integration test - 1

* Remove %w from Fatal strings

* refactor migrationApplierFactoryF

* Add ledger seq to fatal error string

* Add comments - 1

* Fix - 1

* Optimise migrations - 1

* Optimise migrations - 2

* Optimise migrations - 3

* Fix linting - 1

* Fix linting - 2

* Remove dupicate latest ledger fetch code

* Rollback db in daemon instead of migration

* Remove unused constant

* Remove unused constant - 2

* Add rollback() statement

* Small change

* Abstract transaction and rollback management inside migration code

* Fix failing unittest
  • Loading branch information
aditya1702 authored Aug 16, 2024
1 parent 99d5411 commit 7466e54
Show file tree
Hide file tree
Showing 6 changed files with 127 additions and 132 deletions.
39 changes: 21 additions & 18 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,38 +289,43 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {
feewindows := feewindow.NewFeeWindows(
feeWindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
cfg.NetworkPassphrase,
d.db,
)

readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
var initialSeq uint32
var currentSeq uint32
dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg)
applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
d.logger.WithError(err).Fatal("could not get ledger range for migration")
}

// Merge migrations range and fee stats range to get the applicable range
latestLedger, err := db.NewLedgerEntryReader(d.db).GetLatestLedgerSequence(readTxMetaCtx)
maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("failed to get latest ledger sequence: %w", err)
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
}

maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: latestLedger - maxFeeRetentionWindow, LastLedgerSeq: latestLedger}
applicableRange := dataMigrations.ApplicableRange()
ledgerSeqRange = ledgerSeqRange.Merge(applicableRange)
// Combine the ledger range for fees, events and transactions
ledgerSeqRange := feeStatsRange.Merge(applicableRange)

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
}

// Apply migration for events, transactions and fee stats
err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.FirstLedgerSeq,
ledgerSeqRange.LastLedgerSeq,
func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
func(txMeta xdr.LedgerCloseMeta) error {
currentSeq = txMeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
Expand All @@ -332,14 +337,12 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
}).Debug("still initializing in-memory store")
}

if err := feewindows.IngestFees(txmeta); err != nil {
if err = feeWindows.IngestFees(txMeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}

if applicableRange.IsLedgerIncluded(currentSeq) {
if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")
}
if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil {
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq)
}
return nil
})
Expand All @@ -356,7 +359,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
}).Info("finished initializing in-memory store and applying DB data migrations")
}

return feewindows
return feeWindows
}

func (d *Daemon) Run() {
Expand Down
27 changes: 11 additions & 16 deletions cmd/soroban-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -322,26 +322,21 @@ func (e *eventTableMigration) Apply(_ context.Context, meta xdr.LedgerCloseMeta)
}

func newEventTableMigration(
_ context.Context,
logger *log.Entry,
retentionWindow uint32,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) {
firstLedgerToMigrate := firstLedger
writer := &eventHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}

return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
migration := eventTableMigration{
firstLedger: firstLedgerToMigrate,
lastLedger: latestLedger,
writer: writer,
firstLedger: ledgerSeqRange.FirstLedgerSeq,
lastLedger: ledgerSeqRange.LastLedgerSeq,
writer: &eventHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
},
}
return &migration, nil
})
Expand Down
147 changes: 71 additions & 76 deletions cmd/soroban-rpc/internal/db/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (

"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
)

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config"
const (
transactionsMigrationName = "TransactionsTable"
eventsMigrationName = "EventsTable"
)

type LedgerSeqRange struct {
Expand Down Expand Up @@ -47,65 +50,59 @@ type MigrationApplier interface {
Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error
}

type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory

type migrationApplierFactory interface {
New(db *DB, latestLedger uint32) (MigrationApplier, error)
New(db *DB) (MigrationApplier, error)
}

type migrationApplierFactoryF func(db *DB, latestLedger uint32) (MigrationApplier, error)
type migrationApplierFactoryF func(db *DB) (MigrationApplier, error)

func (m migrationApplierFactoryF) New(db *DB, latestLedger uint32) (MigrationApplier, error) {
return m(db, latestLedger)
func (m migrationApplierFactoryF) New(db *DB) (MigrationApplier, error) {
return m(db)
}

type Migration interface {
MigrationApplier
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
}

type multiMigration []Migration
type MultiMigration struct {
migrations []Migration
db *DB
}

func (mm multiMigration) ApplicableRange() *LedgerSeqRange {
func (mm MultiMigration) ApplicableRange() *LedgerSeqRange {
var result *LedgerSeqRange
for _, m := range mm {
for _, m := range mm.migrations {
result = m.ApplicableRange().Merge(result)
}
return result
}

func (mm multiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error {
func (mm MultiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error {
var err error
for _, m := range mm {
for _, m := range mm.migrations {
ledgerSeq := meta.LedgerSequence()
if !m.ApplicableRange().IsLedgerIncluded(ledgerSeq) {
// The range of a sub-migration can be smaller than the global range.
continue
}
if localErr := m.Apply(ctx, meta); localErr != nil {
err = errors.Join(err, localErr)
err = errors.Join(err, localErr, mm.db.Rollback())
}
}
return err
}

func (mm multiMigration) Commit(ctx context.Context) error {
func (mm MultiMigration) Commit(ctx context.Context) error {
var err error
for _, m := range mm {
for _, m := range mm.migrations {
if localErr := m.Commit(ctx); localErr != nil {
err = errors.Join(err, localErr)
}
}
return err
}

func (mm multiMigration) Rollback(ctx context.Context) error {
var err error
for _, m := range mm {
if localErr := m.Rollback(ctx); localErr != nil {
err = errors.Join(err, localErr)
err = errors.Join(err, localErr, mm.db.Rollback())
}
}
return err
return mm.db.Commit()
}

// guardedMigration is a db data migration whose application is guarded by a boolean in the meta table
Expand All @@ -122,32 +119,18 @@ type guardedMigration struct {
func newGuardedDataMigration(
ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB,
) (Migration, error) {
migrationDB := &DB{
cache: db.cache,
SessionInterface: db.SessionInterface.Clone(),
}
if err := migrationDB.Begin(ctx); err != nil {
return nil, err
}
metaKey := "Migration" + uniqueMigrationName + "Done"
previouslyMigrated, err := getMetaBool(ctx, migrationDB, metaKey)
previouslyMigrated, err := getMetaBool(ctx, db, metaKey)
if err != nil && !errors.Is(err, ErrEmptyDB) {
err = errors.Join(err, migrationDB.Rollback())
return nil, err
}
latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx)
if err != nil && !errors.Is(err, ErrEmptyDB) {
err = errors.Join(err, migrationDB.Rollback())
return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err)
}
applier, err := factory.New(migrationDB, latestLedger)
applier, err := factory.New(db)
if err != nil {
err = errors.Join(err, migrationDB.Rollback())
return nil, err
}
guardedMigration := &guardedMigration{
guardMetaKey: metaKey,
db: migrationDB,
db: db,
migration: applier,
alreadyMigrated: previouslyMigrated,
logger: logger,
Expand Down Expand Up @@ -179,46 +162,58 @@ func (g *guardedMigration) Commit(ctx context.Context) error {
if g.alreadyMigrated {
return nil
}
err := setMetaBool(ctx, g.db, g.guardMetaKey, true)
if err != nil {
return errors.Join(err, g.Rollback(ctx))
}
return g.db.Commit()
return setMetaBool(ctx, g.db, g.guardMetaKey, true)
}

func (g *guardedMigration) Rollback(_ context.Context) error {
return g.db.Rollback()
func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (*LedgerSeqRange, error) {
firstLedgerToMigrate := firstLedger
latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx)
if err != nil && !errors.Is(err, ErrEmptyDB) {
return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err)
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}
return &LedgerSeqRange{
FirstLedgerSeq: firstLedgerToMigrate,
LastLedgerSeq: latestLedger,
}, nil
}

func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config.Config) (Migration, error) {
var migrations []Migration

migrationName := "TransactionsTable"
logger = logger.WithField("migration", migrationName)
factory := newTransactionTableMigration(
ctx,
logger,
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
)

m1, err := newGuardedDataMigration(ctx, migrationName, logger, factory, db)
func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange *LedgerSeqRange,
) (MultiMigration, error) {
// Start a common db transaction for the entire migration duration
err := db.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("creating guarded transaction migration: %w", err)
return MultiMigration{}, errors.Join(err, db.Rollback())
}
migrations = append(migrations, m1)

eventMigrationName := "EventsTable"
eventFactory := newEventTableMigration(
logger.WithField("migration", eventMigrationName),
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
)
m2, err := newGuardedDataMigration(ctx, eventMigrationName, logger, eventFactory, db)
if err != nil {
return nil, fmt.Errorf("creating guarded transaction migration: %w", err)
migrationNameToFunc := map[string]migrationApplierF{
transactionsMigrationName: newTransactionTableMigration,
eventsMigrationName: newEventTableMigration,
}
migrations = append(migrations, m2)

return multiMigration(migrations), nil
migrations := make([]Migration, 0, len(migrationNameToFunc))

for migrationName, migrationFunc := range migrationNameToFunc {
migrationLogger := logger.WithField("migration", migrationName)
factory := migrationFunc(
ctx,
migrationLogger,
networkPassphrase,
ledgerSeqRange,
)

guardedM, err := newGuardedDataMigration(ctx, migrationName, migrationLogger, factory, db)
if err != nil {
return MultiMigration{}, errors.Join(fmt.Errorf(
"could not create guarded migration for %s: %w", migrationName, err), db.Rollback())
}
migrations = append(migrations, guardedM)
}
return MultiMigration{
migrations: migrations,
db: db,
}, nil
}
30 changes: 14 additions & 16 deletions cmd/soroban-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,30 +266,28 @@ func (t *transactionTableMigration) Apply(_ context.Context, meta xdr.LedgerClos
return t.writer.InsertTransactions(meta)
}

func newTransactionTableMigration(ctx context.Context, logger *log.Entry,
retentionWindow uint32, passphrase string,
func newTransactionTableMigration(
ctx context.Context,
logger *log.Entry,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) {
firstLedgerToMigrate := uint32(2) //nolint:mnd
writer := &transactionHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}
return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
// Truncate the table, since it may contain data, causing insert conflicts later on.
// (the migration was shipped after the actual transactions table change)
_, err := db.Exec(ctx, sq.Delete(transactionTableName))
if err != nil {
return nil, fmt.Errorf("couldn't delete table %q: %w", transactionTableName, err)
}
migration := transactionTableMigration{
firstLedger: firstLedgerToMigrate,
lastLedger: latestLedger,
writer: writer,
firstLedger: ledgerSeqRange.FirstLedgerSeq,
lastLedger: ledgerSeqRange.LastLedgerSeq,
writer: &transactionHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
},
}
return &migration, nil
})
Expand Down
Loading

0 comments on commit 7466e54

Please sign in to comment.