Skip to content

Commit

Permalink
Merge branch 'main' into chore/clippy
Browse files Browse the repository at this point in the history
  • Loading branch information
leighmcculloch authored Jul 11, 2024
2 parents 2d6368f + dbb390c commit 2ff4f64
Show file tree
Hide file tree
Showing 21 changed files with 359 additions and 258 deletions.
8 changes: 4 additions & 4 deletions .github/workflows/soroban-rpc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -100,10 +100,10 @@ jobs:
SOROBAN_RPC_INTEGRATION_TESTS_ENABLED: true
SOROBAN_RPC_INTEGRATION_TESTS_CORE_MAX_SUPPORTED_PROTOCOL: ${{ matrix.protocol-version }}
SOROBAN_RPC_INTEGRATION_TESTS_CAPTIVE_CORE_BIN: /usr/bin/stellar-core
PROTOCOL_20_CORE_DEBIAN_PKG_VERSION: 21.0.1-1897.dfd3dbff1.focal
PROTOCOL_20_CORE_DOCKER_IMG: stellar/unsafe-stellar-core:21.0.1-1897.dfd3dbff1.focal
PROTOCOL_21_CORE_DEBIAN_PKG_VERSION: 21.0.1-1897.dfd3dbff1.focal
PROTOCOL_21_CORE_DOCKER_IMG: stellar/unsafe-stellar-core:21.0.1-1897.dfd3dbff1.focal
PROTOCOL_20_CORE_DEBIAN_PKG_VERSION: 21.1.0-1909.rc1.b3aeb14cc.focal
PROTOCOL_20_CORE_DOCKER_IMG: stellar/stellar-core:21.1.0-1909.rc1.b3aeb14cc.focal
PROTOCOL_21_CORE_DEBIAN_PKG_VERSION: 21.1.0-1909.rc1.b3aeb14cc.focal
PROTOCOL_21_CORE_DOCKER_IMG: stellar/stellar-core:21.1.0-1909.rc1.b3aeb14cc.focal
steps:
- uses: actions/checkout@v4
with:
Expand Down
10 changes: 7 additions & 3 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,11 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
cfg.NetworkPassphrase,
cfg.HistoryRetentionWindow,
)
feewindows := feewindow.NewFeeWindows(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, cfg.NetworkPassphrase)
feewindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
cfg.NetworkPassphrase,
)

readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
Expand All @@ -315,7 +319,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store")
}).Info("initializing in-memory store and applying DB data migrations")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
Expand Down Expand Up @@ -346,7 +350,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
if currentSeq != 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("finished initializing in-memory store")
}).Info("finished initializing in-memory store and applying DB data migrations")
}

return feewindows, eventStore
Expand Down
37 changes: 36 additions & 1 deletion cmd/soroban-rpc/internal/db/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,8 @@ import (
sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
)

const (
Expand All @@ -18,6 +20,7 @@ type StreamLedgerFn func(xdr.LedgerCloseMeta) error
type LedgerReader interface {
GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error)
StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error
GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error)
}

type LedgerWriter interface {
Expand Down Expand Up @@ -65,8 +68,40 @@ func (r ledgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr.Ledge
case 1:
return results[0], true, nil
default:
return xdr.LedgerCloseMeta{}, false, fmt.Errorf("multiple lcm entries (%d) for sequence %d in table %q", len(results), sequence, ledgerCloseMetaTableName)
return xdr.LedgerCloseMeta{}, false, fmt.Errorf("multiple lcm entries (%d) for sequence %d in table %q",
len(results), sequence, ledgerCloseMetaTableName)
}
}

// GetLedgerRange pulls the min/max ledger sequence numbers from the meta table.
func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) {
query := sq.Select("lcm.meta").
From(ledgerCloseMetaTableName + " as lcm").
Where(sq.Or{
sq.Expr("lcm.sequence = (?)", sq.Select("MIN(sequence)").From(ledgerCloseMetaTableName)),
sq.Expr("lcm.sequence = (?)", sq.Select("MAX(sequence)").From(ledgerCloseMetaTableName)),
}).OrderBy("lcm.sequence ASC")

var lcms []xdr.LedgerCloseMeta
if err := r.db.Select(ctx, &lcms, query); err != nil {
return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err)
}

// Empty DB
if len(lcms) == 0 {
return ledgerbucketwindow.LedgerRange{}, nil
}

return ledgerbucketwindow.LedgerRange{
FirstLedger: ledgerbucketwindow.LedgerInfo{
Sequence: lcms[0].LedgerSequence(),
CloseTime: lcms[0].LedgerCloseTime(),
},
LastLedger: ledgerbucketwindow.LedgerInfo{
Sequence: lcms[len(lcms)-1].LedgerSequence(),
CloseTime: lcms[len(lcms)-1].LedgerCloseTime(),
},
}, nil
}

