From 1f77a0e6e0556767b2f9c183c608379ef39269f2 Mon Sep 17 00:00:00 2001 From: ElFantasma Date: Tue, 19 Mar 2024 10:53:04 -0300 Subject: [PATCH] Full config in cli (BFT-427) (#71) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What ❔ Consensus nodes can receive the full configuration via cli args, as well as node and validator keys. That, in turn, is used to start the nodes in k8s pods without requiring any configuration in the local filesystem. ## Why ❔ This is cleaner and easier. Now the docker image built is independent of the number of nodes to deploy, as every configuration parameter is provided via the command line --- Dockerfile | 4 -- Makefile | 3 +- k8s_entrypoint.sh | 3 +- node/Cargo.toml | 2 + node/tools/src/bin/deployer.rs | 120 +++++++++------------------------ node/tools/src/config.rs | 100 +++++++++++++++++---------- node/tools/src/k8s.rs | 50 +++++++------- node/tools/src/lib.rs | 2 +- node/tools/src/main.rs | 102 +++++++++++++++++----------- 9 files changed, 185 insertions(+), 201 deletions(-) diff --git a/Dockerfile b/Dockerfile index c28c5d71..3892a9d8 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,13 +17,9 @@ COPY --from=builder /app/target/release/tester .
FROM debian:stable-slim as executor-runtime COPY /node/tools/docker_binaries/executor /node/ -COPY /node/tools/k8s_configs/ /node/k8s_config -COPY /node/tools/docker-config/ /node/docker_config -COPY docker-entrypoint.sh /node/ COPY k8s_entrypoint.sh /node/ WORKDIR /node -RUN chmod +x docker-entrypoint.sh RUN chmod +x k8s_entrypoint.sh ENTRYPOINT ["./docker-entrypoint.sh"] diff --git a/Makefile b/Makefile index ad3e44b9..f11b97c1 100644 --- a/Makefile +++ b/Makefile @@ -23,10 +23,9 @@ docker_node_image: # Kubernetes commands start_k8s_nodes: - cd ${EXECUTABLE_NODE_DIR} && cargo run --bin deployer generate-config --nodes ${NODES} $(MAKE) docker_node_image minikube image load consensus-node:latest - cd ${EXECUTABLE_NODE_DIR} && cargo run --release --bin deployer deploy --nodes ${NODES} --seed-nodes ${SEED_NODES} + cd ${EXECUTABLE_NODE_DIR} && cargo run --release --bin deployer -- --nodes ${NODES} --seed-nodes ${SEED_NODES} # Clean commands diff --git a/k8s_entrypoint.sh b/k8s_entrypoint.sh index 8f6f498f..4e9857cd 100644 --- a/k8s_entrypoint.sh +++ b/k8s_entrypoint.sh @@ -1,6 +1,5 @@ #!/bin/bash # This file works as an entrypoint of the kubernetes cluster running the node binary copied inside of it. -cd k8s_config/${NODE_ID} export RUST_LOG=INFO -../../executor $@ +./executor $@ diff --git a/node/Cargo.toml b/node/Cargo.toml index 80f955fe..0d2e3bcd 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -156,5 +156,7 @@ wildcard_dependencies = "warn" redundant_locals = "allow" needless_pass_by_ref_mut = "allow" box_default = "allow" +# remove once fix to https://github.com/rust-lang/rust-clippy/issues/11764 is available on CI. +map_identity = "allow" # &*x is not equivalent to x, because it affects borrowing in closures. borrow_deref_ref = "allow" diff --git a/node/tools/src/bin/deployer.rs b/node/tools/src/bin/deployer.rs index bf71df85..57a026f0 100644 --- a/node/tools/src/bin/deployer.rs +++ b/node/tools/src/bin/deployer.rs @@ -1,9 +1,8 @@ //! 
Deployer for the kubernetes cluster. -use anyhow::Context; -use clap::{Parser, Subcommand}; -use std::{collections::HashMap, fs, path::PathBuf}; -use zksync_consensus_crypto::{Text, TextFmt}; +use clap::Parser; +use std::collections::HashMap; use zksync_consensus_roles::{node, validator}; +use zksync_consensus_tools::k8s::ConsensusNode; use zksync_consensus_tools::{k8s, AppConfig}; /// K8s namespace for consensus nodes. @@ -13,14 +12,6 @@ const NAMESPACE: &str = "consensus"; #[derive(Debug, Parser)] #[command(name = "deployer")] struct DeployerCLI { - /// Subcommand to run. - #[command(subcommand)] - command: DeployerCommands, -} - -/// Subcommand arguments. -#[derive(Debug, Parser)] -struct SubCommandArgs { /// Number of total nodes to deploy. #[arg(long)] nodes: usize, @@ -29,19 +20,11 @@ struct SubCommandArgs { seed_nodes: Option, } -/// Subcommands. -#[derive(Subcommand, Debug)] -enum DeployerCommands { - /// Generate configs for the nodes. - GenerateConfig(SubCommandArgs), - /// Deploy the nodes. - Deploy(SubCommandArgs), -} - -/// Generates config for the nodes to run in the kubernetes cluster -/// Creates a directory for each node in the parent k8s_configs directory. -fn generate_config(nodes: usize) -> anyhow::Result<()> { +/// Generates the configuration for all the nodes to run in the kubernetes cluster +/// and creates a ConsensusNode for each to track their progress +fn generate_consensus_nodes(nodes: usize, seed_nodes_amount: Option) -> Vec { assert!(nodes > 0, "at least 1 node has to be specified"); + let seed_nodes_amount = seed_nodes_amount.unwrap_or(1); // Generate the keys for all the replicas. 
let rng = &mut rand::thread_rng(); @@ -56,54 +39,40 @@ fn generate_config(nodes: usize) -> anyhow::Result<()> { let default_config = AppConfig::default_for(setup.genesis.clone()); - let mut cfgs: Vec<_> = (0..nodes).map(|_| default_config.clone()).collect(); + let mut cfgs: Vec = (0..nodes) + .map(|i| ConsensusNode { + id: format!("consensus-node-{i:0>2}"), + config: default_config.clone(), + key: node_keys[i].clone(), + validator_key: Some(validator_keys[i].clone()), + node_addr: None, //It's not assigned yet + is_seed: i < seed_nodes_amount, + }) + .collect(); // Construct a gossip network with optimal diameter. for (i, node) in node_keys.iter().enumerate() { for j in 0..peers { let next = (i * peers + j + 1) % nodes; - cfgs[next].add_gossip_static_inbound(node.public()); + cfgs[next].config.add_gossip_static_inbound(node.public()); } } - let manifest_path = std::env::var("CARGO_MANIFEST_DIR")?; - let root = PathBuf::from(manifest_path).join("k8s_configs"); - let _ = fs::remove_dir_all(&root); - for (i, cfg) in cfgs.into_iter().enumerate() { - let node_config_dir = root.join(format!("consensus-node-{i:0>2}")); - fs::create_dir_all(&node_config_dir) - .with_context(|| format!("create_dir_all({:?})", node_config_dir))?; - - cfg.write_to_file(&node_config_dir)?; - fs::write( - node_config_dir.join("validator_key"), - &TextFmt::encode(&validator_keys[i]), - ) - .context("fs::write()")?; - fs::write( - node_config_dir.join("node_key"), - &TextFmt::encode(&node_keys[i]), - ) - .context("fs::write()")?; - } - - Ok(()) + cfgs } /// Deploys the nodes to the kubernetes cluster. 
async fn deploy(nodes_amount: usize, seed_nodes_amount: Option) -> anyhow::Result<()> { + let mut consensus_nodes = generate_consensus_nodes(nodes_amount, seed_nodes_amount); let client = k8s::get_client().await?; k8s::create_or_reuse_namespace(&client, NAMESPACE).await?; - let seed_nodes_amount = seed_nodes_amount.unwrap_or(1); let seed_nodes = &mut HashMap::new(); let mut non_seed_nodes = HashMap::new(); // Split the nodes in different hash maps as they will be deployed at different stages - let mut consensus_nodes = from_configs(nodes_amount)?; - for (index, node) in consensus_nodes.iter_mut().enumerate() { - if index < seed_nodes_amount { - node.is_seed = true; + for node in consensus_nodes.iter_mut() { + if node.is_seed { seed_nodes.insert(node.id.to_owned(), node); } else { non_seed_nodes.insert(node.id.to_owned(), node); @@ -120,58 +89,31 @@ async fn deploy(nodes_amount: usize, seed_nodes_amount: Option) -> anyhow node.fetch_and_assign_pod_ip(&client, NAMESPACE).await?; } - // Build a vector of seed peers NodeAddrs to provide as gossip_static_outbound to the rest of the nodes + // Build a vector of (PublicKey, SocketAddr) to provide as gossip_static_outbound + // to the rest of the nodes let peers: Vec<_> = seed_nodes .values() .map(|n| { - n.node_addr + let node_addr = n + .node_addr .as_ref() .expect("Seed node address not defined") - .clone() + .clone(); + (node_addr.key, node_addr.addr) }) .collect(); // Deploy the rest of the nodes for node in non_seed_nodes.values_mut() { - node.gossip_static_outbound = peers.clone(); + node.config.gossip_static_outbound.extend(peers.clone()); node.deploy(&client, NAMESPACE).await?; } Ok(()) } -/// Build ConsensusNodes representation list from configurations -// TODO once we can provide config via cli args, this will be replaced -// using in-memory config structs -fn from_configs(nodes: usize) -> anyhow::Result> { - let manifest_path = std::env::var("CARGO_MANIFEST_DIR")?; - let root = 
PathBuf::from(manifest_path).join("k8s_configs"); let mut consensus_nodes = vec![]; - - for i in 0..nodes { - let node_id = format!("consensus-node-{i:0>2}"); - let node_key_path = root.join(&node_id).join("node_key"); - let key_string = fs::read_to_string(node_key_path).context("failed reading file")?; - let key = Text::new(&key_string) - .decode() - .context("failed decoding key")?; - consensus_nodes.push(k8s::ConsensusNode { - id: node_id, - key, - node_addr: None, - is_seed: false, - gossip_static_outbound: vec![], - }); - } - Ok(consensus_nodes) -} - #[tokio::main] async fn main() -> anyhow::Result<()> { - let DeployerCLI { command } = DeployerCLI::parse(); - - match command { - DeployerCommands::GenerateConfig(args) => generate_config(args.nodes), - DeployerCommands::Deploy(args) => deploy(args.nodes, args.seed_nodes).await, - } + let args = DeployerCLI::parse(); + deploy(args.nodes, args.seed_nodes).await } diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index 53f608e2..ffb8410c 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -150,19 +150,35 @@ impl ProtoFmt for AppConfig { } } -/// This struct holds the file path to each of the config files. +/// Configuration information. #[derive(Debug)] -pub struct ConfigPaths<'a> { - /// Path to a JSON file with node configuration. - pub app: &'a Path, - /// Path to a validator key file. - pub validator_key: Option<&'a Path>, - /// Path to a node key file. - pub node_key: &'a Path, +pub struct ConfigArgs<'a> { + /// Node configuration. + pub config_args: ConfigSource<'a>, /// Path to the rocksdb database. pub database: &'a Path, } +#[derive(Debug)] +pub enum ConfigSource<'a> { + CliConfig { + /// Node configuration, already parsed from the command line. + config: AppConfig, + /// Node secret key, already parsed from the command line. + node_key: node::SecretKey, + /// Validator secret key, already parsed from the command line (if the node is a validator). + validator_key: Option<validator::SecretKey>, + }, + PathConfig { + /// Path to a JSON file with node configuration.
+ config_file: &'a Path, + /// Path to a validator key file. + validator_key_file: &'a Path, + /// Path to a node key file. + node_key_file: &'a Path, + }, +} + pub struct Configs { pub app: AppConfig, pub validator_key: Option, @@ -170,37 +186,47 @@ pub struct Configs { pub database: PathBuf, } -impl<'a> ConfigPaths<'a> { +impl<'a> ConfigArgs<'a> { // Loads configs from the file system. pub fn load(self) -> anyhow::Result { - Ok(Configs { - app: (|| { - let app = fs::read_to_string(self.app).context("failed reading file")?; - decode_json::>(&app).context("failed decoding JSON") - })() - .with_context(|| self.app.display().to_string())? - .0, - - validator_key: self - .validator_key - .as_ref() - .map(|file| { - (|| { - let key = fs::read_to_string(file).context("failed reading file")?; - Text::new(&key).decode().context("failed decoding key") - })() - .with_context(|| file.display().to_string()) - }) - .transpose()?, - - node_key: (|| { - let key = fs::read_to_string(self.node_key).context("failed reading file")?; - Text::new(&key).decode().context("failed decoding key") - })() - .with_context(|| self.node_key.display().to_string())?, - - database: self.database.into(), - }) + match self.config_args { + ConfigSource::CliConfig { + config, + node_key, + validator_key, + } => Ok(Configs { + app: config.clone(), + validator_key: validator_key.clone(), + node_key: node_key.clone(), + database: self.database.into(), + }), + ConfigSource::PathConfig { + config_file, + validator_key_file, + node_key_file, + } => Ok(Configs { + app: (|| { + let app = fs::read_to_string(config_file).context("failed reading file")?; + decode_json::>(&app).context("failed decoding JSON") + })() + .with_context(|| config_file.display().to_string())? 
+ .0, + + validator_key: fs::read_to_string(validator_key_file) + .ok() + .map(|value| Text::new(&value).decode().context("failed decoding key")) + .transpose() + .with_context(|| validator_key_file.display().to_string())?, + + node_key: (|| { + let key = fs::read_to_string(node_key_file).context("failed reading file")?; + Text::new(&key).decode().context("failed decoding key") + })() + .with_context(|| node_key_file.display().to_string())?, + + database: self.database.into(), + }), + } } } diff --git a/node/tools/src/k8s.rs b/node/tools/src/k8s.rs index f53bfa12..e1c757b7 100644 --- a/node/tools/src/k8s.rs +++ b/node/tools/src/k8s.rs @@ -1,4 +1,4 @@ -use crate::{config, NodeAddr}; +use crate::{config, AppConfig, NodeAddr}; use anyhow::{ensure, Context}; use k8s_openapi::{ api::{ @@ -18,7 +18,8 @@ use kube::{ use std::{collections::BTreeMap, net::SocketAddr, time::Duration}; use tokio::time; use tracing::log::info; -use zksync_consensus_roles::node; +use zksync_consensus_crypto::TextFmt; +use zksync_consensus_roles::{node, validator}; use zksync_protobuf::serde::Serde; /// Docker image name for consensus nodes. 
@@ -32,14 +33,16 @@ pub const DEFAULT_NAMESPACE: &str = "consensus"; pub struct ConsensusNode { /// Node identifier pub id: String, + /// Node configuration + pub config: AppConfig, /// Node key pub key: node::SecretKey, + /// Validator key (if the node is a validator) + pub validator_key: Option<validator::SecretKey>, /// Full NodeAddr pub node_addr: Option<NodeAddr>, /// Is seed node (meaning it has no gossipStaticOutbound configuration) pub is_seed: bool, - /// known gossipStaticOutbound peers - pub gossip_static_outbound: Vec<NodeAddr>, } impl ConsensusNode { @@ -79,7 +82,7 @@ impl ConsensusNode { /// Creates a deployment pub async fn deploy(&self, client: &Client, namespace: &str) -> anyhow::Result<()> { - let cli_args = get_cli_args(&self.gossip_static_outbound); + let cli_args = get_cli_args(self); let deployment = Deployment { metadata: ObjectMeta { name: Some(self.id.to_owned()), @@ -322,9 +325,7 @@ async fn get_running_pod(pods: &Api<Pod>, label: &str) -> anyhow::Result<Pod> { .items .pop() .with_context(|| format!("Pod not found: {label}"))?; - if !is_pod_running(&pod) { - anyhow::bail!("Pod is not running"); - } + anyhow::ensure!(is_pod_running(&pod), "Pod is not running"); Ok(pod) } @@ -337,22 +338,21 @@ fn is_pod_running(pod: &Pod) -> bool { false } -fn get_cli_args(peers: &[NodeAddr]) -> Vec<String> { - if peers.is_empty() { - [].to_vec() - } else { - [ - "--add-gossip-static-outbound".to_string(), - config::encode_with_serializer( - &peers - .iter() - .map(|e| Serde(e.clone())) - .collect::<Vec<Serde<NodeAddr>>>(), - serde_json::Serializer::new(vec![]), - ), - ] - .to_vec() - } +fn get_cli_args(consensus_node: &ConsensusNode) -> Vec<String> { + let mut cli_args = [ + "--config".to_string(), + config::encode_with_serializer( + &Serde(consensus_node.config.clone()), + serde_json::Serializer::new(vec![]), + ), + "--node-key".to_string(), + TextFmt::encode(&consensus_node.key), + ] + .to_vec(); + if let Some(key) = &consensus_node.validator_key { + cli_args.append(&mut ["--validator-key".to_string(), TextFmt::encode(key)].to_vec()) + }; + cli_args } async fn retry(retries:
usize, delay: Duration, mut f: F) -> anyhow::Result @@ -368,5 +368,5 @@ where return result; } } - unreachable!("Loop sould always return") + unreachable!("Loop should always return") } diff --git a/node/tools/src/lib.rs b/node/tools/src/lib.rs index 554e9904..9a6263b8 100644 --- a/node/tools/src/lib.rs +++ b/node/tools/src/lib.rs @@ -9,5 +9,5 @@ mod store; #[cfg(test)] mod tests; -pub use config::{decode_json, AppConfig, ConfigPaths, NodeAddr, NODES_PORT}; +pub use config::{decode_json, AppConfig, ConfigArgs, ConfigSource, NodeAddr, NODES_PORT}; pub use rpc::server::RPCServer; diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs index 9dd4b335..0131bdf2 100644 --- a/node/tools/src/main.rs +++ b/node/tools/src/main.rs @@ -7,52 +7,83 @@ use tracing::metadata::LevelFilter; use tracing_subscriber::{prelude::*, Registry}; use vise_exporter::MetricsExporter; use zksync_concurrency::{ctx, scope}; -use zksync_consensus_tools::{decode_json, ConfigPaths, NodeAddr, RPCServer}; +use zksync_consensus_crypto::{Text, TextFmt}; +use zksync_consensus_roles::{node, validator}; +use zksync_consensus_tools::{decode_json, AppConfig, ConfigArgs, ConfigSource, RPCServer}; use zksync_protobuf::serde::Serde; -/// Wrapper for Vec. -#[derive(Debug, Clone)] -struct NodeAddrs(Vec>); - -impl std::str::FromStr for NodeAddrs { - type Err = anyhow::Error; - fn from_str(s: &str) -> Result { - Ok(Self(decode_json(s)?)) - } -} - /// Command-line application launching a node executor. 
#[derive(Debug, Parser)] -struct Args { +struct Cli { + /// Full json config + #[arg(long, + value_parser(parse_config), + requires="node_key", + conflicts_with_all=["config_file", "validator_key_file", "node_key_file"])] + config: Option>, + /// Plain node key + #[arg(long, + value_parser(parse_key::), + requires="config", + conflicts_with_all=["config_file", "validator_key_file", "node_key_file"])] + node_key: Option, + /// Plain validator key + #[arg(long, + value_parser(parse_key::), + requires_all=["config", "node_key"], + conflicts_with_all=["config_file", "validator_key_file", "node_key_file"])] + validator_key: Option, /// Path to a validator key file. If set to an empty string, validator key will not be read /// (i.e., a node will be initialized as a non-validator node). - #[arg(long, default_value = "./validator_key")] - validator_key: PathBuf, + #[arg(long, + default_value = "./validator_key", + conflicts_with_all=["config", "validator_key", "node_key"])] + validator_key_file: PathBuf, /// Path to a JSON file with node configuration. - #[arg(long, default_value = "./config.json")] + #[arg(long, + default_value = "./config.json", + conflicts_with_all=["config", "validator_key", "node_key"])] config_file: PathBuf, /// Path to a node key file. - #[arg(long, default_value = "./node_key")] - node_key: PathBuf, + #[arg(long, + default_value = "./node_key", + conflicts_with_all=["config", "validator_key", "node_key"])] + node_key_file: PathBuf, /// Path to the rocksdb database of the node. #[arg(long, default_value = "./database")] database: PathBuf, /// Port for the RPC server. #[arg(long)] rpc_port: Option, - /// IP address and key of the seed peers. 
- #[arg(long)] - add_gossip_static_outbound: Option, } -impl Args { +/// Function to let clap parse the command line `config` argument +fn parse_config(val: &str) -> anyhow::Result> { + decode_json(val) +} + +/// Node/validator key parser for clap +fn parse_key(val: &str) -> anyhow::Result { + Text::new(val).decode().context("failed decoding key") +} + +impl Cli { /// Extracts configuration paths from these args. - fn config_paths(&self) -> ConfigPaths<'_> { - ConfigPaths { - app: &self.config_file, - node_key: &self.node_key, - validator_key: (!self.validator_key.as_os_str().is_empty()) - .then_some(&self.validator_key), + fn config_args(&self) -> ConfigArgs<'_> { + let config_args = match &self.config { + Some(config) => ConfigSource::CliConfig { + config: config.clone().0, + node_key: self.node_key.clone().unwrap(), // node_key is present as it is enforced by clap rules + validator_key: self.validator_key.clone(), + }, + None => ConfigSource::PathConfig { + config_file: &self.config_file, + validator_key_file: &self.validator_key_file, + node_key_file: &self.node_key_file, + }, + }; + ConfigArgs { + config_args, database: &self.database, } } @@ -60,7 +91,7 @@ impl Args { #[tokio::main] async fn main() -> anyhow::Result<()> { - let args: Args = Args::parse(); + let args: Cli = Cli::parse(); tracing::trace!(?args, "Starting node"); let ctx = &ctx::root(); @@ -96,22 +127,11 @@ async fn main() -> anyhow::Result<()> { // Load the config files. 
tracing::debug!("Loading config files."); - let mut configs = args - .config_paths() - .load() - .context("config_paths().load()")?; + let mut configs = args.config_args().load().context("config_args().load()")?; // if `PUBLIC_ADDR` env var is set, use it to override publicAddr in config configs.app.check_public_addr().context("Public Address")?; - // Add gossipStaticOutbound pairs from cli to config - if let Some(addrs) = args.add_gossip_static_outbound { - configs - .app - .gossip_static_outbound - .extend(addrs.0.into_iter().map(|e| (e.0.key, e.0.addr))); - } - let (executor, runner) = configs .make_executor(ctx) .await