Skip to content

Commit

Permalink
Merge branch 'main' into add-getVersion-endpoint
Browse files Browse the repository at this point in the history
  • Loading branch information
psheth9 committed Apr 15, 2024
2 parents 1010049 + 367a936 commit 4c283d4
Show file tree
Hide file tree
Showing 26 changed files with 568 additions and 146 deletions.
3 changes: 3 additions & 0 deletions .github/actions/setup-integration-tests/action.yml
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,9 @@ runs:
sudo apt-get remove -y moby-compose
sudo apt-get install -y docker-compose-plugin
# add alias for docker compose
ln -f -s /usr/libexec/docker/cli-plugins/docker-compose /usr/local/bin/docker-compose
echo "Docker Compose Version:"
docker-compose version
Expand Down
6 changes: 3 additions & 3 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ jobs:
runs-on: ${{ matrix.os }}
env:
VERSION: '${{ github.event.release.name }}'
NAME: 'soroban-rpc-${{ github.event.release.name }}-${{ matrix.target }}'
NAME: 'stellar-rpc-client-${{ github.event.release.name }}-${{ matrix.target }}'
steps:
- uses: actions/checkout@v3
- run: rustup update
Expand All @@ -49,8 +49,8 @@ jobs:
CARGO_TARGET_AARCH64_UNKNOWN_LINUX_GNU_LINKER: aarch64-linux-gnu-gcc
run: |
cd target/package
tar xvfz soroban-rpc-$VERSION.crate
cd soroban-rpc-$VERSION
tar xvfz stellar-rpc-client-$VERSION.crate
cd stellar-rpc-client-$VERSION
cargo build --target-dir=../.. --release --target ${{ matrix.target }}
- name: Compress
run: |
Expand Down
2 changes: 1 addition & 1 deletion .github/workflows/rust.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ jobs:
cargo-hack-feature-options: --features opt --ignore-unknown-features
uses: stellar/actions/.github/workflows/rust-publish-dry-run-v2.yml@main
with:
crates: soroban-rpc
crates: stellar-rpc-client
runs-on: ${{ matrix.os }}
target: ${{ matrix.target }}
cargo-hack-feature-options: ${{ matrix.cargo-hack-feature-options }}
3 changes: 1 addition & 2 deletions .github/workflows/soroban-rpc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,6 @@ jobs:
matrix:
os: [ubuntu-20.04, ubuntu-22.04]
go: [1.22]
test: ['.*CLI.*', '^Test(([^C])|(C[^L])|(CL[^I])).*$']
runs-on: ${{ matrix.os }}
env:
SOROBAN_RPC_INTEGRATION_TESTS_ENABLED: true
Expand All @@ -127,4 +126,4 @@ jobs:
- name: Run Soroban RPC Integration Tests
run: |
make install_rust
go test -race -run '${{ matrix.test }}' -timeout 60m -v ./cmd/soroban-rpc/internal/test/...
go test -race -timeout 60m -v ./cmd/soroban-rpc/internal/test/...
4 changes: 2 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ default-members = ["cmd/crates/stellar-rpc-client"]
#exclude = ["cmd/crates/soroban-test/tests/fixtures/hello"]

[workspace.package]
version = "20.3.3"
version = "20.3.5"
rust-version = "1.74.0"

[workspace.dependencies.soroban-env-host]
Expand Down Expand Up @@ -61,7 +61,7 @@ version = "=20.3.2"
# rev = "4aef54ff9295c2fca4c5b9fbd2c92d0ff99f67de"

[workspace.dependencies.stellar-rpc-client]
version = "20.3.3"
version = "20.3.5"
path = "cmd/crates/stellar-rpc-client"

