diff --git a/core/cli/src/commands/dev.rs b/core/cli/src/commands/dev.rs index 2c5d50069..6b7e9e1df 100644 --- a/core/cli/src/commands/dev.rs +++ b/core/cli/src/commands/dev.rs @@ -38,7 +38,7 @@ where // by default loads or creates if hmac_secret_path.is_none() the secret from ~/.lightning let url = format!("http://127.0.0.1:{}/admin", port); let secret = lightning_rpc::load_hmac_secret(hmac_secret_path)?; - let client = lightning_rpc::clients::RpcClient::new(&url, Some(&secret)).await?; + let client = lightning_rpc::RpcClient::new(&url, Some(&secret)).await?; for path in &input { if let Some(path) = path.to_str() { diff --git a/core/cli/src/commands/opt.rs b/core/cli/src/commands/opt.rs index 871e6d34b..4e507d1b6 100644 --- a/core/cli/src/commands/opt.rs +++ b/core/cli/src/commands/opt.rs @@ -273,7 +273,7 @@ pub async fn send_txn(update_request: UpdateRequest, nodes: &[NodeInfo]) -> Resu let client = RpcClient::new_no_auth(&rpc_address)?; Ok( - lightning_rpc::clients::FleekRpcClient::send_txn(&client, update_request.into()) + lightning_rpc::Fleek::send_txn(&client, update_request.into()) .await .map(|_| ())?, ) @@ -294,7 +294,7 @@ pub async fn get_node_info_from_genesis_commitee( .ok()?; Some(( - lightning_rpc::clients::FleekRpcClient::get_node_info_epoch(&client, public_key) + lightning_rpc::Fleek::get_node_info_epoch(&client, public_key) .await .ok()?, address, @@ -330,9 +330,7 @@ pub async fn get_epoch_info_from_genesis_commitee( .ok()?; Some(( - lightning_rpc::clients::FleekRpcClient::get_epoch_info(&client) - .await - .ok()?, + lightning_rpc::Fleek::get_epoch_info(&client).await.ok()?, address, )) }; diff --git a/core/e2e/src/utils/mod.rs b/core/e2e/src/utils/mod.rs index 016af3a75..9e0eff05f 100644 --- a/core/e2e/src/utils/mod.rs +++ b/core/e2e/src/utils/mod.rs @@ -1,3 +1,2 @@ pub mod networking; -pub mod rpc; pub mod shutdown; diff --git a/core/e2e/src/utils/rpc.rs b/core/e2e/src/utils/rpc.rs deleted file mode 100644 index 4b6b933bd..000000000 --- a/core/e2e/src/utils/rpc.rs +++ /dev/null @@ -1,35 +0,0 @@ -use anyhow::Result; -use reqwest::{Client, Response}; -use serde::de::DeserializeOwned; -use serde::{Deserialize, Serialize}; -use serde_json::Value; - -pub async fn parse_response(response: Response) -> Result { - if !response.status().is_success() { - panic!("Request failed with status: {}", response.status()); - } - let value: Value = response.json().await?; - if value.get("result").is_some() { - let success_res: RpcSuccessResponse = serde_json::from_value(value)?; - Ok(success_res.result) - } else { - panic!("Rpc Error: {value}") - } -} - -pub async fn rpc_request(address: String, request: String) -> Result { - let client = Client::new(); - Ok(client - .post(address) - .header("Content-Type", "application/json") - .body(request) - .send() - .await?) -} - -#[derive(Serialize, Deserialize, Debug)] -pub struct RpcSuccessResponse { - jsonrpc: String, - id: usize, - result: T, -} diff --git a/core/e2e/tests/checkpoint.rs b/core/e2e/tests/checkpoint.rs index 27aa214a7..c93de26ac 100644 --- a/core/e2e/tests/checkpoint.rs +++ b/core/e2e/tests/checkpoint.rs @@ -3,12 +3,10 @@ use std::time::{Duration, SystemTime}; use anyhow::Result; use lightning_e2e::swarm::Swarm; -use lightning_e2e::utils::rpc; -use lightning_interfaces::types::Epoch; +use lightning_rpc::{Fleek, RpcClient}; use lightning_test_utils::config::LIGHTNING_TEST_HOME_DIR; use lightning_test_utils::logging; use resolved_pathbuf::ResolvedPathBuf; -use serde_json::json; use serial_test::serial; #[tokio::test] @@ -39,38 +37,18 @@ async fn e2e_checkpoint() -> Result<()> { // Wait for the epoch to change. tokio::time::sleep(Duration::from_secs(35)).await; - let request = json!({ - "jsonrpc": "2.0", - "method":"flk_get_epoch", - "params":[], - "id":1, - }); for (_, address) in swarm.get_rpc_addresses() { - let response = rpc::rpc_request(address, request.to_string()) - .await - .unwrap(); + let client = RpcClient::new_no_auth(&address)?; + let epoch = client.get_epoch().await?; - let epoch = rpc::parse_response::(response) - .await - .expect("Failed to parse response."); assert_eq!(epoch, 1); } - let request = json!({ - "jsonrpc": "2.0", - "method":"flk_get_last_epoch_hash", - "params":[], - "id":1, - }); let mut target_hash = None; for (_, address) in swarm.get_rpc_addresses() { - let response = rpc::rpc_request(address, request.to_string()) - .await - .unwrap(); + let client = RpcClient::new_no_auth(&address)?; + let (epoch_hash, _) = client.get_last_epoch_hash().await?; - let (epoch_hash, _) = rpc::parse_response::<([u8; 32], Epoch)>(response) - .await - .expect("Failed to parse response."); if target_hash.is_none() { target_hash = Some(epoch_hash); // Make sure that we stored an epoch hash. diff --git a/core/e2e/tests/epoch_change.rs b/core/e2e/tests/epoch_change.rs index 4b8545379..adaa33a91 100644 --- a/core/e2e/tests/epoch_change.rs +++ b/core/e2e/tests/epoch_change.rs @@ -6,12 +6,11 @@ use anyhow::Result; use fleek_crypto::NodePublicKey; use hp_fixed::unsigned::HpUfixed; use lightning_e2e::swarm::{Swarm, SwarmNode}; -use lightning_e2e::utils::rpc; use lightning_interfaces::types::Staking; +use lightning_rpc::{Fleek, RpcClient}; use lightning_test_utils::config::LIGHTNING_TEST_HOME_DIR; use lightning_test_utils::logging; use resolved_pathbuf::ResolvedPathBuf; -use serde_json::json; use serial_test::serial; #[tokio::test] @@ -43,20 +42,10 @@ async fn e2e_epoch_change_all_nodes_on_committee() -> Result<()> { // Wait a bit for the nodes to start. tokio::time::sleep(Duration::from_secs(5)).await; - let request = json!({ - "jsonrpc": "2.0", - "method":"flk_get_epoch", - "params":[], - "id":1, - }); for (_, address) in swarm.get_rpc_addresses() { - let response = rpc::rpc_request(address, request.to_string()) - .await - .unwrap(); + let client = RpcClient::new_no_auth(&address)?; - let epoch = rpc::parse_response::(response) - .await - .expect("Failed to parse response."); + let epoch = client.get_epoch().await?; assert_eq!(epoch, 0); } @@ -64,20 +53,10 @@ async fn e2e_epoch_change_all_nodes_on_committee() -> Result<()> { // To give some time for the epoch change, we will wait another 30 seconds here. tokio::time::sleep(Duration::from_secs(30)).await; - let request = json!({ - "jsonrpc": "2.0", - "method":"flk_get_epoch", - "params":[], - "id":1, - }); for (_, address) in swarm.get_rpc_addresses() { - let response = rpc::rpc_request(address, request.to_string()) - .await - .unwrap(); + let client = RpcClient::new_no_auth(&address)?; - let epoch = rpc::parse_response::(response) - .await - .expect("Failed to parse response."); + let epoch = client.get_epoch().await?; assert_eq!(epoch, 1); } @@ -115,20 +94,10 @@ async fn e2e_epoch_change_with_edge_node() -> Result<()> { // Wait a bit for the nodes to start. tokio::time::sleep(Duration::from_secs(5)).await; - let request = json!({ - "jsonrpc": "2.0", - "method":"flk_get_epoch", - "params":[], - "id":1, - }); for (_, address) in swarm.get_rpc_addresses() { - let response = rpc::rpc_request(address, request.to_string()) - .await - .unwrap(); + let client = RpcClient::new_no_auth(&address)?; + let epoch = client.get_epoch().await?; - let epoch = rpc::parse_response::(response) - .await - .expect("Failed to parse response."); assert_eq!(epoch, 0); } @@ -136,20 +105,10 @@ async fn e2e_epoch_change_with_edge_node() -> Result<()> { // To give some time for the epoch change, we will wait another 30 seconds here. tokio::time::sleep(Duration::from_secs(30)).await; - let request = json!({ - "jsonrpc": "2.0", - "method":"flk_get_epoch", - "params":[], - "id":1, - }); for (_key, address) in swarm.get_rpc_addresses() { - let response = rpc::rpc_request(address, request.to_string()) - .await - .unwrap(); + let client = RpcClient::new_no_auth(&address)?; - let epoch = rpc::parse_response::(response) - .await - .expect("Failed to parse response."); + let epoch = client.get_epoch().await?; assert_eq!(epoch, 1); } @@ -282,59 +241,23 @@ async fn e2e_test_staking_auction() -> Result<()> { // Wait for epoch to change. tokio::time::sleep(Duration::from_secs(30)).await; - // Get the new committee after the epoch change - let committee_member_request = json!({ - "jsonrpc": "2.0", - "method":"flk_get_committee_members", - "params":[], - "id":1, - }); - - let response = rpc::rpc_request(rpc_endpoint.1.clone(), committee_member_request.to_string()) - .await - .unwrap(); - - let current_committee: BTreeSet = - rpc::parse_response::>(response) - .await - .expect("Failed to parse response.") - .into_iter() - .collect(); + let client = RpcClient::new_no_auth(&rpc_endpoint.1)?; + let response = client.get_committee_members(None).await?; + let current_committee: BTreeSet = response.into_iter().collect(); current_committee .iter() .for_each(|node| println!("{:?}", node)); - // Figure out the rep of our low staked nodes so we know which one shouldnt be on the committee - let rep_request_one = json!({ - "jsonrpc": "2.0", - "method":"flk_get_reputation", - "params":[low_stake_nodes[0].clone()], - "id":1, - }); - let rep_request_two = json!({ - "jsonrpc": "2.0", - "method":"flk_get_reputation", - "params":[low_stake_nodes[1].clone()], - "id":1, - }); - - let response_one = rpc::rpc_request(rpc_endpoint.1.clone(), rep_request_one.to_string()) - .await - .unwrap(); - let response_two = rpc::rpc_request(rpc_endpoint.1.clone(), rep_request_two.to_string()) - .await - .unwrap(); - - let rep_one: Option = rpc::parse_response::>(response_one) - .await - .expect("Failed to parse response."); - let rep_two: Option = rpc::parse_response::>(response_two) - .await - .expect("Failed to parse response."); + let rep_one = client + .get_reputation(low_stake_nodes[0].clone(), None) + .await?; + let rep_two = client + .get_reputation(low_stake_nodes[1].clone(), None) + .await?; // Make sure the lower reputation node lost the tiebreaker and is not on the active node list - if rep_one.unwrap() <= rep_two.unwrap() { + if rep_one <= rep_two { assert!(!current_committee.contains(low_stake_nodes[0])); } else { assert!(!current_committee.contains(low_stake_nodes[1])); @@ -348,24 +271,15 @@ async fn compare_committee( rpc_addresses: HashMap, committee_size: usize, ) -> BTreeSet { - let request = json!({ - "jsonrpc": "2.0", - "method":"flk_get_committee_members", - "params":[], - "id":1, - }); - let rpc_addresses: Vec<(NodePublicKey, String)> = rpc_addresses.into_iter().collect(); - let response = rpc::rpc_request(rpc_addresses[0].1.clone(), request.to_string()) + let client = RpcClient::new_no_auth(&rpc_addresses[0].1).unwrap(); + let target_committee: BTreeSet<_> = client + .get_committee_members(None) .await - .unwrap(); - let target_committee: BTreeSet = - rpc::parse_response::>(response) - .await - .expect("Failed to parse response.") - .into_iter() - .collect(); + .unwrap() + .into_iter() + .collect(); // Make sure that the committee size equals the configured size. assert_eq!(target_committee.len(), committee_size); @@ -374,15 +288,14 @@ async fn compare_committee( if &rpc_addresses[0].1 == address { continue; } - let response = rpc::rpc_request(address.clone(), request.to_string()) + let client = RpcClient::new_no_auth(&address).unwrap(); + + let committee: BTreeSet<_> = client + .get_committee_members(None) .await - .unwrap(); - let committee: BTreeSet = - rpc::parse_response::>(response) - .await - .expect("Failed to parse response.") - .into_iter() - .collect(); + .unwrap() + .into_iter() + .collect(); // Make sure all nodes have the same committee assert_eq!(target_committee, committee); diff --git a/core/e2e/tests/pinger.rs b/core/e2e/tests/pinger.rs index 83df27c62..7ade42a82 100644 --- a/core/e2e/tests/pinger.rs +++ b/core/e2e/tests/pinger.rs @@ -3,12 +3,12 @@ use std::time::{Duration, SystemTime}; use anyhow::Result; use lightning_e2e::swarm::Swarm; -use lightning_e2e::utils::rpc; -use lightning_interfaces::types::{NodeInfo, Participation}; +use lightning_interfaces::types::Participation; +use lightning_rpc::api::RpcClient; +use lightning_rpc::Fleek; use lightning_test_utils::config::LIGHTNING_TEST_HOME_DIR; use lightning_test_utils::logging; use resolved_pathbuf::ResolvedPathBuf; -use serde_json::json; use serial_test::serial; #[tokio::test] @@ -40,21 +40,10 @@ async fn e2e_detect_offline_node() -> Result<()> { // Wait for the epoch to change. tokio::time::sleep(Duration::from_secs(30)).await; - // Make sure that all genesis nodes changed epoch. - let request = json!({ - "jsonrpc": "2.0", - "method":"flk_get_epoch", - "params":[], - "id":1, - }); for (_, address) in swarm.get_genesis_committee_rpc_addresses() { - let response = rpc::rpc_request(address, request.to_string()) - .await - .unwrap(); + let client = RpcClient::new_no_auth(&address)?; + let epoch = client.get_epoch().await?; - let epoch = rpc::parse_response::(response) - .await - .expect("Failed to parse response."); assert_eq!(epoch, 1); } @@ -66,21 +55,13 @@ async fn e2e_detect_offline_node() -> Result<()> { .unwrap(); // Make sure that the offline node was removed from participation. - let request = json!({ - "jsonrpc": "2.0", - "method":"flk_get_node_info", - "params": {"public_key": pubkey}, - "id":1, - }); for (_, address) in swarm.get_genesis_committee_rpc_addresses() { - let response = rpc::rpc_request(address, request.to_string()) - .await - .unwrap(); + let client = RpcClient::new_no_auth(&address)?; + let node_info = client + .get_node_info(pubkey, None) + .await? + .expect("No node info recieved from rpc"); - let node_info = rpc::parse_response::>(response) - .await - .expect("Failed to parse response.") - .unwrap(); assert_eq!(node_info.participation, Participation::False); } diff --git a/core/e2e/tests/syncronizer.rs b/core/e2e/tests/syncronizer.rs index eb3c29c89..8354defd3 100644 --- a/core/e2e/tests/syncronizer.rs +++ b/core/e2e/tests/syncronizer.rs @@ -4,12 +4,12 @@ use std::time::{Duration, SystemTime}; use anyhow::Result; use fleek_blake3 as blake3; use lightning_e2e::swarm::Swarm; -use lightning_e2e::utils::rpc; use lightning_interfaces::prelude::*; +use lightning_rpc::api::RpcClient; +use lightning_rpc::Fleek; use lightning_test_utils::config::LIGHTNING_TEST_HOME_DIR; use lightning_test_utils::logging; use resolved_pathbuf::ResolvedPathBuf; -use serde_json::json; use serial_test::serial; #[tokio::test] @@ -43,21 +43,10 @@ async fn e2e_syncronize_state() -> Result<()> { // Wait for the epoch to change. tokio::time::sleep(Duration::from_secs(20)).await; - // Make sure that all genesis nodes changed epoch. - let request = json!({ - "jsonrpc": "2.0", - "method":"flk_get_epoch", - "params":[], - "id":1, - }); for (_, address) in swarm.get_genesis_committee_rpc_addresses() { - let response = rpc::rpc_request(address, request.to_string()) - .await - .unwrap(); + let client = RpcClient::new_no_auth(&address)?; + let epoch = client.get_epoch().await?; - let epoch = rpc::parse_response::(response) - .await - .expect("Failed to parse response."); assert_eq!(epoch, 1); } diff --git a/core/rpc/src/client.rs b/core/rpc/src/client.rs new file mode 100644 index 000000000..873063499 --- /dev/null +++ b/core/rpc/src/client.rs @@ -0,0 +1,385 @@ +use std::pin::Pin; +use std::sync::atomic::{AtomicU32, Ordering}; +use std::sync::Arc; + +use anyhow::Context; +use futures::Stream; +use hyper::header::CONTENT_TYPE; +use hyper::Body; +use jsonrpsee::core::client::{BatchResponse, ClientT, Subscription, SubscriptionClientT}; +use jsonrpsee::core::params::BatchRequestBuilder; +use jsonrpsee::core::traits::ToRpcParams; +use jsonrpsee::http_client::transport::HttpBackend; +use jsonrpsee::http_client::HttpClient; +use lightning_utils::rpc::get_timestamp; +use serde::de::DeserializeOwned; +use tower::ServiceBuilder; + +use crate::server::{LIGHTINING_HMAC_HEADER, LIGHTINING_NONCE_HEADER, LIGHTINING_TIMESTAMP_HEADER}; + +pub fn make_plain_rpc_client(address: &str) -> anyhow::Result> { + HttpClient::::builder() + .build(address) + .context(format!("Trying to build rpc client for {address}")) +} + +pub enum RpcClient { + WithHmac(HmacClient), + Http(HttpClient), +} + +impl RpcClient { + pub async fn new(address: &str, key: Option<&[u8; 32]>) -> anyhow::Result { + match key { + Some(key) => { + if !address.ends_with("/admin") { + return Err(anyhow::anyhow!( + "HMAC is only supported for /admin endpoints" + )); + } + + Ok(Self::WithHmac(HmacClient::new(address, key).await?)) + }, + None => Ok(Self::Http(make_plain_rpc_client(address)?)), + } + } + + pub fn new_no_auth(address: &str) -> anyhow::Result { + Ok(Self::Http(make_plain_rpc_client(address)?)) + } +} + +pub struct HmacClient { + client: HttpClient>, + nonce_client: reqwest::Client, + address: String, + nonce: Arc, +} + +impl HmacClient { + /// Create a new client with HMAC support, fetching the curernt nonce + pub async fn new(address: &str, key: &[u8; 32]) -> anyhow::Result { + if !address.ends_with("/admin") { + return Err(anyhow::anyhow!( + "HMAC is only supported for /admin endpoints" + )); + } + + // fetch the latest nonce + let nonce_client = reqwest::Client::new(); + let nonce = Arc::new(AtomicU32::new(get_nonce(&nonce_client, address).await?)); + let middleware = HmacMiddlewareLayer::new(nonce.clone(), *key); + + // create the backend + let backend = HttpClient::::builder() + .set_http_middleware(ServiceBuilder::new().layer(middleware)) + .build(address)?; + + Ok(Self { + client: backend, + nonce_client, + address: address.to_string(), + nonce, + }) + } + + /// Aligns the current nonce with the state of the server + pub async fn sync_nonce(&mut self) -> anyhow::Result<()> { + let nonce = get_nonce(&self.nonce_client, &self.address).await?; + + // Relaxed, we have a mut ref to self + self.nonce.store(nonce, Ordering::Relaxed); + Ok(()) + } +} + +async fn get_nonce(client: &reqwest::Client, address: &str) -> anyhow::Result { + Ok(client + .get(format!("{}/nonce", address)) + .send() + .await? + .text() + .await? + .parse()?) +} + +pub struct HmacMiddlewareLayer { + nonce: Arc, + key: [u8; 32], +} + +impl HmacMiddlewareLayer { + pub fn new(nonce: Arc, key: [u8; 32]) -> Self { + Self { nonce, key } + } +} + +impl tower::Layer for HmacMiddlewareLayer { + type Service = HmacMiddleware; + + fn layer(&self, inner: S) -> Self::Service { + HmacMiddleware { + nonce: self.nonce.clone(), + key: Arc::new(self.key), + inner, + } + } +} + +#[derive(Clone)] +pub struct HmacMiddleware { + nonce: Arc, + key: Arc<[u8; 32]>, + inner: S, +} + +impl HmacMiddleware { + fn add_hmac_headers(&self, req: hyper::Request) -> hyper::Request { + let content_type = req.headers().get(CONTENT_TYPE).cloned(); + + // check that the content type is what we expct + if let Some(content_type) = content_type { + if content_type != "application/json" { + return req; + } + } else { + return req; + } + + let (mut parts, mut body) = req.into_parts(); + + // we need to poll to get the full body, if the body is in the normal form of a one time + // blob of bytes then this should neven return pending. + let dummy_waker = futures::task::noop_waker(); + let dummy_cx = &mut std::task::Context::from_waker(&dummy_waker); + if let std::task::Poll::Ready(bytes) = Pin::new(&mut body).poll_next(dummy_cx) { + match bytes { + // we got a resposne the first poll and we check the content type + // already so we know this is the full body + // surely + Some(Ok(b)) => { + let buf = b.to_vec(); + + let timestamp = get_timestamp(); + let nonce = self.nonce.fetch_add(1, Ordering::AcqRel); + let hmac = super::create_hmac(&self.key, &buf, timestamp, nonce); + + parts + .headers + .insert(LIGHTINING_HMAC_HEADER, hmac.parse().unwrap()); + parts.headers.insert( + LIGHTINING_TIMESTAMP_HEADER, + timestamp.to_string().parse().unwrap(), + ); + parts + .headers + .insert(LIGHTINING_NONCE_HEADER, nonce.to_string().parse().unwrap()); + + return hyper::Request::from_parts(parts, hyper::Body::from(b)); + }, + // theres no body, lets replace an empty body so we dont get any + // unexpected behavior + None => { + return hyper::Request::from_parts(parts, hyper::Body::empty()); + }, + // no op, its only the first poll, just pass this through + Some(Err(_)) => {}, + } + }; + + #[cfg(debug_assertions)] + { + let dummy_waker = futures::task::noop_waker(); + let dummy_cx = &mut std::task::Context::from_waker(&dummy_waker); + + match Pin::new(&mut body).poll_next(dummy_cx) { + std::task::Poll::Ready(Some(_)) => { + tracing::warn!(target = "debug_check", "bytes returned another some value"); + }, + std::task::Poll::Pending => { + tracing::warn!(target = "debug_check", "body is pending"); + }, + std::task::Poll::Ready(None) => {}, + } + } + + hyper::Request::from_parts(parts, body) + } +} + +impl tower::Service> for HmacMiddleware +where + S: tower::Service, Response = Res>, + S: Clone, +{ + type Error = S::Error; + type Response = Res; + type Future = S::Future; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: hyper::Request) -> Self::Future { + self.inner.call(self.add_hmac_headers(req)) + } +} + +#[async_trait::async_trait] +impl ClientT for RpcClient +where + HttpClient>: ClientT, +{ + async fn notification( + &self, + method: &str, + params: Params, + ) -> Result<(), jsonrpsee::core::client::Error> + where + Params: ToRpcParams + Send, + { + match self { + Self::Http(client) => client.notification(method, params).await, + Self::WithHmac(client) => client.notification(method, params).await, + } + } + + async fn request( + &self, + method: &str, + params: Params, + ) -> Result + where + R: DeserializeOwned, + Params: ToRpcParams + Send, + { + match self { + Self::Http(client) => client.request(method, params).await, + Self::WithHmac(client) => client.request(method, params).await, + } + } + + async fn batch_request<'a, R>( + &self, + batch: BatchRequestBuilder<'a>, + ) -> Result, jsonrpsee::core::client::Error> + where + R: DeserializeOwned + std::fmt::Debug + 'a, + { + match self { + Self::Http(client) => client.batch_request(batch).await, + Self::WithHmac(client) => client.batch_request(batch).await, + } + } +} + +/// Implementations needed to make it friendly with the jsonrpsee trait bounds +#[async_trait::async_trait] +impl ClientT for HmacClient +where + HttpClient>: ClientT, +{ + async fn notification( + &self, + method: &str, + params: Params, + ) -> Result<(), jsonrpsee::core::client::Error> + where + Params: ToRpcParams + Send, + { + self.client.notification(method, params).await + } + + async fn request( + &self, + method: &str, + params: Params, + ) -> Result + where + R: DeserializeOwned, + Params: ToRpcParams + Send, + { + self.client.request(method, params).await + } + + async fn batch_request<'a, R>( + &self, + batch: BatchRequestBuilder<'a>, + ) -> Result, jsonrpsee::core::client::Error> + where + R: DeserializeOwned + std::fmt::Debug + 'a, + { + self.client.batch_request(batch).await + } +} + +#[async_trait::async_trait] +impl SubscriptionClientT for HmacClient { + async fn subscribe<'a, Notif, Params>( + &self, + subscribe_method: &'a str, + params: Params, + unsubscribe_method: &'a str, + ) -> Result, jsonrpsee::core::client::Error> + where + Params: ToRpcParams + Send, + Notif: DeserializeOwned, + { + self.client + .subscribe(subscribe_method, params, unsubscribe_method) + .await + } + + async fn subscribe_to_method<'a, Notif>( + &self, + method: &'a str, + ) -> Result, jsonrpsee::core::client::Error> + where + Notif: DeserializeOwned, + { + self.client.subscribe_to_method(method).await + } +} + +#[async_trait::async_trait] +impl SubscriptionClientT for RpcClient { + async fn subscribe<'a, Notif, Params>( + &self, + subscribe_method: &'a str, + params: Params, + unsubscribe_method: &'a str, + ) -> Result, jsonrpsee::core::client::Error> + where + Params: ToRpcParams + Send, + Notif: DeserializeOwned, + { + match self { + Self::Http(client) => { + client + .subscribe(subscribe_method, params, unsubscribe_method) + .await + }, + Self::WithHmac(client) => { + client + .subscribe(subscribe_method, params, unsubscribe_method) + .await + }, + } + } + + async fn subscribe_to_method<'a, Notif>( + &self, + method: &'a str, + ) -> Result, jsonrpsee::core::client::Error> + where + Notif: DeserializeOwned, + { + match self { + Self::Http(client) => client.subscribe_to_method(method).await, + Self::WithHmac(client) => client.subscribe_to_method(method).await, + } + } +} diff --git a/core/rpc/src/lib.rs b/core/rpc/src/lib.rs index c602dc6da..e12a4d061 100644 --- a/core/rpc/src/lib.rs +++ b/core/rpc/src/lib.rs @@ -22,21 +22,20 @@ use crate::error::RPCError; use crate::logic::AdminApi; pub use crate::logic::{EthApi, FleekApi, NetApi}; -pub mod api; -pub mod clients { - pub use super::api::{ - make_plain_rpc_client, - AdminApiClient as AdminRpcClient, - EthApiClient as EthRpcClient, - FleekApiClient as FleekRpcClient, - HmacClient as HmacRpcClient, - NetApiClient as NetRpcClient, - RpcClient, - }; +pub mod client; - pub type JsonRpcClient = - jsonrpsee::http_client::HttpClient; -} +pub mod api; +pub use api::{ + make_plain_rpc_client, + AdminApiClient as Admin, + EthApiClient as Eth, + FleekApiClient as Fleek, + NetApiClient as Net, +}; +pub use client::{HmacClient, RpcClient}; + +pub type JsonRpcClient = + jsonrpsee::http_client::HttpClient; pub mod api_types; pub mod config;