diff --git a/.github/CONTRIBUTING.md b/.github/CONTRIBUTING.md index ad353249..80cf46cb 100644 --- a/.github/CONTRIBUTING.md +++ b/.github/CONTRIBUTING.md @@ -1,7 +1,13 @@ # Contribution Guidelines Hello! Thanks for your interest in joining the mission to accelerate the mass adoption of crypto for personal -sovereignty! We welcome contributions from anyone on the internet, and are grateful for even the smallest of fixes! +sovereignty! We welcome contributions from anyone on the internet. + +Note, however, that all the contributions are subject to review, and not every contribution is guaranteed to be merged. +It is highly advised to reach out to developers (for example, by creating an issue) before preparing a significant +change in the codebase, and explicitly confirm that this contribution will be considered for merge. Otherwise, it is +possible to discover that a feature you have spent some time on does not align with the core team vision or capacity to +maintain a high quality of given submission long term. ## Ways to contribute diff --git a/Dockerfile b/Dockerfile index 51d53010..39d3c454 100644 --- a/Dockerfile +++ b/Dockerfile @@ -17,13 +17,9 @@ COPY --from=builder /app/target/release/tester . 
FROM debian:stable-slim as executor-runtime COPY /node/tools/docker_binaries/executor /node/ -COPY /node/tools/k8s_configs/ /node/k8s_config -COPY /node/tools/docker-config/ /node/docker_config -COPY docker-entrypoint.sh /node/ COPY k8s_entrypoint.sh /node/ WORKDIR /node -RUN chmod +x docker-entrypoint.sh RUN chmod +x k8s_entrypoint.sh -ENTRYPOINT ["./docker-entrypoint.sh"] +ENTRYPOINT ["./k8s_entrypoint.sh"] diff --git a/Makefile b/Makefile index 6314df8f..10d52bdf 100644 --- a/Makefile +++ b/Makefile @@ -1,6 +1,5 @@ .PHONY: node nodes_config docker_nodes_config node_docker consensus_docker_example clean clean_docker addresses_file blank_configs -NODE?=0 -DOCKER_IP=172.12.0.10 +IP?=127.0.0.1:3054 EXECUTABLE_NODE_DIR=node/tools NODES=4 SEED_NODES=1 @@ -8,7 +7,7 @@ SEED_NODES=1 # Locally run commands node: - export RUST_LOG=INFO && cd ${EXECUTABLE_NODE_DIR}/nodes-config/node_${NODE} && cargo run -- --database ../../database/node_${NODE} + export RUST_LOG=INFO && cd ${EXECUTABLE_NODE_DIR}/nodes-config/${IP} && cargo run -- --database ../../database/${IP} nodes_config: cd ${EXECUTABLE_NODE_DIR} && cargo run --bin localnet_config -- --input-addrs addresses.txt --output-dir nodes-config @@ -21,28 +20,12 @@ docker_build_executor: docker_node_image: docker build -t consensus-node --target=executor-runtime . 
-docker_nodes_config: - cd ${EXECUTABLE_NODE_DIR} && cargo run --bin localnet_config -- --input-addrs docker-config/addresses.txt --output-dir docker-config - -docker_node: - $(MAKE) docker_node_image - docker run -d --name consensus-node-${NODE} --env NODE_ID="consensus-node-${NODE}" consensus-node - -consensus_docker_example: - mkdir -p ${EXECUTABLE_NODE_DIR}/docker-config - cd ${EXECUTABLE_NODE_DIR}/docker-config && rm -rf addresses.txt && echo 172.12.0.10:3054 >> addresses.txt && echo 172.12.0.11:3054 >> addresses.txt - $(MAKE) docker_nodes_config - $(MAKE) docker_node_image - docker-compose up -d - -stop_docker_nodes: - docker stop consensus-node-1 consensus-node-2 +# Kubernetes commands start_k8s_nodes: - cd ${EXECUTABLE_NODE_DIR} && cargo run --bin deployer generate-config --nodes ${NODES} $(MAKE) docker_node_image minikube image load consensus-node:latest - cd ${EXECUTABLE_NODE_DIR} && cargo run --release --bin deployer deploy --nodes ${NODES} --seed-nodes ${SEED_NODES} + cd ${EXECUTABLE_NODE_DIR} && cargo run --release --bin deployer -- --nodes ${NODES} --seed-nodes ${SEED_NODES} install_chaos_mesh: curl -sSL https://mirrors.chaos-mesh.org/v2.6.3/install.sh | bash @@ -55,26 +38,11 @@ clean: clean_docker clean_k8s clean_k8s: rm -rf ${EXECUTABLE_NODE_DIR}/k8s_configs - kubectl delete deployments --all - kubectl delete pods --all + kubectl delete deployments --all -n consensus + kubectl delete pods --all -n consensus + kubectl delete services --all -n consensus + kubectl delete namespaces consensus clean_docker: - docker rm -f consensus-node-1 - docker rm -f consensus-node-2 - docker network rm -f node-net - docker image rm -f consensus-node + docker image rm -f consensus-node:latest docker image rm -f test-suite - -addresses_file: - mkdir -p ${EXECUTABLE_NODE_DIR}/docker-config - cd ${EXECUTABLE_NODE_DIR}/docker-config && \ - rm -rf addresses.txt && \ - touch addresses.txt && \ - for n in $$(seq 0 $$((${NODES} - 1))); do echo 0.0.0.$$n:3054 >> addresses.txt; 
done - -blank_configs: addresses_file docker_node_configs - for n in $$(seq 0 $$((${NODES} - 1))); do \ - jq '.publicAddr = "0.0.0.0:3054"' node/tools/docker-config/nodes-config/node_$$n/config.json | \ - jq '.gossipStaticOutbound = "[]"' > node/tools/docker-config/nodes-config/node_$$n/config.tmp && \ - mv -f node/tools/docker-config/nodes-config/node_$$n/config.tmp node/tools/docker-config/nodes-config/node_$$n/config.json; \ - done diff --git a/compose.yaml b/compose.yaml deleted file mode 100644 index 46edd5c3..00000000 --- a/compose.yaml +++ /dev/null @@ -1,34 +0,0 @@ -version: "3.9" - -services: - node-1: - build: - context: . - target: runtime - image: consensus-node - container_name: consensus-node-1 - ports: - - "3154:3154" - networks: - node_net: - # This allow us to know the ip of the node-1 container to fill the address in the config file - # Only for test purposes, may be removed in the future - ipv4_address: 172.12.0.10 - node-2: - image: consensus-node - container_name: consensus-node-2 - ports: - - "3155:3154" - networks: - node_net: - # This allow us to know the ip of the node-2 container to fill the address in the config file - # Only for test purposes, may be removed in the future - ipv4_address: 172.12.0.11 - -networks: - node_net: - name: node-net - ipam: - config: - - subnet: "172.12.0.0/24" - gateway: "172.12.0.1" diff --git a/k8s_entrypoint.sh b/k8s_entrypoint.sh index 8f6f498f..4e9857cd 100644 --- a/k8s_entrypoint.sh +++ b/k8s_entrypoint.sh @@ -1,6 +1,5 @@ #!/bin/bash # This file works as an entrypoint of the kubernetes cluster running the node binary copied inside of it. 
-cd k8s_config/${NODE_ID} export RUST_LOG=INFO -../../executor $@ +./executor $@ diff --git a/node/Cargo.lock b/node/Cargo.lock index 3be382ca..e6a52622 100644 --- a/node/Cargo.lock +++ b/node/Cargo.lock @@ -67,9 +67,9 @@ dependencies = [ [[package]] name = "aho-corasick" -version = "1.1.2" +version = "1.1.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b2969dcb958b36655471fc61f7e416fa76033bdd4bfed0678d8fee1e2d07a1f0" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" dependencies = [ "memchr", ] @@ -151,9 +151,9 @@ dependencies = [ [[package]] name = "anyhow" -version = "1.0.80" +version = "1.0.81" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "5ad32ce52e4161730f7098c077cd2ed6229b5804ccf99e5366be1ab72a98b4e1" +checksum = "0952808a6c2afd1aa8947271f3a60f1a6763c7b912d210184c5149b5cf147247" [[package]] name = "assert_matches" @@ -163,13 +163,13 @@ checksum = "9b34d609dfbaf33d6889b2b7106d3ca345eacad44200913df5ba02bfd31d2ba9" [[package]] name = "async-trait" -version = "0.1.77" +version = "0.1.78" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c980ee35e870bd1a4d2c8294d4c04d0499e67bca1e4b5cefcc693c2fa00caea9" +checksum = "461abc97219de0eaaf81fe3ef974a540158f3d079c2ab200f891f1a2ef201e85" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -249,7 +249,7 @@ dependencies = [ "regex", "rustc-hash", "shlex", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -266,9 +266,9 @@ checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" [[package]] name = "bitflags" -version = "2.4.2" +version = "2.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ed570934406eb16438a4e976b1b4500774099c13b8cb96eec99f620f05090ddf" +checksum = "cf4b9d6a944f767f8e5e0db018570623c85f3d925ac718db4e06d0187adb21c1" [[package]] name = "bitmaps" @@ -320,9 +320,9 @@ dependencies = [ [[package]] name = 
"bumpalo" -version = "3.15.3" +version = "3.15.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8ea184aa71bb362a1157c896979544cc23974e08fd265f29ea96b59f0b4a555b" +checksum = "7ff69b9dd49fd426c69a0db9fc04dd934cdb6645ff000864d98f7e2af8830eaa" [[package]] name = "byteorder" @@ -466,9 +466,9 @@ dependencies = [ [[package]] name = "clap" -version = "4.5.2" +version = "4.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b230ab84b0ffdf890d5a10abdbc8b83ae1c4918275daea1ab8801f71536b2651" +checksum = "949626d00e063efc93b6dca932419ceb5432f99769911c0b995f7e884c778813" dependencies = [ "clap_builder", "clap_derive", @@ -488,14 +488,14 @@ dependencies = [ [[package]] name = "clap_derive" -version = "4.5.0" +version = "4.5.3" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "307bc0538d5f0f83b8248db3087aa92fe504e4691294d0c96c0eabc33f47ba47" +checksum = "90239a040c80f5e14809ca132ddc4176ab33d5e17e49691793296e3fcb34d72f" dependencies = [ - "heck", + "heck 0.5.0", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -658,7 +658,7 @@ checksum = "f46882e17999c6cc590af592290432be3bce0428cb0d5f8b6715e4dc7b383eb3" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -682,7 +682,7 @@ dependencies = [ "proc-macro2", "quote", "strsim 0.10.0", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -693,7 +693,7 @@ checksum = "a668eda54683121533a393014d8692171709ff57a7d61f187b6e782719f8933f" dependencies = [ "darling_core", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -856,9 +856,9 @@ dependencies = [ [[package]] name = "fiat-crypto" -version = "0.2.6" +version = "0.2.7" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1676f435fc1dadde4d03e43f5d62b259e1ce5f40bd4ffb21db2b42ebe59c1382" +checksum = "c007b1ae3abe1cb6f85a16305acd418b7ca6343b953633fee2b76d8f108b830f" [[package]] name = "fixedbitset" @@ -943,7 +943,7 
@@ checksum = "87750cf4b7a4c0625b1529e4c543c2182106e4dedc60a2a6455e00d212c489ac" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -1021,9 +1021,9 @@ checksum = "d2fabcfbdc87f4758337ca535fb41a6d701b65693ce38287d856d1674551ec9b" [[package]] name = "h2" -version = "0.3.24" +version = "0.3.25" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bb2c4422095b67ee78da96fbb51a4cc413b3b25883c7717ff7ca1ab31022c9c9" +checksum = "4fbd2820c5e49886948654ab546d0688ff24530286bdcf8fca3cefb16d4618eb" dependencies = [ "bytes", "fnv", @@ -1064,6 +1064,12 @@ version = "0.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "95505c38b4572b2d910cecb0281560f54b440a19336cbbcb27bf6ce6adc6f5a8" +[[package]] +name = "heck" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" + [[package]] name = "hermit-abi" version = "0.3.9" @@ -1527,7 +1533,7 @@ dependencies = [ "proc-macro2", "quote", "serde_json", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -1628,7 +1634,7 @@ checksum = "adf157a4dc5a29b7b464aa8fe7edeff30076e07e13646a1c3874f58477dc99f8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -1673,7 +1679,7 @@ dependencies = [ "proc-macro2", "quote", "regex-syntax 0.6.29", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -1730,7 +1736,7 @@ checksum = "49e7bc1560b95a3c4a25d03de42fe76ca718ab92d1a22a55b9b4cf67b3ae635c" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -1972,7 +1978,7 @@ dependencies = [ "pest_meta", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -2013,7 +2019,7 @@ checksum = "2f38a4412a78282e09a2cf38d195ea5420d15ba0602cb375210efbc877243965" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -2130,14 +2136,14 @@ source = 
"registry+https://github.com/rust-lang/crates.io-index" checksum = "a41cf62165e97c7f814d2221421dbb9afcbcdb0a88068e5ea206e19951c2cbb5" dependencies = [ "proc-macro2", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] name = "proc-macro2" -version = "1.0.78" +version = "1.0.79" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e2422ad645d89c99f8f3e6b88a9fdeca7fabeac836b1002371c4367c8f984aae" +checksum = "e835ff2298f5721608eb1a980ecaee1aef2c132bf95ecc026a11b7bf3c01c02e" dependencies = [ "unicode-ident", ] @@ -2162,7 +2168,7 @@ checksum = "440f724eba9f6996b75d63681b0a92b06947f1457076d503a4d2e2c8f56442b8" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -2182,7 +2188,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c55e02e35260070b6f716a2423c2ff1c3bb1642ddca6f99e1f26d06268a0e2d2" dependencies = [ "bytes", - "heck", + "heck 0.4.1", "itertools 0.11.0", "log", "multimap", @@ -2192,7 +2198,7 @@ dependencies = [ "prost", "prost-types", "regex", - "syn 2.0.52", + "syn 2.0.53", "tempfile", "which", ] @@ -2207,7 +2213,7 @@ dependencies = [ "itertools 0.11.0", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -2483,11 +2489,11 @@ dependencies = [ [[package]] name = "rustix" -version = "0.38.31" +version = "0.38.32" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6ea3e1a662af26cd7a3ba09c0297a31af215563ecf42817c98df621387f4e949" +checksum = "65e04861e65f21776e67888bfbea442b3642beaa0138fdb1dd7a84a52dffdb89" dependencies = [ - "bitflags 2.4.2", + "bitflags 2.5.0", "errno", "libc", "linux-raw-sys", @@ -2667,7 +2673,7 @@ checksum = "7eb0b34b42edc17f6b7cac84a52a1c5f0e1bb2227e997ca9011ea3dd34e8610b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -2694,9 +2700,9 @@ dependencies = [ [[package]] name = "serde_yaml" -version = "0.9.32" +version = "0.9.33" source = 
"registry+https://github.com/rust-lang/crates.io-index" -checksum = "8fd075d994154d4a774f95b51fb96bdc2832b0ea48425c92546073816cda1f2f" +checksum = "a0623d197252096520c6f2a5e1171ee436e5af99a5d7caa2891e55e61950e6d9" dependencies = [ "indexmap", "itoa", @@ -2892,9 +2898,9 @@ dependencies = [ [[package]] name = "syn" -version = "2.0.52" +version = "2.0.53" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b699d15b36d1f02c3e7c69f8ffef53de37aefae075d8488d4ba1a7788d574a07" +checksum = "7383cd0e49fff4b6b90ca5670bfd3e9d6a733b3f90c686605aa7eec8c4996032" dependencies = [ "proc-macro2", "quote", @@ -2930,7 +2936,7 @@ checksum = "f9b53c7124dd88026d5d98a1eb1fd062a578b7d783017c9298825526c7fb6427" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -2951,22 +2957,22 @@ dependencies = [ [[package]] name = "thiserror" -version = "1.0.57" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1e45bcbe8ed29775f228095caf2cd67af7a4ccf756ebff23a306bf3e8b47b24b" +checksum = "03468839009160513471e86a034bb2c5c0e4baae3b43f79ffc55c4a5427b3297" dependencies = [ "thiserror-impl", ] [[package]] name = "thiserror-impl" -version = "1.0.57" +version = "1.0.58" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a953cb265bef375dae3de6663da4d3804eee9682ea80d8e2542529b73c531c81" +checksum = "c61f3ba182994efc43764a46c018c347bc492c79f024e705f46567b418f6d4f7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -3069,18 +3075,7 @@ checksum = "5b8a1e28f2deaa14e508979454cb3a223b10b938b45af148bc0986de36f1923b" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", -] - -[[package]] -name = "tokio-retry" -version = "0.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7f57eb36ecbe0fc510036adff84824dd3c24bb781e21bfa67b69d556aa85214f" -dependencies = [ - "pin-project", - "rand 0.8.5", - "tokio", + "syn 
2.0.53", ] [[package]] @@ -3095,9 +3090,9 @@ dependencies = [ [[package]] name = "tokio-stream" -version = "0.1.14" +version = "0.1.15" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "397c988d37662c7dda6d2208364a706264bf3d6138b11d436cbac0ad38832842" +checksum = "267ac89e0bec6e691e5813911606935d77c476ff49024f98abcea3e7b15e37af" dependencies = [ "futures-core", "pin-project-lite", @@ -3144,7 +3139,7 @@ source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "61c5bb1d698276a2443e5ecfabc1008bf15a36c12e6a7176e7bf089ea9131140" dependencies = [ "base64 0.21.7", - "bitflags 2.4.2", + "bitflags 2.5.0", "bytes", "futures-core", "futures-util", @@ -3190,7 +3185,7 @@ checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -3298,9 +3293,9 @@ dependencies = [ [[package]] name = "unsafe-libyaml" -version = "0.2.10" +version = "0.2.11" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "ab4c90930b95a82d00dc9e9ac071b4991924390d46cbd0dfe566148667605e4b" +checksum = "673aac59facbab8a9007c7f6108d11f63b603f7cabff99fabf650fea5c32b861" [[package]] name = "untrusted" @@ -3375,7 +3370,7 @@ source = "git+https://github.com/matter-labs/vise.git?rev=1c9cc500e92cf9ea052b23 dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -3424,7 +3419,7 @@ dependencies = [ "once_cell", "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", "wasm-bindgen-shared", ] @@ -3446,7 +3441,7 @@ checksum = "e94f17b526d0a461a191c78ea52bbce64071ed5c04c9ffe424dcb38f74171bb7" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", "wasm-bindgen-backend", "wasm-bindgen-shared", ] @@ -3674,7 +3669,7 @@ checksum = "9ce1b18ccd8e73a9321186f97e46f9f04b778851177567b1975109d26a08d2a6" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -3694,7 +3689,7 
@@ checksum = "ce36e65b0d2999d2aafac989fb249189a141aee1f53c612c1f37d72631959f69" dependencies = [ "proc-macro2", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] @@ -3882,7 +3877,6 @@ dependencies = [ "serde_json", "tempfile", "tokio", - "tokio-retry", "tower", "tracing", "tracing-subscriber", @@ -3931,14 +3925,14 @@ name = "zksync_protobuf_build" version = "0.1.0" dependencies = [ "anyhow", - "heck", + "heck 0.4.1", "prettyplease", "proc-macro2", "prost-build", "prost-reflect", "protox", "quote", - "syn 2.0.52", + "syn 2.0.53", ] [[package]] diff --git a/node/Cargo.toml b/node/Cargo.toml index d08616dc..0d2e3bcd 100644 --- a/node/Cargo.toml +++ b/node/Cargo.toml @@ -81,7 +81,6 @@ test-casing = "0.1.0" thiserror = "1.0.40" time = "0.3.23" tokio = { version = "1.34.0", features = ["full"] } -tokio-retry = "0.3.0" tracing = { version = "0.1.37", features = ["attributes"] } tracing-subscriber = { version = "0.3.16", features = ["env-filter", "fmt"] } kube = { version = "0.88.1", features = ["runtime", "derive"] } diff --git a/node/actors/bft/src/leader/state_machine.rs b/node/actors/bft/src/leader/state_machine.rs index 459b1e15..8d44909a 100644 --- a/node/actors/bft/src/leader/state_machine.rs +++ b/node/actors/bft/src/leader/state_machine.rs @@ -167,9 +167,9 @@ impl StateMachine { // The previous block was finalized, so we can propose a new block. _ => { let fork = &cfg.genesis().fork; - let (parent, number) = match high_qc { - Some(qc) => (Some(qc.header().hash()), qc.header().number.next()), - None => (fork.first_parent, fork.first_block), + let number = match high_qc { + Some(qc) => qc.header().number.next(), + None => fork.first_block, }; // Defensively assume that PayloadManager cannot propose until the previous block is stored. 
if let Some(prev) = number.prev() { @@ -189,7 +189,6 @@ impl StateMachine { .observe(payload.0.len()); let proposal = validator::BlockHeader { number, - parent, payload: payload.hash(), }; (proposal, Some(payload)) diff --git a/node/actors/bft/src/leader/tests.rs b/node/actors/bft/src/leader/tests.rs index b74e4411..9974c5c8 100644 --- a/node/actors/bft/src/leader/tests.rs +++ b/node/actors/bft/src/leader/tests.rs @@ -37,13 +37,6 @@ async fn replica_prepare_sanity_yield_leader_prepare() { .unwrap() .unwrap(); assert_eq!(leader_prepare.msg.view(), &replica_prepare.view); - assert_eq!( - leader_prepare.msg.proposal.parent, - replica_prepare - .high_vote - .as_ref() - .map(|v| v.proposal.hash()), - ); assert_eq!( leader_prepare.msg.justification, util.new_prepare_qc(|msg| *msg = replica_prepare) diff --git a/node/actors/bft/src/replica/block.rs b/node/actors/bft/src/replica/block.rs index eb00cd9f..20ca6f3b 100644 --- a/node/actors/bft/src/replica/block.rs +++ b/node/actors/bft/src/replica/block.rs @@ -39,7 +39,7 @@ impl StateMachine { tracing::info!( "Finalized block {}: {:#?}", block.header().number, - block.header().hash(), + block.header().payload, ); self.config .block_store diff --git a/node/actors/bft/src/replica/leader_prepare.rs b/node/actors/bft/src/replica/leader_prepare.rs index 7e017bcb..daa57825 100644 --- a/node/actors/bft/src/replica/leader_prepare.rs +++ b/node/actors/bft/src/replica/leader_prepare.rs @@ -42,8 +42,9 @@ pub(crate) enum Error { /// Invalid message. #[error("invalid message: {0:#}")] InvalidMessage(#[source] validator::LeaderPrepareVerifyError), - /// Previous proposal was not finalized. - + /// Leader proposed a block that was already pruned from replica's storage. + #[error("leader proposed a block that was already pruned from replica's storage")] + ProposalAlreadyPruned, /// Oversized payload. 
#[error("block proposal with an oversized payload (payload size: {payload_size})")] ProposalOversizedPayload { @@ -110,6 +111,14 @@ impl StateMachine { }); } + // Replica MUSTN'T vote for blocks which have been already pruned for storage. + // (because it won't be able to persist and broadcast them once finalized). + // TODO(gprusak): it should never happen, we should add safety checks to prevent + // pruning blocks not known to be finalized. + if message.proposal.number < self.config.block_store.subscribe().borrow().first { + return Err(Error::ProposalAlreadyPruned); + } + // ----------- Checking the message -------------- signed_message.verify().map_err(Error::InvalidSignature)?; diff --git a/node/actors/bft/src/replica/tests.rs b/node/actors/bft/src/replica/tests.rs index 3866e813..c1b5be08 100644 --- a/node/actors/bft/src/replica/tests.rs +++ b/node/actors/bft/src/replica/tests.rs @@ -7,7 +7,7 @@ use assert_matches::assert_matches; use rand::Rng; use zksync_concurrency::{ctx, scope}; use zksync_consensus_roles::validator::{ - self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare, ViewNumber, + self, CommitQC, Payload, PrepareQC, ReplicaCommit, ReplicaPrepare, }; /// Sanity check of the happy path. 
@@ -101,10 +101,6 @@ async fn leader_prepare_invalid_leader() { let (mut util, runner) = UTHarness::new(ctx, 2).await; s.spawn_bg(runner.run(ctx)); - let view = ViewNumber(2); - util.set_view(view); - assert_eq!(util.view_leader(view), util.keys[0].public()); - let replica_prepare = util.new_replica_prepare(); assert!(util .process_replica_prepare(ctx, util.sign(replica_prepare.clone())) @@ -167,6 +163,35 @@ async fn leader_prepare_old_view() { .unwrap(); } +#[tokio::test] +async fn leader_prepare_pruned_block() { + zksync_concurrency::testonly::abort_on_panic(); + let ctx = &ctx::test_root(&ctx::RealClock); + scope::run!(ctx, |ctx, s| async { + let (mut util, runner) = UTHarness::new(ctx, 1).await; + s.spawn_bg(runner.run(ctx)); + + let mut leader_prepare = util.new_leader_prepare(ctx).await; + // We assume default replica state and nontrivial `genesis.fork.first_block` here. + leader_prepare.proposal.number = util + .replica + .config + .block_store + .subscribe() + .borrow() + .first + .prev() + .unwrap(); + let res = util + .process_leader_prepare(ctx, util.sign(leader_prepare)) + .await; + assert_matches!(res, Err(leader_prepare::Error::ProposalAlreadyPruned)); + Ok(()) + }) + .await + .unwrap(); +} + /// Tests that `WriteBlockStore::verify_payload` is applied before signing a vote. 
#[tokio::test] async fn leader_prepare_invalid_payload() { @@ -338,33 +363,6 @@ async fn leader_prepare_proposal_when_previous_not_finalized() { .unwrap(); } -#[tokio::test] -async fn leader_prepare_bad_parent_hash() { - zksync_concurrency::testonly::abort_on_panic(); - let ctx = &ctx::test_root(&ctx::RealClock); - scope::run!(ctx, |ctx, s| async { - let (mut util, runner) = UTHarness::new(ctx, 1).await; - s.spawn_bg(runner.run(ctx)); - - tracing::info!("Produce initial block."); - util.produce_block(ctx).await; - tracing::info!("Make leader propose the next block."); - let mut leader_prepare = util.new_leader_prepare(ctx).await; - tracing::info!("Modify the proposal.parent so that it doesn't match the previous block"); - leader_prepare.proposal.parent = Some(ctx.rng().gen()); - let res = util.process_leader_prepare(ctx, util.sign(leader_prepare.clone())).await; - assert_matches!(res, Err(leader_prepare::Error::InvalidMessage( - validator::LeaderPrepareVerifyError::BadParentHash { got, want } - )) => { - assert_eq!(want, Some(leader_prepare.justification.high_qc().unwrap().message.proposal.hash())); - assert_eq!(got, leader_prepare.proposal.parent); - }); - Ok(()) - }) - .await - .unwrap(); -} - #[tokio::test] async fn leader_prepare_bad_block_number() { zksync_concurrency::testonly::abort_on_panic(); @@ -500,7 +498,7 @@ async fn leader_prepare_reproposal_invalid_block() { .unwrap(); } -/// Check that replica provides expecte high_vote and high_qc after finalizing a block. +/// Check that replica provides expected high_vote and high_qc after finalizing a block. 
#[tokio::test] async fn leader_commit_sanity_yield_replica_prepare() { zksync_concurrency::testonly::abort_on_panic(); diff --git a/node/actors/bft/src/testonly/fuzz.rs b/node/actors/bft/src/testonly/fuzz.rs index 077be553..fd0cdd4a 100644 --- a/node/actors/bft/src/testonly/fuzz.rs +++ b/node/actors/bft/src/testonly/fuzz.rs @@ -174,11 +174,9 @@ impl Fuzz for validator::Payload { impl Fuzz for validator::BlockHeader { fn mutate(&mut self, rng: &mut impl Rng) { - match rng.gen_range(0..3) { - 0 => self.parent = rng.gen(), - 1 => self.number = rng.gen(), - 2 => self.payload = rng.gen(), - _ => unreachable!(), + match rng.gen_range(0..2) { + 0 => self.number = rng.gen(), + _ => self.payload = rng.gen(), } } } diff --git a/node/actors/bft/src/testonly/ut_harness.rs b/node/actors/bft/src/testonly/ut_harness.rs index 8880788c..29590cce 100644 --- a/node/actors/bft/src/testonly/ut_harness.rs +++ b/node/actors/bft/src/testonly/ut_harness.rs @@ -137,19 +137,6 @@ impl UTHarness { self.replica.view = view; } - pub(crate) fn set_view(&mut self, view: ViewNumber) { - self.set_replica_view(view); - self.set_leader_view(view); - } - - pub(crate) fn set_leader_view(&mut self, view: ViewNumber) { - self.leader.view = view - } - - pub(crate) fn set_replica_view(&mut self, view: ViewNumber) { - self.replica.view = view - } - pub(crate) fn replica_view(&self) -> validator::View { validator::View { protocol_version: self.protocol_version(), diff --git a/node/actors/executor/src/tests.rs b/node/actors/executor/src/tests.rs index f95b6641..2b34e7a0 100644 --- a/node/actors/executor/src/tests.rs +++ b/node/actors/executor/src/tests.rs @@ -5,32 +5,48 @@ use zksync_consensus_bft as bft; use zksync_consensus_network::testonly::{new_configs, new_fullnode}; use zksync_consensus_roles::validator::{testonly::Setup, BlockNumber}; use zksync_consensus_storage::{ - testonly::{in_memory, new_store}, + testonly::{in_memory, new_store, new_store_with_first}, BlockStore, }; -fn make_executor(cfg: 
&network::Config, block_store: Arc) -> Executor { +fn config(cfg: &network::Config) -> Config { + Config { + server_addr: *cfg.server_addr, + public_addr: cfg.public_addr, + max_payload_size: usize::MAX, + node_key: cfg.gossip.key.clone(), + gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit, + gossip_static_inbound: cfg.gossip.static_inbound.clone(), + gossip_static_outbound: cfg.gossip.static_outbound.clone(), + } +} + +fn validator( + cfg: &network::Config, + block_store: Arc, + replica_store: impl ReplicaStore, +) -> Executor { Executor { - config: Config { - server_addr: *cfg.server_addr, - public_addr: cfg.public_addr, - max_payload_size: usize::MAX, - node_key: cfg.gossip.key.clone(), - gossip_dynamic_inbound_limit: cfg.gossip.dynamic_inbound_limit, - gossip_static_inbound: cfg.gossip.static_inbound.clone(), - gossip_static_outbound: cfg.gossip.static_outbound.clone(), - }, + config: config(cfg), block_store, - validator: cfg.validator_key.as_ref().map(|key| Validator { - key: key.clone(), - replica_store: Box::new(in_memory::ReplicaStore::default()), + validator: Some(Validator { + key: cfg.validator_key.clone().unwrap(), + replica_store: Box::new(replica_store), payload_manager: Box::new(bft::testonly::RandomPayload(1000)), }), } } +fn fullnode(cfg: &network::Config, block_store: Arc) -> Executor { + Executor { + config: config(cfg), + block_store, + validator: None, + } +} + #[tokio::test] -async fn executing_single_validator() { +async fn test_single_validator() { abort_on_panic(); let ctx = &ctx::root(); let rng = &mut ctx.rng(); @@ -38,9 +54,10 @@ async fn executing_single_validator() { let setup = Setup::new(rng, 1); let cfgs = new_configs(rng, &setup, 0); scope::run!(ctx, |ctx, s| async { + let replica_store = in_memory::ReplicaStore::default(); let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - s.spawn_bg(make_executor(&cfgs[0], store.clone()).run(ctx)); + s.spawn_bg(validator(&cfgs[0], 
store.clone(), replica_store).run(ctx)); store.wait_until_persisted(ctx, BlockNumber(5)).await?; Ok(()) }) @@ -49,7 +66,7 @@ async fn executing_single_validator() { } #[tokio::test] -async fn executing_validator_and_full_node() { +async fn test_fullnode_syncing_from_validator() { abort_on_panic(); let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0)); let rng = &mut ctx.rng(); @@ -58,17 +75,73 @@ async fn executing_validator_and_full_node() { let cfgs = new_configs(rng, &setup, 0); scope::run!(ctx, |ctx, s| async { // Spawn validator. + let replica_store = in_memory::ReplicaStore::default(); let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - s.spawn_bg(make_executor(&cfgs[0], store).run(ctx)); + s.spawn_bg(validator(&cfgs[0], store, replica_store).run(ctx)); // Spawn full node. let (store, runner) = new_store(ctx, &setup.genesis).await; s.spawn_bg(runner.run(ctx)); - s.spawn_bg(make_executor(&new_fullnode(rng, &cfgs[0]), store.clone()).run(ctx)); + s.spawn_bg(fullnode(&new_fullnode(rng, &cfgs[0]), store.clone()).run(ctx)); // Wait for blocks in full node store. - store.wait_until_persisted(ctx, BlockNumber(5)).await?; + store + .wait_until_persisted(ctx, setup.genesis.fork.first_block + 5) + .await?; + Ok(()) + }) + .await + .unwrap(); +} + +/// Test in which validator is syncing missing blocks from a full node before producing blocks. +#[tokio::test] +async fn test_validator_syncing_from_fullnode() { + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::AffineClock::new(20.0)); + let rng = &mut ctx.rng(); + + let setup = Setup::new(rng, 1); + let cfgs = new_configs(rng, &setup, 0); + scope::run!(ctx, |ctx, s| async { + // Spawn full node. + let (node_store, runner) = new_store(ctx, &setup.genesis).await; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(fullnode(&new_fullnode(rng, &cfgs[0]), node_store.clone()).run(ctx)); + + // Run validator and produce some blocks. + // Wait for the blocks to be fetched by the full node. 
+ let replica_store = in_memory::ReplicaStore::default(); + scope::run!(ctx, |ctx, s| async { + let (store, runner) = new_store(ctx, &setup.genesis).await; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(validator(&cfgs[0], store, replica_store.clone()).run(ctx)); + node_store + .wait_until_persisted(ctx, setup.genesis.fork.first_block + 4) + .await?; + Ok(()) + }) + .await + .unwrap(); + + // Restart the validator with empty store (but preserved replica state) and non-trivial + // `store.state.first`. + // Validator should fetch the past blocks from the full node before producing next blocks. + let last_block = node_store + .subscribe() + .borrow() + .last + .as_ref() + .unwrap() + .header() + .number; + let (store, runner) = + new_store_with_first(ctx, &setup.genesis, setup.genesis.fork.first_block + 2).await; + s.spawn_bg(runner.run(ctx)); + s.spawn_bg(validator(&cfgs[0], store, replica_store).run(ctx)); + node_store.wait_until_persisted(ctx, last_block + 3).await?; + Ok(()) }) .await diff --git a/node/actors/network/src/mux/reusable_stream.rs b/node/actors/network/src/mux/reusable_stream.rs index 882040db..381cf561 100644 --- a/node/actors/network/src/mux/reusable_stream.rs +++ b/node/actors/network/src/mux/reusable_stream.rs @@ -71,7 +71,7 @@ pub(crate) struct StreamQueue { } impl StreamQueue { - /// Constructs a new StreamQueue with the specificied number of reusable streams. + /// Constructs a new StreamQueue with the specified number of reusable streams. /// During multiplexer handshake, peers exchange information about /// how many reusable streams they support per capability. pub(crate) fn new(max_streams: u32) -> Arc { @@ -289,7 +289,7 @@ impl ReusableStream { read_receiver = new_read_receiver; let (write_lock, new_write_receiver) = sync::ExclusiveLock::new(write); write_receiver = new_write_receiver; - // Sending may fail because the requestor is not interested in the stream any more. 
+ // Sending may fail because the requester is not interested in the stream any more. // In this case we just close the transient stream immediately. let _ = reservation.send(Stream { read: ReadStream(read_lock), diff --git a/node/actors/sync_blocks/src/peers/mod.rs b/node/actors/sync_blocks/src/peers/mod.rs index f57ea8bd..bc589ccb 100644 --- a/node/actors/sync_blocks/src/peers/mod.rs +++ b/node/actors/sync_blocks/src/peers/mod.rs @@ -63,17 +63,14 @@ impl PeerStates { /// Updates the known `BlockStore` state of the given peer. /// This information is used to decide from which peer to fetch - /// a given block from. + /// a given block. pub(crate) fn update( &self, peer: &node::PublicKey, state: BlockStoreState, ) -> anyhow::Result<()> { use std::collections::hash_map::Entry; - let Some(last) = &state.last else { - return Ok(()); - }; - last.verify(self.genesis()).context("state.last.verify()")?; + state.verify(self.genesis()).context("state.verify()")?; let mut peers = self.peers.lock().unwrap(); match peers.entry(peer.clone()) { Entry::Occupied(mut e) => e.get_mut().state = state.clone(), @@ -85,14 +82,16 @@ impl PeerStates { }); } } - self.highest_peer_block - .send_if_modified(|highest_peer_block| { - if *highest_peer_block >= last.header().number { - return false; - } - *highest_peer_block = last.header().number; - true - }); + if let Some(last) = &state.last { + self.highest_peer_block + .send_if_modified(|highest_peer_block| { + if *highest_peer_block >= last.header().number { + return false; + } + *highest_peer_block = last.header().number; + true + }); + } Ok(()) } diff --git a/node/actors/sync_blocks/src/peers/tests/mod.rs b/node/actors/sync_blocks/src/peers/tests/mod.rs index d6c9d66d..ee28e436 100644 --- a/node/actors/sync_blocks/src/peers/tests/mod.rs +++ b/node/actors/sync_blocks/src/peers/tests/mod.rs @@ -98,3 +98,62 @@ async fn test_peer_states(test: T) { .await .unwrap(); } + +#[tokio::test] +async fn test_try_acquire_peer_permit() { + let clock = 
ctx::ManualClock::new(); + let ctx = &ctx::test_root(&clock); + let rng = &mut ctx.rng(); + let mut setup = validator::testonly::Setup::new(rng, 1); + setup.push_blocks(rng, 10); + scope::run!(ctx, |ctx, s| async { + let (store, runner) = new_store(ctx, &setup.genesis).await; + s.spawn_bg(runner.run(ctx)); + let (send, _recv) = ctx::channel::unbounded(); + let peer_states = PeerStates::new(Config::default(), store, send); + + let peer: node::PublicKey = rng.gen(); + let b = &setup.blocks; + for s in [ + // Empty entry. + BlockStoreState { + first: b[0].number(), + last: None, + }, + // Entry with some blocks. + BlockStoreState { + first: b[0].number(), + last: Some(b[3].justification.clone()), + }, + // Entry with changed first. + BlockStoreState { + first: b[1].number(), + last: Some(b[3].justification.clone()), + }, + // Empty entry again. + BlockStoreState { + first: b[1].number(), + last: None, + }, + ] { + peer_states.update(&peer, s.clone()).unwrap(); + for block in b { + let got = peer_states + .try_acquire_peer_permit(block.number()) + .map(|p| p.0); + if s.first <= block.number() + && s.last + .as_ref() + .map_or(false, |last| block.number() <= last.header().number) + { + assert_eq!(Some(peer.clone()), got); + } else { + assert_eq!(None, got); + } + } + } + Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/actors/sync_blocks/src/peers/tests/snapshots.rs b/node/actors/sync_blocks/src/peers/tests/snapshots.rs deleted file mode 100644 index bdb7116d..00000000 --- a/node/actors/sync_blocks/src/peers/tests/snapshots.rs +++ /dev/null @@ -1,10 +0,0 @@ -//! Tests related to snapshot storage. 
- -use super::*; -use crate::tests::{send_block, sync_state}; -use zksync_consensus_network::io::GetBlockError; - -#[tokio::test] -async fn backfilling_peer_history() { - test_peer_states(BackfillingPeerHistory).await; -} diff --git a/node/actors/sync_blocks/src/tests/end_to_end.rs b/node/actors/sync_blocks/src/tests/end_to_end.rs index 018b99ee..b078a804 100644 --- a/node/actors/sync_blocks/src/tests/end_to_end.rs +++ b/node/actors/sync_blocks/src/tests/end_to_end.rs @@ -13,7 +13,7 @@ use zksync_concurrency::{ testonly::{abort_on_panic, set_timeout}, }; use zksync_consensus_network as network; -use zksync_consensus_storage::testonly::new_store; +use zksync_consensus_storage::testonly::new_store_with_first; type NetworkDispatcherPipe = pipe::DispatcherPipe; @@ -27,7 +27,16 @@ struct Node { impl Node { async fn new(ctx: &ctx::Ctx, network: network::Config, setup: &Setup) -> (Self, NodeRunner) { - let (store, store_runner) = new_store(ctx, &setup.genesis).await; + Self::new_with_first(ctx, network, setup, setup.genesis.fork.first_block).await + } + + async fn new_with_first( + ctx: &ctx::Ctx, + network: network::Config, + setup: &Setup, + first: validator::BlockNumber, + ) -> (Self, NodeRunner) { + let (store, store_runner) = new_store_with_first(ctx, &setup.genesis, first).await; let (start_send, start_recv) = channel::bounded(1); let (terminate_send, terminate_recv) = channel::bounded(1); @@ -331,3 +340,54 @@ impl GossipNetworkTest for SwitchingOnNodes { async fn switching_on_nodes(node_count: usize) { test_sync_blocks(SwitchingOnNodes { node_count }).await; } + +/// Test checking that nodes with different first block can synchronize. 
+#[tokio::test(flavor = "multi_thread")] +async fn test_different_first_block() { + abort_on_panic(); + let ctx = &ctx::test_root(&ctx::AffineClock::new(25.)); + let rng = &mut ctx.rng(); + + let mut setup = validator::testonly::Setup::new(rng, 2); + let n = 4; + setup.push_blocks(rng, 10); + scope::run!(ctx, |ctx, s| async { + let mut nodes = vec![]; + // Spawn `n` nodes, all connected to each other. + for (i, net) in network::testonly::new_configs(rng, &setup, n) + .into_iter() + .enumerate() + { + // Choose the first block for the node at random. + let first = setup.blocks.choose(rng).unwrap().number(); + let (node, runner) = Node::new_with_first(ctx, net, &setup, first).await; + s.spawn_bg(runner.run(ctx).instrument(tracing::info_span!("node", i))); + node.start(); + nodes.push(node); + } + // Randomize the order of nodes. + nodes.shuffle(rng); + + for block in &setup.blocks { + // Find nodes interested in the next block. + let interested_nodes: Vec<_> = nodes + .iter() + .filter(|n| n.store.subscribe().borrow().first <= block.number()) + .collect(); + // Store this block to one of them. + if let Some(node) = interested_nodes.choose(rng) { + node.store.queue_block(ctx, block.clone()).await.unwrap(); + } + // Wait until all remaining nodes get the new block. + for node in interested_nodes { + node.store + .wait_until_persisted(ctx, block.number()) + .await + .unwrap(); + } + } + Ok(()) + }) + .await + .unwrap(); +} diff --git a/node/libs/concurrency/src/ctx/clock.rs b/node/libs/concurrency/src/ctx/clock.rs index 31396ca9..68138b34 100644 --- a/node/libs/concurrency/src/ctx/clock.rs +++ b/node/libs/concurrency/src/ctx/clock.rs @@ -27,7 +27,7 @@ use std::{ }; use tokio::sync::watch; -// Instant doesn't have a deterministic contructor. +// Instant doesn't have a deterministic constructor. // However since Instant is not convertible to an unix timestamp, // we can snapshot Instant::now() once and treat it as a constant. 
// All observable effects will be then deterministic. diff --git a/node/libs/concurrency/src/limiter/mod.rs b/node/libs/concurrency/src/limiter/mod.rs index 57dc4e33..9f609b26 100644 --- a/node/libs/concurrency/src/limiter/mod.rs +++ b/node/libs/concurrency/src/limiter/mod.rs @@ -1,6 +1,6 @@ //! Rate limiter which supports delayed permit consumption. use crate::{ctx, sync, time}; -use std::sync::Mutex; +use std::{fmt, sync::Mutex}; #[cfg(test)] mod tests; @@ -102,6 +102,12 @@ pub struct Limiter { acquire: sync::Mutex>, } +impl fmt::Debug for Limiter { + fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result { + f.debug_struct("Limiter").finish_non_exhaustive() + } +} + /// Permit reservation returned by `Limit::acquire()`. /// Represents a set of reserved permits. /// You need to drop it for the permits to get consumed @@ -151,7 +157,7 @@ impl Limiter { } /// Acquires reservation for `permits` permits from the rate limiter. - /// It blocks until enought permits are available. + /// It blocks until enough permits are available. /// It is fair in a sense that in case a later acquire() call is /// executed, but for a smaller number of permits, it has to wait /// until the previous call (for a larger number of permits) completes. diff --git a/node/libs/concurrency/src/limiter/tests.rs b/node/libs/concurrency/src/limiter/tests.rs index 978c81db..713fc107 100644 --- a/node/libs/concurrency/src/limiter/tests.rs +++ b/node/libs/concurrency/src/limiter/tests.rs @@ -45,7 +45,7 @@ async fn immediate_permit_consumption() { } #[tokio::test] -async fn inifinite_refresh_rate() { +async fn infinite_refresh_rate() { testonly::abort_on_panic(); let clock = &ctx::ManualClock::new(); let ctx = &ctx::test_root(clock); diff --git a/node/libs/concurrency/src/scope/task.rs b/node/libs/concurrency/src/scope/task.rs index 85958dba..35b45dc9 100644 --- a/node/libs/concurrency/src/scope/task.rs +++ b/node/libs/concurrency/src/scope/task.rs @@ -30,7 +30,7 @@ //! 
Task can be either async or blocking: //! * Async tasks are Futures executed via `Task::run`. They MUSN'T call blocking operations, //! because they are executed on a shared thread pool. -//! * Blocking tasks are `FnOnce()` functions/closures exeucted via `Task::run_blocking`. Blocking +//! * Blocking tasks are `FnOnce()` functions/closures executed via `Task::run_blocking`. Blocking //! task MUST be executed on a dedicated thread rather than a shared thread pool. //! * All functions which perform blocking calls should be documented as blocking. //! If a function has multiple versions and the async version is called ``, then the sync @@ -117,7 +117,7 @@ impl Task { } /// Runs an sync blocking task in the scope. MUST be executed on a dedicated thread. - /// See `Task::run` for behavior. See module docs for desciption of blocking tasks. + /// See `Task::run` for behavior. See module docs for description of blocking tasks. pub(super) fn run_blocking(self, f: impl FnOnce() -> Result) -> Result { let panic_reporter = PanicReporter::new(self); let res = f(); diff --git a/node/libs/crypto/src/bn254/testonly.rs b/node/libs/crypto/src/bn254/testonly.rs index 8ee10d18..711f3a14 100644 --- a/node/libs/crypto/src/bn254/testonly.rs +++ b/node/libs/crypto/src/bn254/testonly.rs @@ -1,7 +1,7 @@ -//! Random key generation, intended for use in testing +//! Random key generation, intended for use in testing. use super::{AggregateSignature, PublicKey, SecretKey, Signature}; -use pairing::bn256::{Fr, G1, G2}; +use pairing::bn256::{Fr, G1}; use rand::{distributions::Standard, prelude::Distribution, Rng, RngCore}; use rand04::Rand; @@ -17,7 +17,6 @@ impl rand04::Rng for RngWrapper { } } -/// Generates a random SecretKey. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> SecretKey { let scalar = Fr::rand(&mut RngWrapper(rng)); @@ -25,15 +24,12 @@ impl Distribution for Standard { } } -/// Generates a random PublicKey. 
This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> PublicKey { - let p = G2::rand(&mut RngWrapper(rng)); - PublicKey(p) + rng.gen::().public() } } -/// Generates a random Signature. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> Signature { let p = G1::rand(&mut RngWrapper(rng)); @@ -41,7 +37,6 @@ impl Distribution for Standard { } } -/// Generates a random AggregateSignature. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> AggregateSignature { let p = G1::rand(&mut RngWrapper(rng)); diff --git a/node/libs/crypto/src/ed25519/testonly.rs b/node/libs/crypto/src/ed25519/testonly.rs index ecca0148..6b8d800f 100644 --- a/node/libs/crypto/src/ed25519/testonly.rs +++ b/node/libs/crypto/src/ed25519/testonly.rs @@ -1,6 +1,6 @@ //! Random key generation, intended for use in testing -use super::{SecretKey, Signature}; +use super::{PublicKey, SecretKey, Signature}; use crate::ByteFmt; use ed25519_dalek as ed; use rand::{ @@ -8,7 +8,6 @@ use rand::{ Rng, }; -/// Generates a random SecretKey. This is meant for testing purposes. impl Distribution for Standard { fn sample(&self, rng: &mut R) -> SecretKey { let raw: [u8; ed::SECRET_KEY_LENGTH] = rng.gen(); @@ -16,7 +15,12 @@ impl Distribution for Standard { } } -/// Generates a random Signature. This is meant for testing purposes. 
+impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> PublicKey { + rng.gen::().public() + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> Signature { let key = rng.gen::(); diff --git a/node/libs/protobuf/src/proto_fmt.rs b/node/libs/protobuf/src/proto_fmt.rs index 53238a64..2031cefc 100644 --- a/node/libs/protobuf/src/proto_fmt.rs +++ b/node/libs/protobuf/src/proto_fmt.rs @@ -197,7 +197,7 @@ pub(super) fn read_fields( } /// Converts an encoded protobuf message to its canonical form, given the descriptor of the message -/// type. Retuns an error if: +/// type. Returns an error if: /// * an unknown field is detected /// * the message type doesn't support canonical encoding (implicit presence, map fields) pub fn canonical_raw( diff --git a/node/libs/roles/src/node/testonly.rs b/node/libs/roles/src/node/testonly.rs index 1d712ebb..c8efd30f 100644 --- a/node/libs/roles/src/node/testonly.rs +++ b/node/libs/roles/src/node/testonly.rs @@ -1,4 +1,4 @@ -use super::{Msg, MsgHash, SecretKey, SessionId, Signature, Signed}; +use super::{Msg, MsgHash, PublicKey, SecretKey, SessionId, Signature, Signed}; use rand::{ distributions::{Distribution, Standard}, Rng, @@ -27,6 +27,12 @@ impl Distribution for Standard { } } +impl Distribution for Standard { + fn sample(&self, rng: &mut R) -> PublicKey { + PublicKey(rng.gen()) + } +} + impl Distribution for Standard { fn sample(&self, rng: &mut R) -> SecretKey { SecretKey(Arc::new(rng.gen())) diff --git a/node/libs/roles/src/proto/validator.proto b/node/libs/roles/src/proto/validator.proto index f1ec2438..2b1d6329 100644 --- a/node/libs/roles/src/proto/validator.proto +++ b/node/libs/roles/src/proto/validator.proto @@ -7,7 +7,6 @@ import "zksync/std.proto"; message Fork { optional uint64 number = 1; // required; ForkId optional uint64 first_block = 2; // required; BlockNumber - optional BlockHeaderHash first_parent = 3; // optional } message Genesis { @@ -23,17 +22,11 @@ message PayloadHash { 
optional bytes keccak256 = 1; // required } -message BlockHeaderHash { - optional bytes keccak256 = 1; // required -} - message BlockHeader { - // Hash of the parent Block. - optional BlockHeaderHash parent = 2; // optional // Sequential number of the block = parent.number + 1. - optional uint64 number = 3; // required + optional uint64 number = 1; // required // Hash of the block payload. - optional PayloadHash payload = 4; // required + optional PayloadHash payload = 2; // required } message FinalBlock { @@ -117,7 +110,7 @@ message NetAddress { // Currently the IP of the validator is static, but this scheme might also // be used to provide dynamic IP support (if validator discovers that its // own IP has changed - by pinging a trusted STUN server for example - it can - // broadcast a new discovery message), or (mutli)proxy support (a validator + // broadcast a new discovery message), or (multi)proxy support (a validator // may maintain a dynamic set of trusted proxy servers which forward traffic // to it - this way validator wouldn't have to have a public IP at all). 
optional uint64 version = 2; // required diff --git a/node/libs/roles/src/validator/conv.rs b/node/libs/roles/src/validator/conv.rs index 04886834..9e616320 100644 --- a/node/libs/roles/src/validator/conv.rs +++ b/node/libs/roles/src/validator/conv.rs @@ -1,7 +1,7 @@ use super::{ - AggregateSignature, BlockHeader, BlockHeaderHash, BlockNumber, CommitQC, ConsensusMsg, - FinalBlock, Fork, ForkNumber, Genesis, GenesisHash, LeaderCommit, LeaderPrepare, Msg, MsgHash, - NetAddress, Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, + AggregateSignature, BlockHeader, BlockNumber, CommitQC, ConsensusMsg, FinalBlock, Fork, + ForkNumber, Genesis, GenesisHash, LeaderCommit, LeaderPrepare, Msg, MsgHash, NetAddress, + Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, ReplicaPrepare, Signature, Signed, Signers, ValidatorSet, View, ViewNumber, }; use crate::{node::SessionId, proto::validator as proto}; @@ -17,14 +17,12 @@ impl ProtoFmt for Fork { Ok(Self { number: ForkNumber(*required(&r.number).context("number")?), first_block: BlockNumber(*required(&r.first_block).context("first_block")?), - first_parent: read_optional(&r.first_parent).context("first_parent")?, }) } fn build(&self) -> Self::Proto { Self::Proto { number: Some(self.number.0), first_block: Some(self.first_block.0), - first_parent: self.first_parent.as_ref().map(|x| x.build()), } } } @@ -64,18 +62,6 @@ impl ProtoFmt for GenesisHash { } } -impl ProtoFmt for BlockHeaderHash { - type Proto = proto::BlockHeaderHash; - fn read(r: &Self::Proto) -> anyhow::Result { - Ok(Self(ByteFmt::decode(required(&r.keccak256)?)?)) - } - fn build(&self) -> Self::Proto { - Self::Proto { - keccak256: Some(self.0.encode()), - } - } -} - impl ProtoFmt for PayloadHash { type Proto = proto::PayloadHash; fn read(r: &Self::Proto) -> anyhow::Result { @@ -92,14 +78,12 @@ impl ProtoFmt for BlockHeader { type Proto = proto::BlockHeader; fn read(r: &Self::Proto) -> anyhow::Result { 
Ok(Self { - parent: read_optional(&r.parent).context("parent")?, number: BlockNumber(*required(&r.number).context("number")?), payload: read_required(&r.payload).context("payload")?, }) } fn build(&self) -> Self::Proto { Self::Proto { - parent: self.parent.as_ref().map(ProtoFmt::build), number: Some(self.number.0), payload: Some(self.payload.build()), } diff --git a/node/libs/roles/src/validator/messages/block.rs b/node/libs/roles/src/validator/messages/block.rs index 261035e5..43d49ac3 100644 --- a/node/libs/roles/src/validator/messages/block.rs +++ b/node/libs/roles/src/validator/messages/block.rs @@ -52,15 +52,13 @@ impl Payload { } /// Sequential number of the block. -/// Genesis block can have an arbitrary block number. -/// For blocks other than genesis: block.number = block.parent.number + 1. #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct BlockNumber(pub u64); impl BlockNumber { /// Returns the next block number. pub fn next(self) -> Self { - Self(self.0 + 1) + Self(self.0.checked_add(1).unwrap()) } /// Returns the previous block number. @@ -69,77 +67,28 @@ impl BlockNumber { } } -impl fmt::Display for BlockNumber { - fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt::Display::fmt(&self.0, formatter) - } -} - -/// Hash of the block header. -#[derive(Clone, Copy, PartialEq, Eq, PartialOrd, Ord, Hash)] -pub struct BlockHeaderHash(pub(crate) Keccak256); - -impl BlockHeaderHash { - /// Interprets the specified `bytes` as a block header hash digest (i.e., a reverse operation to [`Self::as_bytes()`]). - /// It is caller's responsibility to ensure that `bytes` are actually a block header hash digest. - pub fn from_bytes(bytes: [u8; 32]) -> Self { - Self(Keccak256::from_bytes(bytes)) - } - - /// Returns a reference to the bytes of this hash. 
- pub fn as_bytes(&self) -> &[u8; 32] { - self.0.as_bytes() - } -} - -impl TextFmt for BlockHeaderHash { - fn decode(text: Text) -> anyhow::Result { - text.strip("block_header_hash:keccak256:")? - .decode_hex() - .map(Self) - } - - fn encode(&self) -> String { - format!( - "block_header_hash:keccak256:{}", - hex::encode(ByteFmt::encode(&self.0)) - ) +impl std::ops::Add for BlockNumber { + type Output = BlockNumber; + fn add(self, n: u64) -> Self { + Self(self.0.checked_add(n).unwrap()) } } -impl fmt::Debug for BlockHeaderHash { - fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result { - fmt.write_str(&TextFmt::encode(self)) +impl fmt::Display for BlockNumber { + fn fmt(&self, formatter: &mut fmt::Formatter<'_>) -> fmt::Result { + fmt::Display::fmt(&self.0, formatter) } } /// A block header. #[derive(Clone, Copy, Debug, PartialEq, Eq, PartialOrd, Ord, Hash)] pub struct BlockHeader { - /// Hash of the parent block. - pub parent: Option, /// Number of the block. pub number: BlockNumber, /// Payload of the block. pub payload: PayloadHash, } -impl BlockHeader { - /// Returns the hash of the block. - pub fn hash(&self) -> BlockHeaderHash { - BlockHeaderHash(Keccak256::new(&zksync_protobuf::canonical(self))) - } - - /// Creates a child block for the given parent. - pub fn next(parent: &BlockHeader, payload: PayloadHash) -> Self { - Self { - parent: Some(parent.hash()), - number: parent.number.next(), - payload, - } - } -} - /// A block that has been finalized by the consensus protocol. #[derive(Clone, Debug, PartialEq, Eq)] pub struct FinalBlock { @@ -194,16 +143,6 @@ impl ByteFmt for FinalBlock { } } -impl TextFmt for FinalBlock { - fn decode(text: Text) -> anyhow::Result { - text.strip("final_block:")?.decode_hex() - } - - fn encode(&self) -> String { - format!("final_block:{}", hex::encode(ByteFmt::encode(self))) - } -} - /// Errors that can occur validating a `FinalBlock` received from a node. 
#[derive(Debug, thiserror::Error)] #[non_exhaustive] diff --git a/node/libs/roles/src/validator/messages/consensus.rs b/node/libs/roles/src/validator/messages/consensus.rs index 870355b0..4261443e 100644 --- a/node/libs/roles/src/validator/messages/consensus.rs +++ b/node/libs/roles/src/validator/messages/consensus.rs @@ -1,7 +1,5 @@ //! Messages related to the consensus protocol. -use super::{ - BlockHeaderHash, BlockNumber, LeaderCommit, LeaderPrepare, Msg, ReplicaCommit, ReplicaPrepare, -}; +use super::{BlockNumber, LeaderCommit, LeaderPrepare, Msg, ReplicaCommit, ReplicaPrepare}; use crate::validator; use bit_vec::BitVec; use std::{ @@ -62,8 +60,6 @@ pub struct Fork { pub number: ForkNumber, /// First block of a fork. pub first_block: BlockNumber, - /// Parent fo the first block of a fork. - pub first_parent: Option, } impl Default for Fork { @@ -71,7 +67,6 @@ impl Default for Fork { Self { number: ForkNumber(0), first_block: BlockNumber(0), - first_parent: None, } } } diff --git a/node/libs/roles/src/validator/messages/leader_prepare.rs b/node/libs/roles/src/validator/messages/leader_prepare.rs index 15580bb3..37d07e14 100644 --- a/node/libs/roles/src/validator/messages/leader_prepare.rs +++ b/node/libs/roles/src/validator/messages/leader_prepare.rs @@ -1,5 +1,5 @@ use super::{ - BlockHeader, BlockHeaderHash, BlockNumber, CommitQC, Genesis, Payload, ReplicaPrepare, + BlockHeader, BlockNumber, CommitQC, Genesis, Payload, ReplicaPrepare, ReplicaPrepareVerifyError, Signed, Signers, View, }; use crate::validator; @@ -177,14 +177,6 @@ pub enum LeaderPrepareVerifyError { /// Received proposal number. got: BlockNumber, }, - /// Bad parent hash. - #[error("bad parent hash: got {got:?}, want {want:?}")] - BadParentHash { - /// Correct parent hash. - want: Option, - /// Received parent hash. - got: Option, - }, /// New block proposal when the previous proposal was not finalized. 
#[error("new block proposal when the previous proposal was not finalized")] ProposalWhenPreviousNotFinalized, @@ -231,16 +223,10 @@ impl LeaderPrepare { { return Err(Error::ProposalWhenPreviousNotFinalized); } - let (want_parent, want_number) = match high_qc { - Some(qc) => (Some(qc.header().hash()), qc.header().number.next()), - None => (genesis.fork.first_parent, genesis.fork.first_block), + let want_number = match high_qc { + Some(qc) => qc.header().number.next(), + None => genesis.fork.first_block, }; - if self.proposal.parent != want_parent { - return Err(Error::BadParentHash { - got: self.proposal.parent, - want: want_parent, - }); - } if self.proposal.number != want_number { return Err(Error::BadBlockNumber { got: self.proposal.number, diff --git a/node/libs/roles/src/validator/messages/replica_commit.rs b/node/libs/roles/src/validator/messages/replica_commit.rs index 963f8f39..d08a7069 100644 --- a/node/libs/roles/src/validator/messages/replica_commit.rs +++ b/node/libs/roles/src/validator/messages/replica_commit.rs @@ -14,12 +14,6 @@ impl ReplicaCommit { pub fn verify(&self, genesis: &Genesis) -> anyhow::Result<()> { anyhow::ensure!(self.view.fork == genesis.fork.number); anyhow::ensure!(self.proposal.number >= genesis.fork.first_block); - if self.proposal.number == genesis.fork.first_block { - anyhow::ensure!( - self.proposal.parent == genesis.fork.first_parent, - "bad parent of the first block of the fork" - ); - } Ok(()) } } diff --git a/node/libs/roles/src/validator/testonly.rs b/node/libs/roles/src/validator/testonly.rs index 27290f29..4bc7db79 100644 --- a/node/libs/roles/src/validator/testonly.rs +++ b/node/libs/roles/src/validator/testonly.rs @@ -1,8 +1,8 @@ //! Test-only utilities. 
use super::{ - AggregateSignature, BlockHeader, BlockHeaderHash, BlockNumber, CommitQC, ConsensusMsg, - FinalBlock, Fork, ForkNumber, Genesis, GenesisHash, LeaderCommit, LeaderPrepare, Msg, MsgHash, - NetAddress, Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, + AggregateSignature, BlockHeader, BlockNumber, CommitQC, ConsensusMsg, FinalBlock, Fork, + ForkNumber, Genesis, GenesisHash, LeaderCommit, LeaderPrepare, Msg, MsgHash, NetAddress, + Payload, PayloadHash, Phase, PrepareQC, ProtocolVersion, PublicKey, ReplicaCommit, ReplicaPrepare, SecretKey, Signature, Signed, Signers, ValidatorSet, View, ViewNumber, }; use bit_vec::BitVec; @@ -38,7 +38,6 @@ impl Setup { let fork = Fork { number: ForkNumber(rng.gen_range(0..100)), first_block: BlockNumber(rng.gen_range(0..100)), - first_parent: Some(rng.gen()), }; Self::new_with_fork(rng, validators, fork) } @@ -64,9 +63,11 @@ impl Setup { .unwrap_or(ViewNumber(0)), }; let proposal = match self.0.blocks.last() { - Some(b) => BlockHeader::next(b.header(), payload.hash()), + Some(b) => BlockHeader { + number: b.number().next(), + payload: payload.hash(), + }, None => BlockHeader { - parent: self.genesis.fork.first_parent, number: self.genesis.fork.first_block, payload: payload.hash(), }, @@ -181,7 +182,6 @@ impl Distribution for Standard { Fork { number: rng.gen(), first_block: rng.gen(), - first_parent: Some(rng.gen()), } } } @@ -201,16 +201,9 @@ impl Distribution for Standard { } } -impl Distribution for Standard { - fn sample(&self, rng: &mut R) -> BlockHeaderHash { - BlockHeaderHash(rng.gen()) - } -} - impl Distribution for Standard { fn sample(&self, rng: &mut R) -> BlockHeader { BlockHeader { - parent: rng.gen(), number: rng.gen(), payload: rng.gen(), } diff --git a/node/libs/roles/src/validator/tests.rs b/node/libs/roles/src/validator/tests.rs index 5f38b3d8..9affb1e0 100644 --- a/node/libs/roles/src/validator/tests.rs +++ b/node/libs/roles/src/validator/tests.rs @@ -72,17 +72,6 @@ fn 
test_text_encoding() { Text::new(&t).decode::().unwrap() ); - let block_header_hash: BlockHeaderHash = rng.gen(); - let t = TextFmt::encode(&block_header_hash); - assert_eq!( - block_header_hash, - Text::new(&t).decode::().unwrap() - ); - - let final_block: FinalBlock = rng.gen(); - let t = TextFmt::encode(&final_block); - assert_eq!(final_block, Text::new(&t).decode::().unwrap()); - let msg_hash: MsgHash = rng.gen(); let t = TextFmt::encode(&msg_hash); assert_eq!(msg_hash, Text::new(&t).decode::().unwrap()); @@ -98,7 +87,6 @@ fn test_schema_encoding() { let rng = &mut ctx.rng(); test_encode_random::(rng); test_encode_random::(rng); - test_encode_random::(rng); test_encode_random::(rng); test_encode_random::>(rng); test_encode_random::(rng); diff --git a/node/libs/storage/src/block_store/metrics.rs b/node/libs/storage/src/block_store/metrics.rs index 70447ad7..05b64133 100644 --- a/node/libs/storage/src/block_store/metrics.rs +++ b/node/libs/storage/src/block_store/metrics.rs @@ -7,9 +7,9 @@ pub(super) struct PersistentBlockStore { /// Latency of a successful `genesis()` call. #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] pub(super) genesis_latency: vise::Histogram, - /// Latency of a successful `last()` call. + /// Latency of a successful `state()` call. #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] - pub(super) last_latency: vise::Histogram, + pub(super) state_latency: vise::Histogram, /// Latency of a successful `block()` call. #[metrics(unit = vise::Unit::Seconds, buckets = vise::Buckets::LATENCIES)] pub(super) block_latency: vise::Histogram, diff --git a/node/libs/storage/src/block_store/mod.rs b/node/libs/storage/src/block_store/mod.rs index 456bc2e1..e769aecf 100644 --- a/node/libs/storage/src/block_store/mod.rs +++ b/node/libs/storage/src/block_store/mod.rs @@ -10,7 +10,7 @@ mod metrics; #[derive(Debug, Clone, PartialEq, Eq)] pub struct BlockStoreState { /// Stored block with the lowest number. 
- /// Currently always same as `genesis.first_block`. + /// If last is `None`, this is the first block that should be fetched. pub first: validator::BlockNumber, /// Stored block with the highest number. /// None iff store is empty. @@ -32,21 +32,41 @@ impl BlockStoreState { None => self.first, } } + + /// Verifies `BlockStoreState`. + pub fn verify(&self, genesis: &validator::Genesis) -> anyhow::Result<()> { + anyhow::ensure!( + genesis.fork.first_block <= self.first, + "first block ({}) doesn't belong to the fork (which starts at block {})", + self.first, + genesis.fork.first_block + ); + if let Some(last) = &self.last { + anyhow::ensure!( + self.first <= last.header().number, + "first block {} has bigger number than the last block {}", + self.first, + last.header().number + ); + last.verify(genesis).context("last.verify()")?; + } + Ok(()) + } } /// Storage of a continuous range of L2 blocks. /// /// Implementations **must** propagate context cancellation using [`StorageError::Canceled`]. #[async_trait::async_trait] -pub trait PersistentBlockStore: fmt::Debug + Send + Sync { +pub trait PersistentBlockStore: 'static + fmt::Debug + Send + Sync { /// Genesis matching the block store content. /// Consensus code calls this method only once. async fn genesis(&self, ctx: &ctx::Ctx) -> ctx::Result; - /// Last block available in storage. + /// Range of blocks available in storage. /// Consensus code calls this method only once and then tracks the /// range of available blocks internally. - async fn last(&self, ctx: &ctx::Ctx) -> ctx::Result>; + async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result; /// Gets a block by its number. /// Returns error if block is missing. 
@@ -115,11 +135,11 @@ impl BlockStoreRunner { tracing::info!( "stored block #{}: {:#?}", block.header().number, - block.header().hash() + block.header().payload ); self.0.inner.send_modify(|inner| { - debug_assert_eq!(inner.persisted_state.next(), block.header().number); + debug_assert_eq!(inner.persisted_state.next(), block.number()); inner.persisted_state.last = Some(block.justification.clone()); inner.queue.pop_front(); }); @@ -145,16 +165,10 @@ impl BlockStore { let t = metrics::PERSISTENT_BLOCK_STORE.genesis_latency.start(); let genesis = persistent.genesis(ctx).await.wrap("persistent.genesis()")?; t.observe(); - let t = metrics::PERSISTENT_BLOCK_STORE.last_latency.start(); - let last = persistent.last(ctx).await.wrap("persistent.last()")?; + let t = metrics::PERSISTENT_BLOCK_STORE.state_latency.start(); + let state = persistent.state(ctx).await.wrap("persistent.state()")?; t.observe(); - if let Some(last) = &last { - last.verify(&genesis).context("last.verify()")?; - } - let state = BlockStoreState { - first: genesis.fork.first_block, - last, - }; + state.verify(&genesis).context("state.verify()")?; let this = Arc::new(Self { inner: sync::watch::channel(Inner { queued_state: sync::watch::channel(state.clone()).0, @@ -165,12 +179,6 @@ impl BlockStore { genesis, persistent, }); - // Verify the first block. - if let Some(block) = this.block(ctx, this.genesis.fork.first_block).await? { - block - .verify(&this.genesis) - .with_context(|| format!("verify({:?})", this.genesis.fork.first_block))?; - } Ok((this.clone(), BlockStoreRunner(this))) } @@ -227,17 +235,6 @@ impl BlockStore { return Ok(()); } block.verify(&self.genesis).context("block.verify()")?; - // Verify parent hash, if previous block is available. 
- if let Some(last) = queued_state.last.as_ref() { - if Some(last.header().hash()) != block.header().parent { - return Err(anyhow::format_err!( - "block.parent = {:?}, want {:?}", - block.header().parent, - last.header().hash() - ) - .into()); - } - } } self.inner.send_if_modified(|inner| { let modified = inner.queued_state.send_if_modified(|queued_state| { @@ -258,6 +255,7 @@ impl BlockStore { } /// Waits until the given block is queued to be stored. + /// If `number < state.first` then it immediately returns `Ok(())`. pub async fn wait_until_queued( &self, ctx: &ctx::Ctx, @@ -271,6 +269,7 @@ impl BlockStore { } /// Waits until the given block is stored persistently. + /// If `number < state.first` then it immediately returns `Ok(())`. pub async fn wait_until_persisted( &self, ctx: &ctx::Ctx, diff --git a/node/libs/storage/src/replica_state.rs b/node/libs/storage/src/replica_state.rs deleted file mode 100644 index 634118dc..00000000 --- a/node/libs/storage/src/replica_state.rs +++ /dev/null @@ -1,3 +0,0 @@ -//! `FallbackReplicaStateStore` type. - - diff --git a/node/libs/storage/src/replica_store.rs b/node/libs/storage/src/replica_store.rs index 0eaa96fd..465d26d6 100644 --- a/node/libs/storage/src/replica_store.rs +++ b/node/libs/storage/src/replica_store.rs @@ -10,7 +10,7 @@ use zksync_protobuf::{read_optional, read_required, required, ProtoFmt}; /// /// Implementations **must** propagate context cancellation using [`StorageError::Canceled`]. #[async_trait::async_trait] -pub trait ReplicaStore: fmt::Debug + Send + Sync { +pub trait ReplicaStore: 'static + fmt::Debug + Send + Sync { /// Gets the replica state, if it is contained in the database. async fn state(&self, ctx: &ctx::Ctx) -> ctx::Result; diff --git a/node/libs/storage/src/testonly/in_memory.rs b/node/libs/storage/src/testonly/in_memory.rs index 209d046c..1fd7398c 100644 --- a/node/libs/storage/src/testonly/in_memory.rs +++ b/node/libs/storage/src/testonly/in_memory.rs @@ -1,5 +1,5 @@ //! 
In-memory storage implementation. -use crate::{PersistentBlockStore, ReplicaState}; +use crate::{BlockStoreState, PersistentBlockStore, ReplicaState}; use anyhow::Context as _; use std::{ collections::VecDeque, @@ -10,6 +10,7 @@ use zksync_consensus_roles::validator; #[derive(Debug)] struct BlockStoreInner { + first: validator::BlockNumber, genesis: validator::Genesis, blocks: Mutex>, } @@ -24,8 +25,10 @@ pub struct ReplicaStore(Arc>); impl BlockStore { /// New In-memory `BlockStore`. - pub fn new(genesis: validator::Genesis) -> Self { + pub fn new(genesis: validator::Genesis, first: validator::BlockNumber) -> Self { + assert!(genesis.fork.first_block <= first); Self(Arc::new(BlockStoreInner { + first, genesis, blocks: Mutex::default(), })) @@ -38,14 +41,17 @@ impl PersistentBlockStore for BlockStore { Ok(self.0.genesis.clone()) } - async fn last(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - Ok(self - .0 - .blocks - .lock() - .unwrap() - .back() - .map(|b| b.justification.clone())) + async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result { + Ok(BlockStoreState { + first: self.0.first, + last: self + .0 + .blocks + .lock() + .unwrap() + .back() + .map(|b| b.justification.clone()), + }) } async fn block( @@ -69,11 +75,12 @@ impl PersistentBlockStore for BlockStore { ) -> ctx::Result<()> { let mut blocks = self.0.blocks.lock().unwrap(); let got = block.header().number; - if let Some(last) = blocks.back() { - let want = last.header().number.next(); - if got != want { - return Err(anyhow::anyhow!("got block {got:?}, while expected {want:?}").into()); - } + let want = match blocks.back() { + Some(last) => last.header().number.next(), + None => self.0.first, + }; + if got != want { + return Err(anyhow::anyhow!("got block {got:?}, while expected {want:?}").into()); } blocks.push_back(block.clone()); Ok(()) diff --git a/node/libs/storage/src/testonly/mod.rs b/node/libs/storage/src/testonly/mod.rs index 3a44800b..f0d66052 100644 --- a/node/libs/storage/src/testonly/mod.rs +++ 
b/node/libs/storage/src/testonly/mod.rs @@ -29,49 +29,63 @@ impl Distribution for Standard { } } -/// Constructs a new in-memory store with a genesis block. +/// Constructs a new in-memory store. pub async fn new_store( ctx: &ctx::Ctx, genesis: &validator::Genesis, ) -> (Arc, BlockStoreRunner) { - BlockStore::new(ctx, Box::new(in_memory::BlockStore::new(genesis.clone()))) - .await - .unwrap() + new_store_with_first(ctx, genesis, genesis.fork.first_block).await +} + +/// Constructs a new in-memory store with a custom expected first block +/// (i.e. possibly different than `genesis.fork.first_block`). +pub async fn new_store_with_first( + ctx: &ctx::Ctx, + genesis: &validator::Genesis, + first: validator::BlockNumber, +) -> (Arc, BlockStoreRunner) { + BlockStore::new( + ctx, + Box::new(in_memory::BlockStore::new(genesis.clone(), first)), + ) + .await + .unwrap() } /// Dumps all the blocks stored in `store`. pub async fn dump(ctx: &ctx::Ctx, store: &dyn PersistentBlockStore) -> Vec { let genesis = store.genesis(ctx).await.unwrap(); - let last = store.last(ctx).await.unwrap(); + let state = store.state(ctx).await.unwrap(); + assert!(genesis.fork.first_block <= state.first); let mut blocks = vec![]; - let begin = genesis.fork.first_block; - let end = last + let after = state + .last .as_ref() .map(|qc| qc.header().number.next()) - .unwrap_or(begin); - for n in (begin.0..end.0).map(validator::BlockNumber) { + .unwrap_or(state.first); + for n in (state.first.0..after.0).map(validator::BlockNumber) { let block = store.block(ctx, n).await.unwrap(); assert_eq!(block.header().number, n); blocks.push(block); } - assert!(store.block(ctx, end).await.is_err()); + if let Some(before) = state.first.prev() { + assert!(store.block(ctx, before).await.is_err()); + } + assert!(store.block(ctx, after).await.is_err()); blocks } /// Verifies storage content. 
pub async fn verify(ctx: &ctx::Ctx, store: &BlockStore) -> anyhow::Result<()> { let range = store.subscribe().borrow().clone(); - let mut parent: Option = None; for n in (range.first.0..range.next().0).map(validator::BlockNumber) { async { - let block = store.block(ctx, n).await?.context("missing")?; - block.verify(store.genesis())?; - // Ignore checking the first block parent - if parent.is_some() { - anyhow::ensure!(parent == block.header().parent); - } - parent = Some(block.header().hash()); - Ok(()) + store + .block(ctx, n) + .await? + .context("missing")? + .verify(store.genesis()) + .context("verify()") } .await .context(n)?; diff --git a/node/libs/storage/src/tests.rs b/node/libs/storage/src/tests.rs index 4a50c8a0..221be6f7 100644 --- a/node/libs/storage/src/tests.rs +++ b/node/libs/storage/src/tests.rs @@ -1,6 +1,6 @@ use super::*; -use crate::{testonly::new_store, ReplicaState}; -use zksync_concurrency::{ctx, scope, sync, testonly::abort_on_panic}; +use crate::{testonly::new_store_with_first, ReplicaState}; +use zksync_concurrency::{ctx, scope, testonly::abort_on_panic}; use zksync_consensus_roles::validator::testonly::Setup; #[tokio::test] @@ -10,7 +10,10 @@ async fn test_inmemory_block_store() { let mut setup = Setup::new(rng, 3); setup.push_blocks(rng, 5); - let store = &testonly::in_memory::BlockStore::new(setup.genesis.clone()); + let store = &testonly::in_memory::BlockStore::new( + setup.genesis.clone(), + setup.genesis.fork.first_block, + ); let mut want = vec![]; for block in &setup.blocks { store.store_next_block(ctx, block).await.unwrap(); @@ -32,26 +35,50 @@ async fn test_state_updates() { let ctx = &ctx::test_root(&ctx::RealClock); let rng = &mut ctx.rng(); let mut setup = Setup::new(rng, 1); - setup.push_blocks(rng, 1); - let (store, runner) = new_store(ctx, &setup.genesis).await; + setup.push_blocks(rng, 5); + // Create store with non-trivial first block. 
+ let first_block = &setup.blocks[2]; + let (store, runner) = new_store_with_first(ctx, &setup.genesis, first_block.number()).await; scope::run!(ctx, |ctx, s| async { s.spawn_bg(runner.run(ctx)); let sub = &mut store.subscribe(); - let state = sub.borrow().clone(); - assert_eq!(state.first, setup.genesis.fork.first_block); - assert_eq!(state.last, None); + let want = BlockStoreState { + first: first_block.number(), + last: None, + }; - store - .queue_block(ctx, setup.blocks[0].clone()) - .await - .unwrap(); + // Waiting for blocks before genesis first block (or before `state.first_block`) should be ok + // and should complete immediately. + for n in [ + setup.genesis.fork.first_block.prev().unwrap(), + first_block.number().prev().unwrap(), + ] { + store.wait_until_queued(ctx, n).await.unwrap(); + store.wait_until_persisted(ctx, n).await.unwrap(); + assert_eq!(want, *sub.borrow()); + } - let state = sync::wait_for(ctx, sub, |state| { - state.last.as_ref() == Some(&setup.blocks[0].justification) - }) - .await? - .clone(); - assert_eq!(state.first, setup.blocks[0].header().number); + for block in &setup.blocks { + store.queue_block(ctx, block.clone()).await.unwrap(); + if block.number() < first_block.number() { + // Queueing block before first block should be a noop. + store.wait_until_queued(ctx, block.number()).await.unwrap(); + store + .wait_until_persisted(ctx, block.number()) + .await + .unwrap(); + assert_eq!(want, *sub.borrow()); + } else { + // Otherwise the state should be updated as soon as block is queued. 
+ assert_eq!( + BlockStoreState { + first: first_block.number(), + last: Some(block.justification.clone()), + }, + *sub.borrow() + ); + } + } Ok(()) }) .await diff --git a/node/tools/Cargo.toml b/node/tools/Cargo.toml index 93e80fa4..3546e97c 100644 --- a/node/tools/Cargo.toml +++ b/node/tools/Cargo.toml @@ -26,7 +26,6 @@ rocksdb.workspace = true serde.workspace = true serde_json.workspace = true tokio.workspace = true -tokio-retry.workspace = true tracing.workspace = true tracing-subscriber.workspace = true vise-exporter.workspace = true diff --git a/node/tools/README.md b/node/tools/README.md index b6349665..6bf203a0 100644 --- a/node/tools/README.md +++ b/node/tools/README.md @@ -1,6 +1,6 @@ # Running Test Consensus Nodes -These instructions guide you through the process of setting up and running a test consensus node in both local and Dockerized environments. Additionally, examples are provided to demonstrate how to run two or more nodes, communicating and doing consensus between them. +These instructions guide you through the process of setting up and running a test consensus node in both local and Clusterized environments. ## Local Setup @@ -12,47 +12,13 @@ These instructions guide you through the process of setting up and running a tes make nodes_config ``` - This command creates a directory named `nodes-config` and generates a folder for each address listed in the `.txt` file with the name `node_{NODE_NUMBER}`, providing essential configuration files for the corresponding node. The `NODE_NUMBER` is simply a numerical identifier for each node. + This command creates a directory named `nodes-config` and generates a folder for each address listed in the `.txt` file with the ip as the directory name, providing essential configuration files for the corresponding node. ```bash - make node NODE= + make node IP= ``` - The default value for this command is set to `0` for launching the initial node, and you can increment the number for subsequent nodes. 
Note that running this command will take control of the terminal. - -## Dockerized Setup - -To launch a standalone consensus node in a Docker container, run the following command in the project root (era-consensus): - -```bash -make node_docker -``` - -This command creates a container running a single node that advances views and finalizes blocks. - -For a simple example with two nodes communicating in different containers, use: - -```bash -make consensus_docker_example -``` - -This sets up two containers, each hosting a consensus node, interlinked and progressing through views to finalize blocks, achieving consensus between them. - -To stop the node containers, use: - -```bash -make stop_docker_nodes -``` - -The node will resume the last viewed block from the previous session when initiated again. - -To clean all states after running these commands, use: - -```bash -make clean_docker -``` - -> This deletes the generated images and containers, requiring regeneration. + The default value for this command is set to `127.0.0.1:3054` for launching the initial node, to run a different node just use the IP previously detailed in the `addresses.txt` file. Note that running this command will take control of the terminal. ## Running in minikube @@ -97,7 +63,12 @@ To start the minikube dashboard in order to inspect the deployed pods. Remember Finally to clean everything up ```bash -minikube delete --all +make clean ``` -will remove all namespaces, deployments and pods from the minikube environment. \ No newline at end of file +This will remove all namespaces, deployments and pods from the minikube environment and the images generated in Docker. 
+If you want to stop the `minikube` environment just do: + +```bash +minikube delete --all +``` diff --git a/node/tools/src/bin/deployer.rs b/node/tools/src/bin/deployer.rs index f36d61c9..ebda24c2 100644 --- a/node/tools/src/bin/deployer.rs +++ b/node/tools/src/bin/deployer.rs @@ -14,14 +14,6 @@ use zksync_consensus_tools::{k8s, AppConfig, NodeAddr, NODES_PORT}; #[derive(Debug, Parser)] #[command(name = "deployer")] struct DeployerCLI { - /// Subcommand to run. - #[command(subcommand)] - command: DeployerCommands, -} - -/// Subcommand arguments. -#[derive(Debug, Parser)] -struct SubCommandArgs { /// Number of total nodes to deploy. #[arg(long)] nodes: usize, @@ -30,19 +22,11 @@ struct SubCommandArgs { seed_nodes: Option, } -/// Subcommands. -#[derive(Subcommand, Debug)] -enum DeployerCommands { - /// Generate configs for the nodes. - GenerateConfig(SubCommandArgs), - /// Deploy the nodes. - Deploy(SubCommandArgs), -} - -/// Generates config for the nodes to run in the kubernetes cluster -/// Creates a directory for each node in the parent k8s_configs directory. -fn generate_config(nodes: usize) -> anyhow::Result<()> { +/// Generates the configuration for all the nodes to run in the kubernetes cluster +/// and creates a ConsensusNode for each to track their progress +fn generate_consensus_nodes(nodes: usize, seed_nodes_amount: Option) -> Vec { assert!(nodes > 0, "at least 1 node has to be specified"); + let seed_nodes_amount = seed_nodes_amount.unwrap_or(1); // Generate the keys for all the replicas. 
let rng = &mut rand::thread_rng(); @@ -57,97 +41,82 @@ fn generate_config(nodes: usize) -> anyhow::Result<()> { let default_config = AppConfig::default_for(setup.genesis.clone()); - let mut cfgs: Vec<_> = (0..nodes).map(|_| default_config.clone()).collect(); + let mut cfgs: Vec = (0..nodes) + .map(|i| ConsensusNode { + id: format!("consensus-node-{i:0>2}"), + config: default_config.clone(), + key: node_keys[i].clone(), + validator_key: Some(validator_keys[i].clone()), + node_addr: None, //It's not assigned yet + is_seed: i < seed_nodes_amount, + }) + .collect(); // Construct a gossip network with optimal diameter. for (i, node) in node_keys.iter().enumerate() { for j in 0..peers { let next = (i * peers + j + 1) % nodes; - cfgs[next].add_gossip_static_inbound(node.public()); + cfgs[next].config.add_gossip_static_inbound(node.public()); } } - let manifest_path = std::env::var("CARGO_MANIFEST_DIR")?; - let root = PathBuf::from(manifest_path).join("k8s_configs"); - let _ = fs::remove_dir_all(&root); - for (i, cfg) in cfgs.into_iter().enumerate() { - let node_config_dir = root.join(format!("consensus-node-{i:0>2}")); - fs::create_dir_all(&node_config_dir) - .with_context(|| format!("create_dir_all({:?})", node_config_dir))?; - - cfg.write_to_file(&node_config_dir)?; - fs::write( - node_config_dir.join("validator_key"), - &TextFmt::encode(&validator_keys[i]), - ) - .context("fs::write()")?; - fs::write( - node_config_dir.join("node_key"), - &TextFmt::encode(&node_keys[i]), - ) - .context("fs::write()")?; - } - - Ok(()) + cfgs } /// Deploys the nodes to the kubernetes cluster. 
-async fn deploy(nodes: usize, seed_nodes: Option) -> anyhow::Result<()> { +async fn deploy(nodes_amount: usize, seed_nodes_amount: Option) -> anyhow::Result<()> { + let mut consensus_nodes = generate_consensus_nodes(nodes_amount, seed_nodes_amount); let client = k8s::get_client().await?; k8s::create_or_reuse_namespace(&client, k8s::DEFAULT_NAMESPACE).await?; - let seed_nodes = seed_nodes.unwrap_or(1); - - // deploy seed peer(s) - for i in 0..seed_nodes { - k8s::deploy_node( - &client, - i, - true, - vec![], // Seed peers don't have other peer information - k8s::DEFAULT_NAMESPACE, - ) - .await?; - } + let seed_nodes = &mut HashMap::new(); + let mut non_seed_nodes = HashMap::new(); - // obtain seed peer(s) IP(s) - let peer_ips = k8s::get_seed_node_addrs(&client, seed_nodes, k8s::DEFAULT_NAMESPACE).await?; + // Split the nodes in different hash maps as they will be deployed at different stages + for node in consensus_nodes.iter_mut() { + if node.is_seed { + seed_nodes.insert(node.id.to_owned(), node); + } else { + non_seed_nodes.insert(node.id.to_owned(), node); + } + } - let mut peers = vec![]; + // Deploy seed peer(s) + for node in seed_nodes.values_mut() { + node.deploy(&client, k8s::DEFAULT_NAMESPACE).await?; + } - for i in 0..seed_nodes { - let node_id = &format!("consensus-node-{i:0>2}"); - let node_key = read_node_key_from_config(node_id)?; - let address = peer_ips.get(node_id).context("IP address not found")?; - peers.push(NodeAddr { - key: node_key.public(), - addr: SocketAddr::from_str(&format!("{address}:{NODES_PORT}"))?, - }); + // Fetch and complete node addrs into seed nodes + for node in seed_nodes.values_mut() { + node.fetch_and_assign_pod_ip(&client, k8s::DEFAULT_NAMESPACE) + .await?; } - // deploy the rest of nodes - for i in seed_nodes..nodes { - k8s::deploy_node(&client, i, false, peers.clone(), k8s::DEFAULT_NAMESPACE).await?; + // Build a vector of (PublicKey, SocketAddr) to provide as gossip_static_outbound + // to the rest of the nodes + let 
peers: Vec<_> = seed_nodes + .values() + .map(|n| { + let node_addr = n + .node_addr + .as_ref() + .expect("Seed node address not defined") + .clone(); + (node_addr.key, node_addr.addr) + }) + .collect(); + + // Deploy the rest of the nodes + for node in non_seed_nodes.values_mut() { + node.config.gossip_static_outbound.extend(peers.clone()); + node.deploy(&client, k8s::DEFAULT_NAMESPACE).await?; } Ok(()) } -/// Obtain node key from config file. -fn read_node_key_from_config(node_id: &String) -> anyhow::Result { - let manifest_path = std::env::var("CARGO_MANIFEST_DIR")?; - let root = PathBuf::from(manifest_path).join("k8s_configs"); - let node_key_path = root.join(node_id).join("node_key"); - let key = fs::read_to_string(node_key_path).context("failed reading file")?; - Text::new(&key).decode().context("failed decoding key") -} - #[tokio::main] async fn main() -> anyhow::Result<()> { - let DeployerCLI { command } = DeployerCLI::parse(); - - match command { - DeployerCommands::GenerateConfig(args) => generate_config(args.nodes), - DeployerCommands::Deploy(args) => deploy(args.nodes, args.seed_nodes).await, - } + let args = DeployerCLI::parse(); + deploy(args.nodes, args.seed_nodes).await } diff --git a/node/tools/src/config.rs b/node/tools/src/config.rs index 43494057..ffb8410c 100644 --- a/node/tools/src/config.rs +++ b/node/tools/src/config.rs @@ -2,13 +2,12 @@ use crate::{proto, store}; use anyhow::Context as _; use serde_json::{ser::Formatter, Serializer}; -use std::net::Ipv4Addr; -use std::str::FromStr; use std::{ collections::{HashMap, HashSet}, fs, - net::SocketAddr, + net::{Ipv4Addr, SocketAddr}, path::{Path, PathBuf}, + str::FromStr, }; use zksync_concurrency::ctx; use zksync_consensus_bft as bft; @@ -151,19 +150,35 @@ impl ProtoFmt for AppConfig { } } -/// This struct holds the file path to each of the config files. +/// Configuration information. #[derive(Debug)] -pub struct ConfigPaths<'a> { - /// Path to a JSON file with node configuration. 
- pub app: &'a Path, - /// Path to a validator key file. - pub validator_key: Option<&'a Path>, - /// Path to a node key file. - pub node_key: &'a Path, +pub struct ConfigArgs<'a> { + /// Node configuration. + pub config_args: ConfigSource<'a>, /// Path to the rocksdb database. pub database: &'a Path, } +#[derive(Debug)] +pub enum ConfigSource<'a> { + CliConfig { + /// Node configuration from command line. + config: AppConfig, + /// Node key as a string. + node_key: node::SecretKey, + /// Validator key as a string. + validator_key: Option, + }, + PathConfig { + /// Path to a JSON file with node configuration. + config_file: &'a Path, + /// Path to a validator key file. + validator_key_file: &'a Path, + /// Path to a node key file. + node_key_file: &'a Path, + }, +} + pub struct Configs { pub app: AppConfig, pub validator_key: Option, @@ -171,37 +186,47 @@ pub struct Configs { pub database: PathBuf, } -impl<'a> ConfigPaths<'a> { +impl<'a> ConfigArgs<'a> { // Loads configs from the file system. pub fn load(self) -> anyhow::Result { - Ok(Configs { - app: (|| { - let app = fs::read_to_string(self.app).context("failed reading file")?; - decode_json::>(&app).context("failed decoding JSON") - })() - .with_context(|| self.app.display().to_string())? 
- .0, - - validator_key: self - .validator_key - .as_ref() - .map(|file| { - (|| { - let key = fs::read_to_string(file).context("failed reading file")?; - Text::new(&key).decode().context("failed decoding key") - })() - .with_context(|| file.display().to_string()) - }) - .transpose()?, - - node_key: (|| { - let key = fs::read_to_string(self.node_key).context("failed reading file")?; - Text::new(&key).decode().context("failed decoding key") - })() - .with_context(|| self.node_key.display().to_string())?, - - database: self.database.into(), - }) + match self.config_args { + ConfigSource::CliConfig { + config, + node_key, + validator_key, + } => Ok(Configs { + app: config.clone(), + validator_key: validator_key.clone(), + node_key: node_key.clone(), + database: self.database.into(), + }), + ConfigSource::PathConfig { + config_file, + validator_key_file, + node_key_file, + } => Ok(Configs { + app: (|| { + let app = fs::read_to_string(config_file).context("failed reading file")?; + decode_json::>(&app).context("failed decoding JSON") + })() + .with_context(|| config_file.display().to_string())? 
+ .0, + + validator_key: fs::read_to_string(validator_key_file) + .ok() + .map(|value| Text::new(&value).decode().context("failed decoding key")) + .transpose() + .with_context(|| validator_key_file.display().to_string())?, + + node_key: (|| { + let key = fs::read_to_string(node_key_file).context("failed reading file")?; + Text::new(&key).decode().context("failed decoding key") + })() + .with_context(|| node_key_file.display().to_string())?, + + database: self.database.into(), + }), + } } } diff --git a/node/tools/src/k8s/operations.rs b/node/tools/src/k8s/operations.rs index 5e13ef21..c148538a 100644 --- a/node/tools/src/k8s/operations.rs +++ b/node/tools/src/k8s/operations.rs @@ -1,12 +1,12 @@ +use crate::config::AppConfig; use crate::{config, NodeAddr}; -use anyhow::{anyhow, Context}; +use anyhow::Context; use k8s_openapi::{ api::{ apps::v1::{Deployment, DeploymentSpec}, core::v1::{ Container, ContainerPort, EnvVar, EnvVarSource, HTTPGetAction, Namespace, - ObjectFieldSelector, Pod, PodSpec, PodTemplateSpec, Probe, Service, ServicePort, - ServiceSpec, + ObjectFieldSelector, Pod, PodSpec, PodTemplateSpec, Probe, }, rbac::v1::{PolicyRule, Role, RoleBinding, RoleRef, Subject}, }, @@ -14,16 +14,14 @@ use k8s_openapi::{ }; use kube::{ api::{ListParams, PostParams}, - core::{ObjectList, ObjectMeta}, - Api, Client, ResourceExt, + core::ObjectMeta, + Api, Client, }; -use std::{ - collections::{BTreeMap, HashMap}, - net::SocketAddr, -}; -use tokio_retry::strategy::FixedInterval; -use tokio_retry::Retry; +use std::{collections::BTreeMap, net::SocketAddr, time::Duration}; +use tokio::time; use tracing::log::info; +use zksync_consensus_crypto::TextFmt; +use zksync_consensus_roles::{node, validator}; use zksync_protobuf::serde::Serde; /// Docker image name for consensus nodes. @@ -31,6 +29,158 @@ const DOCKER_IMAGE_NAME: &str = "consensus-node"; /// K8s namespace for consensus nodes. 
pub const DEFAULT_NAMESPACE: &str = "consensus"; +/// Consensus Node Representation +#[derive(Debug)] +pub struct ConsensusNode { + /// Node identifier + pub id: String, + /// Node configuration + pub config: AppConfig, + /// Node key + pub key: node::SecretKey, + /// Validator key + pub validator_key: Option, + /// Full NodeAddr + pub node_addr: Option, + /// Is seed node (meaning it has no gossipStaticOutbound configuration) + pub is_seed: bool, +} + +impl ConsensusNode { + /// Wait for a deployed consensus node to be ready and have an IP address + pub async fn await_running_pod( + &mut self, + client: &Client, + namespace: &str, + ) -> anyhow::Result { + let pods: Api = Api::namespaced(client.clone(), namespace); + // Wait until the pod is running, otherwise we get an error. + retry(15, Duration::from_millis(1000), || async { + get_running_pod(&pods, &self.id).await + }) + .await + } + + /// Fetches the pod's IP address and assigns it to self.node_addr + pub async fn fetch_and_assign_pod_ip( + &mut self, + client: &Client, + namespace: &str, + ) -> anyhow::Result<()> { + let ip = self + .await_running_pod(client, namespace) + .await? + .status + .context("Status not present")? 
+ .pod_ip + .context("Pod IP address not present")?; + self.node_addr = Some(NodeAddr { + key: self.key.public(), + addr: SocketAddr::new(ip.parse()?, config::NODES_PORT), + }); + Ok(()) + } + + /// Creates a deployment + pub async fn deploy(&self, client: &Client, namespace: &str) -> anyhow::Result<()> { + let cli_args = get_cli_args(self); + let deployment = Deployment { + metadata: ObjectMeta { + name: Some(self.id.to_owned()), + namespace: Some(namespace.to_owned()), + ..Default::default() + }, + spec: Some(DeploymentSpec { + selector: LabelSelector { + match_labels: Some(BTreeMap::from([("app".to_owned(), self.id.to_owned())])), + ..Default::default() + }, + replicas: Some(1), + template: PodTemplateSpec { + metadata: Some(ObjectMeta { + labels: Some(BTreeMap::from([ + ("app".to_owned(), self.id.to_owned()), + ("id".to_owned(), self.id.to_owned()), + ("seed".to_owned(), self.is_seed.to_string()), + ])), + ..Default::default() + }), + spec: Some(PodSpec { + containers: vec![Container { + name: self.id.to_owned(), + image: Some(DOCKER_IMAGE_NAME.to_owned()), + env: Some(vec![ + EnvVar { + name: "NODE_ID".to_owned(), + value: Some(self.id.to_owned()), + ..Default::default() + }, + EnvVar { + name: "PUBLIC_ADDR".to_owned(), + value_from: Some(EnvVarSource { + field_ref: Some(ObjectFieldSelector { + field_path: "status.podIP".to_owned(), + ..Default::default() + }), + ..Default::default() + }), + ..Default::default() + }, + ]), + command: Some(vec!["./k8s_entrypoint.sh".to_owned()]), + args: Some(cli_args), + image_pull_policy: Some("Never".to_owned()), + ports: Some(vec![ + ContainerPort { + container_port: i32::from(config::NODES_PORT), + ..Default::default() + }, + ContainerPort { + container_port: 3154, + ..Default::default() + }, + ]), + liveness_probe: Some(Probe { + http_get: Some(HTTPGetAction { + path: Some("/health".to_owned()), + port: Int(3154), + ..Default::default() + }), + ..Default::default() + }), + readiness_probe: Some(Probe { + http_get: 
Some(HTTPGetAction { + path: Some("/health".to_owned()), + port: Int(3154), + ..Default::default() + }), + ..Default::default() + }), + ..Default::default() + }], + ..Default::default() + }), + }, + ..Default::default() + }), + ..Default::default() + }; + + let deployments: Api = Api::namespaced(client.clone(), namespace); + let post_params = PostParams::default(); + let result = deployments.create(&post_params, &deployment).await?; + + info!( + "Deployment: {} , created", + result + .metadata + .name + .context("Name not defined in metadata")? + ); + Ok(()) + } +} + /// Get a kube client pub async fn get_client() -> anyhow::Result { Ok(Client::try_default().await?) @@ -108,42 +258,6 @@ pub async fn get_consensus_nodes_rpc_address(client: &Client) -> anyhow::Result< Ok(nodes_rpc_address) } -/// Returns a HashMap with mapping: node_id -> IP address -pub async fn get_seed_node_addrs( - client: &Client, - amount: usize, - namespace: &str, -) -> anyhow::Result> { - let mut seed_nodes = HashMap::new(); - let pods: Api = Api::namespaced(client.clone(), namespace); - - // Will retry 15 times during 15 seconds to allow pods to start and obtain an IP - let retry_strategy = FixedInterval::from_millis(1000).take(15); - let pod_list = Retry::spawn(retry_strategy, || get_seed_pods(&pods, amount)).await?; - - for p in pod_list { - let node_id = p.labels()["id"].to_owned(); - seed_nodes.insert( - node_id, - p.status - .context("Status not present")? 
- .pod_ip - .context("Pod IP address not present")?, - ); - } - Ok(seed_nodes) -} - -async fn get_seed_pods(pods: &Api, amount: usize) -> anyhow::Result> { - let lp = ListParams::default().labels("seed=true"); - let p = pods.list(&lp).await?; - if p.items.len() == amount && p.iter().all(is_pod_running) { - Ok(p) - } else { - Err(anyhow!("Pods are not ready")) - } -} - fn is_pod_running(pod: &Pod) -> bool { if let Some(status) = &pod.status { if let Some(phase) = &status.phase { @@ -153,213 +267,16 @@ fn is_pod_running(pod: &Pod) -> bool { false } -fn get_cli_args(peers: Vec) -> Vec { - if peers.is_empty() { - [].to_vec() - } else { - [ - "--add-gossip-static-outbound".to_string(), - config::encode_with_serializer( - &peers - .iter() - .map(|e| Serde(e.clone())) - .collect::>>(), - serde_json::Serializer::new(vec![]), - ), - ] - .to_vec() - } -} - -/// Creates a deployment -pub async fn deploy_node( - client: &Client, - node_index: usize, - is_seed: bool, - peers: Vec, - namespace: &str, -) -> anyhow::Result<()> { - let cli_args = get_cli_args(peers); - let node_name = format!("consensus-node-{node_index:0>2}"); - let deployment = Deployment { - metadata: ObjectMeta { - name: Some(node_name.to_owned()), - namespace: Some(namespace.to_owned()), - ..Default::default() - }, - spec: Some(DeploymentSpec { - selector: LabelSelector { - match_labels: Some(BTreeMap::from([("app".to_owned(), node_name.to_owned())])), - ..Default::default() - }, - replicas: Some(1), - template: PodTemplateSpec { - metadata: Some(ObjectMeta { - labels: Some(BTreeMap::from([ - ("app".to_owned(), node_name.to_owned()), - ("id".to_owned(), node_name.to_owned()), - ("seed".to_owned(), is_seed.to_string()), - ])), - ..Default::default() - }), - spec: Some(PodSpec { - containers: vec![Container { - name: node_name.to_owned(), - image: Some("consensus-node".to_owned()), - env: Some(vec![ - EnvVar { - name: "NODE_ID".to_owned(), - value: Some(node_name.to_owned()), - ..Default::default() - }, - EnvVar 
{ - name: "PUBLIC_ADDR".to_owned(), - value_from: Some(EnvVarSource { - field_ref: Some(ObjectFieldSelector { - field_path: "status.podIP".to_owned(), - ..Default::default() - }), - ..Default::default() - }), - ..Default::default() - }, - ]), - command: Some(vec!["./k8s_entrypoint.sh".to_owned()]), - args: Some(cli_args), - image_pull_policy: Some("Never".to_owned()), - ports: Some(vec![ - ContainerPort { - container_port: i32::from(config::NODES_PORT), - ..Default::default() - }, - ContainerPort { - container_port: 3154, - ..Default::default() - }, - ]), - liveness_probe: Some(Probe { - http_get: Some(HTTPGetAction { - path: Some("/health".to_owned()), - port: Int(3154), - ..Default::default() - }), - ..Default::default() - }), - readiness_probe: Some(Probe { - http_get: Some(HTTPGetAction { - path: Some("/health".to_owned()), - port: Int(3154), - ..Default::default() - }), - ..Default::default() - }), - ..Default::default() - }], - ..Default::default() - }), - }, - ..Default::default() - }), - ..Default::default() - }; - - let deployments: Api = Api::namespaced(client.clone(), namespace); - let post_params = PostParams::default(); - let result = deployments.create(&post_params, &deployment).await?; - - info!( - "Deployment: {} , created", - result - .metadata - .name - .context("Name not defined in metadata")? 
- ); - Ok(()) -} - -pub async fn create_tests_deployment(client: &Client) -> anyhow::Result<()> { - let deployment: Deployment = Deployment { - metadata: ObjectMeta { - name: Some("tests-deployment".to_string()), - namespace: Some(DEFAULT_NAMESPACE.to_string()), - labels: Some([("app".to_string(), "test-node".to_string())].into()), - ..Default::default() - }, - spec: Some(DeploymentSpec { - selector: LabelSelector { - match_labels: Some([("app".to_string(), "test-node".to_string())].into()), - ..Default::default() - }, - replicas: Some(1), - template: PodTemplateSpec { - metadata: Some(ObjectMeta { - labels: Some([("app".to_string(), "test-node".to_string())].into()), - ..Default::default() - }), - spec: Some(PodSpec { - containers: vec![Container { - name: "test-suite".to_string(), - image: Some("test-suite:latest".to_string()), - image_pull_policy: Some("Never".to_string()), - command: Some(vec!["./tester_entrypoint.sh".to_string()]), - ..Default::default() - }], - ..Default::default() - }), - }, - ..Default::default() - }), - ..Default::default() - }; - - let deployments: Api = Api::namespaced(client.to_owned(), DEFAULT_NAMESPACE); - let post_params = PostParams::default(); - let result = deployments.create(&post_params, &deployment).await?; - - info!( - "Deployment: {} , created", - result - .metadata - .name - .context("Name not defined in metadata")? 
- ); - Ok(()) -} - -pub async fn expose_tester_rpc(client: &Client) -> anyhow::Result<()> { - let load_balancer_sevice = Service { - metadata: ObjectMeta { - name: Some("tester-rpc".to_string()), - namespace: Some(DEFAULT_NAMESPACE.to_string()), - ..Default::default() - }, - spec: Some(ServiceSpec { - type_: Some("LoadBalancer".to_owned()), - selector: Some([("app".to_string(), "test-node".to_string())].into()), - ports: vec![ServicePort { - port: 3030, - target_port: Some(Int(3030)), - ..Default::default() - }] - .into(), - ..Default::default() - }), - ..Default::default() - }; - - let services: Api = Api::namespaced(client.clone(), DEFAULT_NAMESPACE); - let post_params = PostParams::default(); - let result = services.create(&post_params, &load_balancer_sevice).await?; - - info!( - "Service: {} , created", - result - .metadata - .name - .context("Name not defined in metadata")? - ); - - Ok(()) +async fn get_running_pod(pods: &Api, label: &str) -> anyhow::Result { + let lp = ListParams::default().labels(&format!("app={label}")); + let pod = pods + .list(&lp) + .await? 
+ .items + .pop() + .with_context(|| format!("Pod not found: {label}"))?; + anyhow::ensure!(is_pod_running(&pod), "Pod is not running"); + Ok(pod) } /// Creates a namespace in k8s cluster @@ -481,3 +398,36 @@ pub async fn create_or_reuse_pod_reader_role(client: &Client) -> anyhow::Result< Ok(()) } + +fn get_cli_args(consensus_node: &ConsensusNode) -> Vec { + let mut cli_args = [ + "--config".to_string(), + config::encode_with_serializer( + &Serde(consensus_node.config.clone()), + serde_json::Serializer::new(vec![]), + ), + "--node-key".to_string(), + TextFmt::encode(&consensus_node.key), + ] + .to_vec(); + if let Some(key) = &consensus_node.validator_key { + cli_args.append(&mut ["--validator-key".to_string(), TextFmt::encode(key)].to_vec()) + }; + cli_args +} + +async fn retry(retries: usize, delay: Duration, mut f: F) -> anyhow::Result +where + F: FnMut() -> Fut, + Fut: std::future::Future>, +{ + let mut interval = time::interval(delay); + for count in 0.. { + interval.tick().await; + let result = f().await; + if result.is_ok() || count > retries { + return result; + } + } + unreachable!("Loop should always return") +} diff --git a/node/tools/src/lib.rs b/node/tools/src/lib.rs index 554e9904..9a6263b8 100644 --- a/node/tools/src/lib.rs +++ b/node/tools/src/lib.rs @@ -9,5 +9,5 @@ mod store; #[cfg(test)] mod tests; -pub use config::{decode_json, AppConfig, ConfigPaths, NodeAddr, NODES_PORT}; +pub use config::{decode_json, AppConfig, ConfigArgs, ConfigSource, NodeAddr, NODES_PORT}; pub use rpc::server::RPCServer; diff --git a/node/tools/src/main.rs b/node/tools/src/main.rs index 4d74990b..a17efa32 100644 --- a/node/tools/src/main.rs +++ b/node/tools/src/main.rs @@ -7,52 +7,83 @@ use tracing::metadata::LevelFilter; use tracing_subscriber::{prelude::*, Registry}; use vise_exporter::MetricsExporter; use zksync_concurrency::{ctx, scope}; -use zksync_consensus_tools::{decode_json, ConfigPaths, NodeAddr, RPCServer}; +use zksync_consensus_crypto::{Text, TextFmt}; +use 
zksync_consensus_roles::{node, validator}; +use zksync_consensus_tools::{decode_json, AppConfig, ConfigArgs, ConfigSource, RPCServer}; use zksync_protobuf::serde::Serde; -/// Wrapper for Vec. -#[derive(Debug, Clone)] -struct NodeAddrs(Vec>); - -impl std::str::FromStr for NodeAddrs { - type Err = anyhow::Error; - fn from_str(s: &str) -> Result { - Ok(Self(decode_json(s)?)) - } -} - /// Command-line application launching a node executor. #[derive(Debug, Parser)] -struct Args { +struct Cli { + /// Full json config + #[arg(long, + value_parser(parse_config), + requires="node_key", + conflicts_with_all=["config_file", "validator_key_file", "node_key_file"])] + config: Option>, + /// Plain node key + #[arg(long, + value_parser(parse_key::), + requires="config", + conflicts_with_all=["config_file", "validator_key_file", "node_key_file"])] + node_key: Option, + /// Plain validator key + #[arg(long, + value_parser(parse_key::), + requires_all=["config", "node_key"], + conflicts_with_all=["config_file", "validator_key_file", "node_key_file"])] + validator_key: Option, /// Path to a validator key file. If set to an empty string, validator key will not be read /// (i.e., a node will be initialized as a non-validator node). - #[arg(long, default_value = "./validator_key")] - validator_key: PathBuf, + #[arg(long, + default_value = "./validator_key", + conflicts_with_all=["config", "validator_key", "node_key"])] + validator_key_file: PathBuf, /// Path to a JSON file with node configuration. - #[arg(long, default_value = "./config.json")] + #[arg(long, + default_value = "./config.json", + conflicts_with_all=["config", "validator_key", "node_key"])] config_file: PathBuf, /// Path to a node key file. - #[arg(long, default_value = "./node_key")] - node_key: PathBuf, + #[arg(long, + default_value = "./node_key", + conflicts_with_all=["config", "validator_key", "node_key"])] + node_key_file: PathBuf, /// Path to the rocksdb database of the node. 
#[arg(long, default_value = "./database")] database: PathBuf, /// Port for the RPC server. #[arg(long)] rpc_port: Option, - /// IP address and key of the seed peers. - #[arg(long)] - add_gossip_static_outbound: Option, } -impl Args { +/// Function to let clap parse the command line `config` argument +fn parse_config(val: &str) -> anyhow::Result> { + decode_json(val) +} + +/// Node/validator key parser for clap +fn parse_key(val: &str) -> anyhow::Result { + Text::new(val).decode().context("failed decoding key") +} + +impl Cli { /// Extracts configuration paths from these args. - fn config_paths(&self) -> ConfigPaths<'_> { - ConfigPaths { - app: &self.config_file, - node_key: &self.node_key, - validator_key: (!self.validator_key.as_os_str().is_empty()) - .then_some(&self.validator_key), + fn config_args(&self) -> ConfigArgs<'_> { + let config_args = match &self.config { + Some(config) => ConfigSource::CliConfig { + config: config.clone().0, + node_key: self.node_key.clone().unwrap(), // node_key is present as it is enforced by clap rules + validator_key: self.validator_key.clone(), + }, + None => ConfigSource::PathConfig { + config_file: &self.config_file, + validator_key_file: &self.validator_key_file, + node_key_file: &self.node_key_file, + }, + }; + ConfigArgs { + config_args, database: &self.database, } } @@ -60,7 +91,7 @@ impl Args { #[tokio::main] async fn main() -> anyhow::Result<()> { - let args: Args = Args::parse(); + let args: Cli = Cli::parse(); tracing::trace!(?args, "Starting node"); let ctx = &ctx::root(); @@ -96,22 +127,11 @@ async fn main() -> anyhow::Result<()> { // Load the config files. 
tracing::debug!("Loading config files."); - let mut configs = args - .config_paths() - .load() - .context("config_paths().load()")?; + let mut configs = args.config_args().load().context("config_args().load()")?; // if `PUBLIC_ADDR` env var is set, use it to override publicAddr in config configs.app.check_public_addr().context("Public Address")?; - // Add gossipStaticOutbound pairs from cli to config - if let Some(addrs) = args.add_gossip_static_outbound { - configs - .app - .gossip_static_outbound - .extend(addrs.0.into_iter().map(|e| (e.0.key, e.0.addr))); - } - let (executor, runner) = configs .make_executor(ctx) .await diff --git a/node/tools/src/store.rs b/node/tools/src/store.rs index f2832ae3..af581000 100644 --- a/node/tools/src/store.rs +++ b/node/tools/src/store.rs @@ -8,7 +8,7 @@ use std::{ }; use zksync_concurrency::{ctx, error::Wrap as _, scope}; use zksync_consensus_roles::validator; -use zksync_consensus_storage::{PersistentBlockStore, ReplicaState, ReplicaStore}; +use zksync_consensus_storage::{BlockStoreState, PersistentBlockStore, ReplicaState, ReplicaStore}; /// Enum used to represent a key in the database. It also acts as a separator between different stores. #[derive(Debug, Clone, PartialEq, Eq)] @@ -107,8 +107,12 @@ impl PersistentBlockStore for RocksDB { Ok(self.0.genesis.clone()) } - async fn last(&self, _ctx: &ctx::Ctx) -> ctx::Result> { - Ok(scope::wait_blocking(|| self.last_blocking()).await?) + async fn state(&self, _ctx: &ctx::Ctx) -> ctx::Result { + Ok(BlockStoreState { + // `RocksDB` is assumed to store all blocks starting from genesis. + first: self.0.genesis.fork.first_block, + last: scope::wait_blocking(|| self.last_blocking()).await?, + }) } async fn block(