Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getLedgers implementation #303

Merged
merged 40 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 35 commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
d2f1763
Add getLedgers - 1
aditya1702 Sep 19, 2024
704ba5c
Add getLedgers - 2
aditya1702 Sep 23, 2024
d3bba51
Add getLedgers unittests
aditya1702 Sep 25, 2024
43e00fa
Add getLedgers integration tests
aditya1702 Sep 25, 2024
8f5ba83
Get all ledgers in once call from the db
aditya1702 Sep 26, 2024
99d7d3a
Add one more unittest
aditya1702 Sep 26, 2024
c5e71fe
gci files
aditya1702 Sep 26, 2024
b865020
change quelimit and request duration limit
aditya1702 Sep 26, 2024
f6eda45
Merge branch 'main' into get-ledgers
aditya1702 Sep 26, 2024
935d305
Update cmd/soroban-rpc/internal/methods/get_ledgers.go
aditya1702 Sep 30, 2024
9498a68
Update cmd/soroban-rpc/internal/methods/get_ledgers.go
aditya1702 Sep 30, 2024
d0f3816
Pre-alloc array in BatchGetLedgers
aditya1702 Sep 30, 2024
02604f3
Return default error for json conversion
aditya1702 Sep 30, 2024
1918bb8
Merge remote-tracking branch 'origin/get-ledgers' into get-ledgers
aditya1702 Sep 30, 2024
28d5afb
Use strconv.Itoa
aditya1702 Sep 30, 2024
524cbf6
Use xdr2json.ConvertInterface instead of marshalling
aditya1702 Oct 1, 2024
e807acf
Move db setup to another function
aditya1702 Oct 1, 2024
07e1c4a
Add benchmarking for BatchGetLedgers
aditya1702 Oct 1, 2024
7fac004
Small changes
aditya1702 Oct 1, 2024
b9282c9
Small changes - 2
aditya1702 Oct 1, 2024
58e6d2c
Update benchmark code for BatchGetLedgers
aditya1702 Oct 2, 2024
f89c5f1
Revert PaginationOptions changes (will do in separate PR)
aditya1702 Oct 2, 2024
ab8c4e5
Revert PaginationOptionschanges - 2
aditya1702 Oct 2, 2024
a8a24e2
Revert PaginationOptionschanges - 3
aditya1702 Oct 2, 2024
6417f93
Revert PaginationOptionschanges - 4
aditya1702 Oct 2, 2024
3325577
Revert PaginationOptionschanges - 5
aditya1702 Oct 2, 2024
2c906e9
refactor unittest to remove duplication
aditya1702 Oct 2, 2024
44b9ab9
Merge branch 'main' into get-ledgers
Shaptic Oct 7, 2024
72b136e
Update cmd/soroban-rpc/internal/methods/get_ledgers.go
aditya1702 Oct 11, 2024
b6e1f5f
Rename LedgerCloseMeta to LedgerMetadata
aditya1702 Nov 1, 2024
f305de9
Rename LedgerCloseMeta to LedgerMetadata - 2
aditya1702 Nov 1, 2024
7bc1d6f
Only pass ledger meta to extract the json for header too
aditya1702 Nov 1, 2024
686545d
Pass ledger pointer instead of entire object
aditya1702 Nov 1, 2024
526b534
Update cmd/soroban-rpc/internal/methods/get_ledgers.go
aditya1702 Nov 5, 2024
a9c2324
Use a read tx for GetLedgerRange and BatchGetLedgers
aditya1702 Nov 8, 2024
9d653e3
Merge remote-tracking branch 'refs/remotes/upstream/main' into get-le…
aditya1702 Nov 8, 2024
0be9cc5
Fix lint
aditya1702 Nov 8, 2024
ca4d6d6
Convert ReadDB to private interface
aditya1702 Nov 8, 2024
2a0cec2
Remove BatchGetLedgers from LedgerReader
aditya1702 Nov 11, 2024
6adaff0
Merge branch 'main' into get-ledgers
aditya1702 Nov 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions cmd/soroban-rpc/internal/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Config struct {
CoreRequestTimeout time.Duration
DefaultEventsLimit uint
DefaultTransactionsLimit uint
DefaultLedgersLimit uint
EventLedgerRetentionWindow uint32
FriendbotURL string
HistoryArchiveURLs []string
Expand All @@ -35,6 +36,7 @@ type Config struct {
LogLevel logrus.Level
MaxEventsLimit uint
MaxTransactionsLimit uint
MaxLedgersLimit uint
MaxHealthyLedgerLatency time.Duration
NetworkPassphrase string
PreflightWorkerCount uint
Expand All @@ -54,6 +56,7 @@ type Config struct {
RequestBacklogGetLedgerEntriesQueueLimit uint
RequestBacklogGetTransactionQueueLimit uint
RequestBacklogGetTransactionsQueueLimit uint
RequestBacklogGetLedgersQueueLimit uint
RequestBacklogSendTransactionQueueLimit uint
RequestBacklogSimulateTransactionQueueLimit uint
RequestBacklogGetFeeStatsTransactionQueueLimit uint
Expand All @@ -67,6 +70,7 @@ type Config struct {
MaxGetLedgerEntriesExecutionDuration time.Duration
MaxGetTransactionExecutionDuration time.Duration
MaxGetTransactionsExecutionDuration time.Duration
MaxGetLedgersExecutionDuration time.Duration
MaxSendTransactionExecutionDuration time.Duration
MaxSimulateTransactionExecutionDuration time.Duration
MaxGetFeeStatsExecutionDuration time.Duration
Expand Down
35 changes: 35 additions & 0 deletions cmd/soroban-rpc/internal/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,28 @@ func (cfg *Config) options() Options {
return nil
},
},
{
Name: "max-ledgers-limit",
Usage: "Maximum amount of ledgers allowed in a single getLedgers response",
ConfigKey: &cfg.MaxLedgersLimit,
DefaultValue: uint(200),
},
{
Name: "default-ledgers-limit",
Usage: "Default cap on the amount of ledgers included in a single getLedgers response",
ConfigKey: &cfg.DefaultLedgersLimit,
DefaultValue: uint(50),
Validate: func(_ *Option) error {
if cfg.DefaultLedgersLimit > cfg.MaxLedgersLimit {
return fmt.Errorf(
"default-ledgers-limit (%v) cannot exceed max-ledgers-limit (%v)",
cfg.DefaultLedgersLimit,
cfg.MaxLedgersLimit,
)
}
return nil
},
},
{
Name: "max-healthy-ledger-latency",
Usage: "maximum ledger latency (i.e. time elapsed since the last known ledger closing time) considered to be healthy" +
Expand Down Expand Up @@ -394,6 +416,13 @@ func (cfg *Config) options() Options {
DefaultValue: uint(1000),
Validate: positive,
},
{
TomlKey: strutils.KebabToConstantCase("request-backlog-get-ledgers-queue-limit"),
Usage: "Maximum number of outstanding getLedgers requests",
ConfigKey: &cfg.RequestBacklogGetLedgersQueueLimit,
DefaultValue: uint(1000),
Validate: positive,
},
{
TomlKey: strutils.KebabToConstantCase("request-backlog-send-transaction-queue-limit"),
Usage: "Maximum number of outstanding SendTransaction requests",
Expand Down Expand Up @@ -475,6 +504,12 @@ func (cfg *Config) options() Options {
ConfigKey: &cfg.MaxGetTransactionsExecutionDuration,
DefaultValue: 5 * time.Second,
},
{
TomlKey: strutils.KebabToConstantCase("max-get-ledgers-execution-duration"),
Usage: "The maximum duration of time allowed for processing a getLedgers request. When that time elapses, the rpc server would return -32001 and abort the request's execution",
ConfigKey: &cfg.MaxGetLedgersExecutionDuration,
DefaultValue: 5 * time.Second,
},
{
TomlKey: strutils.KebabToConstantCase("max-send-transaction-execution-duration"),
Usage: "The maximum duration of time allowed for processing a sendTransaction request. When that time elapses, the rpc server would return -32001 and abort the request's execution",
Expand Down
136 changes: 113 additions & 23 deletions cmd/soroban-rpc/internal/db/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package db

import (
"context"
"database/sql"
"fmt"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
Expand All @@ -19,23 +21,74 @@ type StreamLedgerFn func(xdr.LedgerCloseMeta) error

type LedgerReader interface {
GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error)
BatchGetLedgers(ctx context.Context, sequence uint32, batchSize uint) ([]xdr.LedgerCloseMeta, error)
StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error)
StreamLedgerRange(ctx context.Context, startLedger uint32, endLedger uint32, f StreamLedgerFn) error
NewTx(ctx context.Context) (LedgerReaderTx, error)
}

type LedgerReaderTx interface {
GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error)
BatchGetLedgers(ctx context.Context, sequence uint32, batchSize uint) ([]xdr.LedgerCloseMeta, error)
Done() error
}

type LedgerWriter interface {
InsertLedger(ledger xdr.LedgerCloseMeta) error
}

type ReadDB interface {
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
Select(ctx context.Context, dest interface{}, query sq.Sqlizer) error
}

type ledgerReader struct {
db *DB
}

type ledgerReaderTx struct {
tx db.SessionInterface
latestLedgerSeq uint32
latestLedgerCloseTime int64
}

func (l ledgerReaderTx) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) {
if l.latestLedgerSeq != 0 {
return getLedgerRangeWithCache(ctx, l.tx, l.latestLedgerSeq, l.latestLedgerCloseTime)
}
return getLedgerRangeWithoutCache(ctx, l.tx)
}

// BatchGetLedgers fetches ledgers in batches from the db.
func (l ledgerReaderTx) BatchGetLedgers(ctx context.Context, sequence uint32,
batchSize uint,
) ([]xdr.LedgerCloseMeta, error) {
return batchGetLedgers(ctx, l.tx, sequence, batchSize)
}

func (l ledgerReaderTx) Done() error {
return l.tx.Rollback()
}

func NewLedgerReader(db *DB) LedgerReader {
return ledgerReader{db: db}
}

func (r ledgerReader) NewTx(ctx context.Context) (LedgerReaderTx, error) {
r.db.cache.RLock()
defer r.db.cache.RUnlock()
txSession := r.db.Clone()
if err := txSession.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}); err != nil {
return nil, fmt.Errorf("failed to begin read transaction: %w", err)
}
tx := ledgerReaderTx{
tx: txSession,
latestLedgerSeq: r.db.cache.latestLedgerSeq,
latestLedgerCloseTime: r.db.cache.latestLedgerCloseTime,
}
return tx, nil
}

// StreamAllLedgers runs f over all the ledgers in the database (until f errors or signals it's done).
func (r ledgerReader) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error {
sql := sq.Select("meta").From(ledgerCloseMetaTableName).OrderBy("sequence asc")
Expand Down Expand Up @@ -103,6 +156,31 @@ func (r ledgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr.Ledge
}
}

// BatchGetLedgers fetches ledgers in batches from the db.
func (r ledgerReader) BatchGetLedgers(ctx context.Context, sequence uint32,
batchSize uint,
) ([]xdr.LedgerCloseMeta, error) {
return batchGetLedgers(ctx, r.db, sequence, batchSize)
}

func batchGetLedgers(ctx context.Context, db ReadDB, sequence uint32,
batchSize uint,
) ([]xdr.LedgerCloseMeta, error) {
sql := sq.Select("meta").
From(ledgerCloseMetaTableName).
Where(sq.And{
sq.GtOrEq{"sequence": sequence},
sq.LtOrEq{"sequence": sequence + uint32(batchSize) - 1},
})

results := make([]xdr.LedgerCloseMeta, 0, batchSize)
if err := db.Select(ctx, &results, sql); err != nil {
return nil, err
}

return results, nil
}

// GetLedgerRange pulls the min/max ledger sequence numbers from the meta table.
func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) {
r.db.cache.RLock()
Expand All @@ -112,32 +190,44 @@ func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.Le

// Make use of the cached latest ledger seq and close time to query only the oldest ledger details.
if latestLedgerSeqCache != 0 {
query := sq.Select("meta").
From(ledgerCloseMetaTableName).
Where(
fmt.Sprintf("sequence = (SELECT MIN(sequence) FROM %s)", ledgerCloseMetaTableName),
)
var lcm []xdr.LedgerCloseMeta
if err := r.db.Select(ctx, &lcm, query); err != nil {
return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err)
}
return getLedgerRangeWithCache(ctx, r.db, latestLedgerSeqCache, latestLedgerCloseTimeCache)
}
return getLedgerRangeWithoutCache(ctx, r.db)
}

