Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Migrate data to new transactions table #207

Merged
merged 37 commits into from
Jun 14, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
Show all changes
37 commits
Select commit Hold shift + click to select a range
b3f414c
First stab
2opremio Jun 10, 2024
daf4ca8
Fix query
2opremio Jun 10, 2024
ce0fe5e
Cleanup storage initialization
2opremio Jun 11, 2024
61dff8b
Clean up the migration even more
2opremio Jun 11, 2024
8f3abca
Factor out transaction table migration cleanly
2opremio Jun 11, 2024
3776163
Truncate table before migration
2opremio Jun 11, 2024
8940d75
Add applicable ranges to migrations
2opremio Jun 11, 2024
ddc4035
Address review feedback
2opremio Jun 11, 2024
e3711bf
Remove DB Clone() method
2opremio Jun 11, 2024
6381655
Cleanup ledger sequence ranges even further
2opremio Jun 11, 2024
52ba0ac
Add comments and make sure multiMigration only applies needed migrations
2opremio Jun 12, 2024
2422422
Add infrastructure to run integration tests from the soroban-rpc process
2opremio Jun 12, 2024
d1e790c
Use docker container instead
2opremio Jun 13, 2024
4f90b57
blacklist certain docker versions
2opremio Jun 13, 2024
567dd0d
Tweak comment
2opremio Jun 13, 2024
7f84990
Fix bug setting core image
2opremio Jun 13, 2024
c1620d2
Tweak comment
2opremio Jun 13, 2024
dbe9bcd
Merge branch 'main' into transactions-db-migration
2opremio Jun 13, 2024
670d3d8
Multiple cleanups
2opremio Jun 13, 2024
d922aff
Debug why RPC keeps unhealthy in CI
2opremio Jun 13, 2024
9e581ab
Print backtrace when RPC fails
2opremio Jun 13, 2024
c624ce1
Increase timeout
2opremio Jun 13, 2024
8908c62
Fix goroutine leak
2opremio Jun 13, 2024
ce7194d
Remove JSONRPC client leaks
2opremio Jun 13, 2024
3773bf8
Bring down the wait to 30 seconds again
2opremio Jun 13, 2024
3f4ca3f
Traceback ancestors in goroutine dumps
2opremio Jun 13, 2024
c1d4d77
Allow up to 1MB for the stacktrace
2opremio Jun 13, 2024
a0e7bbe
Print if someone is listening
2opremio Jun 13, 2024
36c66c6
Refresh soroban rpc client on every failure
2opremio Jun 13, 2024
078fe48
Remove debig leftover
2opremio Jun 13, 2024
5c4c87e
Debug RPC container
2opremio Jun 13, 2024
410fd0d
Add host.docker.internal to rpc container
2opremio Jun 13, 2024
cf06605
Fix archive host names
2opremio Jun 13, 2024
618aac8
Run docker-compose logs -f rpc in the background
2opremio Jun 13, 2024
10e09bf
Fix captive core storage path
2opremio Jun 13, 2024
7a3daca
Stop printing env variables with docker-compose
2opremio Jun 13, 2024
8bff2dc
Final cleanup
2opremio Jun 13, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/golangci-lint.yml
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ jobs:
make build-libpreflight

- name: Run golangci-lint
uses: golangci/golangci-lint-action@537aa1903e5d359d0b27dbc19ddd22c5087f3fbc # version v3.2.0
uses: golangci/golangci-lint-action@a4f60bb28d35aeee14e6880718e0c85ff1882e64 # version v6.0.1
with:
version: v1.52.2 # this is the golangci-lint version
args: --issues-exit-code=0 # exit without errors for now - won't fail the build
Expand Down
110 changes: 66 additions & 44 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
defaultReadTimeout = 5 * time.Second
defaultShutdownGracePeriod = 10 * time.Second
inMemoryInitializationLedgerLogPeriod = 1_000_000
transactionsTableMigrationDoneMetaKey = "TransactionsTableMigrationDone"
)

type Daemon struct {
Expand Down Expand Up @@ -187,49 +188,7 @@ func MustNew(cfg *config.Config) *Daemon {
}, metricsRegistry),
}

eventStore := events.NewMemoryStore(
daemon,
cfg.NetworkPassphrase,
cfg.EventLedgerRetentionWindow,
)
feewindows := feewindow.NewFeeWindows(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, cfg.NetworkPassphrase)

