Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add getEvents backed by DB #215

Merged
merged 67 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
67 commits
Select commit Hold shift + click to select a range
0a7e05a
Ingest events into DB
psheth9 May 31, 2024
9fa8c2a
Update tests
psheth9 Jun 3, 2024
4cc58b9
Update tests
psheth9 Jun 3, 2024
3940d27
Fix tests
psheth9 Jun 11, 2024
4d0c280
Ignore ingestion when events are empty
psheth9 Jun 11, 2024
4f68a35
Add getEvents backed by DB
psheth9 Jun 13, 2024
9519aa2
Refactor getEvents to call fetch events from db
psheth9 Jun 17, 2024
133ef97
Remove in-memory events store
psheth9 Jun 17, 2024
c52f618
Make use of cursor in pagination and combine filters
psheth9 Jun 18, 2024
2a6435d
Update scan function logic in order to test
psheth9 Jun 18, 2024
5c5b407
Move NewTestDb util
psheth9 Jun 18, 2024
a566923
Trim events db and remove eventTypes in SELECT query as event types a…
psheth9 Jun 18, 2024
c70c9e4
Introduce cursor range and add logs for latency
psheth9 Jun 18, 2024
e86cebc
remove event memory store
psheth9 Jun 20, 2024
4d809d2
Merge branch 'events-db-backend' into refactor-get-events
psheth9 Jun 20, 2024
a3f8987
remove event memory store from latest merge
psheth9 Jun 20, 2024
1a03730
Fix lint issues part 1
psheth9 Jun 20, 2024
6551dc1
Fix lint issues part 2
psheth9 Jun 21, 2024
7ac1ece
Fix more lint and add ledger range code for events
psheth9 Jun 21, 2024
768e657
Update eventHandler mock
psheth9 Jun 21, 2024
d3c913c
Fix 2 major errors in tests: nil pointer reference and unknown hash
psheth9 Jun 24, 2024
4326883
Fix contract Id filter logic and add cursor set to avoid duplicates
psheth9 Jun 24, 2024
a4f3d6a
Validate requested start ledger with stored ledger range
psheth9 Jun 24, 2024
2c35f60
Fix lint error part 4
psheth9 Jun 25, 2024
9783797
Add migration for events table
psheth9 Jun 25, 2024
73ca500
Fix lint error part 5
psheth9 Jun 25, 2024
3b9ce34
Fix lint error part 6
psheth9 Jun 25, 2024
814b6ae
Address review comments pt1
psheth9 Jun 25, 2024
f3c9461
Make contract id a blob type
psheth9 Jun 26, 2024
5709a8f
Remove events package and move cursor.go to db package
psheth9 Jun 26, 2024
a28b57f
Fix lint error part 6
psheth9 Jun 26, 2024
952c0e3
Fix lint error part 7
psheth9 Jun 26, 2024
263e6f7
Optimize migration code
psheth9 Jun 28, 2024
3abf97d
Merge branch 'events-db-backend' into refactor-get-events
psheth9 Jun 28, 2024
88f3dfb
Introduce endLedger and remove Cursor id from schema
psheth9 Jul 2, 2024
1bab126
Merge branch 'events-db-backend' into refactor-get-events
psheth9 Jul 2, 2024
eceb767
Use LedgerReader to get Ledger Range in events
psheth9 Jul 2, 2024
115c32a
Fix lint errors
psheth9 Jul 3, 2024
289aa93
Add benchmark for testing various load parameter
psheth9 Jul 16, 2024
50ca152
update benchmark
psheth9 Jul 17, 2024
7b9934b
Comment benchmark events
psheth9 Jul 23, 2024
c849a1e
Merge branch 'events-db-backend' into refactor-get-events
psheth9 Jul 25, 2024
3506b41
Reduce allocs pt1
psheth9 Jul 29, 2024
c431913
Benchmark with 30 million events
psheth9 Aug 2, 2024
818cd20
Refactor getEvents to backed only by Events table
psheth9 Aug 7, 2024
2c7ffa4
change test db path
psheth9 Aug 7, 2024
b269134
Use Binary encoding for saving events into DB
psheth9 Aug 7, 2024
f5ea54e
Correct number of topics
psheth9 Aug 7, 2024
12f7790
reduce events in benchmark so that tests run
psheth9 Aug 7, 2024
b9e5f42
update events schema
psheth9 Aug 7, 2024
215e6c4
Fix topic count
psheth9 Aug 8, 2024
a6d35ae
Fix fetch query to not stop if null topic
psheth9 Aug 8, 2024
905eaf0
Fix lint pt1
psheth9 Aug 8, 2024
4b8726b
Fix trimEvents and lint errors
psheth9 Aug 9, 2024
8b28d65
update log info
psheth9 Aug 9, 2024
051ab4f
Fix more lint errors
psheth9 Aug 12, 2024
f284c9e
Fix more lint errors pt 11
psheth9 Aug 12, 2024
9c7e870
Fix format in error
psheth9 Aug 13, 2024
52bdf24
Fix linter error pt 12
psheth9 Aug 13, 2024
99d5411
Add nolint for GetEvents as a temp fix.
psheth9 Aug 13, 2024
7466e54
Add events table migration (#262)
aditya1702 Aug 16, 2024
8e33b1e
Address review comments
psheth9 Aug 19, 2024
7d47427
Store binary of topics instead of string
psheth9 Aug 20, 2024
435b7e0
Unify min/max topic count in event.go
psheth9 Aug 20, 2024
4f7b675
Address review comments for event Types and fix unit tests
psheth9 Aug 20, 2024
1fbd381
cleanup
psheth9 Aug 20, 2024
187e9d6
Fix linter errors for one last time
psheth9 Aug 20, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 7 additions & 15 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,6 @@
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/feewindow"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ingest"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
Expand Down Expand Up @@ -201,13 +200,13 @@
}, metricsRegistry),
}

