From 5b9f36dd16526f67ac641b8e3ad760b8070d59ec Mon Sep 17 00:00:00 2001 From: Fabrizio Sestito Date: Mon, 4 Dec 2023 12:19:24 +0100 Subject: [PATCH 1/4] refactor: clean-up cli module from parsing and setup functions Signed-off-by: Fabrizio Sestito --- src/cli.rs | 135 ----------------------------------------------------- 1 file changed, 135 deletions(-) diff --git a/src/cli.rs b/src/cli.rs index 4b682bbc..2ca54a3e 100644 --- a/src/cli.rs +++ b/src/cli.rs @@ -1,20 +1,8 @@ -use crate::settings::{read_policies_file, Policy}; -use anyhow::{anyhow, Result}; use clap::builder::PossibleValue; use clap::{crate_authors, crate_description, crate_name, crate_version, Arg, Command}; use itertools::Itertools; use lazy_static::lazy_static; use policy_evaluator::burrego; -use policy_evaluator::policy_fetcher::{ - sources::{read_sources_file, Sources}, - verify::config::{read_verification_file, LatestVerificationConfig}, -}; -use std::{collections::HashMap, env, net::SocketAddr, path::Path}; -use tracing_subscriber::prelude::*; -use tracing_subscriber::{fmt, EnvFilter}; - -static SERVICE_NAME: &str = "kubewarden-policy-server"; -const DOCKER_CONFIG_ENV_VAR: &str = "DOCKER_CONFIG"; lazy_static! { static ref VERSION_AND_BUILTINS: String = { @@ -30,8 +18,6 @@ lazy_static! 
{ builtins, ) }; - pub(crate) static ref HOSTNAME: String = - std::env::var("HOSTNAME").unwrap_or_else(|_| String::from("unknown")); } pub(crate) fn build_cli() -> Command { @@ -232,124 +218,3 @@ pub(crate) fn build_cli() -> Command { ) .long_version(VERSION_AND_BUILTINS.as_str()) } - -pub(crate) fn api_bind_address(matches: &clap::ArgMatches) -> Result { - format!( - "{}:{}", - matches.get_one::("address").unwrap(), - matches.get_one::("port").unwrap() - ) - .parse() - .map_err(|e| anyhow!("error parsing arguments: {}", e)) -} - -pub(crate) fn tls_files(matches: &clap::ArgMatches) -> Result<(String, String)> { - let cert_file = matches.get_one::("cert-file").unwrap().to_owned(); - let key_file = matches.get_one::("key-file").unwrap().to_owned(); - if cert_file.is_empty() != key_file.is_empty() { - Err(anyhow!("error parsing arguments: either both --cert-file and --key-file must be provided, or neither")) - } else { - Ok((cert_file, key_file)) - } -} - -pub(crate) fn policies(matches: &clap::ArgMatches) -> Result> { - let policies_file = Path::new(matches.get_one::("policies").unwrap()); - read_policies_file(policies_file).map_err(|e| { - anyhow!( - "error while loading policies from {:?}: {}", - policies_file, - e - ) - }) -} - -pub(crate) fn verification_config( - matches: &clap::ArgMatches, -) -> Result> { - match matches.get_one::("verification-path") { - None => Ok(None), - Some(path) => { - let verification_file = Path::new(path); - Ok(Some(read_verification_file(verification_file)?)) - } - } -} - -// Setup the tracing system. This MUST be done inside of a tokio Runtime -// because some collectors rely on it and would panic otherwise. 
-pub(crate) fn setup_tracing(matches: &clap::ArgMatches) -> Result<()> { - // setup logging - let filter_layer = EnvFilter::new(matches.get_one::("log-level").unwrap()) - // some of our dependencies generate trace events too, but we don't care about them -> - // let's filter them - .add_directive("cranelift_codegen=off".parse().unwrap()) - .add_directive("cranelift_wasm=off".parse().unwrap()) - .add_directive("h2=off".parse().unwrap()) - .add_directive("hyper=off".parse().unwrap()) - .add_directive("regalloc=off".parse().unwrap()) - .add_directive("tower=off".parse().unwrap()) - .add_directive("wasmtime_cranelift=off".parse().unwrap()) - .add_directive("wasmtime_jit=off".parse().unwrap()); - - match matches.get_one::("log-fmt").unwrap().as_str() { - "json" => tracing_subscriber::registry() - .with(filter_layer) - .with(fmt::layer().json()) - .init(), - "text" => { - let enable_color = !matches.contains_id("log-no-color"); - let layer = fmt::layer().with_ansi(enable_color); - - tracing_subscriber::registry() - .with(filter_layer) - .with(layer) - .init() - } - "otlp" => { - // Create a new OpenTelemetry pipeline sending events to a - // OpenTelemetry collector using the OTLP format. 
- // The collector must run on localhost (eg: use a sidecar inside of k8s) - // using GRPC - let tracer = opentelemetry_otlp::new_pipeline() - .tracing() - .with_exporter(opentelemetry_otlp::new_exporter().tonic()) - .with_trace_config(opentelemetry_sdk::trace::config().with_resource( - opentelemetry_sdk::Resource::new(vec![opentelemetry::KeyValue::new( - "service.name", - SERVICE_NAME, - )]), - )) - .install_batch(opentelemetry_sdk::runtime::Tokio)?; - - // Create a tracing layer with the configured tracer - let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); - tracing_subscriber::registry() - .with(filter_layer) - .with(telemetry) - .with(fmt::layer()) - .init() - } - - _ => return Err(anyhow!("Unknown log message format")), - }; - - Ok(()) -} - -pub(crate) fn remote_server_options(matches: &clap::ArgMatches) -> Result> { - let sources = match matches.get_one::("sources-path") { - Some(sources_file) => Some( - read_sources_file(Path::new(sources_file)) - .map_err(|e| anyhow!("error while loading sources from {}: {}", sources_file, e))?, - ), - None => None, - }; - - if let Some(docker_config_json_path) = matches.get_one::("docker-config-json-path") { - // docker_credential crate expects the config path in the $DOCKER_CONFIG. 
Keep docker-config-json-path parameter for backwards compatibility - env::set_var(DOCKER_CONFIG_ENV_VAR, docker_config_json_path); - } - - Ok(sources) -} From 6a62c6cf29f38b9e4620035085dc9217443d35f3 Mon Sep 17 00:00:00 2001 From: Fabrizio Sestito Date: Mon, 4 Dec 2023 12:20:02 +0100 Subject: [PATCH 2/4] refactor: rename settings module to config module; add policy_server Config struct and cli parsing Signed-off-by: Fabrizio Sestito --- src/api/audit_and_validation.rs | 6 +- src/communication.rs | 2 +- src/{settings.rs => config.rs} | 196 +++++++++++++++++++++++++++++++- src/policy_downloader.rs | 12 +- src/server.rs | 7 +- src/worker.rs | 4 +- src/worker_pool.rs | 9 +- 7 files changed, 215 insertions(+), 21 deletions(-) rename src/{settings.rs => config.rs} (50%) diff --git a/src/api/audit_and_validation.rs b/src/api/audit_and_validation.rs index 76a08904..5c4afeef 100644 --- a/src/api/audit_and_validation.rs +++ b/src/api/audit_and_validation.rs @@ -24,7 +24,7 @@ use crate::{ name = "audit", fields( request_uid=tracing::field::Empty, - host=crate::cli::HOSTNAME.as_str(), + host=crate::config::HOSTNAME.as_str(), policy_id=policy_id.as_str(), name=tracing::field::Empty, namespace=tracing::field::Empty, @@ -60,7 +60,7 @@ pub(crate) async fn audit( name = "validation", fields( request_uid=tracing::field::Empty, - host=crate::cli::HOSTNAME.as_str(), + host=crate::config::HOSTNAME.as_str(), policy_id=policy_id.as_str(), name=tracing::field::Empty, namespace=tracing::field::Empty, @@ -91,7 +91,7 @@ pub(crate) async fn validation( name = "validation_raw", fields( request_uid=tracing::field::Empty, - host=crate::cli::HOSTNAME.as_str(), + host=crate::config::HOSTNAME.as_str(), policy_id=policy_id.as_str(), allowed=tracing::field::Empty, mutated=tracing::field::Empty, diff --git a/src/communication.rs b/src/communication.rs index 1f68fb9b..a59ae38f 100644 --- a/src/communication.rs +++ b/src/communication.rs @@ -5,8 +5,8 @@ use 
policy_evaluator::policy_evaluator::ValidateRequest; use std::collections::HashMap; use tokio::sync::oneshot; +use crate::config::Policy; use crate::policy_downloader::FetchedPolicies; -use crate::settings::Policy; #[derive(Debug, Clone)] pub(crate) enum RequestOrigin { diff --git a/src/settings.rs b/src/config.rs similarity index 50% rename from src/settings.rs rename to src/config.rs index a9ba2a6c..62c98311 100644 --- a/src/settings.rs +++ b/src/config.rs @@ -1,12 +1,204 @@ use anyhow::{anyhow, Result}; +use clap::ArgMatches; +use lazy_static::lazy_static; +use policy_evaluator::policy_fetcher::sources::{read_sources_file, Sources}; +use policy_evaluator::policy_fetcher::verify::config::{ + read_verification_file, LatestVerificationConfig, VerificationConfigV1, +}; use policy_evaluator::policy_metadata::ContextAwareResource; use serde::Deserialize; use serde_yaml::Value; use std::collections::{BTreeSet, HashMap}; +use std::env; use std::fs::File; use std::iter::FromIterator; -use std::path::Path; +use std::net::SocketAddr; +use std::path::{Path, PathBuf}; + +pub static SERVICE_NAME: &str = "kubewarden-policy-server"; +const DOCKER_CONFIG_ENV_VAR: &str = "DOCKER_CONFIG"; + +lazy_static! 
{ + pub(crate) static ref HOSTNAME: String = + std::env::var("HOSTNAME").unwrap_or_else(|_| String::from("unknown")); +} + +pub struct Config { + pub addr: SocketAddr, + pub sources: Option, + pub policies: HashMap, + pub policies_download_dir: PathBuf, + pub ignore_kubernetes_connection_failure: bool, + pub always_accept_admission_reviews_on_namespace: Option, + pub policy_evaluation_limit: Option, + pub tls_config: Option, + pub pool_size: usize, + pub metrics_enabled: bool, + pub sigstore_cache_dir: PathBuf, + pub verification_config: Option, + pub log_level: String, + pub log_fmt: String, + pub log_no_color: bool, + pub daemon: bool, + pub daemon_pid_file: String, + pub daemon_stdout_file: Option, + pub daemon_stderr_file: Option, +} + +pub struct TlsConfig { + pub cert_file: String, + pub key_file: String, +} + +impl Config { + pub fn from_args(matches: &ArgMatches) -> Result { + // init some variables based on the cli parameters + let addr = api_bind_address(matches)?; + + let policies = policies(matches)?; + let policies_download_dir = matches + .get_one::("policies-download-dir") + .map(PathBuf::from) + .expect("This should not happen, there's a default value for policies-download-dir"); + let policy_evaluation_limit = if matches.contains_id("disable-timeout-protection") { + None + } else { + Some( + matches + .get_one::("policy-timeout") + .expect("policy-timeout should always be set") + .parse::()?, + ) + }; + let sources = remote_server_options(matches)?; + let pool_size = matches + .get_one::("workers") + .map_or_else(num_cpus::get, |v| { + v.parse::() + .expect("error parsing the number of workers") + }); + let always_accept_admission_reviews_on_namespace = matches + .get_one::("always-accept-admission-reviews-on-namespace") + .map(|s| s.to_owned()); + + let metrics_enabled = matches.contains_id("enable-metrics"); + let ignore_kubernetes_connection_failure = + matches.contains_id("ignore-kubernetes-connection-failure"); + let verification_config = 
verification_config(matches)?; + let sigstore_cache_dir = matches + .get_one::("sigstore-cache-dir") + .map(PathBuf::from) + .expect("This should not happen, there's a default value for sigstore-cache-dir"); + + let daemon = matches.contains_id("daemon"); + let daemon_pid_file = matches + .get_one::("daemon-pid-file") + .expect("This should not happen, there's a default value for daemon-pid-file") + .to_owned(); + let daemon_stdout_file = matches.get_one::("daemon-stdout-file").cloned(); + let daemon_stderr_file = matches.get_one::("daemon-stderr-file").cloned(); + + let log_level = matches + .get_one::("log-level") + .expect("This should not happen, there's a default value for log-level") + .to_owned(); + let log_fmt = matches + .get_one::("log-fmt") + .expect("This should not happen, there's a default value for log-fmt") + .to_owned(); + let log_no_color = matches.contains_id("log-no-color"); + let (cert_file, key_file) = tls_files(matches)?; + let tls_config = if cert_file.is_empty() { + None + } else { + Some(TlsConfig { + cert_file, + key_file, + }) + }; + + Ok(Self { + addr, + sources, + policies, + policies_download_dir, + ignore_kubernetes_connection_failure, + tls_config, + always_accept_admission_reviews_on_namespace, + policy_evaluation_limit, + pool_size, + metrics_enabled, + sigstore_cache_dir, + verification_config, + log_level, + log_fmt, + log_no_color, + daemon, + daemon_pid_file, + daemon_stdout_file, + daemon_stderr_file, + }) + } +} + +fn api_bind_address(matches: &clap::ArgMatches) -> Result { + format!( + "{}:{}", + matches.get_one::("address").unwrap(), + matches.get_one::("port").unwrap() + ) + .parse() + .map_err(|e| anyhow!("error parsing arguments: {}", e)) +} + +fn tls_files(matches: &clap::ArgMatches) -> Result<(String, String)> { + let cert_file = matches.get_one::("cert-file").unwrap().to_owned(); + let key_file = matches.get_one::("key-file").unwrap().to_owned(); + if cert_file.is_empty() != key_file.is_empty() { + Err(anyhow!("error 
parsing arguments: either both --cert-file and --key-file must be provided, or neither")) + } else { + Ok((cert_file, key_file)) + } +} + +fn policies(matches: &clap::ArgMatches) -> Result> { + let policies_file = Path::new(matches.get_one::("policies").unwrap()); + read_policies_file(policies_file).map_err(|e| { + anyhow!( + "error while loading policies from {:?}: {}", + policies_file, + e + ) + }) +} + +fn verification_config(matches: &clap::ArgMatches) -> Result> { + match matches.get_one::("verification-path") { + None => Ok(None), + Some(path) => { + let verification_file = Path::new(path); + Ok(Some(read_verification_file(verification_file)?)) + } + } +} + +fn remote_server_options(matches: &clap::ArgMatches) -> Result> { + let sources = match matches.get_one::("sources-path") { + Some(sources_file) => Some( + read_sources_file(Path::new(sources_file)) + .map_err(|e| anyhow!("error while loading sources from {}: {}", sources_file, e))?, + ), + None => None, + }; + + if let Some(docker_config_json_path) = matches.get_one::("docker-config-json-path") { + // docker_credential crate expects the config path in the $DOCKER_CONFIG. Keep docker-config-json-path parameter for backwards compatibility + env::set_var(DOCKER_CONFIG_ENV_VAR, docker_config_json_path); + } + + Ok(sources) +} #[derive(Deserialize, Debug, Clone, Default)] pub enum PolicyMode { @@ -87,7 +279,7 @@ fn convert_yaml_map_to_json( /// and Policy as values. The key is the name of the policy as provided by the user /// inside of the configuration file. This name is used to build the API path /// exposing the policy. 
-pub fn read_policies_file(path: &Path) -> Result> { +fn read_policies_file(path: &Path) -> Result> { let settings_file = File::open(path)?; let ps: HashMap = serde_yaml::from_reader(&settings_file)?; Ok(ps) diff --git a/src/policy_downloader.rs b/src/policy_downloader.rs index 6f92848a..82e2f36f 100644 --- a/src/policy_downloader.rs +++ b/src/policy_downloader.rs @@ -8,6 +8,7 @@ use policy_evaluator::{ verify::{config::LatestVerificationConfig, FulcioAndRekorData, Verifier}, }, }; +use std::path::Path; use std::{ collections::{HashMap, HashSet}, fs, @@ -16,7 +17,7 @@ use std::{ use tokio::task::spawn_blocking; use tracing::{debug, info}; -use crate::settings::Policy; +use crate::config::Policy; /// A Map with the `policy.url` as key, /// and a `PathBuf` as value. The `PathBuf` points to the location where @@ -59,12 +60,15 @@ impl Downloader { pub async fn download_policies( &mut self, policies: &HashMap, - destination: &str, + destination: impl AsRef, verification_config: Option<&LatestVerificationConfig>, ) -> Result { let policies_total = policies.len(); info!( - download_dir = destination, + download_dir = destination + .as_ref() + .to_str() + .expect("cannot convert path to string"), policies_count = policies_total, status = "init", "policies download", @@ -128,7 +132,7 @@ impl Downloader { let fetched_policy = policy_fetcher::fetch_policy( &policy.url, - policy_fetcher::PullDestination::Store(PathBuf::from(destination)), + policy_fetcher::PullDestination::Store(destination.as_ref().to_path_buf()), self.sources.as_ref(), ) .await diff --git a/src/server.rs b/src/server.rs index 58f9e730..53f51aa9 100644 --- a/src/server.rs +++ b/src/server.rs @@ -1,12 +1,7 @@ use std::net::SocketAddr; use tokio::sync::mpsc::Sender; -use crate::communication::EvalRequest; - -pub(crate) struct TlsConfig { - pub cert_file: String, - pub key_file: String, -} +use crate::{communication::EvalRequest, config::TlsConfig}; pub(crate) async fn run_server( addr: &SocketAddr, diff --git 
a/src/worker.rs b/src/worker.rs index 42a2a177..59c7a2c2 100644 --- a/src/worker.rs +++ b/src/worker.rs @@ -11,8 +11,8 @@ use tokio::sync::mpsc::{Receiver, Sender}; use tracing::{error, info, info_span}; use crate::communication::{EvalRequest, RequestOrigin}; +use crate::config::{Policy, PolicyMode}; use crate::metrics::{self}; -use crate::settings::{Policy, PolicyMode}; use crate::worker_pool::PrecompiledPolicies; struct PolicyEvaluatorWithSettings { @@ -42,7 +42,7 @@ impl fmt::Display for PolicyErrors { impl Worker { #[tracing::instrument( name = "worker_new", - fields(host=crate::cli::HOSTNAME.as_str()), + fields(host=crate::config::HOSTNAME.as_str()), skip_all, )] pub(crate) fn new( diff --git a/src/worker_pool.rs b/src/worker_pool.rs index 0abbb8ba..f29d3f9c 100644 --- a/src/worker_pool.rs +++ b/src/worker_pool.rs @@ -25,9 +25,12 @@ use std::{ use tokio::sync::{mpsc, oneshot}; use tracing::{debug, error, info, warn}; -use crate::communication::{EvalRequest, WorkerPoolBootRequest}; use crate::policy_downloader::FetchedPolicies; use crate::worker::Worker; +use crate::{ + communication::{EvalRequest, WorkerPoolBootRequest}, + config, +}; lazy_static! 
{ static ref KUBEWARDEN_VERSION: Version = { @@ -338,7 +341,7 @@ impl WorkerPool { pub(crate) fn build_policy_evaluator( policy_id: &str, - policy: &crate::settings::Policy, + policy: &config::Policy, engine: &wasmtime::Engine, policy_modules: &PrecompiledPolicies, callback_handler_tx: mpsc::Sender, @@ -426,7 +429,7 @@ fn precompile_policies( fn verify_policy_settings( engine: &wasmtime::Engine, - policies: &HashMap, + policies: &HashMap, policy_modules: &HashMap, callback_handler_tx: mpsc::Sender, policy_evaluation_limit_seconds: Option, From 173ec86629be99b90318970248cb95ca449130a8 Mon Sep 17 00:00:00 2001 From: Fabrizio Sestito Date: Mon, 4 Dec 2023 12:49:16 +0100 Subject: [PATCH 3/4] refactor: introduce lib.rs; refactor main.rs Signed-off-by: Fabrizio Sestito --- src/lib.rs | 412 ++++++++++++++++++++++++++++++++++++++++++++++++++++ src/main.rs | 409 +-------------------------------------------------- 2 files changed, 415 insertions(+), 406 deletions(-) create mode 100644 src/lib.rs diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 00000000..3011945d --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,412 @@ +extern crate k8s_openapi; +extern crate policy_evaluator; + +use anyhow::{anyhow, Result}; +use config::Config; +use lazy_static::lazy_static; +use opentelemetry::global::shutdown_tracer_provider; +use policy_evaluator::policy_fetcher::sigstore; +use policy_evaluator::policy_fetcher::verify::FulcioAndRekorData; +use policy_evaluator::{callback_handler::CallbackHandlerBuilder, kube}; +use std::fs; +use std::{process, sync::RwLock, thread}; +use tokio::{runtime::Runtime, sync::mpsc, sync::oneshot}; +use tracing::{debug, error, info, warn}; +use tracing_subscriber::prelude::*; +use tracing_subscriber::{fmt, EnvFilter}; + +mod admission_review; +mod api; +pub mod config; +mod metrics; +mod raw_review; +mod server; +mod worker; + +mod policy_downloader; +use policy_downloader::Downloader; + +mod worker_pool; +use worker_pool::WorkerPool; + +mod 
communication; +use communication::{EvalRequest, WorkerPoolBootRequest}; + +lazy_static! { + static ref TRACE_SYSTEM_INITIALIZED: RwLock = RwLock::new(false); +} + +pub fn run(config: Config) -> Result<()> { + // Run in daemon mode if specified by the user + if config.daemon { + println!("Running instance as a daemon"); + + let mut daemonize = daemonize::Daemonize::new().pid_file(config.daemon_pid_file); + if let Some(stdout_file) = config.daemon_stdout_file { + let file = fs::File::create(stdout_file) + .map_err(|e| anyhow!("Cannot create file for daemon stdout: {}", e))?; + daemonize = daemonize.stdout(file); + } + if let Some(stderr_file) = config.daemon_stderr_file { + let file = fs::File::create(stderr_file) + .map_err(|e| anyhow!("Cannot create file for daemon stderr: {}", e))?; + daemonize = daemonize.stderr(file); + } + + daemonize + .start() + .map_err(|e| anyhow!("Cannot daemonize: {}", e))?; + + println!("Detached from shell, now running in background."); + } + + //////////////////////////////////////////////////////////////////////////// + // // + // Phase 1: setup the CallbackHandler. This is used by the synchronous // + // world (the waPC host_callback) to request the execution of code that // + // can be run only inside of asynchronous world. // + // An example of that, is a policy that changes the container image // + // references to ensure they use immutable shasum instead of tags. // + // The act of retrieving the container image manifest digest requires a // + // network request, which is fulfilled using asynchronous code. // + // // + // The communication between the two worlds happens via tokio channels. 
// + // // + //////////////////////////////////////////////////////////////////////////// + + // This is a channel used to stop the tokio task that is run + // inside of the CallbackHandler + let (callback_handler_shutdown_channel_tx, callback_handler_shutdown_channel_rx) = + oneshot::channel(); + + let fulcio_and_rekor_data = match sigstore::tuf::SigstoreRepository::fetch(None) { + Ok(repo) => Some(FulcioAndRekorData::FromTufRepository { repo }), + Err(e) => { + // We cannot rely on `tracing` yet, because the tracing system has not + // been initialized, this has to be done inside of an async block, which + // we cannot use yet + eprintln!("Cannot fetch TUF repository: {e:?}"); + eprintln!("Sigstore Verifier created without Fulcio data: keyless signatures are going to be discarded because they cannot be verified"); + eprintln!( + "Sigstore Verifier created without Rekor data: transparency log data won't be used" + ); + eprintln!("Sigstore capabilities are going to be limited"); + None + } + }; + + let mut callback_handler_builder = + CallbackHandlerBuilder::new(callback_handler_shutdown_channel_rx) + .registry_config(config.sources.clone()) + .fulcio_and_rekor_data(fulcio_and_rekor_data.as_ref()); + + // Attempt to build kube::Client instance, this unfortunately needs an async context + // for a really limited amount of time. + // + // Important: the tokio runtime used to create the `kube::Client` **must** + // be the very same one used later on when the client is used. 
+ let rt = Runtime::new()?; + + let kube_client: Option = rt.block_on(async { + match kube::Client::try_default().await { + Ok(client) => Some(client), + Err(e) => { + // We cannot rely on `tracing` yet, because the tracing system has not + // been initialized yet + eprintln!("Cannot connect to Kubernetes cluster: {e}"); + None + } + } + }); + + match kube_client { + Some(client) => { + callback_handler_builder = callback_handler_builder.kube_client(client); + } + None => { + if config.ignore_kubernetes_connection_failure { + // We cannot rely on `tracing` yet, because the tracing system has not + // been initialized yet + eprintln!( + "Cannot connect to Kubernetes, context aware policies will not work properly" + ); + } else { + return Err(anyhow!( + "Cannot connect to Kubernetes, context aware policies would not work properly" + )); + } + } + }; + + let mut callback_handler = callback_handler_builder.build()?; + let callback_sender_channel = callback_handler.sender_channel(); + + //////////////////////////////////////////////////////////////////////////// + // // + // Phase 2: setup the Wasm worker pool, this "lives" inside of a // + // dedicated system thread. // + // // + // The communication between the "synchronous world" (aka the Wasm worker // + // pool) and the "asynchronous world" (aka the http server) happens via // + // tokio channels. // + // // + //////////////////////////////////////////////////////////////////////////// + + // This is the channel used by the http server to communicate with the + // Wasm workers + let (api_tx, api_rx) = mpsc::channel::(32); + + // This is the channel used to have the asynchronous code trigger the + // bootstrap of the worker pool. The bootstrap must be triggered + // from within the asynchronous code because some of the tracing collectors + // (e.g. OpenTelemetry) require a tokio::Runtime to be available. 
+ let (worker_pool_bootstrap_req_tx, worker_pool_bootstrap_req_rx) = + oneshot::channel::(); + + // Spawn the system thread that runs the main loop of the worker pool manager + let wasm_thread = thread::spawn(move || { + let worker_pool = WorkerPool::new( + worker_pool_bootstrap_req_rx, + api_rx, + callback_sender_channel, + config.always_accept_admission_reviews_on_namespace, + config.policy_evaluation_limit, + ); + worker_pool.run(); + }); + + //////////////////////////////////////////////////////////////////////////// + // // + // Phase 3: setup the asynchronous world. // + // // + // We setup the tokio Runtime manually, instead of relying on the the // + // `tokio::main` macro, because we don't want the "synchronous" world to // + // be spawned inside of one of the threads owned by the runtime. // + // // + //////////////////////////////////////////////////////////////////////////// + + rt.block_on(async { + // Setup the tracing system. This MUST be done inside of a tokio Runtime + // because some collectors rely on it and would panic otherwise. 
+ match setup_tracing(&config.log_level, &config.log_fmt, config.log_no_color) { + Err(err) => { + fatal_error(err.to_string()); + unreachable!(); + } + Ok(_) => { + debug!("tracing system ready"); + let mut w = TRACE_SYSTEM_INITIALIZED.write().unwrap(); + *w = true; + } + }; + + // The unused variable is required so the meter is not dropped early and + // lives for the whole block lifetime, exporting metrics + let _meter = if config.metrics_enabled { + Some(metrics::init_meter()) + } else { + None + }; + + // Download policies + let mut downloader = match Downloader::new( + config.sources.clone(), + config.verification_config.is_some(), + Some(config.sigstore_cache_dir.clone()), + ) + .await + { + Ok(d) => d, + Err(e) => { + fatal_error(e.to_string()); + unreachable!() + } + }; + + let fetched_policies = match downloader + .download_policies( + &config.policies, + &config.policies_download_dir, + config.verification_config.as_ref(), + ) + .await + { + Ok(fp) => fp, + Err(e) => { + fatal_error(e.to_string()); + unreachable!() + } + }; + + // Spawn the tokio task used by the CallbackHandler + let callback_handle = tokio::spawn(async move { + info!(status = "init", "CallbackHandler task"); + callback_handler.loop_eval().await; + info!(status = "exit", "CallbackHandler task"); + }); + + // Bootstrap the worker pool + info!(status = "init", "worker pool bootstrap"); + let (worker_pool_bootstrap_res_tx, mut worker_pool_bootstrap_res_rx) = + oneshot::channel::>(); + let bootstrap_data = WorkerPoolBootRequest { + policies: config.policies, + fetched_policies, + pool_size: config.pool_size, + resp_chan: worker_pool_bootstrap_res_tx, + }; + if worker_pool_bootstrap_req_tx.send(bootstrap_data).is_err() { + fatal_error("Cannot send bootstrap data to worker pool".to_string()); + } + + // Wait for the worker pool to be fully bootstraped before moving on. + // + // It's really important to NOT start the web server before the workers are ready. 
+ // Our Kubernetes deployment exposes a readiness probe that relies on the web server + // to be listening. The API server will start hitting the policy server as soon as the + // readiness probe marks the instance as ready. + // We don't want Kubernetes API server to send admission reviews before ALL the workers + // are ready. + loop { + match worker_pool_bootstrap_res_rx.try_recv() { + Ok(res) => match res { + Ok(_) => break, + Err(e) => fatal_error(e.to_string()), + }, + Err(oneshot::error::TryRecvError::Empty) => { + // the channel is empty, keep waiting + } + _ => { + fatal_error("Cannot receive worker pool bootstrap result".to_string()); + return; + } + } + } + info!(status = "done", "worker pool bootstrap"); + memory_usage(config.pool_size); + + // All is good, we can start listening for incoming requests through the + // web server + server::run_server(&config.addr, config.tls_config, api_tx).await; + + // The evaluation is done, we can shutdown the tokio task that is running + // the CallbackHandler + if callback_handler_shutdown_channel_tx.send(()).is_err() { + error!("Cannot shut down the CallbackHandler task"); + } else if let Err(e) = callback_handle.await { + error!( + error = e.to_string().as_str(), + "Error waiting for the CallbackHandler task" + ); + } + }); + + if let Err(e) = wasm_thread.join() { + fatal_error(format!("error while waiting for worker threads: {e:?}")); + }; + + Ok(()) +} + +fn memory_usage(pool_size: usize) { + let process = match procfs::process::Process::myself() { + Ok(p) => p, + Err(e) => { + warn!(error =? e, "cannot access process stats"); + return; + } + }; + let mem_stats = match process.statm() { + Ok(s) => s, + Err(e) => { + warn!(error =? 
e, "cannot access process memory stats"); + return; + } + }; + + let formatter = humansize::make_format(humansize::DECIMAL); + + let vm_size = mem_stats.size * procfs::page_size(); + let vm_rss = mem_stats.resident * procfs::page_size(); + + debug!( + VmSize = formatter(vm_size), + VmSizeBytes = vm_size, + VmRSS = formatter(vm_rss), + VmRSSBytes = vm_rss, + pool_size, + "memory usage" + ); +} + +// Setup the tracing system. This MUST be done inside of a tokio Runtime +// because some collectors rely on it and would panic otherwise. +fn setup_tracing(log_level: &str, log_fmt: &str, log_no_color: bool) -> Result<()> { + // setup logging + let filter_layer = EnvFilter::new(log_level) + // some of our dependencies generate trace events too, but we don't care about them -> + // let's filter them + .add_directive("cranelift_codegen=off".parse().unwrap()) + .add_directive("cranelift_wasm=off".parse().unwrap()) + .add_directive("h2=off".parse().unwrap()) + .add_directive("hyper=off".parse().unwrap()) + .add_directive("regalloc=off".parse().unwrap()) + .add_directive("tower=off".parse().unwrap()) + .add_directive("wasmtime_cranelift=off".parse().unwrap()) + .add_directive("wasmtime_jit=off".parse().unwrap()); + + match log_fmt { + "json" => tracing_subscriber::registry() + .with(filter_layer) + .with(fmt::layer().json()) + .init(), + "text" => { + let layer = fmt::layer().with_ansi(!log_no_color); // ANSI colors stay on unless log_no_color is set + + tracing_subscriber::registry() + .with(filter_layer) + .with(layer) + .init() + } + "otlp" => { + // Create a new OpenTelemetry pipeline sending events to a + // OpenTelemetry collector using the OTLP format. 
+ // The collector must run on localhost (eg: use a sidecar inside of k8s) + // using GRPC + let tracer = opentelemetry_otlp::new_pipeline() + .tracing() + .with_exporter(opentelemetry_otlp::new_exporter().tonic()) + .with_trace_config(opentelemetry_sdk::trace::config().with_resource( + opentelemetry_sdk::Resource::new(vec![opentelemetry::KeyValue::new( + "service.name", + config::SERVICE_NAME, + )]), + )) + .install_batch(opentelemetry_sdk::runtime::Tokio)?; + + // Create a tracing layer with the configured tracer + let telemetry = tracing_opentelemetry::layer().with_tracer(tracer); + tracing_subscriber::registry() + .with(filter_layer) + .with(telemetry) + .with(fmt::layer()) + .init() + } + + _ => return Err(anyhow!("Unknown log message format")), + }; + + Ok(()) +} + +pub fn fatal_error(msg: String) { + let trace_system_ready = TRACE_SYSTEM_INITIALIZED.read().unwrap(); + if *trace_system_ready { + error!("{}", msg); + shutdown_tracer_provider(); + } else { + eprintln!("{msg}"); + } + + process::exit(1); +} diff --git a/src/main.rs b/src/main.rs index 27cbac94..89a82160 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,413 +1,10 @@ -extern crate k8s_openapi; -extern crate policy_evaluator; - -use anyhow::{anyhow, Result}; -use lazy_static::lazy_static; -use opentelemetry::global::shutdown_tracer_provider; -use policy_evaluator::policy_fetcher::sigstore; -use policy_evaluator::policy_fetcher::verify::FulcioAndRekorData; -use policy_evaluator::{callback_handler::CallbackHandlerBuilder, kube}; -use std::{fs, path::PathBuf, process, sync::RwLock, thread}; -use tokio::{runtime::Runtime, sync::mpsc, sync::oneshot}; -use tracing::{debug, error, info, warn}; - -mod admission_review; -mod api; mod cli; -mod metrics; -mod raw_review; -mod server; -mod settings; -mod worker; - -mod policy_downloader; -use policy_downloader::Downloader; -mod worker_pool; -use worker_pool::WorkerPool; - -mod communication; -use communication::{EvalRequest, WorkerPoolBootRequest}; - 
-lazy_static! { - static ref TRACE_SYSTEM_INITIALIZED: RwLock = RwLock::new(false); -} +use anyhow::Result; fn main() -> Result<()> { let matches = cli::build_cli().get_matches(); + let config = policy_server::config::Config::from_args(&matches)?; - // init some variables based on the cli parameters - let addr = cli::api_bind_address(&matches)?; - let (cert_file, key_file) = cli::tls_files(&matches)?; - let policies = cli::policies(&matches)?; - let sources = cli::remote_server_options(&matches)?; - let pool_size = matches - .get_one::("workers") - .map_or_else(num_cpus::get, |v| { - v.parse::() - .expect("error parsing the number of workers") - }); - let always_accept_admission_reviews_on_namespace = matches - .get_one::("always-accept-admission-reviews-on-namespace") - .map(|s| s.to_owned()); - - let metrics_enabled = matches.contains_id("enable-metrics"); - let ignore_kubernetes_connection_failure = - matches.contains_id("ignore-kubernetes-connection-failure"); - let verification_config = cli::verification_config(&matches).unwrap_or_else(|e| { - fatal_error(format!("Cannot create sigstore verification config: {e:?}")); - unreachable!() - }); - let sigstore_cache_dir = matches - .get_one::("sigstore-cache-dir") - .map(PathBuf::from) - .expect("This should not happen, there's a default value for sigstore-cache-dir"); - - let policy_evaluation_limit = if matches.contains_id("disable-timeout-protection") { - None - } else { - match matches - .get_one::("policy-timeout") - .expect("policy-timeout should always be set") - .parse::() - { - Ok(v) => Some(v), - Err(e) => { - fatal_error(format!( - "'policy-timeout' value cannot be converted to unsigned int: {e}" - )); - unreachable!() - } - } - }; - - // Run in daemon mode if specified by the user - if matches.contains_id("daemon") { - println!("Running instance as a daemon"); - - let mut daemonize = daemonize::Daemonize::new().pid_file( - matches - .get_one::("daemon-pid-file") - .expect("pid-file should always have a 
value"), - ); - if let Some(stdout_file) = matches.get_one::("daemon-stdout-file") { - let file = fs::File::create(stdout_file) - .map_err(|e| anyhow!("Cannot create file for daemon stdout: {}", e))?; - daemonize = daemonize.stdout(file); - } - if let Some(stderr_file) = matches.get_one::("daemon-stderr-file") { - let file = fs::File::create(stderr_file) - .map_err(|e| anyhow!("Cannot create file for daemon stderr: {}", e))?; - daemonize = daemonize.stderr(file); - } - match daemonize.start() { - Ok(_) => println!("Detached from shell, now running in background."), - Err(e) => fatal_error(format!("Something went wrong while daemonizing: {e}")), - } - } - - //////////////////////////////////////////////////////////////////////////// - // // - // Phase 1: setup the CallbackHandler. This is used by the synchronous // - // world (the waPC host_callback) to request the execution of code that // - // can be run only inside of asynchronous world. // - // An example of that, is a policy that changes the container image // - // references to ensure they use immutable shasum instead of tags. // - // The act of retrieving the container image manifest digest requires a // - // network request, which is fulfilled using asynchronous code. // - // // - // The communication between the two worlds happens via tokio channels. 
// - // // - //////////////////////////////////////////////////////////////////////////// - - // This is a channel used to stop the tokio task that is run - // inside of the CallbackHandler - let (callback_handler_shutdown_channel_tx, callback_handler_shutdown_channel_rx) = - oneshot::channel(); - - let fulcio_and_rekor_data = match sigstore::tuf::SigstoreRepository::fetch(None) { - Ok(repo) => Some(FulcioAndRekorData::FromTufRepository { repo }), - Err(e) => { - // We cannot rely on `tracing` yet, because the tracing system has not - // been initialized, this has to be done inside of an async block, which - // we cannot use yet - eprintln!("Cannot fetch TUF repository: {e:?}"); - eprintln!("Sigstore Verifier created without Fulcio data: keyless signatures are going to be discarded because they cannot be verified"); - eprintln!( - "Sigstore Verifier created without Rekor data: transparency log data won't be used" - ); - eprintln!("Sigstore capabilities are going to be limited"); - None - } - }; - - let mut callback_handler_builder = - CallbackHandlerBuilder::new(callback_handler_shutdown_channel_rx) - .registry_config(sources.clone()) - .fulcio_and_rekor_data(fulcio_and_rekor_data.as_ref()); - - // Attempt to build kube::Client instance, this unfortunately needs an async context - // for a really limited amount of time. - // - // Important: the tokio runtime used to create the `kube::Client` **must** - // be the very same one used later on when the client is used. 
- let rt = match Runtime::new() { - Ok(r) => r, - Err(error) => { - fatal_error(format!("error initializing tokio runtime: {error}")); - unreachable!(); - } - }; - - let kube_client: Option = rt.block_on(async { - match kube::Client::try_default().await { - Ok(client) => Some(client), - Err(e) => { - // We cannot rely on `tracing` yet, because the tracing system has not - // been initialized yet - eprintln!("Cannot connect to Kubernetes cluster: {e}"); - None - } - } - }); - - match kube_client { - Some(client) => { - callback_handler_builder = callback_handler_builder.kube_client(client); - } - None => { - if ignore_kubernetes_connection_failure { - // We cannot rely on `tracing` yet, because the tracing system has not - // been initialized yet - eprintln!( - "Cannot connect to Kubernetes, context aware policies will not work properly" - ); - } else { - return Err(anyhow!( - "Cannot connect to Kubernetes, context aware policies would not work properly" - )); - } - } - }; - - let mut callback_handler = callback_handler_builder.build()?; - let callback_sender_channel = callback_handler.sender_channel(); - - //////////////////////////////////////////////////////////////////////////// - // // - // Phase 2: setup the Wasm worker pool, this "lives" inside of a // - // dedicated system thread. // - // // - // The communication between the "synchronous world" (aka the Wasm worker // - // pool) and the "asynchronous world" (aka the http server) happens via // - // tokio channels. // - // // - //////////////////////////////////////////////////////////////////////////// - - // This is the channel used by the http server to communicate with the - // Wasm workers - let (api_tx, api_rx) = mpsc::channel::(32); - - // This is the channel used to have the asynchronous code trigger the - // bootstrap of the worker pool. The bootstrap must be triggered - // from within the asynchronous code because some of the tracing collectors - // (e.g. 
OpenTelemetry) require a tokio::Runtime to be available. - let (worker_pool_bootstrap_req_tx, worker_pool_bootstrap_req_rx) = - oneshot::channel::(); - - // Spawn the system thread that runs the main loop of the worker pool manager - let wasm_thread = thread::spawn(move || { - let worker_pool = WorkerPool::new( - worker_pool_bootstrap_req_rx, - api_rx, - callback_sender_channel, - always_accept_admission_reviews_on_namespace, - policy_evaluation_limit, - ); - worker_pool.run(); - }); - - //////////////////////////////////////////////////////////////////////////// - // // - // Phase 3: setup the asynchronous world. // - // // - // We setup the tokio Runtime manually, instead of relying on the the // - // `tokio::main` macro, because we don't want the "synchronous" world to // - // be spawned inside of one of the threads owned by the runtime. // - // // - //////////////////////////////////////////////////////////////////////////// - - rt.block_on(async { - // Setup the tracing system. This MUST be done inside of a tokio Runtime - // because some collectors rely on it and would panic otherwise. 
- match cli::setup_tracing(&matches) { - Err(err) => { - fatal_error(err.to_string()); - unreachable!(); - } - Ok(_) => { - debug!("tracing system ready"); - let mut w = TRACE_SYSTEM_INITIALIZED.write().unwrap(); - *w = true; - } - }; - - // The unused variable is required so the meter is not dropped early and - // lives for the whole block lifetime, exporting metrics - let _meter = if metrics_enabled { - Some(metrics::init_meter()) - } else { - None - }; - - // Download policies - let mut downloader = match Downloader::new( - sources, - verification_config.is_some(), - Some(sigstore_cache_dir), - ) - .await - { - Ok(d) => d, - Err(e) => { - fatal_error(e.to_string()); - unreachable!() - } - }; - - let policies_download_dir = matches.get_one::("policies-download-dir").unwrap(); - let fetched_policies = match downloader - .download_policies( - &policies, - policies_download_dir, - verification_config.as_ref(), - ) - .await - { - Ok(fp) => fp, - Err(e) => { - fatal_error(e.to_string()); - unreachable!() - } - }; - - // Spawn the tokio task used by the CallbackHandler - let callback_handle = tokio::spawn(async move { - info!(status = "init", "CallbackHandler task"); - callback_handler.loop_eval().await; - info!(status = "exit", "CallbackHandler task"); - }); - - // Bootstrap the worker pool - info!(status = "init", "worker pool bootstrap"); - let (worker_pool_bootstrap_res_tx, mut worker_pool_bootstrap_res_rx) = - oneshot::channel::>(); - let bootstrap_data = WorkerPoolBootRequest { - policies, - fetched_policies, - pool_size, - resp_chan: worker_pool_bootstrap_res_tx, - }; - if worker_pool_bootstrap_req_tx.send(bootstrap_data).is_err() { - fatal_error("Cannot send bootstrap data to worker pool".to_string()); - } - - // Wait for the worker pool to be fully bootstraped before moving on. - // - // It's really important to NOT start the web server before the workers are ready. 
- // Our Kubernetes deployment exposes a readiness probe that relies on the web server - // to be listening. The API server will start hitting the policy server as soon as the - // readiness probe marks the instance as ready. - // We don't want Kubernetes API server to send admission reviews before ALL the workers - // are ready. - loop { - match worker_pool_bootstrap_res_rx.try_recv() { - Ok(res) => match res { - Ok(_) => break, - Err(e) => fatal_error(e.to_string()), - }, - Err(oneshot::error::TryRecvError::Empty) => { - // the channel is empty, keep waiting - } - _ => { - fatal_error("Cannot receive worker pool bootstrap result".to_string()); - return; - } - } - } - info!(status = "done", "worker pool bootstrap"); - memory_usage(pool_size); - - // All is good, we can start listening for incoming requests through the - // web server - let tls_config = if cert_file.is_empty() { - None - } else { - Some(crate::server::TlsConfig { - cert_file: cert_file.to_string(), - key_file: key_file.to_string(), - }) - }; - server::run_server(&addr, tls_config, api_tx).await; - - // The evaluation is done, we can shutdown the tokio task that is running - // the CallbackHandler - if callback_handler_shutdown_channel_tx.send(()).is_err() { - error!("Cannot shut down the CallbackHandler task"); - } else if let Err(e) = callback_handle.await { - error!( - error = e.to_string().as_str(), - "Error waiting for the CallbackHandler task" - ); - } - }); - - if let Err(e) = wasm_thread.join() { - fatal_error(format!("error while waiting for worker threads: {e:?}")); - }; - - Ok(()) -} - -fn memory_usage(pool_size: usize) { - let process = match procfs::process::Process::myself() { - Ok(p) => p, - Err(e) => { - warn!(error =? e, "cannot access process stats"); - return; - } - }; - let mem_stats = match process.statm() { - Ok(s) => s, - Err(e) => { - warn!(error =? 
e, "cannot access process memory stats"); - return; - } - }; - - let formatter = humansize::make_format(humansize::DECIMAL); - - let vm_size = mem_stats.size * procfs::page_size(); - let vm_rss = mem_stats.resident * procfs::page_size(); - - debug!( - VmSize = formatter(vm_size), - VmSizeBytes = vm_size, - VmRSS = formatter(vm_rss), - VmRSSBytes = vm_rss, - pool_size, - "memory usage" - ); -} - -fn fatal_error(msg: String) { - let trace_system_ready = TRACE_SYSTEM_INITIALIZED.read().unwrap(); - if *trace_system_ready { - error!("{}", msg); - shutdown_tracer_provider(); - } else { - eprintln!("{msg}"); - } - - process::exit(1); + policy_server::run(config) } From a35bdee0bf7b2b5496133c7a87d4596acc94b003 Mon Sep 17 00:00:00 2001 From: Fabrizio Sestito Date: Mon, 4 Dec 2023 16:10:09 +0100 Subject: [PATCH 4/4] chore: remove extern crate and reorg imports Signed-off-by: Fabrizio Sestito --- Cargo.lock | 1 + src/lib.rs | 29 ++++++++++++----------------- 2 files changed, 13 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index f3492845..e4075298 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -3733,6 +3733,7 @@ dependencies = [ "percent-encoding", "pin-project-lite", "rustls", + "rustls-native-certs", "rustls-pemfile", "serde", "serde_json", diff --git a/src/lib.rs b/src/lib.rs index 3011945d..62831d48 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,8 +1,15 @@ -extern crate k8s_openapi; -extern crate policy_evaluator; +mod admission_review; +mod api; +mod communication; +pub mod config; +mod metrics; +mod policy_downloader; +mod raw_review; +mod server; +mod worker; +mod worker_pool; use anyhow::{anyhow, Result}; -use config::Config; use lazy_static::lazy_static; use opentelemetry::global::shutdown_tracer_provider; use policy_evaluator::policy_fetcher::sigstore; @@ -15,23 +22,11 @@ use tracing::{debug, error, info, warn}; use tracing_subscriber::prelude::*; use tracing_subscriber::{fmt, EnvFilter}; -mod admission_review; -mod api; -pub mod config; -mod 
metrics; -mod raw_review; -mod server; -mod worker; - -mod policy_downloader; +use communication::{EvalRequest, WorkerPoolBootRequest}; +use config::Config; use policy_downloader::Downloader; - -mod worker_pool; use worker_pool::WorkerPool; -mod communication; -use communication::{EvalRequest, WorkerPoolBootRequest}; - lazy_static! { static ref TRACE_SYSTEM_INITIALIZED: RwLock = RwLock::new(false); }