diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 538855c1c64..46ec9c0401b 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -31,6 +31,7 @@ mod path_hasher; mod progress; pub mod pubsub; pub mod rand; +pub mod rate_limited_tracing; pub mod rate_limiter; pub mod rendezvous_hasher; pub mod retry; diff --git a/quickwit/quickwit-common/src/rate_limited_tracing.rs b/quickwit/quickwit-common/src/rate_limited_tracing.rs new file mode 100644 index 00000000000..0fab23af045 --- /dev/null +++ b/quickwit/quickwit-common/src/rate_limited_tracing.rs @@ -0,0 +1,99 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +#[macro_export] +macro_rules! rate_limited_tracing { + ($log_fn:ident, limit_per_min=$limit:literal, $($args:tt)*) => {{ + use ::std::sync::atomic::{AtomicU32, Ordering}; + use ::std::sync::Mutex; + use ::std::time::{Instant, Duration}; + + static COUNT: AtomicU32 = AtomicU32::new(0); + // we can't build an Instant from const context, so we pinitialize with a None + static LAST_RESET: Mutex> = Mutex::new(None); + + let count = COUNT.fetch_add(1, Ordering::Relaxed); + if count == 0 { + // this can only be reached the very 1st time we log + *LAST_RESET.lock().unwrap() = Some(Instant::now()); + } + + let do_log = if count >= $limit { + let mut last_reset = LAST_RESET.lock().unwrap(); + let current_time = Instant::now(); + let should_reset = last_reset + .map(|last_reset| current_time.duration_since(last_reset) >= Duration::from_secs(60)) + .unwrap_or(true); + + if should_reset { + *last_reset = Some(current_time); + // we store 1 because we are already about to log something + COUNT.store(1, Ordering::Relaxed); + true + } else { + // we are over-limit and not far enough in time to reset: don't log + false + } + } else { + true + }; + + if do_log { + ::tracing::$log_fn!($($args)*); + } + }}; +} + +#[macro_export] +macro_rules! rate_limited_trace { + ($unit:ident=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(trace, $unit=$limit, $($args)*) + }; +} +#[macro_export] +macro_rules! rate_limited_debug { + ($unit:ident=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(debug, $unit=$limit, $($args)*) + }; +} +#[macro_export] +macro_rules! rate_limited_info { + ($unit:ident=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(info, $unit=$limit, $($args)*) + }; +} +#[macro_export] +macro_rules! rate_limited_warn { + ($unit:ident=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(warn, $unit=$limit, $($args)*) + }; +} +#[macro_export] +macro_rules! rate_limited_error { + ($unit:literal=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(error, $unit=$limit, $($args)*) + }; +} + +#[doc(hidden)] +pub use rate_limited_tracing; +pub use { + rate_limited_debug, rate_limited_error, rate_limited_info, rate_limited_trace, + rate_limited_warn, +}; diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 2980e844c6b..9df96a55a08 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -25,6 +25,7 @@ use anyhow::{bail, Context}; use async_trait::async_trait; use bytes::Bytes; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; +use quickwit_common::rate_limited_tracing::rate_limited_warn; use quickwit_common::runtimes::RuntimeType; use quickwit_config::{SourceInputFormat, TransformConfig}; use quickwit_doc_mapper::{DocMapper, DocParsingError, JsonObject}; @@ -37,7 +38,6 @@ use tantivy::schema::{Field, Value}; use tantivy::{DateTime, TantivyDocument}; use thiserror::Error; use tokio::runtime::Handle; -use tracing::warn; #[cfg(feature = "vrl")] use super::vrl_processing::*; @@ -413,7 +413,8 @@ impl DocProcessor { processed_docs.push(processed_doc); } Err(error) => { - warn!( + rate_limited_warn!( + limit_per_min = 5, index_id = self.counters.index_id, source_id = self.counters.source_id, "{}",