From 467de5590881baf22eb425baa3571bef82e00112 Mon Sep 17 00:00:00 2001
From: =?UTF-8?q?Bruno=20Fran=C3=A7a?=
Date: Mon, 20 Nov 2023 14:36:24 +0000
Subject: [PATCH 1/2] Delete unused Docker files + misc updates (#36)
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

## What ❔

- Removing old Docker files. They are no longer used.
- Updated PR template.
- Updated docs.
- Updated code owners.

## Why ❔

Just cleaning up the repo.
---
 .dockerignore                       |   3 -
 .github/CODEOWNERS                  |   9 +-
 .github/pull_request_template.md    |   2 +-
 .github/workflows/docker_build.yaml |  57 -----------
 composes/generate/Cargo.lock        |   7 --
 composes/generate/Cargo.toml        |   8 --
 composes/generate/src/main.rs       | 145 ----------------------------
 docker/Dockerfile                   |  22 -----
 docker/compose.Dockerfile           |  31 ------
 docker/localenv.Dockerfile          |  61 ------------
 docs/architecture.md                |   9 +-
 docs/launch.md                      |  16 +--
 12 files changed, 12 insertions(+), 358 deletions(-)
 delete mode 100644 .dockerignore
 delete mode 100644 .github/workflows/docker_build.yaml
 delete mode 100644 composes/generate/Cargo.lock
 delete mode 100644 composes/generate/Cargo.toml
 delete mode 100644 composes/generate/src/main.rs
 delete mode 100644 docker/Dockerfile
 delete mode 100644 docker/compose.Dockerfile
 delete mode 100644 docker/localenv.Dockerfile

diff --git a/.dockerignore b/.dockerignore
deleted file mode 100644
index 05e62c70..00000000
--- a/.dockerignore
+++ /dev/null
@@ -1,3 +0,0 @@
-**/.git
-node/target
-tools/target
diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index cffd80d3..b77c7568 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -1,9 +1,12 @@
 CODEOWNERS @brunoffranca
 
-/node/actors/consensus/ @brunoffranca
-/node/actors/executor/ @brunoffranca @pompon0
+/node/actors/consensus/ @brunoffranca @moshababo
+/node/actors/executor/ @brunoffranca @pompon0 @slowli
 /node/actors/network/ @pompon0
 /node/actors/sync_blocks/ @slowli
 
 /node/libs/concurrency/ @pompon0
-/node/libs/crypto/ @brunoffranca
\ No newline at end of file
+/node/libs/crypto/ @brunoffranca
+/node/libs/protobuf/ @pompon0
+/node/libs/protobuf_build/ @pompon0
+/node/libs/storage/ @slowli
\ No newline at end of file
diff --git a/.github/pull_request_template.md b/.github/pull_request_template.md
index 304b0d3a..6dc78a03 100644
--- a/.github/pull_request_template.md
+++ b/.github/pull_request_template.md
@@ -1,4 +1,4 @@
-# What ❔
+## What ❔
 
diff --git a/.github/workflows/docker_build.yaml b/.github/workflows/docker_build.yaml
deleted file mode 100644
index 08735179..00000000
--- a/.github/workflows/docker_build.yaml
+++ /dev/null
@@ -1,57 +0,0 @@
-name: Build and push Docker image
-
-on:
-  push:
-    branches: [ "main" ]
-
-concurrency:
-  group: ${{ github.workflow }}-${{ github.event.pull_request.number || github.ref }}
-  cancel-in-progress: true
-
-jobs:
-  build-push-image:
-    name: Build and push Docker image
-    runs-on: [matterlabs-ci-runner]
-    steps:
-      - uses: actions/checkout@v3
-
-      - name: Set outputs
-        id: vars
-        run: echo "sha_short=$(git rev-parse --short HEAD)" >> $GITHUB_OUTPUT
-
-      - name: Login to GAR
-        run: |
-          gcloud auth configure-docker us-docker.pkg.dev -q
-      - name: Set up QEMU
-        uses: docker/setup-qemu-action@v2
-      - name: Set up Docker Buildx
-        uses: docker/setup-buildx-action@v2
-      - name: Generate build ID for Flux Image Automation
-        id: set_version
-        run: |
-          if [[ ${{ github.ref }} =~ "refs/tags" ]]; then
-            echo "image_tag_suffix=${GITHUB_REF#refs/*/}" >> $GITHUB_OUTPUT
-          else
-            sha=$(git rev-parse --short HEAD)
-            ts=$(date +%s%N | cut -b1-13)
-            echo "image_tag_suffix=${sha}-${ts}" >> $GITHUB_OUTPUT
-          fi
-      - name: Log in to Docker Hub
-        uses: docker/login-action@f4ef78c080cd8ba55a85445d5b36e214a81df20a # v2.1.0
-        with:
-          username: ${{ secrets.DOCKERHUB_USER }}
-          password: ${{ secrets.DOCKERHUB_TOKEN }}
-      - name: Build and push
-        id: docker_build_push_tag
-        uses: docker/build-push-action@v3
-        with:
-          context: .
-          file: docker/Dockerfile
-          push: ${{ github.event_name == 'push' && (github.ref == 'refs/heads/main' || startsWith(github.ref, 'refs/tags/')) }}
-          tags: |
-            "matterlabs/zksync-bft:latest"
-            "matterlabs/zksync-bft:${{ steps.set_version.outputs.image_tag_suffix }}"
-            "us-docker.pkg.dev/matterlabs-infra/matterlabs-docker/zksync-bft:latest"
-            "us-docker.pkg.dev/matterlabs-infra/matterlabs-docker/zksync-bft:${{ steps.set_version.outputs.image_tag_suffix }}"
-      - name: Print image digest
-        run: echo ${{ steps.docker_build.outputs.digest }}
diff --git a/composes/generate/Cargo.lock b/composes/generate/Cargo.lock
deleted file mode 100644
index 41cb2aea..00000000
--- a/composes/generate/Cargo.lock
+++ /dev/null
@@ -1,7 +0,0 @@
-# This file is automatically @generated by Cargo.
-# It is not intended for manual editing.
-version = 3
-
-[[package]]
-name = "generate"
-version = "0.1.0"
diff --git a/composes/generate/Cargo.toml b/composes/generate/Cargo.toml
deleted file mode 100644
index 22de7aa4..00000000
--- a/composes/generate/Cargo.toml
+++ /dev/null
@@ -1,8 +0,0 @@
-[package]
-name = "generate"
-version = "0.1.0"
-edition = "2021"
-
-# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
-
-[dependencies]
diff --git a/composes/generate/src/main.rs b/composes/generate/src/main.rs
deleted file mode 100644
index 5a4f66e3..00000000
--- a/composes/generate/src/main.rs
+++ /dev/null
@@ -1,145 +0,0 @@
-use std::env;
-use std::fs::File;
-use std::io::prelude::*;
-
-const START: &str = r#"
-# this file is auto generated by generate.py
-# do not edit it manually
-
-version: '3.9'
-services:
-"#;
-
-const END: &str = r#"
-networks:
-  intrnl:
-    ipam:
-      driver: default
-      config:
-        - subnet: "192.168.92.0/24"
-
-# This Docker Compose file defines a multi-container application with four services:
-# app0, app1, app2, and app3. The file specifies how the services should be built,
-# configured, and run.
-
-# This app0 ... app3 are the nodes of the blockchain network. Each node is a container
-# running a blockchain node. The nodes are connected to each other via a TCP protocol.
-
-# This specifies that the app0...3 service should be built using the Dockerfile located
-# in the composes directory, with the node argument set to node0...3. The service should
-# also expose port 3333..3336 on the host and map it to port 3333..3336 on the container.
-# The command field specifies the command that should be run when the container is started,
-# which in this case is ./executor 0..3. The networks field specifies that the service
-# should be connected to the internal network and should be assigned the IP address 192.168.92.21...24.
-"#;
-
-const LOOP_BODY: &str = r#"
-  app#i:
-    build:
-      context: ..
-      dockerfile: ./docker/compose.Dockerfile
-      args:
-        node: node#i
-        port: #port
-    ports:
-      - '#port:#port'
-    command: ./executor #i --ci-mode
-    environment:
-      - 'RUST_LOG=DEBUG'
-    networks:
-      intrnl:
-        ipv4_address: '#ip'
-    volumes:
-      - ../logs/node#i:/usr/src/myapp/artifacts/node#i/logs
-"#;
-
-fn get_ip_by_i(index: usize) -> String {
-    format!("192.168.92.{}", index + 21)
-}
-
-fn get_port_by_i(index: usize) -> String {
-    format!("{}", index + 3333)
-}
-
-fn get_loop_body(size: usize) -> String {
-    let mut body = String::new();
-    for i in 0..size {
-        body += &LOOP_BODY
-            .replace("#ip", &get_ip_by_i(i))
-            .replace("#port", &get_port_by_i(i))
-            .replace("#i", &i.to_string());
-    }
-    body
-}
-
-fn get_compose_file(size: usize) -> String {
-    format!("{}{}{}", START, get_loop_body(size), END)
-}
-
-fn sizes() -> Vec<usize> {
-    let small: Vec<usize> = (1..4).map(|i| 3 * i + 1).collect();
-    let big: Vec<usize> = (1..4).map(|i| 3 * 5 * i + 1).collect();
-    let possible: Vec<usize> = (1..334).map(|i| 3 * i + 1).collect();
-    let pow2: Vec<usize> = (1..7)
-        .filter_map(|i| {
-            let num = 2usize.pow(i as u32);
-            if possible.contains(&num) {
-                Some(num)
-            } else {
-                None
-            }
-        })
-        .collect();
-
-    let mut sizes: Vec<usize> = small;
-    sizes.extend(big);
-    sizes.extend(pow2);
-    sizes.sort();
-
-    sizes
-}
-
-fn save_compose_file(size: usize) {
-    let file_path = format!("../../composes/docker-compose.local{}.yml", size);
-    let mut file = File::create(&file_path).expect("Failed to create file");
-    let compose_file = get_compose_file(size);
-    file.write_all(compose_file.as_bytes())
-        .expect("Failed to write to file");
-    println!("saved {}", file_path);
-}
-
-fn main() {
-    let args: Vec<String> = env::args().collect();
-
-    if args.len() < 2 {
-        println!("usage: cargo run <size>");
-        std::process::exit(1);
-    }
-
-    let what_to_do = &args[1];
-
-    if what_to_do == "all" {
-        for i in sizes() {
-            save_compose_file(i);
-        }
-        std::process::exit(0);
-    }
-
-    let size = match args[1].parse::<usize>() {
-        Ok(value) => value,
-        Err(_) => {
-            println!("Invalid size argument");
-            std::process::exit(1);
-        }
-    };
-
-    let supported_sizes = (1..=67).step_by(3).collect::<Vec<_>>();
-
-    if !supported_sizes.contains(&size) {
-        println!("size {} is not supported", size);
-        std::process::exit(1);
-    }
-
-    save_compose_file(size);
-    std::process::exit(0);
-}
diff --git a/docker/Dockerfile b/docker/Dockerfile
deleted file mode 100644
index 75838ea9..00000000
--- a/docker/Dockerfile
+++ /dev/null
@@ -1,22 +0,0 @@
-# syntax=docker/dockerfile:experimental
-FROM rust:1.71-buster as builder
-
-RUN apt update
-RUN apt install -y clang
-
-WORKDIR /usr/src/zksync-bft/
-COPY . .
-
-WORKDIR /usr/src/zksync-bft/node/
-RUN cargo build --release --target-dir ./
-
-FROM debian:buster-slim
-EXPOSE 3333
-EXPOSE 3334
-EXPOSE 3335
-EXPOSE 3336
-COPY --from=builder --chmod=+x /usr/src/zksync-bft/node/release/executor /usr/bin/executor
-
-ENV RUST_LOG=DEBUG
-
-CMD ["/usr/bin/executor"]
diff --git a/docker/compose.Dockerfile b/docker/compose.Dockerfile
deleted file mode 100644
index b7c8d786..00000000
--- a/docker/compose.Dockerfile
+++ /dev/null
@@ -1,31 +0,0 @@
-FROM rust:1.71
-
-# Define a build-time argument called "node"
-ARG node
-
-# Set the working directory to /usr/src/myapp/artifacts
-# Directory structure:
-# artifacts
-#   node0..4
-#     config.toml
-#     private_key.toml
-#     executor
-WORKDIR /usr/src/myapp/artifacts
-
-# Copy the contents of the node0..4 artifacts directory from the "localenvc" image
-# into the current working directory's
-COPY --from=localenvc /usr/src/myapp/node/artifacts/$node/* $node/
-
-# Make the executor binary in the node0..4 directory executable
-RUN chmod +x $node/executor
-
-# Expose public ports 3333, 3334, 3335, and 3336 where node tcp servers will be listening
-EXPOSE $port
-
-# Set the working directory to the directory specified by the "node" argument
-# it will be node0..4
-WORKDIR /usr/src/myapp/artifacts/$node/
-RUN mkdir /usr/src/myapp/artifacts/$node/logs/
-
-# You can ignore this command. In docker-compose.yml file we have specified the different command
-CMD ["./executor", "0"]
diff --git a/docker/localenv.Dockerfile b/docker/localenv.Dockerfile
deleted file mode 100644
index 426293ba..00000000
--- a/docker/localenv.Dockerfile
+++ /dev/null
@@ -1,61 +0,0 @@
-# Start from the official Rust image with version 1.71
-# and give it the alias "localenvc"
-FROM rust:1.71 as localenvc
-
-ARG nodes
-
-WORKDIR /usr/src/myapp
-
-# Copy the current directory (.) into the working directory of the image
-COPY . .
-
-# Install
-# clang package required for rocksdb
-# cmake package required for capnp
-RUN apt update
-RUN apt install -y clang
-RUN apt install -y cmake
-
-# Build tools crate and create the config files
-WORKDIR /usr/src/myapp/node
-RUN cargo run -p tools --bin localnet_config -- --nodes=$nodes
-
-# Build binary file in release mode and create a main release binary
-WORKDIR /usr/src/myapp/node
-RUN cargo build -p tools --bin executor --release
-
-# Create the artifacts directory
-WORKDIR /usr/src/myapp/node/
-RUN i=0; \
-    while [ $i -lt $nodes ]; do \
-        mkdir -p "artifacts/node$i"; \
-        i=$(expr $i + 1); \
-    done
-
-# Copy the binary file to the artifacts directory
-RUN i=0; \
-    while [ $i -lt $nodes ]; do \
-        cp target/release/executor "artifacts/node$i/"; \
-        i=$(expr $i + 1); \
-    done
-
-# Copy the config file to the artifacts directory
-RUN i=0; \
-    while [ $i -lt $nodes ]; do \
-        cp ../configs/localnet/node$i/* "artifacts/node$i/"; \
-        i=$(expr $i + 1); \
-    done
-
-# Check config files for each node
-WORKDIR /usr/src/myapp/node/artifacts/
-RUN i=0; \
-    while [ $i -lt $nodes ]; do \
-        chmod +x "node$i/executor"; \
-        cd "node$i/"; \
-        ./executor $i --verify-config; \
-        cd "../"; \
-        i=$(expr $i + 1); \
-    done
-
-# Report succes
-CMD ["echo", "Done"]
diff --git a/docs/architecture.md b/docs/architecture.md
index 15fc4cd5..a9bbfc63 100644
--- a/docs/architecture.md
+++ b/docs/architecture.md
@@ -24,9 +24,9 @@ The library crates are just crates that have some basic functionality that can b
 
 - the `crypto` crate, which provides several cryptographic primitives.
 
-- the `roles` crate, which implements the types necessary for each role in the network. We have just two roles: `Node` and `Validator`.
+- the `protobuf` and `protobuf_build` crates, which contain all the code to create the protobuf schemas used by the other crates.
 
-- the `schema` crate, which contains all the protobuf schemas used by the other crates.
+- the `roles` crate, which implements the types necessary for each role in the network. We have just two roles: `Node` and `Validator`.
 
 - the `storage` crate is responsible for storing the current state of the system and providing an interface to access the stored data. It is a key component of the system that ensures the persistence of data and the ability to retrieve it when needed.
 
@@ -36,8 +36,6 @@ The library crates are just crates that have some basic functionality that can b
 
 This section provides a physical map of folders & files in this repository.
 
-- `/composes` & `/docker`: Project docker files.
-
 - `/infrastructure`: Infrastructure scripts that are needed to test the zkSync Era Consensus Layer.
 
 - `/node`
@@ -53,8 +51,9 @@ This section provides a physical map of folders & files in this repository.
 
   - `/concurrency`: Crate with essential primitives for structured concurrency.
   - `/crypto`: Cryptographic primitives used by the other crates.
+  - `/protobuf`: Code generated from protobuf schema files and utilities for serialization.
+  - `/protobuf_build`: Generates Rust code from the proto files.
   - `/roles`: Essential types for the different node roles.
-  - `/schema`: Protobuf schemas used by the other crates.
   - `/storage`: Storage layer for the node.
   - `/utils`: Collection of small utilities.
 
diff --git a/docs/launch.md b/docs/launch.md
index 42940b7f..05f06c1b 100644
--- a/docs/launch.md
+++ b/docs/launch.md
@@ -12,18 +12,4 @@ sysctl -w net.ipv4.tcp_slow_start_after_idle=0
 
 ## Running as an application
 
-TBD
-
-## Running as a local testnet (OUTDATED)
-
-You can see localenv.Dockerfile where we build all local configurations and prepare binary file for node
-
-```
-DOCKER_BUILDKIT=1 docker image build --no-cache --build-arg nodes=16 . -f docker/localenv.Dockerfile -t localenvc
-```
-
-Using docker compose we start 16 nodes
-
-```
-docker compose -f composes/docker-compose.local16.yml up
-```
+TBD
\ No newline at end of file

From 9f58e688e70f277550b14cb2a9bc343c874bb0eb Mon Sep 17 00:00:00 2001
From: pompon0
Date: Tue, 21 Nov 2023 00:09:27 +0100
Subject: [PATCH 2/2] Made consensus actor write blocks directly to storage (#38)

The consensus actor has access to storage anyway, and storage supports
observing its state. Having a dedicated channel for broadcasting final
blocks is redundant in this setup.
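For illustration, a minimal sketch of how a consumer now observes finalized
blocks through the storage subscription instead of a channel. It only uses
APIs that appear in the diff below (`subscribe_to_block_writes`,
`sync::wait_for`); the helper function itself is hypothetical:

```rust
use anyhow::Context as _;
use zksync_concurrency::{ctx, sync};
use zksync_consensus_roles::validator::BlockNumber;
use zksync_consensus_storage::BlockStore;

/// Hypothetical helper: waits until the store has persisted a block with
/// number >= `want`. This replaces reading finalized blocks off a channel.
async fn wait_until_persisted(
    ctx: &ctx::Ctx,
    storage: &impl BlockStore,
    want: BlockNumber,
) -> anyhow::Result<()> {
    // The store exposes a watch channel carrying the number of the last
    // block written; the consensus actor now writes blocks there directly.
    let mut sub = storage.subscribe_to_block_writes();
    sync::wait_for(ctx, &mut sub, |n| n.0 >= want.0)
        .await
        .context("wait_for()")?;
    Ok(())
}
```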
---
 node/actors/bft/src/io.rs                     |  3 -
 node/actors/bft/src/lib.rs                    | 14 +--
 node/actors/bft/src/metrics.rs                |  2 +
 node/actors/bft/src/replica/block.rs          | 22 +++--
 node/actors/bft/src/replica/leader_commit.rs  |  6 +-
 node/actors/bft/src/replica/leader_prepare.rs |  6 +-
 node/actors/bft/src/replica/new_view.rs       |  4 +-
 node/actors/bft/src/replica/state_machine.rs  | 83 +++++++---------
 node/actors/bft/src/replica/tests.rs          |  3 +-
 node/actors/bft/src/testonly/make.rs          |  4 +-
 node/actors/bft/src/testonly/node.rs          | 23 +----
 node/actors/bft/src/testonly/run.rs           | 98 +++++++++----------
 node/actors/executor/src/io.rs                | 15 ---
 node/actors/executor/src/lib.rs               | 25 ++---
 node/actors/executor/src/metrics.rs           | 15 ---
 node/actors/executor/src/tests.rs             | 61 +++---------
 node/libs/storage/src/lib.rs                  |  2 +-
 node/libs/storage/src/replica_state.rs        | 35 ++++---
 node/tools/src/main.rs                        | 13 +--
 19 files changed, 172 insertions(+), 262 deletions(-)
 delete mode 100644 node/actors/executor/src/metrics.rs

diff --git a/node/actors/bft/src/io.rs b/node/actors/bft/src/io.rs
index 3636e051..b29dc758 100644
--- a/node/actors/bft/src/io.rs
+++ b/node/actors/bft/src/io.rs
@@ -1,7 +1,6 @@
 //! Input and output messages for the Consensus actor. These are processed by the executor actor.
 use zksync_consensus_network::io::{ConsensusInputMessage, ConsensusReq};
-use zksync_consensus_roles::validator;
 
 /// All the messages that other actors can send to the Consensus actor.
 #[derive(Debug)]
@@ -15,8 +14,6 @@ pub enum InputMessage {
 pub enum OutputMessage {
     /// Message types to the Network actor.
     Network(ConsensusInputMessage),
-    /// Message types to the Sync actor.
-    FinalizedBlock(validator::FinalBlock),
 }
 
 impl From<ConsensusInputMessage> for OutputMessage {
diff --git a/node/actors/bft/src/lib.rs b/node/actors/bft/src/lib.rs
index dcc0b142..dda99da5 100644
--- a/node/actors/bft/src/lib.rs
+++ b/node/actors/bft/src/lib.rs
@@ -21,7 +21,7 @@ use inner::ConsensusInner;
 use tracing::{info, instrument};
 use zksync_concurrency::ctx;
 use zksync_consensus_roles::validator;
-use zksync_consensus_storage::FallbackReplicaStateStore;
+use zksync_consensus_storage::ReplicaStore;
 use zksync_consensus_utils::pipe::ActorPipe;
 
 mod inner;
@@ -53,7 +53,7 @@ impl Consensus {
         pipe: ActorPipe<InputMessage, OutputMessage>,
         secret_key: validator::SecretKey,
         validator_set: validator::ValidatorSet,
-        storage: FallbackReplicaStateStore,
+        storage: ReplicaStore,
     ) -> anyhow::Result<Self> {
         Ok(Consensus {
             inner: ConsensusInner {
@@ -69,7 +69,7 @@ impl Consensus {
     /// Starts the Consensus actor. It will start running, processing incoming messages and
     /// sending output messages. This is a blocking method.
     #[instrument(level = "trace", ret)]
-    pub fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
+    pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
         info!(
             "Starting consensus actor {:?}",
             self.inner.secret_key.public()
@@ -78,6 +78,7 @@ impl Consensus {
         // We need to start the replica before processing inputs.
         self.replica
             .start(ctx, &self.inner)
+            .await
             .context("replica.start()")?;
 
         // This is the infinite loop where the consensus actually runs. The validator waits for either
@@ -87,7 +88,7 @@ impl Consensus {
                 .inner
                 .pipe
                 .recv(&ctx.with_deadline(self.replica.timeout_deadline))
-                .block()
+                .await
                 .ok();
 
             // We check if the context is active before processing the input. If the context is not active,
@@ -114,7 +115,8 @@ impl Consensus {
                         validator::ConsensusMsg::LeaderPrepare(_)
                         | validator::ConsensusMsg::LeaderCommit(_) => {
                             self.replica
-                                .process_input(ctx, &self.inner, Some(req.msg))?;
+                                .process_input(ctx, &self.inner, Some(req.msg))
+                                .await?;
                         }
                     }
                     // Notify network actor that the message has been processed.
@@ -122,7 +124,7 @@
                     let _ = req.ack.send(());
                 }
                 None => {
-                    self.replica.process_input(ctx, &self.inner, None)?;
+                    self.replica.process_input(ctx, &self.inner, None).await?;
                 }
             }
         }
diff --git a/node/actors/bft/src/metrics.rs b/node/actors/bft/src/metrics.rs
index 8fef767e..6412860e 100644
--- a/node/actors/bft/src/metrics.rs
+++ b/node/actors/bft/src/metrics.rs
@@ -68,6 +68,8 @@ pub(crate) struct ConsensusMetrics {
     /// Latency of processing messages by the leader.
     #[metrics(buckets = Buckets::LATENCIES, unit = Unit::Seconds)]
     pub(crate) leader_processing_latency: Family<ConsensusMsgLabel, Histogram<Duration>>,
+    /// Number of the last finalized block observed by the node.
+    pub(crate) finalized_block_number: Gauge<u64>,
 }
 
 /// Global instance of [`ConsensusMetrics`].
diff --git a/node/actors/bft/src/replica/block.rs b/node/actors/bft/src/replica/block.rs
index 92985b74..66a4562b 100644
--- a/node/actors/bft/src/replica/block.rs
+++ b/node/actors/bft/src/replica/block.rs
@@ -1,6 +1,8 @@
 use super::StateMachine;
-use crate::{inner::ConsensusInner, io::OutputMessage};
+use crate::inner::ConsensusInner;
+use anyhow::Context as _;
 use tracing::{info, instrument};
+use zksync_concurrency::ctx;
 use zksync_consensus_roles::validator;
 
 impl StateMachine {
@@ -8,11 +10,12 @@ impl StateMachine {
     /// block proposal cache for the matching block, and if we find it we build the block.
     /// If this method succeeds, it sends the finalized block to the executor.
     #[instrument(level = "trace", ret)]
-    pub(crate) fn build_block(
+    pub(crate) async fn save_block(
         &mut self,
+        ctx: &ctx::Ctx,
         consensus: &ConsensusInner,
         commit_qc: &validator::CommitQC,
-    ) {
+    ) -> anyhow::Result<()> {
         // TODO(gprusak): for availability of finalized blocks,
         //                replicas should be able to broadcast highest quorums without
         //                the corresponding block (same goes for synchronization).
@@ -20,10 +23,10 @@ impl StateMachine {
             .block_proposal_cache
             .get(&commit_qc.message.proposal.number)
         else {
-            return;
+            return Ok(());
         };
         let Some(payload) = cache.get(&commit_qc.message.proposal.payload) else {
-            return;
+            return Ok(());
        };
         let block = validator::FinalBlock {
             header: commit_qc.message.proposal,
@@ -35,7 +38,14 @@ impl StateMachine {
             "Finalized a block!\nFinal block: {:#?}",
             block.header.hash()
         );
+        self.storage
+            .put_block(ctx, &block)
+            .await
+            .context("store.put_block()")?;
 
-        consensus.pipe.send(OutputMessage::FinalizedBlock(block));
+        let number_metric = &crate::metrics::METRICS.finalized_block_number;
+        let current_number = number_metric.get();
+        number_metric.set(current_number.max(block.header.number.0));
+        Ok(())
     }
 }
diff --git a/node/actors/bft/src/replica/leader_commit.rs b/node/actors/bft/src/replica/leader_commit.rs
index ba0fd3e6..5eccb92d 100644
--- a/node/actors/bft/src/replica/leader_commit.rs
+++ b/node/actors/bft/src/replica/leader_commit.rs
@@ -41,7 +41,7 @@ impl StateMachine {
     /// Processes a leader commit message. We can approve this leader message even if we
     /// don't have the block proposal stored. It is enough to see the justification.
     #[instrument(level = "trace", err)]
-    pub(crate) fn process_leader_commit(
+    pub(crate) async fn process_leader_commit(
         &mut self,
         ctx: &ctx::Ctx,
         consensus: &ConsensusInner,
@@ -86,7 +86,8 @@ impl StateMachine {
         // ----------- All checks finished. Now we process the message. --------------
 
         // Try to create a finalized block with this CommitQC and our block proposal cache.
-        self.build_block(consensus, &message.justification);
+        self.save_block(ctx, consensus, &message.justification)
+            .await?;
 
         // Update the state machine. We don't update the view and phase (or backup our state) here
         // because we will do it when we start the new view.
@@ -97,6 +98,7 @@ impl StateMachine {
         // Start a new view. But first we skip to the view of this message.
         self.view = view;
         self.start_new_view(ctx, consensus)
+            .await
             .context("start_new_view()")?;
 
         Ok(())
diff --git a/node/actors/bft/src/replica/leader_prepare.rs b/node/actors/bft/src/replica/leader_prepare.rs
index 47b21744..ffed5991 100644
--- a/node/actors/bft/src/replica/leader_prepare.rs
+++ b/node/actors/bft/src/replica/leader_prepare.rs
@@ -108,7 +108,7 @@ pub(crate) enum Error {
 impl StateMachine {
     /// Processes a leader prepare message.
     #[instrument(level = "trace", ret)]
-    pub(crate) fn process_leader_prepare(
+    pub(crate) async fn process_leader_prepare(
         &mut self,
         ctx: &ctx::Ctx,
         consensus: &ConsensusInner,
@@ -189,7 +189,7 @@ impl StateMachine {
 
         // Try to create a finalized block with this CommitQC and our block proposal cache.
         // This gives us another chance to finalize a block that we may have missed before.
-        self.build_block(consensus, &highest_qc);
+        self.save_block(ctx, consensus, &highest_qc).await?;
 
         // ----------- Checking the block proposal --------------
 
@@ -276,7 +276,7 @@ impl StateMachine {
         }
 
         // Backup our state.
-        self.backup_state(ctx).context("backup_state()")?;
+        self.backup_state(ctx).await.context("backup_state()")?;
 
         // Send the replica message to the leader.
         let output_message = ConsensusInputMessage {
diff --git a/node/actors/bft/src/replica/new_view.rs b/node/actors/bft/src/replica/new_view.rs
index f9571f2c..9ccc34c9 100644
--- a/node/actors/bft/src/replica/new_view.rs
+++ b/node/actors/bft/src/replica/new_view.rs
@@ -9,7 +9,7 @@ use zksync_consensus_roles::validator;
 impl StateMachine {
     /// This blocking method is used whenever we start a new view.
     #[instrument(level = "trace", err)]
-    pub(crate) fn start_new_view(
+    pub(crate) async fn start_new_view(
         &mut self,
         ctx: &ctx::Ctx,
         consensus: &ConsensusInner,
@@ -27,7 +27,7 @@ impl StateMachine {
             .retain(|k, _| k > &self.high_qc.message.proposal.number);
 
         // Backup our state.
-        self.backup_state(ctx).context("backup_state")?;
+        self.backup_state(ctx).await.context("backup_state")?;
 
         // Send the replica message to the next leader.
         let output_message = ConsensusInputMessage {
diff --git a/node/actors/bft/src/replica/state_machine.rs b/node/actors/bft/src/replica/state_machine.rs
index a3a3e983..dcc2f480 100644
--- a/node/actors/bft/src/replica/state_machine.rs
+++ b/node/actors/bft/src/replica/state_machine.rs
@@ -2,10 +2,10 @@ use crate::{metrics, ConsensusInner};
 use anyhow::Context as _;
 use std::collections::{BTreeMap, HashMap};
 use tracing::instrument;
-use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _, scope, time};
+use zksync_concurrency::{ctx, metrics::LatencyHistogramExt as _, time};
 use zksync_consensus_roles::validator;
 use zksync_consensus_storage as storage;
-use zksync_consensus_storage::{FallbackReplicaStateStore, StorageError};
+use zksync_consensus_storage::ReplicaStore;
 
 /// The StateMachine struct contains the state of the replica. This is the most complex state machine and is responsible
 /// for validating and voting on blocks. When participating in consensus we are always a replica.
@@ -24,17 +24,15 @@ pub(crate) struct StateMachine {
         BTreeMap<validator::BlockNumber, HashMap<validator::PayloadHash, validator::Payload>>,
     /// The deadline to receive an input message.
     pub(crate) timeout_deadline: time::Deadline,
-    /// A reference to the storage module. We use it to backup the replica state.
-    pub(crate) storage: FallbackReplicaStateStore,
+    /// A reference to the storage module. We use it to backup the replica state and store
+    /// finalized blocks.
+    pub(crate) storage: ReplicaStore,
 }
 
 impl StateMachine {
     /// Creates a new StateMachine struct. We try to recover a past state from the storage module,
     /// otherwise we initialize the state machine with whatever head block we have.
-    pub(crate) async fn new(
-        ctx: &ctx::Ctx,
-        storage: FallbackReplicaStateStore,
-    ) -> anyhow::Result<Self> {
+    pub(crate) async fn new(ctx: &ctx::Ctx, storage: ReplicaStore) -> anyhow::Result<Self> {
         let backup = storage.replica_state(ctx).await?;
         let mut block_proposal_cache: BTreeMap<_, HashMap<_, _>> = BTreeMap::new();
         for proposal in backup.proposals {
@@ -59,13 +57,14 @@ impl StateMachine {
     /// we are able to process inputs. If we are in the genesis block, then we start a new view,
     /// this will kick start the consensus algorithm. Otherwise, we just start the timer.
     #[instrument(level = "trace", ret)]
-    pub(crate) fn start(
+    pub(crate) async fn start(
         &mut self,
         ctx: &ctx::Ctx,
         consensus: &ConsensusInner,
     ) -> anyhow::Result<()> {
         if self.view == validator::ViewNumber(0) {
             self.start_new_view(ctx, consensus)
+                .await
                 .context("start_new_view")
         } else {
             self.reset_timer(ctx);
@@ -77,7 +76,7 @@ impl StateMachine {
     /// the main entry point for the state machine. We need read-access to the inner consensus struct.
     /// As a result, we can modify our state machine or send a message to the executor.
     #[instrument(level = "trace", ret)]
-    pub(crate) fn process_input(
+    pub(crate) async fn process_input(
         &mut self,
         ctx: &ctx::Ctx,
         consensus: &ConsensusInner,
@@ -86,38 +85,42 @@ impl StateMachine {
         let Some(signed_msg) = input else {
             tracing::warn!("We timed out before receiving a message.");
             // Start new view.
-            self.start_new_view(ctx, consensus)?;
+            self.start_new_view(ctx, consensus).await?;
             return Ok(());
         };
 
         let now = ctx.now();
         let label = match &signed_msg.msg {
             validator::ConsensusMsg::LeaderPrepare(_) => {
-                let res =
-                    match self.process_leader_prepare(ctx, consensus, signed_msg.cast().unwrap()) {
-                        Err(super::leader_prepare::Error::Internal(err)) => {
-                            return Err(err).context("process_leader_prepare()")
-                        }
-                        Err(err) => {
-                            tracing::warn!("process_leader_prepare(): {err:#}");
-                            Err(())
-                        }
-                        Ok(()) => Ok(()),
-                    };
+                let res = match self
+                    .process_leader_prepare(ctx, consensus, signed_msg.cast().unwrap())
+                    .await
+                {
+                    Err(super::leader_prepare::Error::Internal(err)) => {
+                        return Err(err).context("process_leader_prepare()")
+                    }
+                    Err(err) => {
+                        tracing::warn!("process_leader_prepare(): {err:#}");
+                        Err(())
+                    }
+                    Ok(()) => Ok(()),
+                };
                 metrics::ConsensusMsgLabel::LeaderPrepare.with_result(&res)
             }
             validator::ConsensusMsg::LeaderCommit(_) => {
-                let res =
-                    match self.process_leader_commit(ctx, consensus, signed_msg.cast().unwrap()) {
-                        Err(super::leader_commit::Error::Internal(err)) => {
-                            return Err(err).context("process_leader_commit()")
-                        }
-                        Err(err) => {
-                            tracing::warn!("process_leader_commit(): {err:#}");
-                            Err(())
-                        }
-                        Ok(()) => Ok(()),
-                    };
+                let res = match self
+                    .process_leader_commit(ctx, consensus, signed_msg.cast().unwrap())
+                    .await
+                {
+                    Err(super::leader_commit::Error::Internal(err)) => {
+                        return Err(err).context("process_leader_commit()")
+                    }
+                    Err(err) => {
+                        tracing::warn!("process_leader_commit(): {err:#}");
+                        Err(())
+                    }
+                    Ok(()) => Ok(()),
+                };
                 metrics::ConsensusMsgLabel::LeaderCommit.with_result(&res)
             }
             _ => unreachable!(),
@@ -127,7 +130,7 @@ impl StateMachine {
     }
 
     /// Backups the replica state to disk.
-    pub(crate) fn backup_state(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
+    pub(crate) async fn backup_state(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
         let mut proposals = vec![];
         for (number, payloads) in &self.block_proposal_cache {
             proposals.extend(payloads.values().map(|p| storage::Proposal {
@@ -142,17 +145,7 @@ impl StateMachine {
             high_qc: self.high_qc.clone(),
             proposals,
         };
-
-        let store_result = scope::run_blocking!(ctx, |ctx, s| {
-            let backup_future = self.storage.put_replica_state(ctx, &backup);
-            s.spawn(backup_future).join(ctx).block()?;
-            Ok(())
-        });
-        match store_result {
-            Ok(()) => { /* Everything went fine */ }
-            Err(StorageError::Canceled(_)) => tracing::trace!("Storing replica state was canceled"),
-            Err(StorageError::Database(err)) => return Err(err),
-        }
+        self.storage.put_replica_state(ctx, &backup).await?;
         Ok(())
     }
 }
diff --git a/node/actors/bft/src/replica/tests.rs b/node/actors/bft/src/replica/tests.rs
index 8fb3aedc..337a73c8 100644
--- a/node/actors/bft/src/replica/tests.rs
+++ b/node/actors/bft/src/replica/tests.rs
@@ -21,10 +21,11 @@ async fn start_new_view_not_leader() {
     consensus.replica.high_qc.message.view = ViewNumber(0);
 
     scope::run!(ctx, |ctx, s| {
-        s.spawn_blocking(|| {
+        s.spawn(async {
             consensus
                 .replica
                 .start_new_view(ctx, &consensus.inner)
+                .await
                 .unwrap();
             Ok(())
         })
diff --git a/node/actors/bft/src/testonly/make.rs b/node/actors/bft/src/testonly/make.rs
index ca99edec..568096f3 100644
--- a/node/actors/bft/src/testonly/make.rs
+++ b/node/actors/bft/src/testonly/make.rs
@@ -7,7 +7,7 @@ use crate::{
 use std::sync::Arc;
 use zksync_concurrency::ctx;
 use zksync_consensus_roles::validator;
-use zksync_consensus_storage::{FallbackReplicaStateStore, InMemoryStorage};
+use zksync_consensus_storage::{InMemoryStorage, ReplicaStore};
 use zksync_consensus_utils::pipe::{self, DispatcherPipe};
 
 /// This creates a mock Consensus struct for unit tests.
@@ -27,7 +27,7 @@ pub async fn make_consensus(
         consensus_pipe,
         key.clone(),
         validator_set.clone(),
-        FallbackReplicaStateStore::from_store(Arc::new(storage)),
+        ReplicaStore::from_store(Arc::new(storage)),
     );
     let consensus = consensus
         .await
diff --git a/node/actors/bft/src/testonly/node.rs b/node/actors/bft/src/testonly/node.rs
index ae8b6f68..d696a325 100644
--- a/node/actors/bft/src/testonly/node.rs
+++ b/node/actors/bft/src/testonly/node.rs
@@ -1,19 +1,13 @@
 use super::Fuzz;
 use crate::io;
 use rand::Rng;
-use zksync_concurrency::{ctx, ctx::channel, scope};
+use std::sync::Arc;
+use zksync_concurrency::{ctx, scope};
 use zksync_consensus_network as network;
 use zksync_consensus_network::io::ConsensusInputMessage;
-use zksync_consensus_roles::validator;
+use zksync_consensus_storage::InMemoryStorage;
 use zksync_consensus_utils::pipe::DispatcherPipe;
 
-/// A struct containing metrics information. Right now it's just a finalized block.
-#[derive(Debug)]
-pub(super) struct Metrics {
-    pub(crate) validator: validator::PublicKey,
-    pub(crate) finalized_block: validator::FinalBlock,
-}
-
 /// Enum representing the behavior of the node.
 #[derive(Clone, Copy, Debug, PartialEq, Eq)]
 pub(crate) enum Behavior {
@@ -33,6 +27,7 @@ pub(crate) enum Behavior {
 pub(super) struct Node {
     pub(crate) net: network::testonly::Instance,
     pub(crate) behavior: Behavior,
+    pub(crate) storage: Arc<InMemoryStorage>,
 }
 
 impl Node {
@@ -42,15 +37,12 @@ impl Node {
         ctx: &ctx::Ctx,
         consensus_pipe: DispatcherPipe<io::InputMessage, io::OutputMessage>,
         network_pipe: DispatcherPipe<network::io::InputMessage, network::io::OutputMessage>,
-        metrics: channel::UnboundedSender<Metrics>,
     ) -> anyhow::Result<()> {
-        let key = self.net.consensus_config().key.public();
         let rng = &mut ctx.rng();
         let mut net_recv = network_pipe.recv;
         let net_send = network_pipe.send;
         let mut con_recv = consensus_pipe.recv;
         let con_send = consensus_pipe.send;
-
         scope::run!(ctx, |ctx, s| async {
             s.spawn(async {
                 while let Ok(network_message) = net_recv.recv(ctx).await {
@@ -69,13 +61,6 @@ impl Node {
             // Get the next message from the channel. Our response depends on what type of replica we are.
             while let Ok(msg) = con_recv.recv(ctx).await {
                 match msg {
-                    io::OutputMessage::FinalizedBlock(block) => {
-                        // Send the finalized block to the watcher.
-                        metrics.send(Metrics {
-                            validator: key.clone(),
-                            finalized_block: block,
-                        })
-                    }
                     io::OutputMessage::Network(mut message) => {
                         let message_to_send = match self.behavior {
                             Behavior::Offline => continue,
diff --git a/node/actors/bft/src/testonly/run.rs b/node/actors/bft/src/testonly/run.rs
index d1eb3eae..9581a556 100644
--- a/node/actors/bft/src/testonly/run.rs
+++ b/node/actors/bft/src/testonly/run.rs
@@ -1,15 +1,12 @@
-use super::{Behavior, Metrics, Node};
+use super::{Behavior, Node};
 use crate::{testonly, Consensus};
 use anyhow::Context;
-use std::{
-    collections::{HashMap, HashSet},
-    sync::Arc,
-};
+use std::{collections::HashMap, sync::Arc};
 use tracing::Instrument as _;
-use zksync_concurrency::{ctx, ctx::channel, oneshot, scope, signal};
+use zksync_concurrency::{ctx, oneshot, scope, signal, sync};
 use zksync_consensus_network as network;
 use zksync_consensus_roles::validator;
-use zksync_consensus_storage::{FallbackReplicaStateStore, InMemoryStorage};
+use zksync_consensus_storage::{BlockStore, InMemoryStorage, ReplicaStore};
 use zksync_consensus_utils::pipe;
 
 #[derive(Clone, Copy)]
@@ -30,63 +27,61 @@ impl Test {
     /// Run a test with the given parameters.
     pub(crate) async fn run(&self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
         let rng = &mut ctx.rng();
-        let nodes: Vec<_> = network::testonly::Instance::new(rng, self.nodes.len(), 1)
+        let nodes: Vec<_> = network::testonly::Instance::new(rng, self.nodes.len(), 1);
+        let keys: Vec<_> = nodes
+            .iter()
+            .map(|node| node.consensus_config().key.clone())
+            .collect();
+        let (genesis_block, _) = testonly::make_genesis(&keys, validator::Payload(vec![]));
+        let nodes: Vec<_> = nodes
             .into_iter()
             .enumerate()
             .map(|(i, net)| Node {
                 net,
                 behavior: self.nodes[i],
+                storage: Arc::new(InMemoryStorage::new(genesis_block.clone())),
             })
             .collect();
-        scope::run!(ctx, |ctx, s| async {
-            let (metrics_send, mut metrics_recv) = channel::unbounded();
-            s.spawn_bg(run_nodes(ctx, self.network, &nodes, metrics_send));
-
-            // Get only the honest replicas.
-            let honest: HashSet<_> = nodes
-                .iter()
-                .filter(|node| node.behavior == Behavior::Honest)
-                .map(|node| node.net.consensus_config().key.public())
-                .collect();
-            assert!(!honest.is_empty());
 
-            let mut finalized: HashMap<validator::BlockNumber, validator::BlockHeaderHash> =
-                HashMap::new();
-            let mut observers: HashMap<validator::BlockNumber, HashSet<validator::PublicKey>> =
-                HashMap::new();
-            let mut fully_observed = 0;
+        // Get only the honest replicas.
+        let honest: Vec<_> = nodes
+            .iter()
+            .filter(|node| node.behavior == Behavior::Honest)
+            .collect();
+        assert!(!honest.is_empty());
 
-            while fully_observed < self.blocks_to_finalize {
-                let metric = metrics_recv.recv(ctx).await?;
-                if !honest.contains(&metric.validator) {
-                    continue;
-                }
-                let block = metric.finalized_block;
-                let hash = block.header.hash();
-                assert_eq!(*finalized.entry(block.header.number).or_insert(hash), hash);
-                let observers = observers.entry(block.header.number).or_default();
-                if observers.insert(metric.validator.clone()) && observers.len() == honest.len() {
-                    fully_observed += 1;
-                }
+        // Run the nodes until all honest nodes store enough finalized blocks.
+        scope::run!(ctx, |ctx, s| async {
+            s.spawn_bg(run_nodes(ctx, self.network, &nodes));
+            for n in &honest {
+                s.spawn(async {
+                    sync::wait_for(
+                        ctx,
+                        &mut n.storage.subscribe_to_block_writes(),
+                        |block_number| block_number.0 >= self.blocks_to_finalize as u64,
+                    )
+                    .await?;
+                    Ok(())
+                });
             }
             Ok(())
         })
-        .await
+        .await?;
+
+        // Check that the stored blocks are consistent.
+        for i in 0..self.blocks_to_finalize as u64 + 1 {
+            let i = validator::BlockNumber(i);
+            let want = honest[0].storage.block(ctx, i).await?;
+            for n in &honest[1..] {
+                assert_eq!(want, n.storage.block(ctx, i).await?);
+            }
+        }
+        Ok(())
     }
 }
 
 /// Run a set of nodes.
-async fn run_nodes(
-    ctx: &ctx::Ctx,
-    network: Network,
-    nodes: &[Node],
-    metrics: channel::UnboundedSender<Metrics>,
-) -> anyhow::Result<()> {
-    let keys: Vec<_> = nodes
-        .iter()
-        .map(|node| node.net.consensus_config().key.clone())
-        .collect();
-    let (genesis_block, _) = testonly::make_genesis(&keys, validator::Payload(vec![]));
+async fn run_nodes(ctx: &ctx::Ctx, network: Network, nodes: &[Node]) -> anyhow::Result<()> {
     let network_ready = signal::Once::new();
     let mut network_pipes = HashMap::new();
     let mut network_send = HashMap::new();
@@ -102,8 +97,7 @@ async fn run_nodes(
             network_pipes.insert(validator_key.public(), network_actor_pipe);
             s.spawn(
                 async {
-                    let storage = InMemoryStorage::new(genesis_block.clone());
-                    let storage = FallbackReplicaStateStore::from_store(Arc::new(storage));
+                    let storage = ReplicaStore::from_store(node.storage.clone());
 
                     let consensus = Consensus::new(
                         ctx,
@@ -117,8 +111,8 @@ async fn run_nodes(
                     scope::run!(ctx, |ctx, s| async {
                         network_ready.recv(ctx).await?;
-                        s.spawn_blocking(|| consensus.run(ctx).context("consensus.run()"));
-                        node.run_executor(ctx, consensus_pipe, network_pipe, metrics.clone())
+                        s.spawn(async { consensus.run(ctx).await.context("consensus.run()") });
+                        node.run_executor(ctx, consensus_pipe, network_pipe)
                             .await
                             .context("executor.run()")
                     })
diff --git a/node/actors/executor/src/io.rs b/node/actors/executor/src/io.rs
index 76b1df00..5379092f 100644
--- a/node/actors/executor/src/io.rs
+++ b/node/actors/executor/src/io.rs
@@ -1,6 +1,4 @@
 //! Module to manage the communication between actors. It simply converts and forwards messages from and to each different actor.
-
-use crate::metrics;
 use tracing::instrument;
 use zksync_concurrency::{
     ctx::{self, channel},
@@ -12,7 +10,6 @@ use zksync_consensus_bft::io::{
 use zksync_consensus_network::io::{
     InputMessage as NetworkInputMessage, OutputMessage as NetworkOutputMessage,
 };
-use zksync_consensus_roles::validator::FinalBlock;
 use zksync_consensus_sync_blocks::io::{
     InputMessage as SyncBlocksInputMessage, OutputMessage as SyncBlocksOutputMessage,
 };
@@ -28,7 +25,6 @@ pub(super) struct Dispatcher {
     sync_blocks_output: channel::UnboundedReceiver<SyncBlocksOutputMessage>,
     network_input: channel::UnboundedSender<NetworkInputMessage>,
     network_output: channel::UnboundedReceiver<NetworkOutputMessage>,
-    blocks_sender: channel::UnboundedSender<FinalBlock>,
 }
 
 impl Dispatcher {
@@ -37,7 +33,6 @@ impl Dispatcher {
         consensus_pipe: DispatcherPipe<ConsensusInputMessage, ConsensusOutputMessage>,
         sync_blocks_pipe: DispatcherPipe<SyncBlocksInputMessage, SyncBlocksOutputMessage>,
         network_pipe: DispatcherPipe<NetworkInputMessage, NetworkOutputMessage>,
-        blocks_sender: channel::UnboundedSender<FinalBlock>,
     ) -> Self {
         Dispatcher {
             consensus_input: consensus_pipe.send,
@@ -46,7 +41,6 @@ impl Dispatcher {
             sync_blocks_output: sync_blocks_pipe.recv,
             network_input: network_pipe.send,
             network_output: network_pipe.recv,
-            blocks_sender,
         }
     }
 
@@ -61,15 +55,6 @@ impl Dispatcher {
                         ConsensusOutputMessage::Network(message) => {
                             self.network_input.send(message.into());
                         }
-                        ConsensusOutputMessage::FinalizedBlock(block) => {
-                            let number_metric = &metrics::METRICS.finalized_block_number;
-                            let current_number = number_metric.get();
-                            number_metric.set(current_number.max(block.header.number.0));
-                            // This works because this is the only place where `finalized_block_number`
-                            // is modified, and there should be a single running `Dispatcher`.
-
-                            self.blocks_sender.send(block);
-                        }
                     }
                 }
                 Ok(())
diff --git a/node/actors/executor/src/lib.rs b/node/actors/executor/src/lib.rs
index a1d779ca..e61abdbc 100644
--- a/node/actors/executor/src/lib.rs
+++ b/node/actors/executor/src/lib.rs
@@ -2,18 +2,17 @@
 
 use crate::io::Dispatcher;
 use anyhow::Context as _;
-use std::{mem, sync::Arc};
-use zksync_concurrency::{ctx, ctx::channel, net, scope};
+use std::sync::Arc;
+use zksync_concurrency::{ctx, net, scope};
 use zksync_consensus_bft::{misc::consensus_threshold, Consensus};
 use zksync_consensus_network as network;
-use zksync_consensus_roles::{node, validator, validator::FinalBlock};
-use zksync_consensus_storage::{FallbackReplicaStateStore, ReplicaStateStore, WriteBlockStore};
+use zksync_consensus_roles::{node, validator};
+use zksync_consensus_storage::{ReplicaStateStore, ReplicaStore, WriteBlockStore};
 use zksync_consensus_sync_blocks::SyncBlocks;
 use zksync_consensus_utils::pipe;
 
 mod config;
 mod io;
-mod metrics;
 pub mod testonly;
 #[cfg(test)]
 mod tests;
@@ -29,8 +28,6 @@ struct ValidatorExecutor {
     key: validator::SecretKey,
     /// Store for replica state.
     replica_state_store: Arc<dyn ReplicaStateStore>,
-    /// Sender of blocks finalized by the consensus algorithm.
-    blocks_sender: channel::UnboundedSender<FinalBlock>,
 }
 
 impl ValidatorExecutor {
@@ -85,7 +82,6 @@ impl Executor {
         config: ConsensusConfig,
         key: validator::SecretKey,
         replica_state_store: Arc<dyn ReplicaStateStore>,
-        blocks_sender: channel::UnboundedSender<FinalBlock>,
     ) -> anyhow::Result<()> {
         let public = &config.key;
         anyhow::ensure!(
@@ -104,7 +100,6 @@ impl Executor {
                 config,
                 key,
                 replica_state_store,
-                blocks_sender,
             });
         } else {
             tracing::info!(
@@ -142,31 +137,25 @@ impl Executor {
     }
 
     /// Runs this executor to completion. This should be spawned on a separate task.
-    pub async fn run(mut self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
+    pub async fn run(self, ctx: &ctx::Ctx) -> anyhow::Result<()> {
         let network_config = self.network_config();
 
         // Generate the communication pipes. We have one for each actor.
         let (consensus_actor_pipe, consensus_dispatcher_pipe) = pipe::new();
         let (sync_blocks_actor_pipe, sync_blocks_dispatcher_pipe) = pipe::new();
         let (network_actor_pipe, network_dispatcher_pipe) = pipe::new();
-        let blocks_sender = if let Some(validator) = &mut self.validator {
-            mem::replace(&mut validator.blocks_sender, channel::unbounded().0)
-        } else {
-            channel::unbounded().0
-        };
         // Create the IO dispatcher.
         let mut dispatcher = Dispatcher::new(
             consensus_dispatcher_pipe,
             sync_blocks_dispatcher_pipe,
             network_dispatcher_pipe,
-            blocks_sender,
         );
 
         // Create each of the actors.
         let validator_set = &self.executor_config.validators;
         let consensus = if let Some(validator) = self.validator {
             let consensus_storage =
-                FallbackReplicaStateStore::new(validator.replica_state_store, self.storage.clone());
+                ReplicaStore::new(validator.replica_state_store, self.storage.clone());
 
             let consensus = Consensus::new(
                 ctx,
                 consensus_actor_pipe,
@@ -208,7 +197,7 @@ impl Executor {
                     .context("Network stopped")
             });
             if let Some(consensus) = consensus {
-                s.spawn_blocking(|| consensus.run(ctx).context("Consensus stopped"));
+                s.spawn(async { consensus.run(ctx).await.context("Consensus stopped") });
             }
             sync_blocks.run(ctx).await.context("Syncing blocks stopped")
         })
diff --git a/node/actors/executor/src/metrics.rs b/node/actors/executor/src/metrics.rs
deleted file mode 100644
index e404722f..00000000
--- a/node/actors/executor/src/metrics.rs
+++ /dev/null
@@ -1,15 +0,0 @@
-//! Metrics defined by the executor module.
-
-use vise::{Gauge, Metrics};
-
-/// Metrics defined by the executor module.
-#[derive(Debug, Metrics)]
-#[metrics(prefix = "executor")]
-pub(crate) struct ExecutorMetrics {
-    /// Number of the last finalized block observed by the node.
-    pub(crate) finalized_block_number: Gauge<u64>,
-}
-
-/// Global instance of [`ExecutorMetrics`].
-#[vise::register]
-pub(crate) static METRICS: vise::Global<ExecutorMetrics> = vise::Global::new();
diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs
index 8a7a2a77..23c63e07 100644
--- a/node/actors/executor/src/tests.rs
+++ b/node/actors/executor/src/tests.rs
@@ -6,26 +6,8 @@ use rand::Rng;
 use std::iter;
 use test_casing::test_casing;
 use zksync_concurrency::{sync, testonly::abort_on_panic, time};
-use zksync_consensus_roles::validator::{BlockNumber, Payload};
-use zksync_consensus_storage::{BlockStore, InMemoryStorage, StorageError};
-
-async fn store_final_blocks(
-    ctx: &ctx::Ctx,
-    mut blocks_receiver: channel::UnboundedReceiver<FinalBlock>,
-    storage: Arc<InMemoryStorage>,
-) -> anyhow::Result<()> {
-    while let Ok(block) = blocks_receiver.recv(ctx).await {
-        tracing::trace!(number = %block.header.number, "Finalized new block");
-        if let Err(err) = storage.put_block(ctx, &block).await {
-            if matches!(err, StorageError::Canceled(_)) {
-                break;
-            } else {
-                return Err(err.into());
-            }
-        }
-    }
-    Ok(())
-}
+use zksync_consensus_roles::validator::{BlockNumber, FinalBlock, Payload};
+use zksync_consensus_storage::{BlockStore, InMemoryStorage};
 
 impl FullValidatorConfig {
     fn gen_blocks(&self, rng: &mut impl Rng, count: usize) -> Vec<FinalBlock> {
@@ -49,24 +31,12 @@ impl FullValidatorConfig {
         blocks.skip(1).take(count).collect()
     }
 
-    fn into_executor(
-        self,
-        storage: Arc<InMemoryStorage>,
-    ) -> (
-        Executor<InMemoryStorage>,
-        channel::UnboundedReceiver<FinalBlock>,
-    ) {
-        let (blocks_sender, blocks_receiver) = channel::unbounded();
+    fn into_executor(self, storage: Arc<InMemoryStorage>) -> Executor<InMemoryStorage> {
         let mut executor = Executor::new(self.node_config, self.node_key, storage.clone()).unwrap();
         executor
-            .set_validator(
-                self.consensus_config,
-                self.validator_key,
-                storage,
-                blocks_sender,
-            )
+            .set_validator(self.consensus_config, self.validator_key, storage)
             .unwrap();
-        (executor, blocks_receiver)
+        executor
     }
 }
 
@@ -80,19 +50,16 @@ async fn executing_single_validator() {
     let genesis_block = &validator.node_config.genesis_block;
     let storage = InMemoryStorage::new(genesis_block.clone());
     let storage = Arc::new(storage);
-    let (executor, mut blocks_receiver) = validator.into_executor(storage);
+    let executor = validator.into_executor(storage.clone());
 
     scope::run!(ctx, |ctx, s| async {
         s.spawn_bg(executor.run(ctx));
-
-        let mut expected_block_number = BlockNumber(1);
-        while expected_block_number < BlockNumber(5) {
-            let final_block = blocks_receiver.recv(ctx).await?;
-            tracing::trace!(number = %final_block.header.number, "Finalized new block");
-            assert_eq!(final_block.header.number, expected_block_number);
-            expected_block_number = expected_block_number.next();
-        }
-        anyhow::Ok(())
+        let want = BlockNumber(5);
+        sync::wait_for(ctx, &mut storage.subscribe_to_block_writes(), |n| {
+            n >= &want
+        })
+        .await?;
+        Ok(())
     })
    .await
    .unwrap();
@@ -114,7 +81,7 @@ async fn executing_validator_and_full_node() {
     let full_node_storage = Arc::new(full_node_storage);
     let mut full_node_subscriber = full_node_storage.subscribe_to_block_writes();
 
-    let (validator, blocks_receiver) = validator.into_executor(validator_storage.clone());
+    let validator = validator.into_executor(validator_storage.clone());
     let full_node = Executor::new(
         full_node.node_config,
         full_node.node_key,
         full_node_storage.clone(),
     )
     .unwrap();
 
     scope::run!(ctx, |ctx, s| async {
         s.spawn_bg(validator.run(ctx));
         s.spawn_bg(full_node.run(ctx));
-        s.spawn_bg(store_final_blocks(ctx, blocks_receiver, validator_storage));
-
         for _ in 0..5 {
             let number = *sync::changed(ctx, &mut full_node_subscriber).await?;
             tracing::trace!(%number, "Full node received block");
diff --git a/node/libs/storage/src/lib.rs b/node/libs/storage/src/lib.rs
index 0afd516d..9b0947f4 100644
--- a/node/libs/storage/src/lib.rs
+++ b/node/libs/storage/src/lib.rs
@@ -16,7 +16,7 @@ mod types;
 pub use crate::rocksdb::RocksdbStorage;
 pub use crate::{
     in_memory::InMemoryStorage,
-    replica_state::FallbackReplicaStateStore,
+    replica_state::ReplicaStore,
     traits::{BlockStore, ReplicaStateStore, WriteBlockStore},
     types::{Proposal, ReplicaState, StorageError, StorageResult},
 };
diff --git a/node/libs/storage/src/replica_state.rs b/node/libs/storage/src/replica_state.rs
index e10dc547..8d4da417 100644
--- a/node/libs/storage/src/replica_state.rs
+++ b/node/libs/storage/src/replica_state.rs
@@ -1,7 +1,7 @@
 //! `FallbackReplicaStateStore` type.
 use crate::{
-    traits::{BlockStore, ReplicaStateStore},
+    traits::{ReplicaStateStore, WriteBlockStore},
     types::{ReplicaState, StorageResult},
 };
 use std::sync::Arc;
@@ -22,35 +22,35 @@ impl From<validator::CommitQC> for ReplicaState {
 
 /// [`ReplicaStateStore`] wrapper that falls back to a specified block store.
 #[derive(Debug, Clone)]
-pub struct FallbackReplicaStateStore {
-    base: Arc<dyn ReplicaStateStore>,
-    fallback: Arc<dyn BlockStore>,
+pub struct ReplicaStore {
+    state: Arc<dyn ReplicaStateStore>,
+    blocks: Arc<dyn WriteBlockStore>,
 }
 
-impl FallbackReplicaStateStore {
+impl ReplicaStore {
     /// Creates a store from a type implementing both replica state and block storage.
     pub fn from_store<S>(store: Arc<S>) -> Self
     where
-        S: ReplicaStateStore + BlockStore + 'static,
+        S: ReplicaStateStore + WriteBlockStore + 'static,
     {
         Self {
-            base: store.clone(),
-            fallback: store,
+            state: store.clone(),
+            blocks: store,
         }
     }
 
     /// Creates a new replica state store with a fallback.
-    pub fn new(base: Arc<dyn ReplicaStateStore>, fallback: Arc<dyn BlockStore>) -> Self {
-        Self { base, fallback }
+    pub fn new(state: Arc<dyn ReplicaStateStore>, blocks: Arc<dyn WriteBlockStore>) -> Self {
+        Self { state, blocks }
     }
 
     /// Gets the replica state. If it's not present, falls back to recover it from the fallback block store.
     pub async fn replica_state(&self, ctx: &ctx::Ctx) -> StorageResult<ReplicaState> {
-        let replica_state = self.base.replica_state(ctx).await?;
+        let replica_state = self.state.replica_state(ctx).await?;
         if let Some(replica_state) = replica_state {
             Ok(replica_state)
         } else {
-            let head_block = self.fallback.head_block(ctx).await?;
+            let head_block = self.blocks.head_block(ctx).await?;
             Ok(ReplicaState::from(head_block.justification))
         }
     }
@@ -61,6 +61,15 @@ impl ReplicaStore {
         ctx: &ctx::Ctx,
         replica_state: &ReplicaState,
     ) -> StorageResult<()> {
-        self.base.put_replica_state(ctx, replica_state).await
+        self.state.put_replica_state(ctx, replica_state).await
+    }
+
+    /// Puts a block into this storage.
+    pub async fn put_block(
+        &self,
+        ctx: &ctx::Ctx,
+        block: &validator::FinalBlock,
+    ) -> StorageResult<()> {
+        self.blocks.put_block(ctx, block).await
+    }
 }
diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs
index b781a248..b133f949 100644
--- a/node/tools/src/main.rs
+++ b/node/tools/src/main.rs
@@ -11,10 +11,7 @@ use std::{
 use tracing::metadata::LevelFilter;
 use tracing_subscriber::{prelude::*, Registry};
 use vise_exporter::MetricsExporter;
-use zksync_concurrency::{
-    ctx::{self, channel},
-    scope, time,
-};
+use zksync_concurrency::{ctx, scope, time};
 use zksync_consensus_executor::Executor;
 use zksync_consensus_storage::{BlockStore, RocksdbStorage};
 use zksync_consensus_tools::{ConfigPaths, Configs};
@@ -112,14 +109,8 @@ async fn main() -> anyhow::Result<()> {
     let mut executor = Executor::new(configs.executor, configs.node_key, storage.clone())
         .context("Executor::new()")?;
     if let Some((consensus_config, validator_key)) = configs.consensus {
-        let blocks_sender = channel::unbounded().0; // Just drop finalized blocks
         executor
-            .set_validator(
-                consensus_config,
-                validator_key,
-                storage.clone(),
-                blocks_sender,
-            )
+            .set_validator(consensus_config, validator_key, storage.clone())
             .context("Executor::set_validator()")?;
     }