From 5b904281e780e3ea9c632c3e2f518a439e8630a2 Mon Sep 17 00:00:00 2001 From: Paul Masurel Date: Tue, 4 Jun 2024 11:48:06 +0900 Subject: [PATCH] Fix indexing regression by disabling payload compression (#5064) This disables default compression. After this PR, compression is only enabled if the `QW_MINIMUM_COMPRESSION_SIZE` environment variable is set. This PR also enables ZSTD compression, and sets the compression quality to "Fast". Closes #5049 Co-authored-by: Remi Dettai --- quickwit/Cargo.lock | 2 ++ quickwit/Cargo.toml | 6 +++- quickwit/quickwit-serve/src/rest.rs | 44 ++++++++++++++++++++++++----- 3 files changed, 44 insertions(+), 8 deletions(-) diff --git a/quickwit/Cargo.lock b/quickwit/Cargo.lock index 0700fdda797..0d58253ef34 100644 --- a/quickwit/Cargo.lock +++ b/quickwit/Cargo.lock @@ -269,6 +269,8 @@ dependencies = [ "memchr", "pin-project-lite", "tokio", + "zstd 0.13.1", + "zstd-safe 7.1.0", ] [[package]] diff --git a/quickwit/Cargo.toml b/quickwit/Cargo.toml index 0367b2e6fc4..0f1778a2309 100644 --- a/quickwit/Cargo.toml +++ b/quickwit/Cargo.toml @@ -244,7 +244,11 @@ tower = { version = "0.4.13", features = [ "retry", "util", ] } -tower-http = { version = "0.4.0", features = ["compression-gzip", "cors"] } +tower-http = { version = "0.4.0", features = [ + "compression-zstd", + "compression-gzip", + "cors", +] } tracing = "0.1.37" tracing-opentelemetry = "0.20.0" tracing-subscriber = { version = "0.3.16", features = [ diff --git a/quickwit/quickwit-serve/src/rest.rs b/quickwit/quickwit-serve/src/rest.rs index 85f80eb9325..c3e90550865 100644 --- a/quickwit/quickwit-serve/src/rest.rs +++ b/quickwit/quickwit-serve/src/rest.rs @@ -26,7 +26,7 @@ use hyper::{http, Method, StatusCode}; use quickwit_common::tower::BoxFutureInfaillible; use tower::make::Shared; use tower::ServiceBuilder; -use tower_http::compression::predicate::{DefaultPredicate, Predicate, SizeAbove}; +use tower_http::compression::predicate::{NotForContentType, Predicate, SizeAbove}; use tower_http::compression::CompressionLayer; use tower_http::cors::CorsLayer; use tracing::{error, info}; @@ -52,10 +52,6 @@ use crate::template_api::index_template_api_handlers; use crate::ui_handler::ui_handler; use crate::{BodyFormat, BuildInfo, QuickwitServices, RuntimeInfo}; -/// The minimum size a response body must be in order to -/// be automatically compressed with gzip. -const MINIMUM_RESPONSE_COMPRESSION_SIZE: u16 = 10 << 10; - #[derive(Debug)] pub(crate) struct InvalidJsonRequest(pub serde_json::Error); @@ -88,6 +84,39 @@ impl std::fmt::Display for InternalError { } } +/// Env variable key to define the minimum size above which a response should be compressed. +/// If unset, no compression is applied. +const QW_MINIMUM_COMPRESSION_SIZE_KEY: &str = "QW_MINIMUM_COMPRESSION_SIZE"; + +#[derive(Clone, Copy)] +struct CompressionPredicate { + size_above_opt: Option, +} + +impl CompressionPredicate { + fn from_env() -> CompressionPredicate { + let minimum_compression_size_opt: Option = quickwit_common::get_from_env_opt::( + QW_MINIMUM_COMPRESSION_SIZE_KEY, + ) + .map(|minimum_compression_size: usize| { + u16::try_from(minimum_compression_size).unwrap_or(u16::MAX) + }); + let size_above_opt = minimum_compression_size_opt.map(SizeAbove::new); + CompressionPredicate { size_above_opt } + } +} + +impl Predicate for CompressionPredicate { + fn should_compress(&self, response: &http::Response) -> bool + where B: hyper::body::HttpBody { + if let Some(size_above) = self.size_above_opt { + size_above.should_compress(response) + } else { + false + } + } +} + /// Starts REST services. pub(crate) async fn start_rest_server( rest_listen_addr: SocketAddr, @@ -158,14 +187,15 @@ pub(crate) async fn start_rest_server( .boxed(); let warp_service = warp::service(rest_routes); - let compression_predicate = - DefaultPredicate::new().and(SizeAbove::new(MINIMUM_RESPONSE_COMPRESSION_SIZE)); + let compression_predicate = CompressionPredicate::from_env().and(NotForContentType::IMAGES); let cors = build_cors(&quickwit_services.node_config.rest_config.cors_allow_origins); let service = ServiceBuilder::new() .layer( CompressionLayer::new() + .zstd(true) .gzip(true) + .quality(tower_http::CompressionLevel::Fastest) .compress_when(compression_predicate), ) .layer(cors)