From 331e630175682ddfa411bc6b27af8e2c16471d42 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Tue, 11 Jun 2024 05:14:52 +0200 Subject: [PATCH] Factor out transaction table migration cleanly --- cmd/soroban-rpc/internal/daemon/daemon.go | 69 ++-------- cmd/soroban-rpc/internal/db/db.go | 53 +++---- cmd/soroban-rpc/internal/db/ledgerentry.go | 10 +- cmd/soroban-rpc/internal/db/migration.go | 130 ++++++++++++++++++ .../{migrations => sqlmigrations}/01_init.sql | 0 .../02_transactions.sql | 0 cmd/soroban-rpc/internal/db/transaction.go | 45 ++++-- 7 files changed, 212 insertions(+), 95 deletions(-) create mode 100644 cmd/soroban-rpc/internal/db/migration.go rename cmd/soroban-rpc/internal/db/{migrations => sqlmigrations}/01_init.sql (100%) rename cmd/soroban-rpc/internal/db/{migrations => sqlmigrations}/02_transactions.sql (100%) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index d7bb0f54..0afe7596 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -241,7 +241,7 @@ func MustNew(cfg *config.Config) *Daemon { Logger: logger, LedgerReader: db.NewLedgerReader(dbConn), LedgerEntryReader: db.NewLedgerEntryReader(dbConn), - TransactionReader: db.NewTransactionReader(logger, dbConn, cfg.NetworkPassphrase), + TransactionReader: db.NewTransactionReader(logger, dbConn.SessionInterface, cfg.NetworkPassphrase), PreflightGetter: preflightWorkerPool, }) @@ -290,11 +290,14 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow defer cancelReadTxMeta() var initialSeq uint32 var currentSeq uint32 - migration, migrationDone := d.newTxMigration(readTxMetaCtx, cfg) + dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg) + if err != nil { + d.logger.WithError(err).Fatal("could not build migrations") + } // NOTE: We could optimize this to avoid unnecessary ingestion calls // (the range of txmetas can be larger than the individual store retention windows) // but it's probably not worth the pain. - err := db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error { + err = db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error { currentSeq = txmeta.LedgerSequence() if initialSeq == 0 { initialSeq = currentSeq @@ -312,16 +315,17 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow if err := feewindows.IngestFees(txmeta); err != nil { d.logger.WithError(err).Fatal("could not initialize fee stats") } - if err := migration(txmeta); err != nil { - // TODO: we should only migrate the transaction range - d.logger.WithError(err).Fatal("could not run migration") + if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil { + d.logger.WithError(err).Fatal("could not run migrations") } return nil }) if err != nil { d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") } - migrationDone() + if err := dataMigrations.Commit(readTxMetaCtx); err != nil { + d.logger.WithError(err).Fatal("could not commit data migrations") + } if currentSeq != 0 { d.logger.WithFields(supportlog.F{ @@ -332,57 +336,6 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow return feewindows, eventStore } -// TODO: We should probably implement the migrations somewhere else -type migrationFunc func(txmeta xdr.LedgerCloseMeta) error -type migrationDoneFunc func() - -func (d *Daemon) newTxMigration(ctx context.Context, cfg *config.Config) (migrationFunc, migrationDoneFunc) { - migrationSession := d.db.Clone() - if err := migrationSession.Begin(ctx); err != nil { - d.logger.WithError(err).Fatal("could not start migration session") - } - migration := func(txmeta xdr.LedgerCloseMeta) error { return nil } - migrationDone := func() {} - previouslyMigrated, err := db.GetMetaBool(ctx, migrationSession, transactionsTableMigrationDoneMetaKey) - if err != nil { - if !errors.Is(err, db.ErrEmptyDB) { - d.logger.WithError(err).WithField("key", transactionsTableMigrationDoneMetaKey).Fatal("could not get metadata") - } - } else if previouslyMigrated { - migrationSession.Rollback() - return migration, migrationDone - } - - d.logger.Info("migrating transactions to new backend") - writer := db.NewTransactionWriter(d.logger, migrationSession, cfg.NetworkPassphrase) - latestLedger, err := db.NewLedgerEntryReader(d.db).GetLatestLedgerSequence(ctx) - if err != nil || err != db.ErrEmptyDB { - d.logger.WithError(err).Fatal("cannot read latest ledger") - } - firstLedgerToMigrate := uint32(2) - if latestLedger > cfg.TransactionLedgerRetentionWindow { - firstLedgerToMigrate = latestLedger - cfg.TransactionLedgerRetentionWindow - } - migration = func(txmeta xdr.LedgerCloseMeta) error { - if txmeta.LedgerSequence() < firstLedgerToMigrate { - return nil - } - return writer.InsertTransactions(txmeta) - } - migrationDone = func() { - err := db.SetMetaBool(ctx, migrationSession, transactionsTableMigrationDoneMetaKey) - if err != nil { - d.logger.WithError(err).WithField("key", transactionsTableMigrationDoneMetaKey).Fatal("could not set metadata") - migrationSession.Rollback() - return - } - if err = migrationSession.Commit(); err != nil { - d.logger.WithError(err).Error("could not commit migration session") - } - } - return migration, migrationDone -} - func (d *Daemon) Run() { d.logger.WithFields(supportlog.F{ "addr": d.server.Addr, diff --git a/cmd/soroban-rpc/internal/db/db.go b/cmd/soroban-rpc/internal/db/db.go index 126b0e20..2058773e 100644 --- a/cmd/soroban-rpc/internal/db/db.go +++ b/cmd/soroban-rpc/internal/db/db.go @@ -21,8 +21,8 @@ import ( "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" ) -//go:embed migrations/*.sql -var migrations embed.FS +//go:embed sqlmigrations/*.sql +var sqlMigrations embed.FS var ErrEmptyDB = errors.New("DB is empty") @@ -53,7 +53,14 @@ type dbCache struct { type DB struct { db.SessionInterface - cache dbCache + cache *dbCache +} + +func (db *DB) Clone() *DB { + return &DB{ + SessionInterface: db.SessionInterface.Clone(), + cache: db.cache, + } } func openSQLiteDB(dbFilePath string) (*db.Session, error) { @@ -66,9 +73,9 @@ func openSQLiteDB(dbFilePath string) (*db.Session, error) { return nil, errors.Wrap(err, "open failed") } - if err = runMigrations(session.DB.DB, "sqlite3"); err != nil { + if err = runSQLMigrations(session.DB.DB, "sqlite3"); err != nil { _ = session.Close() - return nil, errors.Wrap(err, "could not run migrations") + return nil, errors.Wrap(err, "could not run SQL migrations") } return session, nil } @@ -80,7 +87,7 @@ func OpenSQLiteDBWithPrometheusMetrics(dbFilePath string, namespace string, sub } result := DB{ SessionInterface: db.RegisterMetrics(session, namespace, sub, registry), - cache: dbCache{ + cache: &dbCache{ ledgerEntries: newTransactionalCache(), }, } @@ -94,14 +101,14 @@ func OpenSQLiteDB(dbFilePath string) (*DB, error) { } result := DB{ SessionInterface: session, - cache: dbCache{ + cache: &dbCache{ ledgerEntries: newTransactionalCache(), }, } return &result, nil } -func GetMetaBool(ctx context.Context, q db.SessionInterface, key string) (bool, error) { +func getMetaBool(ctx context.Context, q db.SessionInterface, key string) (bool, error) { valueStr, err := getMetaValue(ctx, q, key) if err != nil { return false, err @@ -109,7 +116,7 @@ func GetMetaBool(ctx context.Context, q db.SessionInterface, key string) (bool, return strconv.ParseBool(valueStr) } -func SetMetaBool(ctx context.Context, q db.SessionInterface, key string) error { +func setMetaBool(ctx context.Context, q db.SessionInterface, key string) error { query := sq.Replace(metaTableName). Values(key, "true") _, err := q.Exec(ctx, query) @@ -134,6 +141,12 @@ func getMetaValue(ctx context.Context, q db.SessionInterface, key string) (strin } func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *dbCache) (uint32, error) { + cache.Lock() + defer cache.Unlock() + if cache.latestLedgerSeq != 0 { + return cache.latestLedgerSeq, nil + } + latestLedgerStr, err := getMetaValue(ctx, q, latestLedgerSequenceMetaKey) if err != nil { return 0, err @@ -146,13 +159,7 @@ func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache * // Add missing ledger sequence to the top cache. // Otherwise, the write-through cache won't get updated until the first ingestion commit - cache.Lock() - if cache.latestLedgerSeq == 0 { - // Only update the cache if value is missing (0), otherwise - // we may end up overwriting the entry with an older version - cache.latestLedgerSeq = result - } - cache.Unlock() + cache.latestLedgerSeq = result return result, nil } @@ -215,11 +222,11 @@ func NewReadWriter( } func (rw *readWriter) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { - return getLatestLedgerSequence(ctx, rw.db, &rw.db.cache) + return getLatestLedgerSequence(ctx, rw.db.SessionInterface, rw.db.cache) } func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) { - txSession := rw.db.Clone() + txSession := rw.db.SessionInterface.Clone() if err := txSession.Begin(ctx); err != nil { return nil, err } @@ -227,7 +234,7 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) { db := rw.db writer := writeTx{ - globalCache: &db.cache, + globalCache: db.cache, postCommit: func() error { // TODO: this is sqlite-only, it shouldn't be here _, err := db.ExecRaw(ctx, "PRAGMA wal_checkpoint(TRUNCATE)") @@ -332,12 +339,12 @@ func (w writeTx) Rollback() error { } } -func runMigrations(db *sql.DB, dialect string) error { +func runSQLMigrations(db *sql.DB, dialect string) error { m := &migrate.AssetMigrationSource{ - Asset: migrations.ReadFile, + Asset: sqlMigrations.ReadFile, AssetDir: func() func(string) ([]string, error) { return func(path string) ([]string, error) { - dirEntry, err := migrations.ReadDir(path) + dirEntry, err := sqlMigrations.ReadDir(path) if err != nil { return nil, err } @@ -349,7 +356,7 @@ func runMigrations(db *sql.DB, dialect string) error { return entries, nil } }(), - Dir: "migrations", + Dir: "sqlmigrations", } _, err := migrate.ExecMax(db, dialect, m, migrate.Up, 0) return err diff --git a/cmd/soroban-rpc/internal/db/ledgerentry.go b/cmd/soroban-rpc/internal/db/ledgerentry.go index 8286e955..d9d60d37 100644 --- a/cmd/soroban-rpc/internal/db/ledgerentry.go +++ b/cmd/soroban-rpc/internal/db/ledgerentry.go @@ -341,12 +341,12 @@ func NewLedgerEntryReader(db *DB) LedgerEntryReader { } func (r ledgerEntryReader) GetLatestLedgerSequence(ctx context.Context) (uint32, error) { - return getLatestLedgerSequence(ctx, r.db, &r.db.cache) + return getLatestLedgerSequence(ctx, r.db.SessionInterface, r.db.cache) } // NewCachedTx() caches all accessed ledger entries and select statements. If many ledger entries are accessed, it will grow without bounds. func (r ledgerEntryReader) NewCachedTx(ctx context.Context) (LedgerEntryReadTx, error) { - txSession := r.db.Clone() + txSession := r.db.SessionInterface.Clone() // We need to copy the cached ledger entries locally when we start the transaction // since otherwise we would break the consistency between the transaction and the cache. @@ -360,7 +360,7 @@ func (r ledgerEntryReader) NewCachedTx(ctx context.Context) (LedgerEntryReadTx, } cacheReadTx := r.db.cache.ledgerEntries.newReadTx() return &ledgerEntryReadTx{ - globalCache: &r.db.cache, + globalCache: r.db.cache, stmtCache: sq.NewStmtCache(txSession.GetTx()), latestLedgerSeqCache: r.db.cache.latestLedgerSeq, ledgerEntryCacheReadTx: &cacheReadTx, @@ -370,14 +370,14 @@ func (r ledgerEntryReader) NewCachedTx(ctx context.Context) (LedgerEntryReadTx, } func (r ledgerEntryReader) NewTx(ctx context.Context) (LedgerEntryReadTx, error) { - txSession := r.db.Clone() + txSession := r.db.SessionInterface.Clone() if err := txSession.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}); err != nil { return nil, err } r.db.cache.RLock() defer r.db.cache.RUnlock() return &ledgerEntryReadTx{ - globalCache: &r.db.cache, + globalCache: r.db.cache, latestLedgerSeqCache: r.db.cache.latestLedgerSeq, tx: txSession, buffer: xdr.NewEncodingBuffer(), diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go new file mode 100644 index 00000000..7b2f69b3 --- /dev/null +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -0,0 +1,130 @@ +package db + +import ( + "context" + "errors" + "fmt" + + "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" + + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config" +) + +type MigrationApplier interface { + Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error +} + +type migrationApplierFactory interface { + New(db *DB) (MigrationApplier, error) +} + +type migrationApplierFactoryF func(db *DB) (MigrationApplier, error) + +func (m migrationApplierFactoryF) New(db *DB) (MigrationApplier, error) { + return m(db) +} + +type Migration interface { + MigrationApplier + Commit(ctx context.Context) error + Rollback(ctx context.Context) error +} + +type multiMigration []Migration + +func (m multiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { + var err error + for _, data := range m { + if localErr := data.Apply(ctx, meta); localErr != nil { + err = errors.Join(err, localErr) + } + } + return err +} + +func (m multiMigration) Commit(ctx context.Context) error { + var err error + for _, data := range m { + if localErr := data.Commit(ctx); localErr != nil { + err = errors.Join(err, localErr) + } + } + return err +} + +func (m multiMigration) Rollback(ctx context.Context) error { + var err error + for _, data := range m { + if localErr := data.Rollback(ctx); localErr != nil { + err = errors.Join(err, localErr) + } + } + return err +} + +// guardedMigration is a db data migration whose application is guarded by a boolean in the meta table +// (after the migration is applied the boolean is set to true, so that the migration is not applied again) +type guardedMigration struct { + guardMetaKey string + db *DB + migration MigrationApplier + alreadyMigrated bool +} + +func newGuardedDataMigration(ctx context.Context, uniqueMigrationName string, factory migrationApplierFactory, db *DB) (Migration, error) { + migrationDB := db.Clone() + if err := migrationDB.Begin(ctx); err != nil { + return nil, err + } + metaKey := "Migration" + uniqueMigrationName + "Done" + previouslyMigrated, err := getMetaBool(ctx, migrationDB.SessionInterface, metaKey) + if err != nil && !errors.Is(err, ErrEmptyDB) { + migrationDB.Rollback() + return nil, err + } + applier, err := factory.New(migrationDB) + if err != nil { + migrationDB.Rollback() + return nil, err + } + guardedMigration := &guardedMigration{ + guardMetaKey: metaKey, + db: migrationDB, + migration: applier, + alreadyMigrated: previouslyMigrated, + } + return guardedMigration, nil +} + +func (g *guardedMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { + if g.alreadyMigrated { + return nil + } + return g.migration.Apply(ctx, meta) +} + +func (g *guardedMigration) Commit(ctx context.Context) error { + if g.alreadyMigrated { + return nil + } + err := setMetaBool(ctx, g.db.SessionInterface, g.guardMetaKey) + if err != nil { + return errors.Join(err, g.Rollback(ctx)) + } + return g.db.Commit() +} + +func (g *guardedMigration) Rollback(ctx context.Context) error { + return g.db.Rollback() +} + +func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config.Config) (Migration, error) { + factory := newTransactionTableMigration(ctx, logger, cfg.TransactionLedgerRetentionWindow, cfg.NetworkPassphrase) + m, err := newGuardedDataMigration(ctx, "TransactionTable", factory, db) + if err != nil { + return nil, fmt.Errorf("creating guarded transaction migration: %w", err) + } + // Add other migrations here + return multiMigration{m}, nil +} diff --git a/cmd/soroban-rpc/internal/db/migrations/01_init.sql b/cmd/soroban-rpc/internal/db/sqlmigrations/01_init.sql similarity index 100% rename from cmd/soroban-rpc/internal/db/migrations/01_init.sql rename to cmd/soroban-rpc/internal/db/sqlmigrations/01_init.sql diff --git a/cmd/soroban-rpc/internal/db/migrations/02_transactions.sql b/cmd/soroban-rpc/internal/db/sqlmigrations/02_transactions.sql similarity index 100% rename from cmd/soroban-rpc/internal/db/migrations/02_transactions.sql rename to cmd/soroban-rpc/internal/db/sqlmigrations/02_transactions.sql diff --git a/cmd/soroban-rpc/internal/db/transaction.go b/cmd/soroban-rpc/internal/db/transaction.go index 817719ed..5ea51492 100644 --- a/cmd/soroban-rpc/internal/db/transaction.go +++ b/cmd/soroban-rpc/internal/db/transaction.go @@ -45,15 +45,6 @@ type TransactionReader interface { GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) } -func NewTransactionWriter(log *log.Entry, db db.SessionInterface, networkPassphrase string) TransactionWriter { - return &transactionHandler{ - log: log, - db: db, - stmtCache: sq.NewStmtCache(db.GetTx()), - passphrase: networkPassphrase, - } -} - type transactionHandler struct { log *log.Entry db db.SessionInterface @@ -311,3 +302,39 @@ func ParseTransaction(lcm xdr.LedgerCloseMeta, ingestTx ingest.LedgerTransaction return tx, nil } + +type transactionTableMigration struct { + firstLedger uint32 + writer TransactionWriter +} + +func (t *transactionTableMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { + if meta.LedgerSequence() < t.firstLedger { + return nil + } + return t.writer.InsertTransactions(meta) +} + +func newTransactionTableMigration(ctx context.Context, logger *log.Entry, retentionWindow uint32, passphrase string) migrationApplierFactory { + return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) { + latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx) + if err != nil && err != ErrEmptyDB { + return nil, errors.Wrap(err, "couldn't get latest ledger sequence") + } + firstLedgerToMigrate := uint32(2) + writer := &transactionHandler{ + log: logger, + db: db.SessionInterface, + stmtCache: sq.NewStmtCache(db.GetTx()), + passphrase: passphrase, + } + if latestLedger > retentionWindow { + firstLedgerToMigrate = latestLedger - retentionWindow + } + migration := transactionTableMigration{ + firstLedger: firstLedgerToMigrate, + writer: writer, + } + return &migration, nil + }) +}