Skip to content

Commit

Permalink
First stab
Browse files Browse the repository at this point in the history
  • Loading branch information
2opremio committed Jun 10, 2024
1 parent ec4504e commit b3f414c
Show file tree
Hide file tree
Showing 3 changed files with 77 additions and 7 deletions.
37 changes: 37 additions & 0 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const (
defaultReadTimeout = 5 * time.Second
defaultShutdownGracePeriod = 10 * time.Second
inMemoryInitializationLedgerLogPeriod = 1_000_000
transactionsTableMigrationDoneMetaKey = "TransactionsTableMigrationDone"
)

type Daemon struct {
Expand Down Expand Up @@ -202,6 +203,36 @@ func MustNew(cfg *config.Config) *Daemon {
// but it's probably not worth the pain.
var initialSeq uint32
var currentSeq uint32
// We should do the migration somewhere else
migrationSession := dbConn.Clone()
if err = migrationSession.Begin(readTxMetaCtx); err != nil {
logger.WithError(err).Fatal("could not start migration session")
}
migrationFunc := func(txmeta xdr.LedgerCloseMeta) error { return nil }
migrationDoneFunc := func() {}
val, err := db.GetMetaBool(readTxMetaCtx, migrationSession, transactionsTableMigrationDoneMetaKey)
if err == db.ErrEmptyDB || val == false {
logger.Info("migrating transaction to new backend")
writer := db.NewTransactionWriter(logger, migrationSession, cfg.NetworkPassphrase)
migrationFunc = func(txmeta xdr.LedgerCloseMeta) error {
return writer.InsertTransactions(txmeta)
}
migrationDoneFunc = func() {
err := db.SetMetaBool(readTxMetaCtx, migrationSession, transactionsTableMigrationDoneMetaKey)
if err != nil {
logger.WithError(err).WithField("key", transactionsTableMigrationDoneMetaKey).Fatal("could not set metadata")
migrationSession.Rollback()
return
}
// TODO: rollback wherever necessary
if err = migrationSession.Commit(); err != nil {
logger.WithError(err).Error("could not commit migration session")
}
}
} else if err != nil {
logger.WithError(err).WithField("key", transactionsTableMigrationDoneMetaKey).Fatal("could not get metadata")
}

err = db.NewLedgerReader(dbConn).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
Expand All @@ -220,11 +251,17 @@ func MustNew(cfg *config.Config) *Daemon {
if err := feewindows.IngestFees(txmeta); err != nil {
logger.WithError(err).Fatal("could not initialize fee stats")
}
if err := migrationFunc(txmeta); err != nil {
// TODO: we should only migrate the transaction range
logger.WithError(err).Fatal("could not run migration")
}
return nil
})
if err != nil {
logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
migrationDoneFunc()

if currentSeq != 0 {
logger.WithFields(supportlog.F{
"seq": currentSeq,
Expand Down
36 changes: 30 additions & 6 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ import (
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces"
)

Expand Down Expand Up @@ -100,21 +101,43 @@ func OpenSQLiteDB(dbFilePath string) (*DB, error) {
return &result, nil
}

func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *dbCache) (uint32, error) {
sql := sq.Select("value").From(metaTableName).Where(sq.Eq{"key": latestLedgerSequenceMetaKey})
func GetMetaBool(ctx context.Context, q db.SessionInterface, key string) (bool, error) {
valueStr, err := getMetaValue(ctx, q, key)
if err != nil {
return false, err
}
return strconv.ParseBool(valueStr)
}

func SetMetaBool(ctx context.Context, q db.SessionInterface, key string) error {
_, err := sq.Replace(metaTableName).
Values(latestLedgerSequenceMetaKey, "true").
Exec()
return err
}

func getMetaValue(ctx context.Context, q db.SessionInterface, key string) (string, error) {
sql := sq.Select("value").From(metaTableName).Where(sq.Eq{"key": key})
var results []string
if err := q.Select(ctx, &results, sql); err != nil {
return 0, err
return "", err
}
switch len(results) {
case 0:
return 0, ErrEmptyDB
return "", ErrEmptyDB
case 1:
// expected length on an initialized DB
default:
return 0, fmt.Errorf("multiple entries (%d) for key %q in table %q", len(results), latestLedgerSequenceMetaKey, metaTableName)
return "", fmt.Errorf("multiple entries (%d) for key %q in table %q", len(results), latestLedgerSequenceMetaKey, metaTableName)
}
return results[0], nil
}

func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *dbCache) (uint32, error) {
latestLedgerStr, err := getMetaValue(ctx, q, latestLedgerSequenceMetaKey)
if err != nil {
return 0, err
}
latestLedgerStr := results[0]
latestLedger, err := strconv.ParseUint(latestLedgerStr, 10, 32)
if err != nil {
return 0, err
Expand Down Expand Up @@ -206,6 +229,7 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
writer := writeTx{
globalCache: &db.cache,
postCommit: func() error {
// TODO: this is sqlite-only, it shouldn't be here
_, err := db.ExecRaw(ctx, "PRAGMA wal_checkpoint(TRUNCATE)")
return err
},
Expand Down
11 changes: 10 additions & 1 deletion cmd/soroban-rpc/internal/db/transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,12 +39,21 @@ type TransactionWriter interface {
RegisterMetrics(ingest, count prometheus.Observer)
}

// TransactionReader provides all of the public ways to read from the DB.
// TransactionReader provides all the public ways to read from the DB.
type TransactionReader interface {
GetTransaction(ctx context.Context, hash xdr.Hash) (Transaction, ledgerbucketwindow.LedgerRange, error)
GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error)
}

func NewTransactionWriter(log *log.Entry, db db.SessionInterface, networkPassphrase string) TransactionWriter {
return &transactionHandler{
log: log,
db: db,
stmtCache: sq.NewStmtCache(db.GetTx()),
passphrase: networkPassphrase,
}
}

type transactionHandler struct {
log *log.Entry
db db.SessionInterface
Expand Down

0 comments on commit b3f414c

Please sign in to comment.