// initialize the stores using what was on the DB
readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetas can be larger than the individual store retention windows)
// but it's probably not worth the pain.
var initialSeq uint32
var currentSeq uint32
err = db.NewLedgerReader(dbConn).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
if err := eventStore.IngestEvents(txmeta); err != nil {
logger.WithError(err).Fatal("could not initialize event memory store")
}
if err := feewindows.IngestFees(txmeta); err != nil {
logger.WithError(err).Fatal("could not initialize fee stats")
}
return nil
})
if err != nil {
logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
if currentSeq != 0 {
logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("finished initializing in-memory store")
}
feewindows, eventStore := daemon.mustInitializeStorage(cfg)

onIngestionRetry := func(err error, dur time.Duration) {
logger.WithError(err).Error("could not run ingestion. Retrying")
Expand Down Expand Up @@ -282,7 +241,7 @@ func MustNew(cfg *config.Config) *Daemon {
Logger: logger,
LedgerReader: db.NewLedgerReader(dbConn),
LedgerEntryReader: db.NewLedgerEntryReader(dbConn),
TransactionReader: db.NewTransactionReader(logger, dbConn, cfg.NetworkPassphrase),
TransactionReader: db.NewTransactionReader(logger, dbConn.SessionInterface, cfg.NetworkPassphrase),
PreflightGetter: preflightWorkerPool,
})

Expand Down Expand Up @@ -317,6 +276,69 @@ func MustNew(cfg *config.Config) *Daemon {
return daemon
}

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindows, *events.MemoryStore) {
eventStore := events.NewMemoryStore(
d,
cfg.NetworkPassphrase,
cfg.EventLedgerRetentionWindow,
)
feewindows := feewindow.NewFeeWindows(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, cfg.NetworkPassphrase)

readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
var initialSeq uint32
var currentSeq uint32
dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
}
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetas can be larger than the individual store retention windows)
// but it's probably not worth the pain.
err = db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
if err := eventStore.IngestEvents(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize event memory store")
}
if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}
// TODO: clean up once we remove the in-memory storage.
// (we should only stream over the required range)
if r := dataMigrations.ApplicableRange(); r.IsLedgerIncluded(currentSeq) {
if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")
}
}
return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
if err := dataMigrations.Commit(readTxMetaCtx); err != nil {
d.logger.WithError(err).Fatal("could not commit data migrations")
}

if currentSeq != 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("finished initializing in-memory store")
}

return feewindows, eventStore
}

