Skip to content

Commit

Permalink
Migrate data to new transactions table (#207)
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio authored Jun 14, 2024
1 parent 2570fd2 commit f7580ad
Show file tree
Hide file tree
Showing 28 changed files with 789 additions and 359 deletions.
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
make build-libpreflight
- name: Run golangci-lint
uses: golangci/golangci-lint-action@537aa1903e5d359d0b27dbc19ddd22c5087f3fbc # version v3.2.0
uses: golangci/golangci-lint-action@a4f60bb28d35aeee14e6880718e0c85ff1882e64 # version v6.0.1
with:
version: v1.52.2 # this is the golangci-lint version
args: --issues-exit-code=0 # exit without errors for now - won't fail the build
Expand Down
107 changes: 64 additions & 43 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,49 +187,7 @@ func MustNew(cfg *config.Config) *Daemon {
}, metricsRegistry),
}

eventStore := events.NewMemoryStore(
daemon,
cfg.NetworkPassphrase,
cfg.EventLedgerRetentionWindow,
)
feewindows := feewindow.NewFeeWindows(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, cfg.NetworkPassphrase)

// initialize the stores using what was on the DB
readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetas can be larger than the individual store retention windows)
// but it's probably not worth the pain.
var initialSeq uint32
var currentSeq uint32
err = db.NewLedgerReader(dbConn).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
if err := eventStore.IngestEvents(txmeta); err != nil {
logger.WithError(err).Fatal("could not initialize event memory store")
}
if err := feewindows.IngestFees(txmeta); err != nil {
logger.WithError(err).Fatal("could not initialize fee stats")
}
return nil
})
if err != nil {
logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
if currentSeq != 0 {
logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("finished initializing in-memory store")
}
feewindows, eventStore := daemon.mustInitializeStorage(cfg)

onIngestionRetry := func(err error, dur time.Duration) {
logger.WithError(err).Error("could not run ingestion. Retrying")
Expand Down Expand Up @@ -317,6 +275,69 @@ func MustNew(cfg *config.Config) *Daemon {
return daemon
}

// mustInitializeStorage initializes the storage using what was on the DB.
// It builds the in-memory event store and fee-stat windows, replays every
// ledger currently stored in the DB through them, and applies any pending
// data migrations along the way. Any failure is fatal (the daemon cannot
// run with partially initialized stores). Returns the populated fee
// windows and event store.
func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindows, *events.MemoryStore) {
	eventStore := events.NewMemoryStore(
		d,
		cfg.NetworkPassphrase,
		cfg.EventLedgerRetentionWindow,
	)
	feewindows := feewindow.NewFeeWindows(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, cfg.NetworkPassphrase)

	// Bound the whole replay (stream + migrations) by the ingestion timeout.
	readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
	defer cancelReadTxMeta()
	// initialSeq/currentSeq track replay progress purely for logging.
	var initialSeq uint32
	var currentSeq uint32
	// Data migrations (e.g. the new transactions table) are built up front so
	// they can be fed each ledger as it streams past, then committed once.
	dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg)
	if err != nil {
		d.logger.WithError(err).Fatal("could not build migrations")
	}
	// NOTE: We could optimize this to avoid unnecessary ingestion calls
	// (the range of txmetas can be larger than the individual store retention windows)
	// but it's probably not worth the pain.
	err = db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
		currentSeq = txmeta.LedgerSequence()
		if initialSeq == 0 {
			// First ledger seen: remember it so progress can be logged
			// periodically relative to the start of the stream.
			initialSeq = currentSeq
			d.logger.WithFields(supportlog.F{
				"seq": currentSeq,
			}).Info("initializing in-memory store")
		} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
			d.logger.WithFields(supportlog.F{
				"seq": currentSeq,
			}).Debug("still initializing in-memory store")
		}
		if err := eventStore.IngestEvents(txmeta); err != nil {
			d.logger.WithError(err).Fatal("could not initialize event memory store")
		}
		if err := feewindows.IngestFees(txmeta); err != nil {
			d.logger.WithError(err).Fatal("could not initialize fee stats")
		}
		// TODO: clean up once we remove the in-memory storage.
		// (we should only stream over the required range)
		// Only feed the migrations ledgers inside their applicable range.
		if r := dataMigrations.ApplicableRange(); r.IsLedgerIncluded(currentSeq) {
			if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
				d.logger.WithError(err).Fatal("could not run migrations")
			}
		}
		return nil
	})
	if err != nil {
		d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
	}
	// Persist the migrations' work only after the full stream succeeded.
	if err := dataMigrations.Commit(readTxMetaCtx); err != nil {
		d.logger.WithError(err).Fatal("could not commit data migrations")
	}

	// currentSeq == 0 means the DB was empty, so nothing was initialized.
	if currentSeq != 0 {
		d.logger.WithFields(supportlog.F{
			"seq": currentSeq,
		}).Info("finished initializing in-memory store")
	}

	return feewindows, eventStore
}

