From 333fc9088ca9bc71e945ca93cc0c78443876098f Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 31 Jan 2024 12:08:06 +0100 Subject: [PATCH 1/3] introduce rate limited variant of tracing::* macros --- quickwit/quickwit-common/src/lib.rs | 1 + .../src/rate_limited_tracing.rs | 99 +++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 quickwit/quickwit-common/src/rate_limited_tracing.rs diff --git a/quickwit/quickwit-common/src/lib.rs b/quickwit/quickwit-common/src/lib.rs index 538855c1c64..46ec9c0401b 100644 --- a/quickwit/quickwit-common/src/lib.rs +++ b/quickwit/quickwit-common/src/lib.rs @@ -31,6 +31,7 @@ mod path_hasher; mod progress; pub mod pubsub; pub mod rand; +pub mod rate_limited_tracing; pub mod rate_limiter; pub mod rendezvous_hasher; pub mod retry; diff --git a/quickwit/quickwit-common/src/rate_limited_tracing.rs b/quickwit/quickwit-common/src/rate_limited_tracing.rs new file mode 100644 index 00000000000..1aa7bb27d5c --- /dev/null +++ b/quickwit/quickwit-common/src/rate_limited_tracing.rs @@ -0,0 +1,99 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +#[macro_export] +macro_rules! rate_limited_tracing { + ($log_fn:ident, limit=$limit:literal, $($args:tt)*) => {{ + use ::std::sync::atomic::{AtomicU32, Ordering}; + use ::std::sync::Mutex; + use ::std::time::{Instant, Duration}; + + static COUNT: AtomicU32 = AtomicU32::new(0); + // we can't build an Instant from const context, so we pinitialize with a None + static LAST_RESET: Mutex> = Mutex::new(None); + + let count = COUNT.fetch_add(1, Ordering::Relaxed); + if count == 0 { + // this can only be reached the very 1st time we log + *LAST_RESET.lock().unwrap() = Some(Instant::now()); + } + + let do_log = if count >= $limit { + let mut last_reset = LAST_RESET.lock().unwrap(); + let current_time = Instant::now(); + let should_reset = last_reset + .map(|last_reset| current_time.duration_since(last_reset) >= Duration::from_secs(60)) + .unwrap_or(true); + + if should_reset { + *last_reset = Some(current_time); + // we store 1 because we are already about to log something + COUNT.store(1, Ordering::Relaxed); + true + } else { + // we are over-limit and not far enough in time to reset: don't log + false + } + } else { + true + }; + + if do_log { + ::tracing::$log_fn!($($args)*); + } + }}; +} + +#[macro_export] +macro_rules! rate_limited_trace { + (limit=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(trace, limit=$limit, $($args)*) + }; +} +#[macro_export] +macro_rules! rate_limited_debug { + (limit=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(debug, limit=$limit, $($args)*) + }; +} +#[macro_export] +macro_rules! rate_limited_info { + (limit=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(info, limit=$limit, $($args)*) + }; +} +#[macro_export] +macro_rules! rate_limited_warn { + (limit=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(warn, limit=$limit, $($args)*) + }; +} +#[macro_export] +macro_rules! rate_limited_error { + (limit=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(error, limit=$limit, $($args)*) + }; +} + +#[doc(hidden)] +pub use rate_limited_tracing; +pub use { + rate_limited_debug as debug, rate_limited_error as error, rate_limited_info as info, + rate_limited_trace as trace, rate_limited_warn as warn, +}; From 5ce1cc639ece54c901357822c1a00127757bfec9 Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 31 Jan 2024 12:13:19 +0100 Subject: [PATCH 2/3] use rate limited warn in doc processor --- quickwit/quickwit-common/src/rate_limited_tracing.rs | 4 ++-- quickwit/quickwit-indexing/src/actors/doc_processor.rs | 5 +++-- 2 files changed, 5 insertions(+), 4 deletions(-) diff --git a/quickwit/quickwit-common/src/rate_limited_tracing.rs b/quickwit/quickwit-common/src/rate_limited_tracing.rs index 1aa7bb27d5c..bb961c3136e 100644 --- a/quickwit/quickwit-common/src/rate_limited_tracing.rs +++ b/quickwit/quickwit-common/src/rate_limited_tracing.rs @@ -94,6 +94,6 @@ macro_rules! rate_limited_error { #[doc(hidden)] pub use rate_limited_tracing; pub use { - rate_limited_debug as debug, rate_limited_error as error, rate_limited_info as info, - rate_limited_trace as trace, rate_limited_warn as warn, + rate_limited_debug, rate_limited_error, rate_limited_info, rate_limited_trace, + rate_limited_warn, }; diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 2980e844c6b..53341bf2f5d 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -25,6 +25,7 @@ use anyhow::{bail, Context}; use async_trait::async_trait; use bytes::Bytes; use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity}; +use quickwit_common::rate_limited_tracing::rate_limited_warn; use quickwit_common::runtimes::RuntimeType; use quickwit_config::{SourceInputFormat, TransformConfig}; use quickwit_doc_mapper::{DocMapper, DocParsingError, JsonObject}; @@ -37,7 +38,6 @@ use tantivy::schema::{Field, Value}; use tantivy::{DateTime, TantivyDocument}; use thiserror::Error; use tokio::runtime::Handle; -use tracing::warn; #[cfg(feature = "vrl")] use super::vrl_processing::*; @@ -413,7 +413,8 @@ impl DocProcessor { processed_docs.push(processed_doc); } Err(error) => { - warn!( + rate_limited_warn!( + limit = 5, index_id = self.counters.index_id, source_id = self.counters.source_id, "{}", From 7796542aa3833622c771b086d2ac54e77b6caecc Mon Sep 17 00:00:00 2001 From: trinity-1686a Date: Wed, 31 Jan 2024 15:55:27 +0100 Subject: [PATCH 3/3] change keyword to be more explicit about unit --- .../src/rate_limited_tracing.rs | 22 +++++++++---------- .../src/actors/doc_processor.rs | 2 +- 2 files changed, 12 insertions(+), 12 deletions(-) diff --git a/quickwit/quickwit-common/src/rate_limited_tracing.rs b/quickwit/quickwit-common/src/rate_limited_tracing.rs index bb961c3136e..0fab23af045 100644 --- a/quickwit/quickwit-common/src/rate_limited_tracing.rs +++ b/quickwit/quickwit-common/src/rate_limited_tracing.rs @@ -19,7 +19,7 @@ #[macro_export] macro_rules! rate_limited_tracing { - ($log_fn:ident, limit=$limit:literal, $($args:tt)*) => {{ + ($log_fn:ident, limit_per_min=$limit:literal, $($args:tt)*) => {{ use ::std::sync::atomic::{AtomicU32, Ordering}; use ::std::sync::Mutex; use ::std::time::{Instant, Duration}; @@ -62,32 +62,32 @@ macro_rules! rate_limited_tracing { #[macro_export] macro_rules! rate_limited_trace { - (limit=$limit:literal, $($args:tt)*) => { - $crate::rate_limited_tracing::rate_limited_tracing!(trace, limit=$limit, $($args)*) + ($unit:ident=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(trace, $unit=$limit, $($args)*) }; } #[macro_export] macro_rules! rate_limited_debug { - (limit=$limit:literal, $($args:tt)*) => { - $crate::rate_limited_tracing::rate_limited_tracing!(debug, limit=$limit, $($args)*) + ($unit:ident=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(debug, $unit=$limit, $($args)*) }; } #[macro_export] macro_rules! rate_limited_info { - (limit=$limit:literal, $($args:tt)*) => { - $crate::rate_limited_tracing::rate_limited_tracing!(info, limit=$limit, $($args)*) + ($unit:ident=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(info, $unit=$limit, $($args)*) }; } #[macro_export] macro_rules! rate_limited_warn { - (limit=$limit:literal, $($args:tt)*) => { - $crate::rate_limited_tracing::rate_limited_tracing!(warn, limit=$limit, $($args)*) + ($unit:ident=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(warn, $unit=$limit, $($args)*) }; } #[macro_export] macro_rules! rate_limited_error { - (limit=$limit:literal, $($args:tt)*) => { - $crate::rate_limited_tracing::rate_limited_tracing!(error, limit=$limit, $($args)*) + ($unit:literal=$limit:literal, $($args:tt)*) => { + $crate::rate_limited_tracing::rate_limited_tracing!(error, $unit=$limit, $($args)*) }; } diff --git a/quickwit/quickwit-indexing/src/actors/doc_processor.rs b/quickwit/quickwit-indexing/src/actors/doc_processor.rs index 53341bf2f5d..9df96a55a08 100644 --- a/quickwit/quickwit-indexing/src/actors/doc_processor.rs +++ b/quickwit/quickwit-indexing/src/actors/doc_processor.rs @@ -414,7 +414,7 @@ impl DocProcessor { } Err(error) => { rate_limited_warn!( - limit = 5, + limit_per_min = 5, index_id = self.counters.index_id, source_id = self.counters.source_id, "{}",