From ff3930e2b355b0413f3733e7e7098a4800c2a83c Mon Sep 17 00:00:00 2001 From: Adrien Guillo Date: Fri, 26 Apr 2024 09:08:26 -0400 Subject: [PATCH] Add ability to disable ingest v1 --- quickwit/quickwit-config/src/lib.rs | 17 +++++++++++++++-- quickwit/quickwit-config/src/node_config/mod.rs | 7 ------- .../quickwit-config/src/source_config/mod.rs | 16 ++++++++-------- .../src/elasticsearch_api/bulk.rs | 9 ++++++++- .../src/ingest_api/rest_handler.rs | 6 +++++- 5 files changed, 36 insertions(+), 19 deletions(-) diff --git a/quickwit/quickwit-config/src/lib.rs b/quickwit/quickwit-config/src/lib.rs index 3b458819922..fd8d1a129e6 100644 --- a/quickwit/quickwit-config/src/lib.rs +++ b/quickwit/quickwit-config/src/lib.rs @@ -19,6 +19,7 @@ #![deny(clippy::disallowed_methods)] +use std::env; use std::str::FromStr; use anyhow::{bail, ensure, Context}; @@ -69,8 +70,8 @@ pub use crate::metastore_config::{ MetastoreBackend, MetastoreConfig, MetastoreConfigs, PostgresMetastoreConfig, }; pub use crate::node_config::{ - enable_ingest_v2, IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, - SplitCacheLimits, DEFAULT_QW_CONFIG_PATH, + IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits, + DEFAULT_QW_CONFIG_PATH, }; use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig}; pub use crate::storage_config::{ @@ -78,6 +79,18 @@ pub use crate::storage_config::{ S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs, }; +/// Returns true if the ingest API v2 is enabled. +pub fn enable_ingest_v2() -> bool { + static ENABLE_INGEST_V2: Lazy<bool> = Lazy::new(|| env::var("QW_ENABLE_INGEST_V2").is_ok()); + *ENABLE_INGEST_V2 +} + +/// Returns true if the ingest API v1 is disabled. 
+pub fn disable_ingest_v1() -> bool { + static DISABLE_INGEST_V1: Lazy<bool> = Lazy::new(|| env::var("QW_DISABLE_INGEST_V1").is_ok()); + *DISABLE_INGEST_V1 +} + #[derive(utoipa::OpenApi)] #[openapi(components(schemas( IndexingResources, diff --git a/quickwit/quickwit-config/src/node_config/mod.rs b/quickwit/quickwit-config/src/node_config/mod.rs index 12473933c7e..d44aff68c9b 100644 --- a/quickwit/quickwit-config/src/node_config/mod.rs +++ b/quickwit/quickwit-config/src/node_config/mod.rs @@ -29,7 +29,6 @@ use std::time::Duration; use anyhow::{bail, ensure}; use bytesize::ByteSize; use http::HeaderMap; -use once_cell::sync::Lazy; use quickwit_common::net::HostAddr; use quickwit_common::uri::Uri; use quickwit_proto::indexing::CpuCapacity; @@ -284,12 +283,6 @@ impl Default for IngestApiConfig { } } -/// Returns true if the ingest API v2 is enabled. -pub fn enable_ingest_v2() -> bool { - static ENABLE_INGEST_V2: Lazy<bool> = Lazy::new(|| env::var("QW_ENABLE_INGEST_V2").is_ok()); - *ENABLE_INGEST_V2 -} - impl IngestApiConfig { pub fn replication_factor(&self) -> anyhow::Result { if let Ok(replication_factor_str) = env::var("QW_INGEST_REPLICATION_FACTOR") { diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs index 9992729811f..320f06317a6 100644 --- a/quickwit/quickwit-config/src/source_config/mod.rs +++ b/quickwit/quickwit-config/src/source_config/mod.rs @@ -34,7 +34,7 @@ pub use serialize::load_source_config_from_user_config; // For backward compatibility. use serialize::VersionedSourceConfig; -use crate::{enable_ingest_v2, TestableForRegression}; +use crate::{disable_ingest_v1, enable_ingest_v2, TestableForRegression}; /// Reserved source ID for the `quickwit index ingest` CLI command. 
pub const CLI_SOURCE_ID: &str = "_ingest-cli-source"; @@ -107,7 +107,7 @@ impl SourceConfig { pub fn cli() -> Self { Self { source_id: CLI_SOURCE_ID.to_string(), - num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::MIN, enabled: true, source_params: SourceParams::IngestCli, transform_config: None, @@ -119,7 +119,7 @@ impl SourceConfig { pub fn ingest_v2() -> Self { Self { source_id: INGEST_V2_SOURCE_ID.to_string(), - num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::MIN, enabled: enable_ingest_v2(), source_params: SourceParams::Ingest, transform_config: None, @@ -131,8 +131,8 @@ impl SourceConfig { pub fn ingest_api_default() -> Self { Self { source_id: INGEST_API_SOURCE_ID.to_string(), - num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), - enabled: true, + num_pipelines: NonZeroUsize::MIN, + enabled: !disable_ingest_v1(), source_params: SourceParams::IngestApi, transform_config: None, input_format: SourceInputFormat::Json, @@ -143,7 +143,7 @@ impl SourceConfig { pub fn for_test(source_id: &str, source_params: SourceParams) -> Self { Self { source_id: source_id.to_string(), - num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::MIN, enabled: true, source_params, transform_config: None, @@ -652,7 +652,7 @@ mod tests { load_source_config_from_user_config(config_format, file_content.as_bytes()).unwrap(); let expected_source_config = SourceConfig { source_id: "hdfs-logs-kinesis-source".to_string(), - num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::MIN, enabled: true, source_params: SourceParams::Kinesis(KinesisSourceParams { stream_name: "emr-cluster-logs".to_string(), @@ -1078,7 +1078,7 @@ mod tests { let source_config: SourceConfig = ConfigFormat::Json.parse(&file_content).unwrap(); let expected_source_config = SourceConfig { source_id: 
INGEST_API_SOURCE_ID.to_string(), - num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"), + num_pipelines: NonZeroUsize::MIN, enabled: true, source_params: SourceParams::IngestApi, transform_config: Some(TransformConfig { diff --git a/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs b/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs index d0e1d24c51e..24b03021e7e 100644 --- a/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs +++ b/quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs @@ -21,7 +21,7 @@ use std::collections::HashMap; use std::time::Instant; use hyper::StatusCode; -use quickwit_config::enable_ingest_v2; +use quickwit_config::{disable_ingest_v1, enable_ingest_v2}; use quickwit_ingest::{ CommitType, DocBatchBuilder, IngestRequest, IngestService, IngestServiceClient, }; @@ -85,6 +85,13 @@ async fn elastic_ingest_bulk( if enable_ingest_v2() || bulk_options.enable_ingest_v2 { return elastic_bulk_ingest_v2(default_index_id, body, bulk_options, ingest_router).await; } + if disable_ingest_v1() { + return Err(ElasticsearchError::new( + StatusCode::INTERNAL_SERVER_ERROR, + "ingest v1 is disabled: environment variable `QW_DISABLE_INGEST_V1` is set".to_string(), + None, + )); + } let now = Instant::now(); let mut doc_batch_builders = HashMap::new(); let mut lines = lines(&body.content).enumerate(); diff --git a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs index 133e07f28a0..895a095f0fa 100644 --- a/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs +++ b/quickwit/quickwit-serve/src/ingest_api/rest_handler.rs @@ -18,7 +18,7 @@ // along with this program. If not, see <http://www.gnu.org/licenses/>. 
use bytes::{Buf, Bytes}; -use quickwit_config::{IngestApiConfig, INGEST_V2_SOURCE_ID}; +use quickwit_config::{disable_ingest_v1, IngestApiConfig, INGEST_V2_SOURCE_ID}; use quickwit_ingest::{ CommitType, DocBatchBuilder, DocBatchV2Builder, FetchResponse, IngestRequest, IngestResponse, IngestService, IngestServiceClient, IngestServiceError, TailRequest, @@ -213,6 +213,10 @@ async fn ingest( ingest_options: IngestOptions, mut ingest_service: IngestServiceClient, ) -> Result<IngestResponse, IngestServiceError> { + if disable_ingest_v1() { + let message = "ingest v1 is disabled: environment variable `QW_DISABLE_INGEST_V1` is set"; + return Err(IngestServiceError::Internal(message.to_string())); + } // The size of the body should be an upper bound of the size of the batch. The removal of the // end of line character for each doc compensates the addition of the `DocCommand` header. let mut doc_batch_builder = DocBatchBuilder::with_capacity(index_id, body.content.remaining());