diff --git a/Dockerfile b/Dockerfile
index c28c5d71..3892a9d8 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -17,13 +17,9 @@ COPY --from=builder /app/target/release/tester .
 
 FROM debian:stable-slim as executor-runtime
 
 COPY /node/tools/docker_binaries/executor /node/
-COPY /node/tools/k8s_configs/ /node/k8s_config
-COPY /node/tools/docker-config/ /node/docker_config
-COPY docker-entrypoint.sh /node/
 COPY k8s_entrypoint.sh /node/
 
 WORKDIR /node
-RUN chmod +x docker-entrypoint.sh
 RUN chmod +x k8s_entrypoint.sh
 ENTRYPOINT ["./docker-entrypoint.sh"]
diff --git a/Makefile b/Makefile
index ad3e44b9..f11b97c1 100644
--- a/Makefile
+++ b/Makefile
@@ -23,10 +23,9 @@ docker_node_image:
 
 # Kubernetes commands
 
 start_k8s_nodes:
-	cd ${EXECUTABLE_NODE_DIR} && cargo run --bin deployer generate-config --nodes ${NODES}
 	$(MAKE) docker_node_image
 	minikube image load consensus-node:latest
-	cd ${EXECUTABLE_NODE_DIR} && cargo run --release --bin deployer deploy --nodes ${NODES} --seed-nodes ${SEED_NODES}
+	cd ${EXECUTABLE_NODE_DIR} && cargo run --release --bin deployer -- --nodes ${NODES} --seed-nodes ${SEED_NODES}
 
 # Clean commands
diff --git a/k8s_entrypoint.sh b/k8s_entrypoint.sh
index 8f6f498f..4e9857cd 100644
--- a/k8s_entrypoint.sh
+++ b/k8s_entrypoint.sh
@@ -1,6 +1,5 @@
 #!/bin/bash
 # This file works as an entrypoint of the kubernetes cluster running the node binary copied inside of it.
 
-cd k8s_config/${NODE_ID}
 export RUST_LOG=INFO
-../../executor $@
+./executor $@
diff --git a/node/Cargo.toml b/node/Cargo.toml
index 80f955fe..0d2e3bcd 100644
--- a/node/Cargo.toml
+++ b/node/Cargo.toml
@@ -156,5 +156,7 @@ wildcard_dependencies = "warn"
 redundant_locals = "allow"
 needless_pass_by_ref_mut = "allow"
 box_default = "allow"
+# remove once fix to https://github.com/rust-lang/rust-clippy/issues/11764 is available on CI.
+map_identity = "allow"
 # &*x is not equivalent to x, because it affects borrowing in closures.
 borrow_deref_ref = "allow"
diff --git a/node/tools/src/bin/deployer.rs b/node/tools/src/bin/deployer.rs
index bf71df85..57a026f0 100644
--- a/node/tools/src/bin/deployer.rs
+++ b/node/tools/src/bin/deployer.rs
@@ -1,9 +1,8 @@
 //! Deployer for the kubernetes cluster.
-use anyhow::Context;
-use clap::{Parser, Subcommand};
-use std::{collections::HashMap, fs, path::PathBuf};
-use zksync_consensus_crypto::{Text, TextFmt};
+use clap::Parser;
+use std::collections::HashMap;
 use zksync_consensus_roles::{node, validator};
+use zksync_consensus_tools::k8s::ConsensusNode;
 use zksync_consensus_tools::{k8s, AppConfig};
 
 /// K8s namespace for consensus nodes.
