Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add events table migration #262

Merged
merged 27 commits into from
Aug 16, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
27 commits
Select commit Hold shift + click to select a range
b98e0c6
Fix migrations - 1
aditya1702 Jul 31, 2024
2277d08
Make migrations sequential - 1
aditya1702 Aug 8, 2024
78a9e23
Make migrations sequential - 2
aditya1702 Aug 8, 2024
72d26ae
Fix failing unittest - 1
aditya1702 Aug 8, 2024
43e5ec5
Fix linting errors - 1
aditya1702 Aug 8, 2024
6ba9db9
Merge branch 'refs/heads/refactor-get-events' into events-migration
aditya1702 Aug 8, 2024
1088de8
Fix failing integration test - 1
aditya1702 Aug 8, 2024
9cabba6
Remove %w from Fatal strings
aditya1702 Aug 9, 2024
3ad9388
refactor migrationApplierFactoryF
aditya1702 Aug 9, 2024
110b158
Add ledger seq to fatal error string
aditya1702 Aug 9, 2024
de1b40d
Add comments - 1
aditya1702 Aug 9, 2024
234fc21
Fix - 1
aditya1702 Aug 9, 2024
b0f6c57
Merge branch 'refs/heads/refactor-get-events' into events-migration
aditya1702 Aug 9, 2024
949f9ef
Optimise migrations - 1
aditya1702 Aug 9, 2024
54f002f
Optimise migrations - 2
aditya1702 Aug 9, 2024
0caef16
Optimise migrations - 3
aditya1702 Aug 9, 2024
1daaee0
Fix linting - 1
aditya1702 Aug 9, 2024
007c9e0
Fix linting - 2
aditya1702 Aug 9, 2024
314790f
Remove dupicate latest ledger fetch code
aditya1702 Aug 12, 2024
1cdafff
Rollback db in daemon instead of migration
aditya1702 Aug 12, 2024
48d386c
Remove unused constant
aditya1702 Aug 12, 2024
d8252d2
Remove unused constant - 2
aditya1702 Aug 12, 2024
4a84f77
Add rollback() statement
aditya1702 Aug 13, 2024
44eb375
Small change
aditya1702 Aug 13, 2024
d7966a4
Merge branch 'refs/heads/refactor-get-events' into events-migration
aditya1702 Aug 13, 2024
fb738bf
Abstract transaction and rollback management inside migration code
aditya1702 Aug 14, 2024
9a2fa6f
Fix failing unittest
aditya1702 Aug 14, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 21 additions & 18 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,38 +289,43 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {
feewindows := feewindow.NewFeeWindows(
feeWindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
cfg.NetworkPassphrase,
d.db,
)

readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
var initialSeq uint32
var currentSeq uint32
dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg)
applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
d.logger.WithError(err).Fatal("could not get ledger range for migration")
}

// Merge migrations range and fee stats range to get the applicable range
latestLedger, err := db.NewLedgerEntryReader(d.db).GetLatestLedgerSequence(readTxMetaCtx)
maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
tamirms marked this conversation as resolved.
Show resolved Hide resolved
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("failed to get latest ledger sequence: %w", err)
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
}

maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: latestLedger - maxFeeRetentionWindow, LastLedgerSeq: latestLedger}
applicableRange := dataMigrations.ApplicableRange()
ledgerSeqRange = ledgerSeqRange.Merge(applicableRange)
// Combine the ledger range for fees, events and transactions
ledgerSeqRange := feeStatsRange.Merge(applicableRange)

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
}

// Apply migration for events, transactions and fee stats
err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.FirstLedgerSeq,
ledgerSeqRange.LastLedgerSeq,
func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
func(txMeta xdr.LedgerCloseMeta) error {
currentSeq = txMeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
Expand All @@ -332,14 +337,12 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
}).Debug("still initializing in-memory store")
}

if err := feewindows.IngestFees(txmeta); err != nil {
if err = feeWindows.IngestFees(txMeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}

if applicableRange.IsLedgerIncluded(currentSeq) {
if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")
}
if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil {
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq)
}
return nil
})
Expand All @@ -356,7 +359,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows
}).Info("finished initializing in-memory store and applying DB data migrations")
}

return feewindows
return feeWindows
}

func (d *Daemon) Run() {
Expand Down
27 changes: 11 additions & 16 deletions cmd/soroban-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -320,26 +320,21 @@ func (e *eventTableMigration) Apply(_ context.Context, meta xdr.LedgerCloseMeta)
}