if len(lcm) == 0 {
return ledgerbucketwindow.LedgerRange{}, ErrEmptyDB
}
// getLedgerRangeWithCache uses the latest ledger cache to optimize the query.
// It only needs to look up the first ledger since we have the latest cached.
func getLedgerRangeWithCache(ctx context.Context, db ReadDB,
latestSeq uint32, latestTime int64,
) (ledgerbucketwindow.LedgerRange, error) {
query := sq.Select("meta").
From(ledgerCloseMetaTableName).
Where(
fmt.Sprintf("sequence = (SELECT MIN(sequence) FROM %s)", ledgerCloseMetaTableName),
)
var lcm []xdr.LedgerCloseMeta
if err := db.Select(ctx, &lcm, query); err != nil {
return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err)
}

return ledgerbucketwindow.LedgerRange{
FirstLedger: ledgerbucketwindow.LedgerInfo{
Sequence: lcm[0].LedgerSequence(),
CloseTime: lcm[0].LedgerCloseTime(),
},
LastLedger: ledgerbucketwindow.LedgerInfo{
Sequence: latestLedgerSeqCache,
CloseTime: latestLedgerCloseTimeCache,
},
}, nil
if len(lcm) == 0 {
return ledgerbucketwindow.LedgerRange{}, ErrEmptyDB
}