type ledgerWriter struct {
Expand Down
136 changes: 119 additions & 17 deletions cmd/soroban-rpc/internal/db/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,24 @@ func assertLedgerRange(t *testing.T, reader LedgerReader, start, end uint32) {
allLedgers = append(allLedgers, txmeta)
return nil
})
assert.NoError(t, err)
require.NoError(t, err)
for i := start - 1; i <= end+1; i++ {
ledger, exists, err := reader.GetLedger(context.Background(), i)
assert.NoError(t, err)
require.NoError(t, err)
if i < start || i > end {
assert.False(t, exists)
continue
}
assert.True(t, exists)
ledgerBinary, err := ledger.MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
expected := createLedger(i)
expectedBinary, err := expected.MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, expectedBinary, ledgerBinary)

ledgerBinary, err = allLedgers[0].MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, expectedBinary, ledgerBinary)
allLedgers = allLedgers[1:]
}
Expand All @@ -74,45 +74,147 @@ func TestLedgers(t *testing.T) {

reader := NewLedgerReader(db)
_, exists, err := reader.GetLedger(context.Background(), 1)
assert.NoError(t, err)
require.NoError(t, err)
assert.False(t, exists)

for i := 1; i <= 10; i++ {
ledgerSequence := uint32(i)
tx, err := NewReadWriter(logger, db, daemon, 150, 15, passphrase).NewTx(context.Background())
assert.NoError(t, err)
assert.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
assert.NoError(t, tx.Commit(ledgerSequence))
require.NoError(t, err)
require.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
require.NoError(t, tx.Commit(ledgerSequence))
// rolling back after a commit is a no-op
assert.NoError(t, tx.Rollback())
require.NoError(t, tx.Rollback())
}

assertLedgerRange(t, reader, 1, 10)

ledgerSequence := uint32(11)
tx, err := NewReadWriter(logger, db, daemon, 150, 15, passphrase).NewTx(context.Background())
assert.NoError(t, err)
assert.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
assert.NoError(t, tx.Commit(ledgerSequence))
require.NoError(t, err)
require.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
require.NoError(t, tx.Commit(ledgerSequence))

assertLedgerRange(t, reader, 1, 11)

ledgerSequence = uint32(12)
tx, err = NewReadWriter(logger, db, daemon, 150, 5, passphrase).NewTx(context.Background())
assert.NoError(t, err)
assert.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
assert.NoError(t, tx.Commit(ledgerSequence))
require.NoError(t, err)
require.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
require.NoError(t, tx.Commit(ledgerSequence))

assertLedgerRange(t, reader, 8, 12)
}

func TestGetLedgerRange_NonEmptyDB(t *testing.T) {
db := NewTestDB(t)
ctx := context.TODO()

writer := NewReadWriter(logger, db, interfaces.MakeNoOpDeamon(), 10, 10, passphrase)
write, err := writer.NewTx(ctx)
require.NoError(t, err)

lcms := []xdr.LedgerCloseMeta{
txMeta(1234, true),
txMeta(1235, true),
txMeta(1236, true),
txMeta(1237, true),
}

ledgerW, txW := write.LedgerWriter(), write.TransactionWriter()
for _, lcm := range lcms {
require.NoError(t, ledgerW.InsertLedger(lcm), "ingestion failed for ledger %+v", lcm.V1)
require.NoError(t, txW.InsertTransactions(lcm), "ingestion failed for ledger %+v", lcm.V1)
}
require.NoError(t, write.Commit(lcms[len(lcms)-1].LedgerSequence()))

reader := NewLedgerReader(db)
ledgerRange, err := reader.GetLedgerRange(ctx)
require.NoError(t, err)
assert.Equal(t, uint32(1334), ledgerRange.FirstLedger.Sequence)
assert.Equal(t, ledgerCloseTime(1334), ledgerRange.FirstLedger.CloseTime)
assert.Equal(t, uint32(1337), ledgerRange.LastLedger.Sequence)
assert.Equal(t, ledgerCloseTime(1337), ledgerRange.LastLedger.CloseTime)
}

func TestGetLedgerRange_SingleDBRow(t *testing.T) {
db := NewTestDB(t)
ctx := context.TODO()

writer := NewReadWriter(logger, db, interfaces.MakeNoOpDeamon(), 10, 10, passphrase)
write, err := writer.NewTx(ctx)
require.NoError(t, err)

lcms := []xdr.LedgerCloseMeta{
txMeta(1234, true),
}

ledgerW, txW := write.LedgerWriter(), write.TransactionWriter()
for _, lcm := range lcms {
require.NoError(t, ledgerW.InsertLedger(lcm), "ingestion failed for ledger %+v", lcm.V1)
require.NoError(t, txW.InsertTransactions(lcm), "ingestion failed for ledger %+v", lcm.V1)
}
require.NoError(t, write.Commit(lcms[len(lcms)-1].LedgerSequence()))

reader := NewLedgerReader(db)
ledgerRange, err := reader.GetLedgerRange(ctx)
require.NoError(t, err)
assert.Equal(t, uint32(1334), ledgerRange.FirstLedger.Sequence)
assert.Equal(t, ledgerCloseTime(1334), ledgerRange.FirstLedger.CloseTime)
assert.Equal(t, uint32(1334), ledgerRange.LastLedger.Sequence)
assert.Equal(t, ledgerCloseTime(1334), ledgerRange.LastLedger.CloseTime)
}

