From 34cf2c667ab60dcab14af3b0bc9a13698ca26866 Mon Sep 17 00:00:00 2001
From: N
Date: Wed, 26 Jun 2024 11:48:04 -0400
Subject: [PATCH] chore: syncronizer uses typed rpc

---
 Cargo.lock                          |   1 +
 core/rpc/src/lib.rs                 |   2 +-
 core/rpc/src/utils.rs               |  58 -------
 core/syncronizer/Cargo.toml         |   1 +
 core/syncronizer/src/rpc.rs         | 244 +++++++++++++---------------
 core/syncronizer/src/syncronizer.rs |  42 +----
 core/syncronizer/src/utils.rs       |  16 +-
 7 files changed, 128 insertions(+), 236 deletions(-)
 delete mode 100644 core/rpc/src/utils.rs

diff --git a/Cargo.lock b/Cargo.lock
index 9851ff559..34aaefe33 100644
--- a/Cargo.lock
+++ b/Cargo.lock
@@ -6870,6 +6870,7 @@ dependencies = [
  "humantime-serde",
  "lightning-interfaces",
  "lightning-metrics",
+ "lightning-rpc",
  "lightning-utils",
  "rand",
  "reqwest",
diff --git a/core/rpc/src/lib.rs b/core/rpc/src/lib.rs
index e12a4d061..589b44454 100644
--- a/core/rpc/src/lib.rs
+++ b/core/rpc/src/lib.rs
@@ -42,8 +42,8 @@
 pub mod config;
 pub mod error;
 mod logic;
 mod server;
-pub mod utils;
 pub use server::create_hmac;
+
 #[cfg(test)]
 mod tests;
diff --git a/core/rpc/src/utils.rs b/core/rpc/src/utils.rs
deleted file mode 100644
index ace6532ed..000000000
--- a/core/rpc/src/utils.rs
+++ /dev/null
@@ -1,58 +0,0 @@
-use anyhow::anyhow;
-pub use lightning_utils::rpc::make_request;
-use lightning_utils::rpc::{get_admin_nonce, get_timestamp, RpcSuccessResponse};
-use reqwest::header::HeaderMap;
-use serde::de::DeserializeOwned;
-
-use crate::create_hmac;
-
-/// todo(n):
-///
-/// jsonrpsee creats typed clients for each rpc method
-/// we can probaly expose those here (or somewhere) and remove anyone from using this
-/// we would need to override the admin methods to use the hmac
-pub async fn rpc_request<T: DeserializeOwned>(
-    client: &reqwest::Client,
-    address: String,
-    req: String,
-    hmac: Option<&[u8; 32]>,
-) -> anyhow::Result<RpcSuccessResponse<T>> {
-    let mut headers = HeaderMap::new();
-    headers.insert("Content-Type", "application/json".parse().unwrap());
-
-    if let Some(secret) = hmac {
-        let timestamp = get_timestamp();
-        let nonce = get_admin_nonce(client, address.clone()).await?;
-        let hmac = create_hmac(secret, req.as_bytes(), timestamp, nonce);
-
-        headers.insert("X-Lightning-HMAC", hmac.parse().unwrap());
-        headers.insert(
-            "X-Lightning-Timestamp",
-            timestamp.to_string().parse().unwrap(),
-        );
-        headers.insert("X-Lightning-Nonce", nonce.to_string().parse().unwrap());
-    }
-
-    let res = client
-        .post(address)
-        .headers(headers)
-        .body(req)
-        .send()
-        .await?;
-
-    if res.status().is_success() {
-        let value: serde_json::Value = res.json().await?;
-        if value.get("result").is_some() {
-            let value: RpcSuccessResponse<T> = serde_json::from_value(value)?;
-            Ok(value)
-        } else {
-            Err(anyhow!("Failed to parse response"))
-        }
-    } else {
-        Err(anyhow!(
-            "Request failed with status: {}, err {}",
-            res.status(),
-            res.text().await?
-        ))
-    }
-}
diff --git a/core/syncronizer/Cargo.toml b/core/syncronizer/Cargo.toml
index 9fbca9b4c..ae035308e 100644
--- a/core/syncronizer/Cargo.toml
+++ b/core/syncronizer/Cargo.toml
@@ -9,6 +9,7 @@ edition.workspace = true
 lightning-interfaces = { path = "../interfaces" }
 lightning-utils = { path = "../utils" }
 lightning-metrics = { path = "../metrics" }
+lightning-rpc = { path = "../rpc" }
 futures.workspace = true
 anyhow.workspace = true
 tracing.workspace = true
diff --git a/core/syncronizer/src/rpc.rs b/core/syncronizer/src/rpc.rs
index f195fe75b..7c6517102 100644
--- a/core/syncronizer/src/rpc.rs
+++ b/core/syncronizer/src/rpc.rs
@@ -1,67 +1,13 @@
 use std::future::Future;
-use std::net::IpAddr;
 use anyhow::{anyhow, Result};
 use fleek_crypto::NodePublicKey;
+use futures::stream::FuturesOrdered;
+use futures::StreamExt;
 use lightning_interfaces::types::{Epoch, EpochInfo, NodeIndex, NodeInfo};
-use serde::de::DeserializeOwned;
+use lightning_rpc::{Fleek, RpcClient};
 use tokio::runtime::Handle;
 
-pub async fn rpc_request<T: DeserializeOwned>(
-    client: &reqwest::Client,
-    ip: IpAddr,
-    port: u16,
-    req: String,
-) -> Result<RpcResponse<T>> {
-    let res = client
-        .post(format!("http://{ip}:{port}/rpc/v0"))
-        .header("Content-Type", "application/json")
-        .body(req)
-        .send()
-        .await?;
-    if res.status().is_success() {
-        let value: serde_json::Value = res.json().await?;
-        if value.get("result").is_some() {
-            let value: RpcResponse<T> = serde_json::from_value(value)?;
-            Ok(value)
-        } else {
-            Err(anyhow!("Failed to parse response"))
-        }
-    } else {
-        Err(anyhow!("Request failed with status: {}", res.status()))
-    }
-}
-
-pub async fn ask_nodes<T: DeserializeOwned>(
-    req: String,
-    nodes: &[(NodeIndex, NodeInfo)],
-    rpc_client: &reqwest::Client,
-) -> Result<Vec<T>> {
-    let mut futs = Vec::new();
-    for (_, node) in nodes {
-        let req_clone = req.clone();
-        let fut = async move {
-            rpc_request::<T>(rpc_client, node.domain, node.ports.rpc, req_clone)
-                .await
-                .ok()
-        };
-        futs.push(fut);
-    }
-
-    let results: Vec<T> = futures::future::join_all(futs)
-        .await
-        .into_iter()
-        .flatten()
-        .map(|x| x.result)
-        .collect();
-
-    if results.is_empty() {
-        Err(anyhow!("Unable to get a response from nodes"))
-    } else {
-        Ok(results)
-    }
-}
-
 /// Runs the given future to completion on the current tokio runtime.
 /// This call is intentionally blocking.
 pub fn sync_call<F>(fut: F) -> F::Output
 where
@@ -76,12 +22,9 @@ where
 }
 
 /// Returns the epoch info from the epoch the bootstrap nodes are on
-pub async fn get_epoch_info(
-    nodes: Vec<(NodeIndex, NodeInfo)>,
-    rpc_client: reqwest::Client,
-) -> Result<EpochInfo> {
-    let mut epochs: Vec<EpochInfo> =
-        ask_nodes(rpc_epoch_info().to_string(), &nodes, &rpc_client).await?;
+pub async fn get_epoch_info(nodes: Vec<(NodeIndex, NodeInfo)>) -> Result<EpochInfo> {
+    let mut epochs = ask_epoch_info(nodes).await;
+
     if epochs.is_empty() {
         return Err(anyhow!("Failed to get epoch info from bootstrap nodes"));
     }
@@ -89,38 +32,68 @@
     Ok(epochs.pop().unwrap())
 }
 
+/// A list of the nodes' reported epoch info
+///
+/// ### Empty if all the requests fail
+pub async fn ask_epoch_info(nodes: Vec<(NodeIndex, NodeInfo)>) -> Vec<EpochInfo> {
+    nodes
+        .into_iter()
+        .map(|(_, node)| async move {
+            let client =
+                RpcClient::new_no_auth(&format!("http://{}:{}", node.domain, node.ports.rpc))
+                    .ok()?;
+
+            client.get_epoch_info().await.ok()
+        })
+        .collect::<FuturesOrdered<_>>()
+        .filter_map(std::future::ready)
+        .collect::<Vec<_>>()
+        .await
+}
+
 /// Returns the node info for our node, if it's already on the state.
 pub async fn get_node_info(
     node_public_key: NodePublicKey,
     nodes: Vec<(NodeIndex, NodeInfo)>,
-    rpc_client: reqwest::Client,
 ) -> Result<Option<NodeInfo>> {
-    let mut node_info: Vec<(Option<NodeInfo>, Epoch)> = ask_nodes(
-        rpc_node_info(&node_public_key).to_string(),
-        &nodes,
-        &rpc_client,
-    )
-    .await?;
+    let mut node_info = ask_node_info(nodes, node_public_key).await;
 
     if node_info.is_empty() {
         return Err(anyhow!("Failed to get node info from bootstrap nodes"));
     }
+
     node_info.sort_by(|(_, a), (_, b)| a.partial_cmp(b).unwrap());
     Ok(node_info.pop().unwrap().0)
 }
 
+/// A list of the nodes' reported node info
+///
+/// ### Empty if all the requests fail
+pub async fn ask_node_info(
+    nodes: Vec<(NodeIndex, NodeInfo)>,
+    pk: NodePublicKey,
+) -> Vec<(Option<NodeInfo>, Epoch)> {
+    nodes
+        .into_iter()
+        .map(|(_, node)| async move {
+            let client =
+                RpcClient::new_no_auth(&format!("http://{}:{}", node.domain, node.ports.rpc))
+                    .ok()?;
+
+            client.get_node_info_epoch(pk).await.ok()
+        })
+        .collect::<FuturesOrdered<_>>()
+        .filter_map(std::future::ready)
+        .collect::<Vec<_>>()
+        .await
+}
+
 /// Returns the node info for our node, if it's already on the state.
 pub async fn check_is_valid_node(
     node_public_key: NodePublicKey,
     nodes: Vec<(NodeIndex, NodeInfo)>,
-    rpc_client: reqwest::Client,
 ) -> Result<bool> {
-    let mut is_valid: Vec<(bool, Epoch)> = ask_nodes(
-        rpc_is_valid_node(&node_public_key).to_string(),
-        &nodes,
-        &rpc_client,
-    )
-    .await?;
+    let mut is_valid = ask_is_valid_node(nodes, node_public_key).await;
     if is_valid.is_empty() {
         return Err(anyhow!("Failed to get node validity from bootstrap nodes"));
     }
@@ -129,13 +102,31 @@
     Ok(is_valid.pop().unwrap().0)
 }
 
+/// Ask the nodes if the given node is valid
+///
+/// ### Empty if all the requests fail
+pub async fn ask_is_valid_node(
+    nodes: Vec<(NodeIndex, NodeInfo)>,
+    pk: NodePublicKey,
+) -> Vec<(bool, Epoch)> {
+    nodes
+        .into_iter()
+        .map(|(_, node)| async move {
+            let client =
+                RpcClient::new_no_auth(&format!("http://{}:{}", node.domain, node.ports.rpc))
+                    .ok()?;
+
+            client.is_valid_node_epoch(pk).await.ok()
+        })
+        .collect::<FuturesOrdered<_>>()
+        .filter_map(std::future::ready)
+        .collect::<Vec<_>>()
+        .await
+}
+
 /// Returns the hash of the last epoch ckpt, and the current epoch.
-pub async fn last_epoch_hash(
-    nodes: &[(NodeIndex, NodeInfo)],
-    rpc_client: &reqwest::Client,
-) -> Result<[u8; 32]> {
-    let mut hash: Vec<([u8; 32], Epoch)> =
-        ask_nodes(rpc_last_epoch_hash().to_string(), nodes, rpc_client).await?;
+pub async fn last_epoch_hash(nodes: &[(NodeIndex, NodeInfo)]) -> Result<[u8; 32]> {
+    let mut hash = ask_last_epoch_hash(nodes.to_vec()).await;
 
     if hash.is_empty() {
         return Err(anyhow!(
@@ -146,55 +137,46 @@
     Ok(hash.pop().unwrap().0)
 }
 
-#[derive(serde::Serialize, serde::Deserialize, Debug)]
-pub struct RpcResponse<T> {
-    pub jsonrpc: String,
-    pub id: usize,
-    pub result: T,
-}
-
-// todo(dalton): Lazy static?
-pub fn rpc_last_epoch_hash() -> serde_json::Value {
-    serde_json::json!({
-        "jsonrpc": "2.0",
-        "method":"flk_get_last_epoch_hash",
-        "params":[],
-        "id":1,
-    })
-}
-
-pub fn rpc_epoch() -> serde_json::Value {
-    serde_json::json!({
-        "jsonrpc": "2.0",
-        "method":"flk_get_epoch",
-        "params":[],
-        "id":1,
-    })
-}
-
-pub fn rpc_epoch_info() -> serde_json::Value {
-    serde_json::json!({
-        "jsonrpc": "2.0",
-        "method":"flk_get_epoch_info",
-        "params":[],
-        "id":1,
-    })
+/// A list of the nodes' reported last epoch hash
+///
+/// ### Empty if all the requests fail
+pub async fn ask_last_epoch_hash(nodes: Vec<(NodeIndex, NodeInfo)>) -> Vec<([u8; 32], Epoch)> {
+    nodes
+        .into_iter()
+        .map(|(_, node)| async move {
+            let client =
+                RpcClient::new_no_auth(&format!("http://{}:{}", node.domain, node.ports.rpc))
+                    .ok()?;
+
+            client.get_last_epoch_hash().await.ok()
+        })
+        .collect::<FuturesOrdered<_>>()
+        .filter_map(std::future::ready)
+        .collect::<Vec<_>>()
+        .await
 }
 
-pub fn rpc_node_info(public_key: &NodePublicKey) -> serde_json::Value {
-    serde_json::json!({
-        "jsonrpc": "2.0",
-        "method":"flk_get_node_info_epoch",
-        "params":{"public_key": public_key},
-        "id":1,
-    })
-}
+/// Returns the epochs that the bootstrap nodes are currently on
+///
+/// ### Errors if all the requests fail
+pub async fn get_epoch(nodes: Vec<(NodeIndex, NodeInfo)>) -> Result<Vec<Epoch>> {
+    let results = nodes
+        .into_iter()
+        .map(|(_, node)| async move {
+            let client =
+                RpcClient::new_no_auth(&format!("http://{}:{}", node.domain, node.ports.rpc))
+                    .ok()?;
+
+            client.get_epoch().await.ok()
+        })
+        .collect::<FuturesOrdered<_>>()
+        .filter_map(std::future::ready)
+        .collect::<Vec<_>>()
+        .await;
 
-pub fn rpc_is_valid_node(public_key: &NodePublicKey) -> serde_json::Value {
-    serde_json::json!({
-        "jsonrpc": "2.0",
-        "method":"flk_is_valid_node_epoch",
-        "params":{"public_key": public_key},
-        "id":1,
-    })
+    if results.is_empty() {
+        Err(anyhow!("Unable to get a response from nodes"))
+    } else {
+        Ok(results)
+    }
 }
diff --git a/core/syncronizer/src/syncronizer.rs b/core/syncronizer/src/syncronizer.rs
index 564e57b66..5e0f99030 100644
--- a/core/syncronizer/src/syncronizer.rs
+++ b/core/syncronizer/src/syncronizer.rs
@@ -15,12 +15,10 @@ use lightning_interfaces::types::{
 use lightning_metrics::increment_counter;
 use lightning_utils::application::QueryRunnerExt;
 use rand::seq::SliceRandom;
-use serde::de::DeserializeOwned;
 use tracing::error;
 use crate::config::Config;
-use crate::rpc::{self, rpc_epoch};
-use crate::utils;
+use crate::{rpc, utils};
 
 pub struct Syncronizer<C: Collection> {
     state: State<C>,
 }
@@ -37,7 +35,6 @@ struct SyncronizerInner<C: Collection> {
     notifier: C::NotifierInterface,
     blockstore_server_socket: BlockstoreServerSocket,
     genesis_committee: Vec<(NodeIndex, NodeInfo)>,
-    rpc_client: reqwest::Client,
     epoch_change_delta: Duration,
 }
 
@@ -59,14 +56,9 @@
 
         let our_public_key = keystore.get_ed25519_pk();
 
-        let rpc_client = reqwest::Client::builder()
-            .timeout(Duration::from_secs(5))
-            .connect_timeout(Duration::from_secs(5))
-            .build()?;
-
         if !cfg!(debug_assertions) {
             // We only run the prelude in prod mode to avoid interfering with tests.
-            Syncronizer::<C>::prelude(our_public_key, &genesis_committee, &rpc_client);
+            Syncronizer::<C>::prelude(our_public_key, &genesis_committee);
         }
 
         let inner = SyncronizerInner::new(
@@ -76,7 +68,6 @@ impl<C: Collection> Syncronizer<C> {
             notifier.clone(),
             blockstore_server,
             config.epoch_change_delta,
-            rpc_client,
         )?;
 
         Ok(Self {
@@ -100,11 +91,7 @@ impl<C: Collection> Syncronizer<C> {
         );
     }
 
-    fn prelude(
-        our_public_key: NodePublicKey,
-        genesis_committee: &Vec<(NodeIndex, NodeInfo)>,
-        rpc_client: &reqwest::Client,
-    ) {
+    fn prelude(our_public_key: NodePublicKey, genesis_committee: &Vec<(NodeIndex, NodeInfo)>) {
         // Check if node is on genesis committee.
         for (_, node_info) in genesis_committee {
             if our_public_key == node_info.public_key {
@@ -120,7 +107,6 @@ impl<C: Collection> Syncronizer<C> {
         let is_valid = rpc::sync_call(rpc::check_is_valid_node(
             our_public_key,
             genesis_committee.clone(),
-            rpc_client.clone(),
        ))
        .expect("Cannot reach bootstrap nodes");
        if !is_valid {
@@ -133,16 +119,12 @@ impl<C: Collection> Syncronizer<C> {
         let node_info = rpc::sync_call(rpc::get_node_info(
             our_public_key,
             genesis_committee.clone(),
-            rpc_client.clone(),
         ))
         .expect("Cannot reach bootstrap nodes")
         .unwrap(); // we unwrap here because we already checked if the node is valid above
 
-        let epoch_info = rpc::sync_call(rpc::get_epoch_info(
-            genesis_committee.clone(),
-            rpc_client.clone(),
-        ))
-        .expect("Cannot reach bootstrap nodes");
+        let epoch_info = rpc::sync_call(rpc::get_epoch_info(genesis_committee.clone()))
+            .expect("Cannot reach bootstrap nodes");
 
         // Check participation status.
         match node_info.participation {
@@ -157,7 +139,6 @@ impl<C: Collection> Syncronizer<C> {
                 rpc::sync_call(utils::wait_to_next_epoch(
                     epoch_info,
                     genesis_committee.clone(),
-                    rpc_client.clone(),
                 ));
             },
             _ => (),
@@ -190,7 +171,6 @@ impl<C: Collection> SyncronizerInner<C> {
         notifier: C::NotifierInterface,
         blockstore_server: &C::BlockstoreServerInterface,
         epoch_change_delta: Duration,
-        rpc_client: reqwest::Client,
     ) -> Result<Self> {
         Ok(Self {
             our_public_key,
@@ -198,7 +178,6 @@ impl<C: Collection> SyncronizerInner<C> {
             blockstore_server_socket: blockstore_server.get_socket(),
             notifier,
             genesis_committee,
-            rpc_client,
             epoch_change_delta,
         })
     }
@@ -280,8 +259,6 @@ impl<C: Collection> SyncronizerInner<C> {
         // Get the epoch the bootstrap nodes are at
         let bootstrap_epoch = self.get_current_epoch().await?;
 
-        //let bootstrap_epoch = self.ask_bootstrap_nodes(rpc_epoch().to_string()).await?;
-
         if bootstrap_epoch <= current_epoch {
             bail!("Bootstrap nodes are on the same epoch");
         }
@@ -303,11 +280,6 @@ impl<C: Collection> SyncronizerInner<C> {
         }
     }
 
-    /// This function will rpc request genesis nodes in sequence and stop when one of them responds
-    async fn ask_bootstrap_nodes<T: DeserializeOwned>(&self, req: String) -> Result<Vec<T>> {
-        rpc::ask_nodes(req, &self.genesis_committee, &self.rpc_client).await
-    }
-
     async fn download_checkpoint_from_bootstrap(&self, checkpoint_hash: [u8; 32]) -> Result<()> {
         for (node_index, _) in &self.genesis_committee {
             let mut res = self
@@ -331,12 +303,12 @@ impl<C: Collection> SyncronizerInner<C> {
     // This function will hit the bootstrap nodes(Genesis committee) to ask what epoch they are on
     // who the current committee is
     async fn get_latest_checkpoint_hash(&self) -> Result<[u8; 32]> {
-        rpc::last_epoch_hash(&self.genesis_committee, &self.rpc_client).await
+        rpc::last_epoch_hash(&self.genesis_committee).await
     }
 
     /// Returns the epoch the bootstrap nodes are on
     async fn get_current_epoch(&self) -> Result<Epoch> {
-        let epochs = self.ask_bootstrap_nodes(rpc_epoch().to_string()).await?;
+        let epochs = rpc::get_epoch(self.genesis_committee.clone()).await?;
         let epoch = epochs
             .into_iter()
             .max()
diff --git a/core/syncronizer/src/utils.rs b/core/syncronizer/src/utils.rs
index a5c41df76..ef5b62a8b 100644
--- a/core/syncronizer/src/utils.rs
+++ b/core/syncronizer/src/utils.rs
@@ -10,7 +10,6 @@ use crate::rpc;
 pub async fn wait_to_next_epoch(
     epoch_info: EpochInfo,
     genesis_committee: Vec<(NodeIndex, NodeInfo)>,
-    rpc_client: reqwest::Client,
 ) {
     let shutdown_controller = ShutdownController::default();
     shutdown_controller.install_handlers();
@@ -22,16 +21,12 @@ pub async fn wait_to_next_epoch(
             println!("Exiting...");
             std::process::exit(0);
         }
-        _ = wait_loop(epoch_info, genesis_committee, rpc_client) => {
+        _ = wait_loop(epoch_info, genesis_committee) => {
         }
     }
 }
 
-async fn wait_loop(
-    epoch_info: EpochInfo,
-    genesis_committee: Vec<(NodeIndex, NodeInfo)>,
-    rpc_client: reqwest::Client,
-) {
+async fn wait_loop(epoch_info: EpochInfo, genesis_committee: Vec<(NodeIndex, NodeInfo)>) {
     let mut stdout = stdout();
     println!();
     loop {
@@ -40,10 +35,9 @@ async fn wait_loop(
             .unwrap()
             .as_millis() as u64;
         if now > epoch_info.epoch_end {
-            let new_epoch_info =
-                rpc::get_epoch_info(genesis_committee.to_vec(), rpc_client.clone())
-                    .await
-                    .expect("Cannot reach bootstrap nodes");
+            let new_epoch_info = rpc::get_epoch_info(genesis_committee.to_vec())
+                .await
+                .expect("Cannot reach bootstrap nodes");
             if new_epoch_info.epoch > epoch_info.epoch {
                 // The new epoch started, time to start the node.
                 println!();
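
Note (editor's sketch, not part of the patch): the ask_* helpers added in core/syncronizer/src/rpc.rs return an empty Vec when every bootstrap node fails to answer. A minimal, hypothetical caller using one of them is sketched below; `ask_epoch_info`, `RpcClient`, and the `Fleek` trait come from the diff above, while the wrapper function, its name, and its error message are illustrative assumptions.

    use anyhow::{anyhow, Result};
    use lightning_interfaces::types::{EpochInfo, NodeIndex, NodeInfo};

    use crate::rpc::ask_epoch_info;

    /// Hypothetical caller: pick the highest epoch reported by any bootstrap node,
    /// mirroring what get_epoch_info in the diff does with the same helper.
    pub async fn freshest_epoch_info(nodes: Vec<(NodeIndex, NodeInfo)>) -> Result<EpochInfo> {
        // An empty Vec means every request failed (see the doc comments in the diff).
        let mut infos = ask_epoch_info(nodes).await;
        if infos.is_empty() {
            return Err(anyhow!("no bootstrap node responded"));
        }
        // EpochInfo::epoch is assumed to be an ordered integer; the patch compares
        // it with `>` in core/syncronizer/src/utils.rs.
        infos.sort_by_key(|info| info.epoch);
        Ok(infos.pop().unwrap())
    }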