Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ability to disable ingest v1 #4917

Merged
merged 1 commit into from
May 6, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 15 additions & 2 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#![deny(clippy::disallowed_methods)]

use std::env;
use std::str::FromStr;

use anyhow::{bail, ensure, Context};
Expand Down Expand Up @@ -69,15 +70,27 @@ pub use crate::metastore_config::{
MetastoreBackend, MetastoreConfig, MetastoreConfigs, PostgresMetastoreConfig,
};
pub use crate::node_config::{
enable_ingest_v2, IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig,
SplitCacheLimits, DEFAULT_QW_CONFIG_PATH,
IndexerConfig, IngestApiConfig, JaegerConfig, NodeConfig, SearcherConfig, SplitCacheLimits,
DEFAULT_QW_CONFIG_PATH,
};
use crate::source_config::serialize::{SourceConfigV0_7, SourceConfigV0_8, VersionedSourceConfig};
pub use crate::storage_config::{
AzureStorageConfig, FileStorageConfig, GoogleCloudStorageConfig, RamStorageConfig,
S3StorageConfig, StorageBackend, StorageBackendFlavor, StorageConfig, StorageConfigs,
};

/// Returns true if the ingest API v2 is enabled.
pub fn enable_ingest_v2() -> bool {
static ENABLE_INGEST_V2: Lazy<bool> = Lazy::new(|| env::var("QW_ENABLE_INGEST_V2").is_ok());
*ENABLE_INGEST_V2
}

/// Returns true if the ingest API v1 is disabled.
pub fn disable_ingest_v1() -> bool {
static ENABLE_INGEST_V2: Lazy<bool> = Lazy::new(|| env::var("QW_DISABLE_INGEST_V1").is_ok());
*ENABLE_INGEST_V2
}

#[derive(utoipa::OpenApi)]
#[openapi(components(schemas(
IndexingResources,
Expand Down
7 changes: 0 additions & 7 deletions quickwit/quickwit-config/src/node_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ use std::time::Duration;
use anyhow::{bail, ensure};
use bytesize::ByteSize;
use http::HeaderMap;
use once_cell::sync::Lazy;
use quickwit_common::net::HostAddr;
use quickwit_common::uri::Uri;
use quickwit_proto::indexing::CpuCapacity;
Expand Down Expand Up @@ -284,12 +283,6 @@ impl Default for IngestApiConfig {
}
}

/// Returns true if the ingest API v2 is enabled.
pub fn enable_ingest_v2() -> bool {
static ENABLE_INGEST_V2: Lazy<bool> = Lazy::new(|| env::var("QW_ENABLE_INGEST_V2").is_ok());
*ENABLE_INGEST_V2
}

