From c21b76f476f369a44e3dea3a6f0b7a1579523f18 Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Fri, 30 Aug 2024 21:57:52 +0800 Subject: [PATCH 1/4] api server --- bin/api/Cargo.toml | 2 + bin/api/src/main.rs | 253 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 247 insertions(+), 8 deletions(-) diff --git a/bin/api/Cargo.toml b/bin/api/Cargo.toml index 7ae4c19..2b49882 100644 --- a/bin/api/Cargo.toml +++ b/bin/api/Cargo.toml @@ -12,6 +12,7 @@ exclude.workspace = true eth2.workspace = true clap.workspace = true +ctrlc.workspace = true futures-util.workspace = true reqwest.workspace = true serde_json.workspace = true @@ -21,6 +22,7 @@ tokio.workspace = true eyre.workspace = true again.workspace = true hex.workspace = true +tracing-appender.workspace = true tracing-subscriber.workspace = true uuid.workspace = true warp.workspace = true diff --git a/bin/api/src/main.rs b/bin/api/src/main.rs index 280245a..0ff795e 100644 --- a/bin/api/src/main.rs +++ b/bin/api/src/main.rs @@ -1,23 +1,260 @@ +use std::fs; +use std::path::PathBuf; +use std::str::FromStr; +use std::sync::Arc; +use std::time::Duration; + +use clap::Parser; +use ctrlc::set_handler; +use eth2::types::Hash256; +use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts}; +use serde::{Deserialize, Serialize}; +use tokio::sync::Mutex; +use tracing_appender::rolling::{RollingFileAppender, Rotation}; +use tracing_subscriber::layer::SubscriberExt; +use tracing_subscriber::util::SubscriberInitExt; +use tracing_subscriber::{fmt, EnvFilter}; + +use blob_archiver_beacon::beacon_client; +use blob_archiver_beacon::beacon_client::BeaconClientEth2; +use blob_archiver_storage::fs::FSStorage; +use blob_archiver_storage::s3::{S3Config, S3Storage}; +use blob_archiver_storage::storage; +use blob_archiver_storage::storage::{Storage, StorageType}; + mod api; #[allow(dead_code)] static INIT: std::sync::Once = std::sync::Once::new(); -use clap::Parser; +#[tokio::main] +async fn main() { + let args = CliArgs::parse(); + + let config: Config = args.to_config(); + init_logging( + config.log_config.verbose, + config.log_config.log_dir.clone(), + config + .log_config + .log_rotation + .clone() + .map(|s| to_rotation(s.as_str())), + ); + let beacon_client = BeaconNodeHttpClient::new( + SensitiveUrl::from_str(config.beacon_config.beacon_endpoint.as_str()).unwrap(), + Timeouts::set_all(config.beacon_config.beacon_client_timeout), + ); + let storage: Arc> = if config.storage_config.storage_type == StorageType::FS + { + Arc::new(Mutex::new( + FSStorage::new(config.storage_config.fs_dir.clone().unwrap()) + .await + .unwrap(), + )) + } else { + Arc::new(Mutex::new( + S3Storage::new(config.storage_config.s3_config.clone().unwrap()) + .await + .unwrap(), + )) + }; + let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false); + set_handler(move || { + tracing::info!("shutting down"); + shutdown_tx + .send(true) + .expect("could not send shutdown signal"); + }) + .expect("could not register shutdown handler"); + + let beacon_client_eth2 = BeaconClientEth2 { beacon_client }; + let api = api::Api::new(Arc::new(Mutex::new(beacon_client_eth2)), storage.clone()); + let addr: std::net::SocketAddr = config.listen_addr.parse().expect("Invalid listen address"); + + let (_, server) = warp::serve(api.routes()) + .bind_with_graceful_shutdown(addr, async move { + shutdown_rx.clone().changed().await.ok(); + }); + server.await; + +} + +fn init_logging(verbose: u8, log_dir: Option, rotation: Option) { + INIT.call_once(|| { + setup_tracing(verbose, log_dir, rotation).expect("Failed to setup tracing"); + }); +} + +#[allow(dead_code)] +pub fn setup_tracing( + verbose: u8, + log_dir: Option, + rotation: Option, +) -> eyre::Result<()> { + let filter = match verbose { + 0 => EnvFilter::new("error"), + 1 => EnvFilter::new("warn"), + 2 => EnvFilter::new("info"), + 3 => EnvFilter::new("debug"), + _ => EnvFilter::new("trace"), + }; + + let subscriber = tracing_subscriber::registry() + .with(EnvFilter::from_default_env()) + .with(filter); + + if let Some(log_dir) = log_dir { + fs::create_dir_all(&log_dir) + .map_err(|e| eyre::eyre!("Failed to create log directory: {}", e))?; -fn main() { - println!("Hello, world!"); + let file_appender = RollingFileAppender::new( + rotation.unwrap_or(Rotation::DAILY), + log_dir, + "blob-archiver.log", + ); + let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender); + let file_layer = fmt::layer().with_writer(non_blocking).with_ansi(false); + + subscriber + .with(file_layer) + .with(fmt::layer().with_writer(std::io::stdout)) + .try_init()?; + } else { + subscriber + .with(fmt::layer().with_writer(std::io::stdout)) + .try_init()?; + } + + Ok(()) } #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] struct CliArgs { - #[clap(short, long, value_parser, default_value = "config.toml")] - config: String, + #[clap(short = 'v', long, action = clap::ArgAction::Count, default_value = "4")] + verbose: u8, + + #[clap(long)] + log_dir: Option, + + #[clap(long, help = "Log rotation values: DAILY, HOURLY, MINUTELY, NEVER")] + log_rotation: Option, + + #[clap(long, required = true)] + beacon_endpoint: String, + + #[clap(long, default_value = "10")] + beacon_client_timeout: u64, + + #[clap(long, default_value = "6")] + poll_interval: u64, + + #[clap(long, default_value = "0.0.0.0:8000")] + listen_addr: String, - #[clap(short, long, action = clap::ArgAction::Count)] + #[clap(long, required = true)] + origin_block: String, + + #[clap(long, default_value = "s3")] + storage_type: String, + + #[clap(long)] + s3_endpoint: Option, + + #[clap(long)] + s3_bucket: Option, + + #[clap(long)] + s3_path: Option, + + #[clap(long, default_value = "false")] + s3_compress: Option, + #[clap(long)] + fs_dir: Option, +} + +impl CliArgs { + fn to_config(&self) -> Config { + let log_config = LogConfig { + log_dir: self.log_dir.as_ref().map(PathBuf::from), + log_rotation: self.log_rotation.clone(), + verbose: self.verbose, + }; + + let beacon_config = beacon_client::Config { + beacon_endpoint: self.beacon_endpoint.clone(), + beacon_client_timeout: Duration::from_secs(self.beacon_client_timeout), + }; + + let storage_type = StorageType::from_str(self.storage_type.as_str()).unwrap(); + + let s3_config = if storage_type == StorageType::S3 { + Some(S3Config { + endpoint: self.s3_endpoint.clone().unwrap(), + bucket: self.s3_bucket.clone().unwrap(), + path: self.s3_path.clone().unwrap(), + compression: self.s3_compress.unwrap(), + }) + } else { + None + }; + + let fs_dir = self.fs_dir.as_ref().map(PathBuf::from); + + let storage_config = storage::Config { + storage_type, + s3_config, + fs_dir, + }; + + let mut padded_hex = self + .origin_block + .strip_prefix("0x") + .unwrap_or(&self.origin_block) + .to_string(); + padded_hex = format!("{:0<64}", padded_hex); + let origin_block = Hash256::from_slice(&hex::decode(padded_hex).expect("Invalid hex")); + + Config { + poll_interval: Duration::from_secs(self.poll_interval), + listen_addr: self.listen_addr.clone(), + origin_block, + beacon_config, + storage_config, + log_config, + } + } +} + +#[allow(dead_code)] +fn to_rotation(s: &str) -> Rotation { + match s { + "DAILY" => Rotation::DAILY, + "HOURLY" => Rotation::HOURLY, + "MINUTELY" => Rotation::MINUTELY, + _ => Rotation::NEVER, + } +} + +#[derive(Debug, PartialEq, Eq, Clone, Default, Serialize, Deserialize)] +pub struct LogConfig { + log_dir: Option, + log_rotation: Option, verbose: u8, +} + +#[derive(Debug, PartialEq, Eq, Clone, Default, Serialize, Deserialize)] +pub struct Config { + pub poll_interval: Duration, + + pub listen_addr: String, + + pub origin_block: Hash256, + + pub beacon_config: beacon_client::Config, + + pub storage_config: storage::Config, - #[clap(short, long)] - dry_run: bool, + pub log_config: LogConfig, } From bb721375c3416b795b5c58a11d270ec167973e26 Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Sat, 31 Aug 2024 22:27:56 +0800 Subject: [PATCH 2/4] add git ignore --- .gitignore | 6 ++++++ 1 file changed, 6 insertions(+) diff --git a/.gitignore b/.gitignore index 6985cf1..839fd39 100644 --- a/.gitignore +++ b/.gitignore @@ -12,3 +12,9 @@ Cargo.lock # MSVC Windows builds of rustc generate these, which store debugging information *.pdb + +.idea +.vscode +fs-logs +fs + From a81d09b124c7745d6169b88a0a27d6ef858d29a5 Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Sat, 31 Aug 2024 23:35:20 +0800 Subject: [PATCH 3/4] add git ignore and modify verbose flag --- .env.default | 50 +++++++++++++++++++++ Cargo.lock | 2 + api.Dockerfile | 25 +++++++++++ archiver.Dockerfile | 6 ++- bin/api/src/main.rs | 93 ++++++++++++++++++++++------------------ bin/archiver/src/main.rs | 28 ++++++------ docker-compose.yaml | 44 +++++++++++++++++++ 7 files changed, 191 insertions(+), 57 deletions(-) create mode 100644 .env.default create mode 100644 api.Dockerfile create mode 100644 docker-compose.yaml diff --git a/.env.default b/.env.default new file mode 100644 index 0000000..2ca8ab4 --- /dev/null +++ b/.env.default @@ -0,0 +1,50 @@ +# LOG_LEVEL +VERBOSE=3 + +# log output path +# LOG_DIR=./logs + +# log rotation values: DAILY, HOURLY, MINUTELY, NEVER +# LOG_ROTATION=DAILY + +# beacon chain endpoint url +BEACON_ENDPOINT=http://localhost:3500 + +# beacon client request timeout +#BEACON_CLIENT_TIMEOUT=10 + +# beacon client poll interval +#POLL_INTERVAL=6 + +# blob archiver api server listening address +LISTEN_ADDR=0.0.0.0:8000 + +# blob archiver origin block number +ORIGIN_BLOCK=0x1 + +# blob data storage strategy type: s3, fs +STORAGE_TYPE=s3 + +# s3 storage credentials access key id +AWS_ACCESS_KEY_ID=admin + +# s3 storage credentials secret access key +AWS_SECRET_ACCESS_KEY=password + +# s3 storage region just use default value: us-east-1 +AWS_REGION=us-east-1 + +# s3 storage endpoint +S3_ENDPOINT=http://localhost:9000 + +# s3 storage bucket +S3_BUCKET= + +# s3 storage path +S3_PATH= + +# s3 storage compress +S3_COMPRESS=true + +# local file storage directory +FS_DIR=./fs diff --git a/Cargo.lock b/Cargo.lock index 256144c..157852d 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -320,6 +320,7 @@ dependencies = [ "blob-archiver-beacon", "blob-archiver-storage", "clap", + "ctrlc", "eth2", "eyre", "futures-util", @@ -330,6 +331,7 @@ dependencies = [ "ssz_rs", "tokio", "tracing", + "tracing-appender", "tracing-subscriber", "uuid 1.10.0", "warp", diff --git a/api.Dockerfile b/api.Dockerfile new file mode 100644 index 0000000..c88e9e4 --- /dev/null +++ b/api.Dockerfile @@ -0,0 +1,25 @@ +# Use the official Rust image as the base +FROM rust:latest as builder + +RUN apt-get update && apt-get install -y cmake + +# Set the working directory +WORKDIR /usr/src/blob-archiver-rs + +# Copy the entire project +COPY . . + +# Build the project +RUN cargo build --release -p api + +# Create a new stage with a minimal image +FROM ubuntu:latest + +# Install OpenSSL +RUN apt-get update && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/* + +# Copy the built binary from the builder stage +COPY --from=builder /usr/src/blob-archiver-rs/target/release/api /usr/local/bin/api + +# Set the entrypoint +ENTRYPOINT ["api"] diff --git a/archiver.Dockerfile b/archiver.Dockerfile index 76b5140..8416c8d 100644 --- a/archiver.Dockerfile +++ b/archiver.Dockerfile @@ -1,6 +1,8 @@ # Use the official Rust image as the base FROM rust:latest as builder +RUN apt-get update && apt-get install -y cmake + # Set the working directory WORKDIR /usr/src/blob-archiver-rs @@ -8,10 +10,10 @@ WORKDIR /usr/src/blob-archiver-rs COPY . . # Build the project -RUN cargo build --release +RUN cargo build --release -p archiver # Create a new stage with a minimal image -FROM debian:buster-slim +FROM ubuntu:latest # Install OpenSSL RUN apt-get update && apt-get install -y openssl ca-certificates && rm -rf /var/lib/apt/lists/* diff --git a/bin/api/src/main.rs b/bin/api/src/main.rs index 0ff795e..cd5f60f 100644 --- a/bin/api/src/main.rs +++ b/bin/api/src/main.rs @@ -8,12 +8,14 @@ use clap::Parser; use ctrlc::set_handler; use eth2::types::Hash256; use eth2::{BeaconNodeHttpClient, SensitiveUrl, Timeouts}; +use eyre::eyre; use serde::{Deserialize, Serialize}; use tokio::sync::Mutex; +use tracing::Level; use tracing_appender::rolling::{RollingFileAppender, Rotation}; +use tracing_subscriber::fmt; +use tracing_subscriber::fmt::writer::MakeWriterExt; use tracing_subscriber::layer::SubscriberExt; -use tracing_subscriber::util::SubscriberInitExt; -use tracing_subscriber::{fmt, EnvFilter}; use blob_archiver_beacon::beacon_client; use blob_archiver_beacon::beacon_client::BeaconClientEth2; @@ -72,12 +74,10 @@ async fn main() { let api = api::Api::new(Arc::new(Mutex::new(beacon_client_eth2)), storage.clone()); let addr: std::net::SocketAddr = config.listen_addr.parse().expect("Invalid listen address"); - let (_, server) = warp::serve(api.routes()) - .bind_with_graceful_shutdown(addr, async move { + let (_, server) = warp::serve(api.routes()).bind_with_graceful_shutdown(addr, async move { shutdown_rx.clone().changed().await.ok(); }); server.await; - } fn init_logging(verbose: u8, log_dir: Option, rotation: Option) { @@ -92,18 +92,6 @@ pub fn setup_tracing( log_dir: Option, rotation: Option, ) -> eyre::Result<()> { - let filter = match verbose { - 0 => EnvFilter::new("error"), - 1 => EnvFilter::new("warn"), - 2 => EnvFilter::new("info"), - 3 => EnvFilter::new("debug"), - _ => EnvFilter::new("trace"), - }; - - let subscriber = tracing_subscriber::registry() - .with(EnvFilter::from_default_env()) - .with(filter); - if let Some(log_dir) = log_dir { fs::create_dir_all(&log_dir) .map_err(|e| eyre::eyre!("Failed to create log directory: {}", e))?; @@ -114,63 +102,86 @@ pub fn setup_tracing( "blob-archiver.log", ); let (non_blocking, _guard) = tracing_appender::non_blocking(file_appender); - let file_layer = fmt::layer().with_writer(non_blocking).with_ansi(false); - - subscriber - .with(file_layer) - .with(fmt::layer().with_writer(std::io::stdout)) - .try_init()?; + let file_layer = fmt::layer() + .with_writer(non_blocking.with_max_level(match verbose { + 0 => Level::ERROR, + 1 => Level::WARN, + 2 => Level::INFO, + 3 => Level::DEBUG, + _ => Level::TRACE, + })) + .with_ansi(false); + + let subscriber = + tracing_subscriber::registry() + .with(file_layer) + .with( + fmt::layer().with_writer(std::io::stdout.with_max_level(match verbose { + 0 => Level::ERROR, + 1 => Level::WARN, + 2 => Level::INFO, + 3 => Level::DEBUG, + _ => Level::TRACE, + })), + ); + tracing::subscriber::set_global_default(subscriber).map_err(|e| eyre!(e))?; } else { - subscriber - .with(fmt::layer().with_writer(std::io::stdout)) - .try_init()?; + let subscriber = tracing_subscriber::registry().with(fmt::layer().with_writer( + std::io::stdout.with_max_level(match verbose { + 0 => Level::ERROR, + 1 => Level::WARN, + 2 => Level::INFO, + 3 => Level::DEBUG, + _ => Level::TRACE, + }), + )); + tracing::subscriber::set_global_default(subscriber).map_err(|e| eyre!(e))?; } - Ok(()) } #[derive(Parser, Debug)] #[clap(author, version, about, long_about = None)] struct CliArgs { - #[clap(short = 'v', long, action = clap::ArgAction::Count, default_value = "4")] + #[clap(long, env = "VERBOSE", default_value = "2")] verbose: u8, - #[clap(long)] + #[clap(long, env = "LOG_DIR")] log_dir: Option, - #[clap(long, help = "Log rotation values: DAILY, HOURLY, MINUTELY, NEVER")] + #[clap(long, env = "LOG_ROTATION", help = "Log rotation values: DAILY, HOURLY, MINUTELY, NEVER")] log_rotation: Option, - #[clap(long, required = true)] + #[clap(long, env = "BEACON_ENDPOINT", required = true)] beacon_endpoint: String, - #[clap(long, default_value = "10")] + #[clap(long, env = "BEACON_CLIENT_TIMEOUT", default_value = "10")] beacon_client_timeout: u64, - #[clap(long, default_value = "6")] + #[clap(long, env = "POLL_INTERVAL", default_value = "6")] poll_interval: u64, - #[clap(long, default_value = "0.0.0.0:8000")] + #[clap(long, env = "LISTEN_ADDR", default_value = "0.0.0.0:8000")] listen_addr: String, - #[clap(long, required = true)] + #[clap(long, env = "ORIGIN_BLOCK", required = true)] origin_block: String, - #[clap(long, default_value = "s3")] + #[clap(long, env = "STORAGE_TYPE", default_value = "s3")] storage_type: String, - #[clap(long)] + #[clap(long, env = "S3_ENDPOINT")] s3_endpoint: Option, - #[clap(long)] + #[clap(long, env = "S3_BUCKET")] s3_bucket: Option, - #[clap(long)] + #[clap(long, env = "S3_PATH")] s3_path: Option, - #[clap(long, default_value = "false")] + #[clap(long, env = "S3_COMPRESS", default_value = "false")] s3_compress: Option, - #[clap(long)] + #[clap(long, env = "FS_DIR")] fs_dir: Option, } diff --git a/bin/archiver/src/main.rs b/bin/archiver/src/main.rs index 80958c0..ddfa07a 100644 --- a/bin/archiver/src/main.rs +++ b/bin/archiver/src/main.rs @@ -171,45 +171,45 @@ pub fn setup_tracing( #[derive(Parser, Serialize)] pub struct CliArgs { - #[clap(short = 'v', long, action = clap::ArgAction::Count, default_value = "2")] + #[clap(long, env = "VERBOSE", default_value = "2")] verbose: u8, - #[clap(long)] + #[clap(long, env = "LOG_DIR")] log_dir: Option, - #[clap(long, help = "Log rotation values: DAILY, HOURLY, MINUTELY, NEVER")] + #[clap(long, env = "LOG_ROTATION", help = "Log rotation values: DAILY, HOURLY, MINUTELY, NEVER")] log_rotation: Option, - #[clap(long, required = true)] + #[clap(long, env = "BEACON_ENDPOINT", required = true)] beacon_endpoint: String, - #[clap(long, default_value = "10")] + #[clap(long, env = "BEACON_CLIENT_TIMEOUT", default_value = "10")] beacon_client_timeout: u64, - #[clap(long, default_value = "6")] + #[clap(long, env = "POLL_INTERVAL", default_value = "6")] poll_interval: u64, - #[clap(long, default_value = "0.0.0.0:8000")] + #[clap(long, env = "LISTEN_ADDR", default_value = "0.0.0.0:8000")] listen_addr: String, - #[clap(long, required = true)] + #[clap(long, env = "ORIGIN_BLOCK", required = true)] origin_block: String, - #[clap(long, default_value = "s3")] + #[clap(long, env = "STORAGE_TYPE", default_value = "s3")] storage_type: String, - #[clap(long)] + #[clap(long, env = "S3_ENDPOINT")] s3_endpoint: Option, - #[clap(long)] + #[clap(long, env = "S3_BUCKET")] s3_bucket: Option, - #[clap(long)] + #[clap(long, env = "S3_PATH")] s3_path: Option, - #[clap(long, default_value = "false")] + #[clap(long, env = "S3_COMPRESS", default_value = "false")] s3_compress: Option, - #[clap(long)] + #[clap(long, env = "FS_DIR")] fs_dir: Option, } diff --git a/docker-compose.yaml b/docker-compose.yaml new file mode 100644 index 0000000..acac179 --- /dev/null +++ b/docker-compose.yaml @@ -0,0 +1,44 @@ +version: "3.7" + +services: + api: + build: + context: . + dockerfile: api.Dockerfile + env_file: + - .env + ports: + - "8000:8000" + depends_on: + - minio + - create-buckets + archiver: + build: + context: . + dockerfile: archiver.Dockerfile + env_file: + - .env + depends_on: + - minio + - create-buckets + minio: + restart: unless-stopped + image: minio/minio:latest + ports: + - "9000:9000" + - "9999:9999" + environment: + MINIO_ROOT_USER: admin + MINIO_ROOT_PASSWORD: password + entrypoint: minio server /data --console-address ":9999" + create-buckets: + image: minio/mc + depends_on: + - minio + entrypoint: > + /bin/sh -c " + /usr/bin/mc alias set minio http://minio:9000 admin password; + /usr/bin/mc mb minio/blobs; + /usr/bin/mc anonymous set public minio/blobs; + exit 0; + " \ No newline at end of file From c6df1f69fae50d810ea73231c8752ce895aba73f Mon Sep 17 00:00:00 2001 From: thinkAfCod Date: Mon, 2 Sep 2024 17:57:47 +0800 Subject: [PATCH 4/4] fix format --- bin/api/src/main.rs | 6 +++++- bin/archiver/src/main.rs | 6 +++++- 2 files changed, 10 insertions(+), 2 deletions(-) diff --git a/bin/api/src/main.rs b/bin/api/src/main.rs index cd5f60f..7bf3433 100644 --- a/bin/api/src/main.rs +++ b/bin/api/src/main.rs @@ -149,7 +149,11 @@ struct CliArgs { #[clap(long, env = "LOG_DIR")] log_dir: Option, - #[clap(long, env = "LOG_ROTATION", help = "Log rotation values: DAILY, HOURLY, MINUTELY, NEVER")] + #[clap( + long, + env = "LOG_ROTATION", + help = "Log rotation values: DAILY, HOURLY, MINUTELY, NEVER" + )] log_rotation: Option, #[clap(long, env = "BEACON_ENDPOINT", required = true)] diff --git a/bin/archiver/src/main.rs b/bin/archiver/src/main.rs index ddfa07a..fee7f63 100644 --- a/bin/archiver/src/main.rs +++ b/bin/archiver/src/main.rs @@ -177,7 +177,11 @@ pub struct CliArgs { #[clap(long, env = "LOG_DIR")] log_dir: Option, - #[clap(long, env = "LOG_ROTATION", help = "Log rotation values: DAILY, HOURLY, MINUTELY, NEVER")] + #[clap( + long, + env = "LOG_ROTATION", + help = "Log rotation values: DAILY, HOURLY, MINUTELY, NEVER" + )] log_rotation: Option, #[clap(long, env = "BEACON_ENDPOINT", required = true)]