Skip to content

Commit

Permalink
Fix ledger range calculation (stellar#217)
Browse files Browse the repository at this point in the history
* Add LedgerRangeReader interface

* Add GetLedgerRange implementation for ledgers table

* Use GetLedgerRange from the ledgers table for getHealth and getFeeStats

* Change varible name

* Add GetLedgerRange method for transactions table

* Add test for GetLedgerRange for ledgers table

* Add GetLedgerRange in ConstantLedgerReader

* Add test for GetLedgerRange in transactions table

* Fix nil pointer bugs in getTransactions

* Remove latest ledger assertion

* Comment out the latest ledger assertion

* Remove GetLedgerRange from meta table and use ledgerRangeGetter for getHealth and getFeeStats

* Remove ledgerCloseTime

* Revert newline change

* Revert

* Remove assertions

* revert

* Change interface name

* insert txns during integration test setup - 1

* insert txns during integration test setup - 2

* insert txns during integration test setup - 3

* Fix linting errors

* Fix linting errors - 2

* Fix linting errors - 3

* Fix linting errors - 4

* Fix linting errors - 5

* Revert

* Revert-2

* change camel-case naming

* change camel-case naming - 2

* Fix linting errrors - 6

* Simplify ledger range query

* Simplify ledger range query - 2

* Simplify ledger range query - 3

* Add benchmarking for GetLedgerRange

* Fix linter issues - 6

* Remove else condition

* Optimise the GetLedgerRange query

* Fix intrange linter

* Use require.NoError

* Move comment to definition

* Fix intrange linter - 2

* Fix nomnd linter

* Fix intrange linter

* Remove db/util.go and add txMeta methods to infrastructure

* forgot to gci files again :/

* Remove migration FIXME

* Fix linter checks

* Add migration for lcm sequence index

* Revert transaction_test.go changes

* Add newline

* Add GetLedgerRange implementation in meta table

* Use new GetLedgerRange for getHealth and getFeeStats

* Add GetLedgerRange to ConstantLedgerReader

* Remove GetLedgerRange from transactions code and use the meta table one

* Remove unnecessary file changes

* Fix linting errors

* Fix linting errors - 2

* Fix linting errors - 3

* Fix linting errors - 4

* Remove unnecessary constants

* Use sq.Select instead of sq.Expr

* Add nolint

* Add nolint - 2

* Handle single row case in ledger range

* Remove index migration

* Order the ledger results in ASC order

* exchange the expected and actual values in assert
  • Loading branch information
aditya1702 authored Jul 2, 2024
1 parent c231749 commit 44515d0
Show file tree
Hide file tree
Showing 14 changed files with 326 additions and 225 deletions.
38 changes: 36 additions & 2 deletions cmd/soroban-rpc/internal/db/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,9 @@ import (
"fmt"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
)

const (
Expand All @@ -18,6 +19,7 @@ type StreamLedgerFn func(xdr.LedgerCloseMeta) error
type LedgerReader interface {
GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error)
StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error
GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error)
}

type LedgerWriter interface {
Expand Down Expand Up @@ -65,10 +67,42 @@ func (r ledgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr.Ledge
case 1:
return results[0], true, nil
default:
return xdr.LedgerCloseMeta{}, false, fmt.Errorf("multiple lcm entries (%d) for sequence %d in table %q", len(results), sequence, ledgerCloseMetaTableName)
return xdr.LedgerCloseMeta{}, false, fmt.Errorf("multiple lcm entries (%d) for sequence %d in table %q",
len(results), sequence, ledgerCloseMetaTableName)
}
}

// GetLedgerRange pulls the min/max ledger sequence numbers from the meta table.
func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) {
query := sq.Select("lcm.meta").
From(ledgerCloseMetaTableName + " as lcm").
Where(sq.Or{
sq.Expr("lcm.sequence = (?)", sq.Select("MIN(sequence)").From(ledgerCloseMetaTableName)),
sq.Expr("lcm.sequence = (?)", sq.Select("MAX(sequence)").From(ledgerCloseMetaTableName)),
}).OrderBy("lcm.sequence ASC")

var lcms []xdr.LedgerCloseMeta
if err := r.db.Select(ctx, &lcms, query); err != nil {
return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err)
}

