Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rpc: Store and serve the event transaction ID #1185

Merged
merged 1 commit into from
Feb 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 8 additions & 2 deletions cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ type event struct {
diagnosticEventXDR []byte
txIndex uint32
eventIndex uint32
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
}

func (e event) cursor(ledgerSeq uint32) Cursor {
Expand Down Expand Up @@ -90,13 +91,15 @@ type Range struct {
ClampEnd bool
}

type ScanFunction func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool

// Scan applies f on all the events occurring in the given range.
// The events are processed in sorted ascending Cursor order.
// If f returns false, the scan terminates early (f will not be applied on
// remaining events in the range). Note that a read lock is held for the
// entire duration of the Scan function so f should be written in a way
// to minimize latency.
func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor, int64) bool) (uint32, error) {
func (m *MemoryStore) Scan(eventRange Range, f ScanFunction) (uint32, error) {
startTime := time.Now()
m.lock.RLock()
defer m.lock.RUnlock()
Expand Down Expand Up @@ -126,7 +129,7 @@ func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor,
if err != nil {
return 0, err
}
if !f(diagnosticEvent, cur, timestamp) {
if !f(diagnosticEvent, cur, timestamp, event.txHash) {
return lastLedgerInWindow, nil
}
}
Expand Down Expand Up @@ -235,10 +238,12 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (
if !tx.Result.Successful() {
continue
}

txEvents, err := tx.GetDiagnosticEvents()
if err != nil {
return nil, err
}
txHash := tx.Result.TransactionHash
for index, e := range txEvents {
diagnosticEventXDR, err := e.MarshalBinary()
if err != nil {
Expand All @@ -248,6 +253,7 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (
diagnosticEventXDR: diagnosticEventXDR,
txIndex: tx.Index,
eventIndex: uint32(index),
txHash: &txHash,
})
}
}
Expand Down
5 changes: 3 additions & 2 deletions cmd/soroban-rpc/internal/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func eventsAreEqual(t *testing.T, a, b []event) {

func TestScanRangeValidation(t *testing.T) {
m := NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 4)
assertNoCalls := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, timestamp int64) bool {
assertNoCalls := func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool {
t.Fatalf("unexpected call")
return true
}
Expand Down Expand Up @@ -362,14 +362,15 @@ func TestScan(t *testing.T) {
for _, input := range genEquivalentInputs(testCase.input) {
var events []event
iterateAll := true
f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64) bool {
f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64, hash *xdr.Hash) bool {
require.Equal(t, ledgerCloseTime(cursor.Ledger), ledgerCloseTimestamp)
diagnosticEventXDR, err := contractEvent.MarshalBinary()
require.NoError(t, err)
events = append(events, event{
diagnosticEventXDR: diagnosticEventXDR,
txIndex: cursor.Tx,
eventIndex: cursor.Event,
txHash: hash,
})
return iterateAll
}
Expand Down
12 changes: 8 additions & 4 deletions cmd/soroban-rpc/internal/methods/get_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ type EventInfo struct {
Topic []string `json:"topic"`
Value string `json:"value"`
InSuccessfulContractCall bool `json:"inSuccessfulContractCall"`
TransactionHash string `json:"txHash"`
}

type GetEventsRequest struct {
Expand Down Expand Up @@ -299,7 +300,7 @@ type GetEventsResponse struct {
}

type eventScanner interface {
Scan(eventRange events.Range, f func(xdr.DiagnosticEvent, events.Cursor, int64) bool) (uint32, error)
Scan(eventRange events.Range, f events.ScanFunction) (uint32, error)
}

type eventsRPCHandler struct {
Expand Down Expand Up @@ -334,6 +335,7 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse
cursor events.Cursor
ledgerCloseTimestamp int64
event xdr.DiagnosticEvent
txHash *xdr.Hash
}
var found []entry
latestLedger, err := h.scanner.Scan(
Expand All @@ -343,9 +345,9 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse
End: events.MaxCursor,
ClampEnd: true,
},
func(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerCloseTimestamp int64) bool {
func(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerCloseTimestamp int64, txHash *xdr.Hash) bool {
if request.Matches(event) {
found = append(found, entry{cursor, ledgerCloseTimestamp, event})
found = append(found, entry{cursor, ledgerCloseTimestamp, event, txHash})
}
return uint(len(found)) < limit
},
Expand All @@ -363,6 +365,7 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse
entry.event,
entry.cursor,
time.Unix(entry.ledgerCloseTimestamp, 0).UTC().Format(time.RFC3339),
entry.txHash.HexString(),
)
if err != nil {
return GetEventsResponse{}, errors.Wrap(err, "could not parse event")
Expand All @@ -375,7 +378,7 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse
}, nil
}

func eventInfoForEvent(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerClosedAt string) (EventInfo, error) {
func eventInfoForEvent(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerClosedAt string, txHash string) (EventInfo, error) {
v0, ok := event.Event.Body.GetV0()
if !ok {
return EventInfo{}, errors.New("unknown event version")
Expand Down Expand Up @@ -411,6 +414,7 @@ func eventInfoForEvent(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerCl
Topic: topic,
Value: data,
InSuccessfulContractCall: event.InSuccessfulContractCall,
TransactionHash: txHash,
}
if event.Event.ContractId != nil {
info.ContractID = strkey.MustEncode(strkey.VersionByteContract, (*event.Event.ContractId)[:])
Expand Down
24 changes: 18 additions & 6 deletions cmd/soroban-rpc/internal/methods/get_events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -591,7 +591,8 @@ func TestGetEvents(t *testing.T) {
),
))
}
assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)))
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
assert.NoError(t, store.IngestEvents(ledgerCloseMeta))

handler := eventsRPCHandler{
scanner: store,
Expand Down Expand Up @@ -626,6 +627,7 @@ func TestGetEvents(t *testing.T) {
Topic: []string{value},
Value: value,
InSuccessfulContractCall: true,
TransactionHash: ledgerCloseMeta.TransactionHash(i).HexString(),
})
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
Expand Down Expand Up @@ -699,7 +701,8 @@ func TestGetEvents(t *testing.T) {
),
))
}
assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)))
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
assert.NoError(t, store.IngestEvents(ledgerCloseMeta))

