Skip to content

Commit

Permalink
Factor out transaction table migration cleanly
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Jun 11, 2024
1 parent 61dff8b commit 7b40026
Show file tree
Hide file tree
Showing 10 changed files with 214 additions and 95 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
make build-libpreflight
- name: Run golangci-lint
uses: golangci/golangci-lint-action@537aa1903e5d359d0b27dbc19ddd22c5087f3fbc # version v3.2.0
uses: golangci/golangci-lint-action@a4f60bb28d35aeee14e6880718e0c85ff1882e64 # version v6.0.1
with:
version: v1.52.2 # this is the golangci-lint version
args: --issues-exit-code=0 # exit without errors for now - won't fail the build
Expand Down
69 changes: 11 additions & 58 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ func MustNew(cfg *config.Config) *Daemon {
Logger: logger,
LedgerReader: db.NewLedgerReader(dbConn),
LedgerEntryReader: db.NewLedgerEntryReader(dbConn),
TransactionReader: db.NewTransactionReader(logger, dbConn, cfg.NetworkPassphrase),
TransactionReader: db.NewTransactionReader(logger, dbConn.SessionInterface, cfg.NetworkPassphrase),
PreflightGetter: preflightWorkerPool,
})

Expand Down Expand Up @@ -290,11 +290,14 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
defer cancelReadTxMeta()
var initialSeq uint32
var currentSeq uint32
migration, migrationDone := d.newTxMigration(readTxMetaCtx, cfg)
dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
}
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetas can be larger than the individual store retention windows)
// but it's probably not worth the pain.
err := db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
err = db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
Expand All @@ -312,16 +315,17 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}
if err := migration(txmeta); err != nil {
// TODO: we should only migrate the transaction range
d.logger.WithError(err).Fatal("could not run migration")
if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")
}
return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
migrationDone()
if err := dataMigrations.Commit(readTxMetaCtx); err != nil {
d.logger.WithError(err).Fatal("could not commit data migrations")
}