// Empty DB
if len(lcms) == 0 {
return ledgerbucketwindow.LedgerRange{}, nil
}

return ledgerbucketwindow.LedgerRange{
FirstLedger: ledgerbucketwindow.LedgerInfo{
Sequence: lcms[0].LedgerSequence(),
CloseTime: lcms[0].LedgerCloseTime(),
},
LastLedger: ledgerbucketwindow.LedgerInfo{
Sequence: lcms[len(lcms)-1].LedgerSequence(),
CloseTime: lcms[len(lcms)-1].LedgerCloseTime(),
},
}, nil
}

type ledgerWriter struct {
stmtCache *sq.StmtCache
}
Expand Down
136 changes: 119 additions & 17 deletions cmd/soroban-rpc/internal/db/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,24 +44,24 @@ func assertLedgerRange(t *testing.T, reader LedgerReader, start, end uint32) {
allLedgers = append(allLedgers, txmeta)
return nil
})
assert.NoError(t, err)
require.NoError(t, err)
for i := start - 1; i <= end+1; i++ {
ledger, exists, err := reader.GetLedger(context.Background(), i)
assert.NoError(t, err)
require.NoError(t, err)
if i < start || i > end {
assert.False(t, exists)
continue
}
assert.True(t, exists)
ledgerBinary, err := ledger.MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
expected := createLedger(i)
expectedBinary, err := expected.MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, expectedBinary, ledgerBinary)

ledgerBinary, err = allLedgers[0].MarshalBinary()
assert.NoError(t, err)
require.NoError(t, err)
assert.Equal(t, expectedBinary, ledgerBinary)
allLedgers = allLedgers[1:]
}
Expand All @@ -74,45 +74,147 @@ func TestLedgers(t *testing.T) {

reader := NewLedgerReader(db)
_, exists, err := reader.GetLedger(context.Background(), 1)
assert.NoError(t, err)
require.NoError(t, err)
assert.False(t, exists)

for i := 1; i <= 10; i++ {
ledgerSequence := uint32(i)
tx, err := NewReadWriter(logger, db, daemon, 150, 15, passphrase).NewTx(context.Background())
assert.NoError(t, err)
assert.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
assert.NoError(t, tx.Commit(ledgerSequence))
require.NoError(t, err)
require.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
require.NoError(t, tx.Commit(ledgerSequence))
// rolling back after a commit is a no-op
assert.NoError(t, tx.Rollback())
require.NoError(t, tx.Rollback())
}

assertLedgerRange(t, reader, 1, 10)

ledgerSequence := uint32(11)
tx, err := NewReadWriter(logger, db, daemon, 150, 15, passphrase).NewTx(context.Background())
assert.NoError(t, err)
assert.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
assert.NoError(t, tx.Commit(ledgerSequence))
require.NoError(t, err)
require.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
require.NoError(t, tx.Commit(ledgerSequence))

assertLedgerRange(t, reader, 1, 11)

ledgerSequence = uint32(12)
tx, err = NewReadWriter(logger, db, daemon, 150, 5, passphrase).NewTx(context.Background())
assert.NoError(t, err)
assert.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
assert.NoError(t, tx.Commit(ledgerSequence))
require.NoError(t, err)
require.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
require.NoError(t, tx.Commit(ledgerSequence))

assertLedgerRange(t, reader, 8, 12)
}

func TestGetLedgerRange_NonEmptyDB(t *testing.T) {
db := NewTestDB(t)
ctx := context.TODO()

writer := NewReadWriter(logger, db, interfaces.MakeNoOpDeamon(), 10, 10, passphrase)
write, err := writer.NewTx(ctx)
require.NoError(t, err)

lcms := []xdr.LedgerCloseMeta{
txMeta(1234, true),
txMeta(1235, true),
txMeta(1236, true),
txMeta(1237, true),
}

ledgerW, txW := write.LedgerWriter(), write.TransactionWriter()
for _, lcm := range lcms {
require.NoError(t, ledgerW.InsertLedger(lcm), "ingestion failed for ledger %+v", lcm.V1)
require.NoError(t, txW.InsertTransactions(lcm), "ingestion failed for ledger %+v", lcm.V1)
}
require.NoError(t, write.Commit(lcms[len(lcms)-1].LedgerSequence()))

reader := NewLedgerReader(db)
ledgerRange, err := reader.GetLedgerRange(ctx)
require.NoError(t, err)
assert.Equal(t, uint32(1334), ledgerRange.FirstLedger.Sequence)
assert.Equal(t, ledgerCloseTime(1334), ledgerRange.FirstLedger.CloseTime)
assert.Equal(t, uint32(1337), ledgerRange.LastLedger.Sequence)
assert.Equal(t, ledgerCloseTime(1337), ledgerRange.LastLedger.CloseTime)
}

