From 862a7d86a7ed878f2ea32ab80286f1e4f74ba8e3 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 5 Mar 2024 12:39:11 +0900 Subject: [PATCH] Added a handler to dynamically change the log level. (#4662) * Added a handler to dynamically change the log level. Usage example: just visit. http://localhost:7280/api/v1/log_level?filter=debug,hyper=debug,tantivy=info,quickwit_serve=debug * CR comment and compilation fix --- quickwit/quickwit-cli/src/cli.rs | 5 +- quickwit/quickwit-cli/src/logger.rs | 16 +++-- quickwit/quickwit-cli/src/main.rs | 5 +- quickwit/quickwit-cli/src/service.rs | 5 +- quickwit/quickwit-cli/tests/helpers.rs | 5 +- .../src/test_utils/cluster_sandbox.rs | 1 + quickwit/quickwit-serve/src/lib.rs | 11 ++++ .../quickwit-serve/src/log_level_handler.rs | 61 +++++++++++++++++++ quickwit/quickwit-serve/src/rest.rs | 5 ++ 9 files changed, 102 insertions(+), 12 deletions(-) create mode 100644 quickwit/quickwit-serve/src/log_level_handler.rs diff --git a/quickwit/quickwit-cli/src/cli.rs b/quickwit/quickwit-cli/src/cli.rs index a4688677df9..0bd2e7123ca 100644 --- a/quickwit/quickwit-cli/src/cli.rs +++ b/quickwit/quickwit-cli/src/cli.rs @@ -19,6 +19,7 @@ use anyhow::{bail, Context}; use clap::{arg, Arg, ArgAction, ArgMatches, Command}; +use quickwit_serve::EnvFilterReloadFn; use tracing::Level; use crate::index::{build_index_command, IndexCliCommand}; @@ -90,10 +91,10 @@ impl CliCommand { } } - pub async fn execute(self) -> anyhow::Result<()> { + pub async fn execute(self, env_filter_reload_fn: EnvFilterReloadFn) -> anyhow::Result<()> { match self { CliCommand::Index(subcommand) => subcommand.execute().await, - CliCommand::Run(subcommand) => subcommand.execute().await, + CliCommand::Run(subcommand) => subcommand.execute(env_filter_reload_fn).await, CliCommand::Source(subcommand) => subcommand.execute().await, CliCommand::Split(subcommand) => subcommand.execute().await, CliCommand::Tool(subcommand) => subcommand.execute().await, diff --git a/quickwit/quickwit-cli/src/logger.rs b/quickwit/quickwit-cli/src/logger.rs index 4fbcd1cad43..ca22fc725ff 100644 --- a/quickwit/quickwit-cli/src/logger.rs +++ b/quickwit/quickwit-cli/src/logger.rs @@ -18,6 +18,7 @@ // along with this program. If not, see . use std::env; +use std::sync::Arc; use anyhow::Context; use opentelemetry::sdk::propagation::TraceContextPropagator; @@ -25,7 +26,7 @@ use opentelemetry::sdk::trace::BatchConfig; use opentelemetry::sdk::{trace, Resource}; use opentelemetry::{global, KeyValue}; use opentelemetry_otlp::WithExportConfig; -use quickwit_serve::BuildInfo; +use quickwit_serve::{BuildInfo, EnvFilterReloadFn}; use tracing::Level; use tracing_subscriber::fmt::time::UtcTime; use tracing_subscriber::prelude::*; @@ -39,12 +40,12 @@ pub fn setup_logging_and_tracing( level: Level, ansi_colors: bool, build_info: &BuildInfo, -) -> anyhow::Result<()> { +) -> anyhow::Result { #[cfg(feature = "tokio-console")] { if std::env::var_os(QW_ENABLE_TOKIO_CONSOLE_ENV_KEY).is_some() { console_subscriber::init(); - return Ok(()); + return Ok(quickwit_serve::do_nothing_env_filter_reload_fn()); } } let env_filter = env::var("RUST_LOG") @@ -52,7 +53,8 @@ pub fn setup_logging_and_tracing( .or_else(|_| EnvFilter::try_new(format!("quickwit={level},tantivy=WARN"))) .context("failed to set up tracing env filter")?; global::set_text_map_propagator(TraceContextPropagator::new()); - let registry = tracing_subscriber::registry().with(env_filter); + let (reloadable_env_filter, reload_handle) = tracing_subscriber::reload::Layer::new(env_filter); + let registry = tracing_subscriber::registry().with(reloadable_env_filter); let event_format = tracing_subscriber::fmt::format() .with_target(true) .with_timer( @@ -102,5 +104,9 @@ pub fn setup_logging_and_tracing( .try_init() .context("failed to register tracing subscriber")?; } - Ok(()) + Ok(Arc::new(move |env_filter_def: &str| { + let new_env_filter = EnvFilter::try_new(env_filter_def)?; + reload_handle.reload(new_env_filter)?; + Ok(()) + })) } diff --git a/quickwit/quickwit-cli/src/main.rs b/quickwit/quickwit-cli/src/main.rs index ee868127b85..14f266668b2 100644 --- a/quickwit/quickwit-cli/src/main.rs +++ b/quickwit/quickwit-cli/src/main.rs @@ -60,8 +60,9 @@ async fn main_impl() -> anyhow::Result<()> { start_jemalloc_metrics_loop(); let build_info = BuildInfo::get(); - setup_logging_and_tracing(command.default_log_level(), ansi_colors, build_info)?; - let return_code: i32 = if let Err(err) = command.execute().await { + let env_filter_reload_fn = + setup_logging_and_tracing(command.default_log_level(), ansi_colors, build_info)?; + let return_code: i32 = if let Err(err) = command.execute(env_filter_reload_fn).await { eprintln!("{} command failed: {:?}\n", "✘".color(RED_COLOR), err); 1 } else { diff --git a/quickwit/quickwit-cli/src/service.rs b/quickwit/quickwit-cli/src/service.rs index b75ab854eed..b23dc9d8454 100644 --- a/quickwit/quickwit-cli/src/service.rs +++ b/quickwit/quickwit-cli/src/service.rs @@ -27,7 +27,7 @@ use quickwit_common::runtimes::RuntimesConfig; use quickwit_common::uri::{Protocol, Uri}; use quickwit_config::service::QuickwitService; use quickwit_config::NodeConfig; -use quickwit_serve::{serve_quickwit, BuildInfo}; +use quickwit_serve::{serve_quickwit, BuildInfo, EnvFilterReloadFn}; use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent}; use tokio::signal; use tracing::{debug, info}; @@ -74,7 +74,7 @@ impl RunCliCommand { }) } - pub async fn execute(&self) -> anyhow::Result<()> { + pub async fn execute(&self, env_filter_reload_fn: EnvFilterReloadFn) -> anyhow::Result<()> { debug!(args = ?self, "run-service"); let version_text = BuildInfo::get_version_text(); info!("quickwit version: {version_text}"); @@ -115,6 +115,7 @@ impl RunCliCommand { metastore_resolver, storage_resolver, shutdown_signal, + env_filter_reload_fn, ) .await; let return_code = match serve_result { diff --git a/quickwit/quickwit-cli/tests/helpers.rs b/quickwit/quickwit-cli/tests/helpers.rs index 6c8228e937c..6b515c9d04f 100644 --- a/quickwit/quickwit-cli/tests/helpers.rs +++ b/quickwit/quickwit-cli/tests/helpers.rs @@ -158,7 +158,10 @@ impl TestEnv { services: Some(QuickwitService::supported_services()), }; tokio::spawn(async move { - if let Err(error) = run_command.execute().await { + if let Err(error) = run_command + .execute(quickwit_serve::do_nothing_env_filter_reload_fn()) + .await + { error!(err=?error, "failed to start a quickwit server"); } }); diff --git a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs index 738cd46515b..8d6c183547b 100644 --- a/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs +++ b/quickwit/quickwit-integration-tests/src/test_utils/cluster_sandbox.rs @@ -164,6 +164,7 @@ impl ClusterSandbox { metastore_resolver, storage_resolver, shutdown_signal, + quickwit_serve::do_nothing_env_filter_reload_fn(), ) .await?; Result::<_, anyhow::Error>::Ok(result) diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index e4e32f0df4b..7d3b57defac 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -30,6 +30,7 @@ mod index_api; mod indexing_api; mod ingest_api; mod jaeger_api; +mod log_level_handler; mod metrics; mod metrics_api; mod node_info_handler; @@ -131,6 +132,12 @@ const METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY: &str = "QW_METASTORE_CLIENT_MAX_ const DEFAULT_METASTORE_CLIENT_MAX_CONCURRENCY: usize = 6; const DISABLE_DELETE_TASK_SERVICE_ENV_KEY: &str = "QW_DISABLE_DELETE_TASK_SERVICE"; +pub type EnvFilterReloadFn = Arc anyhow::Result<()> + Send + Sync>; + +pub fn do_nothing_env_filter_reload_fn() -> EnvFilterReloadFn { + Arc::new(|_| Ok(())) +} + fn get_metastore_client_max_concurrency() -> usize { std::env::var(METASTORE_CLIENT_MAX_CONCURRENCY_ENV_KEY).ok() .and_then(|metastore_client_max_concurrency_str| { @@ -187,6 +194,8 @@ struct QuickwitServices { /// the root requests. pub search_service: Arc, + pub env_filter_reload_fn: EnvFilterReloadFn, + /// The control plane listens to various events. /// We must maintain a reference to the subscription handles to continue receiving /// notifications. Otherwise, the subscriptions are dropped. @@ -359,6 +368,7 @@ pub async fn serve_quickwit( metastore_resolver: MetastoreResolver, storage_resolver: StorageResolver, shutdown_signal: BoxFutureInfaillible<()>, + env_filter_reload_fn: EnvFilterReloadFn, ) -> anyhow::Result> { let cluster = start_cluster_service(&node_config).await?; @@ -627,6 +637,7 @@ pub async fn serve_quickwit( otlp_logs_service_opt, otlp_traces_service_opt, search_service, + env_filter_reload_fn, }); // Setup and start gRPC server. let (grpc_readiness_trigger_tx, grpc_readiness_signal_rx) = oneshot::channel::<()>(); diff --git a/quickwit/quickwit-serve/src/log_level_handler.rs b/quickwit/quickwit-serve/src/log_level_handler.rs new file mode 100644 index 00000000000..e3742e79201 --- /dev/null +++ b/quickwit/quickwit-serve/src/log_level_handler.rs @@ -0,0 +1,61 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use hyper::StatusCode; +use serde::Deserialize; +use tracing::{error, info}; +use warp::{Filter, Rejection}; + +use crate::{with_arg, EnvFilterReloadFn}; + +#[derive(Deserialize)] +struct EnvFilter { + filter: String, +} + +#[utoipa::path(get, tag = "Log level", path = "/log_level")] +pub fn log_level_handler( + env_filter_reload_fn: EnvFilterReloadFn, +) -> impl warp::Filter + Clone { + warp::path("log_level") + .and(warp::get().or(warp::post()).unify()) + .and(warp::path::end()) + .and(with_arg(env_filter_reload_fn)) + .and(warp::query::()) + .then( + |env_filter_reload_fn: EnvFilterReloadFn, env_filter: EnvFilter| async move { + match env_filter_reload_fn(env_filter.filter.as_str()) { + Ok(_) => { + info!(filter = env_filter.filter, "change log level"); + warp::reply::with_status( + format!("changed log level to:[{}]", env_filter.filter), + StatusCode::OK, + ) + } + Err(_) => { + error!(filter = env_filter.filter, "invalid log level"); + warp::reply::with_status( + format!("invalid log level:[{}]", env_filter.filter), + StatusCode::BAD_REQUEST, + ) + } + } + }, + ) +} diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 71036164953..d41578e6935 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -42,6 +42,7 @@ use crate::index_api::index_management_handlers; use crate::indexing_api::indexing_get_handler; use crate::ingest_api::ingest_api_handlers; use crate::jaeger_api::jaeger_api_handlers; +use crate::log_level_handler::log_level_handler; use crate::metrics_api::metrics_handler; use crate::node_info_handler::node_info_handler; use crate::otlp_api::otlp_ingest_api_handlers; @@ -172,6 +173,9 @@ fn api_v1_routes( RuntimeInfo::get(), quickwit_services.node_config.clone(), )) + .or(log_level_handler( + quickwit_services.env_filter_reload_fn.clone(), + )) .or(indexing_get_handler( quickwit_services.indexing_service_opt.clone(), )) @@ -630,6 +634,7 @@ mod tests { node_config: Arc::new(node_config.clone()), search_service: Arc::new(MockSearchService::new()), jaeger_service_opt: None, + env_filter_reload_fn: crate::do_nothing_env_filter_reload_fn(), }; let handler = api_v1_routes(Arc::new(quickwit_services))