func newEventTableMigration(
_ context.Context,
logger *log.Entry,
retentionWindow uint32,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) {
firstLedgerToMigrate := firstLedger
writer := &eventHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}

return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
migration := eventTableMigration{
firstLedger: firstLedgerToMigrate,
lastLedger: latestLedger,
writer: writer,
firstLedger: ledgerSeqRange.FirstLedgerSeq,
lastLedger: ledgerSeqRange.LastLedgerSeq,
writer: &eventHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
},
}
return &migration, nil
})
Expand Down
147 changes: 71 additions & 76 deletions cmd/soroban-rpc/internal/db/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,11 @@ import (

"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
)

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config"
const (
transactionsMigrationName = "TransactionsTable"
eventsMigrationName = "EventsTable"
)

type LedgerSeqRange struct {
Expand Down Expand Up @@ -47,65 +50,59 @@ type MigrationApplier interface {
Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error
}

type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory

type migrationApplierFactory interface {
New(db *DB, latestLedger uint32) (MigrationApplier, error)
New(db *DB) (MigrationApplier, error)
}

type migrationApplierFactoryF func(db *DB, latestLedger uint32) (MigrationApplier, error)
type migrationApplierFactoryF func(db *DB) (MigrationApplier, error)

func (m migrationApplierFactoryF) New(db *DB, latestLedger uint32) (MigrationApplier, error) {
return m(db, latestLedger)
func (m migrationApplierFactoryF) New(db *DB) (MigrationApplier, error) {
return m(db)
}

type Migration interface {
MigrationApplier
Commit(ctx context.Context) error
Rollback(ctx context.Context) error
}

type multiMigration []Migration
type MultiMigration struct {
migrations []Migration
db *DB
}

func (mm multiMigration) ApplicableRange() *LedgerSeqRange {
func (mm MultiMigration) ApplicableRange() *LedgerSeqRange {
var result *LedgerSeqRange
for _, m := range mm {
for _, m := range mm.migrations {
result = m.ApplicableRange().Merge(result)
}
return result
}

func (mm multiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error {
func (mm MultiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error {
var err error
for _, m := range mm {
for _, m := range mm.migrations {
ledgerSeq := meta.LedgerSequence()
if !m.ApplicableRange().IsLedgerIncluded(ledgerSeq) {
// The range of a sub-migration can be smaller than the global range.
continue
}
if localErr := m.Apply(ctx, meta); localErr != nil {
err = errors.Join(err, localErr)
err = errors.Join(err, localErr, mm.db.Rollback())
}
}
return err
}

func (mm multiMigration) Commit(ctx context.Context) error {
func (mm MultiMigration) Commit(ctx context.Context) error {
var err error
for _, m := range mm {
for _, m := range mm.migrations {
if localErr := m.Commit(ctx); localErr != nil {
err = errors.Join(err, localErr)
}
}
return err
}

func (mm multiMigration) Rollback(ctx context.Context) error {
var err error
for _, m := range mm {
if localErr := m.Rollback(ctx); localErr != nil {
err = errors.Join(err, localErr)
err = errors.Join(err, localErr, mm.db.Rollback())
}
}
return err
return mm.db.Commit()
}

// guardedMigration is a db data migration whose application is guarded by a boolean in the meta table
Expand All @@ -122,32 +119,18 @@ type guardedMigration struct {
func newGuardedDataMigration(
ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB,
) (Migration, error) {
migrationDB := &DB{
cache: db.cache,
SessionInterface: db.SessionInterface.Clone(),
}
if err := migrationDB.Begin(ctx); err != nil {
return nil, err
}
metaKey := "Migration" + uniqueMigrationName + "Done"
previouslyMigrated, err := getMetaBool(ctx, migrationDB, metaKey)
previouslyMigrated, err := getMetaBool(ctx, db, metaKey)
if err != nil && !errors.Is(err, ErrEmptyDB) {
err = errors.Join(err, migrationDB.Rollback())
return nil, err
}
latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx)
if err != nil && !errors.Is(err, ErrEmptyDB) {
err = errors.Join(err, migrationDB.Rollback())
return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err)
}
applier, err := factory.New(migrationDB, latestLedger)
applier, err := factory.New(db)
if err != nil {
err = errors.Join(err, migrationDB.Rollback())
return nil, err
}
guardedMigration := &guardedMigration{
guardMetaKey: metaKey,
db: migrationDB,
db: db,
migration: applier,
alreadyMigrated: previouslyMigrated,
logger: logger,
Expand Down Expand Up @@ -179,46 +162,58 @@ func (g *guardedMigration) Commit(ctx context.Context) error {
if g.alreadyMigrated {
return nil
}
err := setMetaBool(ctx, g.db, g.guardMetaKey, true)
if err != nil {
return errors.Join(err, g.Rollback(ctx))
}
return g.db.Commit()
return setMetaBool(ctx, g.db, g.guardMetaKey, true)
}

func (g *guardedMigration) Rollback(_ context.Context) error {
return g.db.Rollback()
func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (*LedgerSeqRange, error) {
firstLedgerToMigrate := firstLedger
latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx)
if err != nil && !errors.Is(err, ErrEmptyDB) {
return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err)
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}
return &LedgerSeqRange{
FirstLedgerSeq: firstLedgerToMigrate,
LastLedgerSeq: latestLedger,
}, nil
}

func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config.Config) (Migration, error) {
var migrations []Migration

migrationName := "TransactionsTable"
logger = logger.WithField("migration", migrationName)
factory := newTransactionTableMigration(
ctx,
logger,
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
)

m1, err := newGuardedDataMigration(ctx, migrationName, logger, factory, db)
func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string,
ledgerSeqRange *LedgerSeqRange,
) (MultiMigration, error) {
// Start a common db transaction for the entire migration duration
err := db.Begin(ctx)
if err != nil {
return nil, fmt.Errorf("creating guarded transaction migration: %w", err)
return MultiMigration{}, errors.Join(err, db.Rollback())
}
migrations = append(migrations, m1)

eventMigrationName := "EventsTable"
eventFactory := newEventTableMigration(
logger.WithField("migration", eventMigrationName),
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
)
m2, err := newGuardedDataMigration(ctx, eventMigrationName, logger, eventFactory, db)
if err != nil {
return nil, fmt.Errorf("creating guarded transaction migration: %w", err)
migrationNameToFunc := map[string]migrationApplierF{
transactionsMigrationName: newTransactionTableMigration,
eventsMigrationName: newEventTableMigration,
}
migrations = append(migrations, m2)

return multiMigration(migrations), nil
migrations := make([]Migration, 0, len(migrationNameToFunc))

for migrationName, migrationFunc := range migrationNameToFunc {
migrationLogger := logger.WithField("migration", migrationName)
factory := migrationFunc(
ctx,
migrationLogger,
networkPassphrase,
ledgerSeqRange,
)

guardedM, err := newGuardedDataMigration(ctx, migrationName, migrationLogger, factory, db)
if err != nil {
return MultiMigration{}, errors.Join(fmt.Errorf(
"could not create guarded migration for %s: %w", migrationName, err), db.Rollback())
}
migrations = append(migrations, guardedM)
}
return MultiMigration{
migrations: migrations,
db: db,
}, nil
}
30 changes: 14 additions & 16 deletions cmd/soroban-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -266,30 +266,28 @@ func (t *transactionTableMigration) Apply(_ context.Context, meta xdr.LedgerClos
return t.writer.InsertTransactions(meta)
}

func newTransactionTableMigration(ctx context.Context, logger *log.Entry,
retentionWindow uint32, passphrase string,
func newTransactionTableMigration(
ctx context.Context,
logger *log.Entry,
passphrase string,
ledgerSeqRange *LedgerSeqRange,
) migrationApplierFactory {
return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) {
firstLedgerToMigrate := uint32(2) //nolint:mnd
writer := &transactionHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
}
if latestLedger > retentionWindow {
firstLedgerToMigrate = latestLedger - retentionWindow
}
return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) {
// Truncate the table, since it may contain data, causing insert conflicts later on.
// (the migration was shipped after the actual transactions table change)
_, err := db.Exec(ctx, sq.Delete(transactionTableName))
if err != nil {
return nil, fmt.Errorf("couldn't delete table %q: %w", transactionTableName, err)
}
migration := transactionTableMigration{
firstLedger: firstLedgerToMigrate,
lastLedger: latestLedger,
writer: writer,
firstLedger: ledgerSeqRange.FirstLedgerSeq,
lastLedger: ledgerSeqRange.LastLedgerSeq,
writer: &transactionHandler{
log: logger,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: passphrase,
},
}
return &migration, nil
})
Expand Down
Loading
Loading