Skip to content

Commit

Permalink
Add getEvents backed by DB (#215)
Browse files Browse the repository at this point in the history
* Ingest events into DB

* Update tests

* Update tests

* Fix tests

* Ignore ingestion when events are empty

* Add getEvents backed by DB

* Refactor getEvents to call fetch events from db

* Remove in-memory events store

* Make use of cursor in pagination and combine filters

* Update scan function logic in order to test

* Move NewTestDb util

* Trim events db and remove eventTypes in SELECT query as event types are not indexed

* Introduce cursor range and add logs for latency

* remove event memory store

* remove event memory store from latest merge

* Fix lint issues part 1

* Fix lint issues part 2

* Fix more lint and add ledger range code for events

* Update eventHandler mock

* Fix 2 major errors in tests: nil pointer reference and unknown hash

* Fix contract Id filter logic and add cursor set to avoid duplicates

* Validate requested start ledger with stored ledger range

* Fix lint error part 4

* Add migration for events table

* Fix lint error part 5

* Fix lint error part 6

* Address review comments pt1

* Make contract id a blob type

* Remove events package and move cursor.go to db package

* Fix lint error part 6

* Fix lint error part 7

* Optimize migration code

* Introduce endLedger and remove Cursor id from schema

* Use LedgerReader to get Ledger Range in events

* Fix lint errors

* Add benchmark for testing various load parameter

* update benchmark

* Comment benchmark events

* Reduce allocs pt1

* Benchmark with 30 million events

* Refactor getEvents to backed only by Events table

* change test db path

* Use Binary encoding for saving events into DB

* Correct number of topics

* reduce events in benchmark so that tests run

* update events schema

* Fix topic count

* Fix fetch query to not stop if null topic

* Fix lint pt1

* Fix trimEvents and lint errors

* update log info

* Fix more lint errors

* Fix more lint errors pt 11

* Fix format in error

* Fix linter error pt 12

* Add nolint for GetEvents as a temp fix.

* Add events table migration (#262)

* Fix migrations - 1

* Make migrations sequential - 1

* Make migrations sequential - 2

* Fix failing unittest - 1

* Fix linting errors - 1

* Fix failing integration test - 1

* Remove %w from Fatal strings

* refactor migrationApplierFactoryF

* Add ledger seq to fatal error string

* Add comments - 1

* Fix - 1

* Optimise migrations - 1

* Optimise migrations - 2

* Optimise migrations - 3

* Fix linting - 1

* Fix linting - 2

* Remove dupicate latest ledger fetch code

* Rollback db in daemon instead of migration

* Remove unused constant

* Remove unused constant - 2

* Add rollback() statement

* Small change

* Abstract transaction and rollback management inside migration code

* Fix failing unittest

* Address review comments

* Store binary of topics instead of string

* Unify min/max topic count in event.go

* Address review comments for event Types and fix unit tests

* cleanup

* Fix linter errors for one last time

---------

Co-authored-by: Aditya Vyas <[email protected]>
  • Loading branch information
psheth9 and aditya1702 authored Aug 20, 2024
1 parent 7b7580c commit a7e6459
Show file tree
Hide file tree
Showing 22 changed files with 1,046 additions and 1,023 deletions.
3 changes: 2 additions & 1 deletion cmd/soroban-rpc/internal/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (

const (
// OneDayOfLedgers is (roughly) a 24 hour window of ledgers.
OneDayOfLedgers = 17280
OneDayOfLedgers = 17280
SevenDayOfLedgers = OneDayOfLedgers * 7

defaultHTTPEndpoint = "localhost:8000"
)
Expand Down
90 changes: 48 additions & 42 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/feewindow"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ingest"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/preflight"
Expand Down Expand Up @@ -199,7 +198,7 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
}, metricsRegistry),
}

feewindows, eventStore := daemon.mustInitializeStorage(cfg)
feewindows := daemon.mustInitializeStorage(cfg)

onIngestionRetry := func(err error, dur time.Duration) {
logger.WithError(err).Error("could not run ingestion. Retrying")
Expand All @@ -215,7 +214,6 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
),
EventStore: eventStore,
NetworkPassPhrase: cfg.NetworkPassphrase,
Archive: historyArchive,
LedgerBackend: core,
Expand All @@ -240,12 +238,12 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {

jsonRPCHandler := internal.NewJSONRPCHandler(cfg, internal.HandlerParams{
Daemon: daemon,
EventStore: eventStore,
FeeStatWindows: feewindows,
Logger: logger,
LedgerReader: db.NewLedgerReader(dbConn),
LedgerEntryReader: db.NewLedgerEntryReader(dbConn),
TransactionReader: db.NewTransactionReader(logger, dbConn, cfg.NetworkPassphrase),
EventReader: db.NewEventReader(logger, dbConn, cfg.NetworkPassphrase),
PreflightGetter: preflightWorkerPool,
})

Expand Down Expand Up @@ -290,56 +288,64 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
}

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindows, *events.MemoryStore) {
eventStore := events.NewMemoryStore(
d,
cfg.NetworkPassphrase,
cfg.HistoryRetentionWindow,
)
feewindows := feewindow.NewFeeWindows(
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {
feeWindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
cfg.NetworkPassphrase,
d.db,
)

readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
var initialSeq uint32
var currentSeq uint32
dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg)
applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for migration")
}

maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
}

// Combine the ledger range for fees, events and transactions
ledgerSeqRange := feeStatsRange.Merge(applicableRange)

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
}
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetas can be larger than the individual store retention windows)
// but it's probably not worth the pain.
err = db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store and applying DB data migrations")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
if err := eventStore.IngestEvents(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize event memory store")
}
if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}
// TODO: clean up once we remove the in-memory storage.
// (we should only stream over the required range)
if r := dataMigrations.ApplicableRange(); r.IsLedgerIncluded(currentSeq) {
if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")

// Apply migration for events, transactions and fee stats
err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.FirstLedgerSeq,
ledgerSeqRange.LastLedgerSeq,
func(txMeta xdr.LedgerCloseMeta) error {
currentSeq = txMeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
}
return nil
})

if err = feeWindows.IngestFees(txMeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}

if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil {
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq)
}
return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
Expand All @@ -353,7 +359,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
}).Info("finished initializing in-memory store and applying DB data migrations")
}

return feewindows, eventStore
return feeWindows
}

func (d *Daemon) Run() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events
package db

import (
"encoding/json"
Expand Down Expand Up @@ -27,6 +27,13 @@ type Cursor struct {
Event uint32
}

type CursorRange struct {
// Start defines the (inclusive) start of the range.
Start Cursor
// End defines the (exclusive) end of the range.
End Cursor
}

// String returns a string representation of this cursor
func (c Cursor) String() string {
return fmt.Sprintf(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events
package db

import (
"encoding/json"
Expand Down
4 changes: 4 additions & 0 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ func (w writeTx) Commit(ledgerSeq uint32) error {
return err
}

if err := w.eventWriter.trimEvents(ledgerSeq, w.ledgerRetentionWindow); err != nil {
return err
}

_, err := sq.Replace(metaTableName).
Values(latestLedgerSequenceMetaKey, strconv.FormatUint(uint64(ledgerSeq), 10)).
RunWith(w.stmtCache).
Expand Down
Loading

0 comments on commit a7e6459

Please sign in to comment.