Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add events table migration #262

Merged
merged 27 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from 25 commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b98e0c6
Fix migrations - 1
aditya1702 Jul 31, 2024
2277d08
Make migrations sequential - 1
aditya1702 Aug 8, 2024
78a9e23
Make migrations sequential - 2
aditya1702 Aug 8, 2024
72d26ae
Fix failing unittest - 1
aditya1702 Aug 8, 2024
43e5ec5
Fix linting errors - 1
aditya1702 Aug 8, 2024
6ba9db9
Merge branch 'refs/heads/refactor-get-events' into events-migration
aditya1702 Aug 8, 2024
1088de8
Fix failing integration test - 1
aditya1702 Aug 8, 2024
9cabba6
Remove %w from Fatal strings
aditya1702 Aug 9, 2024
3ad9388
refactor migrationApplierFactoryF
aditya1702 Aug 9, 2024
110b158
Add ledger seq to fatal error string
aditya1702 Aug 9, 2024
de1b40d
Add comments - 1
aditya1702 Aug 9, 2024
234fc21
Fix - 1
aditya1702 Aug 9, 2024
b0f6c57
Merge branch 'refs/heads/refactor-get-events' into events-migration
aditya1702 Aug 9, 2024
949f9ef
Optimise migrations - 1
aditya1702 Aug 9, 2024
54f002f
Optimise migrations - 2
aditya1702 Aug 9, 2024
0caef16
Optimise migrations - 3
aditya1702 Aug 9, 2024
1daaee0
Fix linting - 1
aditya1702 Aug 9, 2024
007c9e0
Fix linting - 2
aditya1702 Aug 9, 2024
314790f
Remove dupicate latest ledger fetch code
aditya1702 Aug 12, 2024
1cdafff
Rollback db in daemon instead of migration
aditya1702 Aug 12, 2024
48d386c
Remove unused constant
aditya1702 Aug 12, 2024
d8252d2
Remove unused constant - 2
aditya1702 Aug 12, 2024
4a84f77
Add rollback() statement
aditya1702 Aug 13, 2024
44eb375
Small change
aditya1702 Aug 13, 2024
d7966a4
Merge branch 'refs/heads/refactor-get-events' into events-migration
aditya1702 Aug 13, 2024
fb738bf
Abstract transaction and rollback management inside migration code
aditya1702 Aug 14, 2024
9a2fa6f
Fix failing unittest
aditya1702 Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
56 changes: 35 additions & 21 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {
feewindows := feewindow.NewFeeWindows(
feeWindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
cfg.NetworkPassphrase,
Expand All @@ -299,28 +299,44 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
defer cancelReadTxMeta()
var initialSeq uint32
var currentSeq uint32
dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg)
applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
d.logger.WithError(err).Fatal("could not get ledger range for migration")
}

// Merge migrations range and fee stats range to get the applicable range
latestLedger, err := db.NewLedgerEntryReader(d.db).GetLatestLedgerSequence(readTxMetaCtx)
maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
tamirms marked this conversation as resolved.
Show resolved Hide resolved
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("failed to get latest ledger sequence: %w", err)
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
}

maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: latestLedger - maxFeeRetentionWindow, LastLedgerSeq: latestLedger}
applicableRange := dataMigrations.ApplicableRange()
ledgerSeqRange = ledgerSeqRange.Merge(applicableRange)
// Combine the ledger range for fees, events and transactions
ledgerSeqRange := feeStatsRange.Merge(applicableRange)

// Start a common db transaction for the entire migration duration
err = d.db.Begin(readTxMetaCtx)
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
if err != nil {
d.logger.WithError(err).Fatal("could not commit database transaction: ", d.db.Rollback())
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
}
defer func() {
err = d.db.Commit()
if err != nil {
d.logger.WithError(err).Fatal("could not commit database transaction: ", d.db.Rollback())
}
}()

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations: ", d.db.Rollback())
}

// Apply migration for events, transactions and fee stats
err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.FirstLedgerSeq,
ledgerSeqRange.LastLedgerSeq,
func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
func(txMeta xdr.LedgerCloseMeta) error {
currentSeq = txMeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
Expand All @@ -332,22 +348,20 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
}).Debug("still initializing in-memory store")
}

if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
if err = feeWindows.IngestFees(txMeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats: ", d.db.Rollback())
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
}

