diff --git a/CHANGELOG.md b/CHANGELOG.md index 3c628add..f5aae50c 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -3,6 +3,13 @@ ## Unreleased ### Added + +- Add `EndLedger` in `GetEventsRequest`. This provides finer control and clarity on the range of ledgers being queried. +- Disk-Based Event Storage: Events are now stored on disk instead of in memory. For context, storing approximately 3 million events will require around 1.5 GB of disk space. +This change enhances the scalability and can now support a larger retention window (~7 days) for events. +- Ledger Scanning Limitation: The getEvents RPC will now scan a maximum of `10,000` ledgers per request. This limits the resource usage and ensures more predictable performance, especially for queries spanning large ledger ranges. +- A migration process has been introduced to transition event storage from in-memory to disk-based storage. + * Add support for unpacked JSON responses of base64-encoded XDR fields via a new, optional parameter. When omitted, the behavior does not change and we encode fields as base64. ```typescript xdrFormat?: "" | "base64" | "json" diff --git a/cmd/soroban-rpc/internal/config/options.go b/cmd/soroban-rpc/internal/config/options.go index 9fb80996..58df85e2 100644 --- a/cmd/soroban-rpc/internal/config/options.go +++ b/cmd/soroban-rpc/internal/config/options.go @@ -17,7 +17,8 @@ import ( const ( // OneDayOfLedgers is (roughly) a 24 hour window of ledgers. - OneDayOfLedgers = 17280 + OneDayOfLedgers = 17280 + SevenDayOfLedgers = OneDayOfLedgers * 7 defaultHTTPEndpoint = "localhost:8000" ) diff --git a/cmd/soroban-rpc/internal/daemon/daemon.go b/cmd/soroban-rpc/internal/daemon/daemon.go index c6d287c6..ffe58996 100644 --- a/cmd/soroban-rpc/internal/daemon/daemon.go +++ b/cmd/soroban-rpc/internal/daemon/daemon.go @@ -27,7 +27,6 @@ import ( "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/feewindow" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ingest" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/preflight" @@ -199,7 +198,7 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { }, metricsRegistry), } - feewindows, eventStore := daemon.mustInitializeStorage(cfg) + feewindows := daemon.mustInitializeStorage(cfg) onIngestionRetry := func(err error, dur time.Duration) { logger.WithError(err).Error("could not run ingestion. Retrying") @@ -215,7 +214,6 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { cfg.HistoryRetentionWindow, cfg.NetworkPassphrase, ), - EventStore: eventStore, NetworkPassPhrase: cfg.NetworkPassphrase, Archive: historyArchive, LedgerBackend: core, @@ -240,12 +238,12 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { jsonRPCHandler := internal.NewJSONRPCHandler(cfg, internal.HandlerParams{ Daemon: daemon, - EventStore: eventStore, FeeStatWindows: feewindows, Logger: logger, LedgerReader: db.NewLedgerReader(dbConn), LedgerEntryReader: db.NewLedgerEntryReader(dbConn), TransactionReader: db.NewTransactionReader(logger, dbConn, cfg.NetworkPassphrase), + EventReader: db.NewEventReader(logger, dbConn, cfg.NetworkPassphrase), PreflightGetter: preflightWorkerPool, }) @@ -290,56 +288,64 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon { } // mustInitializeStorage initializes the storage using what was on the DB -func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindows, *events.MemoryStore) { - eventStore := events.NewMemoryStore( - d, - cfg.NetworkPassphrase, - cfg.HistoryRetentionWindow, - ) - feewindows := feewindow.NewFeeWindows( +func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows { + feeWindows := feewindow.NewFeeWindows( cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow, cfg.NetworkPassphrase, + d.db, ) readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout) defer cancelReadTxMeta() var initialSeq uint32 var currentSeq uint32 - dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg) + applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow) + if err != nil { + d.logger.WithError(err).Fatal("could not get ledger range for migration") + } + + maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow) + feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow) + if err != nil { + d.logger.WithError(err).Fatal("could not get ledger range for fee stats") + } + + // Combine the ledger range for fees, events and transactions + ledgerSeqRange := feeStatsRange.Merge(applicableRange) + + dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange) if err != nil { d.logger.WithError(err).Fatal("could not build migrations") } - // NOTE: We could optimize this to avoid unnecessary ingestion calls - // (the range of txmetas can be larger than the individual store retention windows) - // but it's probably not worth the pain. - err = db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error { - currentSeq = txmeta.LedgerSequence() - if initialSeq == 0 { - initialSeq = currentSeq - d.logger.WithFields(supportlog.F{ - "seq": currentSeq, - }).Info("initializing in-memory store and applying DB data migrations") - } else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 { - d.logger.WithFields(supportlog.F{ - "seq": currentSeq, - }).Debug("still initializing in-memory store") - } - if err := eventStore.IngestEvents(txmeta); err != nil { - d.logger.WithError(err).Fatal("could not initialize event memory store") - } - if err := feewindows.IngestFees(txmeta); err != nil { - d.logger.WithError(err).Fatal("could not initialize fee stats") - } - // TODO: clean up once we remove the in-memory storage. - // (we should only stream over the required range) - if r := dataMigrations.ApplicableRange(); r.IsLedgerIncluded(currentSeq) { - if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil { - d.logger.WithError(err).Fatal("could not run migrations") + + // Apply migration for events, transactions and fee stats + err = db.NewLedgerReader(d.db).StreamLedgerRange( + readTxMetaCtx, + ledgerSeqRange.First, + ledgerSeqRange.Last, + func(txMeta xdr.LedgerCloseMeta) error { + currentSeq = txMeta.LedgerSequence() + if initialSeq == 0 { + initialSeq = currentSeq + d.logger.WithFields(supportlog.F{ + "seq": currentSeq, + }).Info("initializing in-memory store") + } else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 { + d.logger.WithFields(supportlog.F{ + "seq": currentSeq, + }).Debug("still initializing in-memory store") } - } - return nil - }) + + if err = feeWindows.IngestFees(txMeta); err != nil { + d.logger.WithError(err).Fatal("could not initialize fee stats") + } + + if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil { + d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq) + } + return nil + }) if err != nil { d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database") } @@ -353,7 +359,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow }).Info("finished initializing in-memory store and applying DB data migrations") } - return feewindows, eventStore + return feeWindows } func (d *Daemon) Run() { diff --git a/cmd/soroban-rpc/internal/events/cursor.go b/cmd/soroban-rpc/internal/db/cursor.go similarity index 94% rename from cmd/soroban-rpc/internal/events/cursor.go rename to cmd/soroban-rpc/internal/db/cursor.go index 8be24efa..7d009df6 100644 --- a/cmd/soroban-rpc/internal/events/cursor.go +++ b/cmd/soroban-rpc/internal/db/cursor.go @@ -1,4 +1,4 @@ -package events +package db import ( "encoding/json" @@ -27,6 +27,13 @@ type Cursor struct { Event uint32 } +type CursorRange struct { + // Start defines the (inclusive) start of the range. + Start Cursor + // End defines the (exclusive) end of the range. + End Cursor +} + // String returns a string representation of this cursor func (c Cursor) String() string { return fmt.Sprintf( diff --git a/cmd/soroban-rpc/internal/events/cursor_test.go b/cmd/soroban-rpc/internal/db/cursor_test.go similarity index 99% rename from cmd/soroban-rpc/internal/events/cursor_test.go rename to cmd/soroban-rpc/internal/db/cursor_test.go index 6dfe1e58..b081a98b 100644 --- a/cmd/soroban-rpc/internal/events/cursor_test.go +++ b/cmd/soroban-rpc/internal/db/cursor_test.go @@ -1,4 +1,4 @@ -package events +package db import ( "encoding/json" diff --git a/cmd/soroban-rpc/internal/db/db.go b/cmd/soroban-rpc/internal/db/db.go index 1767f610..ccdf5c14 100644 --- a/cmd/soroban-rpc/internal/db/db.go +++ b/cmd/soroban-rpc/internal/db/db.go @@ -38,6 +38,7 @@ type ReadWriter interface { type WriteTx interface { TransactionWriter() TransactionWriter + EventWriter() EventWriter LedgerEntryWriter() LedgerEntryWriter LedgerWriter() LedgerWriter @@ -250,6 +251,12 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) { stmtCache: stmtCache, passphrase: rw.passphrase, }, + eventWriter: eventHandler{ + log: rw.log, + db: txSession, + stmtCache: stmtCache, + passphrase: rw.passphrase, + }, } writer.txWriter.RegisterMetrics( rw.metrics.TxIngestDuration, @@ -266,6 +273,7 @@ type writeTx struct { ledgerEntryWriter ledgerEntryWriter ledgerWriter ledgerWriter txWriter transactionHandler + eventWriter eventHandler ledgerRetentionWindow uint32 } @@ -281,6 +289,10 @@ func (w writeTx) TransactionWriter() TransactionWriter { return &w.txWriter } +func (w writeTx) EventWriter() EventWriter { + return &w.eventWriter +} + func (w writeTx) Commit(ledgerSeq uint32) error { if err := w.ledgerEntryWriter.flush(); err != nil { return err @@ -293,6 +305,10 @@ func (w writeTx) Commit(ledgerSeq uint32) error { return err } + if err := w.eventWriter.trimEvents(ledgerSeq, w.ledgerRetentionWindow); err != nil { + return err + } + _, err := sq.Replace(metaTableName). Values(latestLedgerSequenceMetaKey, strconv.FormatUint(uint64(ledgerSeq), 10)). RunWith(w.stmtCache). diff --git a/cmd/soroban-rpc/internal/db/event.go b/cmd/soroban-rpc/internal/db/event.go new file mode 100644 index 00000000..bf956219 --- /dev/null +++ b/cmd/soroban-rpc/internal/db/event.go @@ -0,0 +1,344 @@ +package db + +import ( + "context" + "errors" + "fmt" + "io" + "time" + + sq "github.com/Masterminds/squirrel" + + "github.com/stellar/go/ingest" + "github.com/stellar/go/strkey" + "github.com/stellar/go/support/db" + "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" +) + +const ( + eventTableName = "events" + firstLedger = uint32(2) + MinTopicCount = 1 + MaxTopicCount = 4 +) + +type NestedTopicArray [][][]byte + +// EventWriter is used during ingestion of events from LCM to DB +type EventWriter interface { + InsertEvents(lcm xdr.LedgerCloseMeta) error +} + +// EventReader has all the public methods to fetch events from DB +type EventReader interface { + GetEvents( + ctx context.Context, + cursorRange CursorRange, + contractIDs [][]byte, + topics NestedTopicArray, + eventTypes []int, + f ScanFunction, + ) error +} + +type eventHandler struct { + log *log.Entry + db db.SessionInterface + stmtCache *sq.StmtCache + passphrase string +} + +func NewEventReader(log *log.Entry, db db.SessionInterface, passphrase string) EventReader { + return &eventHandler{log: log, db: db, passphrase: passphrase} +} + +//nolint:gocognit,cyclop,funlen +func (eventHandler *eventHandler) InsertEvents(lcm xdr.LedgerCloseMeta) error { + txCount := lcm.CountTransactions() + + if eventHandler.stmtCache == nil { + return errors.New("EventWriter incorrectly initialized without stmtCache") + } else if txCount == 0 { + return nil + } + + txReader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(eventHandler.passphrase, lcm) + if err != nil { + return errors.Join(err, + fmt.Errorf("failed to open transaction reader for ledger %d", lcm.LedgerSequence()), + ) + } + defer func() { + closeErr := txReader.Close() + err = errors.Join(err, closeErr) + }() + + for { + var tx ingest.LedgerTransaction + tx, err = txReader.Read() + if errors.Is(err, io.EOF) { + err = nil + break + } else if err != nil { + return err + } + + if !tx.Result.Successful() { + continue + } + + transactionHash := tx.Result.TransactionHash[:] + + txEvents, err := tx.GetDiagnosticEvents() + if err != nil { + return err + } + + if len(txEvents) == 0 { + continue + } + + query := sq.Insert(eventTableName). + Columns( + "id", + "contract_id", + "event_type", + "event_data", + "ledger_close_time", + "transaction_hash", + "topic1", "topic2", "topic3", "topic4", + ) + + for index, e := range txEvents { + var contractID []byte + if e.Event.ContractId != nil { + contractID = e.Event.ContractId[:] + } + + id := Cursor{Ledger: lcm.LedgerSequence(), Tx: tx.Index, Op: 0, Event: uint32(index)}.String() + eventBlob, err := e.MarshalBinary() + if err != nil { + return err + } + + v0, ok := e.Event.Body.GetV0() + if !ok { + return errors.New("unknown event version") + } + + // Encode the topics + topicList := make([][]byte, MaxTopicCount) + for index := 0; index < len(v0.Topics) && index < MaxTopicCount; index++ { + segment := v0.Topics[index] + seg, err := segment.MarshalBinary() + if err != nil { + return err + } + topicList[index] = seg + } + + query = query.Values( + id, + contractID, + int(e.Event.Type), + eventBlob, + lcm.LedgerCloseTime(), + transactionHash, + topicList[0], topicList[1], topicList[2], topicList[3], + ) + } + // Ignore the last inserted ID as it is not needed + _, err = query.RunWith(eventHandler.stmtCache).Exec() + if err != nil { + return err + } + } + + return nil +} + +type ScanFunction func( + event xdr.DiagnosticEvent, + cursor Cursor, + ledgerCloseTimestamp int64, + txHash *xdr.Hash, +) bool + +// trimEvents removes all Events which fall outside the ledger retention window. +func (eventHandler *eventHandler) trimEvents(latestLedgerSeq uint32, retentionWindow uint32) error { + if latestLedgerSeq+1 <= retentionWindow { + return nil + } + cutoff := latestLedgerSeq + 1 - retentionWindow + id := Cursor{Ledger: cutoff}.String() + + _, err := sq.StatementBuilder. + RunWith(eventHandler.stmtCache). + Delete(eventTableName). + Where(sq.Lt{"id": id}). + Exec() + return err +} + +// GetEvents applies f on all the events occurring in the given range with specified contract IDs if provided. +// The events are returned in sorted ascending Cursor order. +// If f returns false, the scan terminates early (f will not be applied on +// remaining events in the range). +// +//nolint:funlen,cyclop +func (eventHandler *eventHandler) GetEvents( + ctx context.Context, + cursorRange CursorRange, + contractIDs [][]byte, + topics NestedTopicArray, + eventTypes []int, + f ScanFunction, +) error { + start := time.Now() + + rowQ := sq. + Select(" id", "event_data", "transaction_hash", "ledger_close_time"). + From(eventTableName). + Where(sq.GtOrEq{"id": cursorRange.Start.String()}). + Where(sq.Lt{"id": cursorRange.End.String()}). + OrderBy("id ASC") + + if len(contractIDs) > 0 { + rowQ = rowQ.Where(sq.Eq{"contract_id": contractIDs}) + } + if len(eventTypes) > 0 { + rowQ = rowQ.Where(sq.Eq{"event_type": eventTypes}) + } + + if len(topics) > 0 { + var orConditions sq.Or + for i, topic := range topics { + if topic == nil { + continue + } + orConditions = append(orConditions, sq.Eq{fmt.Sprintf("topic%d", i+1): topic}) + } + if len(orConditions) > 0 { + rowQ = rowQ.Where(orConditions) + } + } + + encodedContractIDs := make([]string, 0, len(contractIDs)) + for _, contractID := range contractIDs { + result, err := strkey.Encode(strkey.VersionByteContract, contractID) + if err != nil { + return errors.Join(err, errors.New("failed to encode contract id")) + } + encodedContractIDs = append(encodedContractIDs, result) + } + + rows, err := eventHandler.db.Query(ctx, rowQ) + if err != nil { + eventHandler.log. + WithField("duration", time.Since(start)). + WithField("start", cursorRange.Start.String()). + WithField("end", cursorRange.End.String()). + WithField("contractIds", encodedContractIDs). + WithField("eventTypes", eventTypes). + WithField("Topics", topics). + Debugf( + "db read failed for requested parameter", + ) + + return errors.Join(err, errors.New("db read failed for requested parameter")) + } + + defer rows.Close() + + foundRows := false + for rows.Next() { + foundRows = true + var row struct { + eventCursorID string `db:"id"` + eventData []byte `db:"event_data"` + transactionHash []byte `db:"transaction_hash"` + ledgerCloseTime int64 `db:"ledger_close_time"` + } + + err = rows.Scan(&row.eventCursorID, &row.eventData, &row.transactionHash, &row.ledgerCloseTime) + if err != nil { + return fmt.Errorf("failed to scan row: %w", err) + } + + id, eventData, ledgerCloseTime := row.eventCursorID, row.eventData, row.ledgerCloseTime + transactionHash := row.transactionHash + cur, err := ParseCursor(id) + if err != nil { + return errors.Join(err, errors.New("failed to parse cursor")) + } + + var eventXDR xdr.DiagnosticEvent + err = xdr.SafeUnmarshal(eventData, &eventXDR) + if err != nil { + return errors.Join(err, errors.New("failed to decode event")) + } + txHash := xdr.Hash(transactionHash) + if !f(eventXDR, cur, ledgerCloseTime, &txHash) { + return nil + } + } + if !foundRows { + eventHandler.log. + WithField("duration", time.Since(start)). + WithField("start", cursorRange.Start.String()). + WithField("end", cursorRange.End.String()). + WithField("contractIds", encodedContractIDs). + WithField("eventTypes", eventTypes). + WithField("Topics", topics). + Debugf( + "No events found for ledger range", + ) + } + + eventHandler.log. + WithField("startLedgerSequence", cursorRange.Start.Ledger). + WithField("endLedgerSequence", cursorRange.End.Ledger). + WithField("duration", time.Since(start)). + Debugf("Fetched and decoded all the events with filters - contractIDs: %v ", encodedContractIDs) + + return rows.Err() +} + +type eventTableMigration struct { + firstLedger uint32 + lastLedger uint32 + writer EventWriter +} + +func (e *eventTableMigration) ApplicableRange() *LedgerSeqRange { + return &LedgerSeqRange{ + First: e.firstLedger, + Last: e.lastLedger, + } +} + +func (e *eventTableMigration) Apply(_ context.Context, meta xdr.LedgerCloseMeta) error { + return e.writer.InsertEvents(meta) +} + +func newEventTableMigration( + _ context.Context, + logger *log.Entry, + passphrase string, + ledgerSeqRange *LedgerSeqRange, +) migrationApplierFactory { + return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) { + migration := eventTableMigration{ + firstLedger: ledgerSeqRange.First, + lastLedger: ledgerSeqRange.Last, + writer: &eventHandler{ + log: logger, + db: db, + stmtCache: sq.NewStmtCache(db.GetTx()), + passphrase: passphrase, + }, + } + return &migration, nil + }) +} diff --git a/cmd/soroban-rpc/internal/db/event_test.go b/cmd/soroban-rpc/internal/db/event_test.go new file mode 100644 index 00000000..b6d3480e --- /dev/null +++ b/cmd/soroban-rpc/internal/db/event_test.go @@ -0,0 +1,179 @@ +package db + +import ( + "context" + "testing" + "time" + + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/require" + + "github.com/stellar/go/keypair" + "github.com/stellar/go/network" + "github.com/stellar/go/support/log" + "github.com/stellar/go/xdr" + + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" +) + +func transactionMetaWithEvents(events ...xdr.ContractEvent) xdr.TransactionMeta { + return xdr.TransactionMeta{ + V: 3, + Operations: &[]xdr.OperationMeta{}, + V3: &xdr.TransactionMetaV3{ + SorobanMeta: &xdr.SorobanTransactionMeta{ + Events: events, + }, + }, + } +} + +func contractEvent(contractID xdr.Hash, topic []xdr.ScVal, body xdr.ScVal) xdr.ContractEvent { + return xdr.ContractEvent{ + ContractId: &contractID, + Type: xdr.ContractEventTypeContract, + Body: xdr.ContractEventBody{ + V: 0, + V0: &xdr.ContractEventV0{ + Topics: topic, + Data: body, + }, + }, + } +} + +func ledgerCloseMetaWithEvents( + sequence uint32, + closeTimestamp int64, + txMeta ...xdr.TransactionMeta, +) xdr.LedgerCloseMeta { + txProcessing := make([]xdr.TransactionResultMeta, 0, len(txMeta)) + phases := make([]xdr.TransactionPhase, 0, len(txMeta)) + + for _, item := range txMeta { + var operations []xdr.Operation + for range item.MustV3().SorobanMeta.Events { + operations = append(operations, + xdr.Operation{ + Body: xdr.OperationBody{ + Type: xdr.OperationTypeInvokeHostFunction, + InvokeHostFunctionOp: &xdr.InvokeHostFunctionOp{ + HostFunction: xdr.HostFunction{ + Type: xdr.HostFunctionTypeHostFunctionTypeInvokeContract, + InvokeContract: &xdr.InvokeContractArgs{ + ContractAddress: xdr.ScAddress{ + Type: xdr.ScAddressTypeScAddressTypeContract, + ContractId: &xdr.Hash{0x1, 0x2}, + }, + FunctionName: "foo", + Args: nil, + }, + }, + Auth: []xdr.SorobanAuthorizationEntry{}, + }, + }, + }) + } + envelope := xdr.TransactionEnvelope{ + Type: xdr.EnvelopeTypeEnvelopeTypeTx, + V1: &xdr.TransactionV1Envelope{ + Tx: xdr.Transaction{ + SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()), + Operations: operations, + }, + }, + } + txHash, err := network.HashTransactionInEnvelope(envelope, network.FutureNetworkPassphrase) + if err != nil { + panic(err) + } + + txProcessing = append(txProcessing, xdr.TransactionResultMeta{ + TxApplyProcessing: item, + Result: xdr.TransactionResultPair{ + TransactionHash: txHash, + }, + }) + components := []xdr.TxSetComponent{ + { + Type: xdr.TxSetComponentTypeTxsetCompTxsMaybeDiscountedFee, + TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{ + Txs: []xdr.TransactionEnvelope{ + envelope, + }, + }, + }, + } + phases = append(phases, xdr.TransactionPhase{ + V: 0, + V0Components: &components, + }) + } + + return xdr.LedgerCloseMeta{ + V: 1, + V1: &xdr.LedgerCloseMetaV1{ + LedgerHeader: xdr.LedgerHeaderHistoryEntry{ + Hash: xdr.Hash{}, + Header: xdr.LedgerHeader{ + ScpValue: xdr.StellarValue{ + CloseTime: xdr.TimePoint(closeTimestamp), + }, + LedgerSeq: xdr.Uint32(sequence), + }, + }, + TxSet: xdr.GeneralizedTransactionSet{ + V: 1, + V1TxSet: &xdr.TransactionSetV1{ + PreviousLedgerHash: xdr.Hash{}, + Phases: phases, + }, + }, + TxProcessing: txProcessing, + }, + } +} + +func TestInsertEvents(t *testing.T) { + db := NewTestDB(t) + ctx := context.TODO() + log := log.DefaultLogger + log.SetLevel(logrus.TraceLevel) + now := time.Now().UTC() + + writer := NewReadWriter(log, db, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(t, err) + contractID := xdr.Hash([32]byte{}) + counter := xdr.ScSymbol("COUNTER") + + txMeta := make([]xdr.TransactionMeta, 0, 10) + for range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} { + txMeta = append(txMeta, transactionMetaWithEvents( + contractEvent( + contractID, + xdr.ScVec{xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &counter, + }}, + xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &counter, + }, + ), + )) + } + ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) + + eventW := write.EventWriter() + err = eventW.InsertEvents(ledgerCloseMeta) + require.NoError(t, err) + + eventReader := NewEventReader(log, db, passphrase) + start := Cursor{Ledger: 1} + end := Cursor{Ledger: 100} + cursorRange := CursorRange{Start: start, End: end} + + err = eventReader.GetEvents(ctx, cursorRange, nil, nil, nil, nil) + require.NoError(t, err) +} diff --git a/cmd/soroban-rpc/internal/db/ledger.go b/cmd/soroban-rpc/internal/db/ledger.go index 39e43b83..66c34229 100644 --- a/cmd/soroban-rpc/internal/db/ledger.go +++ b/cmd/soroban-rpc/internal/db/ledger.go @@ -21,6 +21,7 @@ type LedgerReader interface { GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) + StreamLedgerRange(ctx context.Context, startLedger uint32, endLedger uint32, f StreamLedgerFn) error } type LedgerWriter interface { @@ -55,6 +56,35 @@ func (r ledgerReader) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) er return q.Err() } +// StreamLedgerRange runs f over inclusive (startLedger, endLedger) (until f errors or signals it's done). +func (r ledgerReader) StreamLedgerRange( + ctx context.Context, + startLedger uint32, + endLedger uint32, + f StreamLedgerFn, +) error { + sql := sq.Select("meta").From(ledgerCloseMetaTableName). + Where(sq.GtOrEq{"sequence": startLedger}). + Where(sq.LtOrEq{"sequence": endLedger}). + OrderBy("sequence asc") + + q, err := r.db.Query(ctx, sql) + if err != nil { + return err + } + defer q.Close() + for q.Next() { + var closeMeta xdr.LedgerCloseMeta + if err = q.Scan(&closeMeta); err != nil { + return err + } + if err = f(closeMeta); err != nil { + return err + } + } + return q.Err() +} + // GetLedger fetches a single ledger from the db. func (r ledgerReader) GetLedger(ctx context.Context, sequence uint32) (xdr.LedgerCloseMeta, bool, error) { sql := sq.Select("meta").From(ledgerCloseMetaTableName).Where(sq.Eq{"sequence": sequence}) diff --git a/cmd/soroban-rpc/internal/db/migration.go b/cmd/soroban-rpc/internal/db/migration.go index 552cc98d..9f7aabd9 100644 --- a/cmd/soroban-rpc/internal/db/migration.go +++ b/cmd/soroban-rpc/internal/db/migration.go @@ -7,20 +7,23 @@ import ( "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" +) - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config" +const ( + transactionsMigrationName = "TransactionsTable" + eventsMigrationName = "EventsTable" ) type LedgerSeqRange struct { - firstLedgerSeq uint32 - lastLedgerSeq uint32 + First uint32 + Last uint32 } func (mlr *LedgerSeqRange) IsLedgerIncluded(ledgerSeq uint32) bool { if mlr == nil { return false } - return ledgerSeq >= mlr.firstLedgerSeq && ledgerSeq <= mlr.lastLedgerSeq + return ledgerSeq >= mlr.First && ledgerSeq <= mlr.Last } func (mlr *LedgerSeqRange) Merge(other *LedgerSeqRange) *LedgerSeqRange { @@ -33,8 +36,8 @@ func (mlr *LedgerSeqRange) Merge(other *LedgerSeqRange) *LedgerSeqRange { // TODO: using min/max can result in a much larger range than needed, // as an optimization, we should probably use a sequence of ranges instead. return &LedgerSeqRange{ - firstLedgerSeq: min(mlr.firstLedgerSeq, other.firstLedgerSeq), - lastLedgerSeq: max(mlr.lastLedgerSeq, other.lastLedgerSeq), + First: min(mlr.First, other.First), + Last: max(mlr.Last, other.Last), } } @@ -47,65 +50,59 @@ type MigrationApplier interface { Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error } +type migrationApplierF func(context.Context, *log.Entry, string, *LedgerSeqRange) migrationApplierFactory + type migrationApplierFactory interface { - New(db *DB, latestLedger uint32) (MigrationApplier, error) + New(db *DB) (MigrationApplier, error) } -type migrationApplierFactoryF func(db *DB, latestLedger uint32) (MigrationApplier, error) +type migrationApplierFactoryF func(db *DB) (MigrationApplier, error) -func (m migrationApplierFactoryF) New(db *DB, latestLedger uint32) (MigrationApplier, error) { - return m(db, latestLedger) +func (m migrationApplierFactoryF) New(db *DB) (MigrationApplier, error) { + return m(db) } type Migration interface { MigrationApplier Commit(ctx context.Context) error - Rollback(ctx context.Context) error } -type multiMigration []Migration +type MultiMigration struct { + migrations []Migration + db *DB +} -func (mm multiMigration) ApplicableRange() *LedgerSeqRange { +func (mm MultiMigration) ApplicableRange() *LedgerSeqRange { var result *LedgerSeqRange - for _, m := range mm { + for _, m := range mm.migrations { result = m.ApplicableRange().Merge(result) } return result } -func (mm multiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { +func (mm MultiMigration) Apply(ctx context.Context, meta xdr.LedgerCloseMeta) error { var err error - for _, m := range mm { + for _, m := range mm.migrations { ledgerSeq := meta.LedgerSequence() if !m.ApplicableRange().IsLedgerIncluded(ledgerSeq) { // The range of a sub-migration can be smaller than the global range. continue } if localErr := m.Apply(ctx, meta); localErr != nil { - err = errors.Join(err, localErr) + err = errors.Join(err, localErr, mm.db.Rollback()) } } return err } -func (mm multiMigration) Commit(ctx context.Context) error { +func (mm MultiMigration) Commit(ctx context.Context) error { var err error - for _, m := range mm { + for _, m := range mm.migrations { if localErr := m.Commit(ctx); localErr != nil { - err = errors.Join(err, localErr) - } - } - return err -} - -func (mm multiMigration) Rollback(ctx context.Context) error { - var err error - for _, m := range mm { - if localErr := m.Rollback(ctx); localErr != nil { - err = errors.Join(err, localErr) + err = errors.Join(err, localErr, mm.db.Rollback()) } } - return err + return mm.db.Commit() } // guardedMigration is a db data migration whose application is guarded by a boolean in the meta table @@ -122,32 +119,18 @@ type guardedMigration struct { func newGuardedDataMigration( ctx context.Context, uniqueMigrationName string, logger *log.Entry, factory migrationApplierFactory, db *DB, ) (Migration, error) { - migrationDB := &DB{ - cache: db.cache, - SessionInterface: db.SessionInterface.Clone(), - } - if err := migrationDB.Begin(ctx); err != nil { - return nil, err - } metaKey := "Migration" + uniqueMigrationName + "Done" - previouslyMigrated, err := getMetaBool(ctx, migrationDB, metaKey) + previouslyMigrated, err := getMetaBool(ctx, db, metaKey) if err != nil && !errors.Is(err, ErrEmptyDB) { - err = errors.Join(err, migrationDB.Rollback()) return nil, err } - latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx) - if err != nil && !errors.Is(err, ErrEmptyDB) { - err = errors.Join(err, migrationDB.Rollback()) - return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err) - } - applier, err := factory.New(migrationDB, latestLedger) + applier, err := factory.New(db) if err != nil { - err = errors.Join(err, migrationDB.Rollback()) return nil, err } guardedMigration := &guardedMigration{ guardMetaKey: metaKey, - db: migrationDB, + db: db, migration: applier, alreadyMigrated: previouslyMigrated, logger: logger, @@ -179,30 +162,58 @@ func (g *guardedMigration) Commit(ctx context.Context) error { if g.alreadyMigrated { return nil } - err := setMetaBool(ctx, g.db, g.guardMetaKey, true) - if err != nil { - return errors.Join(err, g.Rollback(ctx)) - } - return g.db.Commit() + return setMetaBool(ctx, g.db, g.guardMetaKey, true) } -func (g *guardedMigration) Rollback(_ context.Context) error { - return g.db.Rollback() +func GetMigrationLedgerRange(ctx context.Context, db *DB, retentionWindow uint32) (*LedgerSeqRange, error) { + firstLedgerToMigrate := firstLedger + latestLedger, err := NewLedgerEntryReader(db).GetLatestLedgerSequence(ctx) + if err != nil && !errors.Is(err, ErrEmptyDB) { + return nil, fmt.Errorf("failed to get latest ledger sequence: %w", err) + } + if latestLedger > retentionWindow { + firstLedgerToMigrate = latestLedger - retentionWindow + } + return &LedgerSeqRange{ + First: firstLedgerToMigrate, + Last: latestLedger, + }, nil } -func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, cfg *config.Config) (Migration, error) { - migrationName := "TransactionsTable" - logger = logger.WithField("migration", migrationName) - factory := newTransactionTableMigration( - ctx, - logger, - cfg.HistoryRetentionWindow, - cfg.NetworkPassphrase, - ) - m, err := newGuardedDataMigration(ctx, migrationName, logger, factory, db) +func BuildMigrations(ctx context.Context, logger *log.Entry, db *DB, networkPassphrase string, + ledgerSeqRange *LedgerSeqRange, +) (MultiMigration, error) { + // Start a common db transaction for the entire migration duration + err := db.Begin(ctx) if err != nil { - return nil, fmt.Errorf("creating guarded transaction migration: %w", err) + return MultiMigration{}, errors.Join(err, db.Rollback()) + } + + migrationNameToFunc := map[string]migrationApplierF{ + transactionsMigrationName: newTransactionTableMigration, + eventsMigrationName: newEventTableMigration, + } + + migrations := make([]Migration, 0, len(migrationNameToFunc)) + + for migrationName, migrationFunc := range migrationNameToFunc { + migrationLogger := logger.WithField("migration", migrationName) + factory := migrationFunc( + ctx, + migrationLogger, + networkPassphrase, + ledgerSeqRange, + ) + + guardedM, err := newGuardedDataMigration(ctx, migrationName, migrationLogger, factory, db) + if err != nil { + return MultiMigration{}, errors.Join(err, fmt.Errorf( + "could not create guarded migration for %s", migrationName), db.Rollback()) + } + migrations = append(migrations, guardedM) } - // Add other migrations here - return multiMigration{m}, nil + return MultiMigration{ + migrations: migrations, + db: db, + }, nil } diff --git a/cmd/soroban-rpc/internal/db/mocks.go b/cmd/soroban-rpc/internal/db/mocks.go index 492d64bb..fee538d9 100644 --- a/cmd/soroban-rpc/internal/db/mocks.go +++ b/cmd/soroban-rpc/internal/db/mocks.go @@ -101,6 +101,10 @@ func (m *MockLedgerReader) StreamAllLedgers(_ context.Context, _ StreamLedgerFn) return nil } +func (m *MockLedgerReader) StreamLedgerRange(_ context.Context, _ uint32, _ uint32, _ StreamLedgerFn) error { + return nil +} + func (m *MockLedgerReader) GetLedgerRange(_ context.Context) (ledgerbucketwindow.LedgerRange, error) { return m.txn.ledgerRange, nil } diff --git a/cmd/soroban-rpc/internal/db/sqlmigrations/03_events.sql b/cmd/soroban-rpc/internal/db/sqlmigrations/03_events.sql new file mode 100644 index 00000000..106824cf --- /dev/null +++ b/cmd/soroban-rpc/internal/db/sqlmigrations/03_events.sql @@ -0,0 +1,23 @@ +-- +migrate Up + +-- indexing table to find events in ledgers by contract_id +CREATE TABLE events +( + id TEXT PRIMARY KEY, + contract_id BLOB(32), + event_type INTEGER NOT NULL, + event_data BLOB NOT NULL, + ledger_close_time INTEGER NOT NULL, + transaction_hash BLOB(32), + topic1 BLOB, + topic2 BLOB, + topic3 BLOB, + topic4 BLOB +); + +CREATE INDEX idx_contract_id ON events (contract_id); +CREATE INDEX idx_topic1 ON events (topic1); + + +-- +migrate Down +drop table events cascade; diff --git a/cmd/soroban-rpc/internal/db/transaction.go b/cmd/soroban-rpc/internal/db/transaction.go index 8bccc58e..361bc6be 100644 --- a/cmd/soroban-rpc/internal/db/transaction.go +++ b/cmd/soroban-rpc/internal/db/transaction.go @@ -257,8 +257,8 @@ type transactionTableMigration struct { func (t *transactionTableMigration) ApplicableRange() *LedgerSeqRange { return &LedgerSeqRange{ - firstLedgerSeq: t.firstLedger, - lastLedgerSeq: t.lastLedger, + First: t.firstLedger, + Last: t.lastLedger, } } @@ -266,20 +266,13 @@ func (t *transactionTableMigration) Apply(_ context.Context, meta xdr.LedgerClos return t.writer.InsertTransactions(meta) } -func newTransactionTableMigration(ctx context.Context, logger *log.Entry, - retentionWindow uint32, passphrase string, +func newTransactionTableMigration( + ctx context.Context, + logger *log.Entry, + passphrase string, + ledgerSeqRange *LedgerSeqRange, ) migrationApplierFactory { - return migrationApplierFactoryF(func(db *DB, latestLedger uint32) (MigrationApplier, error) { - firstLedgerToMigrate := uint32(2) //nolint:mnd - writer := &transactionHandler{ - log: logger, - db: db, - stmtCache: sq.NewStmtCache(db.GetTx()), - passphrase: passphrase, - } - if latestLedger > retentionWindow { - firstLedgerToMigrate = latestLedger - retentionWindow - } + return migrationApplierFactoryF(func(db *DB) (MigrationApplier, error) { // Truncate the table, since it may contain data, causing insert conflicts later on. // (the migration was shipped after the actual transactions table change) _, err := db.Exec(ctx, sq.Delete(transactionTableName)) @@ -287,9 +280,14 @@ func newTransactionTableMigration(ctx context.Context, logger *log.Entry, return nil, fmt.Errorf("couldn't delete table %q: %w", transactionTableName, err) } migration := transactionTableMigration{ - firstLedger: firstLedgerToMigrate, - lastLedger: latestLedger, - writer: writer, + firstLedger: ledgerSeqRange.First, + lastLedger: ledgerSeqRange.Last, + writer: &transactionHandler{ + log: logger, + db: db, + stmtCache: sq.NewStmtCache(db.GetTx()), + passphrase: passphrase, + }, } return &migration, nil }) diff --git a/cmd/soroban-rpc/internal/db/transaction_test.go b/cmd/soroban-rpc/internal/db/transaction_test.go index e05671b9..62a940f3 100644 --- a/cmd/soroban-rpc/internal/db/transaction_test.go +++ b/cmd/soroban-rpc/internal/db/transaction_test.go @@ -27,6 +27,43 @@ func TestTransactionNotFound(t *testing.T) { require.ErrorIs(t, err, ErrNoTransaction) } +func txMetaWithEvents(acctSeq uint32) xdr.LedgerCloseMeta { + meta := txMeta(acctSeq, true) + + contractIDBytes, _ := hex.DecodeString("df06d62447fd25da07c0135eed7557e5a5497ee7d15b7fe345bd47e191d8f577") + var contractID xdr.Hash + copy(contractID[:], contractIDBytes) + counter := xdr.ScSymbol("COUNTER") + + meta.V1.TxProcessing[0].TxApplyProcessing.V3 = &xdr.TransactionMetaV3{ + SorobanMeta: &xdr.SorobanTransactionMeta{ + Events: []xdr.ContractEvent{{ + ContractId: &contractID, + Type: xdr.ContractEventTypeContract, + Body: xdr.ContractEventBody{ + V: 0, + V0: &xdr.ContractEventV0{ + Topics: []xdr.ScVal{{ + Type: xdr.ScValTypeScvSymbol, + Sym: &counter, + }}, + Data: xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &counter, + }, + }, + }, + }}, + ReturnValue: xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &counter, + }, + }, + } + + return meta +} + func TestTransactionFound(t *testing.T) { db := NewTestDB(t) ctx := context.TODO() @@ -38,16 +75,17 @@ func TestTransactionFound(t *testing.T) { require.NoError(t, err) lcms := []xdr.LedgerCloseMeta{ - txMeta(1234, true), - txMeta(1235, true), - txMeta(1236, true), - txMeta(1237, true), + txMetaWithEvents(1234), + txMetaWithEvents(1235), + txMetaWithEvents(1236), + txMetaWithEvents(1237), } - + eventW := write.EventWriter() ledgerW, txW := write.LedgerWriter(), write.TransactionWriter() for _, lcm := range lcms { require.NoError(t, ledgerW.InsertLedger(lcm), "ingestion failed for ledger %+v", lcm.V1) require.NoError(t, txW.InsertTransactions(lcm), "ingestion failed for ledger %+v", lcm.V1) + require.NoError(t, eventW.InsertEvents(lcm), "ingestion failed for ledger %+v", lcm.V1) } require.NoError(t, write.Commit(lcms[len(lcms)-1].LedgerSequence())) @@ -56,6 +94,14 @@ func TestTransactionFound(t *testing.T) { _, err = reader.GetTransaction(ctx, xdr.Hash{}) require.ErrorIs(t, err, ErrNoTransaction) + eventReader := NewEventReader(log, db, passphrase) + start := Cursor{Ledger: 1} + end := Cursor{Ledger: 1000} + cursorRange := CursorRange{Start: start, End: end} + + err = eventReader.GetEvents(ctx, cursorRange, nil, nil, nil, nil) + require.NoError(t, err) + // check all 200 cases for _, lcm := range lcms { h := lcm.TransactionHash(0) diff --git a/cmd/soroban-rpc/internal/events/events.go b/cmd/soroban-rpc/internal/events/events.go deleted file mode 100644 index d9c3aa65..00000000 --- a/cmd/soroban-rpc/internal/events/events.go +++ /dev/null @@ -1,277 +0,0 @@ -package events - -import ( - "errors" - "io" - "sort" - "sync" - "time" - - "github.com/prometheus/client_golang/prometheus" - - "github.com/stellar/go/ingest" - "github.com/stellar/go/xdr" - - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" -) - -type event struct { - diagnosticEventXDR []byte - txIndex uint32 - eventIndex uint32 - // intentionally stored as a pointer to save memory - // (amortized as soon as there are two events in a transaction) - txHash *xdr.Hash -} - -func (e event) cursor(ledgerSeq uint32) Cursor { - return Cursor{ - Ledger: ledgerSeq, - Tx: e.txIndex, - Event: e.eventIndex, - } -} - -// MemoryStore is an in-memory store of soroban events. -type MemoryStore struct { - // networkPassphrase is an immutable string containing the - // Stellar network passphrase. - // Accessing networkPassphrase does not need to be protected - // by the lock - networkPassphrase string - // lock protects the mutable fields below - lock sync.RWMutex - eventsByLedger *ledgerbucketwindow.LedgerBucketWindow[[]event] - eventsDurationMetric *prometheus.SummaryVec - eventCountMetric prometheus.Summary -} - -// NewMemoryStore creates a new MemoryStore. -// The retention window is in units of ledgers. -// All events occurring in the following ledger range -// [ latestLedger - retentionWindow, latestLedger ] -// will be included in the MemoryStore. If the MemoryStore -// is full, any events from new ledgers will evict -// older entries outside the retention window. -func NewMemoryStore(daemon interfaces.Daemon, networkPassphrase string, retentionWindow uint32) *MemoryStore { - window := ledgerbucketwindow.NewLedgerBucketWindow[[]event](retentionWindow) - - // eventsDurationMetric is a metric for measuring latency of event store operations - eventsDurationMetric := prometheus.NewSummaryVec(prometheus.SummaryOpts{ - Namespace: daemon.MetricsNamespace(), Subsystem: "events", Name: "operation_duration_seconds", - Help: "event store operation durations, sliding window = 10m", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }, - []string{"operation"}, - ) - - eventCountMetric := prometheus.NewSummary(prometheus.SummaryOpts{ - Namespace: daemon.MetricsNamespace(), Subsystem: "events", Name: "count", - Help: "count of events ingested, sliding window = 10m", - Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, - }) - daemon.MetricsRegistry().MustRegister(eventCountMetric, eventsDurationMetric) - return &MemoryStore{ - networkPassphrase: networkPassphrase, - eventsByLedger: window, - eventsDurationMetric: eventsDurationMetric, - eventCountMetric: eventCountMetric, - } -} - -// Range defines a [Start, End) interval of Soroban events. -type Range struct { - // Start defines the (inclusive) start of the range. - Start Cursor - // ClampStart indicates whether Start should be clamped up - // to the earliest ledger available if Start is too low. - ClampStart bool - // End defines the (exclusive) end of the range. - End Cursor - // ClampEnd indicates whether End should be clamped down - // to the latest ledger available if End is too high. - ClampEnd bool -} - -type ScanFunction func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool - -// Scan applies f on all the events occurring in the given range. -// The events are processed in sorted ascending Cursor order. -// If f returns false, the scan terminates early (f will not be applied on -// remaining events in the range). Note that a read lock is held for the -// entire duration of the Scan function so f should be written in a way -// to minimize latency. -func (m *MemoryStore) Scan(eventRange Range, f ScanFunction) (lastLedgerInWindow uint32, err error) { - startTime := time.Now() - defer func() { - if err == nil { - m.eventsDurationMetric.With(prometheus.Labels{"operation": "scan"}). - Observe(time.Since(startTime).Seconds()) - } - }() - - m.lock.RLock() - defer m.lock.RUnlock() - - if err = m.validateRange(&eventRange); err != nil { - return - } - - firstLedgerInRange := eventRange.Start.Ledger - firstLedgerInWindow := m.eventsByLedger.Get(0).LedgerSeq - lastLedgerInWindow = firstLedgerInWindow + (m.eventsByLedger.Len() - 1) - for i := firstLedgerInRange - firstLedgerInWindow; i < m.eventsByLedger.Len(); i++ { - bucket := m.eventsByLedger.Get(i) - events := bucket.BucketContent - if bucket.LedgerSeq == firstLedgerInRange { - // we need to seek for the beginning of the events in the first bucket in the range - events = seek(events, eventRange.Start) - } - timestamp := bucket.LedgerCloseTimestamp - for _, event := range events { - cur := event.cursor(bucket.LedgerSeq) - if eventRange.End.Cmp(cur) <= 0 { - return - } - var diagnosticEvent xdr.DiagnosticEvent - err = xdr.SafeUnmarshal(event.diagnosticEventXDR, &diagnosticEvent) - if err != nil { - return - } - if !f(diagnosticEvent, cur, timestamp, event.txHash) { - return - } - } - } - return -} - -// validateRange checks if the range falls within the bounds -// of the events in the memory store. -// validateRange should be called with the read lock. -func (m *MemoryStore) validateRange(eventRange *Range) error { - if m.eventsByLedger.Len() == 0 { - return errors.New("event store is empty") - } - firstBucket := m.eventsByLedger.Get(0) - min := Cursor{Ledger: firstBucket.LedgerSeq} - if eventRange.Start.Cmp(min) < 0 { - if eventRange.ClampStart { - eventRange.Start = min - } else { - return errors.New("start is before oldest ledger") - } - } - max := Cursor{Ledger: min.Ledger + m.eventsByLedger.Len()} - if eventRange.Start.Cmp(max) >= 0 { - return errors.New("start is after newest ledger") - } - if eventRange.End.Cmp(max) > 0 { - if eventRange.ClampEnd { - eventRange.End = max - } else { - return errors.New("end is after latest ledger") - } - } - - if eventRange.Start.Cmp(eventRange.End) >= 0 { - return errors.New("start is not before end") - } - - return nil -} - -// seek returns the subset of all events which occur -// at a point greater than or equal to the given cursor. -// events must be sorted in ascending order. -func seek(events []event, cursor Cursor) []event { - j := sort.Search(len(events), func(i int) bool { - return cursor.Cmp(events[i].cursor(cursor.Ledger)) <= 0 - }) - return events[j:] -} - -// IngestEvents adds new events from the given ledger into the store. -// As a side effect, events which fall outside the retention window are -// removed from the store. -func (m *MemoryStore) IngestEvents(ledgerCloseMeta xdr.LedgerCloseMeta) error { - startTime := time.Now() - // no need to acquire the lock because the networkPassphrase field - // is immutable - events, err := readEvents(m.networkPassphrase, ledgerCloseMeta) - if err != nil { - return err - } - bucket := ledgerbucketwindow.LedgerBucket[[]event]{ - LedgerSeq: ledgerCloseMeta.LedgerSequence(), - LedgerCloseTimestamp: ledgerCloseMeta.LedgerCloseTime(), - BucketContent: events, - } - m.lock.Lock() - if _, err = m.eventsByLedger.Append(bucket); err != nil { - m.lock.Unlock() - return err - } - m.lock.Unlock() - m.eventsDurationMetric.With(prometheus.Labels{"operation": "ingest"}). - Observe(time.Since(startTime).Seconds()) - m.eventCountMetric.Observe(float64(len(events))) - return nil -} - -func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (events []event, err error) { - var txReader *ingest.LedgerTransactionReader - txReader, err = ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta) - if err != nil { - return - } - defer func() { - closeErr := txReader.Close() - if err == nil { - err = closeErr - } - }() - - for { - var tx ingest.LedgerTransaction - tx, err = txReader.Read() - if err == io.EOF { - err = nil - break - } - if err != nil { - return - } - - if !tx.Result.Successful() { - continue - } - - txEvents, err := tx.GetDiagnosticEvents() - if err != nil { - return nil, err - } - txHash := tx.Result.TransactionHash - for index, e := range txEvents { - diagnosticEventXDR, err := e.MarshalBinary() - if err != nil { - return nil, err - } - events = append(events, event{ - diagnosticEventXDR: diagnosticEventXDR, - txIndex: tx.Index, - eventIndex: uint32(index), - txHash: &txHash, - }) - } - } - return events, err -} - -// GetLedgerRange returns the first and latest ledger available in the store. -func (m *MemoryStore) GetLedgerRange() (ledgerbucketwindow.LedgerRange, error) { - m.lock.RLock() - defer m.lock.RUnlock() - return m.eventsByLedger.GetLedgerRange(), nil -} diff --git a/cmd/soroban-rpc/internal/events/events_test.go b/cmd/soroban-rpc/internal/events/events_test.go deleted file mode 100644 index 50012fe9..00000000 --- a/cmd/soroban-rpc/internal/events/events_test.go +++ /dev/null @@ -1,408 +0,0 @@ -package events - -import ( - "bytes" - "testing" - - "github.com/prometheus/client_golang/prometheus" - dto "github.com/prometheus/client_model/go" - "github.com/stretchr/testify/require" - - "github.com/stellar/go/xdr" - - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" -) - -var ( - ledger5CloseTime = ledgerCloseTime(5) - ledger5Events = []event{ - newEvent(1, 0, 100), - newEvent(1, 1, 200), - newEvent(2, 0, 300), - newEvent(2, 1, 400), - } - ledger6CloseTime = ledgerCloseTime(6) - ledger6Events []event = nil - ledger7CloseTime = ledgerCloseTime(7) - ledger7Events = []event{ - newEvent(1, 0, 500), - } - ledger8CloseTime = ledgerCloseTime(8) - ledger8Events = []event{ - newEvent(1, 0, 600), - newEvent(2, 0, 700), - newEvent(2, 1, 800), - newEvent(2, 2, 900), - newEvent(2, 3, 1000), - } -) - -func ledgerCloseTime(seq uint32) int64 { - return int64(seq)*25 + 100 -} - -func newEvent(txIndex, eventIndex, val uint32) event { - v := xdr.Uint32(val) - - e := xdr.DiagnosticEvent{ - InSuccessfulContractCall: true, - Event: xdr.ContractEvent{ - Type: xdr.ContractEventTypeSystem, - Body: xdr.ContractEventBody{ - V: 0, - V0: &xdr.ContractEventV0{ - Data: xdr.ScVal{ - Type: xdr.ScValTypeScvU32, - U32: &v, - }, - }, - }, - }, - } - diagnosticEventXDR, err := e.MarshalBinary() - if err != nil { - panic(err) - } - return event{ - diagnosticEventXDR: diagnosticEventXDR, - txIndex: txIndex, - eventIndex: eventIndex, - } -} - -func (e event) equals(other event) bool { - return e.txIndex == other.txIndex && - e.eventIndex == other.eventIndex && - bytes.Equal(e.diagnosticEventXDR, other.diagnosticEventXDR) -} - -func eventsAreEqual(t *testing.T, a, b []event) { - require.Equal(t, len(a), len(b)) - for i := range a { - require.True(t, a[i].equals(b[i])) - } -} - -func TestScanRangeValidation(t *testing.T) { - m := NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 4) - assertNoCalls := func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool { - t.Fatalf("unexpected call") - return true - } - _, err := m.Scan(Range{ - Start: MinCursor, - ClampStart: true, - End: MaxCursor, - ClampEnd: true, - }, assertNoCalls) - require.EqualError(t, err, "event store is empty") - - m = createStore(t) - - for _, testCase := range []struct { - input Range - err string - }{ - { - Range{ - Start: MinCursor, - ClampStart: false, - End: MaxCursor, - ClampEnd: true, - }, - "start is before oldest ledger", - }, - { - Range{ - Start: Cursor{Ledger: 4}, - ClampStart: false, - End: MaxCursor, - ClampEnd: true, - }, - "start is before oldest ledger", - }, - { - Range{ - Start: MinCursor, - ClampStart: true, - End: MaxCursor, - ClampEnd: false, - }, - "end is after latest ledger", - }, - { - Range{ - Start: Cursor{Ledger: 5}, - ClampStart: true, - End: Cursor{Ledger: 10}, - ClampEnd: false, - }, - "end is after latest ledger", - }, - { - Range{ - Start: Cursor{Ledger: 10}, - ClampStart: true, - End: Cursor{Ledger: 3}, - ClampEnd: true, - }, - "start is after newest ledger", - }, - { - Range{ - Start: Cursor{Ledger: 10}, - ClampStart: false, - End: Cursor{Ledger: 3}, - ClampEnd: false, - }, - "start is after newest ledger", - }, - { - Range{ - Start: Cursor{Ledger: 9}, - ClampStart: false, - End: Cursor{Ledger: 10}, - ClampEnd: true, - }, - "start is after newest ledger", - }, - { - Range{ - Start: Cursor{Ledger: 9}, - ClampStart: false, - End: Cursor{Ledger: 10}, - ClampEnd: false, - }, - "start is after newest ledger", - }, - { - Range{ - Start: Cursor{Ledger: 2}, - ClampStart: true, - End: Cursor{Ledger: 3}, - ClampEnd: false, - }, - "start is not before end", - }, - { - Range{ - Start: Cursor{Ledger: 2}, - ClampStart: false, - End: Cursor{Ledger: 3}, - ClampEnd: false, - }, - "start is before oldest ledger", - }, - { - Range{ - Start: Cursor{Ledger: 6}, - ClampStart: false, - End: Cursor{Ledger: 6}, - ClampEnd: false, - }, - "start is not before end", - }, - } { - _, err := m.Scan(testCase.input, assertNoCalls) - require.EqualError(t, err, testCase.err, testCase.input) - } -} - -func createStore(t *testing.T) *MemoryStore { - m := NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 4) - m.eventsByLedger.Append(ledgerbucketwindow.LedgerBucket[[]event]{ - LedgerSeq: 5, - LedgerCloseTimestamp: ledger5CloseTime, - BucketContent: ledger5Events, - }) - m.eventsByLedger.Append(ledgerbucketwindow.LedgerBucket[[]event]{ - LedgerSeq: 6, - LedgerCloseTimestamp: ledger6CloseTime, - BucketContent: nil, - }) - m.eventsByLedger.Append(ledgerbucketwindow.LedgerBucket[[]event]{ - LedgerSeq: 7, - LedgerCloseTimestamp: ledger7CloseTime, - BucketContent: ledger7Events, - }) - m.eventsByLedger.Append(ledgerbucketwindow.LedgerBucket[[]event]{ - LedgerSeq: 8, - LedgerCloseTimestamp: ledger8CloseTime, - BucketContent: ledger8Events, - }) - - return m -} - -func concat(slices ...[]event) []event { - var result []event - for _, slice := range slices { - result = append(result, slice...) - } - return result -} - -func getMetricValue(metric prometheus.Metric) *dto.Metric { - value := &dto.Metric{} - err := metric.Write(value) - if err != nil { - panic(err) - } - return value -} - -func TestScan(t *testing.T) { - genEquivalentInputs := func(input Range) []Range { - results := []Range{input} - if !input.ClampStart { - rangeCopy := input - rangeCopy.ClampStart = true - results = append(results, rangeCopy) - } - if !input.ClampEnd { - rangeCopy := input - rangeCopy.ClampEnd = true - results = append(results, rangeCopy) - } - if !input.ClampStart && !input.ClampEnd { - rangeCopy := input - rangeCopy.ClampStart = true - rangeCopy.ClampEnd = true - results = append(results, rangeCopy) - } - return results - } - - for _, testCase := range []struct { - input Range - expected []event - }{ - { - Range{ - Start: MinCursor, - ClampStart: true, - End: MaxCursor, - ClampEnd: true, - }, - concat(ledger5Events, ledger6Events, ledger7Events, ledger8Events), - }, - { - Range{ - Start: Cursor{Ledger: 5}, - ClampStart: false, - End: Cursor{Ledger: 9}, - ClampEnd: false, - }, - concat(ledger5Events, ledger6Events, ledger7Events, ledger8Events), - }, - { - Range{ - Start: Cursor{Ledger: 5, Tx: 2}, - ClampStart: false, - End: Cursor{Ledger: 9}, - ClampEnd: false, - }, - concat(ledger5Events[2:], ledger6Events, ledger7Events, ledger8Events), - }, - { - Range{ - Start: Cursor{Ledger: 5, Tx: 3}, - ClampStart: false, - End: MaxCursor, - ClampEnd: true, - }, - concat(ledger6Events, ledger7Events, ledger8Events), - }, - { - Range{ - Start: Cursor{Ledger: 6}, - ClampStart: false, - End: MaxCursor, - ClampEnd: true, - }, - concat(ledger7Events, ledger8Events), - }, - { - Range{ - Start: Cursor{Ledger: 6, Tx: 1}, - ClampStart: false, - End: MaxCursor, - ClampEnd: true, - }, - concat(ledger7Events, ledger8Events), - }, - { - Range{ - Start: Cursor{Ledger: 8, Tx: 2, Event: 3}, - ClampStart: false, - End: MaxCursor, - ClampEnd: true, - }, - ledger8Events[len(ledger8Events)-1:], - }, - { - Range{ - Start: Cursor{Ledger: 8, Tx: 2, Event: 3}, - ClampStart: false, - End: Cursor{Ledger: 9}, - ClampEnd: false, - }, - ledger8Events[len(ledger8Events)-1:], - }, - { - Range{ - Start: Cursor{Ledger: 5}, - ClampStart: false, - End: Cursor{Ledger: 7}, - ClampEnd: false, - }, - concat(ledger5Events, ledger6Events), - }, - { - Range{ - Start: Cursor{Ledger: 5, Tx: 2}, - ClampStart: false, - End: Cursor{Ledger: 8, Tx: 2}, - ClampEnd: false, - }, - concat(ledger5Events[2:], ledger6Events, ledger7Events, ledger8Events[:1]), - }, - } { - for _, input := range genEquivalentInputs(testCase.input) { - m := createStore(t) - var events []event - iterateAll := true - f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64, hash *xdr.Hash) bool { - require.Equal(t, ledgerCloseTime(cursor.Ledger), ledgerCloseTimestamp) - diagnosticEventXDR, err := contractEvent.MarshalBinary() - require.NoError(t, err) - events = append(events, event{ - diagnosticEventXDR: diagnosticEventXDR, - txIndex: cursor.Tx, - eventIndex: cursor.Event, - txHash: hash, - }) - return iterateAll - } - latest, err := m.Scan(input, f) - require.NoError(t, err) - require.Equal(t, uint32(8), latest) - eventsAreEqual(t, testCase.expected, events) - metric, err := m.eventsDurationMetric.MetricVec.GetMetricWith(prometheus.Labels{ - "operation": "scan", - }) - require.NoError(t, err) - require.Equal(t, uint64(1), getMetricValue(metric).GetSummary().GetSampleCount()) - if len(events) > 0 { - events = nil - iterateAll = false - latest, err := m.Scan(input, f) - require.NoError(t, err) - require.Equal(t, uint64(2), getMetricValue(metric).GetSummary().GetSampleCount()) - require.Equal(t, uint32(8), latest) - eventsAreEqual(t, []event{testCase.expected[0]}, events) - } - } - } -} diff --git a/cmd/soroban-rpc/internal/feewindow/feewindow.go b/cmd/soroban-rpc/internal/feewindow/feewindow.go index 3d662fbc..b1d40bee 100644 --- a/cmd/soroban-rpc/internal/feewindow/feewindow.go +++ b/cmd/soroban-rpc/internal/feewindow/feewindow.go @@ -2,6 +2,7 @@ package feewindow import ( + "errors" "io" "slices" "sync" @@ -9,6 +10,7 @@ import ( "github.com/stellar/go/ingest" "github.com/stellar/go/xdr" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow" ) @@ -128,20 +130,22 @@ type FeeWindows struct { SorobanInclusionFeeWindow *FeeWindow ClassicFeeWindow *FeeWindow networkPassPhrase string + db *db.DB } -func NewFeeWindows(classicRetention uint32, sorobanRetetion uint32, networkPassPhrase string) *FeeWindows { +func NewFeeWindows(classicRetention uint32, sorobanRetetion uint32, networkPassPhrase string, db *db.DB) *FeeWindows { return &FeeWindows{ SorobanInclusionFeeWindow: NewFeeWindow(sorobanRetetion), ClassicFeeWindow: NewFeeWindow(classicRetention), networkPassPhrase: networkPassPhrase, + db: db, } } func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(fw.networkPassPhrase, meta) if err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } var sorobanInclusionFees []uint64 var classicFees []uint64 @@ -151,7 +155,7 @@ func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { break } if err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } feeCharged := uint64(tx.Result.Result.FeeCharged) ops := tx.Envelope.Operations() @@ -182,11 +186,11 @@ func (fw *FeeWindows) IngestFees(meta xdr.LedgerCloseMeta) error { BucketContent: classicFees, } if err := fw.ClassicFeeWindow.AppendLedgerFees(bucket); err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } bucket.BucketContent = sorobanInclusionFees if err := fw.SorobanInclusionFeeWindow.AppendLedgerFees(bucket); err != nil { - return err + return errors.Join(err, fw.db.Rollback()) } return nil } diff --git a/cmd/soroban-rpc/internal/ingest/mock_db_test.go b/cmd/soroban-rpc/internal/ingest/mock_db_test.go index 6e57658d..7c389d2c 100644 --- a/cmd/soroban-rpc/internal/ingest/mock_db_test.go +++ b/cmd/soroban-rpc/internal/ingest/mock_db_test.go @@ -36,6 +36,15 @@ type MockTx struct { mock.Mock } +func (m *MockTx) EventWriter() db.EventWriter { + args := m.Called() + eventWriter, ok := args.Get(0).(db.EventWriter) + if !ok { + return nil + } + return eventWriter +} + func (m MockTx) LedgerEntryWriter() db.LedgerEntryWriter { args := m.Called() return args.Get(0).(db.LedgerEntryWriter) @@ -96,3 +105,12 @@ func (m MockTransactionWriter) InsertTransactions(ledger xdr.LedgerCloseMeta) er func (m MockTransactionWriter) RegisterMetrics(ingest, count prometheus.Observer) { m.Called(ingest, count) } + +type MockEventWriter struct { + mock.Mock +} + +func (m *MockEventWriter) InsertEvents(ledger xdr.LedgerCloseMeta) error { + args := m.Called(ledger) + return args.Error(0) +} diff --git a/cmd/soroban-rpc/internal/ingest/service.go b/cmd/soroban-rpc/internal/ingest/service.go index 86a2b445..29d278e6 100644 --- a/cmd/soroban-rpc/internal/ingest/service.go +++ b/cmd/soroban-rpc/internal/ingest/service.go @@ -19,7 +19,6 @@ import ( "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/feewindow" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/util" ) @@ -33,7 +32,6 @@ var errEmptyArchives = fmt.Errorf("cannot start ingestion without history archiv type Config struct { Logger *log.Entry DB db.ReadWriter - EventStore *events.MemoryStore FeeWindows *feewindow.FeeWindows NetworkPassPhrase string Archive historyarchive.ArchiveInterface @@ -81,7 +79,6 @@ func newService(cfg Config) *Service { service := &Service{ logger: cfg.Logger, db: cfg.DB, - eventStore: cfg.EventStore, feeWindows: cfg.FeeWindows, ledgerBackend: cfg.LedgerBackend, networkPassPhrase: cfg.NetworkPassPhrase, @@ -133,7 +130,6 @@ type Metrics struct { type Service struct { logger *log.Entry db db.ReadWriter - eventStore *events.MemoryStore feeWindows *feewindow.FeeWindows ledgerBackend backends.LedgerBackend timeout time.Duration @@ -337,7 +333,7 @@ func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.Ledge With(prometheus.Labels{"type": "transactions"}). Observe(time.Since(startTime).Seconds()) - if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil { + if err := tx.EventWriter().InsertEvents(ledgerCloseMeta); err != nil { return err } diff --git a/cmd/soroban-rpc/internal/ingest/service_test.go b/cmd/soroban-rpc/internal/ingest/service_test.go index dbeeb1f6..f3f2d523 100644 --- a/cmd/soroban-rpc/internal/ingest/service_test.go +++ b/cmd/soroban-rpc/internal/ingest/service_test.go @@ -17,7 +17,6 @@ import ( "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/feewindow" ) @@ -45,7 +44,6 @@ func TestRetryRunningIngestion(t *testing.T) { config := Config{ Logger: supportlog.New(), DB: &ErrorReadWriter{}, - EventStore: nil, NetworkPassPhrase: "", Archive: nil, LedgerBackend: nil, @@ -69,8 +67,7 @@ func TestIngestion(t *testing.T) { config := Config{ Logger: supportlog.New(), DB: mockDB, - EventStore: events.NewMemoryStore(daemon, network.TestNetworkPassphrase, 1), - FeeWindows: feewindow.NewFeeWindows(1, 1, network.TestNetworkPassphrase), + FeeWindows: feewindow.NewFeeWindows(1, 1, network.TestNetworkPassphrase, nil), LedgerBackend: mockLedgerBackend, Daemon: daemon, NetworkPassPhrase: network.TestNetworkPassphrase, @@ -81,6 +78,7 @@ func TestIngestion(t *testing.T) { mockLedgerEntryWriter := &MockLedgerEntryWriter{} mockLedgerWriter := &MockLedgerWriter{} mockTxWriter := &MockTransactionWriter{} + mockEventWriter := &MockEventWriter{} ctx := context.Background() mockDB.On("NewTx", ctx).Return(mockTx, nil).Once() mockTx.On("Commit", sequence).Return(nil).Once() @@ -88,6 +86,7 @@ func TestIngestion(t *testing.T) { mockTx.On("LedgerEntryWriter").Return(mockLedgerEntryWriter).Twice() mockTx.On("LedgerWriter").Return(mockLedgerWriter).Once() mockTx.On("TransactionWriter").Return(mockTxWriter).Once() + mockTx.On("EventWriter").Return(mockEventWriter).Once() src := xdr.MustAddress("GBXGQJWVLWOYHFLVTKWV5FGHA3LNYY2JQKM7OAJAUEQFU6LPCSEFVXON") firstTx := xdr.TransactionEnvelope{ @@ -254,6 +253,7 @@ func TestIngestion(t *testing.T) { Return(nil).Once() mockLedgerWriter.On("InsertLedger", ledger).Return(nil).Once() mockTxWriter.On("InsertTransactions", ledger).Return(nil).Once() + mockEventWriter.On("InsertEvents", ledger).Return(nil).Once() assert.NoError(t, service.ingest(ctx, sequence)) mockDB.AssertExpectations(t) diff --git a/cmd/soroban-rpc/internal/jsonrpc.go b/cmd/soroban-rpc/internal/jsonrpc.go index 677b7c46..2f4a5288 100644 --- a/cmd/soroban-rpc/internal/jsonrpc.go +++ b/cmd/soroban-rpc/internal/jsonrpc.go @@ -21,7 +21,6 @@ import ( "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/feewindow" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/methods" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/network" @@ -51,9 +50,9 @@ func (h Handler) Close() { } type HandlerParams struct { - EventStore *events.MemoryStore FeeStatWindows *feewindow.FeeWindows TransactionReader db.TransactionReader + EventReader db.EventReader LedgerEntryReader db.LedgerEntryReader LedgerReader db.LedgerReader Logger *log.Entry @@ -162,7 +161,13 @@ func NewJSONRPCHandler(cfg *config.Config, params HandlerParams) Handler { { methodName: "getEvents", underlyingHandler: methods.NewGetEventsHandler( - params.EventStore, cfg.MaxEventsLimit, cfg.DefaultEventsLimit), + params.Logger, + params.EventReader, + cfg.MaxEventsLimit, + cfg.DefaultEventsLimit, + params.LedgerReader, + ), + longName: "get_events", queueLimit: cfg.RequestBacklogGetEventsQueueLimit, requestDurationLimit: cfg.MaxGetEventsExecutionDuration, diff --git a/cmd/soroban-rpc/internal/methods/get_events.go b/cmd/soroban-rpc/internal/methods/get_events.go index db730f09..36aefcdb 100644 --- a/cmd/soroban-rpc/internal/methods/get_events.go +++ b/cmd/soroban-rpc/internal/methods/get_events.go @@ -10,13 +10,23 @@ import ( "github.com/creachadair/jrpc2" "github.com/stellar/go/strkey" + "github.com/stellar/go/support/collections/set" "github.com/stellar/go/support/errors" + "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/xdr2json" ) +const ( + LedgerScanLimit = 10000 + maxContractIDsLimit = 5 + maxTopicsLimit = 5 + maxFiltersLimit = 5 + maxEventTypes = 3 +) + type eventTypeSet map[string]interface{} func (e eventTypeSet) valid() error { @@ -58,6 +68,14 @@ func (e eventTypeSet) MarshalJSON() ([]byte, error) { return json.Marshal(strings.Join(keys, ",")) } +func (e eventTypeSet) Keys() []string { + keys := make([]string, 0, len(e)) + for key := range e { + keys = append(keys, key) + } + return keys +} + func (e eventTypeSet) matches(event xdr.ContractEvent) bool { if len(e) == 0 { return true @@ -87,6 +105,7 @@ type EventInfo struct { type GetEventsRequest struct { StartLedger uint32 `json:"startLedger,omitempty"` + EndLedger uint32 `json:"endLedger,omitempty"` Filters []EventFilter `json:"filters"` Pagination *PaginationOptions `json:"pagination,omitempty"` Format string `json:"xdrFormat,omitempty"` @@ -99,24 +118,24 @@ func (g *GetEventsRequest) Valid(maxLimit uint) error { // Validate the paging limit (if it exists) if g.Pagination != nil && g.Pagination.Cursor != nil { - if g.StartLedger != 0 { - return errors.New("startLedger and cursor cannot both be set") + if g.StartLedger != 0 || g.EndLedger != 0 { + return errors.New("ledger ranges and cursor cannot both be set") //nolint:forbidigo } } else if g.StartLedger <= 0 { - // Validate start return errors.New("startLedger must be positive") } + if g.Pagination != nil && g.Pagination.Limit > maxLimit { return fmt.Errorf("limit must not exceed %d", maxLimit) } // Validate filters - if len(g.Filters) > 5 { + if len(g.Filters) > maxFiltersLimit { return errors.New("maximum 5 filters per request") } for i, filter := range g.Filters { if err := filter.Valid(); err != nil { - return errors.Wrapf(err, "filter %d invalid", i+1) + return fmt.Errorf("filter %d invalid: %w", i+1, err) } } @@ -147,6 +166,14 @@ var eventTypeFromXDR = map[xdr.ContractEventType]string{ xdr.ContractEventTypeDiagnostic: EventTypeDiagnostic, } +func getEventTypeXDRFromEventType() map[string]xdr.ContractEventType { + return map[string]xdr.ContractEventType{ + EventTypeSystem: xdr.ContractEventTypeSystem, + EventTypeContract: xdr.ContractEventTypeContract, + EventTypeDiagnostic: xdr.ContractEventTypeDiagnostic, + } +} + type EventFilter struct { EventType eventTypeSet `json:"type,omitempty"` ContractIDs []string `json:"contractIds,omitempty"` @@ -157,10 +184,10 @@ func (e *EventFilter) Valid() error { if err := e.EventType.valid(); err != nil { return errors.Wrap(err, "filter type invalid") } - if len(e.ContractIDs) > 5 { + if len(e.ContractIDs) > maxContractIDsLimit { return errors.New("maximum 5 contract IDs per filter") } - if len(e.Topics) > 5 { + if len(e.Topics) > maxTopicsLimit { return errors.New("maximum 5 topics per filter") } for i, id := range e.ContractIDs { @@ -171,7 +198,7 @@ func (e *EventFilter) Valid() error { } for i, topic := range e.Topics { if err := topic.Valid(); err != nil { - return errors.Wrapf(err, "topic %d invalid", i+1) + return fmt.Errorf("topic %d invalid: %w", i+1, err) } } return nil @@ -215,21 +242,16 @@ func (e *EventFilter) matchesTopics(event xdr.ContractEvent) bool { type TopicFilter []SegmentFilter -const ( - minTopicCount = 1 - maxTopicCount = 4 -) - func (t *TopicFilter) Valid() error { - if len(*t) < minTopicCount { + if len(*t) < db.MinTopicCount { return errors.New("topic must have at least one segment") } - if len(*t) > maxTopicCount { + if len(*t) > db.MaxTopicCount { return errors.New("topic cannot have more than 4 segments") } for i, segment := range *t { if err := segment.Valid(); err != nil { - return errors.Wrapf(err, "segment %d invalid", i+1) + return fmt.Errorf("segment %d invalid: %w", i+1, err) } } return nil @@ -305,8 +327,8 @@ func (s *SegmentFilter) UnmarshalJSON(p []byte) error { } type PaginationOptions struct { - Cursor *events.Cursor `json:"cursor,omitempty"` - Limit uint `json:"limit,omitempty"` + Cursor *db.Cursor `json:"cursor,omitempty"` + Limit uint `json:"limit,omitempty"` } type GetEventsResponse struct { @@ -314,67 +336,163 @@ type GetEventsResponse struct { LatestLedger uint32 `json:"latestLedger"` } -type eventScanner interface { - Scan(eventRange events.Range, f events.ScanFunction) (uint32, error) -} - type eventsRPCHandler struct { - scanner eventScanner + dbReader db.EventReader maxLimit uint defaultLimit uint + logger *log.Entry + ledgerReader db.LedgerReader +} + +func combineContractIDs(filters []EventFilter) ([][]byte, error) { + contractIDSet := set.NewSet[string](maxFiltersLimit * maxContractIDsLimit) + contractIDs := make([][]byte, 0, len(contractIDSet)) + + for _, filter := range filters { + for _, contractID := range filter.ContractIDs { + if !contractIDSet.Contains(contractID) { + contractIDSet.Add(contractID) + id, err := strkey.Decode(strkey.VersionByteContract, contractID) + if err != nil { + return nil, fmt.Errorf("invalid contract ID: %v", contractID) + } + contractIDs = append(contractIDs, id) + } + } + } + + return contractIDs, nil +} + +func combineEventTypes(filters []EventFilter) []int { + eventTypes := set.NewSet[int](maxEventTypes) + + for _, filter := range filters { + for _, eventType := range filter.EventType.Keys() { + eventTypeXDR := getEventTypeXDRFromEventType()[eventType] + eventTypes.Add(int(eventTypeXDR)) + } + } + uniqueEventTypes := make([]int, 0, maxEventTypes) + for eventType := range eventTypes { + uniqueEventTypes = append(uniqueEventTypes, eventType) + } + return uniqueEventTypes +} + +func combineTopics(filters []EventFilter) ([][][]byte, error) { + encodedTopicsList := make([][][]byte, db.MaxTopicCount) + + for _, filter := range filters { + if len(filter.Topics) == 0 { + return [][][]byte{}, nil + } + + for _, topicFilter := range filter.Topics { + for i, segmentFilter := range topicFilter { + if segmentFilter.wildcard == nil && segmentFilter.scval != nil { + encodedTopic, err := segmentFilter.scval.MarshalBinary() + if err != nil { + return [][][]byte{}, fmt.Errorf("failed to marshal segment: %w", err) + } + encodedTopicsList[i] = append(encodedTopicsList[i], encodedTopic) + } + } + } + } + + return encodedTopicsList, nil } -func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse, error) { +type entry struct { + cursor db.Cursor + ledgerCloseTimestamp int64 + event xdr.DiagnosticEvent + txHash *xdr.Hash +} + +func (h eventsRPCHandler) getEvents(ctx context.Context, request GetEventsRequest) (GetEventsResponse, error) { if err := request.Valid(h.maxLimit); err != nil { return GetEventsResponse{}, &jrpc2.Error{ - Code: jrpc2.InvalidParams, - Message: err.Error(), + Code: jrpc2.InvalidParams, Message: err.Error(), + } + } + + ledgerRange, err := h.ledgerReader.GetLedgerRange(ctx) + if err != nil { + return GetEventsResponse{}, &jrpc2.Error{ + Code: jrpc2.InternalError, Message: err.Error(), } } - start := events.Cursor{Ledger: uint32(request.StartLedger)} + start := db.Cursor{Ledger: request.StartLedger} limit := h.defaultLimit if request.Pagination != nil { if request.Pagination.Cursor != nil { start = *request.Pagination.Cursor - // increment event index because, when paginating, - // we start with the item right after the cursor + // increment event index because, when paginating, we start with the item right after the cursor start.Event++ } if request.Pagination.Limit > 0 { limit = request.Pagination.Limit } } + endLedger := request.StartLedger + LedgerScanLimit - type entry struct { - cursor events.Cursor - ledgerCloseTimestamp int64 - event xdr.DiagnosticEvent - txHash *xdr.Hash - } - var found []entry - latestLedger, err := h.scanner.Scan( - events.Range{ - Start: start, - ClampStart: false, - End: events.MaxCursor, - ClampEnd: true, - }, - func(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerCloseTimestamp int64, txHash *xdr.Hash) bool { - if request.Matches(event) { - found = append(found, entry{cursor, ledgerCloseTimestamp, event, txHash}) - } - return uint(len(found)) < limit - }, - ) + if request.EndLedger != 0 { + endLedger = min(request.EndLedger, endLedger) + } + + end := db.Cursor{Ledger: endLedger} + cursorRange := db.CursorRange{Start: start, End: end} + + if start.Ledger < ledgerRange.FirstLedger.Sequence || start.Ledger > ledgerRange.LastLedger.Sequence { + return GetEventsResponse{}, &jrpc2.Error{ + Code: jrpc2.InvalidRequest, + Message: fmt.Sprintf( + "startLedger must be within the ledger range: %d - %d", + ledgerRange.FirstLedger.Sequence, + ledgerRange.LastLedger.Sequence, + ), + } + } + + found := make([]entry, 0, limit) + + contractIDs, err := combineContractIDs(request.Filters) if err != nil { return GetEventsResponse{}, &jrpc2.Error{ - Code: jrpc2.InvalidRequest, - Message: err.Error(), + Code: jrpc2.InvalidParams, Message: err.Error(), } } - results := []EventInfo{} + topics, err := combineTopics(request.Filters) + if err != nil { + return GetEventsResponse{}, &jrpc2.Error{ + Code: jrpc2.InvalidParams, Message: err.Error(), + } + } + + eventTypes := combineEventTypes(request.Filters) + + // Scan function to apply filters + eventScanFunction := func( + event xdr.DiagnosticEvent, cursor db.Cursor, ledgerCloseTimestamp int64, txHash *xdr.Hash, + ) bool { + if request.Matches(event) { + found = append(found, entry{cursor, ledgerCloseTimestamp, event, txHash}) + } + return uint(len(found)) < limit + } + + err = h.dbReader.GetEvents(ctx, cursorRange, contractIDs, topics, eventTypes, eventScanFunction) + if err != nil { + return GetEventsResponse{}, &jrpc2.Error{ + Code: jrpc2.InvalidRequest, Message: err.Error(), + } + } + + results := make([]EventInfo, 0, len(found)) for _, entry := range found { info, err := eventInfoForEvent( entry.event, @@ -388,15 +506,16 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse } results = append(results, info) } + return GetEventsResponse{ - LatestLedger: latestLedger, + LatestLedger: ledgerRange.LastLedger.Sequence, Events: results, }, nil } func eventInfoForEvent( event xdr.DiagnosticEvent, - cursor events.Cursor, + cursor db.Cursor, ledgerClosedAt, txHash, format string, ) (EventInfo, error) { v0, ok := event.Event.Body.GetV0() @@ -422,7 +541,7 @@ func eventInfoForEvent( switch format { case FormatJSON: // json encode the topic - info.TopicJSON = make([]json.RawMessage, 0, maxTopicCount) + info.TopicJSON = make([]json.RawMessage, 0, db.MaxTopicCount) for _, topic := range v0.Topics { topic, err := xdr2json.ConvertInterface(topic) if err != nil { @@ -439,7 +558,7 @@ func eventInfoForEvent( default: // base64-xdr encode the topic - topic := make([]string, 0, maxTopicCount) + topic := make([]string, 0, db.MaxTopicCount) for _, segment := range v0.Topics { seg, err := xdr.MarshalBase64(segment) if err != nil { @@ -467,13 +586,21 @@ func eventInfoForEvent( } // NewGetEventsHandler returns a json rpc handler to fetch and filter events -func NewGetEventsHandler(eventsStore *events.MemoryStore, maxLimit, defaultLimit uint) jrpc2.Handler { +func NewGetEventsHandler( + logger *log.Entry, + dbReader db.EventReader, + maxLimit uint, + defaultLimit uint, + ledgerReader db.LedgerReader, +) jrpc2.Handler { eventsHandler := eventsRPCHandler{ - scanner: eventsStore, + dbReader: dbReader, maxLimit: maxLimit, defaultLimit: defaultLimit, + logger: logger, + ledgerReader: ledgerReader, } return NewHandler(func(ctx context.Context, request GetEventsRequest) (GetEventsResponse, error) { - return eventsHandler.getEvents(request) + return eventsHandler.getEvents(ctx, request) }) } diff --git a/cmd/soroban-rpc/internal/methods/get_events_test.go b/cmd/soroban-rpc/internal/methods/get_events_test.go index e5b5c464..82cf8cf0 100644 --- a/cmd/soroban-rpc/internal/methods/get_events_test.go +++ b/cmd/soroban-rpc/internal/methods/get_events_test.go @@ -1,28 +1,34 @@ package methods import ( + "context" "encoding/json" "fmt" + "path" + "strconv" "strings" "testing" "time" + "github.com/sirupsen/logrus" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/stellar/go/keypair" "github.com/stellar/go/network" "github.com/stellar/go/strkey" + "github.com/stellar/go/support/log" "github.com/stellar/go/xdr" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/daemon/interfaces" - "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events" + "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db" "github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/xdr2json" ) +var passphrase = "passphrase" + func TestEventTypeSetMatches(t *testing.T) { var defaultSet eventTypeSet - all := eventTypeSet{} all[EventTypeContract] = nil all[EventTypeDiagnostic] = nil @@ -412,8 +418,8 @@ func TestGetEventsRequestValid(t *testing.T) { assert.EqualError(t, (&GetEventsRequest{ StartLedger: 1, Filters: []EventFilter{}, - Pagination: &PaginationOptions{Cursor: &events.Cursor{}}, - }).Valid(1000), "startLedger and cursor cannot both be set") + Pagination: &PaginationOptions{Cursor: &db.Cursor{}}, + }).Valid(1000), "ledger ranges and cursor cannot both be set") assert.NoError(t, (&GetEventsRequest{ StartLedger: 1, @@ -526,22 +532,19 @@ func TestGetEvents(t *testing.T) { counterXdr, err := xdr.MarshalBase64(counterScVal) assert.NoError(t, err) - t.Run("empty", func(t *testing.T) { - store := events.NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 100) - handler := eventsRPCHandler{ - scanner: store, - maxLimit: 10000, - defaultLimit: 100, - } - _, err = handler.getEvents(GetEventsRequest{ - StartLedger: 1, - }) - assert.EqualError(t, err, "[-32600] event store is empty") - }) - t.Run("startLedger validation", func(t *testing.T) { contractID := xdr.Hash([32]byte{}) - store := events.NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 100) + dbx := newTestDB(t) + ctx := context.TODO() + log := log.DefaultLogger + log.SetLevel(logrus.TraceLevel) + + writer := db.NewReadWriter(log, dbx, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(t, err) + ledgerW, eventW := write.LedgerWriter(), write.EventWriter() + store := db.NewEventReader(log, dbx, passphrase) + var txMeta []xdr.TransactionMeta txMeta = append(txMeta, transactionMetaWithEvents( contractEvent( @@ -556,29 +559,45 @@ func TestGetEvents(t *testing.T) { }, ), )) - assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(2, now.Unix(), txMeta...))) + + ledgerCloseMeta := ledgerCloseMetaWithEvents(2, now.Unix(), txMeta...) + require.NoError(t, ledgerW.InsertLedger(ledgerCloseMeta), "ingestion failed for ledger ") + assert.NoError(t, eventW.InsertEvents(ledgerCloseMeta)) + require.NoError(t, write.Commit(2)) handler := eventsRPCHandler{ - scanner: store, + dbReader: store, maxLimit: 10000, defaultLimit: 100, + ledgerReader: db.NewLedgerReader(dbx), } - _, err = handler.getEvents(GetEventsRequest{ + _, err = handler.getEvents(context.TODO(), GetEventsRequest{ StartLedger: 1, }) - assert.EqualError(t, err, "[-32600] start is before oldest ledger") + require.EqualError(t, err, "[-32600] startLedger must be within the ledger range: 2 - 2") - _, err = handler.getEvents(GetEventsRequest{ + _, err = handler.getEvents(context.TODO(), GetEventsRequest{ StartLedger: 3, }) - assert.EqualError(t, err, "[-32600] start is after newest ledger") + require.EqualError(t, err, "[-32600] startLedger must be within the ledger range: 2 - 2") }) t.Run("no filtering returns all", func(t *testing.T) { + dbx := newTestDB(t) + ctx := context.TODO() + log := log.DefaultLogger + log.SetLevel(logrus.TraceLevel) + + writer := db.NewReadWriter(log, dbx, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(t, err) + + ledgerW, eventW := write.LedgerWriter(), write.EventWriter() + store := db.NewEventReader(log, dbx, passphrase) + contractID := xdr.Hash([32]byte{}) - store := events.NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 100) var txMeta []xdr.TransactionMeta - for i := 0; i < 10; i++ { + for range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} { txMeta = append(txMeta, transactionMetaWithEvents( contractEvent( contractID, @@ -593,22 +612,26 @@ func TestGetEvents(t *testing.T) { ), )) } + ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) - assert.NoError(t, store.IngestEvents(ledgerCloseMeta)) + require.NoError(t, ledgerW.InsertLedger(ledgerCloseMeta), "ingestion failed for ledger ") + assert.NoError(t, eventW.InsertEvents(ledgerCloseMeta)) + require.NoError(t, write.Commit(1)) handler := eventsRPCHandler{ - scanner: store, + dbReader: store, maxLimit: 10000, defaultLimit: 100, + ledgerReader: db.NewLedgerReader(dbx), } - results, err := handler.getEvents(GetEventsRequest{ + results, err := handler.getEvents(context.TODO(), GetEventsRequest{ StartLedger: 1, }) assert.NoError(t, err) var expected []EventInfo for i := range txMeta { - id := events.Cursor{ + id := db.Cursor{ Ledger: 1, Tx: uint32(i + 1), Op: 0, @@ -636,16 +659,27 @@ func TestGetEvents(t *testing.T) { }) t.Run("filtering by contract id", func(t *testing.T) { - store := events.NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 100) + dbx := newTestDB(t) + ctx := context.TODO() + log := log.DefaultLogger + log.SetLevel(logrus.TraceLevel) + + writer := db.NewReadWriter(log, dbx, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(t, err) + + ledgerW, eventW := write.LedgerWriter(), write.EventWriter() + store := db.NewEventReader(log, dbx, passphrase) + var txMeta []xdr.TransactionMeta - contractIds := []xdr.Hash{ + contractIDs := []xdr.Hash{ xdr.Hash([32]byte{}), xdr.Hash([32]byte{1}), } for i := 0; i < 5; i++ { txMeta = append(txMeta, transactionMetaWithEvents( contractEvent( - contractIds[i%len(contractIds)], + contractIDs[i%len(contractIDs)], xdr.ScVec{xdr.ScVal{ Type: xdr.ScValTypeScvSymbol, Sym: &counter, @@ -657,26 +691,31 @@ func TestGetEvents(t *testing.T) { ), )) } - assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...))) + + ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) + require.NoError(t, ledgerW.InsertLedger(ledgerCloseMeta), "ingestion failed for ledger ") + require.NoError(t, eventW.InsertEvents(ledgerCloseMeta), "ingestion failed for events ") + require.NoError(t, write.Commit(2)) handler := eventsRPCHandler{ - scanner: store, + dbReader: store, maxLimit: 10000, defaultLimit: 100, + ledgerReader: db.NewLedgerReader(dbx), } - results, err := handler.getEvents(GetEventsRequest{ + results, err := handler.getEvents(context.TODO(), GetEventsRequest{ StartLedger: 1, Filters: []EventFilter{ - {ContractIDs: []string{strkey.MustEncode(strkey.VersionByteContract, contractIds[0][:])}}, + {ContractIDs: []string{strkey.MustEncode(strkey.VersionByteContract, contractIDs[0][:])}}, }, }) assert.NoError(t, err) assert.Equal(t, uint32(1), results.LatestLedger) expectedIds := []string{ - events.Cursor{Ledger: 1, Tx: 1, Op: 0, Event: 0}.String(), - events.Cursor{Ledger: 1, Tx: 3, Op: 0, Event: 0}.String(), - events.Cursor{Ledger: 1, Tx: 5, Op: 0, Event: 0}.String(), + db.Cursor{Ledger: 1, Tx: 1, Op: 0, Event: 0}.String(), + db.Cursor{Ledger: 1, Tx: 3, Op: 0, Event: 0}.String(), + db.Cursor{Ledger: 1, Tx: 5, Op: 0, Event: 0}.String(), } eventIds := []string{} for _, event := range results.Events { @@ -686,10 +725,21 @@ func TestGetEvents(t *testing.T) { }) t.Run("filtering by topic", func(t *testing.T) { - store := events.NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 100) + dbx := newTestDB(t) + ctx := context.TODO() + log := log.DefaultLogger + log.SetLevel(logrus.TraceLevel) + + writer := db.NewReadWriter(log, dbx, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(t, err) + + ledgerW, eventW := write.LedgerWriter(), write.EventWriter() + store := db.NewEventReader(log, dbx, passphrase) + var txMeta []xdr.TransactionMeta contractID := xdr.Hash([32]byte{}) - for i := 0; i < 10; i++ { + for i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} { number := xdr.Uint64(i) txMeta = append(txMeta, transactionMetaWithEvents( // Generate a unique topic like /counter/4 for each event so we can check @@ -704,15 +754,19 @@ func TestGetEvents(t *testing.T) { )) } ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) - assert.NoError(t, store.IngestEvents(ledgerCloseMeta)) + + require.NoError(t, ledgerW.InsertLedger(ledgerCloseMeta), "ingestion failed for ledger ") + require.NoError(t, eventW.InsertEvents(ledgerCloseMeta), "ingestion failed for events ") + require.NoError(t, write.Commit(1)) number := xdr.Uint64(4) handler := eventsRPCHandler{ - scanner: store, + dbReader: store, maxLimit: 10000, defaultLimit: 100, + ledgerReader: db.NewLedgerReader(dbx), } - results, err := handler.getEvents(GetEventsRequest{ + results, err := handler.getEvents(context.TODO(), GetEventsRequest{ StartLedger: 1, Filters: []EventFilter{ {Topics: []TopicFilter{ @@ -725,7 +779,7 @@ func TestGetEvents(t *testing.T) { }) assert.NoError(t, err) - id := events.Cursor{Ledger: 1, Tx: 5, Op: 0, Event: 0}.String() + id := db.Cursor{Ledger: 1, Tx: 5, Op: 0, Event: 0}.String() assert.NoError(t, err) scVal := xdr.ScVal{ Type: xdr.ScValTypeScvU64, @@ -749,7 +803,7 @@ func TestGetEvents(t *testing.T) { } assert.Equal(t, GetEventsResponse{expected, 1}, results) - results, err = handler.getEvents(GetEventsRequest{ + results, err = handler.getEvents(ctx, GetEventsRequest{ StartLedger: 1, Format: FormatJSON, Filters: []EventFilter{ @@ -785,7 +839,18 @@ func TestGetEvents(t *testing.T) { }) t.Run("filtering by both contract id and topic", func(t *testing.T) { - store := events.NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 100) + dbx := newTestDB(t) + ctx := context.TODO() + log := log.DefaultLogger + log.SetLevel(logrus.TraceLevel) + + writer := db.NewReadWriter(log, dbx, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(t, err) + + ledgerW, eventW := write.LedgerWriter(), write.EventWriter() + store := db.NewEventReader(log, dbx, passphrase) + contractID := xdr.Hash([32]byte{}) otherContractID := xdr.Hash([32]byte{1}) number := xdr.Uint64(1) @@ -834,14 +899,18 @@ func TestGetEvents(t *testing.T) { ), } ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) - assert.NoError(t, store.IngestEvents(ledgerCloseMeta)) + + require.NoError(t, ledgerW.InsertLedger(ledgerCloseMeta), "ingestion failed for ledger ") + require.NoError(t, eventW.InsertEvents(ledgerCloseMeta), "ingestion failed for events ") + require.NoError(t, write.Commit(1)) handler := eventsRPCHandler{ - scanner: store, + dbReader: store, maxLimit: 10000, defaultLimit: 100, + ledgerReader: db.NewLedgerReader(dbx), } - results, err := handler.getEvents(GetEventsRequest{ + results, err := handler.getEvents(context.TODO(), GetEventsRequest{ StartLedger: 1, Filters: []EventFilter{ { @@ -857,7 +926,7 @@ func TestGetEvents(t *testing.T) { }) assert.NoError(t, err) - id := events.Cursor{Ledger: 1, Tx: 4, Op: 0, Event: 0}.String() + id := db.Cursor{Ledger: 1, Tx: 4, Op: 0, Event: 0}.String() value, err := xdr.MarshalBase64(xdr.ScVal{ Type: xdr.ScValTypeScvU64, U64: &number, @@ -881,7 +950,17 @@ func TestGetEvents(t *testing.T) { }) t.Run("filtering by event type", func(t *testing.T) { - store := events.NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 100) + dbx := newTestDB(t) + ctx := context.TODO() + log := log.DefaultLogger + log.SetLevel(logrus.TraceLevel) + + writer := db.NewReadWriter(log, dbx, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(t, err) + ledgerW, eventW := write.LedgerWriter(), write.EventWriter() + store := db.NewEventReader(log, dbx, passphrase) + contractID := xdr.Hash([32]byte{}) txMeta := []xdr.TransactionMeta{ transactionMetaWithEvents( @@ -909,14 +988,17 @@ func TestGetEvents(t *testing.T) { ), } ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) - assert.NoError(t, store.IngestEvents(ledgerCloseMeta)) + require.NoError(t, ledgerW.InsertLedger(ledgerCloseMeta), "ingestion failed for ledger ") + require.NoError(t, eventW.InsertEvents(ledgerCloseMeta), "ingestion failed for events ") + require.NoError(t, write.Commit(1)) handler := eventsRPCHandler{ - scanner: store, + dbReader: store, maxLimit: 10000, defaultLimit: 100, + ledgerReader: db.NewLedgerReader(dbx), } - results, err := handler.getEvents(GetEventsRequest{ + results, err := handler.getEvents(context.TODO(), GetEventsRequest{ StartLedger: 1, Filters: []EventFilter{ {EventType: map[string]interface{}{EventTypeSystem: nil}}, @@ -924,7 +1006,7 @@ func TestGetEvents(t *testing.T) { }) assert.NoError(t, err) - id := events.Cursor{Ledger: 1, Tx: 1, Op: 0, Event: 1}.String() + id := db.Cursor{Ledger: 1, Tx: 1, Op: 0, Event: 1}.String() expected := []EventInfo{ { EventType: EventTypeSystem, @@ -943,7 +1025,18 @@ func TestGetEvents(t *testing.T) { }) t.Run("with limit", func(t *testing.T) { - store := events.NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 100) + dbx := newTestDB(t) + ctx := context.TODO() + log := log.DefaultLogger + log.SetLevel(logrus.TraceLevel) + + writer := db.NewReadWriter(log, dbx, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(t, err) + + ledgerW, eventW := write.LedgerWriter(), write.EventWriter() + store := db.NewEventReader(log, dbx, passphrase) + contractID := xdr.Hash([32]byte{}) var txMeta []xdr.TransactionMeta for i := 0; i < 180; i++ { @@ -959,14 +1052,17 @@ func TestGetEvents(t *testing.T) { )) } ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) - assert.NoError(t, store.IngestEvents(ledgerCloseMeta)) + require.NoError(t, ledgerW.InsertLedger(ledgerCloseMeta), "ingestion failed for ledger ") + require.NoError(t, eventW.InsertEvents(ledgerCloseMeta), "ingestion failed for events ") + require.NoError(t, write.Commit(1)) handler := eventsRPCHandler{ - scanner: store, + dbReader: store, maxLimit: 10000, defaultLimit: 100, + ledgerReader: db.NewLedgerReader(dbx), } - results, err := handler.getEvents(GetEventsRequest{ + results, err := handler.getEvents(context.TODO(), GetEventsRequest{ StartLedger: 1, Filters: []EventFilter{}, Pagination: &PaginationOptions{Limit: 10}, @@ -975,7 +1071,7 @@ func TestGetEvents(t *testing.T) { var expected []EventInfo for i := 0; i < 10; i++ { - id := events.Cursor{ + id := db.Cursor{ Ledger: 1, Tx: uint32(i + 1), Op: 0, @@ -1000,7 +1096,18 @@ func TestGetEvents(t *testing.T) { }) t.Run("with cursor", func(t *testing.T) { - store := events.NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 100) + dbx := newTestDB(t) + ctx := context.TODO() + log := log.DefaultLogger + log.SetLevel(logrus.TraceLevel) + + writer := db.NewReadWriter(log, dbx, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(t, err) + + ledgerW, eventW := write.LedgerWriter(), write.EventWriter() + store := db.NewEventReader(log, dbx, passphrase) + contractID := xdr.Hash([32]byte{}) datas := []xdr.ScSymbol{ // ledger/transaction/operation/event @@ -1044,15 +1151,18 @@ func TestGetEvents(t *testing.T) { ), } ledgerCloseMeta := ledgerCloseMetaWithEvents(5, now.Unix(), txMeta...) - assert.NoError(t, store.IngestEvents(ledgerCloseMeta)) + require.NoError(t, ledgerW.InsertLedger(ledgerCloseMeta), "ingestion failed for ledger ") + require.NoError(t, eventW.InsertEvents(ledgerCloseMeta), "ingestion failed for events ") + require.NoError(t, write.Commit(4)) - id := &events.Cursor{Ledger: 5, Tx: 1, Op: 0, Event: 0} + id := &db.Cursor{Ledger: 5, Tx: 1, Op: 0, Event: 0} handler := eventsRPCHandler{ - scanner: store, + dbReader: store, maxLimit: 10000, defaultLimit: 100, + ledgerReader: db.NewLedgerReader(dbx), } - results, err := handler.getEvents(GetEventsRequest{ + results, err := handler.getEvents(context.TODO(), GetEventsRequest{ Pagination: &PaginationOptions{ Cursor: id, Limit: 2, @@ -1062,8 +1172,8 @@ func TestGetEvents(t *testing.T) { var expected []EventInfo expectedIDs := []string{ - events.Cursor{Ledger: 5, Tx: 1, Op: 0, Event: 1}.String(), - events.Cursor{Ledger: 5, Tx: 2, Op: 0, Event: 0}.String(), + db.Cursor{Ledger: 5, Tx: 1, Op: 0, Event: 1}.String(), + db.Cursor{Ledger: 5, Tx: 2, Op: 0, Event: 0}.String(), } symbols := datas[1:3] for i, id := range expectedIDs { @@ -1084,9 +1194,9 @@ func TestGetEvents(t *testing.T) { } assert.Equal(t, GetEventsResponse{expected, 5}, results) - results, err = handler.getEvents(GetEventsRequest{ + results, err = handler.getEvents(context.TODO(), GetEventsRequest{ Pagination: &PaginationOptions{ - Cursor: &events.Cursor{Ledger: 5, Tx: 2, Op: 0, Event: 1}, + Cursor: &db.Cursor{Ledger: 5, Tx: 2, Op: 0, Event: 1}, Limit: 2, }, }) @@ -1095,44 +1205,111 @@ func TestGetEvents(t *testing.T) { }) } +func BenchmarkGetEvents(b *testing.B) { + var counters [10]xdr.ScSymbol + for i := 0; i < len(counters); i++ { + counters[i] = xdr.ScSymbol("TEST-COUNTER-" + strconv.Itoa(i+1)) + } + // counter := xdr.ScSymbol("COUNTER") + // requestedCounter := xdr.ScSymbol("REQUESTED") + dbx := newTestDB(b) + ctx := context.TODO() + log := log.DefaultLogger + log.SetLevel(logrus.TraceLevel) + store := db.NewEventReader(log, dbx, passphrase) + contractID := xdr.Hash([32]byte{}) + now := time.Now().UTC() + + writer := db.NewReadWriter(log, dbx, interfaces.MakeNoOpDeamon(), 10, 10, passphrase) + write, err := writer.NewTx(ctx) + require.NoError(b, err) + ledgerW, eventW := write.LedgerWriter(), write.EventWriter() + + for i := range []int{1, 2, 3} { + txMeta := getTxMetaWithContractEvents(contractID) + ledgerCloseMeta := ledgerCloseMetaWithEvents(uint32(i), now.Unix(), txMeta...) + require.NoError(b, ledgerW.InsertLedger(ledgerCloseMeta), "ingestion failed for ledger ") + require.NoError(b, eventW.InsertEvents(ledgerCloseMeta), "ingestion failed for events ") + } + require.NoError(b, write.Commit(1)) + + handler := eventsRPCHandler{ + dbReader: store, + maxLimit: 10000, + defaultLimit: 100, + ledgerReader: db.NewLedgerReader(dbx), + } + + request := GetEventsRequest{ + StartLedger: 1, + Filters: []EventFilter{ + { + Topics: []TopicFilter{ + []SegmentFilter{ + {scval: &xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counters[1]}}, + }, + }, + }, + }, + } + + b.ResetTimer() + b.ReportAllocs() + + for i := 0; i < b.N; i++ { + _, err := handler.getEvents(ctx, request) + if err != nil { + b.Errorf("getEvents failed: %v", err) + } + } + + totalNs := b.Elapsed() + nsPerOp := totalNs.Nanoseconds() / int64(b.N) + msPerOp := float64(nsPerOp) / 1e6 + log.Infof("Benchmark Results: %v ms/op ", msPerOp) +} + +func getTxMetaWithContractEvents(contractID xdr.Hash) []xdr.TransactionMeta { + var counters [1000]xdr.ScSymbol + for j := 0; j < len(counters); j++ { + counters[j] = xdr.ScSymbol("TEST-COUNTER-" + strconv.Itoa(j+1)) + } + + events := make([]xdr.ContractEvent, 0, 10) + for i := range []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9} { + contractEvent := contractEvent( + contractID, + xdr.ScVec{ + xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counters[i]}, + }, + xdr.ScVal{Type: xdr.ScValTypeScvSymbol, Sym: &counters[i]}, + ) + events = append(events, contractEvent) + } + + txMeta := []xdr.TransactionMeta{ + transactionMetaWithEvents( + events..., + ), + } + return txMeta +} + func ledgerCloseMetaWithEvents(sequence uint32, closeTimestamp int64, txMeta ...xdr.TransactionMeta) xdr.LedgerCloseMeta { var txProcessing []xdr.TransactionResultMeta var phases []xdr.TransactionPhase for _, item := range txMeta { - var operations []xdr.Operation - for range item.MustV3().SorobanMeta.Events { - operations = append(operations, - xdr.Operation{ - Body: xdr.OperationBody{ - Type: xdr.OperationTypeInvokeHostFunction, - InvokeHostFunctionOp: &xdr.InvokeHostFunctionOp{ - HostFunction: xdr.HostFunction{ - Type: xdr.HostFunctionTypeHostFunctionTypeInvokeContract, - InvokeContract: &xdr.InvokeContractArgs{ - ContractAddress: xdr.ScAddress{ - Type: xdr.ScAddressTypeScAddressTypeContract, - ContractId: &xdr.Hash{0x1, 0x2}, - }, - FunctionName: "foo", - Args: nil, - }, - }, - Auth: []xdr.SorobanAuthorizationEntry{}, - }, - }, - }) - } envelope := xdr.TransactionEnvelope{ Type: xdr.EnvelopeTypeEnvelopeTypeTx, V1: &xdr.TransactionV1Envelope{ Tx: xdr.Transaction{ SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()), - Operations: operations, + // Operations: operations, }, }, } - txHash, err := network.HashTransactionInEnvelope(envelope, "unit-tests") + txHash, err := network.HashTransactionInEnvelope(envelope, "passphrase") if err != nil { panic(err) } @@ -1141,6 +1318,7 @@ func ledgerCloseMetaWithEvents(sequence uint32, closeTimestamp int64, txMeta ... TxApplyProcessing: item, Result: xdr.TransactionResultPair{ TransactionHash: txHash, + Result: transactionResult(true), }, }) components := []xdr.TxSetComponent{ @@ -1184,12 +1362,18 @@ func ledgerCloseMetaWithEvents(sequence uint32, closeTimestamp int64, txMeta ... } func transactionMetaWithEvents(events ...xdr.ContractEvent) xdr.TransactionMeta { + counter := xdr.ScSymbol("COUNTER") + return xdr.TransactionMeta{ V: 3, Operations: &[]xdr.OperationMeta{}, V3: &xdr.TransactionMetaV3{ SorobanMeta: &xdr.SorobanTransactionMeta{ Events: events, + ReturnValue: xdr.ScVal{ + Type: xdr.ScValTypeScvSymbol, + Sym: &counter, + }, }, }, } @@ -1236,3 +1420,14 @@ func diagnosticEvent(contractID xdr.Hash, topic []xdr.ScVal, body xdr.ScVal) xdr }, } } + +func newTestDB(tb testing.TB) *db.DB { + tmp := tb.TempDir() + dbPath := path.Join(tmp, "dbx.sqlite") + db, err := db.OpenSQLiteDB(dbPath) + require.NoError(tb, err) + tb.Cleanup(func() { + assert.NoError(tb, db.Close()) + }) + return db +} diff --git a/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go b/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go index affa0536..c86e9d9a 100644 --- a/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go +++ b/cmd/soroban-rpc/internal/methods/get_latest_ledger_test.go @@ -64,6 +64,15 @@ func (ledgerReader *ConstantLedgerReader) StreamAllLedgers(_ context.Context, _ return nil } +func (ledgerReader *ConstantLedgerReader) StreamLedgerRange( + _ context.Context, + _ uint32, + _ uint32, + _ db.StreamLedgerFn, +) error { + return nil +} + func createLedger(ledgerSequence uint32, protocolVersion uint32, hash byte) xdr.LedgerCloseMeta { return xdr.LedgerCloseMeta{ V: 1,