return ledgerbucketwindow.LedgerRange{
FirstLedger: ledgerbucketwindow.LedgerInfo{
Sequence: lcm[0].LedgerSequence(),
CloseTime: lcm[0].LedgerCloseTime(),
},
LastLedger: ledgerbucketwindow.LedgerInfo{
Sequence: latestSeq,
CloseTime: latestTime,
},
}, nil
}

// getLedgerRangeWithoutCache queries both the first and last ledger when cache isn't available
func getLedgerRangeWithoutCache(ctx context.Context, db ReadDB) (ledgerbucketwindow.LedgerRange, error) {
query := sq.Select("lcm.meta").
From(ledgerCloseMetaTableName + " as lcm").
Where(sq.Or{
Expand All @@ -146,7 +236,7 @@ func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.Le
}).OrderBy("lcm.sequence ASC")

var lcms []xdr.LedgerCloseMeta
if err := r.db.Select(ctx, &lcms, query); err != nil {
if err := db.Select(ctx, &lcms, query); err != nil {
return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err)
}

Expand Down
57 changes: 38 additions & 19 deletions cmd/soroban-rpc/internal/db/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,25 +183,8 @@ func TestGetLedgerRange_EmptyDB(t *testing.T) {
}

func BenchmarkGetLedgerRange(b *testing.B) {
db := NewTestDB(b)
logger := log.DefaultLogger
writer := NewReadWriter(logger, db, interfaces.MakeNoOpDeamon(), 100, 1_000_000, passphrase)
write, err := writer.NewTx(context.TODO())
require.NoError(b, err)

// create 100k tx rows
lcms := make([]xdr.LedgerCloseMeta, 0, 100_000)
for i := range cap(lcms) {
lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0))
}

