From 11165f8b19af56563c1571b6763a88bfca749e6c Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Mon, 13 May 2024 17:21:56 +0900 Subject: [PATCH] Making ingest timeout configurable. --- .../quickwit-ingest/src/ingest_v2/router.rs | 28 +++++++++++++------ 1 file changed, 19 insertions(+), 9 deletions(-) diff --git a/quickwit/quickwit-ingest/src/ingest_v2/router.rs b/quickwit/quickwit-ingest/src/ingest_v2/router.rs index 59fc86119ae..ba78f4585db 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/router.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/router.rs @@ -19,7 +19,7 @@ use std::collections::{HashMap, HashSet}; use std::fmt; -use std::sync::{Arc, Weak}; +use std::sync::{Arc, OnceLock, Weak}; use std::time::Duration; use async_trait::async_trait; @@ -53,11 +53,21 @@ use super::IngesterPool; use crate::{get_ingest_router_buffer_size, LeaderId}; /// Duration after which ingest requests time out with [`IngestV2Error::Timeout`]. -pub(super) const INGEST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = "testsuite")) { - Duration::from_millis(10) -} else { - Duration::from_secs(35) -}; +fn ingest_request_timeout() -> Duration { + const DEFAULT_INGEST_REQUEST_TIMEOUT: Duration = if cfg!(any(test, feature = "testsuite")) { + Duration::from_millis(10) + } else { + Duration::from_secs(35) + }; + static TIMEOUT: OnceLock = OnceLock::new(); + *TIMEOUT.get_or_init(|| { + let duration_ms = quickwit_common::get_from_env( + "QW_INGEST_REQUEST_TIMEOUT_MS", + DEFAULT_INGEST_REQUEST_TIMEOUT.as_millis() as u64, + ); + Duration::from_millis(duration_ms) + }) +} const MAX_PERSIST_ATTEMPTS: usize = 5; @@ -435,8 +445,8 @@ impl IngestRouter { .await .map_err(|_| { let message = format!( - "ingest request timed out after {} seconds", - INGEST_REQUEST_TIMEOUT.as_secs() + "ingest request timed out after {} millis", + timeout_duration.as_millis() ); IngestV2Error::Timeout(message) })? @@ -460,7 +470,7 @@ impl IngestRouterService for IngestRouter { .try_acquire_many_owned(request_size_bytes as u32) .map_err(|_| IngestV2Error::TooManyRequests)?; - self.ingest_timeout(ingest_request, INGEST_REQUEST_TIMEOUT) + self.ingest_timeout(ingest_request, ingest_request_timeout()) .await } }