func (d *Daemon) Run() {
d.logger.WithFields(supportlog.F{
"addr": d.server.Addr,
Expand Down
64 changes: 44 additions & 20 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces"
)

//go:embed migrations/*.sql
var migrations embed.FS
//go:embed sqlmigrations/*.sql
var sqlMigrations embed.FS

var ErrEmptyDB = errors.New("DB is empty")

Expand Down Expand Up @@ -52,7 +53,7 @@ type dbCache struct {

type DB struct {
db.SessionInterface
cache dbCache
cache *dbCache
}

func openSQLiteDB(dbFilePath string) (*db.Session, error) {
Expand All @@ -65,9 +66,9 @@ func openSQLiteDB(dbFilePath string) (*db.Session, error) {
return nil, errors.Wrap(err, "open failed")
}

if err = runMigrations(session.DB.DB, "sqlite3"); err != nil {
if err = runSQLMigrations(session.DB.DB, "sqlite3"); err != nil {
_ = session.Close()
return nil, errors.Wrap(err, "could not run migrations")
return nil, errors.Wrap(err, "could not run SQL migrations")
}
return session, nil
}
Expand All @@ -79,7 +80,7 @@ func OpenSQLiteDBWithPrometheusMetrics(dbFilePath string, namespace string, sub
}
result := DB{
SessionInterface: db.RegisterMetrics(session, namespace, sub, registry),
cache: dbCache{
cache: &dbCache{
ledgerEntries: newTransactionalCache(),
},
}
Expand All @@ -93,28 +94,50 @@ func OpenSQLiteDB(dbFilePath string) (*DB, error) {
}
result := DB{
SessionInterface: session,
cache: dbCache{
cache: &dbCache{
ledgerEntries: newTransactionalCache(),
},
}
return &result, nil
}

func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *dbCache) (uint32, error) {
sql := sq.Select("value").From(metaTableName).Where(sq.Eq{"key": latestLedgerSequenceMetaKey})
// getMetaBool reads the metadata entry stored under key and interprets
// it as a boolean. It propagates getMetaValue errors (including
// ErrEmptyDB when the key is absent) and strconv parse failures.
func getMetaBool(ctx context.Context, q db.SessionInterface, key string) (bool, error) {
	raw, err := getMetaValue(ctx, q, key)
	if err != nil {
		return false, err
	}
	parsed, err := strconv.ParseBool(raw)
	if err != nil {
		return false, err
	}
	return parsed, nil
}

// setMetaBool stores value as a boolean string under key in the meta
// table, replacing any existing entry for that key.
func setMetaBool(ctx context.Context, q db.SessionInterface, key string, value bool) error {
	query := sq.Replace(metaTableName).Values(key, strconv.FormatBool(value))
	if _, err := q.Exec(ctx, query); err != nil {
		return err
	}
	return nil
}

// getMetaValue returns the raw string stored under key in the meta table.
// It returns ErrEmptyDB when the key is missing (an uninitialized DB) and
// an error when the key is unexpectedly present more than once.
func getMetaValue(ctx context.Context, q db.SessionInterface, key string) (string, error) {
	sql := sq.Select("value").From(metaTableName).Where(sq.Eq{"key": key})
	var results []string
	if err := q.Select(ctx, &results, sql); err != nil {
		return "", err
	}
	switch len(results) {
	case 0:
		return "", ErrEmptyDB
	case 1:
		// expected length on an initialized DB
	default:
		// Bug fix: report the key actually queried. The message previously
		// hard-coded latestLedgerSequenceMetaKey, a leftover from before
		// this function was generalized to take an arbitrary key.
		return "", fmt.Errorf("multiple entries (%d) for key %q in table %q", len(results), key, metaTableName)
	}
	return results[0], nil
}

func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *dbCache) (uint32, error) {
latestLedgerStr, err := getMetaValue(ctx, q, latestLedgerSequenceMetaKey)
if err != nil {
return 0, err
}
latestLedgerStr := results[0]
latestLedger, err := strconv.ParseUint(latestLedgerStr, 10, 32)
if err != nil {
return 0, err
Expand All @@ -125,7 +148,7 @@ func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *
// Otherwise, the write-through cache won't get updated until the first ingestion commit
cache.Lock()
if cache.latestLedgerSeq == 0 {
// Only update the cache if value is missing (0), otherwise
// Only update the cache if the value is missing (0), otherwise
// we may end up overwriting the entry with an older version
cache.latestLedgerSeq = result
}
Expand Down Expand Up @@ -192,7 +215,7 @@ func NewReadWriter(
}

func (rw *readWriter) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
return getLatestLedgerSequence(ctx, rw.db, &rw.db.cache)
return getLatestLedgerSequence(ctx, rw.db, rw.db.cache)
}

func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
Expand All @@ -204,8 +227,9 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {

db := rw.db
writer := writeTx{
globalCache: &db.cache,
globalCache: db.cache,
postCommit: func() error {
// TODO: this is sqlite-only, it shouldn't be here
_, err := db.ExecRaw(ctx, "PRAGMA wal_checkpoint(TRUNCATE)")
return err
},
Expand Down Expand Up @@ -308,12 +332,12 @@ func (w writeTx) Rollback() error {
}
}

func runMigrations(db *sql.DB, dialect string) error {
func runSQLMigrations(db *sql.DB, dialect string) error {
m := &migrate.AssetMigrationSource{
Asset: migrations.ReadFile,
Asset: sqlMigrations.ReadFile,
AssetDir: func() func(string) ([]string, error) {
return func(path string) ([]string, error) {
dirEntry, err := migrations.ReadDir(path)
dirEntry, err := sqlMigrations.ReadDir(path)
if err != nil {
return nil, err
}
Expand All @@ -325,7 +349,7 @@ func runMigrations(db *sql.DB, dialect string) error {
return entries, nil
}
}(),
Dir: "migrations",
Dir: "sqlmigrations",
}
_, err := migrate.ExecMax(db, dialect, m, migrate.Up, 0)
return err
Expand Down
13 changes: 4 additions & 9 deletions cmd/soroban-rpc/internal/db/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,12 @@ import (
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/stellar/go/network"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces"
)

Expand Down Expand Up @@ -106,16 +108,9 @@ func NewTestDB(tb testing.TB) *DB {
tmp := tb.TempDir()
dbPath := path.Join(tmp, "db.sqlite")
db, err := OpenSQLiteDB(dbPath)
if err != nil {
assert.NoError(tb, db.Close())
}
require.NoError(tb, err)
tb.Cleanup(func() {
assert.NoError(tb, db.Close())
})
return &DB{
SessionInterface: db,
cache: dbCache{
ledgerEntries: newTransactionalCache(),
},
}
return db
}
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/db/ledgerentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,7 +341,7 @@ func NewLedgerEntryReader(db *DB) LedgerEntryReader {
}

func (r ledgerEntryReader) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
return getLatestLedgerSequence(ctx, r.db, &r.db.cache)
return getLatestLedgerSequence(ctx, r.db, r.db.cache)
}

// NewCachedTx() caches all accessed ledger entries and select statements. If many ledger entries are accessed, it will grow without bounds.
Expand All @@ -360,7 +360,7 @@ func (r ledgerEntryReader) NewCachedTx(ctx context.Context) (LedgerEntryReadTx,
}
cacheReadTx := r.db.cache.ledgerEntries.newReadTx()
return &ledgerEntryReadTx{
globalCache: &r.db.cache,
globalCache: r.db.cache,
stmtCache: sq.NewStmtCache(txSession.GetTx()),
latestLedgerSeqCache: r.db.cache.latestLedgerSeq,
ledgerEntryCacheReadTx: &cacheReadTx,
Expand All @@ -377,7 +377,7 @@ func (r ledgerEntryReader) NewTx(ctx context.Context) (LedgerEntryReadTx, error)
r.db.cache.RLock()
defer r.db.cache.RUnlock()
return &ledgerEntryReadTx{
globalCache: &r.db.cache,
globalCache: r.db.cache,
latestLedgerSeqCache: r.db.cache.latestLedgerSeq,
tx: txSession,
buffer: xdr.NewEncodingBuffer(),
Expand Down
Loading

0 comments on commit f7580ad

Please sign in to comment.