Skip to content

Commit

Permalink
Merge branch 'main' into weighted_validators
Browse files Browse the repository at this point in the history
  • Loading branch information
ElFantasma committed Mar 20, 2024
2 parents f6a3b52 + 20eddf2 commit 283ae7c
Show file tree
Hide file tree
Showing 12 changed files with 266 additions and 293 deletions.
262 changes: 136 additions & 126 deletions node/Cargo.lock

Large diffs are not rendered by default.

12 changes: 4 additions & 8 deletions node/tests/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
//! This is a simple test for the RPC server. It checks if the server is running and can respond to.
use anyhow::{ensure, Context};
use clap::{Parser, Subcommand};
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params, types::Params};
use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params};
use std::{fs, io::Write, net::SocketAddr, path::PathBuf, str::FromStr};
use zksync_consensus_tools::{
k8s,
rpc::methods::{health_check::HealthCheck, RPCMethod},
};
use zksync_consensus_tools::{k8s, rpc::methods::health_check};

/// Command line arguments.
#[derive(Debug, Parser)]
Expand Down Expand Up @@ -80,12 +77,11 @@ pub async fn sanity_test() {
let socket = SocketAddr::from_str(socket).unwrap();
let url = format!("http://{}", socket);
let rpc_client = HttpClientBuilder::default().build(url).unwrap();
let params = Params::new(None);
let response: serde_json::Value = rpc_client
.request(HealthCheck::method(), rpc_params!())
.request(health_check::method(), rpc_params!())
.await
.unwrap();
assert_eq!(response, HealthCheck::callback(params).unwrap());
assert_eq!(response, health_check::callback().unwrap());
}
}

Expand Down
17 changes: 8 additions & 9 deletions node/tools/src/bin/deployer.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
//! Deployer for the kubernetes cluster.
use clap::Parser;
use std::collections::HashMap;
use zksync_consensus_roles::{node, validator};
use zksync_consensus_roles::node::SecretKey;
use zksync_consensus_roles::validator;
use zksync_consensus_tools::k8s::ConsensusNode;
use zksync_consensus_tools::{k8s, AppConfig};

/// K8s namespace for consensus nodes.
const NAMESPACE: &str = "consensus";

/// Command line arguments.
#[derive(Debug, Parser)]
#[command(name = "deployer")]
Expand Down Expand Up @@ -35,7 +33,7 @@ fn generate_consensus_nodes(nodes: usize, seed_nodes_amount: Option<usize>) -> V
// Each node will have `gossip_peers` outbound peers.
let peers = 2;

let node_keys: Vec<node::SecretKey> = (0..nodes).map(|_| node::SecretKey::generate()).collect();
let node_keys: Vec<SecretKey> = (0..nodes).map(|_| SecretKey::generate()).collect();

let default_config = AppConfig::default_for(setup.genesis.clone());