[workspace.dependencies.stellar-xdr]
Expand Down
71 changes: 51 additions & 20 deletions cmd/crates/stellar-rpc-client/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
use http::{uri::Authority, Uri};
use itertools::Itertools;
use jsonrpsee_core::params::ObjectParams;
use jsonrpsee_core::{self, client::ClientT, rpc_params};
use jsonrpsee_core::{self, client::ClientT};
use jsonrpsee_http_client::{HeaderMap, HttpClient, HttpClientBuilder};
use serde_aux::prelude::{
deserialize_default_from_null, deserialize_number_from_string,
Expand Down Expand Up @@ -279,6 +279,21 @@ pub struct SimulateHostFunctionResult {
pub xdr: xdr::ScVal,
}

#[derive(serde::Deserialize, serde::Serialize, Debug, Clone, PartialEq)]
#[serde(tag = "type")]
pub enum LedgerEntryChange {
#[serde(rename = "created")]
Created { key: String, after: String },
#[serde(rename = "deleted")]
Deleted { key: String, before: String },
#[serde(rename = "updated")]
Updated {
key: String,
before: String,
after: String,
},
}

#[derive(serde::Deserialize, serde::Serialize, Debug, Default, Clone)]
pub struct SimulateTransactionResponse {
#[serde(
Expand All @@ -305,6 +320,12 @@ pub struct SimulateTransactionResponse {
default
)]
pub restore_preamble: Option<RestorePreamble>,
#[serde(
rename = "stateChanges",
skip_serializing_if = "Option::is_none",
default
)]
pub state_changes: Option<Vec<LedgerEntryChange>>,
#[serde(rename = "latestLedger")]
pub latest_ledger: u32,
#[serde(skip_serializing_if = "Option::is_none", default)]
Expand Down Expand Up @@ -637,7 +658,10 @@ impl Client {
/// # Errors
pub async fn get_network(&self) -> Result<GetNetworkResponse, Error> {
tracing::trace!("Getting network");
Ok(self.client()?.request("getNetwork", rpc_params![]).await?)
Ok(self
.client()?
.request("getNetwork", ObjectParams::new())
.await?)
}

///
Expand All @@ -646,7 +670,7 @@ impl Client {
tracing::trace!("Getting latest ledger");
Ok(self
.client()?
.request("getLatestLedger", rpc_params![])
.request("getLatestLedger", ObjectParams::new())
.await?)
}

Expand Down Expand Up @@ -691,16 +715,15 @@ soroban config identity fund {address} --helper-url <url>"#
) -> Result<GetTransactionResponse, Error> {
let client = self.client()?;
tracing::trace!("Sending:\n{tx:#?}");
let mut oparams = ObjectParams::new();
oparams.insert("transaction", tx.to_xdr_base64(Limits::none())?)?;
let SendTransactionResponse {
hash,
error_result_xdr,
status,
..
} = client
.request(
"sendTransaction",
rpc_params![tx.to_xdr_base64(Limits::none())?],
)
.request("sendTransaction", oparams)
.await
.map_err(|err| {
Error::TransactionSubmissionFailed(format!("No status yet:\n {err:#?}"))
Expand Down Expand Up @@ -761,11 +784,11 @@ soroban config identity fund {address} --helper-url <url>"#
) -> Result<SimulateTransactionResponse, Error> {
tracing::trace!("Simulating:\n{tx:#?}");
let base64_tx = tx.to_xdr_base64(Limits::none())?;
let mut builder = ObjectParams::new();
builder.insert("transaction", base64_tx)?;
let mut oparams = ObjectParams::new();
oparams.insert("transaction", base64_tx)?;
let response: SimulateTransactionResponse = self
.client()?
.request("simulateTransaction", builder)
.request("simulateTransaction", oparams)
.await?;
tracing::trace!("Simulation response:\n {response:#?}");
match response.error {
Expand Down Expand Up @@ -835,10 +858,9 @@ soroban config identity fund {address} --helper-url <url>"#
///
/// # Errors
pub async fn get_transaction(&self, tx_id: &str) -> Result<GetTransactionResponseRaw, Error> {
Ok(self
.client()?
.request("getTransaction", rpc_params![tx_id])
.await?)
let mut oparams = ObjectParams::new();
oparams.insert("hash", tx_id)?;
Ok(self.client()?.request("getTransaction", oparams).await?)
}

///
Expand All @@ -855,10 +877,9 @@ soroban config identity fund {address} --helper-url <url>"#
}
base64_keys.push(k.to_xdr_base64(Limits::none())?);
}
Ok(self
.client()?
.request("getLedgerEntries", rpc_params![base64_keys])
.await?)
let mut oparams = ObjectParams::new();
oparams.insert("keys", base64_keys)?;
Ok(self.client()?.request("getLedgerEntries", oparams).await?)
}

///
Expand Down Expand Up @@ -1070,10 +1091,20 @@ mod tests {
"minResourceFee": "100000000",
"cost": { "cpuInsns": "1000", "memBytes": "1000" },
"transactionData": "",
"latestLedger": 1234
}"#;
"latestLedger": 1234,
"stateChanges": [{
"type": "created",
"key": "AAAAAAAAAABuaCbVXZ2DlXWarV6UxwbW3GNJgpn3ASChIFp5bxSIWg==",
"before": null,
"after": "AAAAZAAAAAAAAAAAbmgm1V2dg5V1mq1elMcG1txjSYKZ9wEgoSBaeW8UiFoAAAAAAAAAZAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA="
}]
}"#;

let resp: SimulateTransactionResponse = serde_json::from_str(s).unwrap();
assert_eq!(
resp.state_changes.unwrap()[0],
LedgerEntryChange::Created { key: "AAAAAAAAAABuaCbVXZ2DlXWarV6UxwbW3GNJgpn3ASChIFp5bxSIWg==".to_string(), after: "AAAAZAAAAAAAAAAAbmgm1V2dg5V1mq1elMcG1txjSYKZ9wEgoSBaeW8UiFoAAAAAAAAAZAAAAAAAAAABAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=".to_string() },
);
assert_eq!(resp.min_resource_fee, 100_000_000);
}

Expand Down
38 changes: 30 additions & 8 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,11 @@ import (
)

