Skip to content

Commit

Permalink
move server building logic to api_server crate
Browse files Browse the repository at this point in the history
  • Loading branch information
itegulov committed Dec 18, 2024
1 parent 88f18e4 commit 21cbcc5
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 108 deletions.
8 changes: 4 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions crates/api_server/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,11 @@ zksync_types.workspace = true
zksync_web3_decl.workspace = true

anyhow.workspace = true
futures.workspace = true
hex.workspace = true
http.workspace = true
jsonrpsee.workspace = true
thiserror.workspace = true
tower.workspace = true
tower-http.workspace = true
tracing.workspace = true
2 changes: 2 additions & 0 deletions crates/api_server/src/lib.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,9 @@
mod error;
mod impls;
mod server;

pub use impls::{
AnvilNamespace, ConfigNamespace, DebugNamespace, EthNamespace, EthTestNamespace, EvmNamespace,
NetNamespace, Web3Namespace, ZksNamespace,
};
pub use server::{NodeServerBuilder, NodeServerHandle, NodeServerOptions};
138 changes: 138 additions & 0 deletions crates/api_server/src/server.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use crate::{
AnvilNamespace, ConfigNamespace, DebugNamespace, EthNamespace, EthTestNamespace, EvmNamespace,
NetNamespace, Web3Namespace, ZksNamespace,
};
use anvil_zksync_api_decl::{
AnvilNamespaceServer, ConfigNamespaceServer, DebugNamespaceServer, EthNamespaceServer,
EthTestNamespaceServer, EvmNamespaceServer, NetNamespaceServer, Web3NamespaceServer,
ZksNamespaceServer,
};
use anvil_zksync_core::node::InMemoryNode;
use http::Method;
use jsonrpsee::server::middleware::http::ProxyGetRequestLayer;
use jsonrpsee::server::{AlreadyStoppedError, RpcServiceBuilder, ServerBuilder, ServerHandle};
use jsonrpsee::RpcModule;
use std::future::Future;
use std::net::SocketAddr;
use std::pin::Pin;
use tower_http::cors::{AllowOrigin, CorsLayer};

#[derive(Debug, Default)]
pub struct NodeServerOptions {
health_api_enabled: bool,
cors_enabled: bool,
allow_origin: AllowOrigin,
}

impl NodeServerOptions {
pub fn enable_health_api(&mut self) {
self.health_api_enabled = true;
}

pub fn enable_cors(&mut self) {
self.health_api_enabled = true;
}

pub fn set_allow_origin(&mut self, allow_origin: AllowOrigin) {
self.allow_origin = allow_origin;
}

pub fn to_builder(self, node: InMemoryNode) -> NodeServerBuilder {
NodeServerBuilder::new(self, node)
}
}

pub struct NodeServerBuilder {
options: NodeServerOptions,
rpc: RpcModule<()>,
server_futs: Vec<Pin<Box<dyn Future<Output = ServerHandle>>>>,
}