func TestGetLedgerRange_EmptyDB(t *testing.T) {
db := NewTestDB(t)
ctx := context.TODO()

reader := NewLedgerReader(db)
ledgerRange, err := reader.GetLedgerRange(ctx)
require.NoError(t, err)
assert.Equal(t, uint32(0), ledgerRange.FirstLedger.Sequence)
assert.Equal(t, int64(0), ledgerRange.FirstLedger.CloseTime)
assert.Equal(t, uint32(0), ledgerRange.LastLedger.Sequence)
assert.Equal(t, int64(0), ledgerRange.LastLedger.CloseTime)
}

func BenchmarkGetLedgerRange(b *testing.B) {
db := NewTestDB(b)
logger := log.DefaultLogger
writer := NewReadWriter(logger, db, interfaces.MakeNoOpDeamon(), 100, 1_000_000, passphrase)
write, err := writer.NewTx(context.TODO())
require.NoError(b, err)

// create 100k tx rows
lcms := make([]xdr.LedgerCloseMeta, 0, 100_000)
for i := range cap(lcms) {
lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0))
}

ledgerW, txW := write.LedgerWriter(), write.TransactionWriter()
for _, lcm := range lcms {
require.NoError(b, ledgerW.InsertLedger(lcm))
require.NoError(b, txW.InsertTransactions(lcm))
}
require.NoError(b, write.Commit(lcms[len(lcms)-1].LedgerSequence()))
reader := NewLedgerReader(db)

b.ResetTimer()
for range b.N {
ledgerRange, err := reader.GetLedgerRange(context.TODO())
require.NoError(b, err)
assert.Equal(b, lcms[0].LedgerSequence(), ledgerRange.FirstLedger.Sequence)
assert.Equal(b, lcms[len(lcms)-1].LedgerSequence(), ledgerRange.LastLedger.Sequence)
}
}

func NewTestDB(tb testing.TB) *DB {
tmp := tb.TempDir()
dbPath := path.Join(tmp, "db.sqlite")
db, err := OpenSQLiteDB(dbPath)
require.NoError(tb, err)
tb.Cleanup(func() {
assert.NoError(tb, db.Close())
require.NoError(tb, db.Close())
})
return db
}
20 changes: 15 additions & 5 deletions cmd/soroban-rpc/internal/db/migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -115,9 +115,13 @@ type guardedMigration struct {
db *DB
migration MigrationApplier
alreadyMigrated bool
logger *log.Entry
applyLogged bool
}

func newGuardedDataMigration(ctx context.Context, uniqueMigrationName string, factory migrationApplierFactory, db *DB) (Migration, error) {
func newGuardedDataMigration(
ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB,
) (Migration, error) {
migrationDB := &DB{
cache: db.cache,
SessionInterface: db.SessionInterface.Clone(),
Expand All @@ -132,7 +136,7 @@ func newGuardedDataMigration(ctx context.Context, uniqueMigrationName string, fa
return nil, err
}
latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx)
if err != nil && err != ErrEmptyDB {
if err != nil && !errors.Is(err, ErrEmptyDB) {
err = errors.Join(err, migrationDB.Rollback())
return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err)
}
Expand All @@ -146,6 +150,7 @@ func newGuardedDataMigration(ctx context.Context, uniqueMigrationName string, fa
db: migrationDB,
migration: applier,
alreadyMigrated: previouslyMigrated,
logger: logger,
}
return guardedMigration, nil
}
Expand All @@ -156,6 +161,10 @@ func (g *guardedMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta)
// but, just in case.
return nil
}
if !g.applyLogged {
g.logger.WithField("ledger", meta.LedgerSequence()).Info("applying migration")
g.applyLogged = true
}
return g.migration.Apply(ctx, meta)
}

Expand All @@ -177,19 +186,20 @@ func (g *guardedMigration) Commit(ctx context.Context) error {
return g.db.Commit()
}

func (g *guardedMigration) Rollback(ctx context.Context) error {
func (g *guardedMigration) Rollback(_ context.Context) error {
return g.db.Rollback()
}

func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config.Config) (Migration, error) {
migrationName := "TransactionsTable"
logger = logger.WithField("migration", migrationName)
factory := newTransactionTableMigration(
ctx,
logger.WithField("migration", migrationName),
logger,
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
)
m, err := newGuardedDataMigration(ctx, migrationName, factory, db)
m, err := newGuardedDataMigration(ctx, migrationName, logger, factory, db)
if err != nil {
return nil, fmt.Errorf("creating guarded transaction migration: %w", err)
}
Expand Down
Loading

0 comments on commit 2ff4f64

Please sign in to comment.