number := xdr.Uint64(4)
handler := eventsRPCHandler{
Expand Down Expand Up @@ -738,6 +741,7 @@ func TestGetEvents(t *testing.T) {
Topic: []string{counterXdr, value},
Value: value,
InSuccessfulContractCall: true,
TransactionHash: ledgerCloseMeta.TransactionHash(4).HexString(),
},
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
Expand Down Expand Up @@ -792,7 +796,8 @@ func TestGetEvents(t *testing.T) {
),
),
}
assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)))
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
assert.NoError(t, store.IngestEvents(ledgerCloseMeta))

handler := eventsRPCHandler{
scanner: store,
Expand Down Expand Up @@ -832,6 +837,7 @@ func TestGetEvents(t *testing.T) {
Topic: []string{counterXdr, value},
Value: value,
InSuccessfulContractCall: true,
TransactionHash: ledgerCloseMeta.TransactionHash(3).HexString(),
},
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
Expand Down Expand Up @@ -865,7 +871,8 @@ func TestGetEvents(t *testing.T) {
),
),
}
assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)))
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
assert.NoError(t, store.IngestEvents(ledgerCloseMeta))

handler := eventsRPCHandler{
scanner: store,
Expand All @@ -892,6 +899,7 @@ func TestGetEvents(t *testing.T) {
Topic: []string{counterXdr},
Value: counterXdr,
InSuccessfulContractCall: true,
TransactionHash: ledgerCloseMeta.TransactionHash(0).HexString(),
},
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
Expand All @@ -913,7 +921,8 @@ func TestGetEvents(t *testing.T) {
),
))
}
assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)))
ledgerCloseMeta := ledgerCloseMetaWithEvents(1, now.Unix(), txMeta...)
assert.NoError(t, store.IngestEvents(ledgerCloseMeta))

handler := eventsRPCHandler{
scanner: store,
Expand Down Expand Up @@ -947,6 +956,7 @@ func TestGetEvents(t *testing.T) {
Topic: []string{value},
Value: value,
InSuccessfulContractCall: true,
TransactionHash: ledgerCloseMeta.TransactionHash(i).HexString(),
})
}
assert.Equal(t, GetEventsResponse{expected, 1}, results)
Expand Down Expand Up @@ -996,7 +1006,8 @@ func TestGetEvents(t *testing.T) {
),
),
}
assert.NoError(t, store.IngestEvents(ledgerCloseMetaWithEvents(5, now.Unix(), txMeta...)))
ledgerCloseMeta := ledgerCloseMetaWithEvents(5, now.Unix(), txMeta...)
assert.NoError(t, store.IngestEvents(ledgerCloseMeta))

id := &events.Cursor{Ledger: 5, Tx: 1, Op: 0, Event: 0}
handler := eventsRPCHandler{
Expand Down Expand Up @@ -1031,6 +1042,7 @@ func TestGetEvents(t *testing.T) {
Topic: []string{counterXdr},
Value: expectedXdr,
InSuccessfulContractCall: true,
TransactionHash: ledgerCloseMeta.TransactionHash(i).HexString(),
})
}
assert.Equal(t, GetEventsResponse{expected, 5}, results)
Expand Down
Loading