impl NodeServerBuilder {
fn new(options: NodeServerOptions, node: InMemoryNode) -> Self {
let rpc = Self::default_rpc(node);
Self {
options,
rpc,
server_futs: Vec::new(),
}
}

fn default_rpc(node: InMemoryNode) -> RpcModule<()> {
let mut rpc = RpcModule::new(());
rpc.merge(EthNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(EthTestNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(AnvilNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(EvmNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(DebugNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(NetNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(ConfigNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(ZksNamespace::new(node).into_rpc()).unwrap();
rpc.merge(Web3Namespace.into_rpc()).unwrap();
rpc
}

pub async fn serve(&mut self, addr: SocketAddr) {
let cors_layers = tower::util::option_layer(self.options.cors_enabled.then(|| {
// `CorsLayer` adds CORS-specific headers to responses but does not do filtering by itself.
// CORS relies on browsers respecting server's access list response headers.
// See [`tower_http::cors`](https://docs.rs/tower-http/latest/tower_http/cors/index.html)
// for more details.
CorsLayer::new()
.allow_origin(self.options.allow_origin.clone())
.allow_headers([http::header::CONTENT_TYPE])
.allow_methods([Method::GET, Method::POST])
}));
let health_api_layer = tower::util::option_layer(
self.options
.health_api_enabled
.then(|| ProxyGetRequestLayer::new("/health", "web3_clientVersion").unwrap()),
);
let server_builder = ServerBuilder::default()
.http_only()
.set_http_middleware(
tower::ServiceBuilder::new()
.layer(cors_layers)
.layer(health_api_layer),
)
.set_rpc_middleware(RpcServiceBuilder::new().rpc_logger(100));

let server = server_builder.build(addr).await.unwrap();
let rpc = self.rpc.clone();
self.server_futs
.push(Box::pin(async move { server.start(rpc) }));
}

pub async fn run(self) -> NodeServerHandle {
let handles = futures::future::join_all(self.server_futs).await;
NodeServerHandle { handles }
}
}

/// Node's server handle.
///
/// When all [`NodeServerHandle`]'s have been `dropped` or `stop` has been called
/// the server will be stopped.
#[derive(Debug, Clone)]
pub struct NodeServerHandle {
handles: Vec<ServerHandle>,
}

impl NodeServerHandle {
/// Tell the server to stop without waiting for the server to stop.
pub fn stop(&self) -> Result<(), AlreadyStoppedError> {
self.handles.iter().map(|handle| handle.stop()).collect()

Check failure on line 131 in crates/api_server/src/server.rs

View workflow job for this annotation

GitHub Actions / lint

`.map().collect()` can be replaced with `.try_for_each()`
}

/// Wait for the server to stop.
pub async fn stopped(self) {
futures::future::join_all(self.handles.into_iter().map(|handle| handle.stopped())).await;
}
}
4 changes: 0 additions & 4 deletions crates/cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ categories.workspace = true
anvil_zksync_core.workspace = true
anvil_zksync_config.workspace = true
anvil_zksync_api_server.workspace = true
anvil_zksync_api_decl.workspace = true
anvil_zksync_types.workspace = true

zksync_types.workspace = true
Expand All @@ -25,9 +24,6 @@ anyhow.workspace = true
clap.workspace = true
eyre.workspace = true
hex.workspace = true
http.workspace = true
jsonrpsee.workspace = true
futures.workspace = true
rand.workspace = true
serde.workspace = true
serde_json.workspace = true
Expand Down
133 changes: 33 additions & 100 deletions crates/cli/src/main.rs
Original file line number Diff line number Diff line change
@@ -1,15 +1,7 @@
use crate::bytecode_override::override_bytecodes;
use crate::cli::{Cli, Command};
use crate::utils::update_with_fork_details;
use anvil_zksync_api_decl::{
AnvilNamespaceServer, ConfigNamespaceServer, DebugNamespaceServer, EthNamespaceServer,
EthTestNamespaceServer, EvmNamespaceServer, NetNamespaceServer, Web3NamespaceServer,
ZksNamespaceServer,
};
use anvil_zksync_api_server::{
AnvilNamespace, ConfigNamespace, DebugNamespace, EthNamespace, EthTestNamespace, EvmNamespace,
NetNamespace, Web3Namespace, ZksNamespace,
};
use anvil_zksync_api_server::NodeServerOptions;
use anvil_zksync_config::constants::{
DEFAULT_ESTIMATE_GAS_PRICE_SCALE_FACTOR, DEFAULT_ESTIMATE_GAS_SCALE_FACTOR,
DEFAULT_FAIR_PUBDATA_PRICE, DEFAULT_L1_GAS_PRICE, DEFAULT_L2_GAS_PRICE, LEGACY_RICH_WALLETS,
Expand All @@ -24,96 +16,20 @@ use anvil_zksync_core::node::{
};
use anvil_zksync_core::observability::Observability;
use anvil_zksync_core::system_contracts::SystemContracts;
use anyhow::anyhow;
use anyhow::{anyhow, Context};
use clap::Parser;
use futures::{
channel::oneshot,
future::{self},
FutureExt,
};
use http::Method;
use jsonrpsee::server::middleware::http::ProxyGetRequestLayer;
use jsonrpsee::server::{RpcServiceBuilder, ServerBuilder};
use std::fs::File;
use std::{env, net::SocketAddr, str::FromStr};
use tower_http::cors::{AllowOrigin, CorsLayer};
use tower_http::cors::AllowOrigin;
use tracing_subscriber::filter::LevelFilter;
use zksync_types::fee_model::{FeeModelConfigV2, FeeParams};
use zksync_types::H160;
use zksync_web3_decl::jsonrpsee::RpcModule;
use zksync_web3_decl::namespaces::ZksNamespaceClient;

mod bytecode_override;
mod cli;
mod utils;

#[allow(clippy::too_many_arguments)]
async fn build_json_http(
addr: SocketAddr,
node: InMemoryNode,
enable_health_api: bool,
cors_allow_origin: String,
enable_cors: bool,
) -> tokio::task::JoinHandle<()> {
let (sender, recv) = oneshot::channel::<()>();

let mut rpc = RpcModule::new(());
rpc.merge(EthNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(EthTestNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(AnvilNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(EvmNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(DebugNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(NetNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(ConfigNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(ZksNamespace::new(node.clone()).into_rpc())
.unwrap();
rpc.merge(Web3Namespace.into_rpc()).unwrap();

let cors_layers = tower::util::option_layer(enable_cors.then(|| {
// `CorsLayer` adds CORS-specific headers to responses but does not do filtering by itself.
// CORS relies on browsers respecting server's access list response headers.
// See [`tower_http::cors`](https://docs.rs/tower-http/latest/tower_http/cors/index.html)
// for more details.
CorsLayer::new()
.allow_origin(AllowOrigin::exact(
cors_allow_origin.parse().expect("malformed allow origin"),
))
.allow_headers([http::header::CONTENT_TYPE])
.allow_methods([Method::GET, Method::POST])
}));
let health_api_layer = tower::util::option_layer(if enable_health_api {
Some(ProxyGetRequestLayer::new("/health", "web3_clientVersion").unwrap())
} else {
None
});
let server_builder = ServerBuilder::default()
.http_only()
.set_http_middleware(
tower::ServiceBuilder::new()
.layer(cors_layers)
.layer(health_api_layer),
)
.set_rpc_middleware(RpcServiceBuilder::new().rpc_logger(1024));

let server = server_builder.build(addr).await.unwrap();

tokio::spawn(async move {
let server_handle = server.start(rpc);

server_handle.stopped().await;
drop(sender);
});

tokio::spawn(recv.map(drop))
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
// Check for deprecated options
Expand Down Expand Up @@ -338,31 +254,48 @@ async fn main() -> anyhow::Result<()> {
node.set_rich_account(H160::from_str(address).unwrap(), config.genesis_balance);
}

let mut threads = future::join_all(config.host.iter().map(|host| {
let mut server_options = NodeServerOptions::default();
server_options.set_allow_origin(AllowOrigin::exact(
config
.allow_origin
.parse()
.context("allow origin is malformed")?,
));
if config.health_check_endpoint {
server_options.enable_health_api()
}
if !config.no_cors {
server_options.enable_cors();
}
let mut server_builder = server_options.to_builder(node.clone());
for host in &config.host {
let addr = SocketAddr::new(*host, config.port);
build_json_http(
addr,
node.clone(),
config.health_check_endpoint,
config.allow_origin.clone(),
!config.no_cors,
)
}))
.await;
server_builder.serve(addr).await;
}
let server_handle = server_builder.run().await;

let system_contracts =
SystemContracts::from_options(&config.system_contracts_options, config.use_evm_emulator);
let block_producer_handle = tokio::task::spawn(BlockProducer::new(
node.clone(),
node,
pool,
block_sealer,
system_contracts,
));
threads.push(block_producer_handle);

config.print(fork_print_info.as_ref());

future::select_all(threads).await.0.unwrap();
tokio::select! {
_ = tokio::signal::ctrl_c() => {
tracing::trace!("received shutdown signal, shutting down");
},
_ = server_handle.stopped() => {
tracing::trace!("node server was stopped")
},
_ = block_producer_handle => {
tracing::trace!("block producer was stopped")
}
}

Ok(())
}

0 comments on commit 21cbcc5

Please sign in to comment.