const (
prometheusNamespace = "soroban_rpc"
maxLedgerEntryWriteBatchSize = 150
defaultReadTimeout = 5 * time.Second
defaultShutdownGracePeriod = 10 * time.Second
prometheusNamespace = "soroban_rpc"
maxLedgerEntryWriteBatchSize = 150
defaultReadTimeout = 5 * time.Second
defaultShutdownGracePeriod = 10 * time.Second
inMemoryInitializationLedgerLogPeriod = 1_000_000
)

type Daemon struct {
Expand Down Expand Up @@ -137,6 +138,11 @@ func MustNew(cfg *config.Config) *Daemon {
logger.UseJSONFormatter()
}

logger.WithFields(supportlog.F{
"version": config.Version,
"commit": config.CommitHash,
}).Info("starting Soroban RPC")

core, err := newCaptiveCore(cfg, logger)
if err != nil {
logger.WithError(err).Fatal("could not create captive core")
Expand Down Expand Up @@ -196,7 +202,20 @@ func MustNew(cfg *config.Config) *Daemon {
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetads can be larger than the store retention windows)
// but it's probably not worth the pain.
var initialSeq uint32
var currentSeq uint32
err = db.NewLedgerReader(dbConn).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
if err := eventStore.IngestEvents(txmeta); err != nil {
logger.WithError(err).Fatal("could not initialize event memory store")
}
Expand All @@ -205,6 +224,11 @@ func MustNew(cfg *config.Config) *Daemon {
}
return nil
})
if currentSeq != 0 {
logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("finished initializing in-memory store")
}
if err != nil {
logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
Expand Down Expand Up @@ -285,10 +309,8 @@ func MustNew(cfg *config.Config) *Daemon {

func (d *Daemon) Run() {
d.logger.WithFields(supportlog.F{
"version": config.Version,
"commit": config.CommitHash,
"addr": d.server.Addr,
}).Info("starting Soroban JSON RPC server")
"addr": d.server.Addr,
}).Info("starting HTTP server")

panicGroup := util.UnrecoverablePanicGroup.Log(d.logger)
panicGroup.Go(func() {
Expand Down
34 changes: 23 additions & 11 deletions cmd/soroban-rpc/internal/events/events.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,18 +99,25 @@ type ScanFunction func(xdr.DiagnosticEvent, Cursor, int64, *xdr.Hash) bool
// remaining events in the range). Note that a read lock is held for the
// entire duration of the Scan function so f should be written in a way
// to minimize latency.
func (m *MemoryStore) Scan(eventRange Range, f ScanFunction) (uint32, error) {
func (m *MemoryStore) Scan(eventRange Range, f ScanFunction) (lastLedgerInWindow uint32, err error) {
startTime := time.Now()
defer func() {
if err == nil {
m.eventsDurationMetric.With(prometheus.Labels{"operation": "scan"}).
Observe(time.Since(startTime).Seconds())
}
}()

m.lock.RLock()
defer m.lock.RUnlock()

if err := m.validateRange(&eventRange); err != nil {
return 0, err
if err = m.validateRange(&eventRange); err != nil {
return
}

firstLedgerInRange := eventRange.Start.Ledger
firstLedgerInWindow := m.eventsByLedger.Get(0).LedgerSeq
lastLedgerInWindow := firstLedgerInWindow + (m.eventsByLedger.Len() - 1)
lastLedgerInWindow = firstLedgerInWindow + (m.eventsByLedger.Len() - 1)
for i := firstLedgerInRange - firstLedgerInWindow; i < m.eventsByLedger.Len(); i++ {
bucket := m.eventsByLedger.Get(i)
events := bucket.BucketContent
Expand All @@ -122,21 +129,19 @@ func (m *MemoryStore) Scan(eventRange Range, f ScanFunction) (uint32, error) {
for _, event := range events {
cur := event.cursor(bucket.LedgerSeq)
if eventRange.End.Cmp(cur) <= 0 {
return lastLedgerInWindow, nil
return
}
var diagnosticEvent xdr.DiagnosticEvent
err := xdr.SafeUnmarshal(event.diagnosticEventXDR, &diagnosticEvent)
err = xdr.SafeUnmarshal(event.diagnosticEventXDR, &diagnosticEvent)
if err != nil {
return 0, err
return
}
if !f(diagnosticEvent, cur, timestamp, event.txHash) {
return lastLedgerInWindow, nil
return
}
}
}
m.eventsDurationMetric.With(prometheus.Labels{"operation": "scan"}).
Observe(time.Since(startTime).Seconds())
return lastLedgerInWindow, nil
return
}

// validateRange checks if the range falls within the bounds
Expand Down Expand Up @@ -259,3 +264,10 @@ func readEvents(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (
}
return events, err
}

// GetLedgerRange returns the first and latest ledger available in the store.
func (m *MemoryStore) GetLedgerRange() ledgerbucketwindow.LedgerRange {
m.lock.RLock()
defer m.lock.RUnlock()
return m.eventsByLedger.GetLedgerRange()
}
Loading

0 comments on commit 4c283d4

Please sign in to comment.