Expand Down Expand Up @@ -65,7 +63,7 @@ fn generate_consensus_nodes(nodes: usize, seed_nodes_amount: Option<usize>) -> V
async fn deploy(nodes_amount: usize, seed_nodes_amount: Option<usize>) -> anyhow::Result<()> {
let mut consensus_nodes = generate_consensus_nodes(nodes_amount, seed_nodes_amount);
let client = k8s::get_client().await?;
k8s::create_or_reuse_namespace(&client, NAMESPACE).await?;
k8s::create_or_reuse_namespace(&client, k8s::DEFAULT_NAMESPACE).await?;

let seed_nodes = &mut HashMap::new();
let mut non_seed_nodes = HashMap::new();
Expand All @@ -81,12 +79,13 @@ async fn deploy(nodes_amount: usize, seed_nodes_amount: Option<usize>) -> anyhow

// Deploy seed peer(s)
for node in seed_nodes.values_mut() {
node.deploy(&client, NAMESPACE).await?;
node.deploy(&client, k8s::DEFAULT_NAMESPACE).await?;
}

// Fetch and complete node addrs into seed nodes
for node in seed_nodes.values_mut() {
node.fetch_and_assign_pod_ip(&client, NAMESPACE).await?;
node.fetch_and_assign_pod_ip(&client, k8s::DEFAULT_NAMESPACE)
.await?;
}

// Build a vector of (PublicKey, SocketAddr) to provide as gossip_static_outbound
Expand All @@ -106,7 +105,7 @@ async fn deploy(nodes_amount: usize, seed_nodes_amount: Option<usize>) -> anyhow
// Deploy the rest of the nodes
for node in non_seed_nodes.values_mut() {
node.config.gossip_static_outbound.extend(peers.clone());
node.deploy(&client, NAMESPACE).await?;
node.deploy(&client, k8s::DEFAULT_NAMESPACE).await?;
}

Ok(())
Expand Down
4 changes: 2 additions & 2 deletions node/tools/src/k8s.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ pub async fn get_consensus_nodes_address(client: &Client) -> anyhow::Result<Vec<
"No consensus pods found in the k8s cluster"
);
let mut node_rpc_addresses: Vec<SocketAddr> = Vec::new();
for pod in pods.into_iter() {
for pod in pods {
let pod_spec = pod.spec.as_ref().context("Failed to get pod spec")?;
let pod_container = pod_spec
.containers
Expand All @@ -218,7 +218,7 @@ pub async fn get_consensus_nodes_address(client: &Client) -> anyhow::Result<Vec<
.context("Failed to get ports of container")?
.iter()
.find_map(|port| {
let port: u16 = port.container_port.try_into().ok()?;
let port = port.container_port.try_into().ok()?;
(port != config::NODES_PORT).then_some(port)
})
.context("Failed parsing container port")?;
Expand Down
6 changes: 3 additions & 3 deletions node/tools/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -144,9 +144,9 @@ async fn main() -> anyhow::Result<()> {
rpc_addr.set_port(rpc_addr.port() + 100);
}

// cloning configuration to let RPCServer show it
// TODO this should be queried in real time instead, to reflect any possible change in config
let rpc_server = RPCServer::new(rpc_addr, configs.app.clone());
// Create the RPC server with the executor's storage.
let node_storage = executor.block_store.clone();
let rpc_server = RPCServer::new(rpc_addr, node_storage);

// Initialize the storage.
scope::run!(ctx, |ctx, s| async {
Expand Down
47 changes: 0 additions & 47 deletions node/tools/src/rpc/methods/config.rs

This file was deleted.

31 changes: 12 additions & 19 deletions node/tools/src/rpc/methods/health_check.rs
Original file line number Diff line number Diff line change
@@ -1,23 +1,16 @@
//! Health check method for RPC server.
use super::RPCMethod;
use jsonrpsee::types::{error::ErrorCode, Params};
use jsonrpsee::core::RpcResult;

/// Health check method for RPC server.
pub struct HealthCheck;

impl RPCMethod for HealthCheck {
/// Health check response for /health endpoint.
fn callback(_params: Params) -> Result<serde_json::Value, ErrorCode> {
Ok(serde_json::json!({"health": true}))
}
/// Health check response for the `/health` endpoint.
///
/// Always succeeds; reports that the RPC server is up and responding.
pub fn callback() -> RpcResult<serde_json::Value> {
    let body = serde_json::json!({"health": true});
    Ok(body)
}

/// Health check method name.
fn method() -> &'static str {
"health_check"
}
/// Name under which the health check is registered with the RPC server.
pub fn method() -> &'static str {
    "health_check"
}

/// Method path for GET requests.
fn path() -> &'static str {
"/health"
}
/// HTTP path used to invoke the health check via GET.
pub fn path() -> &'static str {
    "/health"
}
34 changes: 34 additions & 0 deletions node/tools/src/rpc/methods/last_commited_block.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Last committed block method for RPC server.
use anyhow::Context;
use jsonrpsee::{
core::RpcResult,
types::{error::ErrorCode, ErrorObjectOwned},
};
use std::sync::Arc;
use zksync_consensus_storage::BlockStore;

/// Last committed block response for the `/last_commited_block` endpoint.
///
/// Snapshots the current state of the node's [`BlockStore`] and returns the
/// block number of the last committed block as JSON.
///
/// # Errors
/// Returns [`ErrorCode::InternalError`] when the store has no last
/// certificate yet (e.g. before any block has been committed).
pub fn callback(node_storage: Arc<BlockStore>) -> RpcResult<serde_json::Value> {
    // We only need a one-off snapshot of the store state, not a live
    // subscription, so clone the current value out of the watch channel.
    let state = node_storage.subscribe().borrow().clone();
    let last_commited_block_header = state
        .last
        .context("Failed to get last state")
        .map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))?
        .header()
        .number
        .0;
    Ok(serde_json::json!({
        "last_commited_block": last_commited_block_header
    }))
}

/// Name of the last committed block RPC method.
// NOTE: the original doc comment said "Last view method name." — a
// copy-paste error from last_view.rs; this is the last committed block method.
pub fn method() -> &'static str {
    "last_commited_block"
}

/// HTTP path used to invoke this method via GET.
pub fn path() -> &'static str {
    "/last_commited_block"
}
34 changes: 34 additions & 0 deletions node/tools/src/rpc/methods/last_view.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
//! Last view method for RPC server.
use anyhow::Context;
use jsonrpsee::{
core::RpcResult,
types::{error::ErrorCode, ErrorObjectOwned},
};
use std::sync::Arc;
use zksync_consensus_storage::BlockStore;

/// Last view response for the `/last_view` endpoint.
///
/// Takes a snapshot of the block store state and returns the view number of
/// the last certificate as JSON.
///
/// # Errors
/// Returns [`ErrorCode::InternalError`] when the store has no last
/// certificate yet.
pub fn callback(node_storage: Arc<BlockStore>) -> RpcResult<serde_json::Value> {
    // Snapshot the current value of the watch channel; no live subscription
    // is kept past this statement.
    let state = node_storage.subscribe().borrow().clone();
    let last = state
        .last
        .context("Failed to get last state")
        .map_err(|_| ErrorObjectOwned::from(ErrorCode::InternalError))?;
    let last_view = last.view().number.0;
    Ok(serde_json::json!({ "last_view": last_view }))
}

/// Name of the last view RPC method.
pub fn method() -> &'static str {
    "last_view"
}

/// HTTP path used to invoke the last view method via GET.
pub fn path() -> &'static str {
    "/last_view"
}
16 changes: 2 additions & 14 deletions node/tools/src/rpc/methods/mod.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,3 @@
use jsonrpsee::types::{error::ErrorCode, Params};

/// Trait to implement for new RPC methods.
pub trait RPCMethod {
/// Method response logic when called.
fn callback(params: Params) -> Result<serde_json::Value, ErrorCode>;
/// Method name.
fn method() -> &'static str;
/// Method path for GET requests.
fn path() -> &'static str;
}

pub(crate) mod config;
pub mod health_check;
pub(crate) mod peers;
pub mod last_commited_block;
pub mod last_view;
40 changes: 0 additions & 40 deletions node/tools/src/rpc/methods/peers.rs

This file was deleted.

Loading

0 comments on commit 283ae7c

Please sign in to comment.