Skip to content

Commit

Permalink
rpc: Reduce event memory footprint (#1183)
Browse files Browse the repository at this point in the history
1. Get rid of the in-memory operation index (since it was always set to zero anyways)
2. Keep events serialized while in memory (it saves quite a bit of space due to the inneficient representation of unions in golang).
  • Loading branch information
2opremio authored Jan 31, 2024
1 parent b6671e2 commit 433cd44
Show file tree
Hide file tree
Showing 3 changed files with 61 additions and 63 deletions.
2 changes: 2 additions & 0 deletions cmd/soroban-rpc/internal/events/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Cursor struct {
// Tx is the index of the transaction within the ledger which emitted the event.
Tx uint32
// Op is the index of the operation within the transaction which emitted the event.
// Note: Currently, there is no use for it (events are transaction-wide and not operation-specific)
// but we keep it in order to make the API future-proof.
Op uint32
// Event is the index of the event within in the operation which emitted the event.
Event uint32
Expand Down
40 changes: 19 additions & 21 deletions cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,16 @@ import (
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/ledgerbucketwindow"
)

type bucket struct {
ledgerSeq uint32
ledgerCloseTimestamp int64
events []event
}

type event struct {
contents xdr.DiagnosticEvent
txIndex uint32
opIndex uint32
eventIndex uint32
diagnosticEventXDR []byte
txIndex uint32
eventIndex uint32
}

func (e event) cursor(ledgerSeq uint32) Cursor {
return Cursor{
Ledger: ledgerSeq,
Tx: e.txIndex,
Op: e.opIndex,
Event: e.eventIndex,
}
}
Expand Down Expand Up @@ -129,7 +121,12 @@ func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor,
if eventRange.End.Cmp(cur) <= 0 {
return lastLedgerInWindow, nil
}
if !f(event.contents, cur, timestamp) {
var diagnosticEvent xdr.DiagnosticEvent
err := xdr.SafeUnmarshal(event.diagnosticEventXDR, &diagnosticEvent)
if err != nil {
return 0, err
}
if !f(diagnosticEvent, cur, timestamp) {
return lastLedgerInWindow, nil
}
}
Expand Down Expand Up @@ -201,7 +198,9 @@ func (m *MemoryStore) IngestEvents(ledgerCloseMeta xdr.LedgerCloseMeta) error {
BucketContent: events,
}
m.lock.Lock()
m.eventsByLedger.Append(bucket)
if _, err = m.eventsByLedger.Append(bucket); err != nil {
return err
}
m.lock.Unlock()
m.eventsDurationMetric.With(prometheus.Labels{"operation": "ingest"}).
Observe(time.Since(startTime).Seconds())
Expand Down Expand Up @@ -241,15 +240,14 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (
return nil, err
}
for index, e := range txEvents {
diagnosticEventXDR, err := e.MarshalBinary()
if err != nil {
return nil, err
}
events = append(events, event{
contents: e,
txIndex: tx.Index,
// NOTE: we cannot really index by operation since all events
// are provided as part of the transaction. However,
// that shouldn't matter in practice since a transaction
// can only contain a single Host Function Invocation.
opIndex: 0,
eventIndex: uint32(index),
diagnosticEventXDR: diagnosticEventXDR,
txIndex: tx.Index,
eventIndex: uint32(index),
})
}
}
Expand Down
82 changes: 40 additions & 42 deletions cmd/soroban-rpc/internal/events/events_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"bytes"
"testing"

"github.com/stellar/go/xdr"
Expand All @@ -13,68 +14,64 @@ import (
var (
ledger5CloseTime = ledgerCloseTime(5)
ledger5Events = []event{
newEvent(1, 0, 0, 100),
newEvent(1, 0, 1, 200),
newEvent(2, 0, 0, 300),
newEvent(2, 1, 0, 400),
newEvent(1, 0, 100),
newEvent(1, 1, 200),
newEvent(2, 0, 300),
newEvent(2, 1, 400),
}
ledger6CloseTime = ledgerCloseTime(6)
ledger6Events []event = nil
ledger7CloseTime = ledgerCloseTime(7)
ledger7Events = []event{
newEvent(1, 0, 0, 500),
newEvent(1, 0, 500),
}
ledger8CloseTime = ledgerCloseTime(8)
ledger8Events = []event{
newEvent(1, 0, 0, 600),
newEvent(2, 0, 0, 700),
newEvent(2, 0, 1, 800),
newEvent(2, 0, 2, 900),
newEvent(2, 1, 0, 1000),
newEvent(1, 0, 600),
newEvent(2, 0, 700),
newEvent(2, 1, 800),
newEvent(2, 2, 900),
newEvent(2, 3, 1000),
}
)

func ledgerCloseTime(seq uint32) int64 {
return int64(seq)*25 + 100
}

func newEvent(txIndex, opIndex, eventIndex, val uint32) event {
func newEvent(txIndex, eventIndex, val uint32) event {
v := xdr.Uint32(val)
return event{
contents: xdr.DiagnosticEvent{
InSuccessfulContractCall: true,
Event: xdr.ContractEvent{
Type: xdr.ContractEventTypeSystem,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Data: xdr.ScVal{
Type: xdr.ScValTypeScvU32,
U32: &v,
},

e := xdr.DiagnosticEvent{
InSuccessfulContractCall: true,
Event: xdr.ContractEvent{
Type: xdr.ContractEventTypeSystem,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Data: xdr.ScVal{
Type: xdr.ScValTypeScvU32,
U32: &v,
},
},
},
},
txIndex: txIndex,
opIndex: opIndex,
eventIndex: eventIndex,
}
}

func mustMarshal(e xdr.DiagnosticEvent) string {
result, err := xdr.MarshalBase64(e)
diagnosticEventXDR, err := e.MarshalBinary()
if err != nil {
panic(err)
}
return result
return event{
diagnosticEventXDR: diagnosticEventXDR,
txIndex: txIndex,
eventIndex: eventIndex,
}
}

func (e event) equals(other event) bool {
return e.txIndex == other.txIndex &&
e.opIndex == other.opIndex &&
e.eventIndex == other.eventIndex &&
mustMarshal(e.contents) == mustMarshal(other.contents)
bytes.Equal(e.diagnosticEventXDR, other.diagnosticEventXDR)
}

func eventsAreEqual(t *testing.T, a, b []event) {
Expand Down Expand Up @@ -291,7 +288,7 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 5, Tx: 1, Op: 2},
Start: Cursor{Ledger: 5, Tx: 2},
ClampStart: false,
End: Cursor{Ledger: 9},
ClampEnd: false,
Expand Down Expand Up @@ -327,7 +324,7 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0},
Start: Cursor{Ledger: 8, Tx: 2, Event: 3},
ClampStart: false,
End: MaxCursor,
ClampEnd: true,
Expand All @@ -336,7 +333,7 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0},
Start: Cursor{Ledger: 8, Tx: 2, Event: 3},
ClampStart: false,
End: Cursor{Ledger: 9},
ClampEnd: false,
Expand All @@ -354,9 +351,9 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 5, Tx: 1, Op: 2},
Start: Cursor{Ledger: 5, Tx: 2},
ClampStart: false,
End: Cursor{Ledger: 8, Tx: 1, Op: 4},
End: Cursor{Ledger: 8, Tx: 2},
ClampEnd: false,
},
concat(ledger5Events[2:], ledger6Events, ledger7Events, ledger8Events[:1]),
Expand All @@ -367,11 +364,12 @@ func TestScan(t *testing.T) {
iterateAll := true
f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64) bool {
require.Equal(t, ledgerCloseTime(cursor.Ledger), ledgerCloseTimestamp)
diagnosticEventXDR, err := contractEvent.MarshalBinary()
require.NoError(t, err)
events = append(events, event{
contents: contractEvent,
txIndex: cursor.Tx,
opIndex: cursor.Op,
eventIndex: cursor.Event,
diagnosticEventXDR: diagnosticEventXDR,
txIndex: cursor.Tx,
eventIndex: cursor.Event,
})
return iterateAll
}
Expand Down

0 comments on commit 433cd44

Please sign in to comment.