Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

rate limit some logs #4483

Merged
merged 4 commits into from
Jan 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/quickwit-common/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ mod path_hasher;
mod progress;
pub mod pubsub;
pub mod rand;
pub mod rate_limited_tracing;
pub mod rate_limiter;
pub mod rendezvous_hasher;
pub mod retry;
Expand Down
99 changes: 99 additions & 0 deletions quickwit/quickwit-common/src/rate_limited_tracing.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

#[macro_export]
macro_rules! rate_limited_tracing {
($log_fn:ident, limit_per_min=$limit:literal, $($args:tt)*) => {{
use ::std::sync::atomic::{AtomicU32, Ordering};
use ::std::sync::Mutex;
use ::std::time::{Instant, Duration};

static COUNT: AtomicU32 = AtomicU32::new(0);
// we can't build an Instant from const context, so we pinitialize with a None
static LAST_RESET: Mutex<Option<Instant>> = Mutex::new(None);

let count = COUNT.fetch_add(1, Ordering::Relaxed);
if count == 0 {
// this can only be reached the very 1st time we log
*LAST_RESET.lock().unwrap() = Some(Instant::now());
}

let do_log = if count >= $limit {
let mut last_reset = LAST_RESET.lock().unwrap();
let current_time = Instant::now();
let should_reset = last_reset
.map(|last_reset| current_time.duration_since(last_reset) >= Duration::from_secs(60))
.unwrap_or(true);

if should_reset {
*last_reset = Some(current_time);
// we store 1 because we are already about to log something
COUNT.store(1, Ordering::Relaxed);
true
} else {
// we are over-limit and not far enough in time to reset: don't log
false
}
} else {
true
};

if do_log {
::tracing::$log_fn!($($args)*);
}
}};
}

#[macro_export]
macro_rules! rate_limited_trace {
($unit:ident=$limit:literal, $($args:tt)*) => {
$crate::rate_limited_tracing::rate_limited_tracing!(trace, $unit=$limit, $($args)*)
};
}
#[macro_export]
macro_rules! rate_limited_debug {
($unit:ident=$limit:literal, $($args:tt)*) => {
$crate::rate_limited_tracing::rate_limited_tracing!(debug, $unit=$limit, $($args)*)
};
}
#[macro_export]
macro_rules! rate_limited_info {
($unit:ident=$limit:literal, $($args:tt)*) => {
$crate::rate_limited_tracing::rate_limited_tracing!(info, $unit=$limit, $($args)*)
};
}
#[macro_export]
macro_rules! rate_limited_warn {
($unit:ident=$limit:literal, $($args:tt)*) => {
$crate::rate_limited_tracing::rate_limited_tracing!(warn, $unit=$limit, $($args)*)
};
}
#[macro_export]
macro_rules! rate_limited_error {
($unit:literal=$limit:literal, $($args:tt)*) => {
$crate::rate_limited_tracing::rate_limited_tracing!(error, $unit=$limit, $($args)*)
};
}

#[doc(hidden)]
pub use rate_limited_tracing;
pub use {
rate_limited_debug, rate_limited_error, rate_limited_info, rate_limited_trace,
rate_limited_warn,
};
5 changes: 3 additions & 2 deletions quickwit/quickwit-indexing/src/actors/doc_processor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use anyhow::{bail, Context};
use async_trait::async_trait;
use bytes::Bytes;
use quickwit_actors::{Actor, ActorContext, ActorExitStatus, Handler, Mailbox, QueueCapacity};
use quickwit_common::rate_limited_tracing::rate_limited_warn;
use quickwit_common::runtimes::RuntimeType;
use quickwit_config::{SourceInputFormat, TransformConfig};
use quickwit_doc_mapper::{DocMapper, DocParsingError, JsonObject};
Expand All @@ -37,7 +38,6 @@ use tantivy::schema::{Field, Value};
use tantivy::{DateTime, TantivyDocument};
use thiserror::Error;
use tokio::runtime::Handle;
use tracing::warn;

#[cfg(feature = "vrl")]
use super::vrl_processing::*;
Expand Down Expand Up @@ -413,7 +413,8 @@ impl DocProcessor {
processed_docs.push(processed_doc);
}
Err(error) => {
warn!(
rate_limited_warn!(
limit_per_min = 5,
index_id = self.counters.index_id,
source_id = self.counters.source_id,
"{}",
Expand Down
Loading