diff --git a/Cargo.lock b/Cargo.lock index 7ea2f9d..2caeb20 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -233,6 +233,7 @@ version = "0.1.0" dependencies = [ "axum", "capture", + "envconfig", "time", "tokio", "tracing", @@ -379,6 +380,26 @@ dependencies = [ "cfg-if", ] +[[package]] +name = "envconfig" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ea81cc7e21f55a9d9b1efb6816904978d0bfbe31a50347cb24b2e75564bcac9b" +dependencies = [ + "envconfig_derive", +] + +[[package]] +name = "envconfig_derive" +version = "0.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7dfca278e5f84b45519acaaff758ebfa01f18e96998bc24b8f1b722dd804b9bf" +dependencies = [ + "proc-macro2", + "quote", + "syn 1.0.109", +] + [[package]] name = "equivalent" version = "1.0.1" diff --git a/capture-server/Cargo.toml b/capture-server/Cargo.toml index 6378532..fa7151e 100644 --- a/capture-server/Cargo.toml +++ b/capture-server/Cargo.toml @@ -10,3 +10,4 @@ tokio = { workspace = true } tracing-subscriber = { workspace = true } tracing = { workspace = true } time = { workspace = true } +envconfig = "0.10.0" diff --git a/capture-server/src/main.rs b/capture-server/src/main.rs index 1a78f22..46d8e28 100644 --- a/capture-server/src/main.rs +++ b/capture-server/src/main.rs @@ -1,4 +1,4 @@ -use std::env; +use envconfig::Envconfig; use std::net::SocketAddr; use std::sync::Arc; @@ -6,6 +6,17 @@ use capture::{billing_limits::BillingLimiter, redis::RedisClient, router, sink}; use time::Duration; use tokio::signal; +#[derive(Envconfig)] +struct Config { + #[envconfig(default = "false")] + print_sink: bool, + #[envconfig(default = "127.0.0.1:3000")] + address: SocketAddr, + redis_url: String, + kafka_hosts: String, + kafka_topic: String, +} + async fn shutdown() { let mut term = signal::unix::signal(signal::unix::SignalKind::terminate()) .expect("failed to register SIGTERM handler"); @@ -23,18 +34,15 @@ async fn shutdown() { #[tokio::main] async fn main() { - let use_print_sink = env::var("PRINT_SINK").is_ok(); - let address = env::var("ADDRESS").unwrap_or(String::from("127.0.0.1:3000")); - let redis_addr = - env::var("REDIS_URL").expect("redis required; please set the REDIS_URL env var"); + let config = Config::init_from_env().expect("Invalid configuration:"); let redis_client = - Arc::new(RedisClient::new(redis_addr).expect("failed to create redis client")); + Arc::new(RedisClient::new(config.redis_url).expect("failed to create redis client")); let billing = BillingLimiter::new(Duration::seconds(5), redis_client.clone()) .expect("failed to create billing limiter"); - let app = if use_print_sink { + let app = if config.print_sink { router::router( capture::time::SystemTime {}, sink::PrintSink {}, @@ -43,10 +51,7 @@ async fn main() { true, ) } else { - let brokers = env::var("KAFKA_HOSTS").expect("Expected KAFKA_HOSTS"); - let topic = env::var("KAFKA_TOPIC").expect("Expected KAFKA_TOPIC"); - - let sink = sink::KafkaSink::new(topic, brokers).unwrap(); + let sink = sink::KafkaSink::new(config.kafka_topic, config.kafka_hosts).unwrap(); router::router( capture::time::SystemTime {}, @@ -58,14 +63,14 @@ async fn main() { }; // initialize tracing - tracing_subscriber::fmt::init(); + // run our app with hyper // `axum::Server` is a re-export of `hyper::Server` - tracing::info!("listening on {}", address); + tracing::info!("listening on {}", config.address); - axum::Server::bind(&address.parse().unwrap()) + axum::Server::bind(&config.address) .serve(app.into_make_service_with_connect_info::()) .with_graceful_shutdown(shutdown()) .await