Skip to content

Commit

Permalink
soroban-rpc: Ingest diagnostic events (stellar#536)
Browse files Browse the repository at this point in the history
  • Loading branch information
tamirms authored Mar 27, 2023
1 parent 9b982d9 commit fda19d9
Show file tree
Hide file tree
Showing 6 changed files with 324 additions and 88 deletions.
6 changes: 3 additions & 3 deletions cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type bucket struct {
}

type event struct {
contents xdr.ContractEvent
contents xdr.DiagnosticEvent
txIndex uint32
opIndex uint32
eventIndex uint32
Expand Down Expand Up @@ -81,7 +81,7 @@ type Range struct {
// remaining events in the range). Note that a read lock is held for the
// entire duration of the Scan function so f should be written in a way
// to minimize latency.
func (m *MemoryStore) Scan(eventRange Range, f func(xdr.ContractEvent, Cursor, int64) bool) (uint32, error) {
func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor, int64) bool) (uint32, error) {
m.lock.RLock()
defer m.lock.RUnlock()

Expand Down Expand Up @@ -208,7 +208,7 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (
}
for i := range tx.Envelope.Operations() {
opIndex := uint32(i)
var opEvents []xdr.ContractEvent
var opEvents []xdr.DiagnosticEvent
opEvents, err = tx.GetOperationEvents(opIndex)
if err != nil {
return
Expand Down
25 changes: 14 additions & 11 deletions cmd/soroban-rpc/internal/events/events_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,14 +41,17 @@ func ledgerCloseTime(seq uint32) int64 {
func newEvent(txIndex, opIndex, eventIndex, val uint32) event {
v := xdr.Uint32(val)
return event{
contents: xdr.ContractEvent{
Type: xdr.ContractEventTypeSystem,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Data: xdr.ScVal{
Type: xdr.ScValTypeScvU32,
U32: &v,
contents: xdr.DiagnosticEvent{
InSuccessfulContractCall: true,
Event: xdr.ContractEvent{
Type: xdr.ContractEventTypeSystem,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Data: xdr.ScVal{
Type: xdr.ScValTypeScvU32,
U32: &v,
},
},
},
},
Expand All @@ -59,7 +62,7 @@ func newEvent(txIndex, opIndex, eventIndex, val uint32) event {
}
}

func mustMarshal(e xdr.ContractEvent) string {
func mustMarshal(e xdr.DiagnosticEvent) string {
result, err := xdr.MarshalBase64(e)
if err != nil {
panic(err)
Expand All @@ -83,7 +86,7 @@ func eventsAreEqual(t *testing.T, a, b []event) {

func TestScanRangeValidation(t *testing.T) {
m := NewMemoryStore("unit-tests", 4)
assertNoCalls := func(contractEvent xdr.ContractEvent, cursor Cursor, timestamp int64) bool {
assertNoCalls := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, timestamp int64) bool {
t.Fatalf("unexpected call")
return true
}
Expand Down Expand Up @@ -362,7 +365,7 @@ func TestScan(t *testing.T) {
for _, input := range genEquivalentInputs(testCase.input) {
var events []event
iterateAll := true
f := func(contractEvent xdr.ContractEvent, cursor Cursor, ledgerCloseTimestamp int64) bool {
f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64) bool {
require.Equal(t, ledgerCloseTime(cursor.Ledger), ledgerCloseTimestamp)
events = append(events, event{
contents: contractEvent,
Expand Down
124 changes: 86 additions & 38 deletions cmd/soroban-rpc/internal/methods/get_events.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/hex"
"encoding/json"
"fmt"
"strings"
"time"

"github.com/creachadair/jrpc2"
Expand All @@ -17,15 +18,65 @@ import (
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/events"
)

type eventTypeSet map[string]interface{}

func (e eventTypeSet) valid() error {
for key := range e {
switch key {
case EventTypeSystem, EventTypeContract, EventTypeDiagnostic:
// ok
default:
return errors.New("if set, type must be either 'system', 'contract' or 'diagnostic'")
}
}
return nil
}

func (e *eventTypeSet) UnmarshalJSON(data []byte) error {
if len(data) == 0 {
*e = map[string]interface{}{}
return nil
}
var joined string
if err := json.Unmarshal(data, &joined); err != nil {
return err
}
*e = map[string]interface{}{}
if len(joined) == 0 {
return nil
}
for _, key := range strings.Split(joined, ",") {
(*e)[key] = nil
}
return nil
}

func (e eventTypeSet) MarshalJSON() ([]byte, error) {
var keys []string
for key := range e {
keys = append(keys, key)
}
return json.Marshal(strings.Join(keys, ","))
}

func (e eventTypeSet) matches(event xdr.ContractEvent) bool {
if len(e) == 0 {
return true
}
_, ok := e[eventTypeFromXDR[event.Type]]
return ok
}

type EventInfo struct {
EventType string `json:"type"`
Ledger int32 `json:"ledger,string"`
LedgerClosedAt string `json:"ledgerClosedAt"`
ContractID string `json:"contractId"`
ID string `json:"id"`
PagingToken string `json:"pagingToken"`
Topic []string `json:"topic"`
Value EventInfoValue `json:"value"`
EventType string `json:"type"`
Ledger int32 `json:"ledger,string"`
LedgerClosedAt string `json:"ledgerClosedAt"`
ContractID string `json:"contractId"`
ID string `json:"id"`
PagingToken string `json:"pagingToken"`
Topic []string `json:"topic"`
Value EventInfoValue `json:"value"`
InSuccessfulContractCall bool `json:"inSuccessfulContractCall"`
}

type EventInfoValue struct {
Expand Down Expand Up @@ -65,7 +116,7 @@ func (g *GetEventsRequest) Valid(maxLimit uint) error {
return nil
}

func (g *GetEventsRequest) Matches(event xdr.ContractEvent) bool {
func (g *GetEventsRequest) Matches(event xdr.DiagnosticEvent) bool {
if len(g.Filters) == 0 {
return true
}
Expand All @@ -88,17 +139,14 @@ var eventTypeFromXDR = map[xdr.ContractEventType]string{
}

type EventFilter struct {
EventType string `json:"type,omitempty"`
EventType eventTypeSet `json:"type,omitempty"`
ContractIDs []string `json:"contractIds,omitempty"`
Topics []TopicFilter `json:"topics,omitempty"`
}

func (e *EventFilter) Valid() error {
switch e.EventType {
case "", EventTypeSystem, EventTypeContract, EventTypeDiagnostic:
// ok
default:
return errors.New("if set, type must be either 'system', 'contract' or 'diagnostic'")
if err := e.EventType.valid(); err != nil {
return errors.Wrap(err, "filter type invalid")
}
if len(e.ContractIDs) > 5 {
return errors.New("maximum 5 contract IDs per filter")
Expand All @@ -120,12 +168,8 @@ func (e *EventFilter) Valid() error {
return nil
}

func (e *EventFilter) Matches(event xdr.ContractEvent) bool {
return e.matchesEventType(event) && e.matchesContractIDs(event) && e.matchesTopics(event)
}

func (e *EventFilter) matchesEventType(event xdr.ContractEvent) bool {
return e.EventType == "" || e.EventType == eventTypeFromXDR[event.Type]
func (e *EventFilter) Matches(event xdr.DiagnosticEvent) bool {
return e.EventType.matches(event.Event) && e.matchesContractIDs(event.Event) && e.matchesTopics(event.Event)
}

func (e *EventFilter) matchesContractIDs(event xdr.ContractEvent) bool {
Expand Down Expand Up @@ -260,7 +304,7 @@ type GetEventsResponse struct {
}

type eventScanner interface {
Scan(eventRange events.Range, f func(xdr.ContractEvent, events.Cursor, int64) bool) (uint32, error)
Scan(eventRange events.Range, f func(xdr.DiagnosticEvent, events.Cursor, int64) bool) (uint32, error)
}

type eventsRPCHandler struct {
Expand Down Expand Up @@ -294,7 +338,7 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse
type entry struct {
cursor events.Cursor
ledgerCloseTimestamp int64
event xdr.ContractEvent
event xdr.DiagnosticEvent
}
var found []entry
latestLedger, err := h.scanner.Scan(
Expand All @@ -304,7 +348,7 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse
End: events.MaxCursor,
ClampEnd: true,
},
func(event xdr.ContractEvent, cursor events.Cursor, ledgerCloseTimestamp int64) bool {
func(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerCloseTimestamp int64) bool {
if request.Matches(event) {
found = append(found, entry{cursor, ledgerCloseTimestamp, event})
}
Expand Down Expand Up @@ -336,15 +380,15 @@ func (h eventsRPCHandler) getEvents(request GetEventsRequest) (GetEventsResponse
}, nil
}

func eventInfoForEvent(event xdr.ContractEvent, cursor events.Cursor, ledgerClosedAt string) (EventInfo, error) {
v0, ok := event.Body.GetV0()
func eventInfoForEvent(event xdr.DiagnosticEvent, cursor events.Cursor, ledgerClosedAt string) (EventInfo, error) {
v0, ok := event.Event.Body.GetV0()
if !ok {
return EventInfo{}, errors.New("unknown event version")
}

eventType, ok := eventTypeFromXDR[event.Type]
eventType, ok := eventTypeFromXDR[event.Event.Type]
if !ok {
return EventInfo{}, fmt.Errorf("unknown XDR ContractEventType type: %d", event.Type)
return EventInfo{}, fmt.Errorf("unknown XDR ContractEventType type: %d", event.Event.Type)
}

// base64-xdr encode the topic
Expand All @@ -363,16 +407,20 @@ func eventInfoForEvent(event xdr.ContractEvent, cursor events.Cursor, ledgerClos
return EventInfo{}, err
}

return EventInfo{
EventType: eventType,
Ledger: int32(cursor.Ledger),
LedgerClosedAt: ledgerClosedAt,
ContractID: hex.EncodeToString((*event.ContractId)[:]),
ID: cursor.String(),
PagingToken: cursor.String(),
Topic: topic,
Value: EventInfoValue{XDR: data},
}, nil
info := EventInfo{
EventType: eventType,
Ledger: int32(cursor.Ledger),
LedgerClosedAt: ledgerClosedAt,
ID: cursor.String(),
PagingToken: cursor.String(),
Topic: topic,
Value: EventInfoValue{XDR: data},
InSuccessfulContractCall: event.InSuccessfulContractCall,
}
if event.Event.ContractId != nil {
info.ContractID = hex.EncodeToString((*event.Event.ContractId)[:])
}
return info, nil
}

// NewGetEventsHandler returns a json rpc handler to fetch and filter events
Expand Down
Loading

0 comments on commit fda19d9

Please sign in to comment.