@@ -13,14 +12,6 @@ const NAMESPACE: &str = "consensus";
 #[derive(Debug, Parser)]
 #[command(name = "deployer")]
 struct DeployerCLI {
-    /// Subcommand to run.
-    #[command(subcommand)]
-    command: DeployerCommands,
-}
-
-/// Subcommand arguments.
-#[derive(Debug, Parser)]
-struct SubCommandArgs {
     /// Number of total nodes to deploy.
     #[arg(long)]
     nodes: usize,
@@ -29,19 +20,11 @@ struct SubCommandArgs {
     seed_nodes: Option<usize>,
 }
 
-/// Subcommands.
-#[derive(Subcommand, Debug)]
-enum DeployerCommands {
-    /// Generate configs for the nodes.
-    GenerateConfig(SubCommandArgs),
-    /// Deploy the nodes.
-    Deploy(SubCommandArgs),
-}
-
-/// Generates config for the nodes to run in the kubernetes cluster
-/// Creates a directory for each node in the parent k8s_configs directory.
-fn generate_config(nodes: usize) -> anyhow::Result<()> {
+/// Generates the configuration for all the nodes to run in the kubernetes cluster
+/// and creates a ConsensusNode for each to track their progress
+fn generate_consensus_nodes(nodes: usize, seed_nodes_amount: Option<usize>) -> Vec<ConsensusNode> {
     assert!(nodes > 0, "at least 1 node has to be specified");
+    let seed_nodes_amount = seed_nodes_amount.unwrap_or(1);
 
     // Generate the keys for all the replicas.
     let rng = &mut rand::thread_rng();
@@ -56,54 +39,40 @@ fn generate_config(nodes: usize) -> anyhow::Result<()> {
 
     let default_config = AppConfig::default_for(setup.genesis.clone());
 
-    let mut cfgs: Vec<_> = (0..nodes).map(|_| default_config.clone()).collect();
+    let mut cfgs: Vec<ConsensusNode> = (0..nodes)
+        .map(|i| ConsensusNode {
+            id: format!("consensus-node-{i:0>2}"),
+            config: default_config.clone(),
+            key: node_keys[i].clone(),
+            validator_key: Some(validator_keys[i].clone()),
+            node_addr: None, //It's not assigned yet
+            is_seed: i < seed_nodes_amount,
+        })
+        .collect();
 
     // Construct a gossip network with optimal diameter.
     for (i, node) in node_keys.iter().enumerate() {
         for j in 0..peers {
             let next = (i * peers + j + 1) % nodes;
-            cfgs[next].add_gossip_static_inbound(node.public());
+            cfgs[next].config.add_gossip_static_inbound(node.public());
         }
     }
 
-    let manifest_path = std::env::var("CARGO_MANIFEST_DIR")?;
-    let root = PathBuf::from(manifest_path).join("k8s_configs");
-    let _ = fs::remove_dir_all(&root);
-    for (i, cfg) in cfgs.into_iter().enumerate() {
-        let node_config_dir = root.join(format!("consensus-node-{i:0>2}"));
-        fs::create_dir_all(&node_config_dir)
-            .with_context(|| format!("create_dir_all({:?})", node_config_dir))?;
-
-        cfg.write_to_file(&node_config_dir)?;
-        fs::write(
-            node_config_dir.join("validator_key"),
-            &TextFmt::encode(&validator_keys[i]),
-        )
-        .context("fs::write()")?;
-        fs::write(
-            node_config_dir.join("node_key"),
-            &TextFmt::encode(&node_keys[i]),
-        )
-        .context("fs::write()")?;
-    }
-
-    Ok(())
+    cfgs
 }
 
 /// Deploys the nodes to the kubernetes cluster.
 async fn deploy(nodes_amount: usize, seed_nodes_amount: Option<usize>) -> anyhow::Result<()> {
+    let mut consensus_nodes = generate_consensus_nodes(nodes_amount, seed_nodes_amount);
     let client = k8s::get_client().await?;
     k8s::create_or_reuse_namespace(&client, NAMESPACE).await?;
-    let seed_nodes_amount = seed_nodes_amount.unwrap_or(1);
 
     let seed_nodes = &mut HashMap::new();
     let mut non_seed_nodes = HashMap::new();
 
     // Split the nodes in different hash maps as they will be deployed at different stages
-    let mut consensus_nodes = from_configs(nodes_amount)?;
-    for (index, node) in consensus_nodes.iter_mut().enumerate() {
-        if index < seed_nodes_amount {
-            node.is_seed = true;
+    for node in consensus_nodes.iter_mut() {
+        if node.is_seed {
             seed_nodes.insert(node.id.to_owned(), node);
         } else {
             non_seed_nodes.insert(node.id.to_owned(), node);
@@ -120,58 +89,31 @@ async fn deploy(nodes_amount: usize, seed_nodes_amount: Option<usize>) -> anyhow::Result<()> {
         node.fetch_and_assign_pod_ip(&client, NAMESPACE).await?;
     }
 
-    // Build a vector of seed peers NodeAddrs to provide as gossip_static_outbound to the rest of the nodes
+    // Build a vector of (PublicKey, SocketAddr) to provide as gossip_static_outbound
+    // to the rest of the nodes
     let peers: Vec<_> = seed_nodes
         .values()
        .map(|n| {
-            n.node_addr
+            let node_addr = n
+                .node_addr
                 .as_ref()
                 .expect("Seed node address not defined")
-                .clone()
+                .clone();
+            (node_addr.key, node_addr.addr)
         })
        .collect();
 
     // Deploy the rest of the nodes
     for node in non_seed_nodes.values_mut() {
-        node.gossip_static_outbound = peers.clone();
+        node.config.gossip_static_outbound.extend(peers.clone());
         node.deploy(&client, NAMESPACE).await?;
     }
 
     Ok(())
 }
 
-/// Build ConsensusNodes representation list from configurations
-// TODO once we can provide config via cli args, this will be replaced
-// using in-memory config structs
-fn from_configs(nodes: usize) -> anyhow::Result<Vec<k8s::ConsensusNode>> {
-    let manifest_path = std::env::var("CARGO_MANIFEST_DIR")?;
-    let root = PathBuf::from(manifest_path).join("k8s_configs");
-    let mut consensus_nodes = vec![];
-
-    for i in 0..nodes {
-        let node_id = format!("consensus-node-{i:0>2}");
-        let node_key_path = root.join(&node_id).join("node_key");
-        let key_string = fs::read_to_string(node_key_path).context("failed reading file")?;
-        let key = Text::new(&key_string)
-            .decode()
-            .context("failed decoding key")?;
-        consensus_nodes.push(k8s::ConsensusNode {
-            id: node_id,
-            key,
-            node_addr: None,
-            is_seed: false,
-            gossip_static_outbound: vec![],
-        });
-    }
-    Ok(consensus_nodes)
-}
-
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    let DeployerCLI { command } = DeployerCLI::parse();
-
-    match command {
-        DeployerCommands::GenerateConfig(args) => generate_config(args.nodes),
-        DeployerCommands::Deploy(args) => deploy(args.nodes, args.seed_nodes).await,
-    }
+    let args = DeployerCLI::parse();
+    deploy(args.nodes, args.seed_nodes).await
 }
diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs
index 53f608e2..ffb8410c 100644
--- a/node/tools/src/config.rs
+++ b/node/tools/src/config.rs
@@ -150,19 +150,35 @@ impl ProtoFmt for AppConfig {
     }
 }
 
-/// This struct holds the file path to each of the config files.
+/// Configuration information.
 #[derive(Debug)]
-pub struct ConfigPaths<'a> {
-    /// Path to a JSON file with node configuration.
-    pub app: &'a Path,
-    /// Path to a validator key file.
-    pub validator_key: Option<&'a Path>,
-    /// Path to a node key file.
-    pub node_key: &'a Path,
+pub struct ConfigArgs<'a> {
+    /// Node configuration.
+    pub config_args: ConfigSource<'a>,
     /// Path to the rocksdb database.
     pub database: &'a Path,
 }
 
+#[derive(Debug)]
+pub enum ConfigSource<'a> {
+    CliConfig {
+        /// Node configuration from command line.
+        config: AppConfig,
+        /// Node key as a string.
+        node_key: node::SecretKey,
+        /// Validator key as a string.
+        validator_key: Option<validator::SecretKey>,
+    },
+    PathConfig {
+        /// Path to a JSON file with node configuration.
+        config_file: &'a Path,
+        /// Path to a validator key file.
+        validator_key_file: &'a Path,
+        /// Path to a node key file.
+        node_key_file: &'a Path,
+    },
+}
+
 pub struct Configs {
     pub app: AppConfig,
     pub validator_key: Option<validator::SecretKey>,
@@ -170,37 +186,47 @@ pub struct Configs {
     pub database: PathBuf,
 }
 
-impl<'a> ConfigPaths<'a> {
+impl<'a> ConfigArgs<'a> {
     // Loads configs from the file system.
     pub fn load(self) -> anyhow::Result<Configs> {
-        Ok(Configs {
-            app: (|| {
-                let app = fs::read_to_string(self.app).context("failed reading file")?;
-                decode_json::<Serde<AppConfig>>(&app).context("failed decoding JSON")
-            })()
-            .with_context(|| self.app.display().to_string())?
-            .0,
-
-            validator_key: self
-                .validator_key
-                .as_ref()
-                .map(|file| {
-                    (|| {
-                        let key = fs::read_to_string(file).context("failed reading file")?;
-                        Text::new(&key).decode().context("failed decoding key")
-                    })()
-                    .with_context(|| file.display().to_string())
-                })
-                .transpose()?,
-
-            node_key: (|| {
-                let key = fs::read_to_string(self.node_key).context("failed reading file")?;
-                Text::new(&key).decode().context("failed decoding key")
-            })()
-            .with_context(|| self.node_key.display().to_string())?,
-
-            database: self.database.into(),
-        })
+        match self.config_args {
+            ConfigSource::CliConfig {
+                config,
+                node_key,
+                validator_key,
+            } => Ok(Configs {
+                app: config.clone(),
+                validator_key: validator_key.clone(),
+                node_key: node_key.clone(),
+                database: self.database.into(),
+            }),
+            ConfigSource::PathConfig {
+                config_file,
+                validator_key_file,
+                node_key_file,
+            } => Ok(Configs {
+                app: (|| {
+                    let app = fs::read_to_string(config_file).context("failed reading file")?;
+                    decode_json::<Serde<AppConfig>>(&app).context("failed decoding JSON")
+                })()
+                .with_context(|| config_file.display().to_string())?
+                .0,
+
+                validator_key: fs::read_to_string(validator_key_file)
+                    .ok()
+                    .map(|value| Text::new(&value).decode().context("failed decoding key"))
+                    .transpose()
+                    .with_context(|| validator_key_file.display().to_string())?,
+
+                node_key: (|| {
+                    let key = fs::read_to_string(node_key_file).context("failed reading file")?;
+                    Text::new(&key).decode().context("failed decoding key")
+                })()
+                .with_context(|| node_key_file.display().to_string())?,
+
+                database: self.database.into(),
+            }),
+        }
     }
 }
diff --git a/node/tools/src/k8s.rs b/node/tools/src/k8s.rs
index f53bfa12..e1c757b7 100644
--- a/node/tools/src/k8s.rs
+++ b/node/tools/src/k8s.rs
@@ -1,4 +1,4 @@
-use crate::{config, NodeAddr};
+use crate::{config, AppConfig, NodeAddr};
 use anyhow::{ensure, Context};
 use k8s_openapi::{
     api::{
@@ -18,7 +18,8 @@ use kube::{
 use std::{collections::BTreeMap, net::SocketAddr, time::Duration};
 use tokio::time;
 use tracing::log::info;
-use zksync_consensus_roles::node;
+use zksync_consensus_crypto::TextFmt;
+use zksync_consensus_roles::{node, validator};
 use zksync_protobuf::serde::Serde;
 
 /// Docker image name for consensus nodes.
@@ -32,14 +33,16 @@ pub const DEFAULT_NAMESPACE: &str = "consensus";
 pub struct ConsensusNode {
     /// Node identifier
     pub id: String,
+    /// Node configuration
+    pub config: AppConfig,
     /// Node key
     pub key: node::SecretKey,
+    /// Validator key
+    pub validator_key: Option<validator::SecretKey>,
     /// Full NodeAddr
     pub node_addr: Option<NodeAddr>,
     /// Is seed node (meaning it has no gossipStaticOutbound configuration)
     pub is_seed: bool,
-    /// known gossipStaticOutbound peers
-    pub gossip_static_outbound: Vec<NodeAddr>,
 }
 
 impl ConsensusNode {
@@ -79,7 +82,7 @@ impl ConsensusNode {
 
     /// Creates a deployment
     pub async fn deploy(&self, client: &Client, namespace: &str) -> anyhow::Result<()> {
-        let cli_args = get_cli_args(&self.gossip_static_outbound);
+        let cli_args = get_cli_args(self);
         let deployment = Deployment {
             metadata: ObjectMeta {
                 name: Some(self.id.to_owned()),
@@ -322,9 +325,7 @@ async fn get_running_pod(pods: &Api<Pod>, label: &str) -> anyhow::Result<Pod> {
         .items
         .pop()
         .with_context(|| format!("Pod not found: {label}"))?;
-    if !is_pod_running(&pod) {
-        anyhow::bail!("Pod is not running");
-    }
+    anyhow::ensure!(is_pod_running(&pod), "Pod is not running");
     Ok(pod)
 }
 
@@ -337,22 +338,21 @@ fn is_pod_running(pod: &Pod) -> bool {
     false
 }
 
-fn get_cli_args(peers: &[NodeAddr]) -> Vec<String> {
-    if peers.is_empty() {
-        [].to_vec()
-    } else {
-        [
-            "--add-gossip-static-outbound".to_string(),
-            config::encode_with_serializer(
-                &peers
-                    .iter()
-                    .map(|e| Serde(e.clone()))
-                    .collect::<Vec<Serde<NodeAddr>>>(),
-                serde_json::Serializer::new(vec![]),
-            ),
-        ]
-        .to_vec()
-    }
+fn get_cli_args(consensus_node: &ConsensusNode) -> Vec<String> {
+    let mut cli_args = [
+        "--config".to_string(),
+        config::encode_with_serializer(
+            &Serde(consensus_node.config.clone()),
+            serde_json::Serializer::new(vec![]),
+        ),
+        "--node-key".to_string(),
+        TextFmt::encode(&consensus_node.key),
+    ]
+    .to_vec();
+    if let Some(key) = &consensus_node.validator_key {
+        cli_args.append(&mut ["--validator-key".to_string(), TextFmt::encode(key)].to_vec())
+    };
+    cli_args
 }
 
 async fn retry<T, F>(retries: usize, delay: Duration, mut f: F) -> anyhow::Result<T>
@@ -368,5 +368,5 @@ where
 {
             return result;
         }
     }
-    unreachable!("Loop sould always return")
+    unreachable!("Loop should always return")
 }
diff --git a/node/tools/src/lib.rs b/node/tools/src/lib.rs
index 554e9904..9a6263b8 100644
--- a/node/tools/src/lib.rs
+++ b/node/tools/src/lib.rs
@@ -9,5 +9,5 @@ mod store;
 #[cfg(test)]
 mod tests;
 
-pub use config::{decode_json, AppConfig, ConfigPaths, NodeAddr, NODES_PORT};
+pub use config::{decode_json, AppConfig, ConfigArgs, ConfigSource, NodeAddr, NODES_PORT};
 pub use rpc::server::RPCServer;
diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs
index 9dd4b335..0131bdf2 100644
--- a/node/tools/src/main.rs
+++ b/node/tools/src/main.rs
@@ -7,52 +7,83 @@ use tracing::metadata::LevelFilter;
 use tracing_subscriber::{prelude::*, Registry};
 use vise_exporter::MetricsExporter;
 use zksync_concurrency::{ctx, scope};
-use zksync_consensus_tools::{decode_json, ConfigPaths, NodeAddr, RPCServer};
+use zksync_consensus_crypto::{Text, TextFmt};
+use zksync_consensus_roles::{node, validator};
+use zksync_consensus_tools::{decode_json, AppConfig, ConfigArgs, ConfigSource, RPCServer};
 use zksync_protobuf::serde::Serde;
 
-/// Wrapper for Vec.
-#[derive(Debug, Clone)]
-struct NodeAddrs(Vec<Serde<NodeAddr>>);
-
-impl std::str::FromStr for NodeAddrs {
-    type Err = anyhow::Error;
-    fn from_str(s: &str) -> Result<Self, Self::Err> {
-        Ok(Self(decode_json(s)?))
-    }
-}
-
 /// Command-line application launching a node executor.
 #[derive(Debug, Parser)]
-struct Args {
+struct Cli {
+    /// Full json config
+    #[arg(long,
+        value_parser(parse_config),
+        requires="node_key",
+        conflicts_with_all=["config_file", "validator_key_file", "node_key_file"])]
+    config: Option<Serde<AppConfig>>,
+    /// Plain node key
+    #[arg(long,
+        value_parser(parse_key::<node::SecretKey>),
+        requires="config",
+        conflicts_with_all=["config_file", "validator_key_file", "node_key_file"])]
+    node_key: Option<node::SecretKey>,
+    /// Plain validator key
+    #[arg(long,
+        value_parser(parse_key::<validator::SecretKey>),
+        requires_all=["config", "node_key"],
+        conflicts_with_all=["config_file", "validator_key_file", "node_key_file"])]
+    validator_key: Option<validator::SecretKey>,
     /// Path to a validator key file. If set to an empty string, validator key will not be read
     /// (i.e., a node will be initialized as a non-validator node).
-    #[arg(long, default_value = "./validator_key")]
-    validator_key: PathBuf,
+    #[arg(long,
+        default_value = "./validator_key",
+        conflicts_with_all=["config", "validator_key", "node_key"])]
+    validator_key_file: PathBuf,
     /// Path to a JSON file with node configuration.
-    #[arg(long, default_value = "./config.json")]
+    #[arg(long,
+        default_value = "./config.json",
+        conflicts_with_all=["config", "validator_key", "node_key"])]
     config_file: PathBuf,
     /// Path to a node key file.
-    #[arg(long, default_value = "./node_key")]
-    node_key: PathBuf,
+    #[arg(long,
+        default_value = "./node_key",
+        conflicts_with_all=["config", "validator_key", "node_key"])]
+    node_key_file: PathBuf,
     /// Path to the rocksdb database of the node.
     #[arg(long, default_value = "./database")]
     database: PathBuf,
     /// Port for the RPC server.
     #[arg(long)]
     rpc_port: Option<u16>,
-    /// IP address and key of the seed peers.
-    #[arg(long)]
-    add_gossip_static_outbound: Option<NodeAddrs>,
 }
 
-impl Args {
+/// Function to let clap parse the command line `config` argument
+fn parse_config(val: &str) -> anyhow::Result<Serde<AppConfig>> {
+    decode_json(val)
+}
+
+/// Node/validator key parser for clap
+fn parse_key<T: TextFmt>(val: &str) -> anyhow::Result<T> {
+    Text::new(val).decode().context("failed decoding key")
+}
+
+impl Cli {
     /// Extracts configuration paths from these args.
-    fn config_paths(&self) -> ConfigPaths<'_> {
-        ConfigPaths {
-            app: &self.config_file,
-            node_key: &self.node_key,
-            validator_key: (!self.validator_key.as_os_str().is_empty())
-                .then_some(&self.validator_key),
+    fn config_args(&self) -> ConfigArgs<'_> {
+        let config_args = match &self.config {
+            Some(config) => ConfigSource::CliConfig {
+                config: config.clone().0,
+                node_key: self.node_key.clone().unwrap(), // node_key is present as it is enforced by clap rules
+                validator_key: self.validator_key.clone(),
+            },
+            None => ConfigSource::PathConfig {
+                config_file: &self.config_file,
+                validator_key_file: &self.validator_key_file,
+                node_key_file: &self.node_key_file,
+            },
+        };
+        ConfigArgs {
+            config_args,
             database: &self.database,
         }
     }
@@ -60,7 +91,7 @@ impl Args {
 
 #[tokio::main]
 async fn main() -> anyhow::Result<()> {
-    let args: Args = Args::parse();
+    let args: Cli = Cli::parse();
     tracing::trace!(?args, "Starting node");
 
     let ctx = &ctx::root();
@@ -96,22 +127,11 @@ async fn main() -> anyhow::Result<()> {
     // Load the config files.
     tracing::debug!("Loading config files.");
-    let mut configs = args
-        .config_paths()
-        .load()
-        .context("config_paths().load()")?;
+    let mut configs = args.config_args().load().context("config_args().load()")?;
 
     // if `PUBLIC_ADDR` env var is set, use it to override publicAddr in config
     configs.app.check_public_addr().context("Public Address")?;
 
-    // Add gossipStaticOutbound pairs from cli to config
-    if let Some(addrs) = args.add_gossip_static_outbound {
-        configs
-            .app
-            .gossip_static_outbound
-            .extend(addrs.0.into_iter().map(|e| (e.0.key, e.0.addr)));
-    }
-
     let (executor, runner) = configs
         .make_executor(ctx)
         .await