func (d *Daemon) Run() {
d.logger.WithFields(supportlog.F{
"addr": d.server.Addr,
Expand Down
73 changes: 52 additions & 21 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,12 @@ import (
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces"
)

//go:embed migrations/*.sql
var migrations embed.FS
//go:embed sqlmigrations/*.sql
var sqlMigrations embed.FS

var ErrEmptyDB = errors.New("DB is empty")

Expand Down Expand Up @@ -52,7 +53,14 @@ type dbCache struct {

type DB struct {
db.SessionInterface
cache dbCache
cache *dbCache
}

func (db *DB) Clone() *DB {
return &DB{
SessionInterface: db.SessionInterface.Clone(),
cache: db.cache,
2opremio marked this conversation as resolved.
Show resolved Hide resolved
}
}

func openSQLiteDB(dbFilePath string) (*db.Session, error) {
Expand All @@ -65,9 +73,9 @@ func openSQLiteDB(dbFilePath string) (*db.Session, error) {
return nil, errors.Wrap(err, "open failed")
}

if err = runMigrations(session.DB.DB, "sqlite3"); err != nil {
if err = runSQLMigrations(session.DB.DB, "sqlite3"); err != nil {
_ = session.Close()
return nil, errors.Wrap(err, "could not run migrations")
return nil, errors.Wrap(err, "could not run SQL migrations")
}
return session, nil
}
Expand All @@ -79,7 +87,7 @@ func OpenSQLiteDBWithPrometheusMetrics(dbFilePath string, namespace string, sub
}
result := DB{
SessionInterface: db.RegisterMetrics(session, namespace, sub, registry),
cache: dbCache{
cache: &dbCache{
ledgerEntries: newTransactionalCache(),
},
}
Expand All @@ -93,28 +101,50 @@ func OpenSQLiteDB(dbFilePath string) (*DB, error) {
}
result := DB{
SessionInterface: session,
cache: dbCache{
cache: &dbCache{
ledgerEntries: newTransactionalCache(),
},
}
return &result, nil
}

func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *dbCache) (uint32, error) {
sql := sq.Select("value").From(metaTableName).Where(sq.Eq{"key": latestLedgerSequenceMetaKey})
func getMetaBool(ctx context.Context, q db.SessionInterface, key string) (bool, error) {
valueStr, err := getMetaValue(ctx, q, key)
if err != nil {
return false, err
}
return strconv.ParseBool(valueStr)
}

func setMetaBool(ctx context.Context, q db.SessionInterface, key string) error {
2opremio marked this conversation as resolved.
Show resolved Hide resolved
query := sq.Replace(metaTableName).
Values(key, "true")
_, err := q.Exec(ctx, query)
return err
}

func getMetaValue(ctx context.Context, q db.SessionInterface, key string) (string, error) {
sql := sq.Select("value").From(metaTableName).Where(sq.Eq{"key": key})
var results []string
if err := q.Select(ctx, &results, sql); err != nil {
return 0, err
return "", err
}
switch len(results) {
case 0:
return 0, ErrEmptyDB
return "", ErrEmptyDB
case 1:
// expected length on an initialized DB
default:
return 0, fmt.Errorf("multiple entries (%d) for key %q in table %q", len(results), latestLedgerSequenceMetaKey, metaTableName)
return "", fmt.Errorf("multiple entries (%d) for key %q in table %q", len(results), latestLedgerSequenceMetaKey, metaTableName)
}
return results[0], nil
}

func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *dbCache) (uint32, error) {
latestLedgerStr, err := getMetaValue(ctx, q, latestLedgerSequenceMetaKey)
if err != nil {
return 0, err
}
latestLedgerStr := results[0]
latestLedger, err := strconv.ParseUint(latestLedgerStr, 10, 32)
if err != nil {
return 0, err
Expand All @@ -125,7 +155,7 @@ func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *
// Otherwise, the write-through cache won't get updated until the first ingestion commit
cache.Lock()
if cache.latestLedgerSeq == 0 {
// Only update the cache if value is missing (0), otherwise
// Only update the cache if the value is missing (0), otherwise
// we may end up overwriting the entry with an older version
cache.latestLedgerSeq = result
}
Expand Down Expand Up @@ -192,20 +222,21 @@ func NewReadWriter(
}

func (rw *readWriter) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
return getLatestLedgerSequence(ctx, rw.db, &rw.db.cache)
return getLatestLedgerSequence(ctx, rw.db.SessionInterface, rw.db.cache)
}

func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
txSession := rw.db.Clone()
txSession := rw.db.SessionInterface.Clone()
if err := txSession.Begin(ctx); err != nil {
return nil, err
}
stmtCache := sq.NewStmtCache(txSession.GetTx())

db := rw.db
writer := writeTx{
globalCache: &db.cache,
globalCache: db.cache,
postCommit: func() error {
// TODO: this is sqlite-only, it shouldn't be here
_, err := db.ExecRaw(ctx, "PRAGMA wal_checkpoint(TRUNCATE)")
return err
},
Expand Down Expand Up @@ -308,12 +339,12 @@ func (w writeTx) Rollback() error {
}
}

func runMigrations(db *sql.DB, dialect string) error {
func runSQLMigrations(db *sql.DB, dialect string) error {
m := &migrate.AssetMigrationSource{
Asset: migrations.ReadFile,
Asset: sqlMigrations.ReadFile,
AssetDir: func() func(string) ([]string, error) {
return func(path string) ([]string, error) {
dirEntry, err := migrations.ReadDir(path)
dirEntry, err := sqlMigrations.ReadDir(path)
if err != nil {
return nil, err
}
Expand All @@ -325,7 +356,7 @@ func runMigrations(db *sql.DB, dialect string) error {
return entries, nil
}
}(),
Dir: "migrations",
Dir: "sqlmigrations",
}
_, err := migrate.ExecMax(db, dialect, m, migrate.Up, 0)
return err
Expand Down
5 changes: 3 additions & 2 deletions cmd/soroban-rpc/internal/db/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/stellar/go/network"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces"
)

Expand Down Expand Up @@ -113,8 +114,8 @@ func NewTestDB(tb testing.TB) *DB {
assert.NoError(tb, db.Close())
})
return &DB{
SessionInterface: db,
cache: dbCache{
SessionInterface: db.SessionInterface,
cache: &dbCache{
ledgerEntries: newTransactionalCache(),
},
}
Expand Down
10 changes: 5 additions & 5 deletions cmd/soroban-rpc/internal/db/ledgerentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -341,12 +341,12 @@ func NewLedgerEntryReader(db *DB) LedgerEntryReader {
}

func (r ledgerEntryReader) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
return getLatestLedgerSequence(ctx, r.db, &r.db.cache)
return getLatestLedgerSequence(ctx, r.db.SessionInterface, r.db.cache)
}

// NewCachedTx() caches all accessed ledger entries and select statements. If many ledger entries are accessed, it will grow without bounds.
func (r ledgerEntryReader) NewCachedTx(ctx context.Context) (LedgerEntryReadTx, error) {
txSession := r.db.Clone()
txSession := r.db.SessionInterface.Clone()
// We need to copy the cached ledger entries locally when we start the transaction
// since otherwise we would break the consistency between the transaction and the cache.

Expand All @@ -360,7 +360,7 @@ func (r ledgerEntryReader) NewCachedTx(ctx context.Context) (LedgerEntryReadTx,
}
cacheReadTx := r.db.cache.ledgerEntries.newReadTx()
return &ledgerEntryReadTx{
globalCache: &r.db.cache,
globalCache: r.db.cache,
stmtCache: sq.NewStmtCache(txSession.GetTx()),
latestLedgerSeqCache: r.db.cache.latestLedgerSeq,
ledgerEntryCacheReadTx: &cacheReadTx,
Expand All @@ -370,14 +370,14 @@ func (r ledgerEntryReader) NewCachedTx(ctx context.Context) (LedgerEntryReadTx,
}

func (r ledgerEntryReader) NewTx(ctx context.Context) (LedgerEntryReadTx, error) {
txSession := r.db.Clone()
txSession := r.db.SessionInterface.Clone()
if err := txSession.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}); err != nil {
return nil, err
}
r.db.cache.RLock()
defer r.db.cache.RUnlock()
return &ledgerEntryReadTx{
globalCache: &r.db.cache,
globalCache: r.db.cache,
latestLedgerSeqCache: r.db.cache.latestLedgerSeq,
tx: txSession,
buffer: xdr.NewEncodingBuffer(),
Expand Down
Loading
Loading