feewindows, eventStore := daemon.mustInitializeStorage(cfg)
feewindows := daemon.mustInitializeStorage(cfg)

onIngestionRetry := func(err error, dur time.Duration) {
logger.WithError(err).Error("could not run ingestion. Retrying")
}

// Take the larger of (event retention, tx retention) and then the smaller
// Take the largest of (event retention, tx retention) and then the smallest
// of (tx retention, default event retention) if event retention wasn't
// specified, for some reason...?
maxRetentionWindow := ordered.Max(cfg.EventLedgerRetentionWindow, cfg.TransactionLedgerRetentionWindow)
Expand All @@ -226,7 +225,6 @@
maxRetentionWindow,
cfg.NetworkPassphrase,
),
EventStore: eventStore,
NetworkPassPhrase: cfg.NetworkPassphrase,
Archive: historyArchive,
LedgerBackend: core,
Expand All @@ -251,12 +249,12 @@

jsonRPCHandler := internal.NewJSONRPCHandler(cfg, internal.HandlerParams{
Daemon: daemon,
EventStore: eventStore,
FeeStatWindows: feewindows,
Logger: logger,
LedgerReader: db.NewLedgerReader(dbConn),
LedgerEntryReader: db.NewLedgerEntryReader(dbConn),
TransactionReader: db.NewTransactionReader(logger, dbConn, cfg.NetworkPassphrase),
EventReader: db.NewEventReader(logger, dbConn, cfg.NetworkPassphrase),
PreflightGetter: preflightWorkerPool,
})

Expand Down Expand Up @@ -301,12 +299,8 @@
}

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindows, *events.MemoryStore) {
eventStore := events.NewMemoryStore(
d,
cfg.NetworkPassphrase,
cfg.EventLedgerRetentionWindow,
)
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {

Check failure on line 302 in cmd/soroban-rpc/internal/daemon/daemon.go

View workflow job for this annotation

GitHub Actions / golangci-lint

unnecessary leading newline (whitespace)

feewindows := feewindow.NewFeeWindows(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, cfg.NetworkPassphrase)

readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
Expand All @@ -332,9 +326,7 @@
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
if err := eventStore.IngestEvents(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize event memory store")
}

if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}
Expand All @@ -360,7 +352,7 @@
}).Info("finished initializing in-memory store")
}

return feewindows, eventStore
return feewindows
}

