Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

soroban-rpc: Pull in Recent Soroban RPC changes from soroban-tools #55

Merged
merged 3 commits into from
Feb 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions cmd/soroban-rpc/internal/config/config_option.go
Original file line number Diff line number Diff line change
Expand Up @@ -88,8 +88,6 @@ func (o *ConfigOption) setValue(i interface{}) (err error) {
if o.CustomSetValue != nil {
return o.CustomSetValue(o, i)
}
// it's unfortunate that Set below panics when it cannot set the value..
// we'll want to catch this so that we can alert the user nicely.
defer func() {
if recoverRes := recover(); recoverRes != nil {
var ok bool
Expand All @@ -101,7 +99,7 @@ func (o *ConfigOption) setValue(i interface{}) (err error) {
}
}()
parser := func(option *ConfigOption, i interface{}) error {
panic(fmt.Sprintf("no parser for flag %s", o.Name))
return errors.Errorf("no parser for flag %s", o.Name)
}
switch o.ConfigKey.(type) {
case *bool:
Expand Down
11 changes: 11 additions & 0 deletions cmd/soroban-rpc/internal/config/config_option_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"fmt"
"math"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -115,6 +116,16 @@ func TestUnassignableField(t *testing.T) {
require.Contains(t, err.Error(), co.Name)
}

func TestNoParserForFlag(t *testing.T) {
var co ConfigOption
var invalidKey []time.Duration
co.Name = "mykey"
co.ConfigKey = &invalidKey
err := co.setValue("abc")
require.Error(t, err)
require.Contains(t, err.Error(), "no parser for flag mykey")
}

func TestSetValue(t *testing.T) {
var b bool
var i int
Expand Down
12 changes: 7 additions & 5 deletions cmd/soroban-rpc/internal/config/log_format.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package config

import "fmt"
import (
"fmt"
)

type LogFormat int

Expand Down Expand Up @@ -47,13 +49,13 @@ func (f *LogFormat) UnmarshalTOML(i interface{}) error {
}
}

func (f LogFormat) String() string {
func (f LogFormat) String() (string, error) {
switch f {
case LogFormatText:
return "text"
return "text", nil
case LogFormatJSON:
return "json"
return "json", nil
default:
panic(fmt.Sprintf("unknown log format: %d", f))
return "", fmt.Errorf("unknown log format: %d", f)
}
}
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (cfg *Config) options() ConfigOptions {
return nil
},
MarshalTOML: func(option *ConfigOption) (interface{}, error) {
return cfg.LogFormat.String(), nil
return cfg.LogFormat.String()
},
},
{
Expand Down
2 changes: 2 additions & 0 deletions cmd/soroban-rpc/internal/events/cursor.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ type Cursor struct {
// Tx is the index of the transaction within the ledger which emitted the event.
Tx uint32
// Op is the index of the operation within the transaction which emitted the event.
// Note: Currently, there is no use for it (events are transaction-wide and not operation-specific)
// but we keep it in order to make the API future-proof.
Op uint32
// Event is the index of the event within in the operation which emitted the event.
Event uint32
Expand Down
48 changes: 26 additions & 22 deletions cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,24 +15,17 @@ import (
"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal/ledgerbucketwindow"
)

type bucket struct {
ledgerSeq uint32
ledgerCloseTimestamp int64
events []event
}

type event struct {
contents xdr.DiagnosticEvent
txIndex uint32
opIndex uint32
eventIndex uint32
diagnosticEventXDR []byte
txIndex uint32
eventIndex uint32
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
}

func (e event) cursor(ledgerSeq uint32) Cursor {
return Cursor{
Ledger: ledgerSeq,
Tx: e.txIndex,
Op: e.opIndex,
Event: e.eventIndex,
}
}
Expand Down Expand Up @@ -98,13 +91,15 @@ type Range struct {
ClampEnd bool
}

type ScanFunction func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool

// Scan applies f on all the events occurring in the given range.
// The events are processed in sorted ascending Cursor order.
// If f returns false, the scan terminates early (f will not be applied on
// remaining events in the range). Note that a read lock is held for the
// entire duration of the Scan function so f should be written in a way
// to minimize latency.
func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor, int64) bool) (uint32, error) {
func (m *MemoryStore) Scan(eventRange Range, f ScanFunction) (uint32, error) {
startTime := time.Now()
m.lock.RLock()
defer m.lock.RUnlock()
Expand All @@ -129,7 +124,12 @@ func (m *MemoryStore) Scan(eventRange Range, f func(xdr.DiagnosticEvent, Cursor,
if eventRange.End.Cmp(cur) <= 0 {
return lastLedgerInWindow, nil
}
if !f(event.contents, cur, timestamp) {
var diagnosticEvent xdr.DiagnosticEvent
err := xdr.SafeUnmarshal(event.diagnosticEventXDR, &diagnosticEvent)
if err != nil {
return 0, err
}
if !f(diagnosticEvent, cur, timestamp, event.txHash) {
return lastLedgerInWindow, nil
}
}
Expand Down Expand Up @@ -201,7 +201,9 @@ func (m *MemoryStore) IngestEvents(ledgerCloseMeta xdr.LedgerCloseMeta) error {
BucketContent: events,
}
m.lock.Lock()
m.eventsByLedger.Append(bucket)
if _, err = m.eventsByLedger.Append(bucket); err != nil {
return err
}
m.lock.Unlock()
m.eventsDurationMetric.With(prometheus.Labels{"operation": "ingest"}).
Observe(time.Since(startTime).Seconds())
Expand Down Expand Up @@ -236,20 +238,22 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (
if !tx.Result.Successful() {
continue
}

txEvents, err := tx.GetDiagnosticEvents()
if err != nil {
return nil, err
}
txHash := tx.Result.TransactionHash
for index, e := range txEvents {
diagnosticEventXDR, err := e.MarshalBinary()
if err != nil {
return nil, err
}
events = append(events, event{
contents: e,
txIndex: tx.Index,
// NOTE: we cannot really index by operation since all events
// are provided as part of the transaction. However,
// that shouldn't matter in practice since a transaction
// can only contain a single Host Function Invocation.
opIndex: 0,
eventIndex: uint32(index),
diagnosticEventXDR: diagnosticEventXDR,
txIndex: tx.Index,
eventIndex: uint32(index),
txHash: &txHash,
})
}
}
Expand Down
87 changes: 43 additions & 44 deletions cmd/soroban-rpc/internal/events/events_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package events

import (
"bytes"
"testing"

"github.com/stellar/go/xdr"
Expand All @@ -13,68 +14,64 @@ import (
var (
ledger5CloseTime = ledgerCloseTime(5)
ledger5Events = []event{
newEvent(1, 0, 0, 100),
newEvent(1, 0, 1, 200),
newEvent(2, 0, 0, 300),
newEvent(2, 1, 0, 400),
newEvent(1, 0, 100),
newEvent(1, 1, 200),
newEvent(2, 0, 300),
newEvent(2, 1, 400),
}
ledger6CloseTime = ledgerCloseTime(6)
ledger6Events []event = nil
ledger7CloseTime = ledgerCloseTime(7)
ledger7Events = []event{
newEvent(1, 0, 0, 500),
newEvent(1, 0, 500),
}
ledger8CloseTime = ledgerCloseTime(8)
ledger8Events = []event{
newEvent(1, 0, 0, 600),
newEvent(2, 0, 0, 700),
newEvent(2, 0, 1, 800),
newEvent(2, 0, 2, 900),
newEvent(2, 1, 0, 1000),
newEvent(1, 0, 600),
newEvent(2, 0, 700),
newEvent(2, 1, 800),
newEvent(2, 2, 900),
newEvent(2, 3, 1000),
}
)

func ledgerCloseTime(seq uint32) int64 {
return int64(seq)*25 + 100
}

func newEvent(txIndex, opIndex, eventIndex, val uint32) event {
func newEvent(txIndex, eventIndex, val uint32) event {
v := xdr.Uint32(val)
return event{
contents: xdr.DiagnosticEvent{
InSuccessfulContractCall: true,
Event: xdr.ContractEvent{
Type: xdr.ContractEventTypeSystem,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Data: xdr.ScVal{
Type: xdr.ScValTypeScvU32,
U32: &v,
},

e := xdr.DiagnosticEvent{
InSuccessfulContractCall: true,
Event: xdr.ContractEvent{
Type: xdr.ContractEventTypeSystem,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Data: xdr.ScVal{
Type: xdr.ScValTypeScvU32,
U32: &v,
},
},
},
},
txIndex: txIndex,
opIndex: opIndex,
eventIndex: eventIndex,
}
}

func mustMarshal(e xdr.DiagnosticEvent) string {
result, err := xdr.MarshalBase64(e)
diagnosticEventXDR, err := e.MarshalBinary()
if err != nil {
panic(err)
}
return result
return event{
diagnosticEventXDR: diagnosticEventXDR,
txIndex: txIndex,
eventIndex: eventIndex,
}
}

func (e event) equals(other event) bool {
return e.txIndex == other.txIndex &&
e.opIndex == other.opIndex &&
e.eventIndex == other.eventIndex &&
mustMarshal(e.contents) == mustMarshal(other.contents)
bytes.Equal(e.diagnosticEventXDR, other.diagnosticEventXDR)
}

func eventsAreEqual(t *testing.T, a, b []event) {
Expand All @@ -86,7 +83,7 @@ func eventsAreEqual(t *testing.T, a, b []event) {

func TestScanRangeValidation(t *testing.T) {
m := NewMemoryStore(interfaces.MakeNoOpDeamon(), "unit-tests", 4)
assertNoCalls := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, timestamp int64) bool {
assertNoCalls := func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool {
t.Fatalf("unexpected call")
return true
}
Expand Down Expand Up @@ -291,7 +288,7 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 5, Tx: 1, Op: 2},
Start: Cursor{Ledger: 5, Tx: 2},
ClampStart: false,
End: Cursor{Ledger: 9},
ClampEnd: false,
Expand Down Expand Up @@ -327,7 +324,7 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0},
Start: Cursor{Ledger: 8, Tx: 2, Event: 3},
ClampStart: false,
End: MaxCursor,
ClampEnd: true,
Expand All @@ -336,7 +333,7 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 8, Tx: 2, Op: 1, Event: 0},
Start: Cursor{Ledger: 8, Tx: 2, Event: 3},
ClampStart: false,
End: Cursor{Ledger: 9},
ClampEnd: false,
Expand All @@ -354,9 +351,9 @@ func TestScan(t *testing.T) {
},
{
Range{
Start: Cursor{Ledger: 5, Tx: 1, Op: 2},
Start: Cursor{Ledger: 5, Tx: 2},
ClampStart: false,
End: Cursor{Ledger: 8, Tx: 1, Op: 4},
End: Cursor{Ledger: 8, Tx: 2},
ClampEnd: false,
},
concat(ledger5Events[2:], ledger6Events, ledger7Events, ledger8Events[:1]),
Expand All @@ -365,13 +362,15 @@ func TestScan(t *testing.T) {
for _, input := range genEquivalentInputs(testCase.input) {
var events []event
iterateAll := true
f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64) bool {
f := func(contractEvent xdr.DiagnosticEvent, cursor Cursor, ledgerCloseTimestamp int64, hash *xdr.Hash) bool {
require.Equal(t, ledgerCloseTime(cursor.Ledger), ledgerCloseTimestamp)
diagnosticEventXDR, err := contractEvent.MarshalBinary()
require.NoError(t, err)
events = append(events, event{
contents: contractEvent,
txIndex: cursor.Tx,
opIndex: cursor.Op,
eventIndex: cursor.Event,
diagnosticEventXDR: diagnosticEventXDR,
txIndex: cursor.Tx,
eventIndex: cursor.Event,
txHash: hash,
})
return iterateAll
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,12 +35,12 @@ func NewLedgerBucketWindow[T any](retentionWindow uint32) *LedgerBucketWindow[T]
}

// Append adds a new bucket to the window. If the window is full a bucket will be evicted and returned.
func (w *LedgerBucketWindow[T]) Append(bucket LedgerBucket[T]) *LedgerBucket[T] {
func (w *LedgerBucketWindow[T]) Append(bucket LedgerBucket[T]) (*LedgerBucket[T], error) {
length := w.Len()
if length > 0 {
expectedLedgerSequence := w.buckets[w.start].LedgerSeq + length
if expectedLedgerSequence != bucket.LedgerSeq {
panic(fmt.Errorf("ledgers not contiguous: expected ledger sequence %v but received %v", expectedLedgerSequence, bucket.LedgerSeq))
return &LedgerBucket[T]{}, fmt.Errorf("error appending ledgers: ledgers not contiguous: expected ledger sequence %v but received %v", expectedLedgerSequence, bucket.LedgerSeq)
}
}

Expand All @@ -57,7 +57,7 @@ func (w *LedgerBucketWindow[T]) Append(bucket LedgerBucket[T]) *LedgerBucket[T]
w.start = (w.start + 1) % length
}

return evicted
return evicted, nil
}

// Len returns the length (number of buckets in the window)
Expand Down
Loading
Loading