func TestGetLedgerRange_SingleDBRow(t *testing.T) {
db := NewTestDB(t)
ctx := context.TODO()

writer := NewReadWriter(logger, db, interfaces.MakeNoOpDeamon(), 10, 10, passphrase)
write, err := writer.NewTx(ctx)
require.NoError(t, err)

lcms := []xdr.LedgerCloseMeta{
txMeta(1234, true),
}

ledgerW, txW := write.LedgerWriter(), write.TransactionWriter()
for _, lcm := range lcms {
require.NoError(t, ledgerW.InsertLedger(lcm), "ingestion failed for ledger %+v", lcm.V1)
require.NoError(t, txW.InsertTransactions(lcm), "ingestion failed for ledger %+v", lcm.V1)
}
require.NoError(t, write.Commit(lcms[len(lcms)-1].LedgerSequence()))

reader := NewLedgerReader(db)
ledgerRange, err := reader.GetLedgerRange(ctx)
require.NoError(t, err)
assert.Equal(t, uint32(1334), ledgerRange.FirstLedger.Sequence)
assert.Equal(t, ledgerCloseTime(1334), ledgerRange.FirstLedger.CloseTime)
assert.Equal(t, uint32(1334), ledgerRange.LastLedger.Sequence)
assert.Equal(t, ledgerCloseTime(1334), ledgerRange.LastLedger.CloseTime)
}

func TestGetLedgerRange_EmptyDB(t *testing.T) {
db := NewTestDB(t)
ctx := context.TODO()

reader := NewLedgerReader(db)
ledgerRange, err := reader.GetLedgerRange(ctx)
require.NoError(t, err)
assert.Equal(t, uint32(0), ledgerRange.FirstLedger.Sequence)
assert.Equal(t, int64(0), ledgerRange.FirstLedger.CloseTime)
assert.Equal(t, uint32(0), ledgerRange.LastLedger.Sequence)
assert.Equal(t, int64(0), ledgerRange.LastLedger.CloseTime)
}

func BenchmarkGetLedgerRange(b *testing.B) {
db := NewTestDB(b)
logger := log.DefaultLogger
writer := NewReadWriter(logger, db, interfaces.MakeNoOpDeamon(), 100, 1_000_000, passphrase)
write, err := writer.NewTx(context.TODO())
require.NoError(b, err)

// create 100k tx rows
lcms := make([]xdr.LedgerCloseMeta, 0, 100_000)
for i := range cap(lcms) {
lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0))
}

ledgerW, txW := write.LedgerWriter(), write.TransactionWriter()
for _, lcm := range lcms {
require.NoError(b, ledgerW.InsertLedger(lcm))
require.NoError(b, txW.InsertTransactions(lcm))
}
require.NoError(b, write.Commit(lcms[len(lcms)-1].LedgerSequence()))
reader := NewLedgerReader(db)

b.ResetTimer()
for range b.N {
ledgerRange, err := reader.GetLedgerRange(context.TODO())
require.NoError(b, err)
assert.Equal(b, lcms[0].LedgerSequence(), ledgerRange.FirstLedger.Sequence)
assert.Equal(b, lcms[len(lcms)-1].LedgerSequence(), ledgerRange.LastLedger.Sequence)
}
}

func NewTestDB(tb testing.TB) *DB {
tmp := tb.TempDir()
dbPath := path.Join(tmp, "db.sqlite")
db, err := OpenSQLiteDB(dbPath)
require.NoError(tb, err)
tb.Cleanup(func() {
assert.NoError(tb, db.Close())
require.NoError(tb, db.Close())
})
return db
}
56 changes: 28 additions & 28 deletions cmd/soroban-rpc/internal/db/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package db