ledgerW, txW := write.LedgerWriter(), write.TransactionWriter()
for _, lcm := range lcms {
require.NoError(b, ledgerW.InsertLedger(lcm))
require.NoError(b, txW.InsertTransactions(lcm))
}
require.NoError(b, write.Commit(lcms[len(lcms)-1]))
reader := NewLedgerReader(db)
testDB, lcms := setupBenchmarkingDB(b)
reader := NewLedgerReader(testDB)

b.ResetTimer()
for range b.N {
Expand All @@ -212,6 +195,20 @@ func BenchmarkGetLedgerRange(b *testing.B) {
}
}

func BenchmarkBatchGetLedgers(b *testing.B) {
testDB, lcms := setupBenchmarkingDB(b)
reader := NewLedgerReader(testDB)
batchSize := uint(200) // using the current maximum value for getLedgers endpoint

b.ResetTimer()
for range b.N {
ledgers, err := reader.BatchGetLedgers(context.TODO(), 1334, batchSize)
require.NoError(b, err)
assert.Equal(b, lcms[0].LedgerSequence(), ledgers[0].LedgerSequence())
assert.Equal(b, lcms[batchSize-1].LedgerSequence(), ledgers[batchSize-1].LedgerSequence())
}
}

func NewTestDB(tb testing.TB) *DB {
tmp := tb.TempDir()
dbPath := path.Join(tmp, "db.sqlite")
Expand All @@ -222,3 +219,25 @@ func NewTestDB(tb testing.TB) *DB {
})
return db
}

func setupBenchmarkingDB(b *testing.B) (*DB, []xdr.LedgerCloseMeta) {
testDB := NewTestDB(b)
logger := log.DefaultLogger
writer := NewReadWriter(logger, testDB, interfaces.MakeNoOpDeamon(),
100, 1_000_000, passphrase)
write, err := writer.NewTx(context.TODO())
require.NoError(b, err)

lcms := make([]xdr.LedgerCloseMeta, 0, 100_000)
for i := range cap(lcms) {
lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0))
}

ledgerW, txW := write.LedgerWriter(), write.TransactionWriter()
for _, lcm := range lcms {
require.NoError(b, ledgerW.InsertLedger(lcm))
require.NoError(b, txW.InsertTransactions(lcm))
}
require.NoError(b, write.Commit(lcms[len(lcms)-1]))
return testDB, lcms
}
10 changes: 10 additions & 0 deletions cmd/soroban-rpc/internal/db/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,12 @@ func (m *MockLedgerReader) GetLedger(_ context.Context, sequence uint32) (xdr.Le
return *lcm, true, nil
}

func (m *MockLedgerReader) BatchGetLedgers(_ context.Context, _ uint32,
_ uint,
) ([]xdr.LedgerCloseMeta, error) {
return []xdr.LedgerCloseMeta{}, nil
}

func (m *MockLedgerReader) StreamAllLedgers(_ context.Context, _ StreamLedgerFn) error {
return nil
}
Expand All @@ -109,6 +115,10 @@ func (m *MockLedgerReader) GetLedgerRange(_ context.Context) (ledgerbucketwindow
return m.txn.ledgerRange, nil
}

func (m *MockLedgerReader) NewTx(_ context.Context) (LedgerReaderTx, error) {
return nil, nil
}

var (
_ TransactionReader = &MockTransactionHandler{}
_ TransactionWriter = &MockTransactionHandler{}
Expand Down
Loading