func (d *Daemon) Run() {
Expand Down
24 changes: 21 additions & 3 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,16 @@ import (
"embed"
"errors"
"fmt"
"strconv"
"sync"

sq "github.com/Masterminds/squirrel"
_ "github.com/mattn/go-sqlite3"
"github.com/prometheus/client_golang/prometheus"
migrate "github.com/rubenv/sql-migrate"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"path"
"strconv"
"sync"
"testing"
psheth9 marked this conversation as resolved.
Show resolved Hide resolved

"github.com/stellar/go/support/db"
"github.com/stellar/go/support/log"
Expand Down Expand Up @@ -305,6 +308,10 @@ func (w writeTx) Commit(ledgerSeq uint32) error {
return err
}

if err := w.eventWriter.trimEvents(ledgerSeq, w.ledgerRetentionWindow); err != nil {
return err
}

_, err := sq.Replace(metaTableName).
Values(latestLedgerSequenceMetaKey, strconv.FormatUint(uint64(ledgerSeq), 10)).
RunWith(w.stmtCache).
Expand Down Expand Up @@ -366,3 +373,14 @@ func runSQLMigrations(db *sql.DB, dialect string) error {
_, err := migrate.ExecMax(db, dialect, m, migrate.Up, 0)
return err
}

func NewTestDB(tb testing.TB) *DB {
tmp := tb.TempDir()
dbPath := path.Join(tmp, "db.sqlite")
db, err := OpenSQLiteDB(dbPath)
require.NoError(tb, err)
tb.Cleanup(func() {
assert.NoError(tb, db.Close())
})
return db
}
psheth9 marked this conversation as resolved.
Show resolved Hide resolved
120 changes: 113 additions & 7 deletions cmd/soroban-rpc/internal/db/event.go
Original file line number Diff line number Diff line change
@@ -1,25 +1,31 @@
package db

import (
"context"
"fmt"
"io"
"time"

sq "github.com/Masterminds/squirrel"
"github.com/prometheus/client_golang/prometheus"
"github.com/stellar/go/ingest"
"github.com/stellar/go/support/db"
"github.com/stellar/go/support/errors"
"github.com/stellar/go/support/log"
"github.com/stellar/go/xdr"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events"
"io"
)

const eventTableName = "events"

// EventWriter is used during ingestion of events from LCM to DB
type EventWriter interface {
InsertEvents(lcm xdr.LedgerCloseMeta) error
}

// EventReader has all the public methods to fetch events from DB
type EventReader interface {
GetEvents(lcm xdr.LedgerCloseMeta) error
GetEvents(ctx context.Context, cursorRange events.CursorRange, contractIDs []string, f ScanFunction) error
// GetLedgerRange(ctx context.Context) error
}

type eventHandler struct {
Expand All @@ -30,21 +36,25 @@ type eventHandler struct {
ingestMetric, countMetric prometheus.Observer
}

func NewEventReader(log *log.Entry, db db.SessionInterface, passphrase string) EventReader {
return &eventHandler{log: log, db: db, passphrase: passphrase}
}

func (eventHandler *eventHandler) InsertEvents(lcm xdr.LedgerCloseMeta) error {
txCount := lcm.CountTransactions()

if eventHandler.stmtCache == nil {
return errors.New("EventWriter incorrectly initialized without stmtCache")
return fmt.Errorf("EventWriter incorrectly initialized without stmtCache")
} else if txCount == 0 {
return nil
}

var txReader *ingest.LedgerTransactionReader
txReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(eventHandler.passphrase, lcm)
if err != nil {
return errors.Wrapf(err,
"failed to open transaction reader for ledger %d",
lcm.LedgerSequence())
return fmt.Errorf(
"failed to open transaction reader for ledger %d: %w ",
lcm.LedgerSequence(), err)
psheth9 marked this conversation as resolved.
Show resolved Hide resolved
}
defer func() {
closeErr := txReader.Close()
Expand Down Expand Up @@ -97,3 +107,99 @@ func (eventHandler *eventHandler) InsertEvents(lcm xdr.LedgerCloseMeta) error {

return nil
}

type ScanFunction func(xdr.DiagnosticEvent, events.Cursor, int64, *xdr.Hash) bool
psheth9 marked this conversation as resolved.
Show resolved Hide resolved

// trimEvents removes all Events which fall outside the ledger retention window.
func (eventHandler *eventHandler) trimEvents(latestLedgerSeq uint32, retentionWindow uint32) error {
psheth9 marked this conversation as resolved.
Show resolved Hide resolved
if latestLedgerSeq+1 <= retentionWindow {
return nil
}

cutoff := latestLedgerSeq + 1 - retentionWindow
_, err := sq.StatementBuilder.
RunWith(eventHandler.stmtCache).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe dumb question: why would we run deletion with a cache?

Delete(eventTableName).
Where(sq.Lt{"ledger_sequence": cutoff}).
psheth9 marked this conversation as resolved.
Show resolved Hide resolved
Exec()
return err
}

// GetEvents applies f on all the events occurring in the given range with specified contract IDs if provided.
// The events are returned in sorted ascending Cursor order.
// If f returns false, the scan terminates early (f will not be applied on
// remaining events in the range).
func (eventHandler *eventHandler) GetEvents(
ctx context.Context,
cursorRange events.CursorRange,
contractIDs []string,
f ScanFunction,
) error {
start := time.Now()

var rows []struct {
EventCursorID string `db:"id"`
TxIndex int `db:"application_order"`
Lcm xdr.LedgerCloseMeta `db:"meta"`
}

rowQ := sq.
Select("e.id", "e.application_order", "lcm.meta").
From(eventTableName + " e").
Join(ledgerCloseMetaTableName + "lcm ON (e.ledger_sequence = lcm.sequence)").
Where(sq.GtOrEq{"e.id": cursorRange.Start.String()}).
Where(sq.Lt{"e.id": cursorRange.End.String()}).
OrderBy("e.id ASC")

if len(contractIDs) > 0 {
rowQ = rowQ.Where(sq.Eq{"e.contract_id": contractIDs})
}

if err := eventHandler.db.Select(ctx, &rows, rowQ); err != nil {
return fmt.Errorf(
"db read failed for start ledger cursor= %v contractIDs= %v: %w",
cursorRange.Start.String(),
contractIDs,
err)
} else if len(rows) < 1 {
return fmt.Errorf("no LCM found with requested event filters")
}

for _, row := range rows {
eventCursorID, txIndex, lcm := row.EventCursorID, row.TxIndex, row.Lcm
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(eventHandler.passphrase, lcm)
if err != nil {
return fmt.Errorf("failed to index to tx %d in ledger %d: %w", txIndex, lcm.LedgerSequence(), err)
}

err = reader.Seek(txIndex - 1)
if err != nil {
return fmt.Errorf("failed to index to tx %d in ledger %d: %w", txIndex, lcm.LedgerSequence(), err)
psheth9 marked this conversation as resolved.
Show resolved Hide resolved
}

ledgerCloseTime := lcm.LedgerCloseTime()
ledgerTx, err := reader.Read()
psheth9 marked this conversation as resolved.
Show resolved Hide resolved
transactionHash := ledgerTx.Result.TransactionHash
diagEvents, diagErr := ledgerTx.GetDiagnosticEvents()

if diagErr != nil {
return fmt.Errorf("db read failed for Event Id %s: %w", eventCursorID, err)
}

// Find events based on filter passed in function f
for eventIndex, event := range diagEvents {
cur := events.Cursor{Ledger: lcm.LedgerSequence(), Tx: uint32(txIndex), Event: uint32(eventIndex)}
if f != nil && !f(event, cur, ledgerCloseTime, &transactionHash) {
return nil
}
}
}

eventHandler.log.
WithField("startLedgerSequence", cursorRange.Start.Ledger).
WithField("endLedgerSequence", cursorRange.End.Ledger).
WithField("duration", time.Since(start)).
Debugf("Fetched and decoded all the events with filters - contractIDs: %v ", contractIDs)

return nil
}
13 changes: 0 additions & 13 deletions cmd/soroban-rpc/internal/db/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,9 @@ package db

import (
"context"
"path"
"testing"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/stellar/go/network"
"github.com/stellar/go/support/log"
Expand Down Expand Up @@ -105,14 +103,3 @@ func TestLedgers(t *testing.T) {

assertLedgerRange(t, reader, 8, 12)
}

func NewTestDB(tb testing.TB) *DB {
tmp := tb.TempDir()
dbPath := path.Join(tmp, "db.sqlite")
db, err := OpenSQLiteDB(dbPath)
require.NoError(tb, err)
tb.Cleanup(func() {
assert.NoError(tb, db.Close())
})
return db
}
Loading
Loading