Skip to content

Commit

Permalink
rpc: upgrade jsonrpsee v0.23 (#4730)
Browse files Browse the repository at this point in the history
This is PR updates jsonrpsee v0.23 which mainly changes:
- Add `Extensions` which we now is using to get the connection id (used
by the rpc spec v2 impl)
- Update hyper to v1.0, http v1.0, soketto and related crates
(hyper::service::make_service_fn is removed)
- The subscription API for the client is modified to know why a
subscription was closed.

Full changelog here:
https://github.com/paritytech/jsonrpsee/releases/tag/v0.23.0

---------

Co-authored-by: Bastian Köcher <[email protected]>
  • Loading branch information
niklasad1 and bkchr authored Jun 26, 2024
1 parent 20aecad commit 7a2592e
Show file tree
Hide file tree
Showing 18 changed files with 548 additions and 268 deletions.
400 changes: 303 additions & 97 deletions Cargo.lock

Large diffs are not rendered by default.

7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -780,6 +780,7 @@ hkdf = { version = "0.12.0" }
hmac = { version = "0.12.1" }
honggfuzz = { version = "0.5.55" }
http = { version = "0.2.8" }
http-body-util = { version = "0.1.2" }
hyper = { version = "0.14.27", default-features = false }
hyper-rustls = { version = "0.24.0" }
impl-serde = { version = "0.4.0", default-features = false }
Expand All @@ -793,8 +794,8 @@ is_executable = { version = "1.0.1" }
isahc = { version = "1.2" }
itertools = { version = "0.11" }
jsonpath_lib = { version = "0.3" }
jsonrpsee = { version = "0.22.5" }
jsonrpsee-core = { version = "0.22" }
jsonrpsee = { version = "0.23.2" }
jsonrpsee-core = { version = "0.23.2" }
k256 = { version = "0.13.3", default-features = false }
kitchensink-runtime = { path = "substrate/bin/node/runtime" }
kvdb = { version = "0.13.0" }
Expand Down Expand Up @@ -1298,7 +1299,7 @@ tokio-util = { version = "0.7.8" }
toml = { version = "0.8.8" }
toml_edit = { version = "0.19" }
tower = { version = "0.4.13" }
tower-http = { version = "0.4.0" }
tower-http = { version = "0.5.2" }
tracing = { version = "0.1.37", default-features = false }
tracing-core = { version = "0.1.32", default-features = false }
tracing-futures = { version = "0.2.4" }
Expand Down
1 change: 1 addition & 0 deletions bridges/relays/client-substrate/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ jsonrpsee = { features = ["macros", "ws-client"], workspace = true }
log = { workspace = true }
num-traits = { workspace = true, default-features = true }
rand = { workspace = true, default-features = true }
serde_json = { workspace = true }
scale-info = { features = ["derive"], workspace = true, default-features = true }
tokio = { features = ["rt-multi-thread"], workspace = true, default-features = true }
thiserror = { workspace = true }
Expand Down
3 changes: 1 addition & 2 deletions bridges/relays/client-substrate/src/client/subscription.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ use async_std::{
stream::StreamExt,
};
use futures::{FutureExt, Stream};
use jsonrpsee::core::ClientError;
use sp_runtime::DeserializeOwned;
use std::{
fmt::Debug,
Expand Down Expand Up @@ -143,7 +142,7 @@ impl<T: 'static + Clone + DeserializeOwned + Send> Subscription<T> {
/// Create new forwarded subscription.
pub fn new_forwarded(
desc: StreamDescription,
subscription: impl Stream<Item = StdResult<T, ClientError>> + Unpin + Send + 'static,
subscription: impl Stream<Item = StdResult<T, serde_json::Error>> + Unpin + Send + 'static,
) -> Self {
Self {
desc: desc.clone(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
use futures::{channel::mpsc::Sender, prelude::*, stream::FuturesUnordered};
use jsonrpsee::core::client::{
Client as JsonRpseeClient, ClientBuilder, ClientT, Error, ReceivedMessage, TransportReceiverT,
Client as JsonRpseeClient, ClientBuilder, ClientT, ReceivedMessage, TransportReceiverT,
TransportSenderT,
};
use smoldot_light::{ChainId, Client as SmoldotClient, JsonRpcResponses};
Expand Down Expand Up @@ -124,7 +124,7 @@ pub struct LightClientRpcWorker {
}

fn handle_notification(
maybe_header: Option<Result<RelayHeader, Error>>,
maybe_header: Option<Result<RelayHeader, serde_json::Error>>,
senders: &mut Vec<Sender<RelayHeader>>,
) -> Result<(), ()> {
match maybe_header {
Expand Down
25 changes: 25 additions & 0 deletions prdoc/pr_4730.prdoc
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
# Schema: Polkadot SDK PRDoc Schema (prdoc) v1.0.0
# See doc at https://raw.githubusercontent.com/paritytech/polkadot-sdk/master/prdoc/schema_user.json

title: rpc upgrade jsonrpsee to v0.23.1

doc:
- audience: Node Dev
description: |
Upgrade the rpc library jsonrpsee to v0.23.1 to utilize:

- Add Extensions which we now is using to get the connection id (used by the rpc spec v2)
- Update hyper to v1.0, http v1.0, soketto and related crates (hyper::service::make_service_fn is removed)
- The subscription API for the client is modified to know why a subscription was closed.

crates:
- name: sc-rpc-spec-v2
bump: patch
- name: sc-rpc
bump: patch
- name: sc-rpc-server
bump: patch
- name: cumulus-relay-chain-rpc-interface
bump: patch
- name: frame-remote-externalities
bump: patch
12 changes: 8 additions & 4 deletions substrate/client/rpc-servers/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,18 @@ targets = ["x86_64-unknown-linux-gnu"]
forwarded-header-value = { workspace = true }
futures = { workspace = true }
governor = { workspace = true }
http = { workspace = true }
hyper = { workspace = true, default-features = true }
http-body-util = { workspace = true }
ip_network = { workspace = true }
jsonrpsee = { features = ["server"], workspace = true }
log = { workspace = true, default-features = true }
prometheus-endpoint = { workspace = true, default-features = true }
serde = { workspace = true }
serde_json = { workspace = true, default-features = true }
tokio = { features = ["parking_lot"], workspace = true, default-features = true }
tower = { features = ["util"], workspace = true }
tower-http = { features = ["cors"], workspace = true }
tower = { workspace = true, features = ["util"] }
tower-http = { workspace = true, features = ["cors"] }

# Dependencies outside of the polkadot-sdk workspace
# which requires hyper v1 and http v1
http = "1.1"
hyper = "1.3"
82 changes: 48 additions & 34 deletions substrate/client/rpc-servers/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,16 +23,13 @@
pub mod middleware;
pub mod utils;

use std::{
convert::Infallible, error::Error as StdError, net::SocketAddr, num::NonZeroU32, time::Duration,
};
use std::{error::Error as StdError, net::SocketAddr, num::NonZeroU32, sync::Arc, time::Duration};

use hyper::{
server::conn::AddrStream,
service::{make_service_fn, service_fn},
};
use jsonrpsee::{
server::{stop_channel, ws, PingConfig, StopHandle, TowerServiceBuilder},
core::BoxError,
server::{
serve_with_graceful_shutdown, stop_channel, ws, PingConfig, StopHandle, TowerServiceBuilder,
},
Methods, RpcModule,
};
use middleware::NodeHealthProxyLayer;
Expand Down Expand Up @@ -97,6 +94,7 @@ struct PerConnection<RpcMiddleware, HttpMiddleware> {
metrics: Option<RpcMetrics>,
tokio_handle: tokio::runtime::Handle,
service_builder: TowerServiceBuilder<RpcMiddleware, HttpMiddleware>,
rate_limit_whitelisted_ips: Arc<Vec<IpNetwork>>,
}

/// Start RPC server listening on given address.
Expand Down Expand Up @@ -124,8 +122,8 @@ where
rate_limit_trust_proxy_headers,
} = config;

let std_listener = TcpListener::bind(addrs.as_slice()).await?.into_std()?;
let local_addr = std_listener.local_addr().ok();
let listener = TcpListener::bind(addrs.as_slice()).await?;
let local_addr = listener.local_addr().ok();
let host_filter = host_filtering(cors.is_some(), local_addr);

let http_middleware = tower::ServiceBuilder::new()
Expand Down Expand Up @@ -161,20 +159,38 @@ where
methods: build_rpc_api(rpc_api).into(),
service_builder: builder.to_service_builder(),
metrics,
tokio_handle,
stop_handle: stop_handle.clone(),
tokio_handle: tokio_handle.clone(),
stop_handle,
rate_limit_whitelisted_ips: Arc::new(rate_limit_whitelisted_ips),
};

let make_service = make_service_fn(move |addr: &AddrStream| {
let cfg = cfg.clone();
let rate_limit_whitelisted_ips = rate_limit_whitelisted_ips.clone();
let ip = addr.remote_addr().ip();

async move {
let cfg = cfg.clone();
let rate_limit_whitelisted_ips = rate_limit_whitelisted_ips.clone();
tokio_handle.spawn(async move {
loop {
let (sock, remote_addr) = tokio::select! {
res = listener.accept() => {
match res {
Ok(s) => s,
Err(e) => {
log::debug!(target: "rpc", "Failed to accept ipv4 connection: {:?}", e);
continue;
}
}
}
_ = cfg.stop_handle.clone().shutdown() => break,
};

let ip = remote_addr.ip();
let cfg2 = cfg.clone();
let svc = tower::service_fn(move |req: http::Request<hyper::body::Incoming>| {
let PerConnection {
methods,
service_builder,
metrics,
tokio_handle,
stop_handle,
rate_limit_whitelisted_ips,
} = cfg2.clone();

Ok::<_, Infallible>(service_fn(move |req| {
let proxy_ip =
if rate_limit_trust_proxy_headers { get_proxy_ip(&req) } else { None };

Expand All @@ -191,9 +207,6 @@ where
rate_limit
};

let PerConnection { service_builder, metrics, tokio_handle, stop_handle, methods } =
cfg.clone();

let is_websocket = ws::is_upgrade_request(&req);
let transport_label = if is_websocket { "ws" } else { "http" };

Expand All @@ -213,7 +226,6 @@ where

let rpc_middleware =
RpcServiceBuilder::new().option_layer(middleware_layer.clone());

let mut svc =
service_builder.set_rpc_middleware(rpc_middleware).build(methods, stop_handle);

Expand All @@ -230,17 +242,19 @@ where
});
}

svc.call(req).await
// https://github.com/rust-lang/rust/issues/102211 the error type can't be inferred
// to be `Box<dyn std::error::Error + Send + Sync>` so we need to convert it to
// a concrete type as workaround.
svc.call(req).await.map_err(|e| BoxError::from(e))
}
}))
}
});

let server = hyper::Server::from_tcp(std_listener)?.serve(make_service);
});

tokio::spawn(async move {
let graceful = server.with_graceful_shutdown(async move { stop_handle.shutdown().await });
let _ = graceful.await;
cfg.tokio_handle.spawn(serve_with_graceful_shutdown(
sock,
svc,
cfg.stop_handle.clone().shutdown(),
));
}
});

log::info!(
Expand Down
42 changes: 23 additions & 19 deletions substrate/client/rpc-servers/src/middleware/node_health.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@ use std::{

use futures::future::FutureExt;
use http::{HeaderValue, Method, StatusCode, Uri};
use hyper::Body;
use jsonrpsee::types::{Response as RpcResponse, ResponseSuccess as RpcResponseSuccess};
use jsonrpsee::{
server::{HttpBody, HttpRequest, HttpResponse},
types::{Response as RpcResponse, ResponseSuccess as RpcResponseSuccess},
};
use tower::Service;

const RPC_SYSTEM_HEALTH_CALL: &str = r#"{"jsonrpc":"2.0","method":"system_health","id":0}"#;
Expand Down Expand Up @@ -57,9 +59,9 @@ impl<S> NodeHealthProxy<S> {
}
}

impl<S> tower::Service<http::Request<Body>> for NodeHealthProxy<S>
impl<S> tower::Service<http::Request<hyper::body::Incoming>> for NodeHealthProxy<S>
where
S: Service<http::Request<Body>, Response = http::Response<Body>>,
S: Service<HttpRequest, Response = HttpResponse>,
S::Response: 'static,
S::Error: Into<Box<dyn Error + Send + Sync>> + 'static,
S::Future: Send + 'static,
Expand All @@ -73,7 +75,8 @@ where
self.0.poll_ready(cx).map_err(Into::into)
}

fn call(&mut self, mut req: http::Request<Body>) -> Self::Future {
fn call(&mut self, req: http::Request<hyper::body::Incoming>) -> Self::Future {
let mut req = req.map(|body| HttpBody::new(body));
let maybe_intercept = InterceptRequest::from_http(&req);

// Modify the request and proxy it to `system_health`
Expand All @@ -88,7 +91,7 @@ where
req.headers_mut().insert(http::header::ACCEPT, HEADER_VALUE_JSON);

// Adjust the body to reflect the method call.
req = req.map(|_| Body::from(RPC_SYSTEM_HEALTH_CALL));
req = req.map(|_| HttpBody::from(RPC_SYSTEM_HEALTH_CALL));
}

// Call the inner service and get a future that resolves to the response.
Expand All @@ -99,7 +102,7 @@ where

Ok(match maybe_intercept {
InterceptRequest::Deny =>
http_response(StatusCode::METHOD_NOT_ALLOWED, Body::empty()),
http_response(StatusCode::METHOD_NOT_ALLOWED, HttpBody::empty()),
InterceptRequest::No => res,
InterceptRequest::Health => {
let health = parse_rpc_response(res.into_body()).await?;
Expand All @@ -108,7 +111,7 @@ where
InterceptRequest::Readiness => {
let health = parse_rpc_response(res.into_body()).await?;
if (!health.is_syncing && health.peers > 0) || !health.should_have_peers {
http_ok_response(Body::empty())
http_ok_response(HttpBody::empty())
} else {
http_internal_error()
}
Expand All @@ -133,27 +136,28 @@ struct Health {
pub should_have_peers: bool,
}

fn http_ok_response<S: Into<hyper::Body>>(body: S) -> hyper::Response<hyper::Body> {
fn http_ok_response<S: Into<HttpBody>>(body: S) -> HttpResponse {
http_response(StatusCode::OK, body)
}

fn http_response<S: Into<hyper::Body>>(
status_code: StatusCode,
body: S,
) -> hyper::Response<hyper::Body> {
hyper::Response::builder()
fn http_response<S: Into<HttpBody>>(status_code: StatusCode, body: S) -> HttpResponse {
HttpResponse::builder()
.status(status_code)
.header(http::header::CONTENT_TYPE, HEADER_VALUE_JSON)
.body(body.into())
.expect("Header is valid; qed")
}

fn http_internal_error() -> hyper::Response<hyper::Body> {
http_response(hyper::StatusCode::INTERNAL_SERVER_ERROR, Body::empty())
fn http_internal_error() -> HttpResponse {
http_response(hyper::StatusCode::INTERNAL_SERVER_ERROR, HttpBody::empty())
}

async fn parse_rpc_response(body: Body) -> Result<Health, Box<dyn Error + Send + Sync + 'static>> {
let bytes = hyper::body::to_bytes(body).await?;
async fn parse_rpc_response(
body: HttpBody,
) -> Result<Health, Box<dyn Error + Send + Sync + 'static>> {
use http_body_util::BodyExt;

let bytes = body.collect().await?.to_bytes();

let raw_rp = serde_json::from_slice::<RpcResponse<Health>>(&bytes)?;
let rp = RpcResponseSuccess::<Health>::try_from(raw_rp)?;
Expand All @@ -178,7 +182,7 @@ enum InterceptRequest {
}

impl InterceptRequest {
fn from_http(req: &http::Request<Body>) -> InterceptRequest {
fn from_http(req: &HttpRequest) -> InterceptRequest {
match req.uri().path() {
"/health" =>
if req.method() == http::Method::GET {
Expand Down
Loading

0 comments on commit 7a2592e

Please sign in to comment.