Skip to content

Commit

Permalink
Add ability to disable ingest v1 (#4917)
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload authored May 6, 2024
1 parent 93cf7f3 commit f62e425
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 19 deletions.
17 changes: 15 additions & 2 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#![deny(clippy::disallowed_methods)]

use std::env;
use std::str::FromStr;

use anyhow::{bail, ensure, Context};
Expand Down Expand Up @@ -69,15 +70,27 @@ pub use crate::metastore_config::{
MetastoreBackend, MetastoreConfig, MetastoreConfigs, PostgresMetastoreConfig,
};
pub use crate::node_config::{
enable_ingest_v2, IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig,
SplitCacheLimits, DEFAULT_QW_CONFIG_PATH,
IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits,
DEFAULT_QW_CONFIG_PATH,
};
use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig};
pub use crate::storage_config::{
AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig,
S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs,
};

/// Returns true if the ingest API v2 is enabled.
pub fn enable_ingest_v2() -> bool {
static ENABLE_INGEST_V2: Lazy<bool> = Lazy::new(|| env::var("QW_ENABLE_INGEST_V2").is_ok());
*ENABLE_INGEST_V2
}

/// Returns true if the ingest API v1 is disabled.
pub fn disable_ingest_v1() -> bool {
static ENABLE_INGEST_V2: Lazy<bool> = Lazy::new(|| env::var("QW_DISABLE_INGEST_V1").is_ok());
*ENABLE_INGEST_V2
}

#[derive(utoipa::OpenApi)]
#[openapi(components(schemas(
IndexingResources,
Expand Down
7 changes: 0 additions & 7 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use std::time::Duration;
use anyhow::{bail, ensure};
use bytesize::ByteSize;
use http::HeaderMap;
use once_cell::sync::Lazy;
use quickwit_common::net::HostAddr;
use quickwit_common::uri::Uri;
use quickwit_proto::indexing::CpuCapacity;
Expand Down Expand Up @@ -284,12 +283,6 @@ impl Default for IngestApiConfig {
}
}

/// Returns true if the ingest API v2 is enabled.
pub fn enable_ingest_v2() -> bool {
static ENABLE_INGEST_V2: Lazy<bool> = Lazy::new(|| env::var("QW_ENABLE_INGEST_V2").is_ok());
*ENABLE_INGEST_V2
}

impl IngestApiConfig {
pub fn replication_factor(&self) -> anyhow::Result<NonZeroUsize> {
if let Ok(replication_factor_str) = env::var("QW_INGEST_REPLICATION_FACTOR") {
Expand Down
16 changes: 8 additions & 8 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use serialize::load_source_config_from_user_config;
// For backward compatibility.
use serialize::VersionedSourceConfig;

use crate::{enable_ingest_v2, TestableForRegression};
use crate::{disable_ingest_v1, enable_ingest_v2, TestableForRegression};

/// Reserved source ID for the `quickwit index ingest` CLI command.
pub const CLI_SOURCE_ID: &str = "_ingest-cli-source";
Expand Down Expand Up @@ -107,7 +107,7 @@ impl SourceConfig {
pub fn cli() -> Self {
Self {
source_id: CLI_SOURCE_ID.to_string(),
num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::IngestCli,
transform_config: None,
Expand All @@ -119,7 +119,7 @@ impl SourceConfig {
pub fn ingest_v2() -> Self {
Self {
source_id: INGEST_V2_SOURCE_ID.to_string(),
num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
num_pipelines: NonZeroUsize::MIN,
enabled: enable_ingest_v2(),
source_params: SourceParams::Ingest,
transform_config: None,
Expand All @@ -131,8 +131,8 @@ impl SourceConfig {
pub fn ingest_api_default() -> Self {
Self {
source_id: INGEST_API_SOURCE_ID.to_string(),
num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
enabled: true,
num_pipelines: NonZeroUsize::MIN,
enabled: !disable_ingest_v1(),
source_params: SourceParams::IngestApi,
transform_config: None,
input_format: SourceInputFormat::Json,
Expand All @@ -143,7 +143,7 @@ impl SourceConfig {
pub fn for_test(source_id: &str, source_params: SourceParams) -> Self {
Self {
source_id: source_id.to_string(),
num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params,
transform_config: None,
Expand Down Expand Up @@ -652,7 +652,7 @@ mod tests {
load_source_config_from_user_config(config_format, file_content.as_bytes()).unwrap();
let expected_source_config = SourceConfig {
source_id: "hdfs-logs-kinesis-source".to_string(),
num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::Kinesis(KinesisSourceParams {
stream_name: "emr-cluster-logs".to_string(),
Expand Down Expand Up @@ -1078,7 +1078,7 @@ mod tests {
let source_config: SourceConfig = ConfigFormat::Json.parse(&file_content).unwrap();
let expected_source_config = SourceConfig {
source_id: INGEST_API_SOURCE_ID.to_string(),
num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::IngestApi,
transform_config: Some(TransformConfig {
Expand Down
9 changes: 8 additions & 1 deletion quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::collections::HashMap;
use std::time::Instant;

use hyper::StatusCode;
use quickwit_config::enable_ingest_v2;
use quickwit_config::{disable_ingest_v1, enable_ingest_v2};
use quickwit_ingest::{
CommitType, DocBatchBuilder, IngestRequest, IngestService, IngestServiceClient,
};
Expand Down Expand Up @@ -85,6 +85,13 @@ async fn elastic_ingest_bulk(
if enable_ingest_v2() || bulk_options.enable_ingest_v2 {
return elastic_bulk_ingest_v2(default_index_id, body, bulk_options, ingest_router).await;
}
if disable_ingest_v1() {
return Err(ElasticsearchError::new(
StatusCode::INTERNAL_SERVER_ERROR,
"ingest v1 is disabled: environment variable `QW_DISABLE_INGEST_V1` is set".to_string(),
None,
));
}
let now = Instant::now();
let mut doc_batch_builders = HashMap::new();
let mut lines = lines(&body.content).enumerate();
Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use bytes::{Buf, Bytes};
use quickwit_config::{IngestApiConfig, INGEST_V2_SOURCE_ID};
use quickwit_config::{disable_ingest_v1, IngestApiConfig, INGEST_V2_SOURCE_ID};
use quickwit_ingest::{
CommitType, DocBatchBuilder, DocBatchV2Builder, FetchResponse, IngestRequest, IngestResponse,
IngestService, IngestServiceClient, IngestServiceError, TailRequest,
Expand Down Expand Up @@ -213,6 +213,10 @@ async fn ingest(
ingest_options: IngestOptions,
mut ingest_service: IngestServiceClient,
) -> Result<IngestResponse, IngestServiceError> {
if disable_ingest_v1() {
let message = "ingest v1 is disabled: environment variable `QW_DISABLE_INGEST_V1` is set";
return Err(IngestServiceError::Internal(message.to_string()));
}
// The size of the body should be an upper bound of the size of the batch. The removal of the
// end of line character for each doc compensates the addition of the `DocCommand` header.
let mut doc_batch_builder = DocBatchBuilder::with_capacity(index_id, body.content.remaining());
Expand Down

0 comments on commit f62e425

Please sign in to comment.