Skip to content

Commit

Permalink
Optimise db performance of GetLedgerRange (#256)
Browse files Browse the repository at this point in the history
* Use GetLedgerRange to get the latest ledger

* Remove latestLedger key from meta table

* Remove latestLedger key from meta table - 2

* Fix failing unittest - 1

* Remove cache check condition

* Fix failing unittest - 2

* Uncomment failing ledger entry test

* Remove db.SessionInterface param

* Fix failing unittest

* Cache ledger range - 1

* Cache ledger range - 2

* Cache ledger range - 3

* Cache ledger range - 4

* Fix failing test

* Fix failing test - 2

* Fix linting - 1

* Fix linting - 2

* Reduce the time further

* Remove creating transaction for GetLedgerRange

* Remove creating transaction for GetLedgerRange - 2

* Remove creating transaction for GetLedgerRange - 3

* Fix failing unittest - 1

* Fix failing tests - 2

* Update cmd/soroban-rpc/internal/db/ledger.go

Co-authored-by: tamirms <[email protected]>

* Add check for empty result in ledger range

* Add a cache check on GetLatestLedgerSequence call

* Move cache read to DB function

* Move cache read to DB function - 2

* Revert "Move cache read to DB function - 2"

This reverts commit f7ebe3e.

* Lock cache before creating tx

* Fix failing test

* Add migration

* Fix failing test

* Refactor NewTx and NewCachedTx

* Refactor NewTx and NewCachedTx - 2

* Refactor NewTx and NewCachedTx - 3

* Refactor NewTx and NewCachedTx - 4

---------

Co-authored-by: tamirms <[email protected]>
  • Loading branch information
aditya1702 and tamirms authored Sep 5, 2024
1 parent e110732 commit 918c978
Show file tree
Hide file tree
Showing 18 changed files with 205 additions and 152 deletions.
52 changes: 26 additions & 26 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,7 @@ var sqlMigrations embed.FS
var ErrEmptyDB = errors.New("DB is empty")

const (
metaTableName = "metadata"
latestLedgerSequenceMetaKey = "LatestLedgerSequence"
metaTableName = "metadata"
)

type ReadWriter interface {
Expand All @@ -42,13 +41,14 @@ type WriteTx interface {
LedgerEntryWriter() LedgerEntryWriter
LedgerWriter() LedgerWriter

Commit(ledgerSeq uint32) error
Commit(ledgerCloseMeta xdr.LedgerCloseMeta) error
Rollback() error
}

type dbCache struct {
latestLedgerSeq uint32
ledgerEntries transactionalCache // Just like the DB: compress-encoded ledger key -> ledger entry XDR
latestLedgerSeq uint32
latestLedgerCloseTime int64
ledgerEntries transactionalCache // Just like the DB: compress-encoded ledger key -> ledger entry XDR
sync.RWMutex
}

Expand Down Expand Up @@ -129,33 +129,37 @@ func getMetaValue(ctx context.Context, q db.SessionInterface, key string) (strin
case 1:
// expected length on an initialized DB
default:
return "", fmt.Errorf("multiple entries (%d) for key %q in table %q", len(results), latestLedgerSequenceMetaKey, metaTableName)
return "", fmt.Errorf("multiple entries (%d) for key %q in table %q", len(results), key, metaTableName)
}
return results[0], nil
}

func getLatestLedgerSequence(ctx context.Context, q db.SessionInterface, cache *dbCache) (uint32, error) {
latestLedgerStr, err := getMetaValue(ctx, q, latestLedgerSequenceMetaKey)
if err != nil {
return 0, err
func getLatestLedgerSequence(ctx context.Context, ledgerReader LedgerReader, cache *dbCache) (uint32, error) {
cache.RLock()
latestLedgerSeqCache := cache.latestLedgerSeq
cache.RUnlock()

if latestLedgerSeqCache != 0 {
return latestLedgerSeqCache, nil
}
latestLedger, err := strconv.ParseUint(latestLedgerStr, 10, 32)

ledgerRange, err := ledgerReader.GetLedgerRange(ctx)
if err != nil {
return 0, err
}
result := uint32(latestLedger)

// Add missing ledger sequence to the top cache.
// Add missing ledger sequence and close time to the top cache.
// Otherwise, the write-through cache won't get updated until the first ingestion commit
cache.Lock()
if cache.latestLedgerSeq == 0 {
// Only update the cache if the value is missing (0), otherwise
// we may end up overwriting the entry with an older version
cache.latestLedgerSeq = result
cache.latestLedgerSeq = ledgerRange.LastLedger.Sequence
cache.latestLedgerCloseTime = ledgerRange.LastLedger.CloseTime
}
cache.Unlock()

return result, nil
return ledgerRange.LastLedger.Sequence, nil
}

type ReadWriterMetrics struct {
Expand Down Expand Up @@ -216,7 +220,7 @@ func NewReadWriter(
}

func (rw *readWriter) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
return getLatestLedgerSequence(ctx, rw.db, rw.db.cache)
return getLatestLedgerSequence(ctx, NewLedgerReader(rw.db), rw.db.cache)
}

func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
Expand Down Expand Up @@ -293,7 +297,10 @@ func (w writeTx) EventWriter() EventWriter {
return &w.eventWriter
}

func (w writeTx) Commit(ledgerSeq uint32) error {
func (w writeTx) Commit(ledgerCloseMeta xdr.LedgerCloseMeta) error {
ledgerSeq := ledgerCloseMeta.LedgerSequence()
ledgerCloseTime := ledgerCloseMeta.LedgerCloseTime()

if err := w.ledgerEntryWriter.flush(); err != nil {
return err
}
Expand All @@ -309,24 +316,17 @@ func (w writeTx) Commit(ledgerSeq uint32) error {
return err
}

_, err := sq.Replace(metaTableName).
Values(latestLedgerSequenceMetaKey, strconv.FormatUint(uint64(ledgerSeq), 10)).
RunWith(w.stmtCache).
Exec()
if err != nil {
return err
}

// We need to make the cache update atomic with the transaction commit.
// Otherwise, the cache can be made inconsistent if a write transaction finishes
// in between, updating the cache in the wrong order.
commitAndUpdateCache := func() error {
w.globalCache.Lock()
defer w.globalCache.Unlock()
if err = w.tx.Commit(); err != nil {
if err := w.tx.Commit(); err != nil {
return err
}
w.globalCache.latestLedgerSeq = ledgerSeq
w.globalCache.latestLedgerCloseTime = ledgerCloseTime
w.ledgerEntryWriter.ledgerEntryCacheWriteTx.commit()
return nil
}
Expand Down
36 changes: 34 additions & 2 deletions cmd/soroban-rpc/internal/db/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,39 @@ func (r ledgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr.Ledge

// GetLedgerRange pulls the min/max ledger sequence numbers from the meta table.
func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) {
r.db.cache.RLock()
latestLedgerSeqCache := r.db.cache.latestLedgerSeq
latestLedgerCloseTimeCache := r.db.cache.latestLedgerCloseTime
r.db.cache.RUnlock()

// Make use of the cached latest ledger seq and close time to query only the oldest ledger details.
if latestLedgerSeqCache != 0 {
query := sq.Select("meta").
From(ledgerCloseMetaTableName).
Where(
fmt.Sprintf("sequence = (SELECT MIN(sequence) FROM %s)", ledgerCloseMetaTableName),
)
var lcm []xdr.LedgerCloseMeta
if err := r.db.Select(ctx, &lcm, query); err != nil {
return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err)
}

if len(lcm) == 0 {
return ledgerbucketwindow.LedgerRange{}, ErrEmptyDB
}

return ledgerbucketwindow.LedgerRange{
FirstLedger: ledgerbucketwindow.LedgerInfo{
Sequence: lcm[0].LedgerSequence(),
CloseTime: lcm[0].LedgerCloseTime(),
},
LastLedger: ledgerbucketwindow.LedgerInfo{
Sequence: latestLedgerSeqCache,
CloseTime: latestLedgerCloseTimeCache,
},
}, nil
}

query := sq.Select("lcm.meta").
From(ledgerCloseMetaTableName + " as lcm").
Where(sq.Or{
Expand All @@ -117,9 +150,8 @@ func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.Le
return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err)
}

// Empty DB
if len(lcms) == 0 {
return ledgerbucketwindow.LedgerRange{}, nil
return ledgerbucketwindow.LedgerRange{}, ErrEmptyDB
}

return ledgerbucketwindow.LedgerRange{
Expand Down
24 changes: 14 additions & 10 deletions cmd/soroban-rpc/internal/db/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,8 +81,10 @@ func TestLedgers(t *testing.T) {
ledgerSequence := uint32(i)
tx, err := NewReadWriter(logger, db, daemon, 150, 15, passphrase).NewTx(context.Background())
require.NoError(t, err)
require.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
require.NoError(t, tx.Commit(ledgerSequence))

ledgerCloseMeta := createLedger(ledgerSequence)
require.NoError(t, tx.LedgerWriter().InsertLedger(ledgerCloseMeta))
require.NoError(t, tx.Commit(ledgerCloseMeta))
// rolling back after a commit is a no-op
require.NoError(t, tx.Rollback())
}
Expand All @@ -92,16 +94,18 @@ func TestLedgers(t *testing.T) {
ledgerSequence := uint32(11)
tx, err := NewReadWriter(logger, db, daemon, 150, 15, passphrase).NewTx(context.Background())
require.NoError(t, err)
require.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
require.NoError(t, tx.Commit(ledgerSequence))
ledgerCloseMeta := createLedger(ledgerSequence)
require.NoError(t, tx.LedgerWriter().InsertLedger(ledgerCloseMeta))
require.NoError(t, tx.Commit(ledgerCloseMeta))

assertLedgerRange(t, reader, 1, 11)

ledgerSequence = uint32(12)
tx, err = NewReadWriter(logger, db, daemon, 150, 5, passphrase).NewTx(context.Background())
require.NoError(t, err)
require.NoError(t, tx.LedgerWriter().InsertLedger(createLedger(ledgerSequence)))
require.NoError(t, tx.Commit(ledgerSequence))
ledgerCloseMeta = createLedger(ledgerSequence)
require.NoError(t, tx.LedgerWriter().InsertLedger(ledgerCloseMeta))
require.NoError(t, tx.Commit(ledgerCloseMeta))

assertLedgerRange(t, reader, 8, 12)
}
Expand All @@ -126,7 +130,7 @@ func TestGetLedgerRange_NonEmptyDB(t *testing.T) {
require.NoError(t, ledgerW.InsertLedger(lcm), "ingestion failed for ledger %+v", lcm.V1)
require.NoError(t, txW.InsertTransactions(lcm), "ingestion failed for ledger %+v", lcm.V1)
}
require.NoError(t, write.Commit(lcms[len(lcms)-1].LedgerSequence()))
require.NoError(t, write.Commit(lcms[len(lcms)-1]))

reader := NewLedgerReader(db)
ledgerRange, err := reader.GetLedgerRange(ctx)
Expand Down Expand Up @@ -154,7 +158,7 @@ func TestGetLedgerRange_SingleDBRow(t *testing.T) {
require.NoError(t, ledgerW.InsertLedger(lcm), "ingestion failed for ledger %+v", lcm.V1)
require.NoError(t, txW.InsertTransactions(lcm), "ingestion failed for ledger %+v", lcm.V1)
}
require.NoError(t, write.Commit(lcms[len(lcms)-1].LedgerSequence()))
require.NoError(t, write.Commit(lcms[len(lcms)-1]))

reader := NewLedgerReader(db)
ledgerRange, err := reader.GetLedgerRange(ctx)
Expand All @@ -171,7 +175,7 @@ func TestGetLedgerRange_EmptyDB(t *testing.T) {

reader := NewLedgerReader(db)
ledgerRange, err := reader.GetLedgerRange(ctx)
require.NoError(t, err)
assert.Equal(t, ErrEmptyDB, err)
assert.Equal(t, uint32(0), ledgerRange.FirstLedger.Sequence)
assert.Equal(t, int64(0), ledgerRange.FirstLedger.CloseTime)
assert.Equal(t, uint32(0), ledgerRange.LastLedger.Sequence)
Expand All @@ -196,7 +200,7 @@ func BenchmarkGetLedgerRange(b *testing.B) {
require.NoError(b, ledgerW.InsertLedger(lcm))
require.NoError(b, txW.InsertTransactions(lcm))
}
require.NoError(b, write.Commit(lcms[len(lcms)-1].LedgerSequence()))
require.NoError(b, write.Commit(lcms[len(lcms)-1]))
reader := NewLedgerReader(db)

b.ResetTimer()
Expand Down
59 changes: 25 additions & 34 deletions cmd/soroban-rpc/internal/db/ledgerentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ import (
"fmt"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"
)
Expand All @@ -19,8 +18,7 @@ const (

type LedgerEntryReader interface {
GetLatestLedgerSequence(ctx context.Context) (uint32, error)
NewTx(ctx context.Context) (LedgerEntryReadTx, error)
NewCachedTx(ctx context.Context) (LedgerEntryReadTx, error)
NewTx(ctx context.Context, cacheTx bool) (LedgerEntryReadTx, error)
}

type LedgerKeyAndEntry struct {
Expand Down Expand Up @@ -140,6 +138,7 @@ type ledgerEntryReadTx struct {
stmtCache *sq.StmtCache
latestLedgerSeqCache uint32
ledgerEntryCacheReadTx *transactionalCacheReadTx
ledgerReader LedgerReader
tx db.SessionInterface
buffer *xdr.EncodingBuffer
}
Expand All @@ -148,7 +147,7 @@ func (l *ledgerEntryReadTx) GetLatestLedgerSequence() (uint32, error) {
if l.latestLedgerSeqCache != 0 {
return l.latestLedgerSeqCache, nil
}
latestLedgerSeq, err := getLatestLedgerSequence(context.Background(), l.tx, l.globalCache)
latestLedgerSeq, err := getLatestLedgerSequence(context.Background(), l.ledgerReader, l.globalCache)
if err == nil {
l.latestLedgerSeqCache = latestLedgerSeq
}
Expand Down Expand Up @@ -341,47 +340,39 @@ func NewLedgerEntryReader(db *DB) LedgerEntryReader {
}

func (r ledgerEntryReader) GetLatestLedgerSequence(ctx context.Context) (uint32, error) {
return getLatestLedgerSequence(ctx, r.db, r.db.cache)
return getLatestLedgerSequence(ctx, NewLedgerReader(r.db), r.db.cache)
}

// NewCachedTx() caches all accessed ledger entries and select statements. If many ledger entries are accessed, it will grow without bounds.
func (r ledgerEntryReader) NewCachedTx(ctx context.Context) (LedgerEntryReadTx, error) {
txSession := r.db.Clone()
// We need to copy the cached ledger entries locally when we start the transaction
// since otherwise we would break the consistency between the transaction and the cache.

// We need to make the parent cache access atomic with the read transaction creation.
// Otherwise, the cache can be made inconsistent if a write transaction finishes
// in between, updating the cache.
// NewTx creates a new ledger entry read transaction. When cacheTx is set to True, it will cache all accessed
// ledger entries and select statements. If many ledger entries are accessed, it will grow without bounds.
func (r ledgerEntryReader) NewTx(ctx context.Context, cacheTx bool) (LedgerEntryReadTx, error) {
r.db.cache.RLock()
defer r.db.cache.RUnlock()
if err := txSession.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}); err != nil {
return nil, err
}
cacheReadTx := r.db.cache.ledgerEntries.newReadTx()
return &ledgerEntryReadTx{
globalCache: r.db.cache,
stmtCache: sq.NewStmtCache(txSession.GetTx()),
latestLedgerSeqCache: r.db.cache.latestLedgerSeq,
ledgerEntryCacheReadTx: &cacheReadTx,
tx: txSession,
buffer: xdr.NewEncodingBuffer(),
}, nil
}

func (r ledgerEntryReader) NewTx(ctx context.Context) (LedgerEntryReadTx, error) {
txSession := r.db.Clone()
if err := txSession.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}); err != nil {
return nil, err
return nil, fmt.Errorf("failed to begin read transaction: %w", err)
}
r.db.cache.RLock()
defer r.db.cache.RUnlock()
return &ledgerEntryReadTx{
tx := &ledgerEntryReadTx{
globalCache: r.db.cache,
latestLedgerSeqCache: r.db.cache.latestLedgerSeq,
tx: txSession,
ledgerReader: NewLedgerReader(r.db),
buffer: xdr.NewEncodingBuffer(),
}, nil
}

// We need to copy the cached ledger entries locally when we start the transaction
// since otherwise we would break the consistency between the transaction and the cache.

// We need to make the parent cache access atomic with the read transaction creation.
// Otherwise, the cache can be made inconsistent if a write transaction finishes
// in between, updating the cache.
if cacheTx {
tx.stmtCache = sq.NewStmtCache(txSession.GetTx())
cacheReadTx := r.db.cache.ledgerEntries.newReadTx()
tx.ledgerEntryCacheReadTx = &cacheReadTx
}

return tx, nil
}

func encodeLedgerKey(buffer *xdr.EncodingBuffer, key xdr.LedgerKey) (string, error) {
Expand Down
Loading

0 comments on commit 918c978

Please sign in to comment.