From 21cbcc528259908d2b7f5aeba38c54fd496688c7 Mon Sep 17 00:00:00 2001 From: Daniyar Itegulov Date: Wed, 18 Dec 2024 17:18:03 +1100 Subject: [PATCH] move server building logic to `api_server` crate --- Cargo.lock | 8 +- crates/api_server/Cargo.toml | 4 + crates/api_server/src/lib.rs | 2 + crates/api_server/src/server.rs | 138 ++++++++++++++++++++++++++++++++ crates/cli/Cargo.toml | 4 - crates/cli/src/main.rs | 133 ++++++++---------------------- 6 files changed, 181 insertions(+), 108 deletions(-) create mode 100644 crates/api_server/src/server.rs diff --git a/Cargo.lock b/Cargo.lock index d57872be..a8d0df6c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -391,7 +391,6 @@ name = "anvil-zksync" version = "0.2.1" dependencies = [ "alloy-signer-local", - "anvil_zksync_api_decl", "anvil_zksync_api_server", "anvil_zksync_config", "anvil_zksync_core", @@ -399,10 +398,7 @@ dependencies = [ "anyhow", "clap", "eyre", - "futures 0.3.31", "hex", - "http 1.1.0", - "jsonrpsee", "rand 0.8.5", "serde", "serde_json", @@ -433,9 +429,13 @@ dependencies = [ "anvil_zksync_core", "anvil_zksync_types", "anyhow", + "futures 0.3.31", "hex", + "http 1.1.0", "jsonrpsee", "thiserror", + "tower 0.4.13", + "tower-http", "tracing", "zksync_types", "zksync_web3_decl", diff --git a/crates/api_server/Cargo.toml b/crates/api_server/Cargo.toml index 991353c8..c040ce81 100644 --- a/crates/api_server/Cargo.toml +++ b/crates/api_server/Cargo.toml @@ -19,7 +19,11 @@ zksync_types.workspace = true zksync_web3_decl.workspace = true anyhow.workspace = true +futures.workspace = true hex.workspace = true +http.workspace = true jsonrpsee.workspace = true thiserror.workspace = true +tower.workspace = true +tower-http.workspace = true tracing.workspace = true diff --git a/crates/api_server/src/lib.rs b/crates/api_server/src/lib.rs index e6d9541d..6c0596e8 100644 --- a/crates/api_server/src/lib.rs +++ b/crates/api_server/src/lib.rs @@ -1,7 +1,9 @@ mod error; mod impls; +mod server; pub use impls::{ AnvilNamespace, ConfigNamespace, DebugNamespace, EthNamespace, EthTestNamespace, EvmNamespace, NetNamespace, Web3Namespace, ZksNamespace, }; +pub use server::{NodeServerBuilder, NodeServerHandle, NodeServerOptions}; diff --git a/crates/api_server/src/server.rs b/crates/api_server/src/server.rs new file mode 100644 index 00000000..13c1e10a --- /dev/null +++ b/crates/api_server/src/server.rs @@ -0,0 +1,138 @@ +use crate::{ + AnvilNamespace, ConfigNamespace, DebugNamespace, EthNamespace, EthTestNamespace, EvmNamespace, + NetNamespace, Web3Namespace, ZksNamespace, +}; +use anvil_zksync_api_decl::{ + AnvilNamespaceServer, ConfigNamespaceServer, DebugNamespaceServer, EthNamespaceServer, + EthTestNamespaceServer, EvmNamespaceServer, NetNamespaceServer, Web3NamespaceServer, + ZksNamespaceServer, +}; +use anvil_zksync_core::node::InMemoryNode; +use http::Method; +use jsonrpsee::server::middleware::http::ProxyGetRequestLayer; +use jsonrpsee::server::{AlreadyStoppedError, RpcServiceBuilder, ServerBuilder, ServerHandle}; +use jsonrpsee::RpcModule; +use std::future::Future; +use std::net::SocketAddr; +use std::pin::Pin; +use tower_http::cors::{AllowOrigin, CorsLayer}; + +#[derive(Debug, Default)] +pub struct NodeServerOptions { + health_api_enabled: bool, + cors_enabled: bool, + allow_origin: AllowOrigin, +} + +impl NodeServerOptions { + pub fn enable_health_api(&mut self) { + self.health_api_enabled = true; + } + + pub fn enable_cors(&mut self) { + self.health_api_enabled = true; + } + + pub fn set_allow_origin(&mut self, allow_origin: AllowOrigin) { + self.allow_origin = allow_origin; + } + + pub fn to_builder(self, node: InMemoryNode) -> NodeServerBuilder { + NodeServerBuilder::new(self, node) + } +} + +pub struct NodeServerBuilder { + options: NodeServerOptions, + rpc: RpcModule<()>, + server_futs: Vec>>>, +} + +impl NodeServerBuilder { + fn new(options: NodeServerOptions, node: InMemoryNode) -> Self { + let rpc = Self::default_rpc(node); + Self { + options, + rpc, + server_futs: Vec::new(), + } + } + + fn default_rpc(node: InMemoryNode) -> RpcModule<()> { + let mut rpc = RpcModule::new(()); + rpc.merge(EthNamespace::new(node.clone()).into_rpc()) + .unwrap(); + rpc.merge(EthTestNamespace::new(node.clone()).into_rpc()) + .unwrap(); + rpc.merge(AnvilNamespace::new(node.clone()).into_rpc()) + .unwrap(); + rpc.merge(EvmNamespace::new(node.clone()).into_rpc()) + .unwrap(); + rpc.merge(DebugNamespace::new(node.clone()).into_rpc()) + .unwrap(); + rpc.merge(NetNamespace::new(node.clone()).into_rpc()) + .unwrap(); + rpc.merge(ConfigNamespace::new(node.clone()).into_rpc()) + .unwrap(); + rpc.merge(ZksNamespace::new(node).into_rpc()).unwrap(); + rpc.merge(Web3Namespace.into_rpc()).unwrap(); + rpc + } + + pub async fn serve(&mut self, addr: SocketAddr) { + let cors_layers = tower::util::option_layer(self.options.cors_enabled.then(|| { + // `CorsLayer` adds CORS-specific headers to responses but does not do filtering by itself. + // CORS relies on browsers respecting server's access list response headers. + // See [`tower_http::cors`](https://docs.rs/tower-http/latest/tower_http/cors/index.html) + // for more details. + CorsLayer::new() + .allow_origin(self.options.allow_origin.clone()) + .allow_headers([http::header::CONTENT_TYPE]) + .allow_methods([Method::GET, Method::POST]) + })); + let health_api_layer = tower::util::option_layer( + self.options + .health_api_enabled + .then(|| ProxyGetRequestLayer::new("/health", "web3_clientVersion").unwrap()), + ); + let server_builder = ServerBuilder::default() + .http_only() + .set_http_middleware( + tower::ServiceBuilder::new() + .layer(cors_layers) + .layer(health_api_layer), + ) + .set_rpc_middleware(RpcServiceBuilder::new().rpc_logger(100)); + + let server = server_builder.build(addr).await.unwrap(); + let rpc = self.rpc.clone(); + self.server_futs + .push(Box::pin(async move { server.start(rpc) })); + } + + pub async fn run(self) -> NodeServerHandle { + let handles = futures::future::join_all(self.server_futs).await; + NodeServerHandle { handles } + } +} + +/// Node's server handle. +/// +/// When all [`NodeServerHandle`]'s have been `dropped` or `stop` has been called +/// the server will be stopped. +#[derive(Debug, Clone)] +pub struct NodeServerHandle { + handles: Vec, +} + +impl NodeServerHandle { + /// Tell the server to stop without waiting for the server to stop. + pub fn stop(&self) -> Result<(), AlreadyStoppedError> { + self.handles.iter().map(|handle| handle.stop()).collect() + } + + /// Wait for the server to stop. + pub async fn stopped(self) { + futures::future::join_all(self.handles.into_iter().map(|handle| handle.stopped())).await; + } +} diff --git a/crates/cli/Cargo.toml b/crates/cli/Cargo.toml index f724dfbb..fab5be84 100644 --- a/crates/cli/Cargo.toml +++ b/crates/cli/Cargo.toml @@ -14,7 +14,6 @@ categories.workspace = true anvil_zksync_core.workspace = true anvil_zksync_config.workspace = true anvil_zksync_api_server.workspace = true -anvil_zksync_api_decl.workspace = true anvil_zksync_types.workspace = true zksync_types.workspace = true @@ -25,9 +24,6 @@ anyhow.workspace = true clap.workspace = true eyre.workspace = true hex.workspace = true -http.workspace = true -jsonrpsee.workspace = true -futures.workspace = true rand.workspace = true serde.workspace = true serde_json.workspace = true diff --git a/crates/cli/src/main.rs b/crates/cli/src/main.rs index 9ec12eea..455ed59e 100644 --- a/crates/cli/src/main.rs +++ b/crates/cli/src/main.rs @@ -1,15 +1,7 @@ use crate::bytecode_override::override_bytecodes; use crate::cli::{Cli, Command}; use crate::utils::update_with_fork_details; -use anvil_zksync_api_decl::{ - AnvilNamespaceServer, ConfigNamespaceServer, DebugNamespaceServer, EthNamespaceServer, - EthTestNamespaceServer, EvmNamespaceServer, NetNamespaceServer, Web3NamespaceServer, - ZksNamespaceServer, -}; -use anvil_zksync_api_server::{ - AnvilNamespace, ConfigNamespace, DebugNamespace, EthNamespace, EthTestNamespace, EvmNamespace, - NetNamespace, Web3Namespace, ZksNamespace, -}; +use anvil_zksync_api_server::NodeServerOptions; use anvil_zksync_config::constants::{ DEFAULT_ESTIMATE_GAS_PRICE_SCALE_FACTOR, DEFAULT_ESTIMATE_GAS_SCALE_FACTOR, DEFAULT_FAIR_PUBDATA_PRICE, DEFAULT_L1_GAS_PRICE, DEFAULT_L2_GAS_PRICE, LEGACY_RICH_WALLETS, @@ -24,96 +16,20 @@ use anvil_zksync_core::node::{ }; use anvil_zksync_core::observability::Observability; use anvil_zksync_core::system_contracts::SystemContracts; -use anyhow::anyhow; +use anyhow::{anyhow, Context}; use clap::Parser; -use futures::{ - channel::oneshot, - future::{self}, - FutureExt, -}; -use http::Method; -use jsonrpsee::server::middleware::http::ProxyGetRequestLayer; -use jsonrpsee::server::{RpcServiceBuilder, ServerBuilder}; use std::fs::File; use std::{env, net::SocketAddr, str::FromStr}; -use tower_http::cors::{AllowOrigin, CorsLayer}; +use tower_http::cors::AllowOrigin; use tracing_subscriber::filter::LevelFilter; use zksync_types::fee_model::{FeeModelConfigV2, FeeParams}; use zksync_types::H160; -use zksync_web3_decl::jsonrpsee::RpcModule; use zksync_web3_decl::namespaces::ZksNamespaceClient; mod bytecode_override; mod cli; mod utils; -#[allow(clippy::too_many_arguments)] -async fn build_json_http( - addr: SocketAddr, - node: InMemoryNode, - enable_health_api: bool, - cors_allow_origin: String, - enable_cors: bool, -) -> tokio::task::JoinHandle<()> { - let (sender, recv) = oneshot::channel::<()>(); - - let mut rpc = RpcModule::new(()); - rpc.merge(EthNamespace::new(node.clone()).into_rpc()) - .unwrap(); - rpc.merge(EthTestNamespace::new(node.clone()).into_rpc()) - .unwrap(); - rpc.merge(AnvilNamespace::new(node.clone()).into_rpc()) - .unwrap(); - rpc.merge(EvmNamespace::new(node.clone()).into_rpc()) - .unwrap(); - rpc.merge(DebugNamespace::new(node.clone()).into_rpc()) - .unwrap(); - rpc.merge(NetNamespace::new(node.clone()).into_rpc()) - .unwrap(); - rpc.merge(ConfigNamespace::new(node.clone()).into_rpc()) - .unwrap(); - rpc.merge(ZksNamespace::new(node.clone()).into_rpc()) - .unwrap(); - rpc.merge(Web3Namespace.into_rpc()).unwrap(); - - let cors_layers = tower::util::option_layer(enable_cors.then(|| { - // `CorsLayer` adds CORS-specific headers to responses but does not do filtering by itself. - // CORS relies on browsers respecting server's access list response headers. - // See [`tower_http::cors`](https://docs.rs/tower-http/latest/tower_http/cors/index.html) - // for more details. - CorsLayer::new() - .allow_origin(AllowOrigin::exact( - cors_allow_origin.parse().expect("malformed allow origin"), - )) - .allow_headers([http::header::CONTENT_TYPE]) - .allow_methods([Method::GET, Method::POST]) - })); - let health_api_layer = tower::util::option_layer(if enable_health_api { - Some(ProxyGetRequestLayer::new("/health", "web3_clientVersion").unwrap()) - } else { - None - }); - let server_builder = ServerBuilder::default() - .http_only() - .set_http_middleware( - tower::ServiceBuilder::new() - .layer(cors_layers) - .layer(health_api_layer), - ) - .set_rpc_middleware(RpcServiceBuilder::new().rpc_logger(1024)); - - let server = server_builder.build(addr).await.unwrap(); - - tokio::spawn(async move { - let server_handle = server.start(rpc); - - server_handle.stopped().await; - drop(sender); - }); - - tokio::spawn(recv.map(drop)) -} - #[tokio::main] async fn main() -> anyhow::Result<()> { // Check for deprecated options @@ -338,31 +254,48 @@ async fn main() -> anyhow::Result<()> { node.set_rich_account(H160::from_str(address).unwrap(), config.genesis_balance); } - let mut threads = future::join_all(config.host.iter().map(|host| { + let mut server_options = NodeServerOptions::default(); + server_options.set_allow_origin(AllowOrigin::exact( + config + .allow_origin + .parse() + .context("allow origin is malformed")?, + )); + if config.health_check_endpoint { + server_options.enable_health_api() + } + if !config.no_cors { + server_options.enable_cors(); + } + let mut server_builder = server_options.to_builder(node.clone()); + for host in &config.host { let addr = SocketAddr::new(*host, config.port); - build_json_http( - addr, - node.clone(), - config.health_check_endpoint, - config.allow_origin.clone(), - !config.no_cors, - ) - })) - .await; + server_builder.serve(addr).await; + } + let server_handle = server_builder.run().await; let system_contracts = SystemContracts::from_options(&config.system_contracts_options, config.use_evm_emulator); let block_producer_handle = tokio::task::spawn(BlockProducer::new( - node.clone(), + node, pool, block_sealer, system_contracts, )); - threads.push(block_producer_handle); config.print(fork_print_info.as_ref()); - future::select_all(threads).await.0.unwrap(); + tokio::select! { + _ = tokio::signal::ctrl_c() => { + tracing::trace!("received shutdown signal, shutting down"); + }, + _ = server_handle.stopped() => { + tracing::trace!("node server was stopped") + }, + _ = block_producer_handle => { + tracing::trace!("block producer was stopped") + } + } Ok(()) }