if applicableRange.IsLedgerIncluded(currentSeq) {
if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")
}
if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil {
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq, ": ", d.db.Rollback())
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
}
return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database: ", d.db.Rollback())
}
if err := dataMigrations.Commit(readTxMetaCtx); err != nil {
d.logger.WithError(err).Fatal("could not commit data migrations")
d.logger.WithError(err).Fatal("could not commit data migrations ", d.db.Rollback())
}

if currentSeq != 0 {
Expand All @@ -356,7 +370,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
}).Info("finished initializing in-memory store and applying DB data migrations")
}

return feewindows
return feeWindows
}

func (d *Daemon) Run() {
Expand Down
27 changes: 11 additions & 16 deletions cmd/soroban-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,26 +320,21 @@ func (e *eventTableMigration) Apply(_ context.Context, meta xdr.LedgerCloseMeta)
}

func newEventTableMigration(
_ context.Context,
logger *log.Entry,
retentionWindow uint32,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) {
firstLedgerToMigrate := firstLedger
writer := &eventHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}

return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
migration := eventTableMigration{
firstLedger: firstLedgerToMigrate,
lastLedger: latestLedger,
writer: writer,
firstLedger: ledgerSeqRange.FirstLedgerSeq,
lastLedger: ledgerSeqRange.LastLedgerSeq,
writer: &eventHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
},
}
return &migration, nil
})
Expand Down
124 changes: 53 additions & 71 deletions cmd/soroban-rpc/internal/db/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (

"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
)

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config"
const (
transactionsMigrationName = "TransactionsTable"
eventsMigrationName = "EventsTable"
)