import (
"context"
"errors"
"io"

"github.com/prometheus/client_golang/prometheus"
Expand All @@ -12,7 +13,7 @@ import (
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
)

type mockTransactionHandler struct {
type MockTransactionHandler struct {
passphrase string

ledgerRange ledgerbucketwindow.LedgerRange
Expand All @@ -21,16 +22,16 @@ type mockTransactionHandler struct {
ledgerSeqToMeta map[uint32]*xdr.LedgerCloseMeta
}

func NewMockTransactionStore(passphrase string) *mockTransactionHandler {
return &mockTransactionHandler{
func NewMockTransactionStore(passphrase string) *MockTransactionHandler {
return &MockTransactionHandler{
passphrase: passphrase,
txs: make(map[string]ingest.LedgerTransaction),
txHashToMeta: make(map[string]*xdr.LedgerCloseMeta),
ledgerSeqToMeta: make(map[uint32]*xdr.LedgerCloseMeta),
}
}

func (txn *mockTransactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error {
func (txn *MockTransactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) error {
txn.ledgerSeqToMeta[lcm.LedgerSequence()] = &lcm

reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(txn.passphrase, lcm)
Expand All @@ -40,7 +41,7 @@ func (txn *mockTransactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) e

for {
tx, err := reader.Read()
if err == io.EOF {
if errors.Is(err, io.EOF) {
break
} else if err != nil {
return err
Expand All @@ -65,48 +66,47 @@ func (txn *mockTransactionHandler) InsertTransactions(lcm xdr.LedgerCloseMeta) e
return nil
}

// GetLedgerRange pulls the min/max ledger sequence numbers from the database.
func (txn *mockTransactionHandler) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) {
return txn.ledgerRange, nil
}

func (txn *mockTransactionHandler) GetTransaction(ctx context.Context, hash xdr.Hash) (
Transaction, ledgerbucketwindow.LedgerRange, error,
func (txn *MockTransactionHandler) GetTransaction(_ context.Context, hash xdr.Hash) (
Transaction, error,
) {
if tx, ok := txn.txs[hash.HexString()]; !ok {
return Transaction{}, txn.ledgerRange, ErrNoTransaction
} else {
itx, err := ParseTransaction(*txn.txHashToMeta[hash.HexString()], tx)
return itx, txn.ledgerRange, err
tx, ok := txn.txs[hash.HexString()]
if !ok {
return Transaction{}, ErrNoTransaction
}
itx, err := ParseTransaction(*txn.txHashToMeta[hash.HexString()], tx)
return itx, err
}

func (txn *mockTransactionHandler) RegisterMetrics(_, _ prometheus.Observer) {}
func (txn *MockTransactionHandler) RegisterMetrics(_, _ prometheus.Observer) {}

type mockLedgerReader struct {
txn mockTransactionHandler
type MockLedgerReader struct {
txn *MockTransactionHandler
}

func NewMockLedgerReader(txn *mockTransactionHandler) *mockLedgerReader {
return &mockLedgerReader{
txn: *txn,
func NewMockLedgerReader(txn *MockTransactionHandler) *MockLedgerReader {
return &MockLedgerReader{
txn: txn,
}
}

func (m *mockLedgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) {
func (m *MockLedgerReader) GetLedger(_ context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) {
lcm, ok := m.txn.ledgerSeqToMeta[sequence]
if !ok {
return xdr.LedgerCloseMeta{}, false, nil
}
return *lcm, true, nil
}

func (m *mockLedgerReader) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error {
func (m *MockLedgerReader) StreamAllLedgers(_ context.Context, _ StreamLedgerFn) error {
return nil
}

func (m *MockLedgerReader) GetLedgerRange(_ context.Context) (ledgerbucketwindow.LedgerRange, error) {
return m.txn.ledgerRange, nil
}

var (
_ TransactionReader = &mockTransactionHandler{}
_ TransactionWriter = &mockTransactionHandler{}
_ LedgerReader = &mockLedgerReader{}
_ TransactionReader = &MockTransactionHandler{}
_ TransactionWriter = &MockTransactionHandler{}
_ LedgerReader = &MockLedgerReader{}
)
Loading

0 comments on commit 44515d0

Please sign in to comment.