Skip to content

Commit

Permalink
Merge pull request #55 from stellar/soroban-tools-rpc
Browse files Browse the repository at this point in the history
soroban-rpc: Pull in Recent Soroban RPC changes from soroban-tools
  • Loading branch information
stellarsaur authored Feb 7, 2024
2 parents dd97dad + ea95387 commit 97ab094
Show file tree
Hide file tree
Showing 12 changed files with 170 additions and 166 deletions.
4 changes: 1 addition & 3 deletions cmd/soroban-rpc/internal/config/config_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ func (o *ConfigOption) setValue(i interface{}) (err error) {
if o.CustomSetValue != nil {
return o.CustomSetValue(o, i)
}
// it's unfortunate that Set below panics when it cannot set the value..
// we'll want to catch this so that we can alert the user nicely.
defer func() {
if recoverRes := recover(); recoverRes != nil {
var ok bool
Expand All @@ -101,7 +99,7 @@ func (o *ConfigOption) setValue(i interface{}) (err error) {
}
}()
parser := func(option *ConfigOption, i interface{}) error {
panic(fmt.Sprintf("no parser for flag %s", o.Name))
return errors.Errorf("no parser for flag %s", o.Name)
}
switch o.ConfigKey.(type) {
case *bool:
Expand Down
11 changes: 11 additions & 0 deletions cmd/soroban-rpc/internal/config/config_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -115,6 +116,16 @@ func TestUnassignableField(t *testing.T) {
require.Contains(t, err.Error(), co.Name)
}

func TestNoParserForFlag(t *testing.T) {
var co ConfigOption
var invalidKey []time.Duration
co.Name = "mykey"
co.ConfigKey = &invalidKey
err := co.setValue("abc")
require.Error(t, err)
require.Contains(t, err.Error(), "no parser for flag mykey")
}

func TestSetValue(t *testing.T) {
var b bool
var i int
Expand Down
12 changes: 7 additions & 5 deletions cmd/soroban-rpc/internal/config/log_format.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package config

import "fmt"
import (
"fmt"
)

type LogFormat int

Expand Down Expand Up @@ -47,13 +49,13 @@ func (f *LogFormat) UnmarshalTOML(i interface{}) error {
}
}

func (f LogFormat) String() string {
func (f LogFormat) String() (string, error) {
switch f {
case LogFormatText:
return "text"
return "text", nil
case LogFormatJSON:
return "json"
return "json", nil
default:
panic(fmt.Sprintf("unknown log format: %d", f))
return "", fmt.Errorf("unknown log format: %d", f)
}
}
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (cfg *Config) options() ConfigOptions {
return nil
},
MarshalTOML: func(option *ConfigOption) (interface{}, error) {
return cfg.LogFormat.String(), nil
return cfg.LogFormat.String()
},
},
{
Expand Down
2 changes: 2 additions & 0 deletions cmd/soroban-rpc/internal/events/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Cursor struct {
// Tx is the index of the transaction within the ledger which emitted the event.
Tx uint32
// Op is the index of the operation within the transaction which emitted the event.
// Note: Currently, there is no use for it (events are transaction-wide and not operation-specific)
// but we keep it in order to make the API future-proof.
Op uint32
// Event is the index of the event within in the operation which emitted the event.
Event uint32
Expand Down
48 changes: 26 additions & 22 deletions cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,17 @@ import (
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/ledgerbucketwindow"
)

type bucket struct {
ledgerSeq uint32
ledgerCloseTimestamp int64
events []event
}

type event struct {
contents xdr.DiagnosticEvent
txIndex uint32
opIndex uint32
eventIndex uint32
diagnosticEventXDR []byte
txIndex uint32
eventIndex uint32
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
}

func (e event) cursor(ledgerSeq uint32) Cursor {
return Cursor{
Ledger: ledgerSeq,
Tx: e.txIndex,
Op: e.opIndex,
Event: e.eventIndex,
}
}
Expand Down Expand Up @@ -98,13 +91,15 @@ type Range struct {
ClampEnd bool
}

type ScanFunction func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool

// Scan applies f on all the events occurring in the given range.
// The events are processed in sorted ascending Cursor order.
// If f returns false, the scan terminates early (f will not be applied on
// remaining events in the range). Note that a read lock is held for the
// entire duration of the Scan function so f should be written in a way
// to minimize latency.
func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor, int64) bool) (uint32, error) {
func (m *MemoryStore) Scan(eventRange Range, f ScanFunction) (uint32, error) {
startTime := time.Now()
m.lock.RLock()
defer m.lock.RUnlock()
Expand All @@ -129,7 +124,12 @@ func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor,
if eventRange.End.Cmp(cur) <= 0 {
return lastLedgerInWindow, nil
}
if !f(event.contents, cur, timestamp) {
var diagnosticEvent xdr.DiagnosticEvent
err := xdr.SafeUnmarshal(event.diagnosticEventXDR, &diagnosticEvent)
if err != nil {
return 0, err
}
if !f(diagnosticEvent, cur, timestamp, event.txHash) {
return lastLedgerInWindow, nil
}
}
Expand Down Expand Up @@ -201,7 +201,9 @@ func (m *MemoryStore) IngestEvents(ledgerCloseMeta xdr.LedgerCloseMeta) error {
BucketContent: events,
}
m.lock.Lock()
m.eventsByLedger.Append(bucket)
if _, err = m.eventsByLedger.Append(bucket); err != nil {
return err
}
m.lock.Unlock()
m.eventsDurationMetric.With(prometheus.Labels{"operation": "ingest"}).
Observe(time.Since(startTime).Seconds())
Expand Down Expand Up @@ -236,20 +238,22 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (
if !tx.Result.Successful() {
continue
}

txEvents, err := tx.GetDiagnosticEvents()
if err != nil {
return nil, err
}
txHash := tx.Result.TransactionHash
for index, e := range txEvents {
diagnosticEventXDR, err := e.MarshalBinary()
if err != nil {
return nil, err
}
events = append(events, event{
contents: e,
txIndex: tx.Index,
// NOTE: we cannot really index by operation since all events
// are provided as part of the transaction. However,
// that shouldn't matter in practice since a transaction
// can only contain a single Host Function Invocation.
opIndex: 0,
eventIndex: uint32(index),
diagnosticEventXDR: diagnosticEventXDR,
txIndex: tx.Index,
eventIndex: uint32(index),
txHash: &txHash,
})
}
}
Expand Down
87 changes: 43 additions & 44 deletions cmd/soroban-rpc/internal/events/events_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"bytes"
"testing"

"github.com/stellar/go/xdr"
Expand All @@ -13,68 +14,64 @@ import (
var (
ledger5CloseTime = ledgerCloseTime(5)
ledger5Events = []event{
newEvent(1, 0, 0, 100),
newEvent(1, 0, 1, 200),
newEvent(2, 0, 0, 300),
newEvent(2, 1, 0, 400),
newEvent(1, 0, 100),
newEvent(1, 1, 200),
newEvent(2, 0, 300),
newEvent(2, 1, 400),
}
ledger6CloseTime = ledgerCloseTime(6)
ledger6Events []event = nil
ledger7CloseTime = ledgerCloseTime(7)
ledger7Events = []event{
newEvent(1, 0, 0, 500),
newEvent(1, 0, 500),
}
ledger8CloseTime = ledgerCloseTime(8)
ledger8Events = []event{
newEvent(1, 0, 0, 600),
newEvent(2, 0, 0, 700),
newEvent(2, 0, 1, 800),
newEvent(2, 0, 2, 900),
newEvent(2, 1, 0, 1000),
newEvent(1, 0, 600),
newEvent(2, 0, 700),
newEvent(2, 1, 800),
newEvent(2, 2, 900),
newEvent(2, 3, 1000),
}
)

func ledgerCloseTime(seq uint32) int64 {
return int64(seq)*25 + 100
}

func newEvent(txIndex, opIndex, eventIndex, val uint32) event {
func newEvent(txIndex, eventIndex, val uint32) event {
v := xdr.Uint32(val)
return event{
contents: xdr.DiagnosticEvent{
InSuccessfulContractCall: true,
Event: xdr.ContractEvent{
Type: xdr.ContractEventTypeSystem,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Data: xdr.ScVal{
Type: xdr.ScValTypeScvU32,
U32: &v,
},

e := xdr.DiagnosticEvent{
InSuccessfulContractCall: true,
Event: xdr.ContractEvent{
Type: xdr.ContractEventTypeSystem,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Data: xdr.ScVal{
Type: xdr.ScValTypeScvU32,
U32: &v,
},
},
},
},
txIndex: txIndex,
opIndex: opIndex,
eventIndex: eventIndex,
}
}

func mustMarshal(e xdr.DiagnosticEvent) string {
result, err := xdr.MarshalBase64(e)
diagnosticEventXDR, err := e.MarshalBinary()
if err != nil {
panic(err)
}
return result
return event{
diagnosticEventXDR: diagnosticEventXDR,
txIndex: txIndex,
eventIndex: eventIndex,
}
}

func (e event) equals(other event) bool {
return e.txIndex == other.txIndex &&
e.opIndex == other.opIndex &&
e.eventIndex == other.eventIndex &&
mustMarshal(e.contents) == mustMarshal(other.contents)
bytes.Equal(e.diagnosticEventXDR, other.diagnosticEventXDR)
}

func eventsAreEqual(t *testing.T, a, b []event) {
Expand All @@ -86,7 +83,7 @@ func eventsAreEqual(t *testing.T, a, b []event) {

func TestScanRangeValidation(t *testing.T) {
m := NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 4)
assertNoCalls := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, timestamp int64) bool {
assertNoCalls := func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool {
t.Fatalf("unexpected call")
return true
}
Expand Down Expand Up @@ -291,7 +288,7 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 5, Tx: 1, Op: 2},
Start: Cursor{Ledger: 5, Tx: 2},
ClampStart: false,
End: Cursor{Ledger: 9},
ClampEnd: false,
Expand Down Expand Up @@ -327,7 +324,7 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0},
Start: Cursor{Ledger: 8, Tx: 2, Event: 3},
ClampStart: false,
End: MaxCursor,
ClampEnd: true,
Expand All @@ -336,7 +333,7 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0},
Start: Cursor{Ledger: 8, Tx: 2, Event: 3},
ClampStart: false,
End: Cursor{Ledger: 9},
ClampEnd: false,
Expand All @@ -354,9 +351,9 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 5, Tx: 1, Op: 2},
Start: Cursor{Ledger: 5, Tx: 2},
ClampStart: false,
End: Cursor{Ledger: 8, Tx: 1, Op: 4},
End: Cursor{Ledger: 8, Tx: 2},
ClampEnd: false,
},
concat(ledger5Events[2:], ledger6Events, ledger7Events, ledger8Events[:1]),
Expand All @@ -365,13 +362,15 @@ func TestScan(t *testing.T) {
for _, input := range genEquivalentInputs(testCase.input) {
var events []event
iterateAll := true
f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64) bool {
f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64, hash *xdr.Hash) bool {
require.Equal(t, ledgerCloseTime(cursor.Ledger), ledgerCloseTimestamp)
diagnosticEventXDR, err := contractEvent.MarshalBinary()
require.NoError(t, err)
events = append(events, event{
contents: contractEvent,
txIndex: cursor.Tx,
opIndex: cursor.Op,
eventIndex: cursor.Event,
diagnosticEventXDR: diagnosticEventXDR,
txIndex: cursor.Tx,
eventIndex: cursor.Event,
txHash: hash,
})
return iterateAll
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ func NewLedgerBucketWindow[T any](retentionWindow uint32) *LedgerBucketWindow[T]
}

// Append adds a new bucket to the window. If the window is full a bucket will be evicted and returned.
func (w *LedgerBucketWindow[T]) Append(bucket LedgerBucket[T]) *LedgerBucket[T] {
func (w *LedgerBucketWindow[T]) Append(bucket LedgerBucket[T]) (*LedgerBucket[T], error) {
length := w.Len()
if length > 0 {
expectedLedgerSequence := w.buckets[w.start].LedgerSeq + length
if expectedLedgerSequence != bucket.LedgerSeq {
panic(fmt.Errorf("ledgers not contiguous: expected ledger sequence %v but received %v", expectedLedgerSequence, bucket.LedgerSeq))
return &LedgerBucket[T]{}, fmt.Errorf("error appending ledgers: ledgers not contiguous: expected ledger sequence %v but received %v", expectedLedgerSequence, bucket.LedgerSeq)
}
}

Expand All @@ -57,7 +57,7 @@ func (w *LedgerBucketWindow[T]) Append(bucket LedgerBucket[T]) *LedgerBucket[T]
w.start = (w.start + 1) % length
}

return evicted
return evicted, nil
}

// Len returns the length (number of buckets in the window)
Expand Down
Loading

0 comments on commit 97ab094

Please sign in to comment.