Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getEvents backed by DB #215

Merged
merged 67 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 60 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
0a7e05a
Ingest events into DB
psheth9 May 31, 2024
9fa8c2a
Update tests
psheth9 Jun 3, 2024
4cc58b9
Update tests
psheth9 Jun 3, 2024
3940d27
Fix tests
psheth9 Jun 11, 2024
4d0c280
Ignore ingestion when events are empty
psheth9 Jun 11, 2024
4f68a35
Add getEvents backed by DB
psheth9 Jun 13, 2024
9519aa2
Refactor getEvents to call fetch events from db
psheth9 Jun 17, 2024
133ef97
Remove in-memory events store
psheth9 Jun 17, 2024
c52f618
Make use of cursor in pagination and combine filters
psheth9 Jun 18, 2024
2a6435d
Update scan function logic in order to test
psheth9 Jun 18, 2024
5c5b407
Move NewTestDb util
psheth9 Jun 18, 2024
a566923
Trim events db and remove eventTypes in SELECT query as event types a…
psheth9 Jun 18, 2024
c70c9e4
Introduce cursor range and add logs for latency
psheth9 Jun 18, 2024
e86cebc
remove event memory store
psheth9 Jun 20, 2024
4d809d2
Merge branch 'events-db-backend' into refactor-get-events
psheth9 Jun 20, 2024
a3f8987
remove event memory store from latest merge
psheth9 Jun 20, 2024
1a03730
Fix lint issues part 1
psheth9 Jun 20, 2024
6551dc1
Fix lint issues part 2
psheth9 Jun 21, 2024
7ac1ece
Fix more lint and add ledger range code for events
psheth9 Jun 21, 2024
768e657
Update eventHandler mock
psheth9 Jun 21, 2024
d3c913c
Fix 2 major errors in tests: nil pointer reference and unknown hash
psheth9 Jun 24, 2024
4326883
Fix contract Id filter logic and add cursor set to avoid duplicates
psheth9 Jun 24, 2024
a4f3d6a
Validate requested start ledger with stored ledger range
psheth9 Jun 24, 2024
2c35f60
Fix lint error part 4
psheth9 Jun 25, 2024
9783797
Add migration for events table
psheth9 Jun 25, 2024
73ca500
Fix lint error part 5
psheth9 Jun 25, 2024
3b9ce34
Fix lint error part 6
psheth9 Jun 25, 2024
814b6ae
Address review comments pt1
psheth9 Jun 25, 2024
f3c9461
Make contract id a blob type
psheth9 Jun 26, 2024
5709a8f
Remove events package and move cursor.go to db package
psheth9 Jun 26, 2024
a28b57f
Fix lint error part 6
psheth9 Jun 26, 2024
952c0e3
Fix lint error part 7
psheth9 Jun 26, 2024
263e6f7
Optimize migration code
psheth9 Jun 28, 2024
3abf97d
Merge branch 'events-db-backend' into refactor-get-events
psheth9 Jun 28, 2024
88f3dfb
Introduce endLedger and remove Cursor id from schema
psheth9 Jul 2, 2024
1bab126
Merge branch 'events-db-backend' into refactor-get-events
psheth9 Jul 2, 2024
eceb767
Use LedgerReader to get Ledger Range in events
psheth9 Jul 2, 2024
115c32a
Fix lint errors
psheth9 Jul 3, 2024
289aa93
Add benchmark for testing various load parameter
psheth9 Jul 16, 2024
50ca152
update benchmark
psheth9 Jul 17, 2024
7b9934b
Comment benchmark events
psheth9 Jul 23, 2024
c849a1e
Merge branch 'events-db-backend' into refactor-get-events
psheth9 Jul 25, 2024
3506b41
Reduce allocs pt1
psheth9 Jul 29, 2024
c431913
Benchmark with 30 million events
psheth9 Aug 2, 2024
818cd20
Refactor getEvents to backed only by Events table
psheth9 Aug 7, 2024
2c7ffa4
change test db path
psheth9 Aug 7, 2024
b269134
Use Binary encoding for saving events into DB
psheth9 Aug 7, 2024
f5ea54e
Correct number of topics
psheth9 Aug 7, 2024
12f7790
reduce events in benchmark so that tests run
psheth9 Aug 7, 2024
b9e5f42
update events schema
psheth9 Aug 7, 2024
215e6c4
Fix topic count
psheth9 Aug 8, 2024
a6d35ae
Fix fetch query to not stop if null topic
psheth9 Aug 8, 2024
905eaf0
Fix lint pt1
psheth9 Aug 8, 2024
4b8726b
Fix trimEvents and lint errors
psheth9 Aug 9, 2024
8b28d65
update log info
psheth9 Aug 9, 2024
051ab4f
Fix more lint errors
psheth9 Aug 12, 2024
f284c9e
Fix more lint errors pt 11
psheth9 Aug 12, 2024
9c7e870
Fix format in error
psheth9 Aug 13, 2024
52bdf24
Fix linter error pt 12
psheth9 Aug 13, 2024
99d5411
Add nolint for GetEvents as a temp fix.
psheth9 Aug 13, 2024
7466e54
Add events table migration (#262)
aditya1702 Aug 16, 2024
8e33b1e
Address review comments
psheth9 Aug 19, 2024
7d47427
Store binary of topics instead of string
psheth9 Aug 20, 2024
435b7e0
Unify min/max topic count in event.go
psheth9 Aug 20, 2024
4f7b675
Address review comments for event Types and fix unit tests
psheth9 Aug 20, 2024
1fbd381
cleanup
psheth9 Aug 20, 2024
187e9d6
Fix linter errors for one last time
psheth9 Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/soroban-rpc/internal/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (

const (
// OneDayOfLedgers is (roughly) a 24 hour window of ledgers.
OneDayOfLedgers = 17280
OneDayOfLedgers = 17280
SevenDayOfLedgers = OneDayOfLedgers * 7

defaultHTTPEndpoint = "localhost:8000"
)
Expand Down
83 changes: 43 additions & 40 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/feewindow"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ingest"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/preflight"
Expand Down Expand Up @@ -199,7 +198,7 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
}, metricsRegistry),
}

