Skip to content

Commit

Permalink
Merge branch 'main' of https://github.com/stellar/soroban-rpc into rp…
Browse files Browse the repository at this point in the history
…c-publish-dry-run
  • Loading branch information
stellarsaur committed Feb 13, 2024
2 parents 3feacbc + 258cef7 commit 7b99e3a
Show file tree
Hide file tree
Showing 26 changed files with 327 additions and 70 deletions.
8 changes: 8 additions & 0 deletions cmd/soroban-rpc/internal/config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ type Config struct {
EventLedgerRetentionWindow uint32
FriendbotURL string
HistoryArchiveURLs []string
HistoryArchiveUserAgent string
IngestionTimeout time.Duration
LogFormat LogFormat
LogLevel logrus.Level
Expand Down Expand Up @@ -64,6 +65,13 @@ type Config struct {
flagset *pflag.FlagSet
}

func (cfg *Config) ExtendedUserAgent(extension string) string {
if cfg.HistoryArchiveUserAgent == "" {
return extension
}
return cfg.HistoryArchiveUserAgent + "/" + extension
}

func (cfg *Config) SetValues(lookupEnv func(string) (string, bool)) error {
// We start with the defaults
if err := cfg.loadDefaults(); err != nil {
Expand Down
8 changes: 8 additions & 0 deletions cmd/soroban-rpc/internal/config/config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,14 @@ func TestConfigLoadDefaults(t *testing.T) {
assert.Equal(t, uint(runtime.NumCPU()), cfg.PreflightWorkerCount)
}

func TestConfigExtendedUserAgent(t *testing.T) {
cfg := Config{
HistoryArchiveUserAgent: "Test",
}
require.NoError(t, cfg.loadDefaults())
assert.Equal(t, "Test/123", cfg.ExtendedUserAgent("123"))
}

func TestConfigLoadFlagsDefaultValuesOverrideExisting(t *testing.T) {
// Set up a config with an existing non-default value
cfg := Config{
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/config/options_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func TestAllConfigFieldsMustHaveASingleOption(t *testing.T) {

// Allow us to explicitly exclude any fields on the Config struct, which are not going to have Options.
// e.g. "ConfigPath"
excluded := map[string]bool{}
excluded := map[string]bool{"HistoryArchiveUserAgent": true}

cfg := Config{}
cfgValue := reflect.ValueOf(cfg)
Expand Down
13 changes: 10 additions & 3 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/stellar/go/ingest/ledgerbackend"
supporthttp "github.com/stellar/go/support/http"
supportlog "github.com/stellar/go/support/log"
"github.com/stellar/go/support/storage"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-tools/cmd/soroban-rpc/internal"
Expand Down Expand Up @@ -121,7 +122,7 @@ func newCaptiveCore(cfg *config.Config, logger *supportlog.Entry) (*ledgerbacken
CheckpointFrequency: cfg.CheckpointFrequency,
Log: logger.WithField("subservice", "stellar-core"),
Toml: captiveCoreToml,
UserAgent: "captivecore",
UserAgent: cfg.ExtendedUserAgent("captivecore"),
UseDB: true,
}
return ledgerbackend.NewCaptive(captiveConfig)
Expand All @@ -144,12 +145,18 @@ func MustNew(cfg *config.Config) *Daemon {
if len(cfg.HistoryArchiveURLs) == 0 {
logger.Fatal("no history archives url were provided")
}
historyArchive, err := historyarchive.Connect(
cfg.HistoryArchiveURLs[0],

historyArchive, err := historyarchive.NewArchivePool(
cfg.HistoryArchiveURLs,
historyarchive.ArchiveOptions{
NetworkPassphrase: cfg.NetworkPassphrase,
CheckpointFrequency: cfg.CheckpointFrequency,
ConnectOptions: storage.ConnectOptions{
Context: context.Background(),
UserAgent: cfg.HistoryArchiveUserAgent},
},
)

if err != nil {
logger.WithError(err).Fatal("could not connect to history archive")
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ type event struct {
diagnosticEventXDR []byte
txIndex uint32
eventIndex uint32
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
txHash *xdr.Hash // intentionally stored as a pointer to save memory (amortized as soon as there are two events in a transaction)
}

func (e event) cursor(ledgerSeq uint32) Cursor {
Expand Down
8 changes: 4 additions & 4 deletions cmd/soroban-rpc/internal/ingest/ledgerentry.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,10 @@ func (s *Service) ingestLedgerEntryChanges(ctx context.Context, reader ingest.Ch
results := changeStatsProcessor.GetResults()
for stat, value := range results.Map() {
stat = strings.Replace(stat, "stats_", "change_", 1)
s.ledgerStatsMetric.
s.metrics.ledgerStatsMetric.
With(prometheus.Labels{"type": stat}).Add(float64(value.(int64)))
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "ledger_entries"}).Observe(time.Since(startTime).Seconds())
return ctx.Err()
}
Expand All @@ -66,10 +66,10 @@ func (s *Service) ingestTempLedgerEntryEvictions(
}

for evictionType, count := range counts {
s.ledgerStatsMetric.
s.metrics.ledgerStatsMetric.
With(prometheus.Labels{"type": evictionType}).Add(float64(count))
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "evicted_temp_ledger_entries"}).Observe(time.Since(startTime).Seconds())
return ctx.Err()
}
Expand Down
57 changes: 33 additions & 24 deletions cmd/soroban-rpc/internal/ingest/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -73,19 +73,24 @@ func newService(cfg Config) *Service {
[]string{"type"},
)

cfg.Daemon.MetricsRegistry().MustRegister(ingestionDurationMetric, latestLedgerMetric, ledgerStatsMetric)
cfg.Daemon.MetricsRegistry().MustRegister(
ingestionDurationMetric,
latestLedgerMetric,
ledgerStatsMetric)

service := &Service{
logger: cfg.Logger,
db: cfg.DB,
eventStore: cfg.EventStore,
transactionStore: cfg.TransactionStore,
ledgerBackend: cfg.LedgerBackend,
networkPassPhrase: cfg.NetworkPassPhrase,
timeout: cfg.Timeout,
ingestionDurationMetric: ingestionDurationMetric,
latestLedgerMetric: latestLedgerMetric,
ledgerStatsMetric: ledgerStatsMetric,
logger: cfg.Logger,
db: cfg.DB,
eventStore: cfg.EventStore,
transactionStore: cfg.TransactionStore,
ledgerBackend: cfg.LedgerBackend,
networkPassPhrase: cfg.NetworkPassPhrase,
timeout: cfg.Timeout,
metrics: Metrics{
ingestionDurationMetric: ingestionDurationMetric,
latestLedgerMetric: latestLedgerMetric,
ledgerStatsMetric: ledgerStatsMetric,
},
}

return service
Expand Down Expand Up @@ -119,21 +124,25 @@ func startService(service *Service, cfg Config) {
})
}

type Service struct {
logger *log.Entry
db db.ReadWriter
eventStore *events.MemoryStore
transactionStore *transactions.MemoryStore
ledgerBackend backends.LedgerBackend
timeout time.Duration
networkPassPhrase string
done context.CancelFunc
wg sync.WaitGroup
type Metrics struct {
ingestionDurationMetric *prometheus.SummaryVec
latestLedgerMetric prometheus.Gauge
ledgerStatsMetric *prometheus.CounterVec
}

type Service struct {
logger *log.Entry
db db.ReadWriter
eventStore *events.MemoryStore
transactionStore *transactions.MemoryStore
ledgerBackend backends.LedgerBackend
timeout time.Duration
networkPassPhrase string
done context.CancelFunc
wg sync.WaitGroup
metrics Metrics
}

func (s *Service) Close() error {
s.done()
s.wg.Wait()
Expand Down Expand Up @@ -286,9 +295,9 @@ func (s *Service) ingest(ctx context.Context, sequence uint32) error {
}
s.logger.Debugf("Ingested ledger %d", sequence)

s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "total"}).Observe(time.Since(startTime).Seconds())
s.latestLedgerMetric.Set(float64(sequence))
s.metrics.latestLedgerMetric.Set(float64(sequence))
return nil
}

Expand All @@ -297,7 +306,7 @@ func (s *Service) ingestLedgerCloseMeta(tx db.WriteTx, ledgerCloseMeta xdr.Ledge
if err := tx.LedgerWriter().InsertLedger(ledgerCloseMeta); err != nil {
return err
}
s.ingestionDurationMetric.
s.metrics.ingestionDurationMetric.
With(prometheus.Labels{"type": "ledger_close_meta"}).Observe(time.Since(startTime).Seconds())

if err := s.eventStore.IngestEvents(ledgerCloseMeta); err != nil {
Expand Down
1 change: 1 addition & 0 deletions cmd/soroban-rpc/internal/ingest/service_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ func TestRetryRunningIngestion(t *testing.T) {
func TestIngestion(t *testing.T) {
mockDB := &MockDB{}
mockLedgerBackend := &ledgerbackend.MockDatabaseBackend{}

daemon := interfaces.MakeNoOpDeamon()
config := Config{
Logger: supportlog.New(),
Expand Down
6 changes: 6 additions & 0 deletions cmd/soroban-rpc/internal/methods/get_transaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,10 @@ type GetTransactionResponse struct {
Ledger uint32 `json:"ledger,omitempty"`
// LedgerCloseTime is the unix timestamp of when the transaction was included in the ledger.
LedgerCloseTime int64 `json:"createdAt,string,omitempty"`

// DiagnosticEventsXDR is present only if Status is equal to TransactionFailed.
// DiagnosticEventsXDR is a base64-encoded slice of xdr.DiagnosticEvent
DiagnosticEventsXDR []string `json:"diagnosticEventsXdr,omitempty"`
}

type GetTransactionRequest struct {
Expand Down Expand Up @@ -104,6 +108,8 @@ func GetTransaction(getter transactionGetter, request GetTransactionRequest) (Ge
response.ResultXdr = base64.StdEncoding.EncodeToString(tx.Result)
response.EnvelopeXdr = base64.StdEncoding.EncodeToString(tx.Envelope)
response.ResultMetaXdr = base64.StdEncoding.EncodeToString(tx.Meta)
response.DiagnosticEventsXDR = base64EncodeSlice(tx.Events)

if tx.Successful {
response.Status = TransactionStatusSuccess
} else {
Expand Down
80 changes: 80 additions & 0 deletions cmd/soroban-rpc/internal/methods/get_transaction_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,44 @@ func txMeta(acctSeq uint32, successful bool) xdr.LedgerCloseMeta {
}
}

func txMetaWithEvents(acctSeq uint32, successful bool) xdr.LedgerCloseMeta {

meta := txMeta(acctSeq, successful)

contractIDBytes, _ := hex.DecodeString("df06d62447fd25da07c0135eed7557e5a5497ee7d15b7fe345bd47e191d8f577")
var contractID xdr.Hash
copy(contractID[:], contractIDBytes)
counter := xdr.ScSymbol("COUNTER")

meta.V1.TxProcessing[0].TxApplyProcessing.V3 = &xdr.TransactionMetaV3{
SorobanMeta: &xdr.SorobanTransactionMeta{
Events: []xdr.ContractEvent{{
ContractId: &contractID,
Type: xdr.ContractEventTypeContract,
Body: xdr.ContractEventBody{
V: 0,
V0: &xdr.ContractEventV0{
Topics: []xdr.ScVal{{
Type: xdr.ScValTypeScvSymbol,
Sym: &counter,
}},
Data: xdr.ScVal{
Type: xdr.ScValTypeScvSymbol,
Sym: &counter,
},
},
},
}},
ReturnValue: xdr.ScVal{
Type: xdr.ScValTypeScvSymbol,
Sym: &counter,
},
},
}

return meta
}

func txEnvelope(acctSeq uint32) xdr.TransactionEnvelope {
envelope, err := xdr.NewTransactionEnvelope(xdr.EnvelopeTypeEnvelopeTypeTx, xdr.TransactionV1Envelope{
Tx: xdr.Transaction{
Expand Down Expand Up @@ -154,6 +192,7 @@ func TestGetTransaction(t *testing.T) {
ResultMetaXdr: expectedTxMeta,
Ledger: 101,
LedgerCloseTime: 2625,
DiagnosticEventsXDR: []string{},
}, tx)

// ingest another (failed) transaction
Expand All @@ -177,6 +216,7 @@ func TestGetTransaction(t *testing.T) {
ResultMetaXdr: expectedTxMeta,
Ledger: 101,
LedgerCloseTime: 2625,
DiagnosticEventsXDR: []string{},
}, tx)

// the new transaction should also be there
Expand Down Expand Up @@ -206,5 +246,45 @@ func TestGetTransaction(t *testing.T) {
ResultMetaXdr: expectedTxMeta,
Ledger: 102,
LedgerCloseTime: 2650,
DiagnosticEventsXDR: []string{},
}, tx)

// Test Txn with events
meta = txMetaWithEvents(3, true)
err = store.IngestTransactions(meta)
require.NoError(t, err)

xdrHash = txHash(3)
hash = hex.EncodeToString(xdrHash[:])

expectedTxResult, err = xdr.MarshalBase64(meta.V1.TxProcessing[0].Result.Result)
require.NoError(t, err)
expectedEnvelope, err = xdr.MarshalBase64(txEnvelope(3))
require.NoError(t, err)
expectedTxMeta, err = xdr.MarshalBase64(meta.V1.TxProcessing[0].TxApplyProcessing)
require.NoError(t, err)

diagnosticEvents, err := meta.V1.TxProcessing[0].TxApplyProcessing.GetDiagnosticEvents()
require.NoError(t, err)
expectedEventsMeta, err := xdr.MarshalBase64(diagnosticEvents[0])

tx, err = GetTransaction(store, GetTransactionRequest{hash})
require.NoError(t, err)
require.NoError(t, err)
require.Equal(t, GetTransactionResponse{
Status: TransactionStatusSuccess,
LatestLedger: 103,
LatestLedgerCloseTime: 2675,
OldestLedger: 101,
OldestLedgerCloseTime: 2625,
ApplicationOrder: 1,
FeeBump: false,
EnvelopeXdr: expectedEnvelope,
ResultXdr: expectedTxResult,
ResultMetaXdr: expectedTxMeta,
Ledger: 103,
LedgerCloseTime: 2675,
DiagnosticEventsXDR: []string{expectedEventsMeta},
}, tx)

}
25 changes: 25 additions & 0 deletions cmd/soroban-rpc/internal/test/archive_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package test

import (
"net/http"
"sync"
"testing"

"github.com/stretchr/testify/assert"
)

func TestArchiveUserAgent(t *testing.T) {
userAgents := sync.Map{}
cfg := &TestConfig{
historyArchiveProxyCallback: func(r *http.Request) {
userAgents.Store(r.Header["User-Agent"][0], "")
},
}
NewTest(t, cfg)

_, ok := userAgents.Load("testing")
assert.True(t, ok, "rpc service should set user agent for history archives")

_, ok = userAgents.Load("testing/captivecore")
assert.True(t, ok, "rpc captive core should set user agent for history archives")
}
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/test/cli_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ func getCLIDefaultAccount(t *testing.T) string {
}

func NewCLITest(t *testing.T) *Test {
test := NewTest(t)
test := NewTest(t, nil)
fundAccount(t, test, getCLIDefaultAccount(t), "1000000")
return test
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/test/cors_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
// Specifically, when we include an Origin header in the request, a soroban-rpc should response
// with a corresponding Access-Control-Allow-Origin.
func TestCORS(t *testing.T) {
test := NewTest(t)
test := NewTest(t, nil)

request, err := http.NewRequest("POST", test.sorobanRPCURL(), bytes.NewBufferString("{\"jsonrpc\": \"2.0\", \"id\": 1, \"method\": \"getHealth\"}"))
require.NoError(t, err)
Expand Down
Loading

0 comments on commit 7b99e3a

Please sign in to comment.