From 67f8e8a3074908d4592ab047ad54c63a62443567 Mon Sep 17 00:00:00 2001 From: Aditya Vyas Date: Thu, 25 Jan 2024 12:11:01 -0500 Subject: [PATCH 1/3] soroban-rpc: Remove panics from internal codebase (#1167) * Remove panic - 1 * Remove panic - 2 * Remove panic - 3 * Remove panic - 4 * Small changes - 1 * undo changes in Get() func * undo changes - 2 * undo changes - 3 * add test for append error (cherry picked from commit b6671e2d02fef7063a9364cc8af17ef3152f6c20) --- .../internal/config/config_option.go | 4 +- .../internal/config/config_option_test.go | 11 ++ cmd/soroban-rpc/internal/config/log_format.go | 12 +- cmd/soroban-rpc/internal/config/options.go | 2 +- .../ledgerbucketwindow/ledgerbucketwindow.go | 6 +- .../ledgerbucketwindow_test.go | 123 +++++++----------- .../internal/transactions/transactions.go | 5 +- 7 files changed, 73 insertions(+), 90 deletions(-) diff --git a/cmd/soroban-rpc/internal/config/config_option.go b/cmd/soroban-rpc/internal/config/config_option.go index 86eab8e7..3c8ca87a 100644 --- a/cmd/soroban-rpc/internal/config/config_option.go +++ b/cmd/soroban-rpc/internal/config/config_option.go @@ -88,8 +88,6 @@ func (o *ConfigOption) setValue(i interface{}) (err error) { if o.CustomSetValue != nil { return o.CustomSetValue(o, i) } - // it's unfortunate that Set below panics when it cannot set the value.. - // we'll want to catch this so that we can alert the user nicely. defer func() { if recoverRes := recover(); recoverRes != nil { var ok bool @@ -101,7 +99,7 @@ func (o *ConfigOption) setValue(i interface{}) (err error) { } }() parser := func(option *ConfigOption, i interface{}) error { - panic(fmt.Sprintf("no parser for flag %s", o.Name)) + return errors.Errorf("no parser for flag %s", o.Name) } switch o.ConfigKey.(type) { case *bool: diff --git a/cmd/soroban-rpc/internal/config/config_option_test.go b/cmd/soroban-rpc/internal/config/config_option_test.go index 831c8865..a6309cb3 100644 --- a/cmd/soroban-rpc/internal/config/config_option_test.go +++ b/cmd/soroban-rpc/internal/config/config_option_test.go @@ -4,6 +4,7 @@ import ( "fmt" "math" "testing" + "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -115,6 +116,16 @@ func TestUnassignableField(t *testing.T) { require.Contains(t, err.Error(), co.Name) } +func TestNoParserForFlag(t *testing.T) { + var co ConfigOption + var invalidKey []time.Duration + co.Name = "mykey" + co.ConfigKey = &invalidKey + err := co.setValue("abc") + require.Error(t, err) + require.Contains(t, err.Error(), "no parser for flag mykey") +} + func TestSetValue(t *testing.T) { var b bool var i int diff --git a/cmd/soroban-rpc/internal/config/log_format.go b/cmd/soroban-rpc/internal/config/log_format.go index 076e43e6..1ab4c7fc 100644 --- a/cmd/soroban-rpc/internal/config/log_format.go +++ b/cmd/soroban-rpc/internal/config/log_format.go @@ -1,6 +1,8 @@ package config -import "fmt" +import ( + "fmt" +) type LogFormat int @@ -47,13 +49,13 @@ func (f *LogFormat) UnmarshalTOML(i interface{}) error { } } -func (f LogFormat) String() string { +func (f LogFormat) String() (string, error) { switch f { case LogFormatText: - return "text" + return "text", nil case LogFormatJSON: - return "json" + return "json", nil default: - panic(fmt.Sprintf("unknown log format: %d", f)) + return "", fmt.Errorf("unknown log format: %d", f) } } diff --git a/cmd/soroban-rpc/internal/config/options.go b/cmd/soroban-rpc/internal/config/options.go index cecfb2e7..d38dd7cd 100644 --- a/cmd/soroban-rpc/internal/config/options.go +++ b/cmd/soroban-rpc/internal/config/options.go @@ -130,7 +130,7 @@ func (cfg *Config) options() ConfigOptions { return nil }, MarshalTOML: func(option *ConfigOption) (interface{}, error) { - return cfg.LogFormat.String(), nil + return cfg.LogFormat.String() }, }, { diff --git a/cmd/soroban-rpc/internal/ledgerbucketwindow/ledgerbucketwindow.go b/cmd/soroban-rpc/internal/ledgerbucketwindow/ledgerbucketwindow.go index 0d447e71..8234b607 100644 --- a/cmd/soroban-rpc/internal/ledgerbucketwindow/ledgerbucketwindow.go +++ b/cmd/soroban-rpc/internal/ledgerbucketwindow/ledgerbucketwindow.go @@ -35,12 +35,12 @@ func NewLedgerBucketWindow[T any](retentionWindow uint32) *LedgerBucketWindow[T] } // Append adds a new bucket to the window. If the window is full a bucket will be evicted and returned. -func (w *LedgerBucketWindow[T]) Append(bucket LedgerBucket[T]) *LedgerBucket[T] { +func (w *LedgerBucketWindow[T]) Append(bucket LedgerBucket[T]) (*LedgerBucket[T], error) { length := w.Len() if length > 0 { expectedLedgerSequence := w.buckets[w.start].LedgerSeq + length if expectedLedgerSequence != bucket.LedgerSeq { - panic(fmt.Errorf("ledgers not contiguous: expected ledger sequence %v but received %v", expectedLedgerSequence, bucket.LedgerSeq)) + return &LedgerBucket[T]{}, fmt.Errorf("error appending ledgers: ledgers not contiguous: expected ledger sequence %v but received %v", expectedLedgerSequence, bucket.LedgerSeq) } } @@ -57,7 +57,7 @@ func (w *LedgerBucketWindow[T]) Append(bucket LedgerBucket[T]) *LedgerBucket[T] w.start = (w.start + 1) % length } - return evicted + return evicted, nil } // Len returns the length (number of buckets in the window) diff --git a/cmd/soroban-rpc/internal/ledgerbucketwindow/ledgerbucketwindow_test.go b/cmd/soroban-rpc/internal/ledgerbucketwindow/ledgerbucketwindow_test.go index b472af0b..2e50ed6d 100644 --- a/cmd/soroban-rpc/internal/ledgerbucketwindow/ledgerbucketwindow_test.go +++ b/cmd/soroban-rpc/internal/ledgerbucketwindow/ledgerbucketwindow_test.go @@ -18,118 +18,87 @@ func TestAppend(t *testing.T) { m := NewLedgerBucketWindow[uint32](3) require.Equal(t, uint32(0), m.Len()) - // test appending first bucket of events - evicted := m.Append(bucket(5)) + // Test appending first bucket of events + evicted, err := m.Append(bucket(5)) + require.NoError(t, err) require.Nil(t, evicted) require.Equal(t, uint32(1), m.Len()) require.Equal(t, bucket(5), *m.Get(0)) - // the next bucket must follow the previous bucket (ledger 5) - - require.PanicsWithError( - t, "ledgers not contiguous: expected ledger sequence 6 but received 10", - func() { - m.Append(LedgerBucket[uint32]{ - LedgerSeq: 10, - LedgerCloseTimestamp: 100, - BucketContent: 10, - }) - }, - ) - require.PanicsWithError( - t, "ledgers not contiguous: expected ledger sequence 6 but received 4", - func() { - m.Append(LedgerBucket[uint32]{ - LedgerSeq: 4, - LedgerCloseTimestamp: 100, - BucketContent: 4, - }) - }, - ) - require.PanicsWithError( - t, "ledgers not contiguous: expected ledger sequence 6 but received 5", - func() { - m.Append(LedgerBucket[uint32]{ - LedgerSeq: 5, - LedgerCloseTimestamp: 100, - BucketContent: 5, - }) - }, - ) + // The next bucket must follow the previous bucket (ledger 5) + _, err = m.Append(LedgerBucket[uint32]{ + LedgerSeq: 10, + LedgerCloseTimestamp: 100, + BucketContent: 10, + }) + require.Errorf(t, err, "ledgers not contiguous: expected ledger sequence 6 but received 10") + + _, err = m.Append(LedgerBucket[uint32]{ + LedgerSeq: 4, + LedgerCloseTimestamp: 100, + BucketContent: 4, + }) + require.Errorf(t, err, "ledgers not contiguous: expected ledger sequence 6 but received 4") + // check that none of the calls above modified our buckets require.Equal(t, uint32(1), m.Len()) require.Equal(t, bucket(5), *m.Get(0)) - // append ledger 6 bucket, now we have two buckets filled - evicted = m.Append(bucket(6)) + // Append ledger 6 bucket, now we have two buckets filled + evicted, err = m.Append(bucket(6)) + require.NoError(t, err) require.Nil(t, evicted) require.Equal(t, uint32(2), m.Len()) require.Equal(t, bucket(5), *m.Get(0)) require.Equal(t, bucket(6), *m.Get(1)) - // the next bucket of events must follow the previous bucket (ledger 6) - require.PanicsWithError( - t, "ledgers not contiguous: expected ledger sequence 7 but received 10", - func() { - m.Append(LedgerBucket[uint32]{ - LedgerSeq: 10, - LedgerCloseTimestamp: 100, - BucketContent: 10, - }) - }, - ) - require.PanicsWithError( - t, "ledgers not contiguous: expected ledger sequence 7 but received 4", - func() { - m.Append(LedgerBucket[uint32]{ - LedgerSeq: 4, - LedgerCloseTimestamp: 100, - BucketContent: 4, - }) - }, - ) - require.PanicsWithError( - t, "ledgers not contiguous: expected ledger sequence 7 but received 5", - func() { - m.Append(LedgerBucket[uint32]{ - LedgerSeq: 5, - LedgerCloseTimestamp: 100, - BucketContent: 5, - }) - }, - ) - - // append ledger 7, now we have all three buckets filled - evicted = m.Append(bucket(7)) - require.Nil(t, evicted) + // Append ledger 7, now we have all three buckets filled + evicted, err = m.Append(bucket(7)) + require.NoError(t, err) require.Nil(t, evicted) require.Equal(t, uint32(3), m.Len()) require.Equal(t, bucket(5), *m.Get(0)) require.Equal(t, bucket(6), *m.Get(1)) require.Equal(t, bucket(7), *m.Get(2)) - // append ledger 8, but all buckets are full, so we need to evict ledger 5 - evicted = m.Append(bucket(8)) + // Append ledger 8, but all buckets are full, so we need to evict ledger 5 + evicted, err = m.Append(bucket(8)) + require.NoError(t, err) require.Equal(t, bucket(5), *evicted) require.Equal(t, uint32(3), m.Len()) require.Equal(t, bucket(6), *m.Get(0)) require.Equal(t, bucket(7), *m.Get(1)) require.Equal(t, bucket(8), *m.Get(2)) - // append ledger 9 events, but all buckets are full, so we need to evict ledger 6 - evicted = m.Append(bucket(9)) + // Append ledger 9 events, but all buckets are full, so we need to evict ledger 6 + evicted, err = m.Append(bucket(9)) + require.NoError(t, err) require.Equal(t, bucket(6), *evicted) require.Equal(t, uint32(3), m.Len()) require.Equal(t, bucket(7), *m.Get(0)) require.Equal(t, bucket(8), *m.Get(1)) require.Equal(t, bucket(9), *m.Get(2)) - // append ledger 10, but all buckets are full, so we need to evict ledger 7. + // Append ledger 10, but all buckets are full, so we need to evict ledger 7. // The start index must have wrapped around - evicted = m.Append(bucket(10)) + evicted, err = m.Append(bucket(10)) + require.NoError(t, err) require.Equal(t, bucket(7), *evicted) require.Equal(t, uint32(3), m.Len()) require.Equal(t, bucket(8), *m.Get(0)) require.Equal(t, bucket(9), *m.Get(1)) require.Equal(t, bucket(10), *m.Get(2)) } + +func TestAppendError(t *testing.T) { + m := NewLedgerBucketWindow[uint32](3) + require.Equal(t, uint32(0), m.Len()) + + evicted, err := m.Append(bucket(5)) + require.NoError(t, err) + require.Nil(t, evicted) + + evicted, err = m.Append(bucket(1)) + require.Error(t, err) + require.Contains(t, err.Error(), "error appending ledgers: ledgers not contiguous: expected ledger sequence 6 but received 1") +} diff --git a/cmd/soroban-rpc/internal/transactions/transactions.go b/cmd/soroban-rpc/internal/transactions/transactions.go index 8d58a035..5ed719ad 100644 --- a/cmd/soroban-rpc/internal/transactions/transactions.go +++ b/cmd/soroban-rpc/internal/transactions/transactions.go @@ -122,7 +122,10 @@ func (m *MemoryStore) IngestTransactions(ledgerCloseMeta xdr.LedgerCloseMeta) er m.lock.Lock() defer m.lock.Unlock() - evicted := m.transactionsByLedger.Append(bucket) + evicted, err := m.transactionsByLedger.Append(bucket) + if err != nil { + return err + } if evicted != nil { // garbage-collect evicted entries for _, evictedTxHash := range evicted.BucketContent { From e9005852d6e0ab8f5559aa5e40a50995d57efabb Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Wed, 31 Jan 2024 11:05:42 +0100 Subject: [PATCH 2/3] rpc: Reduce event memory footprint (#1183) 1. Get rid of the in-memory operation index (since it was always set to zero anyways) 2. Keep events serialized while in memory (it saves quite a bit of space due to the inneficient representation of unions in golang). (cherry picked from commit 433cd44dbc897c85fadf899448acbb31da62256d) --- cmd/soroban-rpc/internal/events/cursor.go | 2 + cmd/soroban-rpc/internal/events/events.go | 40 +++++---- .../internal/events/events_test.go | 82 +++++++++---------- 3 files changed, 61 insertions(+), 63 deletions(-) diff --git a/cmd/soroban-rpc/internal/events/cursor.go b/cmd/soroban-rpc/internal/events/cursor.go index 9f37b513..3fbfbecb 100644 --- a/cmd/soroban-rpc/internal/events/cursor.go +++ b/cmd/soroban-rpc/internal/events/cursor.go @@ -20,6 +20,8 @@ type Cursor struct { // Tx is the index of the transaction within the ledger which emitted the event. Tx uint32 // Op is the index of the operation within the transaction which emitted the event. + // Note: Currently, there is no use for it (events are transaction-wide and not operation-specific) + // but we keep it in order to make the API future-proof. Op uint32 // Event is the index of the event within in the operation which emitted the event. Event uint32 diff --git a/cmd/soroban-rpc/internal/events/events.go b/cmd/soroban-rpc/internal/events/events.go index 0c5fdc83..e2c9030b 100644 --- a/cmd/soroban-rpc/internal/events/events.go +++ b/cmd/soroban-rpc/internal/events/events.go @@ -15,24 +15,16 @@ import ( "github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/ledgerbucketwindow" ) -type bucket struct { - ledgerSeq uint32 - ledgerCloseTimestamp int64 - events []event -} - type event struct { - contents xdr.DiagnosticEvent - txIndex uint32 - opIndex uint32 - eventIndex uint32 + diagnosticEventXDR []byte + txIndex uint32 + eventIndex uint32 } func (e event) cursor(ledgerSeq uint32) Cursor { return Cursor{ Ledger: ledgerSeq, Tx: e.txIndex, - Op: e.opIndex, Event: e.eventIndex, } } @@ -129,7 +121,12 @@ func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor, if eventRange.End.Cmp(cur) <= 0 { return lastLedgerInWindow, nil } - if !f(event.contents, cur, timestamp) { + var diagnosticEvent xdr.DiagnosticEvent + err := xdr.SafeUnmarshal(event.diagnosticEventXDR, &diagnosticEvent) + if err != nil { + return 0, err + } + if !f(diagnosticEvent, cur, timestamp) { return lastLedgerInWindow, nil } } @@ -201,7 +198,9 @@ func (m *MemoryStore) IngestEvents(ledgerCloseMeta xdr.LedgerCloseMeta) error { BucketContent: events, } m.lock.Lock() - m.eventsByLedger.Append(bucket) + if _, err = m.eventsByLedger.Append(bucket); err != nil { + return err + } m.lock.Unlock() m.eventsDurationMetric.With(prometheus.Labels{"operation": "ingest"}). Observe(time.Since(startTime).Seconds()) @@ -241,15 +240,14 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) ( return nil, err } for index, e := range txEvents { + diagnosticEventXDR, err := e.MarshalBinary() + if err != nil { + return nil, err + } events = append(events, event{ - contents: e, - txIndex: tx.Index, - // NOTE: we cannot really index by operation since all events - // are provided as part of the transaction. However, - // that shouldn't matter in practice since a transaction - // can only contain a single Host Function Invocation. - opIndex: 0, - eventIndex: uint32(index), + diagnosticEventXDR: diagnosticEventXDR, + txIndex: tx.Index, + eventIndex: uint32(index), }) } } diff --git a/cmd/soroban-rpc/internal/events/events_test.go b/cmd/soroban-rpc/internal/events/events_test.go index 9f6a3fe0..fc4d3c6b 100644 --- a/cmd/soroban-rpc/internal/events/events_test.go +++ b/cmd/soroban-rpc/internal/events/events_test.go @@ -1,6 +1,7 @@ package events import ( + "bytes" "testing" "github.com/stellar/go/xdr" @@ -13,24 +14,24 @@ import ( var ( ledger5CloseTime = ledgerCloseTime(5) ledger5Events = []event{ - newEvent(1, 0, 0, 100), - newEvent(1, 0, 1, 200), - newEvent(2, 0, 0, 300), - newEvent(2, 1, 0, 400), + newEvent(1, 0, 100), + newEvent(1, 1, 200), + newEvent(2, 0, 300), + newEvent(2, 1, 400), } ledger6CloseTime = ledgerCloseTime(6) ledger6Events []event = nil ledger7CloseTime = ledgerCloseTime(7) ledger7Events = []event{ - newEvent(1, 0, 0, 500), + newEvent(1, 0, 500), } ledger8CloseTime = ledgerCloseTime(8) ledger8Events = []event{ - newEvent(1, 0, 0, 600), - newEvent(2, 0, 0, 700), - newEvent(2, 0, 1, 800), - newEvent(2, 0, 2, 900), - newEvent(2, 1, 0, 1000), + newEvent(1, 0, 600), + newEvent(2, 0, 700), + newEvent(2, 1, 800), + newEvent(2, 2, 900), + newEvent(2, 3, 1000), } ) @@ -38,43 +39,39 @@ func ledgerCloseTime(seq uint32) int64 { return int64(seq)*25 + 100 } -func newEvent(txIndex, opIndex, eventIndex, val uint32) event { +func newEvent(txIndex, eventIndex, val uint32) event { v := xdr.Uint32(val) - return event{ - contents: xdr.DiagnosticEvent{ - InSuccessfulContractCall: true, - Event: xdr.ContractEvent{ - Type: xdr.ContractEventTypeSystem, - Body: xdr.ContractEventBody{ - V: 0, - V0: &xdr.ContractEventV0{ - Data: xdr.ScVal{ - Type: xdr.ScValTypeScvU32, - U32: &v, - }, + + e := xdr.DiagnosticEvent{ + InSuccessfulContractCall: true, + Event: xdr.ContractEvent{ + Type: xdr.ContractEventTypeSystem, + Body: xdr.ContractEventBody{ + V: 0, + V0: &xdr.ContractEventV0{ + Data: xdr.ScVal{ + Type: xdr.ScValTypeScvU32, + U32: &v, }, }, }, }, - txIndex: txIndex, - opIndex: opIndex, - eventIndex: eventIndex, } -} - -func mustMarshal(e xdr.DiagnosticEvent) string { - result, err := xdr.MarshalBase64(e) + diagnosticEventXDR, err := e.MarshalBinary() if err != nil { panic(err) } - return result + return event{ + diagnosticEventXDR: diagnosticEventXDR, + txIndex: txIndex, + eventIndex: eventIndex, + } } func (e event) equals(other event) bool { return e.txIndex == other.txIndex && - e.opIndex == other.opIndex && e.eventIndex == other.eventIndex && - mustMarshal(e.contents) == mustMarshal(other.contents) + bytes.Equal(e.diagnosticEventXDR, other.diagnosticEventXDR) } func eventsAreEqual(t *testing.T, a, b []event) { @@ -291,7 +288,7 @@ func TestScan(t *testing.T) { }, { Range{ - Start: Cursor{Ledger: 5, Tx: 1, Op: 2}, + Start: Cursor{Ledger: 5, Tx: 2}, ClampStart: false, End: Cursor{Ledger: 9}, ClampEnd: false, @@ -327,7 +324,7 @@ func TestScan(t *testing.T) { }, { Range{ - Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0}, + Start: Cursor{Ledger: 8, Tx: 2, Event: 3}, ClampStart: false, End: MaxCursor, ClampEnd: true, @@ -336,7 +333,7 @@ func TestScan(t *testing.T) { }, { Range{ - Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0}, + Start: Cursor{Ledger: 8, Tx: 2, Event: 3}, ClampStart: false, End: Cursor{Ledger: 9}, ClampEnd: false, @@ -354,9 +351,9 @@ func TestScan(t *testing.T) { }, { Range{ - Start: Cursor{Ledger: 5, Tx: 1, Op: 2}, + Start: Cursor{Ledger: 5, Tx: 2}, ClampStart: false, - End: Cursor{Ledger: 8, Tx: 1, Op: 4}, + End: Cursor{Ledger: 8, Tx: 2}, ClampEnd: false, }, concat(ledger5Events[2:], ledger6Events, ledger7Events, ledger8Events[:1]), @@ -367,11 +364,12 @@ func TestScan(t *testing.T) { iterateAll := true f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64) bool { require.Equal(t, ledgerCloseTime(cursor.Ledger), ledgerCloseTimestamp) + diagnosticEventXDR, err := contractEvent.MarshalBinary() + require.NoError(t, err) events = append(events, event{ - contents: contractEvent, - txIndex: cursor.Tx, - opIndex: cursor.Op, - eventIndex: cursor.Event, + diagnosticEventXDR: diagnosticEventXDR, + txIndex: cursor.Tx, + eventIndex: cursor.Event, }) return iterateAll } From ea95387a1558dd0ee46991a13ff0132fcef0c715 Mon Sep 17 00:00:00 2001 From: Alfonso Acosta Date: Thu, 1 Feb 2024 17:26:42 +0100 Subject: [PATCH 3/3] rpc: Store and serve the event transaction ID (#1185) (cherry picked from commit 0c77361264c08c4d785694a13fca903416b76650) --- cmd/soroban-rpc/internal/events/events.go | 10 ++++++-- .../internal/events/events_test.go | 5 ++-- .../internal/methods/get_events.go | 12 ++++++---- .../internal/methods/get_events_test.go | 24 ++++++++++++++----- 4 files changed, 37 insertions(+), 14 deletions(-) diff --git a/cmd/soroban-rpc/internal/events/events.go b/cmd/soroban-rpc/internal/events/events.go index e2c9030b..e854cd77 100644 --- a/cmd/soroban-rpc/internal/events/events.go +++ b/cmd/soroban-rpc/internal/events/events.go @@ -19,6 +19,7 @@ type event struct { diagnosticEventXDR []byte txIndex uint32 eventIndex uint32 + txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction) } func (e event) cursor(ledgerSeq uint32) Cursor { @@ -90,13 +91,15 @@ type Range struct { ClampEnd bool } +type ScanFunction func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool + // Scan applies f on all the events occurring in the given range. // The events are processed in sorted ascending Cursor order. // If f returns false, the scan terminates early (f will not be applied on // remaining events in the range). Note that a read lock is held for the // entire duration of the Scan function so f should be written in a way // to minimize latency. -func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor, int64) bool) (uint32, error) { +func (m *MemoryStore) Scan(eventRange Range, f ScanFunction) (uint32, error) { startTime := time.Now() m.lock.RLock() defer m.lock.RUnlock() @@ -126,7 +129,7 @@ func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor, if err != nil { return 0, err } - if !f(diagnosticEvent, cur, timestamp) { + if !f(diagnosticEvent, cur, timestamp, event.txHash) { return lastLedgerInWindow, nil } } @@ -235,10 +238,12 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) ( if !tx.Result.Successful() { continue } + txEvents, err := tx.GetDiagnosticEvents() if err != nil { return nil, err } + txHash := tx.Result.TransactionHash for index, e := range txEvents { diagnosticEventXDR, err := e.MarshalBinary() if err != nil { @@ -248,6 +253,7 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) ( diagnosticEventXDR: diagnosticEventXDR, txIndex: tx.Index, eventIndex: uint32(index), + txHash: &txHash, }) } } diff --git a/cmd/soroban-rpc/internal/events/events_test.go b/cmd/soroban-rpc/internal/events/events_test.go index fc4d3c6b..df9b3385 100644 --- a/cmd/soroban-rpc/internal/events/events_test.go +++ b/cmd/soroban-rpc/internal/events/events_test.go @@ -83,7 +83,7 @@ func eventsAreEqual(t *testing.T, a, b []event) { func TestScanRangeValidation(t *testing.T) { m := NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 4) - assertNoCalls := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, timestamp int64) bool { + assertNoCalls := func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool { t.Fatalf("unexpected call") return true } @@ -362,7 +362,7 @@ func TestScan(t *testing.T) { for _, input := range genEquivalentInputs(testCase.input) { var events []event iterateAll := true - f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64) bool { + f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64, hash *xdr.Hash) bool { require.Equal(t, ledgerCloseTime(cursor.Ledger), ledgerCloseTimestamp) diagnosticEventXDR, err := contractEvent.MarshalBinary() require.NoError(t, err) @@ -370,6 +370,7 @@ func TestScan(t *testing.T) { diagnosticEventXDR: diagnosticEventXDR, txIndex: cursor.Tx, eventIndex: cursor.Event, + txHash: hash, }) return iterateAll } diff --git a/cmd/soroban-rpc/internal/methods/get_events.go b/cmd/soroban-rpc/internal/methods/get_events.go index e5bf3628..71cad1e9 100644 --- a/cmd/soroban-rpc/internal/methods/get_events.go +++ b/cmd/soroban-rpc/internal/methods/get_events.go @@ -76,6 +76,7 @@ type EventInfo struct { Topic []string `json:"topic"` Value string `json:"value"` InSuccessfulContractCall bool `json:"inSuccessfulContractCall"` + TransactionHash string `json:"txHash"` } type GetEventsRequest struct { @@ -299,7 +300,7 @@ type GetEventsResponse struct { } type eventScanner interface { - Scan(eventRange events.Range, f func(xdr.DiagnosticEvent, events.Cursor, int64) bool) (uint32, error) + Scan(eventRange events.Range, f events.ScanFunction) (uint32, error) } type eventsRPCHandler struct { @@ -334,6 +335,7 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse cursor events.Cursor ledgerCloseTimestamp int64 event xdr.DiagnosticEvent + txHash *xdr.Hash } var found []entry latestLedger, err := h.scanner.Scan( @@ -343,9 +345,9 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse End: events.MaxCursor, ClampEnd: true, }, - func(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerCloseTimestamp int64) bool { + func(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerCloseTimestamp int64, txHash *xdr.Hash) bool { if request.Matches(event) { - found = append(found, entry{cursor, ledgerCloseTimestamp, event}) + found = append(found, entry{cursor, ledgerCloseTimestamp, event, txHash}) } return uint(len(found)) < limit }, @@ -363,6 +365,7 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse entry.event, entry.cursor, time.Unix(entry.ledgerCloseTimestamp, 0).UTC().Format(time.RFC3339), + entry.txHash.HexString(), ) if err != nil { return GetEventsResponse{}, errors.Wrap(err, "could not parse event") @@ -375,7 +378,7 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse }, nil } -func eventInfoForEvent(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerClosedAt string) (EventInfo, error) { +func eventInfoForEvent(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerClosedAt string, txHash string) (EventInfo, error) { v0, ok := event.Event.Body.GetV0() if !ok { return EventInfo{}, errors.New("unknown event version") @@ -411,6 +414,7 @@ func eventInfoForEvent(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerCl Topic: topic, Value: data, InSuccessfulContractCall: event.InSuccessfulContractCall, + TransactionHash: txHash, } if event.Event.ContractId != nil { info.ContractID = strkey.MustEncode(strkey.VersionByteContract, (*event.Event.ContractId)[:]) diff --git a/cmd/soroban-rpc/internal/methods/get_events_test.go b/cmd/soroban-rpc/internal/methods/get_events_test.go index 4d15e2c0..5d1b929c 100644 --- a/cmd/soroban-rpc/internal/methods/get_events_test.go +++ b/cmd/soroban-rpc/internal/methods/get_events_test.go @@ -591,7 +591,8 @@ func TestGetEvents(t *testing.T) { ), )) } - assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...))) + ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) + assert.NoError(t, store.IngestEvents(ledgerCloseMeta)) handler := eventsRPCHandler{ scanner: store, @@ -626,6 +627,7 @@ func TestGetEvents(t *testing.T) { Topic: []string{value}, Value: value, InSuccessfulContractCall: true, + TransactionHash: ledgerCloseMeta.TransactionHash(i).HexString(), }) } assert.Equal(t, GetEventsResponse{expected, 1}, results) @@ -699,7 +701,8 @@ func TestGetEvents(t *testing.T) { ), )) } - assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...))) + ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) + assert.NoError(t, store.IngestEvents(ledgerCloseMeta)) number := xdr.Uint64(4) handler := eventsRPCHandler{ @@ -738,6 +741,7 @@ func TestGetEvents(t *testing.T) { Topic: []string{counterXdr, value}, Value: value, InSuccessfulContractCall: true, + TransactionHash: ledgerCloseMeta.TransactionHash(4).HexString(), }, } assert.Equal(t, GetEventsResponse{expected, 1}, results) @@ -792,7 +796,8 @@ func TestGetEvents(t *testing.T) { ), ), } - assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...))) + ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) + assert.NoError(t, store.IngestEvents(ledgerCloseMeta)) handler := eventsRPCHandler{ scanner: store, @@ -832,6 +837,7 @@ func TestGetEvents(t *testing.T) { Topic: []string{counterXdr, value}, Value: value, InSuccessfulContractCall: true, + TransactionHash: ledgerCloseMeta.TransactionHash(3).HexString(), }, } assert.Equal(t, GetEventsResponse{expected, 1}, results) @@ -865,7 +871,8 @@ func TestGetEvents(t *testing.T) { ), ), } - assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...))) + ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) + assert.NoError(t, store.IngestEvents(ledgerCloseMeta)) handler := eventsRPCHandler{ scanner: store, @@ -892,6 +899,7 @@ func TestGetEvents(t *testing.T) { Topic: []string{counterXdr}, Value: counterXdr, InSuccessfulContractCall: true, + TransactionHash: ledgerCloseMeta.TransactionHash(0).HexString(), }, } assert.Equal(t, GetEventsResponse{expected, 1}, results) @@ -913,7 +921,8 @@ func TestGetEvents(t *testing.T) { ), )) } - assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...))) + ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...) + assert.NoError(t, store.IngestEvents(ledgerCloseMeta)) handler := eventsRPCHandler{ scanner: store, @@ -947,6 +956,7 @@ func TestGetEvents(t *testing.T) { Topic: []string{value}, Value: value, InSuccessfulContractCall: true, + TransactionHash: ledgerCloseMeta.TransactionHash(i).HexString(), }) } assert.Equal(t, GetEventsResponse{expected, 1}, results) @@ -996,7 +1006,8 @@ func TestGetEvents(t *testing.T) { ), ), } - assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(5, now.Unix(), txMeta...))) + ledgerCloseMeta := ledgerCloseMetaWithEvents(5, now.Unix(), txMeta...) + assert.NoError(t, store.IngestEvents(ledgerCloseMeta)) id := &events.Cursor{Ledger: 5, Tx: 1, Op: 0, Event: 0} handler := eventsRPCHandler{ @@ -1031,6 +1042,7 @@ func TestGetEvents(t *testing.T) { Topic: []string{counterXdr}, Value: expectedXdr, InSuccessfulContractCall: true, + TransactionHash: ledgerCloseMeta.TransactionHash(i).HexString(), }) } assert.Equal(t, GetEventsResponse{expected, 5}, results)