type LedgerSeqRange struct {
Expand Down Expand Up @@ -47,33 +50,34 @@ type MigrationApplier interface {
Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error
}

type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory

type migrationApplierFactory interface {
New(db *DB, latestLedger uint32) (MigrationApplier, error)
New(db *DB) (MigrationApplier, error)
}

type migrationApplierFactoryF func(db *DB, latestLedger uint32) (MigrationApplier, error)
type migrationApplierFactoryF func(db *DB) (MigrationApplier, error)

func (m migrationApplierFactoryF) New(db *DB, latestLedger uint32) (MigrationApplier, error) {
return m(db, latestLedger)
func (m migrationApplierFactoryF) New(db *DB) (MigrationApplier, error) {
return m(db)
}

type Migration interface {
MigrationApplier
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
}

type multiMigration []Migration
type MultiMigration []Migration

func (mm multiMigration) ApplicableRange() *LedgerSeqRange {
func (mm MultiMigration) ApplicableRange() *LedgerSeqRange {
var result *LedgerSeqRange
for _, m := range mm {
result = m.ApplicableRange().Merge(result)
}
return result
}

func (mm multiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error {
func (mm MultiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error {
var err error
for _, m := range mm {
ledgerSeq := meta.LedgerSequence()
Expand All @@ -88,7 +92,7 @@ func (mm multiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) er
return err
}

func (mm multiMigration) Commit(ctx context.Context) error {
func (mm MultiMigration) Commit(ctx context.Context) error {
var err error
for _, m := range mm {
if localErr := m.Commit(ctx); localErr != nil {
Expand All @@ -98,16 +102,6 @@ func (mm multiMigration) Commit(ctx context.Context) error {
return err
}

func (mm multiMigration) Rollback(ctx context.Context) error {
var err error
for _, m := range mm {
if localErr := m.Rollback(ctx); localErr != nil {
err = errors.Join(err, localErr)
}
}
return err
}

// guardedMigration is a db data migration whose application is guarded by a boolean in the meta table
// (after the migration is applied the boolean is set to true, so that the migration is not applied again)
type guardedMigration struct {
Expand All @@ -122,32 +116,18 @@ type guardedMigration struct {
func newGuardedDataMigration(
ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB,
) (Migration, error) {
migrationDB := &DB{
cache: db.cache,
SessionInterface: db.SessionInterface.Clone(),
}
if err := migrationDB.Begin(ctx); err != nil {
return nil, err
}
metaKey := "Migration" + uniqueMigrationName + "Done"
previouslyMigrated, err := getMetaBool(ctx, migrationDB, metaKey)
previouslyMigrated, err := getMetaBool(ctx, db, metaKey)
if err != nil && !errors.Is(err, ErrEmptyDB) {
err = errors.Join(err, migrationDB.Rollback())
return nil, err
}
latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx)
if err != nil && !errors.Is(err, ErrEmptyDB) {
err = errors.Join(err, migrationDB.Rollback())
return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err)
}
applier, err := factory.New(migrationDB, latestLedger)
applier, err := factory.New(db)
if err != nil {
err = errors.Join(err, migrationDB.Rollback())
return nil, err
}
guardedMigration := &guardedMigration{
guardMetaKey: metaKey,
db: migrationDB,
db: db,
migration: applier,
alreadyMigrated: previouslyMigrated,
logger: logger,
Expand Down Expand Up @@ -179,46 +159,48 @@ func (g *guardedMigration) Commit(ctx context.Context) error {
if g.alreadyMigrated {
return nil
}
err := setMetaBool(ctx, g.db, g.guardMetaKey, true)
if err != nil {
return errors.Join(err, g.Rollback(ctx))
}
return g.db.Commit()
return setMetaBool(ctx, g.db, g.guardMetaKey, true)
}

func (g *guardedMigration) Rollback(_ context.Context) error {
return g.db.Rollback()
func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (*LedgerSeqRange, error) {
firstLedgerToMigrate := firstLedger
latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx)
if err != nil && !errors.Is(err, ErrEmptyDB) {
return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err)
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}
return &LedgerSeqRange{
FirstLedgerSeq: firstLedgerToMigrate,
LastLedgerSeq: latestLedger,
}, nil
}

func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config.Config) (Migration, error) {
var migrations []Migration
func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange *LedgerSeqRange,
) (MultiMigration, error) {
migrationNameToFunc := map[string]migrationApplierF{
transactionsMigrationName: newTransactionTableMigration,
eventsMigrationName: newEventTableMigration,
}

migrationName := "TransactionsTable"
logger = logger.WithField("migration", migrationName)
factory := newTransactionTableMigration(
ctx,
logger,
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
)
migrations := make(MultiMigration, 0, len(migrationNameToFunc))

m1, err := newGuardedDataMigration(ctx, migrationName, logger, factory, db)
if err != nil {
return nil, fmt.Errorf("creating guarded transaction migration: %w", err)
}
migrations = append(migrations, m1)
for migrationName, migrationFunc := range migrationNameToFunc {
migrationLogger := logger.WithField("migration", migrationName)
factory := migrationFunc(
ctx,
migrationLogger,
networkPassphrase,
ledgerSeqRange,
)

eventMigrationName := "EventsTable"
eventFactory := newEventTableMigration(
logger.WithField("migration", eventMigrationName),
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
)
m2, err := newGuardedDataMigration(ctx, eventMigrationName, logger, eventFactory, db)
if err != nil {
return nil, fmt.Errorf("creating guarded transaction migration: %w", err)
guardedM, err := newGuardedDataMigration(ctx, migrationName, migrationLogger, factory, db)
if err != nil {
return nil, fmt.Errorf("could not create guarded migration for %s: %w", migrationName, err)
}
migrations = append(migrations, guardedM)
}
migrations = append(migrations, m2)

return multiMigration(migrations), nil
return migrations, nil
}
30 changes: 14 additions & 16 deletions cmd/soroban-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,30 +266,28 @@ func (t *transactionTableMigration) Apply(_ context.Context, meta xdr.LedgerClos
return t.writer.InsertTransactions(meta)
}

func newTransactionTableMigration(ctx context.Context, logger *log.Entry,
retentionWindow uint32, passphrase string,
func newTransactionTableMigration(
ctx context.Context,
logger *log.Entry,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) {
firstLedgerToMigrate := uint32(2) //nolint:mnd
writer := &transactionHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}
return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
// Truncate the table, since it may contain data, causing insert conflicts later on.
// (the migration was shipped after the actual transactions table change)
_, err := db.Exec(ctx, sq.Delete(transactionTableName))
if err != nil {
return nil, fmt.Errorf("couldn't delete table %q: %w", transactionTableName, err)
}
migration := transactionTableMigration{
firstLedger: firstLedgerToMigrate,
lastLedger: latestLedger,
writer: writer,
firstLedger: ledgerSeqRange.FirstLedgerSeq,
lastLedger: ledgerSeqRange.LastLedgerSeq,
writer: &transactionHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
},
}
return &migration, nil
})
Expand Down
Loading