impl IngestApiConfig {
pub fn replication_factor(&self) -> anyhow::Result<NonZeroUsize> {
if let Ok(replication_factor_str) = env::var("QW_INGEST_REPLICATION_FACTOR") {
Expand Down
16 changes: 8 additions & 8 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ pub use serialize::load_source_config_from_user_config;
// For backward compatibility.
use serialize::VersionedSourceConfig;

use crate::{enable_ingest_v2, TestableForRegression};
use crate::{disable_ingest_v1, enable_ingest_v2, TestableForRegression};

/// Reserved source ID for the `quickwit index ingest` CLI command.
pub const CLI_SOURCE_ID: &str = "_ingest-cli-source";
Expand Down Expand Up @@ -107,7 +107,7 @@ impl SourceConfig {
pub fn cli() -> Self {
Self {
source_id: CLI_SOURCE_ID.to_string(),
num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::IngestCli,
transform_config: None,
Expand All @@ -119,7 +119,7 @@ impl SourceConfig {
pub fn ingest_v2() -> Self {
Self {
source_id: INGEST_V2_SOURCE_ID.to_string(),
num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
num_pipelines: NonZeroUsize::MIN,
enabled: enable_ingest_v2(),
source_params: SourceParams::Ingest,
transform_config: None,
Expand All @@ -131,8 +131,8 @@ impl SourceConfig {
pub fn ingest_api_default() -> Self {
Self {
source_id: INGEST_API_SOURCE_ID.to_string(),
num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
enabled: true,
num_pipelines: NonZeroUsize::MIN,
enabled: !disable_ingest_v1(),
source_params: SourceParams::IngestApi,
transform_config: None,
input_format: SourceInputFormat::Json,
Expand All @@ -143,7 +143,7 @@ impl SourceConfig {
pub fn for_test(source_id: &str, source_params: SourceParams) -> Self {
Self {
source_id: source_id.to_string(),
num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params,
transform_config: None,
Expand Down Expand Up @@ -652,7 +652,7 @@ mod tests {
load_source_config_from_user_config(config_format, file_content.as_bytes()).unwrap();
let expected_source_config = SourceConfig {
source_id: "hdfs-logs-kinesis-source".to_string(),
num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::Kinesis(KinesisSourceParams {
stream_name: "emr-cluster-logs".to_string(),
Expand Down Expand Up @@ -1078,7 +1078,7 @@ mod tests {
let source_config: SourceConfig = ConfigFormat::Json.parse(&file_content).unwrap();
let expected_source_config = SourceConfig {
source_id: INGEST_API_SOURCE_ID.to_string(),
num_pipelines: NonZeroUsize::new(1).expect("1 should be non-zero"),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::IngestApi,
transform_config: Some(TransformConfig {
Expand Down
9 changes: 8 additions & 1 deletion quickwit/quickwit-serve/src/elasticsearch_api/bulk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ use std::collections::HashMap;
use std::time::Instant;

use hyper::StatusCode;
use quickwit_config::enable_ingest_v2;
use quickwit_config::{disable_ingest_v1, enable_ingest_v2};
use quickwit_ingest::{
CommitType, DocBatchBuilder, IngestRequest, IngestService, IngestServiceClient,
};
Expand Down Expand Up @@ -85,6 +85,13 @@ async fn elastic_ingest_bulk(
if enable_ingest_v2() || bulk_options.enable_ingest_v2 {
return elastic_bulk_ingest_v2(default_index_id, body, bulk_options, ingest_router).await;
}
if disable_ingest_v1() {
return Err(ElasticsearchError::new(
StatusCode::INTERNAL_SERVER_ERROR,
"ingest v1 is disabled: environment variable `QW_DISABLE_INGEST_V1` is set".to_string(),
None,
));
}
let now = Instant::now();
let mut doc_batch_builders = HashMap::new();
let mut lines = lines(&body.content).enumerate();
Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-serve/src/ingest_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use bytes::{Buf, Bytes};
use quickwit_config::{IngestApiConfig, INGEST_V2_SOURCE_ID};
use quickwit_config::{disable_ingest_v1, IngestApiConfig, INGEST_V2_SOURCE_ID};
use quickwit_ingest::{
CommitType, DocBatchBuilder, DocBatchV2Builder, FetchResponse, IngestRequest, IngestResponse,
IngestService, IngestServiceClient, IngestServiceError, TailRequest,
Expand Down Expand Up @@ -213,6 +213,10 @@ async fn ingest(
ingest_options: IngestOptions,
mut ingest_service: IngestServiceClient,
) -> Result<IngestResponse, IngestServiceError> {
if disable_ingest_v1() {
let message = "ingest v1 is disabled: environment variable `QW_DISABLE_INGEST_V1` is set";
return Err(IngestServiceError::Internal(message.to_string()));
}
// The size of the body should be an upper bound of the size of the batch. The removal of the
// end of line character for each doc compensates the addition of the `DocCommand` header.
let mut doc_batch_builder = DocBatchBuilder::with_capacity(index_id, body.content.remaining());
Expand Down
Loading