From 0217158281c75de9101a43a822e4fe1436ee7c0d Mon Sep 17 00:00:00 2001
From: Alex Ostrovski
Date: Mon, 30 Oct 2023 17:53:03 +0200
Subject: [PATCH] feat: Full node mode (#13)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

# What ❔

Allows running a node without the consensus module. Refactors the `executor` crate so that it can be used as a library, and moves its binary to the `tools` crate (a minimal usage sketch of the new library API is included below, after the manifest changes).

## Why ❔

We need to be able to use `executor` as a library from the consensus code, including when running an external node.

---
 .github/workflows/load_testing.yaml | 2 +-
 .gitignore | 6 -
 docker/compose.Dockerfile | 2 +-
 docker/localenv.Dockerfile | 2 +-
 node/Cargo.lock | 58 +----
 node/Cargo.toml | 4 +-
 node/actors/consensus/src/lib.rs | 5 +-
 .../consensus/src/replica/state_machine.rs | 59 ++---
 node/actors/consensus/src/testonly/make.rs | 4 +-
 node/actors/consensus/src/testonly/node.rs | 2 +-
 node/actors/consensus/src/testonly/run.rs | 27 +--
 node/actors/executor/Cargo.toml | 20 +-
 .../node_config.rs => config/mod.rs} | 84 ++++---
 .../src/{lib/configurator => config}/tests.rs | 14 +-
 node/actors/executor/src/{lib => }/io.rs | 31 ++-
 node/actors/executor/src/lib.rs | 215 ++++++++++++++++++
 .../src/lib/configurator/config_paths.rs | 106 ---------
 .../executor/src/lib/configurator/mod.rs | 61 -----
 node/actors/executor/src/lib/lib.rs | 5 -
 node/actors/executor/src/{lib => }/metrics.rs | 0
 node/actors/executor/src/tests.rs | 184 +++++++++++++++
 node/actors/network/src/consensus/runner.rs | 25 +-
 node/actors/network/src/consensus/state.rs | 28 ++-
 node/actors/network/src/consensus/tests.rs | 53 +++--
 node/actors/network/src/gossip/runner.rs | 33 ++-
 node/actors/network/src/gossip/state.rs | 21 +-
 node/actors/network/src/gossip/tests.rs | 93 +++++---
 node/actors/network/src/metrics.rs | 20 +-
 node/actors/network/src/rpc/mod.rs | 11 +-
 node/actors/network/src/state.rs | 76 +++++--
 node/actors/network/src/testonly.rs | 83 ++++---
 node/actors/network/src/tests.rs | 2 +-
 node/actors/sync_blocks/src/lib.rs | 2 +
 .../sync_blocks/src/tests/end_to_end.rs | 2 +-
 node/actors/sync_blocks/src/tests/mod.rs | 2 +-
 node/libs/concurrency/Cargo.toml | 2 -
 node/libs/concurrency/src/metrics.rs | 17 --
 node/libs/schema/proto/executor/config.proto | 31 ++-
 node/libs/storage/src/lib.rs | 2 +
 node/libs/storage/src/replica_state.rs | 66 ++++++
 node/libs/storage/src/traits.rs | 4 +-
 node/tools/Cargo.toml | 17 +-
 node/tools/src/bin/localnet_config.rs | 24 +-
 node/tools/src/config.rs | 125 ++++++++++
 node/tools/src/lib.rs | 5 +
 node/{actors/executor => tools}/src/main.rs | 148 ++++++------
 46 files changed, 1138 insertions(+), 645 deletions(-)
 rename node/actors/executor/src/{lib/configurator/node_config.rs => config/mod.rs} (78%)
 rename node/actors/executor/src/{lib/configurator => config}/tests.rs (81%)
 rename node/actors/executor/src/{lib => }/io.rs (80%)
 create mode 100644 node/actors/executor/src/lib.rs
 delete mode 100644 node/actors/executor/src/lib/configurator/config_paths.rs
 delete mode 100644 node/actors/executor/src/lib/configurator/mod.rs
 delete mode 100644 node/actors/executor/src/lib/lib.rs
 rename node/actors/executor/src/{lib => }/metrics.rs (100%)
 create mode 100644 node/actors/executor/src/tests.rs
 create mode 100644 node/libs/storage/src/replica_state.rs
 create mode 100644 node/tools/src/config.rs
 create mode 100644 node/tools/src/lib.rs
 rename node/{actors/executor => tools}/src/main.rs (55%)

diff --git a/.github/workflows/load_testing.yaml b/.github/workflows/load_testing.yaml index
a6354588..412f48e7 100644 --- a/.github/workflows/load_testing.yaml +++ b/.github/workflows/load_testing.yaml @@ -113,7 +113,7 @@ jobs: - name: Build executor binary working-directory: node run: | - build_output=$(cargo build --release -p executor --bin executor --message-format=json) || exit 1 + build_output=$(cargo build --release -p tools --bin executor --message-format=json) || exit 1 echo "$build_output" | jq -r 'select(.executable != null) | .executable' \ | while read binary; do cp "$binary" artifacts/binaries/ diff --git a/.gitignore b/.gitignore index 2f7db538..c8556338 100644 --- a/.gitignore +++ b/.gitignore @@ -2,15 +2,9 @@ # will have compiled files and executables target/ -# These are backup files generated by rustfmt -/*.rs.bk - # Debug logs logs/ -# Config files -config/ - # Local load test leftovers .terraform .ssh diff --git a/docker/compose.Dockerfile b/docker/compose.Dockerfile index 9f4df93a..b7c8d786 100644 --- a/docker/compose.Dockerfile +++ b/docker/compose.Dockerfile @@ -28,4 +28,4 @@ WORKDIR /usr/src/myapp/artifacts/$node/ RUN mkdir /usr/src/myapp/artifacts/$node/logs/ # You can ignore this command. In docker-compose.yml file we have specified the different command -CMD ["./executor", "0"] \ No newline at end of file +CMD ["./executor", "0"] diff --git a/docker/localenv.Dockerfile b/docker/localenv.Dockerfile index 029be8bd..426293ba 100644 --- a/docker/localenv.Dockerfile +++ b/docker/localenv.Dockerfile @@ -22,7 +22,7 @@ RUN cargo run -p tools --bin localnet_config -- --nodes=$nodes # Build binary file in release mode and create a main release binary WORKDIR /usr/src/myapp/node -RUN cargo build -p executor --release +RUN cargo build -p tools --bin executor --release # Create the artifacts directory WORKDIR /usr/src/myapp/node/ diff --git a/node/Cargo.lock b/node/Cargo.lock index 86f18a03..390ba435 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -389,8 +389,6 @@ name = "concurrency" version = "0.1.0" dependencies = [ "anyhow", - "futures", - "hyper", "once_cell", "pin-project", "rand", @@ -607,22 +605,17 @@ dependencies = [ "concurrency", "consensus", "crypto", - "hex", "network", - "once_cell", "rand", "roles", "schema", - "serde", - "serde_json", "storage", "sync_blocks", + "tempfile", "tokio", "tracing", - "tracing-subscriber", "utils", "vise", - "vise-exporter", ] [[package]] @@ -781,25 +774,6 @@ version = "0.3.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" -[[package]] -name = "h2" -version = "0.3.20" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "97ec8491ebaf99c8eaa73058b045fe58073cd6be7f596ac993ced0b0a0c01049" -dependencies = [ - "bytes", - "fnv", - "futures-core", - "futures-sink", - "futures-util", - "http", - "indexmap", - "slab", - "tokio", - "tokio-util", - "tracing", -] - [[package]] name = "hashbrown" version = "0.12.3" @@ -868,7 +842,6 @@ dependencies = [ "futures-channel", "futures-core", "futures-util", - "h2", "http", "http-body", "httparse", @@ -2080,34 +2053,25 @@ dependencies = [ "syn 2.0.29", ] -[[package]] -name = "tokio-util" -version = "0.7.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "806fe8c2c87eccc8b3267cbae29ed3ab2d0bd37fca70ab622e46aaa9375ddb7d" -dependencies = [ - "bytes", - "futures-core", - "futures-sink", - "pin-project-lite", - "tokio", - "tracing", -] - [[package]] name = "tools" version = "0.1.0" dependencies = [ "anyhow", "clap", + "concurrency", 
"consensus", "crypto", "executor", - "hex", "rand", "roles", "schema", - "serde_json", + "storage", + "tokio", + "tracing", + "tracing-subscriber", + "utils", + "vise-exporter", ] [[package]] @@ -2241,7 +2205,7 @@ checksum = "49874b5167b65d7193b8aba1567f5c7d93d001cafc34600cee003eda787e483f" [[package]] name = "vise" version = "0.1.0" -source = "git+https://github.com/matter-labs/vise.git?rev=8322ddc4bb115a7d11127626730b94f93b804cbe#8322ddc4bb115a7d11127626730b94f93b804cbe" +source = "git+https://github.com/matter-labs/vise.git?rev=dd05139b76ab0843443ab3ff730174942c825dae#dd05139b76ab0843443ab3ff730174942c825dae" dependencies = [ "elsa", "linkme", @@ -2253,7 +2217,7 @@ dependencies = [ [[package]] name = "vise-exporter" version = "0.1.0" -source = "git+https://github.com/matter-labs/vise.git?rev=8322ddc4bb115a7d11127626730b94f93b804cbe#8322ddc4bb115a7d11127626730b94f93b804cbe" +source = "git+https://github.com/matter-labs/vise.git?rev=dd05139b76ab0843443ab3ff730174942c825dae#dd05139b76ab0843443ab3ff730174942c825dae" dependencies = [ "hyper", "once_cell", @@ -2265,7 +2229,7 @@ dependencies = [ [[package]] name = "vise-macros" version = "0.1.0" -source = "git+https://github.com/matter-labs/vise.git?rev=8322ddc4bb115a7d11127626730b94f93b804cbe#8322ddc4bb115a7d11127626730b94f93b804cbe" +source = "git+https://github.com/matter-labs/vise.git?rev=dd05139b76ab0843443ab3ff730174942c825dae#dd05139b76ab0843443ab3ff730174942c825dae" dependencies = [ "proc-macro2", "quote", diff --git a/node/Cargo.toml b/node/Cargo.toml index 4ef28062..d560d722 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -58,8 +58,8 @@ time = "0.3.23" tokio = { version = "1.28.1", features = ["full"] } tracing = { version = "0.1.37", features = ["attributes"] } tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } -vise = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "8322ddc4bb115a7d11127626730b94f93b804cbe" } -vise-exporter = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "8322ddc4bb115a7d11127626730b94f93b804cbe" } +vise = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "dd05139b76ab0843443ab3ff730174942c825dae" } +vise-exporter = { version = "0.1.0", git = "https://github.com/matter-labs/vise.git", rev = "dd05139b76ab0843443ab3ff730174942c825dae" } # Note that "bench" profile inherits from "release" profile and # "test" profile inherits from "dev" profile. 
diff --git a/node/actors/consensus/src/lib.rs b/node/actors/consensus/src/lib.rs index 6d278e05..db5c02c5 100644 --- a/node/actors/consensus/src/lib.rs +++ b/node/actors/consensus/src/lib.rs @@ -20,8 +20,7 @@ use anyhow::Context as _; use concurrency::ctx; use inner::ConsensusInner; use roles::validator; -use std::sync::Arc; -use storage::ReplicaStateStore; +use storage::FallbackReplicaStateStore; use tracing::{info, instrument}; use utils::pipe::ActorPipe; @@ -54,7 +53,7 @@ impl Consensus { pipe: ActorPipe, secret_key: validator::SecretKey, validator_set: validator::ValidatorSet, - storage: Arc, + storage: FallbackReplicaStateStore, ) -> anyhow::Result { Ok(Consensus { inner: ConsensusInner { diff --git a/node/actors/consensus/src/replica/state_machine.rs b/node/actors/consensus/src/replica/state_machine.rs index 9f252d6e..ecc846ce 100644 --- a/node/actors/consensus/src/replica/state_machine.rs +++ b/node/actors/consensus/src/replica/state_machine.rs @@ -2,11 +2,8 @@ use crate::{metrics, ConsensusInner}; use anyhow::Context as _; use concurrency::{ctx, metrics::LatencyHistogramExt as _, scope, time}; use roles::validator; -use std::{ - collections::{BTreeMap, HashMap}, - sync::Arc, -}; -use storage::{ReplicaStateStore, StorageError}; +use std::collections::{BTreeMap, HashMap}; +use storage::{FallbackReplicaStateStore, StorageError}; use tracing::instrument; /// The StateMachine struct contains the state of the replica. This is the most complex state machine and is responsible @@ -27,7 +24,7 @@ pub(crate) struct StateMachine { /// The deadline to receive an input message. pub(crate) timeout_deadline: time::Deadline, /// A reference to the storage module. We use it to backup the replica state. - pub(crate) storage: Arc, + pub(crate) storage: FallbackReplicaStateStore, } impl StateMachine { @@ -35,39 +32,25 @@ impl StateMachine { /// otherwise we initialize the state machine with whatever head block we have. pub(crate) async fn new( ctx: &ctx::Ctx, - storage: Arc, + storage: FallbackReplicaStateStore, ) -> anyhow::Result { - Ok(match storage.replica_state(ctx).await? 
{ - Some(backup) => { - let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new(); - for p in backup.proposals { - block_proposal_cache - .entry(p.number) - .or_default() - .insert(p.payload.hash(), p.payload); - } - Self { - view: backup.view, - phase: backup.phase, - high_vote: backup.high_vote, - high_qc: backup.high_qc, - block_proposal_cache, - timeout_deadline: time::Deadline::Infinite, - storage, - } - } - None => { - let head = storage.head_block(ctx).await?; - Self { - view: head.justification.message.view, - phase: validator::Phase::Prepare, - high_vote: head.justification.message, - high_qc: head.justification, - block_proposal_cache: BTreeMap::new(), - timeout_deadline: time::Deadline::Infinite, - storage, - } - } + let backup = storage.replica_state(ctx).await?; + let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new(); + for proposal in backup.proposals { + block_proposal_cache + .entry(proposal.number) + .or_default() + .insert(proposal.payload.hash(), proposal.payload); + } + + Ok(Self { + view: backup.view, + phase: backup.phase, + high_vote: backup.high_vote, + high_qc: backup.high_qc, + block_proposal_cache, + timeout_deadline: time::Deadline::Infinite, + storage, }) } diff --git a/node/actors/consensus/src/testonly/make.rs b/node/actors/consensus/src/testonly/make.rs index 9af2b6d4..7215ec7a 100644 --- a/node/actors/consensus/src/testonly/make.rs +++ b/node/actors/consensus/src/testonly/make.rs @@ -7,7 +7,7 @@ use crate::{ use concurrency::ctx; use roles::validator; use std::sync::Arc; -use storage::RocksdbStorage; +use storage::{FallbackReplicaStateStore, RocksdbStorage}; use tempfile::tempdir; use utils::pipe::{self, DispatcherPipe}; @@ -33,7 +33,7 @@ pub async fn make_consensus( consensus_pipe, key.clone(), validator_set.clone(), - Arc::new(storage), + FallbackReplicaStateStore::from_store(Arc::new(storage)), ); let consensus = consensus .await diff --git a/node/actors/consensus/src/testonly/node.rs b/node/actors/consensus/src/testonly/node.rs index 9a871db3..6d93a7f8 100644 --- a/node/actors/consensus/src/testonly/node.rs +++ b/node/actors/consensus/src/testonly/node.rs @@ -43,7 +43,7 @@ impl Node { network_pipe: DispatcherPipe, metrics: channel::UnboundedSender, ) -> anyhow::Result<()> { - let key = self.net.state().cfg().consensus.key.public(); + let key = self.net.consensus_config().key.public(); let rng = &mut ctx.rng(); let mut net_recv = network_pipe.recv; let net_send = network_pipe.send; diff --git a/node/actors/consensus/src/testonly/run.rs b/node/actors/consensus/src/testonly/run.rs index 8735d0b6..6a1a1d93 100644 --- a/node/actors/consensus/src/testonly/run.rs +++ b/node/actors/consensus/src/testonly/run.rs @@ -7,7 +7,7 @@ use std::{ collections::{HashMap, HashSet}, sync::Arc, }; -use storage::RocksdbStorage; +use storage::{FallbackReplicaStateStore, RocksdbStorage}; use tracing::Instrument as _; use utils::pipe; @@ -44,8 +44,8 @@ impl Test { // Get only the honest replicas. 
let honest: HashSet<_> = nodes .iter() - .filter(|n| n.behavior == Behavior::Honest) - .map(|n| n.net.state().cfg().consensus.key.public()) + .filter(|node| node.behavior == Behavior::Honest) + .map(|node| node.net.consensus_config().key.public()) .collect(); assert!(!honest.is_empty()); @@ -83,7 +83,7 @@ async fn run_nodes( ) -> anyhow::Result<()> { let keys: Vec<_> = nodes .iter() - .map(|r| r.net.state().cfg().consensus.key.clone()) + .map(|node| node.net.consensus_config().key.clone()) .collect(); let (genesis_block, _) = testonly::make_genesis(&keys, validator::Payload(vec![])); let network_ready = signal::Once::new(); @@ -91,9 +91,10 @@ async fn run_nodes( let mut network_send = HashMap::new(); let mut network_recv = HashMap::new(); scope::run!(ctx, |ctx, s| async { - for (i, n) in nodes.iter().enumerate() { - let validator_key = n.net.state().cfg().consensus.key.clone(); - let validator_set = n.net.state().cfg().consensus.validators.clone(); + for (i, node) in nodes.iter().enumerate() { + let consensus_config = node.net.consensus_config(); + let validator_key = consensus_config.key.clone(); + let validator_set = node.net.to_config().validators; let (consensus_actor_pipe, consensus_pipe) = pipe::new(); let (network_actor_pipe, network_pipe) = pipe::new(); @@ -105,12 +106,12 @@ async fn run_nodes( RocksdbStorage::new(ctx, &genesis_block, &dir.path().join("storage")) .await .context("RocksdbStorage")?; - let storage = Arc::new(storage); + let storage = FallbackReplicaStateStore::from_store(Arc::new(storage)); let consensus = Consensus::new( ctx, consensus_actor_pipe, - n.net.state().cfg().consensus.key.clone(), + node.net.consensus_config().key.clone(), validator_set, storage, ) @@ -120,7 +121,7 @@ async fn run_nodes( scope::run!(ctx, |ctx, s| async { network_ready.recv(ctx).await?; s.spawn_blocking(|| consensus.run(ctx).context("consensus.run()")); - n.run_executor(ctx, consensus_pipe, network_pipe, metrics.clone()) + node.run_executor(ctx, consensus_pipe, network_pipe, metrics.clone()) .await .context("executor.run()") }) @@ -131,10 +132,10 @@ async fn run_nodes( } match network { Network::Real => { - for (i, n) in nodes.iter().enumerate() { - let state = n.net.state().clone(); + for (i, node) in nodes.iter().enumerate() { + let state = node.net.state().clone(); let pipe = network_pipes - .remove(&state.cfg().consensus.key.public()) + .remove(&node.net.consensus_config().key.public()) .unwrap(); s.spawn( async { diff --git a/node/actors/executor/Cargo.toml b/node/actors/executor/Cargo.toml index bcd3376a..5a8e9b4f 100644 --- a/node/actors/executor/Cargo.toml +++ b/node/actors/executor/Cargo.toml @@ -5,20 +5,11 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true -default-run = "executor" [dependencies] anyhow.workspace = true -hex.workspace = true -once_cell.workspace = true -rand.workspace = true -serde.workspace = true -serde_json.workspace = true -tokio.workspace = true tracing.workspace = true -tracing-subscriber.workspace = true vise.workspace = true -vise-exporter.workspace = true concurrency = { path = "../../libs/concurrency" } crypto = { path = "../../libs/crypto" } @@ -31,10 +22,7 @@ consensus = { path = "../consensus" } network = { path = "../network" } sync_blocks = { path = "../sync_blocks" } -[lib] -name = "executor" -path = "src/lib/lib.rs" - -[[bin]] -name = "executor" -path = "src/main.rs" +[dev-dependencies] +rand.workspace = true +tempfile.workspace = true +tokio.workspace = true diff --git 
a/node/actors/executor/src/lib/configurator/node_config.rs b/node/actors/executor/src/config/mod.rs similarity index 78% rename from node/actors/executor/src/lib/configurator/node_config.rs rename to node/actors/executor/src/config/mod.rs index e53010b7..178728ac 100644 --- a/node/actors/executor/src/lib/configurator/node_config.rs +++ b/node/actors/executor/src/config/mod.rs @@ -1,6 +1,8 @@ -//! Module that contains the definitions for the config files. +//! Module to create the configuration for the consensus node. + use anyhow::Context as _; -use crypto::{read_optional_text, read_required_text, Text, TextFmt}; +use crypto::{read_required_text, Text, TextFmt}; +use network::{consensus, gossip}; use roles::{node, validator}; use schema::{proto::executor::config as proto, read_required, required, ProtoFmt}; use std::{ @@ -8,6 +10,9 @@ use std::{ net, }; +#[cfg(test)] +mod tests; + /// Consensus network config. See `network::ConsensusConfig`. #[derive(Clone, Debug, PartialEq, Eq)] pub struct ConsensusConfig { @@ -16,34 +21,32 @@ pub struct ConsensusConfig { pub key: validator::PublicKey, /// Public TCP address that other validators are expected to connect to. /// It is announced over gossip network. - pub public_addr: std::net::SocketAddr, - /// Static specification of validators for Proof of Authority. Should be deprecated once we move - /// to Proof of Stake. - pub validators: validator::ValidatorSet, + pub public_addr: net::SocketAddr, +} + +impl From for ConsensusConfig { + fn from(config: consensus::Config) -> Self { + Self { + key: config.key.public(), + public_addr: config.public_addr, + } + } } impl ProtoFmt for ConsensusConfig { type Proto = proto::ConsensusConfig; + fn read(r: &Self::Proto) -> anyhow::Result { - let mut validators = vec![]; - for (i, v) in r.validators.iter().enumerate() { - validators.push( - Text::new(v) - .decode() - .with_context(|| format!("validators[{i}]"))?, - ); - } Ok(Self { key: read_required_text(&r.key).context("key")?, public_addr: read_required_text(&r.public_addr).context("public_addr")?, - validators: validator::ValidatorSet::new(validators).context("validators")?, }) } + fn build(&self) -> Self::Proto { Self::Proto { key: Some(self.key.encode()), public_addr: Some(self.public_addr.encode()), - validators: self.validators.iter().map(|v| v.encode()).collect(), } } } @@ -64,8 +67,20 @@ pub struct GossipConfig { pub static_outbound: HashMap, } +impl From for GossipConfig { + fn from(config: gossip::Config) -> Self { + Self { + key: config.key.public(), + dynamic_inbound_limit: config.dynamic_inbound_limit, + static_inbound: config.static_inbound, + static_outbound: config.static_outbound, + } + } +} + impl ProtoFmt for GossipConfig { type Proto = proto::GossipConfig; + fn read(r: &Self::Proto) -> anyhow::Result { let mut static_inbound = HashSet::new(); for (i, v) in r.static_inbound.iter().enumerate() { @@ -91,6 +106,7 @@ impl ProtoFmt for GossipConfig { static_outbound, }) } + fn build(&self) -> Self::Proto { Self::Proto { key: Some(self.key.encode()), @@ -108,45 +124,47 @@ impl ProtoFmt for GossipConfig { } } -/// Config of the node. +/// Config of the node executor. #[derive(Clone, Debug, PartialEq, Eq)] -pub struct NodeConfig { +pub struct ExecutorConfig { /// IP:port to listen on, for incoming TCP connections. /// Use `0.0.0.0:` to listen on all network interfaces (i.e. on all IPs exposed by this VM). pub server_addr: net::SocketAddr, - /// IP:port to serve metrics data for scraping. - /// Use `0.0.0.0:` to listen on all network interfaces. 
- pub metrics_server_addr: Option, - - /// Consensus network config. - // NOTE: we currently only support validator nodes, but eventually it will be optional. - pub consensus: ConsensusConfig, /// Gossip network config. pub gossip: GossipConfig, - /// Specifies the genesis block of the blockchain. pub genesis_block: validator::FinalBlock, + /// Static specification of validators for Proof of Authority. Should be deprecated once we move + /// to Proof of Stake. + pub validators: validator::ValidatorSet, } -impl ProtoFmt for NodeConfig { - type Proto = proto::NodeConfig; +impl ProtoFmt for ExecutorConfig { + type Proto = proto::ExecutorConfig; + fn read(r: &Self::Proto) -> anyhow::Result { + let validators = r.validators.iter().enumerate().map(|(i, v)| { + Text::new(v) + .decode() + .with_context(|| format!("validators[{i}]")) + }); + let validators: anyhow::Result> = validators.collect(); + let validators = validator::ValidatorSet::new(validators?).context("validators")?; + Ok(Self { server_addr: read_required_text(&r.server_addr).context("server_addr")?, - metrics_server_addr: read_optional_text(&r.metrics_server_addr) - .context("metrics_server_addr")?, - consensus: read_required(&r.consensus).context("consensus")?, gossip: read_required(&r.gossip).context("gossip")?, genesis_block: read_required_text(&r.genesis_block).context("genesis_block")?, + validators, }) } + fn build(&self) -> Self::Proto { Self::Proto { server_addr: Some(TextFmt::encode(&self.server_addr)), - metrics_server_addr: self.metrics_server_addr.as_ref().map(TextFmt::encode), - consensus: Some(self.consensus.build()), gossip: Some(self.gossip.build()), genesis_block: Some(TextFmt::encode(&self.genesis_block)), + validators: self.validators.iter().map(|v| v.encode()).collect(), } } } diff --git a/node/actors/executor/src/lib/configurator/tests.rs b/node/actors/executor/src/config/tests.rs similarity index 81% rename from node/actors/executor/src/lib/configurator/tests.rs rename to node/actors/executor/src/config/tests.rs index 41a106aa..d296ea8a 100644 --- a/node/actors/executor/src/lib/configurator/tests.rs +++ b/node/actors/executor/src/config/tests.rs @@ -1,4 +1,4 @@ -use super::node_config::{ConsensusConfig, GossipConfig, NodeConfig}; +use super::{ConsensusConfig, ExecutorConfig, GossipConfig}; use concurrency::ctx; use rand::{ distributions::{Distribution, Standard}, @@ -16,7 +16,6 @@ impl Distribution for Standard { ConsensusConfig { key: rng.gen::().public(), public_addr: make_addr(rng), - validators: rng.gen(), } } } @@ -36,14 +35,13 @@ impl Distribution for Standard { } } -impl Distribution for Standard { - fn sample(&self, rng: &mut R) -> NodeConfig { - NodeConfig { +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> ExecutorConfig { + ExecutorConfig { server_addr: make_addr(rng), - metrics_server_addr: Some(make_addr(rng)), - consensus: rng.gen(), gossip: rng.gen(), genesis_block: rng.gen(), + validators: rng.gen(), } } } @@ -54,5 +52,5 @@ fn test_schema_encoding() { let rng = &mut ctx.rng(); test_encode_random::<_, ConsensusConfig>(rng); test_encode_random::<_, GossipConfig>(rng); - test_encode_random::<_, NodeConfig>(rng); + test_encode_random::<_, ExecutorConfig>(rng); } diff --git a/node/actors/executor/src/lib/io.rs b/node/actors/executor/src/io.rs similarity index 80% rename from node/actors/executor/src/lib/io.rs rename to node/actors/executor/src/io.rs index c0cc6bc5..b659fec9 100644 --- a/node/actors/executor/src/lib/io.rs +++ b/node/actors/executor/src/io.rs @@ -9,6 +9,7 @@ use 
consensus::io::{ InputMessage as ConsensusInputMessage, OutputMessage as ConsensusOutputMessage, }; use network::io::{InputMessage as NetworkInputMessage, OutputMessage as NetworkOutputMessage}; +use roles::validator::FinalBlock; use sync_blocks::io::{ InputMessage as SyncBlocksInputMessage, OutputMessage as SyncBlocksOutputMessage, }; @@ -18,47 +19,65 @@ use utils::pipe::DispatcherPipe; /// The IO dispatcher, it is the main struct to handle actor messages. It simply contains a sender and a receiver for /// a pair of channels for each actor. This of course allows us to send and receive messages to and from each actor. #[derive(Debug)] -pub struct Dispatcher { +pub(super) struct Dispatcher { consensus_input: channel::UnboundedSender, consensus_output: channel::UnboundedReceiver, sync_blocks_input: channel::UnboundedSender, + sync_blocks_output: channel::UnboundedReceiver, network_input: channel::UnboundedSender, network_output: channel::UnboundedReceiver, + blocks_sender: channel::UnboundedSender, } impl Dispatcher { /// Creates a new IO Dispatcher. - pub fn new( + pub(super) fn new( consensus_pipe: DispatcherPipe, sync_blocks_pipe: DispatcherPipe, network_pipe: DispatcherPipe, + blocks_sender: channel::UnboundedSender, ) -> Self { Dispatcher { consensus_input: consensus_pipe.send, consensus_output: consensus_pipe.recv, sync_blocks_input: sync_blocks_pipe.send, + sync_blocks_output: sync_blocks_pipe.recv, network_input: network_pipe.send, network_output: network_pipe.recv, + blocks_sender, } } /// Method to start the IO dispatcher. It is simply a loop to receive messages from the actors and then forward them. #[instrument(level = "trace", ret)] - pub fn run(&mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + pub(super) fn run(&mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { scope::run_blocking!(ctx, |ctx, s| { // Start a task to handle the messages from the consensus actor. s.spawn(async { while let Ok(msg) = self.consensus_output.recv(ctx).await { match msg { ConsensusOutputMessage::Network(message) => { - self.network_input.send(message.into()) + self.network_input.send(message.into()); } - ConsensusOutputMessage::FinalizedBlock(b) => { + ConsensusOutputMessage::FinalizedBlock(block) => { let number_metric = &metrics::METRICS.finalized_block_number; let current_number = number_metric.get(); - number_metric.set(current_number.max(b.header.number.0)); + number_metric.set(current_number.max(block.header.number.0)); // This works because this is the only place where `finalized_block_number` // is modified, and there should be a single running `Dispatcher`. + + self.blocks_sender.send(block); + } + } + } + Ok(()) + }); + + s.spawn(async { + while let Ok(msg) = self.sync_blocks_output.recv(ctx).await { + match msg { + SyncBlocksOutputMessage::Network(message) => { + self.network_input.send(message.into()); } } } diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs new file mode 100644 index 00000000..93e987de --- /dev/null +++ b/node/actors/executor/src/lib.rs @@ -0,0 +1,215 @@ +//! Library files for the executor. We have it separate from the binary so that we can use these files in the tools crate. 
+ +use crate::io::Dispatcher; +use anyhow::Context as _; +use concurrency::{ctx, ctx::channel, net, scope}; +use consensus::Consensus; +use roles::{node, validator, validator::FinalBlock}; +use std::{mem, sync::Arc}; +use storage::{FallbackReplicaStateStore, ReplicaStateStore, WriteBlockStore}; +use sync_blocks::SyncBlocks; +use utils::pipe; + +mod config; +mod io; +mod metrics; +#[cfg(test)] +mod tests; + +pub use self::config::{ConsensusConfig, ExecutorConfig, GossipConfig}; + +/// Validator-related part of [`Executor`]. +#[derive(Debug)] +struct ValidatorExecutor { + /// Consensus network configuration. + config: ConsensusConfig, + /// Validator key. + key: validator::SecretKey, + /// Store for replica state. + replica_state_store: Arc, + /// Sender of blocks finalized by the consensus algorithm. + blocks_sender: channel::UnboundedSender, +} + +impl ValidatorExecutor { + /// Returns consensus network configuration. + fn consensus_config(&self) -> network::consensus::Config { + network::consensus::Config { + // Consistency of the validator key has been verified in constructor. + key: self.key.clone(), + public_addr: self.config.public_addr, + } + } +} + +/// Executor allowing to spin up all actors necessary for a consensus node. +#[derive(Debug)] +pub struct Executor { + /// General-purpose executor configuration. + executor_config: ExecutorConfig, + /// Secret key of the node. + node_key: node::SecretKey, + /// Block and replica state storage used by the node. + storage: Arc, + /// Validator-specific node data. + validator: Option, +} + +impl Executor { + /// Creates a new executor with the specified parameters. + pub fn new( + node_config: ExecutorConfig, + node_key: node::SecretKey, + storage: Arc, + ) -> anyhow::Result { + anyhow::ensure!( + node_config.gossip.key == node_key.public(), + "config.gossip.key = {:?} doesn't match the secret key {:?}", + node_config.gossip.key, + node_key + ); + + Ok(Self { + executor_config: node_config, + node_key, + storage, + validator: None, + }) + } + + /// Sets validator-related data for the executor. + pub fn set_validator( + &mut self, + config: ConsensusConfig, + key: validator::SecretKey, + replica_state_store: Arc, + blocks_sender: channel::UnboundedSender, + ) -> anyhow::Result<()> { + let public = &config.key; + anyhow::ensure!( + *public == key.public(), + "config.consensus.key = {public:?} doesn't match the secret key {key:?}" + ); + + // TODO: this logic must be refactored once dynamic validator sets are implemented + let is_validator = self + .executor_config + .validators + .iter() + .any(|validator_key| validator_key == public); + if is_validator { + self.validator = Some(ValidatorExecutor { + config, + key, + replica_state_store, + blocks_sender, + }); + } else { + tracing::info!( + "Key {public:?} is not a validator per validator set {:?}; the executor will not \ + run consensus", + self.executor_config.validators + ); + } + Ok(()) + } + + /// Returns gossip network configuration. + fn gossip_config(&self) -> network::gossip::Config { + let gossip = &self.executor_config.gossip; + network::gossip::Config { + key: self.node_key.clone(), + dynamic_inbound_limit: gossip.dynamic_inbound_limit, + static_inbound: gossip.static_inbound.clone(), + static_outbound: gossip.static_outbound.clone(), + enable_pings: true, + } + } + + /// Extracts a network crate config. 
+ fn network_config(&self) -> network::Config { + network::Config { + server_addr: net::tcp::ListenerAddr::new(self.executor_config.server_addr), + validators: self.executor_config.validators.clone(), + gossip: self.gossip_config(), + consensus: self + .validator + .as_ref() + .map(ValidatorExecutor::consensus_config), + } + } + + /// Runs this executor to completion. This should be spawned on a separate task. + pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> { + let network_config = self.network_config(); + + // Generate the communication pipes. We have one for each actor. + let (consensus_actor_pipe, consensus_dispatcher_pipe) = pipe::new(); + let (sync_blocks_actor_pipe, sync_blocks_dispatcher_pipe) = pipe::new(); + let (network_actor_pipe, network_dispatcher_pipe) = pipe::new(); + let blocks_sender = if let Some(validator) = &mut self.validator { + mem::replace(&mut validator.blocks_sender, channel::unbounded().0) + } else { + channel::unbounded().0 + }; + // Create the IO dispatcher. + let mut dispatcher = Dispatcher::new( + consensus_dispatcher_pipe, + sync_blocks_dispatcher_pipe, + network_dispatcher_pipe, + blocks_sender, + ); + + // Create each of the actors. + let validator_set = &self.executor_config.validators; + let consensus = if let Some(validator) = self.validator { + let consensus_storage = + FallbackReplicaStateStore::new(validator.replica_state_store, self.storage.clone()); + let consensus = Consensus::new( + ctx, + consensus_actor_pipe, + validator.key.clone(), + validator_set.clone(), + consensus_storage, + ) + .await + .context("consensus")?; + Some(consensus) + } else { + None + }; + + let sync_blocks_config = sync_blocks::Config::new( + validator_set.clone(), + consensus::misc::consensus_threshold(validator_set.len()), + )?; + let sync_blocks = SyncBlocks::new( + ctx, + sync_blocks_actor_pipe, + self.storage, + sync_blocks_config, + ) + .await + .context("sync_blocks")?; + + let sync_blocks_subscriber = sync_blocks.subscribe_to_state_updates(); + + tracing::debug!("Starting actors in separate threads."); + scope::run!(ctx, |ctx, s| async { + s.spawn_blocking(|| dispatcher.run(ctx).context("IO Dispatcher stopped")); + s.spawn(async { + let state = network::State::new(network_config, None, Some(sync_blocks_subscriber)) + .context("Invalid network config")?; + state.register_metrics(); + network::run_network(ctx, state, network_actor_pipe) + .await + .context("Network stopped") + }); + if let Some(consensus) = consensus { + s.spawn_blocking(|| consensus.run(ctx).context("Consensus stopped")); + } + sync_blocks.run(ctx).await.context("Syncing blocks stopped") + }) + .await + } +} diff --git a/node/actors/executor/src/lib/configurator/config_paths.rs b/node/actors/executor/src/lib/configurator/config_paths.rs deleted file mode 100644 index 90798764..00000000 --- a/node/actors/executor/src/lib/configurator/config_paths.rs +++ /dev/null @@ -1,106 +0,0 @@ -use crate::configurator::Configs; -use anyhow::Context as _; -use crypto::Text; -use tracing::{debug, instrument}; - -/// This struct holds the file path to each of the config files. -#[derive(Debug)] -pub(crate) struct ConfigPaths { - config: String, - validator_key: String, - node_key: String, -} - -impl ConfigPaths { - /// This function gets the paths for the config file and the key files. - /// First, we try to get the path from the command line arguments. If that fails, we try to get - /// it as an environment variable. If that also fails, we just use a default value. 
- #[instrument(level = "trace")] - pub(crate) fn resolve(args: &[String]) -> Self { - Self { - config: PathSpec { - name: "Config file", - flag: "--config-file", - env: "CONFIG_FILE", - default: "config.json", - } - .resolve(args), - validator_key: PathSpec { - name: "Validator key", - flag: "--validator-key", - env: "VALIDATOR_KEY", - default: "validator_key", - } - .resolve(args), - node_key: PathSpec { - name: "Node key", - flag: "--node-key", - env: "NODE_KEY", - default: "node_key", - } - .resolve(args), - } - } - - /// This function parses the config files from the paths given as command line arguments. - #[instrument(level = "trace", ret)] - pub(crate) fn read(self) -> anyhow::Result { - let cfg = Configs { - config: schema::decode_json( - &std::fs::read_to_string(&self.config).context(self.config)?, - )?, - validator_key: Text::new( - &std::fs::read_to_string(&self.validator_key).context(self.validator_key)?, - ) - .decode()?, - node_key: Text::new(&std::fs::read_to_string(&self.node_key).context(self.node_key)?) - .decode()?, - }; - if cfg.config.gossip.key != cfg.node_key.public() { - anyhow::bail!( - "config.gossip.key = {:?} doesn't match the secret key {:?}", - cfg.config.gossip.key, - cfg.node_key - ); - } - let public = cfg.config.consensus.key.clone(); - let secret = cfg.validator_key.clone(); - if public != secret.public() { - anyhow::bail!( - "config.consensus.key = {public:?} doesn't match the secret key {secret:?}" - ); - } - Ok(cfg) - } -} - -#[derive(Debug)] -struct PathSpec<'a> { - name: &'a str, - flag: &'a str, - env: &'a str, - default: &'a str, -} - -impl<'a> PathSpec<'a> { - #[instrument(level = "trace", ret)] - fn resolve(&self, args: &[String]) -> String { - if let Some(path) = find_flag(args, self.flag) { - debug!("{} path found in command line arguments.", self.name); - return path.clone(); - } - - if let Ok(path) = std::env::var(self.env) { - debug!("{} path found in environment variable.", self.name); - return path; - } - - debug!("Using default {} path.", self.name); - self.default.to_string() - } -} - -fn find_flag<'a>(args: &'a [String], flag: &'a str) -> Option<&'a String> { - let pos = args.iter().position(|x| x == flag)?; - args.get(pos + 1) -} diff --git a/node/actors/executor/src/lib/configurator/mod.rs b/node/actors/executor/src/lib/configurator/mod.rs deleted file mode 100644 index 52f00360..00000000 --- a/node/actors/executor/src/lib/configurator/mod.rs +++ /dev/null @@ -1,61 +0,0 @@ -//! Module to create the configuration for the consensus node. -use concurrency::net; -use roles::{node, validator}; -use tracing::instrument; - -mod config_paths; -pub mod node_config; - -#[cfg(test)] -mod tests; - -/// Main struct that holds the config options for the node. -#[derive(Debug)] -pub struct Configs { - /// This config file describes the environment for the node. - pub config: node_config::NodeConfig, - /// The validator secret key for this node. - // NOTE: we currently only support validator nodes, but eventually it will be optional. - pub validator_key: validator::SecretKey, - /// The node secret key. This key is used by both full nodes and validators to identify themselves - /// in the P2P network. - pub node_key: node::SecretKey, -} - -impl Configs { - /// Method to fetch the node config. 
- #[instrument(level = "trace", ret)] - pub fn read(args: &[String]) -> anyhow::Result { - config_paths::ConfigPaths::resolve(args).read() - } - - fn gossip_config(&self) -> network::gossip::Config { - let gossip = &self.config.gossip; - network::gossip::Config { - key: self.node_key.clone(), - dynamic_inbound_limit: gossip.dynamic_inbound_limit, - static_inbound: gossip.static_inbound.clone(), - static_outbound: gossip.static_outbound.clone(), - enable_pings: true, - } - } - - fn consensus_config(&self) -> network::consensus::Config { - let consensus = &self.config.consensus; - network::consensus::Config { - // Consistency of the validator key has been verified in constructor. - key: self.validator_key.clone(), - public_addr: consensus.public_addr, - validators: consensus.validators.clone(), - } - } - - /// Extracts a network crate config. - pub fn network_config(&self) -> network::Config { - network::Config { - server_addr: net::tcp::ListenerAddr::new(self.config.server_addr), - gossip: self.gossip_config(), - consensus: self.consensus_config(), - } - } -} diff --git a/node/actors/executor/src/lib/lib.rs b/node/actors/executor/src/lib/lib.rs deleted file mode 100644 index 8177f880..00000000 --- a/node/actors/executor/src/lib/lib.rs +++ /dev/null @@ -1,5 +0,0 @@ -//! Library files for the executor. We have it separate from the binary so that we can use these files in the tools crate. - -pub mod configurator; -pub mod io; -mod metrics; diff --git a/node/actors/executor/src/lib/metrics.rs b/node/actors/executor/src/metrics.rs similarity index 100% rename from node/actors/executor/src/lib/metrics.rs rename to node/actors/executor/src/metrics.rs diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs new file mode 100644 index 00000000..c85750e3 --- /dev/null +++ b/node/actors/executor/src/tests.rs @@ -0,0 +1,184 @@ +//! High-level tests for `Executor`. 
+ +use super::*; +use concurrency::sync; +use consensus::testonly::make_genesis; +use network::testonly::Instance; +use rand::Rng; +use roles::validator::{BlockNumber, Payload}; +use std::collections::HashMap; +use storage::{BlockStore, RocksdbStorage, StorageError}; + +async fn run_executor(ctx: &ctx::Ctx, executor: Executor) -> anyhow::Result<()> { + executor.run(ctx).await.or_else(|err| { + if err.root_cause().is::() { + Ok(()) // Test has successfully finished + } else { + Err(err) + } + }) +} + +async fn store_final_blocks( + ctx: &ctx::Ctx, + mut blocks_receiver: channel::UnboundedReceiver, + storage: Arc, +) -> anyhow::Result<()> { + while let Ok(block) = blocks_receiver.recv(ctx).await { + tracing::trace!(number = %block.header.number, "Finalized new block"); + if let Err(err) = storage.put_block(ctx, &block).await { + if matches!(err, StorageError::Canceled(_)) { + break; + } else { + return Err(err.into()); + } + } + } + Ok(()) +} + +#[derive(Debug)] +struct FullValidatorConfig { + node_config: ExecutorConfig, + node_key: node::SecretKey, + consensus_config: ConsensusConfig, + validator_key: validator::SecretKey, +} + +impl FullValidatorConfig { + fn for_single_validator(rng: &mut impl Rng) -> Self { + let mut net_configs = Instance::new_configs(rng, 1, 0); + assert_eq!(net_configs.len(), 1); + let net_config = net_configs.pop().unwrap(); + let consensus_config = net_config.consensus.unwrap(); + let validator_key = consensus_config.key.clone(); + let consensus_config = ConsensusConfig::from(consensus_config); + + let (genesis_block, validators) = make_genesis(&[validator_key.clone()], Payload(vec![])); + let node_key = net_config.gossip.key.clone(); + let node_config = ExecutorConfig { + server_addr: *net_config.server_addr, + gossip: net_config.gossip.into(), + genesis_block, + validators, + }; + + Self { + node_config, + node_key, + consensus_config, + validator_key, + } + } + + fn into_executor( + self, + storage: Arc, + ) -> ( + Executor, + channel::UnboundedReceiver, + ) { + let (blocks_sender, blocks_receiver) = channel::unbounded(); + let mut executor = Executor::new(self.node_config, self.node_key, storage.clone()).unwrap(); + executor + .set_validator( + self.consensus_config, + self.validator_key, + storage, + blocks_sender, + ) + .unwrap(); + (executor, blocks_receiver) + } +} + +#[tokio::test] +async fn executing_single_validator() { + concurrency::testonly::abort_on_panic(); + let ctx = &ctx::root(); + let rng = &mut ctx.rng(); + + let validator = FullValidatorConfig::for_single_validator(rng); + let genesis_block = &validator.node_config.genesis_block; + let temp_dir = tempfile::tempdir().unwrap(); + let storage = RocksdbStorage::new(ctx, genesis_block, temp_dir.path()); + let storage = Arc::new(storage.await.unwrap()); + let (executor, mut blocks_receiver) = validator.into_executor(storage); + + scope::run!(ctx, |ctx, s| async { + s.spawn_bg(run_executor(ctx, executor)); + + let mut expected_block_number = BlockNumber(1); + while expected_block_number < BlockNumber(5) { + let final_block = blocks_receiver.recv(ctx).await?; + tracing::trace!(number = %final_block.header.number, "Finalized new block"); + assert_eq!(final_block.header.number, expected_block_number); + expected_block_number = expected_block_number.next(); + } + anyhow::Ok(()) + }) + .await + .unwrap(); +} + +#[tokio::test] +async fn executing_validator_and_external_node() { + concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0)); + let rng = &mut ctx.rng(); 
+ + let mut validator = FullValidatorConfig::for_single_validator(rng); + + let external_node_key: node::SecretKey = rng.gen(); + let external_node_addr = net::tcp::testonly::reserve_listener(); + let external_node_config = ExecutorConfig { + server_addr: *external_node_addr, + gossip: GossipConfig { + key: external_node_key.public(), + static_outbound: HashMap::from([( + validator.node_key.public(), + validator.node_config.server_addr, + )]), + ..validator.node_config.gossip.clone() + }, + ..validator.node_config.clone() + }; + + validator + .node_config + .gossip + .static_inbound + .insert(external_node_key.public()); + + let genesis_block = &validator.node_config.genesis_block; + let temp_dir = tempfile::tempdir().unwrap(); + let validator_storage = + RocksdbStorage::new(ctx, genesis_block, &temp_dir.path().join("validator")).await; + let validator_storage = Arc::new(validator_storage.unwrap()); + let external_node_storage = + RocksdbStorage::new(ctx, genesis_block, &temp_dir.path().join("en")).await; + let external_node_storage = Arc::new(external_node_storage.unwrap()); + let mut en_subscriber = external_node_storage.subscribe_to_block_writes(); + + let (validator, blocks_receiver) = validator.into_executor(validator_storage.clone()); + let external_node = Executor::new( + external_node_config, + external_node_key, + external_node_storage.clone(), + ) + .unwrap(); + + scope::run!(ctx, |ctx, s| async { + s.spawn_bg(run_executor(ctx, validator)); + s.spawn_bg(run_executor(ctx, external_node)); + s.spawn_bg(store_final_blocks(ctx, blocks_receiver, validator_storage)); + + for _ in 0..5 { + let number = *sync::changed(ctx, &mut en_subscriber).await?; + tracing::trace!(%number, "External node received block"); + } + anyhow::Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/actors/network/src/consensus/runner.rs b/node/actors/network/src/consensus/runner.rs index 0732b235..d6098fe9 100644 --- a/node/actors/network/src/consensus/runner.rs +++ b/node/actors/network/src/consensus/runner.rs @@ -1,4 +1,5 @@ //! run_client routine maintaining outbound connections to validators. 
+ use super::handshake; use crate::{io, noise, preface, rpc, State}; use anyhow::Context as _; @@ -39,12 +40,12 @@ impl rpc::Handler for channel::UnboundedSender, mut stream: noise::Stream, ) -> anyhow::Result<()> { - let peer = handshake::inbound(ctx, &state.cfg.consensus.key, &mut stream).await?; - state.consensus.inbound.insert(peer.clone()).await?; + let peer = handshake::inbound(ctx, &state.cfg.key, &mut stream).await?; + state.inbound.insert(peer.clone()).await?; let ping_client = rpc::Client::::new(ctx); let res = scope::run!(ctx, |ctx, s| async { s.spawn(ping_client.ping_loop(ctx, PING_TIMEOUT)); @@ -57,20 +58,20 @@ pub(crate) async fn run_inbound_stream( Ok(()) }) .await; - state.consensus.inbound.remove(&peer).await; + state.inbound.remove(&peer).await; res } async fn run_outbound_stream( ctx: &ctx::Ctx, - state: &State, + state: &super::State, client: &rpc::Client, peer: &validator::PublicKey, addr: std::net::SocketAddr, ) -> anyhow::Result<()> { let mut stream = preface::connect(ctx, addr, preface::Endpoint::ConsensusNet).await?; - handshake::outbound(ctx, &state.cfg.consensus.key, &mut stream, peer).await?; - state.consensus.outbound.insert(peer.clone()).await?; + handshake::outbound(ctx, &state.cfg.key, &mut stream, peer).await?; + state.outbound.insert(peer.clone()).await?; let ping_client = rpc::Client::::new(ctx); let res = scope::run!(ctx, |ctx, s| async { s.spawn(ping_client.ping_loop(ctx, PING_TIMEOUT)); @@ -83,19 +84,19 @@ async fn run_outbound_stream( Ok(()) }) .await; - state.consensus.outbound.remove(peer).await; + state.outbound.remove(peer).await; res } /// Runs an Rpc client trying to maintain 1 outbound connection per validator. pub(crate) async fn run_client( ctx: &ctx::Ctx, - state: &State, + state: &super::State, + shared_state: &State, mut receiver: channel::UnboundedReceiver, ) -> anyhow::Result<()> { - let clients: HashMap<_, _> = state + let clients: HashMap<_, _> = shared_state .cfg - .consensus .validators .iter() .map(|peer| (peer.clone(), rpc::Client::::new(ctx))) @@ -105,7 +106,7 @@ pub(crate) async fn run_client( for (peer, client) in &clients { s.spawn::<()>(async { let client = &*client; - let addrs = &mut state.gossip.validator_addrs.subscribe(); + let addrs = &mut shared_state.gossip.validator_addrs.subscribe(); let mut addr = None; while ctx.is_active() { if let Ok(new) = diff --git a/node/actors/network/src/consensus/state.rs b/node/actors/network/src/consensus/state.rs index d9b7d936..07c2c65b 100644 --- a/node/actors/network/src/consensus/state.rs +++ b/node/actors/network/src/consensus/state.rs @@ -1,9 +1,9 @@ use crate::pool::PoolWatch; -use roles::validator; +use roles::{validator, validator::ValidatorSet}; use std::collections::HashSet; /// Configuration of the consensus network. -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct Config { /// Private key of the validator. Currently only validator nodes /// are supported, but eventually it will become optional. @@ -12,15 +12,12 @@ pub struct Config { /// Public TCP address that other validators are expected to connect to. /// It is announced over gossip network. pub public_addr: std::net::SocketAddr, - - /// Validators which - /// * client should establish outbound connections to. - /// * server should accept inbound connections from (1 per validator). - pub validators: validator::ValidatorSet, } /// Consensus network state. pub(crate) struct State { + /// Consensus configuration. + pub(crate) cfg: Config, /// Set of the currently open inbound connections. 
pub(crate) inbound: PoolWatch, /// Set of the currently open outbound connections. @@ -29,11 +26,20 @@ pub(crate) struct State { impl State { /// Constructs a new State. - pub(crate) fn new(cfg: &Config) -> Self { - let validators: HashSet<_> = cfg.validators.iter().cloned().collect(); - Self { + pub(crate) fn new(cfg: Config, validators: &ValidatorSet) -> anyhow::Result { + let validators: HashSet<_> = validators.iter().cloned().collect(); + let current_validator_key = cfg.key.public(); + anyhow::ensure!( + validators.contains(¤t_validator_key), + "Validators' public keys {validators:?} do not contain the current validator \ + {current_validator_key:?}; this is not yet supported" + ); + // ^ This check will be relaxed once we support dynamic validator membership + + Ok(Self { + cfg, inbound: PoolWatch::new(validators.clone(), 0), outbound: PoolWatch::new(validators, 0), - } + }) } } diff --git a/node/actors/network/src/consensus/tests.rs b/node/actors/network/src/consensus/tests.rs index da3b3069..faf83064 100644 --- a/node/actors/network/src/consensus/tests.rs +++ b/node/actors/network/src/consensus/tests.rs @@ -23,28 +23,45 @@ async fn test_one_connection_per_validator() { ctx, n.state.clone(), network_pipe - ).instrument(tracing::info_span!("node",i))); + ).instrument(tracing::info_span!("node", i))); } tracing::info!("waiting for all gossip to be established"); - for n in &mut nodes { - n.wait_for_gossip_connections().await; + for node in &mut nodes { + node.wait_for_gossip_connections().await; } tracing::info!("waiting for all connections to be established"); - for n in &mut nodes { - n.wait_for_consensus_connections().await; + for node in &mut nodes { + node.wait_for_consensus_connections().await; } tracing::info!("Impersonate node 1, and try to establish additional connection to node 0. It should close automatically after the handshake."); - let mut stream = preface::connect(ctx,*nodes[0].state.cfg.server_addr,preface::Endpoint::ConsensusNet).await?; - handshake::outbound(ctx, &nodes[1].state.cfg.consensus.key, &mut stream, &nodes[0].state.cfg.consensus.key.public()).await?; + let mut stream = preface::connect( + ctx, + *nodes[0].state.cfg.server_addr, + preface::Endpoint::ConsensusNet, + ) + .await?; + + handshake::outbound( + ctx, + &nodes[1].consensus_config().key, + &mut stream, + &nodes[0].consensus_config().key.public(), + ) + .await?; // The connection is expected to be closed automatically by node 0. // The multiplexer runner should exit gracefully. let _ = rpc::Service::new().run(ctx, stream).await; - tracing::info!("Exiting the main task. Context will get canceled, all the nodes are expected to terminate gracefully."); + tracing::info!( + "Exiting the main task. Context will get canceled, all the nodes are expected \ + to terminate gracefully" + ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } #[tokio::test(flavor = "multi_thread")] @@ -77,15 +94,17 @@ async fn test_address_change() { .context("run1")?; // All nodes should lose connection to node[0]. 
- let key0 = nodes[0].state.cfg.consensus.key.public(); - for n in &nodes { - let c = &n.state.consensus; - c.inbound + let key0 = nodes[0].consensus_config().key.public(); + for node in &nodes { + let consensus_state = node.state.consensus.as_ref().unwrap(); + consensus_state + .inbound .subscribe() .wait_for(|got| !got.current().contains(&key0)) .await .unwrap(); - c.outbound + consensus_state + .outbound .subscribe() .wait_for(|got| !got.current().contains(&key0)) .await @@ -97,9 +116,9 @@ async fn test_address_change() { // node[0] is expected to connect to its gossip peers. // Then it should broadcast its new address and the consensus network // should get reconstructed. - let mut cfg = nodes[0].state.cfg.clone(); + let mut cfg = nodes[0].to_config(); cfg.server_addr = net::tcp::testonly::reserve_listener(); - cfg.consensus.public_addr = *cfg.server_addr; + cfg.consensus.as_mut().unwrap().public_addr = *cfg.server_addr; nodes[0] = testonly::Instance::from_cfg(cfg); scope::run!(ctx, |ctx, s| async { @@ -149,7 +168,7 @@ async fn test_transmission() { let want: validator::Signed = rng.gen(); let in_message = io::ConsensusInputMessage { message: want.clone(), - recipient: io::Target::Validator(nodes[1].state.cfg.consensus.key.public()), + recipient: io::Target::Validator(nodes[1].consensus_config().key.public()), }; pipes[0].send(in_message.into()); diff --git a/node/actors/network/src/gossip/runner.rs b/node/actors/network/src/gossip/runner.rs index fe40da4b..dcc79baf 100644 --- a/node/actors/network/src/gossip/runner.rs +++ b/node/actors/network/src/gossip/runner.rs @@ -1,5 +1,6 @@ use super::{handshake, ValidatorAddrs}; use crate::{ + consensus, event::{Event, StreamEvent}, io, noise, preface, rpc, State, }; @@ -152,7 +153,7 @@ async fn run_stream( let sync_blocks_server = SyncBlocksServer { peer, sender }; let sync_state_client = rpc::Client::::new(ctx); - let enable_pings = state.cfg.gossip.enable_pings; + let enable_pings = state.gossip.cfg.enable_pings; scope::run!(ctx, |ctx, s| async { s.spawn(async { @@ -194,7 +195,7 @@ async fn run_stream( state .gossip .validator_addrs - .update(&state.cfg.consensus, &resp.0[..]) + .update(&state.cfg.validators, &resp.0[..]) .await?; state.event(Event::ValidatorAddrsUpdated); } @@ -240,7 +241,7 @@ async fn handle_clients_and_run_stream( level = "trace", skip_all, err, - fields(my_key = ?state.cfg().gossip.key.public(), peer), + fields(my_key = ?state.gossip.cfg.key.public(), peer), )] pub(crate) async fn run_inbound_stream( ctx: &ctx::Ctx, @@ -248,7 +249,7 @@ pub(crate) async fn run_inbound_stream( sender: &channel::UnboundedSender, mut stream: noise::Stream, ) -> anyhow::Result<()> { - let peer = handshake::inbound(ctx, &state.cfg.gossip, &mut stream).await?; + let peer = handshake::inbound(ctx, &state.gossip.cfg, &mut stream).await?; tracing::Span::current().record("peer", tracing::field::debug(&peer)); state.gossip.inbound.insert(peer.clone()).await?; @@ -266,7 +267,7 @@ pub(crate) async fn run_inbound_stream( level = "trace", skip(ctx, state, sender), err, - fields(my_key = ?state.cfg().gossip.key.public()) + fields(my_key = ?state.gossip.cfg.key.public()) )] async fn run_outbound_stream( ctx: &ctx::Ctx, @@ -276,7 +277,7 @@ async fn run_outbound_stream( addr: std::net::SocketAddr, ) -> anyhow::Result<()> { let mut stream = preface::connect(ctx, addr, preface::Endpoint::GossipNet).await?; - handshake::outbound(ctx, &state.cfg.gossip, &mut stream, peer).await?; + handshake::outbound(ctx, &state.gossip.cfg, &mut stream, peer).await?; 
state.gossip.outbound.insert(peer.clone()).await?; state.event(Event::Gossip(StreamEvent::OutboundOpened(peer.clone()))); @@ -289,9 +290,13 @@ async fn run_outbound_stream( res } -async fn run_address_announcer(ctx: &ctx::Ctx, state: &State) -> ctx::OrCanceled<()> { - let key = &state.cfg.consensus.key; - let my_addr = state.cfg.consensus.public_addr; +async fn run_address_announcer( + ctx: &ctx::Ctx, + state: &State, + consensus_state: &consensus::State, +) -> ctx::OrCanceled<()> { + let key = &consensus_state.cfg.key; + let my_addr = consensus_state.cfg.public_addr; let mut sub = state.gossip.validator_addrs.subscribe(); loop { if !ctx.is_active() { @@ -311,7 +316,7 @@ async fn run_address_announcer(ctx: &ctx::Ctx, state: &State) -> ctx::OrCanceled .gossip .validator_addrs .update( - &state.cfg.consensus, + &state.cfg.validators, &[Arc::new(key.sign_msg(validator::NetAddress { addr: my_addr, version: next_version, @@ -359,7 +364,7 @@ pub(crate) async fn run_client( ) -> anyhow::Result<()> { let res = scope::run!(ctx, |ctx, s| async { // Spawn a tasks handling static outbound connections. - for (peer, addr) in &state.cfg.gossip.static_outbound { + for (peer, addr) in &state.gossip.cfg.static_outbound { s.spawn::<()>(async { loop { let run_result = run_outbound_stream(ctx, state, sender, peer, *addr).await; @@ -383,7 +388,11 @@ pub(crate) async fn run_client( Ok(()) }); - run_address_announcer(ctx, state).await + if let Some(consensus_state) = &state.consensus { + run_address_announcer(ctx, state, consensus_state).await + } else { + Ok(()) + } }) .await; diff --git a/node/actors/network/src/gossip/state.rs b/node/actors/network/src/gossip/state.rs index 23349f38..49bd4bb5 100644 --- a/node/actors/network/src/gossip/state.rs +++ b/node/actors/network/src/gossip/state.rs @@ -1,4 +1,4 @@ -use crate::{consensus, io::SyncState, pool::PoolWatch, rpc, watch::Watch}; +use crate::{io::SyncState, pool::PoolWatch, rpc, watch::Watch}; use concurrency::sync::{self, watch, Mutex}; use roles::{node, validator}; use std::{ @@ -43,7 +43,7 @@ impl ValidatorAddrs { /// Returns true iff some new entry was added. pub(super) fn update( &mut self, - cfg: &consensus::Config, + validators: &validator::ValidatorSet, data: &[Arc>], ) -> anyhow::Result { let mut changed = false; @@ -57,7 +57,7 @@ impl ValidatorAddrs { anyhow::bail!("duplicate entry for {:?}", d.key); } done.insert(d.key.clone()); - if !cfg.validators.contains(&d.key) { + if !validators.contains(&d.key) { // We just skip the entries we are not interested in. // For now the set of validators is static, so we could treat this as an error, // however we eventually want the validator set to be dynamic. @@ -99,20 +99,20 @@ impl ValidatorAddrsWatch { /// invalid entry should be banned. pub(crate) async fn update( &self, - cfg: &consensus::Config, + validators: &validator::ValidatorSet, data: &[Arc>], ) -> anyhow::Result<()> { let this = self.0.lock().await; - let mut m = this.borrow().clone(); - if m.update(cfg, data)? { - this.send(m).ok().unwrap(); + let mut validator_addrs = this.borrow().clone(); + if validator_addrs.update(validators, data)? { + this.send(validator_addrs).ok().unwrap(); } Ok(()) } } /// Gossip network configuration. -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct Config { /// Private key of the node, every node should have one. pub key: node::SecretKey, @@ -132,6 +132,8 @@ type ClientMap = Mutex>>>; /// Gossip network state. pub(crate) struct State { + /// Gossip network configuration. 
+ pub(crate) cfg: Config, /// Currently open inbound connections. pub(crate) inbound: PoolWatch, /// Currently open outbound connections. @@ -146,7 +148,7 @@ pub(crate) struct State { impl State { /// Constructs a new State. - pub(crate) fn new(cfg: &Config, sync_state: Option>) -> Self { + pub(crate) fn new(cfg: Config, sync_state: Option>) -> Self { Self { inbound: PoolWatch::new( cfg.static_inbound.clone(), @@ -156,6 +158,7 @@ impl State { validator_addrs: ValidatorAddrsWatch::default(), sync_state, get_block_clients: ClientMap::default(), + cfg, } } } diff --git a/node/actors/network/src/gossip/tests.rs b/node/actors/network/src/gossip/tests.rs index 5256548f..a5156284 100644 --- a/node/actors/network/src/gossip/tests.rs +++ b/node/actors/network/src/gossip/tests.rs @@ -1,5 +1,5 @@ use super::*; -use crate::{consensus, event::Event, io, preface, rpc, rpc::Rpc as _, run_network, testonly}; +use crate::{event::Event, io, preface, rpc, rpc::Rpc as _, run_network, testonly}; use anyhow::Context as _; use concurrency::{ ctx::{self, channel}, @@ -40,23 +40,43 @@ async fn test_one_connection_per_node() { } tracing::info!("waiting for all connections to be established"); - for n in &mut nodes { - tracing::info!("node {:?} awaiting connections",n.state.cfg.consensus.key.public()); - let want = &n.state.cfg.gossip.static_outbound.keys().cloned().collect(); - n.state.gossip.outbound.subscribe().wait_for(|got|got.current()==want).await.unwrap(); + for node in &mut nodes { + tracing::info!("node {:?} awaiting connections", node.consensus_config().key.public()); + let want = &node.state.gossip.cfg.static_outbound.keys().cloned().collect(); + let mut subscription = node.state.gossip.outbound.subscribe(); + subscription + .wait_for(|got| got.current() == want) + .await + .unwrap(); } - tracing::info!("Impersonate a node, and try to establish additional connection to an already connected peer."); - let me = &nodes[0]; - let (peer,addr) = nodes[0].state.cfg.gossip.static_outbound.iter().next().unwrap(); - let mut stream = preface::connect(ctx,*addr,preface::Endpoint::GossipNet).await.context("preface::connect")?; - handshake::outbound(ctx, &me.state.cfg.gossip, &mut stream, peer).await.context("handshake::outbound")?; + tracing::info!( + "Impersonate a node, and try to establish additional connection to an already connected peer." + ); + let my_gossip_config = &nodes[0].state.gossip.cfg; + let (peer, addr) = my_gossip_config.static_outbound.iter().next().unwrap(); + let mut stream = preface::connect( + ctx, + *addr, + preface::Endpoint::GossipNet, + ) + .await + .context("preface::connect")?; + + handshake::outbound(ctx, my_gossip_config, &mut stream, peer) + .await + .context("handshake::outbound")?; tracing::info!("The connection is expected to be closed automatically by peer."); // The multiplexer runner should exit gracefully. - let _ = rpc::Service::new().run(ctx,stream).await; - tracing::info!("Exiting the main task. Context will get canceled, all the nodes are expected to terminate gracefully."); + let _ = rpc::Service::new().run(ctx, stream).await; + tracing::info!( + "Exiting the main task. Context will get canceled, all the nodes are expected \ + to terminate gracefully." 
+ ); Ok(()) - }).await.unwrap(); + }) + .await + .unwrap(); } fn mk_addr(rng: &mut R) -> std::net::SocketAddr { @@ -134,12 +154,7 @@ async fn test_validator_addrs() { let rng = &mut ctx::test_root(&ctx::RealClock).rng(); let keys: Vec = (0..8).map(|_| rng.gen()).collect(); - let cfg = consensus::Config { - key: rng.gen(), - public_addr: mk_addr(rng), - validators: validator::ValidatorSet::new(keys.iter().map(|k| k.public())).unwrap(), - }; - + let validators = validator::ValidatorSet::new(keys.iter().map(|k| k.public())).unwrap(); let va = ValidatorAddrsWatch::default(); let mut sub = va.subscribe(); @@ -148,7 +163,7 @@ async fn test_validator_addrs() { for k in &keys[0..6] { want.insert(random_netaddr(rng, k)); } - va.update(&cfg, &want.as_vec()).await.unwrap(); + va.update(&validators, &want.as_vec()).await.unwrap(); assert_eq!(want.0, sub.borrow_and_update().0); // Update values. @@ -189,7 +204,7 @@ async fn test_validator_addrs() { // no entry at all for keys[7] k8v1.clone(), ]; - va.update(&cfg, &update).await.unwrap(); + va.update(&validators, &update).await.unwrap(); assert_eq!(want.0, sub.borrow_and_update().0); // Invalid signature. @@ -200,11 +215,11 @@ async fn test_validator_addrs() { mk_timestamp(rng), ); k0v3.key = keys[0].public(); - assert!(va.update(&cfg, &[Arc::new(k0v3)]).await.is_err()); + assert!(va.update(&validators, &[Arc::new(k0v3)]).await.is_err()); assert_eq!(want.0, sub.borrow_and_update().0); // Duplicate entry in the update. - assert!(va.update(&cfg, &[k8v1.clone(), k8v1]).await.is_err()); + assert!(va.update(&validators, &[k8v1.clone(), k8v1]).await.is_err()); assert_eq!(want.0, sub.borrow_and_update().0); } @@ -230,16 +245,16 @@ async fn test_validator_addrs_propagation() { } let want: HashMap<_, _> = nodes .iter() - .map(|n| { + .map(|node| { ( - n.state.cfg.consensus.key.public(), - n.state.cfg.consensus.public_addr, + node.consensus_config().key.public(), + node.consensus_config().public_addr, ) }) .collect(); - for (i, n) in nodes.iter().enumerate() { + for (i, node) in nodes.iter().enumerate() { tracing::info!("awaiting for node[{i}] to learn validator_addrs"); - n.state + node.state .gossip .validator_addrs .subscribe() @@ -415,7 +430,7 @@ async fn uncoordinated_block_syncing( for node in &nodes { let (network_pipe, dispatcher_pipe) = pipe::new(); s.spawn_bg(run_network(ctx, node.state.clone(), network_pipe)); - let node_key = node.state.cfg.gossip.key.public(); + let node_key = node.state.gossip.cfg.key.public(); s.spawn(run_mock_uncoordinated_dispatcher( ctx, dispatcher_pipe, @@ -478,7 +493,7 @@ async fn getting_blocks_from_peers(node_count: usize, gossip_peers: usize) { } let node_keys: Vec<_> = nodes .iter() - .map(|node| node.state.cfg().gossip.key.public()) + .map(|node| node.state.gossip.cfg.key.public()) .collect(); let block: FinalBlock = rng.gen(); @@ -521,7 +536,7 @@ async fn getting_blocks_from_peers(node_count: usize, gossip_peers: usize) { if response_receiver.recv_or_disconnected(ctx).await?.is_ok() { successful_peer_responses += 1; } else { - let self_key = node.state.cfg().gossip.key.public(); + let self_key = node.state.gossip.cfg.key.public(); tracing::trace!("Request from {self_key:?} to {peer_key:?} was dropped"); } } @@ -606,9 +621,10 @@ async fn validator_node_restart() { let start = ctx.now_utc(); for clock_shift in [zero, sec, -2 * sec, 4 * sec, 10 * sec, -30 * sec] { // Set the new addr to broadcast. 
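The restart test being set up here relies on the update rule that `ValidatorAddrs::update` enforces: an entry for a known validator only replaces the stored one if it is newer, and keys outside the `ValidatorSet` are skipped. In miniature (sketch; `va`, `validators`, `key`, `addr`, and the timestamps are assumed fixtures):

```rust
let v1 = key.sign_msg(validator::NetAddress { addr, version: 1, timestamp: t0 });
let v2 = key.sign_msg(validator::NetAddress { addr, version: 2, timestamp: t1 });
va.update(&validators, &[Arc::new(v1)]).await?;
// Overwrites the entry for `key`: version 2 is newer than version 1.
va.update(&validators, &[Arc::new(v2)]).await?;
```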
- let key0 = cfgs[0].consensus.key.public(); + let mutated_config = cfgs[0].consensus.as_mut().unwrap(); + let key0 = mutated_config.key.public(); let addr0 = mk_addr(rng); - cfgs[0].consensus.public_addr = addr0; + mutated_config.public_addr = addr0; // Shift the UTC clock. let now = start + clock_shift; assert!( @@ -676,12 +692,15 @@ async fn rate_limiting() { let mut cfgs = testonly::Instance::new_configs(rng, n, 0); let want: HashMap<_, _> = cfgs .iter() - .map(|cfg| (cfg.consensus.key.public(), cfg.consensus.public_addr)) + .map(|cfg| { + let consensus_cfg = cfg.consensus.as_ref().unwrap(); + (consensus_cfg.key.public(), consensus_cfg.public_addr) + }) .collect(); for i in 1..n { let key = cfgs[i].gossip.key.public().clone(); - let addr = cfgs[i].consensus.public_addr; - cfgs[0].gossip.static_outbound.insert(key, addr); + let public_addr = cfgs[i].consensus.as_ref().unwrap().public_addr; + cfgs[0].gossip.static_outbound.insert(key, public_addr); } let mut nodes: Vec<_> = cfgs.into_iter().map(testonly::Instance::from_cfg).collect(); @@ -698,7 +717,7 @@ async fn rate_limiting() { .gossip .validator_addrs .subscribe() - .wait_for(|got| got.get(&node.state.cfg.consensus.key.public()).is_some()) + .wait_for(|got| got.get(&node.consensus_config().key.public()).is_some()) .await .unwrap(); } diff --git a/node/actors/network/src/metrics.rs b/node/actors/network/src/metrics.rs index 601c0ad2..77cda040 100644 --- a/node/actors/network/src/metrics.rs +++ b/node/actors/network/src/metrics.rs @@ -1,14 +1,16 @@ //! General-purpose network metrics. use crate::state::State; -use concurrency::{ctx, io, metrics::GaugeGuard, net}; +use concurrency::{ctx, io, net}; use std::{ net::SocketAddr, pin::Pin, sync::Weak, task::{ready, Context, Poll}, }; -use vise::{Collector, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, Metrics, Unit}; +use vise::{ + Collector, Counter, EncodeLabelSet, EncodeLabelValue, Family, Gauge, GaugeGuard, Metrics, Unit, +}; /// Metered TCP stream. 
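The import shuffle above is the visible part of a small migration: the hand-rolled `GaugeGuard` in the `concurrency` crate (deleted further down) is replaced by the guard built into `vise`, obtained through `inc_guard`. A sketch of the idiom with a hypothetical metric set, assuming `vise`'s derive and registration macros as used elsewhere in this repo:

```rust
use vise::{Gauge, Global, Metrics};

#[derive(Debug, Metrics)]
#[metrics(prefix = "demo")]
struct DemoMetrics {
    /// Number of requests currently being processed.
    requests_in_flight: Gauge,
}

#[vise::register]
static METRICS: Global<DemoMetrics> = Global::new();

fn handle_request() {
    // Increments the gauge immediately and decrements it again when `_guard`
    // drops, the same pattern as `TCP_METRICS.active[..].inc_guard(1)` below.
    let _guard = METRICS.requests_in_flight.inc_guard(1);
    // ... request handling ...
}
```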
#[pin_project::pin_project] @@ -49,7 +51,7 @@ impl MeteredStream { TCP_METRICS.established[&direction].inc(); Self { stream, - _active: GaugeGuard::from(TCP_METRICS.active[&direction].clone()), + _active: TCP_METRICS.active[&direction].inc_guard(1), } } } @@ -147,11 +149,13 @@ impl NetworkGauges { gauges.gossip_inbound_connections.set(len); let len = state.gossip.outbound.subscribe().borrow().current().len(); gauges.gossip_outbound_connections.set(len); - let len = state.consensus.inbound.subscribe().borrow().current().len(); - gauges.consensus_inbound_connections.set(len); - let subscriber = state.consensus.outbound.subscribe(); - let len = subscriber.borrow().current().len(); - gauges.consensus_outbound_connections.set(len); + if let Some(consensus_state) = &state.consensus { + let len = consensus_state.inbound.subscribe().borrow().current().len(); + gauges.consensus_inbound_connections.set(len); + let subscriber = consensus_state.outbound.subscribe(); + let len = subscriber.borrow().current().len(); + gauges.consensus_outbound_connections.set(len); + } gauges }) }); diff --git a/node/actors/network/src/rpc/mod.rs b/node/actors/network/src/rpc/mod.rs index b77b359f..a4609016 100644 --- a/node/actors/network/src/rpc/mod.rs +++ b/node/actors/network/src/rpc/mod.rs @@ -18,11 +18,7 @@ use self::metrics::{CallLatencyType, CallType, RPC_METRICS}; use crate::{frame, mux}; use anyhow::Context as _; -use concurrency::{ - ctx, io, limiter, - metrics::{GaugeGuard, LatencyHistogramExt as _}, - scope, -}; +use concurrency::{ctx, io, limiter, metrics::LatencyHistogramExt as _, scope}; use std::{collections::BTreeMap, sync::Arc}; pub(crate) mod consensus; @@ -92,7 +88,7 @@ impl<'a, R: Rpc> ReservedCall<'a, R> { drop(self.permit); let res = async { let metric_labels = CallType::Client.to_labels::(req); - let _guard = GaugeGuard::from(RPC_METRICS.inflight[&metric_labels].clone()); + let _guard = RPC_METRICS.inflight[&metric_labels].inc_guard(1); let msg_size = frame::mux_send_proto(ctx, &mut stream.write, req).await?; RPC_METRICS.message_size[&CallType::ReqSent.to_labels::(req)].observe(msg_size); drop(stream.write); @@ -193,8 +189,7 @@ impl> ServerTrait for Server { let resp_size_labels = CallType::RespSent.to_labels::(&req); RPC_METRICS.message_size[&size_labels].observe(msg_size); let inflight_labels = CallType::Server.to_labels::(&req); - let _guard = - GaugeGuard::from(RPC_METRICS.inflight[&inflight_labels].clone()); + let _guard = RPC_METRICS.inflight[&inflight_labels].inc_guard(1); let mut server_process_labels = CallLatencyType::ServerProcess.to_labels::(&req, &Ok(())); let mut recv_send_labels = diff --git a/node/actors/network/src/state.rs b/node/actors/network/src/state.rs index ee2f9ef4..4d6edbdc 100644 --- a/node/actors/network/src/state.rs +++ b/node/actors/network/src/state.rs @@ -3,26 +3,42 @@ use super::{consensus, event::Event, gossip, metrics, preface}; use crate::io::{InputMessage, OutputMessage, SyncState}; use anyhow::Context as _; use concurrency::{ctx, ctx::channel, net, scope, sync::watch}; +use roles::validator; use std::sync::Arc; use utils::pipe::ActorPipe; /// Network actor config. -#[derive(Clone)] +#[derive(Debug, Clone)] pub struct Config { /// TCP socket address to listen for inbound connections at. pub server_addr: net::tcp::ListenerAddr, + /// Validators which + /// - client should establish outbound connections to. + /// - server should accept inbound connections from (1 per validator). + pub validators: validator::ValidatorSet, /// Gossip network config. 
pub gossip: gossip::Config, - /// Consensus network config. - pub consensus: consensus::Config, + /// Consensus network config. If not present, the node will not participate in the consensus network. + pub consensus: Option, +} + +/// Part of configuration shared among network modules. +#[derive(Debug)] +pub(crate) struct SharedConfig { + /// TCP socket address to listen for inbound connections at. + pub(crate) server_addr: net::tcp::ListenerAddr, + /// Validators which + /// - client should establish outbound connections to. + /// - server should accept inbound connections from (1 per validator). + pub(crate) validators: validator::ValidatorSet, } /// State of the network actor observable outside of the actor. pub struct State { - /// Network configuration. - pub(crate) cfg: Config, + /// Configuration shared among network modules. + pub(crate) cfg: SharedConfig, /// Consensus network state. - pub(crate) consensus: consensus::State, + pub(crate) consensus: Option, /// Gossip network state. pub(crate) gossip: gossip::State, @@ -38,18 +54,21 @@ impl State { cfg: Config, events: Option>, sync_state: Option>, - ) -> Arc { - Arc::new(Self { - gossip: gossip::State::new(&cfg.gossip, sync_state), - consensus: consensus::State::new(&cfg.consensus), + ) -> anyhow::Result> { + let consensus = cfg + .consensus + .map(|consensus_cfg| consensus::State::new(consensus_cfg, &cfg.validators)) + .transpose()?; + let this = Self { + gossip: gossip::State::new(cfg.gossip, sync_state), + consensus, events, - cfg, - }) - } - - /// Config getter. - pub fn cfg(&self) -> &Config { - &self.cfg + cfg: SharedConfig { + server_addr: cfg.server_addr, + validators: cfg.validators, + }, + }; + Ok(Arc::new(this)) } /// Registers metrics for this state. @@ -95,11 +114,13 @@ pub async fn run_network( .context("gossip::run_client") }); - s.spawn(async { - consensus::run_client(ctx, state.as_ref(), consensus_recv) - .await - .context("consensus::run_client") - }); + if let Some(consensus_state) = &state.consensus { + s.spawn(async { + consensus::run_client(ctx, consensus_state, state.as_ref(), consensus_recv) + .await + .context("consensus::run_client") + }); + } // TODO(gprusak): add rate limit and inflight limit for inbound handshakes. while let Ok(stream) = metrics::MeteredStream::listen(ctx, &mut listener).await { @@ -109,9 +130,18 @@ pub async fn run_network( let (stream, endpoint) = preface::accept(ctx, stream).await?; match endpoint { preface::Endpoint::ConsensusNet => { - consensus::run_inbound_stream(ctx, &state, &pipe.send, stream) + if let Some(consensus_state) = &state.consensus { + consensus::run_inbound_stream( + ctx, + consensus_state, + &pipe.send, + stream, + ) .await .context("consensus::run_inbound_stream()") + } else { + anyhow::bail!("Node does not accept consensus network connections"); + } } preface::Endpoint::GossipNet => { gossip::run_inbound_stream(ctx, &state, &pipe.send, stream) diff --git a/node/actors/network/src/testonly.rs b/node/actors/network/src/testonly.rs index cbc92fab..fd047daf 100644 --- a/node/actors/network/src/testonly.rs +++ b/node/actors/network/src/testonly.rs @@ -50,7 +50,7 @@ pub struct Instance { impl Instance { /// Construct configs for `n` validators of the consensus. 
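With the state refactored as above, whether a node is a validator or a full node comes down to a single field. A hedged sketch of constructing a full-node network state (`server_addr`, `validator_set`, and `gossip_cfg` are placeholders):

```rust
// Sketch only; field set taken from the new `Config` above.
let cfg = network::Config {
    server_addr,                       // net::tcp::ListenerAddr to bind to
    validators: validator_set.clone(), // still required to validate gossiped data
    gossip: gossip_cfg,
    consensus: None,                   // full node: no consensus network
};
// `State::new` is now fallible because building consensus state can fail.
let state = network::State::new(cfg, /* events */ None, /* sync_state */ None)?;
state.register_metrics();
```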
- pub(crate) fn new_configs(rng: &mut R, n: usize, gossip_peers: usize) -> Vec { + pub fn new_configs(rng: &mut R, n: usize, gossip_peers: usize) -> Vec { let keys: Vec = (0..n).map(|_| rng.gen()).collect(); let validators = validator::ValidatorSet::new(keys.iter().map(|k| k.public())).unwrap(); let mut cfgs: Vec<_> = (0..n) @@ -58,11 +58,11 @@ impl Instance { let addr = net::tcp::testonly::reserve_listener(); Config { server_addr: addr, - consensus: consensus::Config { + validators: validators.clone(), + consensus: Some(consensus::Config { key: keys[i].clone(), public_addr: *addr, - validators: validators.clone(), - }, + }), gossip: gossip::Config { key: rng.gen(), dynamic_inbound_limit: n as u64, @@ -88,7 +88,7 @@ impl Instance { pub(crate) fn from_cfg(cfg: Config) -> Self { let (events_send, events_recv) = channel::unbounded(); Self { - state: State::new(cfg, Some(events_send), None), + state: State::new(cfg, Some(events_send), None).expect("Invalid network config"), events: events_recv, } } @@ -107,6 +107,35 @@ impl Instance { &self.state } + /// Returns the consensus config for this node, assuming it is a validator. + pub fn consensus_config(&self) -> &consensus::Config { + &self + .state + .consensus + .as_ref() + .expect("Node is not a validator") + .cfg + } + + /// Returns the gossip config for this node. + pub fn gossip_config(&self) -> &gossip::Config { + &self.state.gossip.cfg + } + + /// Returns the overall config for this node. + pub fn to_config(&self) -> Config { + Config { + server_addr: self.state.cfg.server_addr, + validators: self.state.cfg.validators.clone(), + gossip: self.state.gossip.cfg.clone(), + consensus: self + .state + .consensus + .as_ref() + .map(|consensus_state| consensus_state.cfg.clone()), + } + } + /// Sets a `SyncState` subscriber for the node. Panics if the node state is already shared. pub fn set_sync_state_subscriber(&mut self, sync_state: watch::Receiver) { let state = Arc::get_mut(&mut self.state).expect("node state is shared"); @@ -116,21 +145,14 @@ impl Instance { /// Disables ping messages over the gossip network. pub fn disable_gossip_pings(&mut self) { let state = Arc::get_mut(&mut self.state).expect("node state is shared"); - state.cfg.gossip.enable_pings = false; + state.gossip.cfg.enable_pings = false; } /// Wait for static outbound gossip connections to be established. pub async fn wait_for_gossip_connections(&self) { - let want: HashSet<_> = self - .state - .cfg - .gossip - .static_outbound - .keys() - .cloned() - .collect(); - self.state - .gossip + let gossip_state = &self.state.gossip; + let want: HashSet<_> = gossip_state.cfg.static_outbound.keys().cloned().collect(); + gossip_state .outbound .subscribe() .wait_for(|got| want.is_subset(got.current())) @@ -140,23 +162,16 @@ impl Instance { /// Waits for all the consensus connections to be established. pub async fn wait_for_consensus_connections(&self) { - let want: HashSet<_> = self - .state - .cfg - .consensus - .validators - .iter() - .cloned() - .collect(); - self.state - .consensus + let consensus_state = self.state.consensus.as_ref().unwrap(); + + let want: HashSet<_> = self.state.cfg.validators.iter().cloned().collect(); + consensus_state .inbound .subscribe() .wait_for(|got| got.current() == &want) .await .unwrap(); - self.state - .consensus + consensus_state .outbound .subscribe() .wait_for(|got| got.current() == &want) @@ -176,9 +191,9 @@ pub async fn instant_network( // Collect validator addrs. 
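These accessors exist mainly so tests can rebuild and tweak a node's configuration; the round-trip they enable is the one `test_address_change` earlier in this patch performs:

```rust
// Recreate a config from a live instance, move its public address, and
// restart the node (sketch; `nodes` is the usual test fixture).
let mut cfg = nodes[0].to_config();
cfg.server_addr = net::tcp::testonly::reserve_listener();
// Only validators carry a consensus config, hence the `Option`.
cfg.consensus.as_mut().unwrap().public_addr = *cfg.server_addr;
nodes[0] = testonly::Instance::from_cfg(cfg);
```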
let mut addrs = vec![]; let nodes: Vec<_> = nodes.collect(); - for n in &nodes { - let key = n.state.cfg.consensus.key.public(); - let sub = &mut n.state.gossip.validator_addrs.subscribe(); + for node in &nodes { + let key = node.consensus_config().key.public(); + let sub = &mut node.state.gossip.validator_addrs.subscribe(); loop { if let Some(addr) = sync::changed(ctx, sub).await?.get(&key) { addrs.push(addr.clone()); @@ -187,11 +202,11 @@ pub async fn instant_network( } } // Broadcast validator addrs. - for n in &nodes { - n.state + for node in &nodes { + node.state .gossip .validator_addrs - .update(&n.state.cfg.consensus, &addrs) + .update(&node.state.cfg.validators, &addrs) .await .unwrap(); } diff --git a/node/actors/network/src/tests.rs b/node/actors/network/src/tests.rs index 89eb5c4c..fda4675c 100644 --- a/node/actors/network/src/tests.rs +++ b/node/actors/network/src/tests.rs @@ -21,7 +21,7 @@ async fn test_metrics() { } testonly::instant_network(ctx, nodes.iter()).await?; - let registry = vise::Registry::collect(); + let registry = vise::MetricsCollection::default().collect(); nodes[0].state().register_metrics(); let mut encoded_metrics = String::new(); registry.encode(&mut encoded_metrics, vise::Format::OpenMetricsForPrometheus)?; diff --git a/node/actors/sync_blocks/src/lib.rs b/node/actors/sync_blocks/src/lib.rs index 3c471782..a8028b51 100644 --- a/node/actors/sync_blocks/src/lib.rs +++ b/node/actors/sync_blocks/src/lib.rs @@ -27,6 +27,8 @@ mod tests; pub use crate::config::Config; use crate::peers::PeerStates; +// FIXME(slowli): when run on validator node, the actor creates unnecessary `GetBlocks` requests + /// Block syncing actor responsible for synchronizing L2 blocks with other nodes. #[derive(Debug)] pub struct SyncBlocks { diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs index 99b22fdc..46c30a27 100644 --- a/node/actors/sync_blocks/src/tests/end_to_end.rs +++ b/node/actors/sync_blocks/src/tests/end_to_end.rs @@ -96,7 +96,7 @@ impl Node { } fn key(&self) -> node::PublicKey { - self.network.state().cfg().gossip.key.public() + self.network.gossip_config().key.public() } #[instrument(level = "trace", skip(ctx, test_validators, storage_dir), err)] diff --git a/node/actors/sync_blocks/src/tests/mod.rs b/node/actors/sync_blocks/src/tests/mod.rs index dd6619dd..4bb49a95 100644 --- a/node/actors/sync_blocks/src/tests/mod.rs +++ b/node/actors/sync_blocks/src/tests/mod.rs @@ -40,7 +40,7 @@ impl TestValidators { let validator_secret_keys: Vec = (0..validator_count).map(|_| rng.gen()).collect(); let validator_set = validator_secret_keys.iter().map(|sk| sk.public()); - let validator_set = validator::ValidatorSet::new(validator_set).unwrap(); + let validator_set = ValidatorSet::new(validator_set).unwrap(); let mut this = Self { validator_secret_keys, diff --git a/node/libs/concurrency/Cargo.toml b/node/libs/concurrency/Cargo.toml index bbad2377..81e73352 100644 --- a/node/libs/concurrency/Cargo.toml +++ b/node/libs/concurrency/Cargo.toml @@ -8,8 +8,6 @@ license.workspace = true [dependencies] anyhow.workspace = true -futures.workspace = true -hyper.workspace = true once_cell.workspace = true pin-project.workspace = true rand.workspace = true diff --git a/node/libs/concurrency/src/metrics.rs b/node/libs/concurrency/src/metrics.rs index 23698628..364a2167 100644 --- a/node/libs/concurrency/src/metrics.rs +++ b/node/libs/concurrency/src/metrics.rs @@ -3,23 +3,6 @@ use std::time::Duration; use vise::Gauge; -/// Guard which 
increments the gauge when constructed -/// and decrements it when dropped. -pub struct GaugeGuard(Gauge); - -impl From for GaugeGuard { - fn from(gauge: Gauge) -> Self { - gauge.inc_by(1); - Self(gauge) - } -} - -impl Drop for GaugeGuard { - fn drop(&mut self) { - self.0.dec_by(1); - } -} - /// Extension trait for latency histograms. pub trait LatencyHistogramExt { /// Observes latency. diff --git a/node/libs/schema/proto/executor/config.proto b/node/libs/schema/proto/executor/config.proto index 477e4033..87baafd0 100644 --- a/node/libs/schema/proto/executor/config.proto +++ b/node/libs/schema/proto/executor/config.proto @@ -48,7 +48,6 @@ message NodeAddr { message ConsensusConfig { optional string key = 1; // [required] ValidatorPublicKey optional string public_addr = 2; // [required] IpAddr - repeated string validators = 3; // [required] ValidatorPublicKey } // Config of the gossip network. @@ -66,24 +65,32 @@ message GossipConfig { repeated NodeAddr static_outbound = 4; } -// Top-level config. -message NodeConfig { +// Top-level executor config. +message ExecutorConfig { // IP:port to listen on, for incoming TCP connections. // Use `0.0.0.0:` to listen on all network interfaces (i.e. on all IPs exposed by this VM). optional string server_addr = 1; // [required] IpAddr - // IP:port to serve metrics data for scraping. - // Use `0.0.0.0:` to listen on all network interfaces. - // If not set, metrics data won't be served. - optional string metrics_server_addr = 2; // IpAddr - - // Consensus network config. - // NOTE: we currently only support validator nodes, but eventually it will be optional. - optional ConsensusConfig consensus = 3; // [required] - // Gossip network config. optional GossipConfig gossip = 4; // [required] // Genesis block of the blockchain. optional string genesis_block = 5; // [required] FinalBlock + // Public keys of all validators. + repeated string validators = 6; // [required] ValidatorPublicKey +} + +// Node configuration including executor configuration, optional validator configuration, +// and application-specific settings (e.g. metrics scraping). +message NodeConfig { + // Executor configuration. + optional ExecutorConfig executor = 1; // [required] + + // IP:port to serve metrics data for scraping. + // Use `0.0.0.0:` to listen on all network interfaces. + // If not set, metrics data won't be served. + optional string metrics_server_addr = 2; // [optional] IpAddr + + // Consensus network config. + optional ConsensusConfig consensus = 3; // [optional] } diff --git a/node/libs/storage/src/lib.rs b/node/libs/storage/src/lib.rs index e40b7aa9..44d23342 100644 --- a/node/libs/storage/src/lib.rs +++ b/node/libs/storage/src/lib.rs @@ -1,6 +1,7 @@ //! This module is responsible for persistent data storage, it provides schema-aware type-safe database access. Currently we use RocksDB, //! but this crate only exposes an abstraction of a database, so we can easily switch to a different storage engine in the future. +mod replica_state; mod rocksdb; mod testonly; #[cfg(test)] @@ -9,6 +10,7 @@ mod traits; mod types; pub use crate::{ + replica_state::FallbackReplicaStateStore, rocksdb::RocksdbStorage, traits::{BlockStore, ReplicaStateStore, WriteBlockStore}, types::{Proposal, ReplicaState, StorageError, StorageResult}, diff --git a/node/libs/storage/src/replica_state.rs b/node/libs/storage/src/replica_state.rs new file mode 100644 index 00000000..69335e4f --- /dev/null +++ b/node/libs/storage/src/replica_state.rs @@ -0,0 +1,66 @@ +//! `FallbackReplicaStateStore` type. 
+
+use crate::{
+    traits::{BlockStore, ReplicaStateStore},
+    types::{ReplicaState, StorageResult},
+};
+use concurrency::ctx;
+use roles::validator;
+use std::sync::Arc;
+
+impl From<validator::CommitQC> for ReplicaState {
+    fn from(certificate: validator::CommitQC) -> Self {
+        Self {
+            view: certificate.message.view,
+            phase: validator::Phase::Prepare,
+            high_vote: certificate.message,
+            high_qc: certificate,
+            proposals: vec![],
+        }
+    }
+}
+
+/// [`ReplicaStateStore`] wrapper that falls back to a specified block store.
+#[derive(Debug, Clone)]
+pub struct FallbackReplicaStateStore {
+    base: Arc<dyn ReplicaStateStore>,
+    fallback: Arc<dyn BlockStore>,
+}
+
+impl FallbackReplicaStateStore {
+    /// Creates a store from a type implementing both replica state and block storage.
+    pub fn from_store<S>(store: Arc<S>) -> Self
+    where
+        S: ReplicaStateStore + BlockStore + 'static,
+    {
+        Self {
+            base: store.clone(),
+            fallback: store,
+        }
+    }
+
+    /// Creates a new replica state store with a fallback.
+    pub fn new(base: Arc<dyn ReplicaStateStore>, fallback: Arc<dyn BlockStore>) -> Self {
+        Self { base, fallback }
+    }
+
+    /// Gets the replica state. If it is not present, falls back to recovering it
+    /// from the fallback block store.
+    pub async fn replica_state(&self, ctx: &ctx::Ctx) -> StorageResult<ReplicaState> {
+        let replica_state = self.base.replica_state(ctx).await?;
+        if let Some(replica_state) = replica_state {
+            Ok(replica_state)
+        } else {
+            let head_block = self.fallback.head_block(ctx).await?;
+            Ok(ReplicaState::from(head_block.justification))
+        }
+    }
+
+    /// Stores the given replica state into the database. This just proxies to the base replica store.
+    pub async fn put_replica_state(
+        &self,
+        ctx: &ctx::Ctx,
+        replica_state: &ReplicaState,
+    ) -> StorageResult<()> {
+        self.base.put_replica_state(ctx, replica_state).await
+    }
+}
diff --git a/node/libs/storage/src/traits.rs b/node/libs/storage/src/traits.rs
index e87f62ef..0d0b544f 100644
--- a/node/libs/storage/src/traits.rs
+++ b/node/libs/storage/src/traits.rs
@@ -98,11 +98,11 @@ impl WriteBlockStore for Arc {
 ///
 /// Implementations **must** propagate context cancellation using [`StorageError::Canceled`].
 #[async_trait]
-pub trait ReplicaStateStore: BlockStore {
+pub trait ReplicaStateStore: fmt::Debug + Send + Sync {
     /// Gets the replica state, if it is contained in the database.
     async fn replica_state(&self, ctx: &ctx::Ctx) -> StorageResult<Option<ReplicaState>>;
 
-    /// Store the given replica state into the database.
+    /// Stores the given replica state into the database.
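How the wrapper is meant to be used, as a sketch: `RocksdbStorage` is assumed to implement both `ReplicaStateStore` and `BlockStore`, which is what the rewritten `main.rs` below relies on when wiring up a validator.

```rust
use concurrency::ctx;
use std::sync::Arc;
use storage::{FallbackReplicaStateStore, ReplicaState, RocksdbStorage, StorageResult};

// Sketch: recover replica state on a node that has blocks persisted but no
// replica state yet (e.g. a full node later restarted as a validator).
async fn restore_state(
    ctx: &ctx::Ctx,
    storage: Arc<RocksdbStorage>,
) -> StorageResult<ReplicaState> {
    // `RocksdbStorage` serves as its own fallback here, since it implements
    // both traits.
    let store = FallbackReplicaStateStore::from_store(storage);
    // Falls back to the head block's justification if nothing was stored.
    store.replica_state(ctx).await
}
```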
async fn put_replica_state( &self, ctx: &ctx::Ctx, diff --git a/node/tools/Cargo.toml b/node/tools/Cargo.toml index 9628b66f..655b1a6a 100644 --- a/node/tools/Cargo.toml +++ b/node/tools/Cargo.toml @@ -5,17 +5,28 @@ edition.workspace = true authors.workspace = true homepage.workspace = true license.workspace = true +publish = false +default-run = "executor" [dependencies] anyhow.workspace = true -hex.workspace = true -rand.workspace = true -serde_json.workspace = true clap.workspace = true +rand.workspace = true +tokio.workspace = true +tracing.workspace = true +tracing-subscriber.workspace = true +vise-exporter.workspace = true +concurrency = { path = "../libs/concurrency" } crypto = { path = "../libs/crypto" } roles = { path = "../libs/roles" } +storage = { path = "../libs/storage" } schema = { path = "../libs/schema" } +utils = { path = "../libs/utils" } consensus = { path = "../actors/consensus" } executor = { path = "../actors/executor" } + +[[bin]] +name = "executor" +path = "src/main.rs" diff --git a/node/tools/src/bin/localnet_config.rs b/node/tools/src/bin/localnet_config.rs index cc461fbf..20862ef5 100644 --- a/node/tools/src/bin/localnet_config.rs +++ b/node/tools/src/bin/localnet_config.rs @@ -3,10 +3,11 @@ use anyhow::Context as _; use clap::Parser; use consensus::testonly; use crypto::TextFmt; -use executor::configurator::node_config::{ConsensusConfig, GossipConfig, NodeConfig}; +use executor::{ConsensusConfig, ExecutorConfig, GossipConfig}; use rand::Rng; use roles::{node, validator}; use std::{fs, net::SocketAddr, path::PathBuf}; +use tools::NodeConfig; /// Replaces IP of the address with UNSPECIFIED (aka INADDR_ANY) of the corresponding IP type. /// Opening a listener socket with an UNSPECIFIED IP, means that the new connections @@ -20,17 +21,15 @@ fn with_unspecified_ip(addr: SocketAddr) -> SocketAddr { } /// Command line arguments. -#[derive(Parser, Debug)] +#[derive(Debug, Parser)] struct Args { /// Path to a file with newline separated IP:port addrs of the nodes to configure. /// Binary will generate a config for each IP in this file. #[arg(long)] input_addrs: PathBuf, - /// TCP port to serve metrics for scraping. #[arg(long)] metrics_server_port: Option, - /// Path to a directory in which the configs should be created. /// Configs for , will be in directory // #[arg(long)] @@ -91,16 +90,19 @@ fn main() -> anyhow::Result<()> { } for (i, gossip) in gossip_cfgs.into_iter().enumerate() { - let node_cfg = NodeConfig { - consensus: ConsensusConfig { - key: validator_keys[i].public(), - public_addr: addrs[i], - validators: validator_set.clone(), - }, + let executor_cfg = ExecutorConfig { gossip, server_addr: with_unspecified_ip(addrs[i]), - metrics_server_addr, genesis_block: genesis.clone(), + validators: validator_set.clone(), + }; + let node_cfg = NodeConfig { + executor: executor_cfg, + metrics_server_addr, + consensus: Some(ConsensusConfig { + key: validator_keys[i].public(), + public_addr: addrs[i], + }), }; // Recreate the directory for the node's config. diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs new file mode 100644 index 00000000..0776659d --- /dev/null +++ b/node/tools/src/config.rs @@ -0,0 +1,125 @@ +//! Node configuration. 
+
+use anyhow::Context as _;
+use crypto::{read_optional_text, Text, TextFmt};
+use executor::{ConsensusConfig, ExecutorConfig};
+use roles::{node, validator};
+use schema::{proto::executor::config as proto, read_optional, read_required, ProtoFmt};
+use std::{fs, net, path::Path};
+
+/// This struct holds the file path to each of the config files.
+#[derive(Debug)]
+pub struct ConfigPaths<'a> {
+    /// Path to a JSON file with node configuration.
+    pub config: &'a Path,
+    /// Path to a validator key file.
+    pub validator_key: Option<&'a Path>,
+    /// Path to a node key file.
+    pub node_key: &'a Path,
+}
+
+/// Node configuration including executor configuration, optional validator configuration,
+/// and application-specific settings (e.g. metrics scraping).
+#[derive(Debug)]
+pub struct NodeConfig {
+    /// Executor configuration.
+    pub executor: ExecutorConfig,
+    /// IP:port to serve metrics data for scraping.
+    pub metrics_server_addr: Option<net::SocketAddr>,
+    /// Consensus network config.
+    pub consensus: Option<ConsensusConfig>,
+}
+
+impl ProtoFmt for NodeConfig {
+    type Proto = proto::NodeConfig;
+
+    fn read(r: &Self::Proto) -> anyhow::Result<Self> {
+        Ok(Self {
+            executor: read_required(&r.executor).context("executor")?,
+            metrics_server_addr: read_optional_text(&r.metrics_server_addr)
+                .context("metrics_server_addr")?,
+            consensus: read_optional(&r.consensus).context("consensus")?,
+        })
+    }
+
+    fn build(&self) -> Self::Proto {
+        Self::Proto {
+            executor: Some(self.executor.build()),
+            metrics_server_addr: self.metrics_server_addr.as_ref().map(TextFmt::encode),
+            consensus: self.consensus.as_ref().map(ProtoFmt::build),
+        }
+    }
+}
+
+/// Main struct that holds the config options for the node.
+#[derive(Debug)]
+pub struct Configs {
+    /// Executor configuration of the node.
+    pub executor: ExecutorConfig,
+    /// IP:port to serve metrics data for scraping.
+    pub metrics_server_addr: Option<net::SocketAddr>,
+    /// Consensus-specific config extensions. Only set for validators.
+    pub consensus: Option<(ConsensusConfig, validator::SecretKey)>,
+    /// The node secret key. This key is used by both full nodes and validators to identify
+    /// themselves in the P2P network.
+    pub node_key: node::SecretKey,
+}
+
+impl Configs {
+    /// Reads the node configuration from the file system.
+    #[tracing::instrument(level = "trace", ret)]
+    pub fn read(args: ConfigPaths<'_>) -> anyhow::Result<Self> {
+        let node_config = fs::read_to_string(args.config).with_context(|| {
+            format!(
+                "failed reading node config from `{}`",
+                args.config.display()
+            )
+        })?;
+        let node_config: NodeConfig = schema::decode_json(&node_config).with_context(|| {
+            format!(
+                "failed decoding JSON node config at `{}`",
+                args.config.display()
+            )
+        })?;
+
+        let validator_key: Option<validator::SecretKey> = args
+            .validator_key
+            .as_ref()
+            .map(|validator_key| {
+                let read_key = fs::read_to_string(validator_key).with_context(|| {
+                    format!(
+                        "failed reading validator key from `{}`",
+                        validator_key.display()
+                    )
+                })?;
+                Text::new(&read_key).decode().with_context(|| {
+                    format!(
+                        "failed decoding validator key at `{}`",
+                        validator_key.display()
+                    )
+                })
+            })
+            .transpose()?;
+        let read_key = fs::read_to_string(args.node_key).with_context(|| {
+            format!("failed reading node key from `{}`", args.node_key.display())
+        })?;
+        let node_key = Text::new(&read_key).decode().with_context(|| {
+            format!("failed decoding node key at `{}`", args.node_key.display())
+        })?;
+
+        anyhow::ensure!(
+            validator_key.is_some() == node_config.consensus.is_some(),
+            "Validator key and consensus config must be specified at the same time"
+        );
+        let consensus = validator_key.and_then(|key| Some((node_config.consensus?, key)));
+
+        let cfg = Configs {
+            executor: node_config.executor,
+            metrics_server_addr: node_config.metrics_server_addr,
+            consensus,
+            node_key,
+        };
+        Ok(cfg)
+    }
+}
diff --git a/node/tools/src/lib.rs b/node/tools/src/lib.rs
new file mode 100644
index 00000000..6b818859
--- /dev/null
+++ b/node/tools/src/lib.rs
@@ -0,0 +1,5 @@
+//! CLI tools for the consensus node.
+
+mod config;
+
+pub use self::config::{ConfigPaths, Configs, NodeConfig};
diff --git a/node/actors/executor/src/main.rs b/node/tools/src/main.rs
similarity index 55%
rename from node/actors/executor/src/main.rs
rename to node/tools/src/main.rs
index afb8e180..936b6dae 100644
--- a/node/actors/executor/src/main.rs
+++ b/node/tools/src/main.rs
@@ -2,32 +2,65 @@
 //! manages communication between the actors. It is the main executable in this workspace.
 use anyhow::Context as _;
-use concurrency::{ctx, scope, time};
-use consensus::Consensus;
-use executor::{configurator::Configs, io::Dispatcher};
-use std::{fs, io::IsTerminal as _, path::Path, sync::Arc};
+use clap::Parser;
+use concurrency::{
+    ctx::{self, channel},
+    scope, time,
+};
+use executor::Executor;
+use std::{
+    fs,
+    io::IsTerminal as _,
+    path::{Path, PathBuf},
+    sync::Arc,
+};
 use storage::{BlockStore, RocksdbStorage};
-use sync_blocks::SyncBlocks;
+use tools::{ConfigPaths, Configs};
 use tracing::metadata::LevelFilter;
 use tracing_subscriber::{prelude::*, Registry};
-use utils::{no_copy::NoCopy, pipe};
+use utils::no_copy::NoCopy;
 use vise_exporter::MetricsExporter;
 
+/// Command-line application launching a node executor.
+#[derive(Debug, Parser)]
+struct Args {
+    /// Verify configuration instead of launching a node.
+    #[arg(long, conflicts_with_all = ["ci_mode", "validator_key", "config_file", "node_key"])]
+    verify_config: bool,
+    /// Exit after finalizing 100 blocks.
+    #[arg(long)]
+    ci_mode: bool,
+    /// Path to a validator key file. If set to an empty string, the validator key will not be
+    /// read (i.e., the node will be initialized as a non-validator node).
+    #[arg(long, default_value = "validator_key")]
+    validator_key: PathBuf,
+    /// Path to a JSON file with node configuration.
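Putting `ConfigPaths` and `Configs::read` together, a full-node configuration load looks like this (sketch; the file names are the defaults the CLI below uses):

```rust
use std::path::Path;
use tools::{ConfigPaths, Configs};

// Sketch: load a non-validator configuration. `Configs::read` enforces that
// a validator key is present exactly when the config has a `consensus` section.
fn load_full_node_config() -> anyhow::Result<Configs> {
    Configs::read(ConfigPaths {
        config: Path::new("config.json"),
        validator_key: None, // full node: no validator key on disk
        node_key: Path::new("node_key"),
    })
}
```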
+ #[arg(long, default_value = "config.json")] + config_file: PathBuf, + /// Path to a node key file. + #[arg(long, default_value = "node_key")] + node_key: PathBuf, +} + +impl Args { + /// Extracts configuration paths from these args. + fn config_paths(&self) -> ConfigPaths<'_> { + ConfigPaths { + config: &self.config_file, + node_key: &self.node_key, + validator_key: (!self.validator_key.as_os_str().is_empty()) + .then_some(&self.validator_key), + } + } +} + #[tokio::main] async fn main() -> anyhow::Result<()> { + let args: Args = Args::parse(); + tracing::trace!(?args, "Starting node"); let ctx = &ctx::root(); - // Get the command line arguments. - let args: Vec<_> = std::env::args().collect(); - - // Check if we are in config mode. - let config_mode = args.iter().any(|x| x == "--verify-config"); - - // Check if we are in CI mode. - // If we are in CI mode, we will exit after finalizing more than 100 blocks. - let ci_mode = args.iter().any(|x| x == "--ci-mode"); - - if !config_mode { + if !args.verify_config { // Create log file. fs::create_dir_all("logs/")?; let log_file = fs::File::create("logs/output.log")?; @@ -61,9 +94,9 @@ async fn main() -> anyhow::Result<()> { // Load the config files. tracing::debug!("Loading config files."); - let configs = Configs::read(&args).context("configs.read()")?; + let configs = Configs::read(args.config_paths()).context("configs.read()")?; - if config_mode { + if args.verify_config { tracing::info!("Configuration verified."); return Ok(()); } @@ -71,75 +104,43 @@ async fn main() -> anyhow::Result<()> { // Initialize the storage. tracing::debug!("Initializing storage."); - let storage = RocksdbStorage::new(ctx, &configs.config.genesis_block, Path::new("./database")); - let storage = Arc::new(storage.await.context("RocksdbStorage::new()")?); - - // Generate the communication pipes. We have one for each actor. - let (consensus_actor_pipe, consensus_dispatcher_pipe) = pipe::new(); - let (sync_blocks_actor_pipe, sync_blocks_dispatcher_pipe) = pipe::new(); - let (network_actor_pipe, network_dispatcher_pipe) = pipe::new(); - - // Create the IO dispatcher. - let mut dispatcher = Dispatcher::new( - consensus_dispatcher_pipe, - sync_blocks_dispatcher_pipe, - network_dispatcher_pipe, - ); - - // Create each of the actors. 
- let validator_set = &configs.config.consensus.validators; - let consensus = Consensus::new( - ctx, - consensus_actor_pipe, - configs.validator_key.clone(), - validator_set.clone(), - storage.clone(), - ) - .await - .context("consensus")?; - - let sync_blocks_config = sync_blocks::Config::new( - validator_set.clone(), - consensus::misc::consensus_threshold(validator_set.len()), - )?; - let sync_blocks = SyncBlocks::new( + let storage = RocksdbStorage::new( ctx, - sync_blocks_actor_pipe, - storage.clone(), - sync_blocks_config, - ) - .await - .context("sync_blocks")?; + &configs.executor.genesis_block, + Path::new("./database"), + ); + let storage = Arc::new(storage.await.context("RocksdbStorage::new()")?); + let mut executor = Executor::new(configs.executor, configs.node_key, storage.clone()) + .context("Executor::new()")?; + if let Some((consensus_config, validator_key)) = configs.consensus { + let blocks_sender = channel::unbounded().0; // Just drop finalized blocks + executor + .set_validator( + consensus_config, + validator_key, + storage.clone(), + blocks_sender, + ) + .context("Executor::set_validator()")?; + } - tracing::debug!("Starting actors in separate threads."); scope::run!(ctx, |ctx, s| async { - if let Some(addr) = configs.config.metrics_server_addr { + if let Some(addr) = configs.metrics_server_addr { let addr = NoCopy::from(addr); s.spawn_bg(async { let addr = addr; MetricsExporter::default() - .with_graceful_shutdown(ctx.canceled_owned()) // FIXME: support non-'static shutdown + .with_graceful_shutdown(ctx.canceled()) .start(*addr) .await?; Ok(()) }); } - s.spawn_blocking(|| dispatcher.run(ctx).context("IO Dispatcher stopped")); - - s.spawn(async { - let state = network::State::new(configs.network_config(), None, None); - state.register_metrics(); - network::run_network(ctx, state, network_actor_pipe) - .await - .context("Network stopped") - }); - - s.spawn_blocking(|| consensus.run(ctx).context("Consensus stopped")); - s.spawn(async { sync_blocks.run(ctx).await.context("Syncing blocks stopped") }); + s.spawn(executor.run(ctx)); // if we are in CI mode, we wait for the node to finalize 100 blocks and then we stop it - if ci_mode { + if args.ci_mode { let storage = storage.clone(); loop { let block_finalized = storage.head_block(ctx).await.context("head_block")?; @@ -158,7 +159,6 @@ async fn main() -> anyhow::Result<()> { tracing::info!("Cancel all tasks"); s.cancel(); } - Ok(()) }) .await
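Taken together, the full-node mode this patch introduces amounts to constructing an `Executor` without ever calling `set_validator`. A condensed sketch of the non-validator path through the rewritten `main.rs` above (metrics exporter and CI-mode loop omitted; error handling as shown there):

```rust
use anyhow::Context as _;
use concurrency::{ctx, scope};
use executor::Executor;
use std::{path::Path, sync::Arc};
use storage::RocksdbStorage;
use tools::Configs;

// Sketch only; mirrors the rewritten `main.rs`, minus the validator branch.
async fn run_full_node(ctx: &ctx::Ctx, configs: Configs) -> anyhow::Result<()> {
    let storage =
        RocksdbStorage::new(ctx, &configs.executor.genesis_block, Path::new("./database"))
            .await
            .context("RocksdbStorage::new()")?;
    // No `set_validator()` call: the node gossips and syncs blocks, but does
    // not participate in consensus.
    let executor = Executor::new(configs.executor, configs.node_key, Arc::new(storage))
        .context("Executor::new()")?;
    scope::run!(ctx, |ctx, s| async {
        s.spawn(executor.run(ctx));
        Ok(())
    })
    .await
}
```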