feewindows, eventStore := daemon.mustInitializeStorage(cfg)
feewindows := daemon.mustInitializeStorage(cfg)

onIngestionRetry := func(err error, dur time.Duration) {
logger.WithError(err).Error("could not run ingestion. Retrying")
Expand All @@ -215,7 +214,6 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
),
EventStore: eventStore,
NetworkPassPhrase: cfg.NetworkPassphrase,
Archive: historyArchive,
LedgerBackend: core,
Expand All @@ -240,12 +238,12 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {

jsonRPCHandler := internal.NewJSONRPCHandler(cfg, internal.HandlerParams{
Daemon: daemon,
EventStore: eventStore,
FeeStatWindows: feewindows,
Logger: logger,
LedgerReader: db.NewLedgerReader(dbConn),
LedgerEntryReader: db.NewLedgerEntryReader(dbConn),
TransactionReader: db.NewTransactionReader(logger, dbConn, cfg.NetworkPassphrase),
EventReader: db.NewEventReader(logger, dbConn, cfg.NetworkPassphrase),
PreflightGetter: preflightWorkerPool,
})

Expand Down Expand Up @@ -290,12 +288,7 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
}

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindows, *events.MemoryStore) {
eventStore := events.NewMemoryStore(
d,
cfg.NetworkPassphrase,
cfg.HistoryRetentionWindow,
)
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {
feewindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
Expand All @@ -310,36 +303,46 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
}
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetas can be larger than the individual store retention windows)
// but it's probably not worth the pain.
err = db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store and applying DB data migrations")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
if err := eventStore.IngestEvents(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize event memory store")
}
if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}
// TODO: clean up once we remove the in-memory storage.
// (we should only stream over the required range)
if r := dataMigrations.ApplicableRange(); r.IsLedgerIncluded(currentSeq) {
if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")

// Merge migrations range and fee stats range to get the applicable range
latestLedger, err := db.NewLedgerEntryReader(d.db).GetLatestLedgerSequence(readTxMetaCtx)
if err != nil {
d.logger.WithError(err).Fatal("failed to get latest ledger sequence: %w", err)
psheth9 marked this conversation as resolved.
Show resolved Hide resolved
}

maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
ledgerSeqRange := &db.LedgerSeqRange{FirstLedgerSeq: latestLedger - maxFeeRetentionWindow, LastLedgerSeq: latestLedger}
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
applicableRange := dataMigrations.ApplicableRange()
ledgerSeqRange = ledgerSeqRange.Merge(applicableRange)

err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.FirstLedgerSeq,
ledgerSeqRange.LastLedgerSeq,
func(txmeta xdr.LedgerCloseMeta) error {
psheth9 marked this conversation as resolved.
Show resolved Hide resolved
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store")
psheth9 marked this conversation as resolved.
Show resolved Hide resolved
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
}
return nil
})

if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}

if applicableRange.IsLedgerIncluded(currentSeq) {
if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")
}
}
return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
Expand All @@ -353,7 +356,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
}).Info("finished initializing in-memory store and applying DB data migrations")
}

return feewindows, eventStore
return feewindows
}

func (d *Daemon) Run() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events
package db

import (
"encoding/json"
Expand Down Expand Up @@ -27,6 +27,13 @@ type Cursor struct {
Event uint32
}

type CursorRange struct {
// Start defines the (inclusive) start of the range.
Start Cursor
// End defines the (exclusive) end of the range.
End Cursor
Shaptic marked this conversation as resolved.
Show resolved Hide resolved
}

// String returns a string representation of this cursor
func (c Cursor) String() string {
return fmt.Sprintf(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events
package db

import (
"encoding/json"
Expand Down
4 changes: 4 additions & 0 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -305,6 +305,10 @@ func (w writeTx) Commit(ledgerSeq uint32) error {
return err
}

if err := w.eventWriter.trimEvents(ledgerSeq, w.ledgerRetentionWindow); err != nil {
return err
}

_, err := sq.Replace(metaTableName).
Values(latestLedgerSequenceMetaKey, strconv.FormatUint(uint64(ledgerSeq), 10)).
RunWith(w.stmtCache).
Expand Down
Loading
Loading