if currentSeq != 0 {
d.logger.WithFields(supportlog.F{
Expand All @@ -332,57 +336,6 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
return feewindows, eventStore
}

// TODO: We should probably implement the migrations somewhere else
type migrationFunc func(txmeta xdr.LedgerCloseMeta) error
type migrationDoneFunc func()

func (d *Daemon) newTxMigration(ctx context.Context, cfg *config.Config) (migrationFunc, migrationDoneFunc) {
migrationSession := d.db.Clone()
if err := migrationSession.Begin(ctx); err != nil {
d.logger.WithError(err).Fatal("could not start migration session")
}
migration := func(txmeta xdr.LedgerCloseMeta) error { return nil }
migrationDone := func() {}
previouslyMigrated, err := db.GetMetaBool(ctx, migrationSession, transactionsTableMigrationDoneMetaKey)
if err != nil {
if !errors.Is(err, db.ErrEmptyDB) {
d.logger.WithError(err).WithField("key", transactionsTableMigrationDoneMetaKey).Fatal("could not get metadata")
}
} else if previouslyMigrated {
migrationSession.Rollback()
return migration, migrationDone
}

d.logger.Info("migrating transactions to new backend")
writer := db.NewTransactionWriter(d.logger, migrationSession, cfg.NetworkPassphrase)
latestLedger, err := db.NewLedgerEntryReader(d.db).GetLatestLedgerSequence(ctx)
if err != nil || err != db.ErrEmptyDB {
d.logger.WithError(err).Fatal("cannot read latest ledger")
}
firstLedgerToMigrate := uint32(2)
if latestLedger > cfg.TransactionLedgerRetentionWindow {
firstLedgerToMigrate = latestLedger - cfg.TransactionLedgerRetentionWindow
}
migration = func(txmeta xdr.LedgerCloseMeta) error {
if txmeta.LedgerSequence() < firstLedgerToMigrate {
return nil
}
return writer.InsertTransactions(txmeta)
}
migrationDone = func() {
err := db.SetMetaBool(ctx, migrationSession, transactionsTableMigrationDoneMetaKey)
if err != nil {
d.logger.WithError(err).WithField("key", transactionsTableMigrationDoneMetaKey).Fatal("could not set metadata")
migrationSession.Rollback()
return
}
if err = migrationSession.Commit(); err != nil {
d.logger.WithError(err).Error("could not commit migration session")
}
}
return migration, migrationDone
}

func (d *Daemon) Run() {
d.logger.WithFields(supportlog.F{
"addr": d.server.Addr,
Expand Down
41 changes: 24 additions & 17 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,8 @@ import (
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces"
)

//go:embed migrations/*.sql
var migrations embed.FS
//go:embed sqlmigrations/*.sql
var sqlMigrations embed.FS

var ErrEmptyDB = errors.New("DB is empty")

Expand Down Expand Up @@ -53,7 +53,14 @@ type dbCache struct {

type DB struct {
db.SessionInterface
cache dbCache
cache *dbCache
}

func (db *DB) Clone() *DB {
return &DB{
SessionInterface: db.SessionInterface.Clone(),
cache: db.cache,
}
}

func openSQLiteDB(dbFilePath string) (*db.Session, error) {
Expand All @@ -66,9 +73,9 @@ func openSQLiteDB(dbFilePath string) (*db.Session, error) {
return nil, errors.Wrap(err, "open failed")
}

if err = runMigrations(session.DB.DB, "sqlite3"); err != nil {
if err = runSQLMigrations(session.DB.DB, "sqlite3"); err != nil {
_ = session.Close()
return nil, errors.Wrap(err, "could not run migrations")
return nil, errors.Wrap(err, "could not run SQL migrations")
}
return session, nil
}
Expand All @@ -80,7 +87,7 @@ func OpenSQLiteDBWithPrometheusMetrics(dbFilePath string, namespace string, sub
}
result := DB{
SessionInterface: db.RegisterMetrics(session, namespace, sub, registry),
cache: dbCache{
cache: &dbCache{
ledgerEntries: newTransactionalCache(),
},
}
Expand All @@ -94,22 +101,22 @@ func OpenSQLiteDB(dbFilePath string) (*DB, error) {
}
result := DB{
SessionInterface: session,
cache: dbCache{
cache: &dbCache{
ledgerEntries: newTransactionalCache(),
},
}
return &result, nil
}

func GetMetaBool(ctx context.Context, q db.SessionInterface, key string) (bool, error) {
func getMetaBool(ctx context.Context, q db.SessionInterface, key string) (bool, error) {
valueStr, err := getMetaValue(ctx, q, key)
if err != nil {
return false, err
}
return strconv.ParseBool(valueStr)
}

func SetMetaBool(ctx context.Context, q db.SessionInterface, key string) error {
func setMetaBool(ctx context.Context, q db.SessionInterface, key string) error {
query := sq.Replace(metaTableName).
Values(key, "true")
_, err := q.Exec(ctx, query)
Expand Down Expand Up @@ -148,7 +155,7 @@ func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *
// Otherwise, the write-through cache won't get updated until the first ingestion commit
cache.Lock()
if cache.latestLedgerSeq == 0 {
// Only update the cache if value is missing (0), otherwise
// Only update the cache if the value is missing (0), otherwise
// we may end up overwriting the entry with an older version
cache.latestLedgerSeq = result
}
Expand Down Expand Up @@ -215,19 +222,19 @@ func NewReadWriter(
}

func (rw *readWriter) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
return getLatestLedgerSequence(ctx, rw.db, &rw.db.cache)
return getLatestLedgerSequence(ctx, rw.db.SessionInterface, rw.db.cache)
}

func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
txSession := rw.db.Clone()
txSession := rw.db.SessionInterface.Clone()
if err := txSession.Begin(ctx); err != nil {
return nil, err
}
stmtCache := sq.NewStmtCache(txSession.GetTx())

db := rw.db
writer := writeTx{
globalCache: &db.cache,
globalCache: db.cache,
postCommit: func() error {
// TODO: this is sqlite-only, it shouldn't be here
_, err := db.ExecRaw(ctx, "PRAGMA wal_checkpoint(TRUNCATE)")
Expand Down Expand Up @@ -332,12 +339,12 @@ func (w writeTx) Rollback() error {
}
}

func runMigrations(db *sql.DB, dialect string) error {
func runSQLMigrations(db *sql.DB, dialect string) error {
m := &migrate.AssetMigrationSource{
Asset: migrations.ReadFile,
Asset: sqlMigrations.ReadFile,
AssetDir: func() func(string) ([]string, error) {
return func(path string) ([]string, error) {
dirEntry, err := migrations.ReadDir(path)
dirEntry, err := sqlMigrations.ReadDir(path)
if err != nil {
return nil, err
}
Expand All @@ -349,7 +356,7 @@ func runMigrations(db *sql.DB, dialect string) error {
return entries, nil
}
}(),
Dir: "migrations",
Dir: "sqlmigrations",
}
_, err := migrate.ExecMax(db, dialect, m, migrate.Up, 0)
return err
Expand Down
5 changes: 3 additions & 2 deletions cmd/soroban-rpc/internal/db/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stellar/go/network"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces"
)

Expand Down Expand Up @@ -113,8 +114,8 @@ func NewTestDB(tb testing.TB) *DB {
assert.NoError(tb, db.Close())
})
return &DB{
SessionInterface: db,
cache: dbCache{
SessionInterface: db.SessionInterface,
cache: &dbCache{
ledgerEntries: newTransactionalCache(),
},
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/soroban-rpc/internal/db/ledgerentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,12 @@ func NewLedgerEntryReader(db *DB) LedgerEntryReader {
}

func (r ledgerEntryReader) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
return getLatestLedgerSequence(ctx, r.db, &r.db.cache)
return getLatestLedgerSequence(ctx, r.db.SessionInterface, r.db.cache)
}

// NewCachedTx() caches all accessed ledger entries and select statements. If many ledger entries are accessed, it will grow without bounds.
func (r ledgerEntryReader) NewCachedTx(ctx context.Context) (LedgerEntryReadTx, error) {
txSession := r.db.Clone()
txSession := r.db.SessionInterface.Clone()
// We need to copy the cached ledger entries locally when we start the transaction
// since otherwise we would break the consistency between the transaction and the cache.

Expand All @@ -360,7 +360,7 @@ func (r ledgerEntryReader) NewCachedTx(ctx context.Context) (LedgerEntryReadTx,
}
cacheReadTx := r.db.cache.ledgerEntries.newReadTx()
return &ledgerEntryReadTx{
globalCache: &r.db.cache,
globalCache: r.db.cache,
stmtCache: sq.NewStmtCache(txSession.GetTx()),
latestLedgerSeqCache: r.db.cache.latestLedgerSeq,
ledgerEntryCacheReadTx: &cacheReadTx,
Expand All @@ -370,14 +370,14 @@ func (r ledgerEntryReader) NewCachedTx(ctx context.Context) (LedgerEntryReadTx,
}

func (r ledgerEntryReader) NewTx(ctx context.Context) (LedgerEntryReadTx, error) {
txSession := r.db.Clone()
txSession := r.db.SessionInterface.Clone()
if err := txSession.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}); err != nil {
return nil, err
}
r.db.cache.RLock()
defer r.db.cache.RUnlock()
return &ledgerEntryReadTx{
globalCache: &r.db.cache,
globalCache: r.db.cache,
latestLedgerSeqCache: r.db.cache.latestLedgerSeq,
tx: txSession,
buffer: xdr.NewEncodingBuffer(),
Expand Down
Loading

0 comments on commit 7b40026

Please sign in to comment.