From 558e2cacf0572a98d21b39ec953a1be4546d970c Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Thu, 22 Feb 2024 14:55:19 +0000 Subject: [PATCH] Wait for merges to complete --- distribution/lambda/Makefile | 2 + .../cdk/stacks/examples/mock_data_stack.py | 11 ++- .../cdk/stacks/services/indexer_service.py | 4 +- .../cdk/stacks/services/quickwit_service.py | 8 +- quickwit/quickwit-lambda/src/bin/indexer.rs | 2 +- quickwit/quickwit-lambda/src/bin/searcher.rs | 2 +- quickwit/quickwit-lambda/src/environment.rs | 34 +++++++++ .../src/indexer/environment.rs | 7 +- .../quickwit-lambda/src/indexer/handler.rs | 4 +- .../src/indexer/ingest/helpers.rs | 75 +++++++++++++------ .../quickwit-lambda/src/indexer/ingest/mod.rs | 49 ++++++------ quickwit/quickwit-lambda/src/lib.rs | 1 + quickwit/quickwit-lambda/src/logger.rs | 18 +++-- .../src/searcher/environment.rs | 3 - .../quickwit-lambda/src/searcher/handler.rs | 3 +- .../quickwit-lambda/src/searcher/search.rs | 3 +- 16 files changed, 154 insertions(+), 72 deletions(-) create mode 100644 quickwit/quickwit-lambda/src/environment.rs diff --git a/distribution/lambda/Makefile b/distribution/lambda/Makefile index 92b9e04c84a..c133d42223e 100644 --- a/distribution/lambda/Makefile +++ b/distribution/lambda/Makefile @@ -108,6 +108,7 @@ bench-index: done bench-search-term: + export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true mem_sizes=( 1024 2048 4096 8192 ) for mem_size in "$${mem_sizes[@]}" do @@ -117,6 +118,7 @@ bench-search-term: done bench-search-histogram: + export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true mem_sizes=( 1024 2048 4096 8192 ) for mem_size in "$${mem_sizes[@]}" do diff --git a/distribution/lambda/cdk/stacks/examples/mock_data_stack.py b/distribution/lambda/cdk/stacks/examples/mock_data_stack.py index a83e4546075..4822e69723a 100644 --- a/distribution/lambda/cdk/stacks/examples/mock_data_stack.py +++ b/distribution/lambda/cdk/stacks/examples/mock_data_stack.py @@ -12,7 +12,7 @@ from constructs import Construct import yaml -from ..services.quickwit_service import QuickwitService +from ..services import quickwit_service SEARCHER_FUNCTION_NAME_EXPORT_NAME = "mock-data-searcher-function-name" INDEX_STORE_BUCKET_NAME_EXPORT_NAME = "mock-data-index-store-bucket-name" @@ -28,7 +28,7 @@ def __init__( scope: Construct, construct_id: str, index_id: str, - qw_svc: QuickwitService, + qw_svc: quickwit_service.QuickwitService, **kwargs, ): super().__init__(scope, construct_id, **kwargs) @@ -83,7 +83,7 @@ def __init__( scope: Construct, construct_id: str, index_id: str, - qw_svc: QuickwitService, + qw_svc: quickwit_service.QuickwitService, api_key: str, **kwargs, ) -> None: @@ -149,12 +149,15 @@ def __init__( "mock-data-index-config", path=index_config_local_path, ) - qw_svc = QuickwitService( + lambda_env = quickwit_service.extract_local_env() + qw_svc = quickwit_service.QuickwitService( self, "Quickwit", index_id=index_id, index_config_bucket=index_config.s3_bucket_name, index_config_key=index_config.s3_object_key, + indexer_environment=lambda_env, + searcher_environment=lambda_env, indexer_package_location=indexer_package_location, searcher_package_location=searcher_package_location, ) diff --git a/distribution/lambda/cdk/stacks/services/indexer_service.py b/distribution/lambda/cdk/stacks/services/indexer_service.py index 1dee9230e6f..65a32ffb8a6 100644 --- a/distribution/lambda/cdk/stacks/services/indexer_service.py +++ b/distribution/lambda/cdk/stacks/services/indexer_service.py @@ -32,7 +32,9 @@ def __init__( "QW_LAMBDA_INDEX_CONFIG_URI": f"s3://{index_config_bucket}/{index_config_key}", **environment, }, - timeout=aws_cdk.Duration.minutes(15), + # use a strict timeout and retry policy to avoid unexpected costs + timeout=aws_cdk.Duration.minutes(1), + retry_attempts=0, reserved_concurrent_executions=1, memory_size=memory_size, ephemeral_storage_size=aws_cdk.Size.gibibytes(10), diff --git a/distribution/lambda/cdk/stacks/services/quickwit_service.py b/distribution/lambda/cdk/stacks/services/quickwit_service.py index 2887983f1c1..d0505b63faf 100644 --- a/distribution/lambda/cdk/stacks/services/quickwit_service.py +++ b/distribution/lambda/cdk/stacks/services/quickwit_service.py @@ -12,8 +12,12 @@ def extract_local_env() -> dict[str, str]: - """Extracts local environment variables that start with QW_LAMBDA_""" - return {k: os.environ[k] for k in os.environ.keys() if k.startswith("QW_LAMBDA_")} + """Extracts local environment variables QW_LAMBDA_* and QW_DISABLE_TELEMETRY""" + return { + k: os.environ[k] + for k in os.environ.keys() + if (k.startswith("QW_LAMBDA_") or k == "QW_DISABLE_TELEMETRY") + } class QuickwitService(Construct): diff --git a/quickwit/quickwit-lambda/src/bin/indexer.rs b/quickwit/quickwit-lambda/src/bin/indexer.rs index a62cccea377..f31196955f1 100644 --- a/quickwit/quickwit-lambda/src/bin/indexer.rs +++ b/quickwit/quickwit-lambda/src/bin/indexer.rs @@ -23,7 +23,7 @@ use quickwit_lambda::logger; #[tokio::main] async fn main() -> anyhow::Result<()> { - logger::setup_lambda_tracer(tracing::Level::DEBUG)?; + logger::setup_lambda_tracer(tracing::Level::INFO)?; let func = service_fn(handler); lambda_runtime::run(func) .await diff --git a/quickwit/quickwit-lambda/src/bin/searcher.rs b/quickwit/quickwit-lambda/src/bin/searcher.rs index 33ee5f16034..564ea4e6653 100644 --- a/quickwit/quickwit-lambda/src/bin/searcher.rs +++ b/quickwit/quickwit-lambda/src/bin/searcher.rs @@ -23,7 +23,7 @@ use quickwit_lambda::searcher::handler; #[tokio::main] async fn main() -> anyhow::Result<()> { - logger::setup_lambda_tracer(tracing::Level::DEBUG)?; + logger::setup_lambda_tracer(tracing::Level::INFO)?; let func = service_fn(handler); run(func).await.map_err(|e| anyhow::anyhow!(e)) } diff --git a/quickwit/quickwit-lambda/src/environment.rs b/quickwit/quickwit-lambda/src/environment.rs new file mode 100644 index 00000000000..f279a7bc81b --- /dev/null +++ b/quickwit/quickwit-lambda/src/environment.rs @@ -0,0 +1,34 @@ +// Copyright (C) 2024 Quickwit, Inc. +// +// Quickwit is offered under the AGPL v3.0 and as commercial software. +// For commercial licensing, contact us at hello@quickwit.io. +// +// AGPL: +// This program is free software: you can redistribute it and/or modify +// it under the terms of the GNU Affero General Public License as +// published by the Free Software Foundation, either version 3 of the +// License, or (at your option) any later version. +// +// This program is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Affero General Public License for more details. +// +// You should have received a copy of the GNU Affero General Public License +// along with this program. If not, see . + +use std::env::var; + +use once_cell::sync::Lazy; + +pub static INDEX_ID: Lazy = + Lazy::new(|| var("QW_LAMBDA_INDEX_ID").expect("QW_LAMBDA_INDEX_ID must be set")); + +pub static LOG_SPAN_BOUNDARIES: Lazy = + Lazy::new(|| var("QW_LAMBDA_LOG_SPAN_BOUNDARIES").is_ok_and(|v| v.as_str() == "true")); + +pub static OPENTELEMETRY_URL: Lazy> = + Lazy::new(|| var("QW_LAMBDA_OPENTELEMETRY_URL").ok()); + +pub static OPENTELEMETRY_AUTHORIZATION: Lazy> = + Lazy::new(|| var("QW_LAMBDA_OPENTELEMETRY_AUTHORIZATION").ok()); diff --git a/quickwit/quickwit-lambda/src/indexer/environment.rs b/quickwit/quickwit-lambda/src/indexer/environment.rs index 92f264a268c..0a7e2f19acd 100644 --- a/quickwit/quickwit-lambda/src/indexer/environment.rs +++ b/quickwit/quickwit-lambda/src/indexer/environment.rs @@ -23,6 +23,7 @@ use once_cell::sync::Lazy; pub const CONFIGURATION_TEMPLATE: &str = "version: 0.6 node_id: lambda-indexer +cluster_id: lambda-ephemeral metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index data_dir: /tmp @@ -32,8 +33,8 @@ pub static INDEX_CONFIG_URI: Lazy = Lazy::new(|| { var("QW_LAMBDA_INDEX_CONFIG_URI").expect("QW_LAMBDA_INDEX_CONFIG_URI must be set") }); -pub static INDEX_ID: Lazy = - Lazy::new(|| var("QW_LAMBDA_INDEX_ID").expect("QW_LAMBDA_INDEX_ID must be set")); - pub static DISABLE_MERGE: Lazy = Lazy::new(|| var("QW_LAMBDA_DISABLE_MERGE").is_ok_and(|v| v.as_str() == "true")); + +pub static DISABLE_JANITOR: Lazy = + Lazy::new(|| var("QW_LAMBDA_DISABLE_JANITOR").is_ok_and(|v| v.as_str() == "true")); diff --git a/quickwit/quickwit-lambda/src/indexer/handler.rs b/quickwit/quickwit-lambda/src/indexer/handler.rs index 9160b10b9b8..063b3a04973 100644 --- a/quickwit/quickwit-lambda/src/indexer/handler.rs +++ b/quickwit/quickwit-lambda/src/indexer/handler.rs @@ -21,9 +21,10 @@ use lambda_runtime::{Error, LambdaEvent}; use serde_json::Value; use tracing::{debug_span, error, info, info_span, Instrument}; -use super::environment::{DISABLE_MERGE, INDEX_CONFIG_URI, INDEX_ID}; +use super::environment::{DISABLE_JANITOR, DISABLE_MERGE, INDEX_CONFIG_URI}; use super::ingest::{ingest, IngestArgs}; use super::model::IndexerEvent; +use crate::environment::INDEX_ID; use crate::logger; use crate::utils::LambdaContainerContext; @@ -45,6 +46,7 @@ async fn indexer_handler(event: LambdaEvent) -> Result { env.INDEX_CONFIG_URI = *INDEX_CONFIG_URI, env.INDEX_ID = *INDEX_ID, env.DISABLE_MERGE = *DISABLE_MERGE, + env.DISABLE_JANITOR = *DISABLE_JANITOR, cold = container_ctx.cold, container_id = container_ctx.container_id, )) diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs index 8d2a0bfa5ac..871c993fa1b 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs @@ -55,17 +55,21 @@ use quickwit_storage::StorageResolver; use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent}; use tracing::{debug, info, instrument}; -use crate::indexer::environment::{DISABLE_MERGE, INDEX_CONFIG_URI, INDEX_ID}; +use crate::environment::INDEX_ID; +use crate::indexer::environment::{DISABLE_JANITOR, DISABLE_MERGE, INDEX_CONFIG_URI}; /// The indexing service needs to update its cluster chitchat state so that the control plane is /// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service /// and avoid impacting potential control plane running on the cluster. -pub(super) async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result { +pub(super) async fn create_empty_cluster( + config: &NodeConfig, + services: &[QuickwitService], +) -> anyhow::Result { let self_node = ClusterMember { node_id: NodeId::new(config.node_id.clone()), generation_id: quickwit_cluster::GenerationId::now(), is_ready: false, - enabled_services: HashSet::new(), + enabled_services: HashSet::from_iter(services.to_owned()), gossip_advertise_addr: config.gossip_advertise_addr, grpc_advertise_addr: config.grpc_advertise_addr, indexing_tasks: Vec::new(), @@ -198,9 +202,8 @@ pub(super) async fn spawn_services( node_config: &NodeConfig, runtime_config: RuntimesConfig, ) -> anyhow::Result<( - Mailbox, ActorHandle, - Mailbox, + Option>, )> { let event_broker = EventBroker::default(); @@ -211,7 +214,7 @@ pub(super) async fn spawn_services( universe.spawn_builder().spawn(merge_scheduler_service); // spawn indexer service - let indexing_server = IndexingService::new( + let indexing_service = IndexingService::new( node_config.node_id.clone(), node_config.data_dir_path.clone(), node_config.indexer_config.clone(), @@ -225,25 +228,26 @@ pub(super) async fn spawn_services( event_broker.clone(), ) .await?; - let (indexing_server_mailbox, indexing_server_handle) = - universe.spawn_builder().spawn(indexing_server); + let (_, indexing_service_handle) = universe.spawn_builder().spawn(indexing_service); - let janitor_service_mailbox = start_janitor_service( - universe, - node_config, - metastore, - SearchJobPlacer::default(), - storage_resolver, - event_broker, - true, - ) - .await?; - - Ok(( - indexing_server_mailbox, - indexing_server_handle, - janitor_service_mailbox, - )) + // spawn janitor service + let janitor_service_opt = if *DISABLE_JANITOR { + None + } else { + Some( + start_janitor_service( + universe, + node_config, + metastore, + SearchJobPlacer::default(), + storage_resolver, + event_broker, + true, + ) + .await?, + ) + }; + Ok((indexing_service_handle, janitor_service_opt)) } pub(super) async fn spawn_pipelines( @@ -267,3 +271,26 @@ pub(super) async fn spawn_pipelines( .await?; Ok((indexing_pipeline_handle, merge_pipeline_handle)) } + +pub(super) async fn wait_for_merges( + merge_pipeline_handle: ActorHandle, +) -> anyhow::Result<()> { + // TODO: find a way to stop the MergePlanner actor in the MergePipeline, + // otherwise a new merge might be scheduled after this loop. That shouldn't + // have any concrete impact as the merge will be immediately cancelled, but + // it might generate errors during the universe shutdown (i.e "Failed to + // acquire permit") + loop { + let state = merge_pipeline_handle.state(); + let obs = merge_pipeline_handle.observe().await; + debug!(state=?state, ongoing=obs.num_ongoing_merges, "merge pipeline state"); + if obs.num_ongoing_merges == 0 { + break; + } + // We tolerate a relatively low refresh rate because the indexer + // typically runs for longuer periods of times and merges happen only + // occasionally. + tokio::time::sleep(tokio::time::Duration::from_millis(500)).await; + } + Ok(()) +} diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs index 7a9c7a5660b..927fc684252 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs @@ -38,6 +38,7 @@ use quickwit_indexing::models::IndexingStatistics; use tracing::{debug, info}; use crate::indexer::environment::CONFIGURATION_TEMPLATE; +use crate::indexer::ingest::helpers::wait_for_merges; use crate::utils::load_node_config; #[derive(Debug, Eq, PartialEq)] @@ -68,42 +69,42 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { ) .await?; - let cluster = create_empty_cluster(&config).await?; + let services = [QuickwitService::Indexer, QuickwitService::Janitor]; + let cluster = create_empty_cluster(&config, &services).await?; let universe = Universe::new(); let runtimes_config = RuntimesConfig::default(); - start_actor_runtimes( + start_actor_runtimes(runtimes_config, &HashSet::from_iter(services))?; + + let (indexing_service_handle, _janitor_service_guard) = spawn_services( + &universe, + cluster, + metastore.clone(), + storage_resolver.clone(), + &config, runtimes_config, - &HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]), - )?; - - let (indexing_server_mailbox, indexing_server_handle, janitor_service_mailbox) = - spawn_services( - &universe, - cluster, - metastore.clone(), - storage_resolver.clone(), - &config, - runtimes_config, - ) - .await?; + ) + .await?; let (indexing_pipeline_handle, merge_pipeline_handle) = - spawn_pipelines(&indexing_server_mailbox, source_config).await?; + spawn_pipelines(indexing_service_handle.mailbox(), source_config).await?; - debug!("wait for indexing statistics"); + debug!("wait for indexing to complete"); let statistics = start_statistics_reporting_loop(indexing_pipeline_handle, false).await?; - debug!("indexing completed, tear down actors"); - merge_pipeline_handle.quit().await; - universe - .send_exit_with_success(&janitor_service_mailbox) - .await?; + debug!("wait for merges to complete"); + wait_for_merges(merge_pipeline_handle).await?; + + debug!("indexing completed, tearing down actors"); + // TODO: is it really necessary to terminate the indexing service? + // Quitting the universe should be enough. universe - .send_exit_with_success(&indexing_server_mailbox) + .send_exit_with_success(indexing_service_handle.mailbox()) .await?; - indexing_server_handle.join().await; + indexing_service_handle.join().await; + debug!("quitting universe"); universe.quit().await; + debug!("universe.quit() awaited"); if args.clear_cache { info!("clearing local cache directory"); diff --git a/quickwit/quickwit-lambda/src/lib.rs b/quickwit/quickwit-lambda/src/lib.rs index 0bb10d0cd83..3e5de5cb592 100644 --- a/quickwit/quickwit-lambda/src/lib.rs +++ b/quickwit/quickwit-lambda/src/lib.rs @@ -17,6 +17,7 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . +mod environment; pub mod indexer; pub mod logger; pub mod searcher; diff --git a/quickwit/quickwit-lambda/src/logger.rs b/quickwit/quickwit-lambda/src/logger.rs index 576c9708147..632c0abc817 100644 --- a/quickwit/quickwit-lambda/src/logger.rs +++ b/quickwit/quickwit-lambda/src/logger.rs @@ -32,6 +32,8 @@ use tracing_subscriber::prelude::*; use tracing_subscriber::registry::LookupSpan; use tracing_subscriber::{EnvFilter, Layer}; +use crate::environment::{LOG_SPAN_BOUNDARIES, OPENTELEMETRY_AUTHORIZATION, OPENTELEMETRY_URL}; + static TRACER_PROVIDER: OnceCell> = OnceCell::new(); pub(crate) const RUNTIME_CONTEXT_SPAN: &str = "runtime_context"; @@ -59,8 +61,13 @@ where ), ) .json(); + let fmt_span = if *LOG_SPAN_BOUNDARIES { + FmtSpan::NEW | FmtSpan::CLOSE + } else { + FmtSpan::NONE + }; tracing_subscriber::fmt::layer::() - .with_span_events(FmtSpan::NEW | FmtSpan::CLOSE) + .with_span_events(fmt_span) .event_format(event_format) .fmt_fields(JsonFields::default()) .with_ansi(false) @@ -114,12 +121,11 @@ where pub fn setup_lambda_tracer(level: Level) -> anyhow::Result<()> { global::set_text_map_propagator(TraceContextPropagator::new()); let registry = tracing_subscriber::registry(); - let otlp_config = ( - std::env::var("QW_LAMBDA_OPENTELEMETRY_URL"), - std::env::var("QW_LAMBDA_OPENTELEMETRY_AUTHORIZATION"), - ); let build_info = BuildInfo::get(); - if let (Ok(ot_url), Ok(ot_auth)) = otlp_config { + if let (Some(ot_url), Some(ot_auth)) = ( + OPENTELEMETRY_URL.clone(), + OPENTELEMETRY_AUTHORIZATION.clone(), + ) { registry .with(fmt_layer(level)) .with(otlp_layer(ot_url, ot_auth, level, build_info)) diff --git a/quickwit/quickwit-lambda/src/searcher/environment.rs b/quickwit/quickwit-lambda/src/searcher/environment.rs index b995949dff9..2aee5752463 100644 --- a/quickwit/quickwit-lambda/src/searcher/environment.rs +++ b/quickwit/quickwit-lambda/src/searcher/environment.rs @@ -30,8 +30,5 @@ searcher: partial_request_cache_capacity: ${QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY:-64M} "; -pub(crate) static INDEX_ID: Lazy = - Lazy::new(|| var("QW_LAMBDA_INDEX_ID").expect("QW_LAMBDA_INDEX_ID must be set")); - pub(crate) static DISABLE_SEARCH_CACHE: Lazy = Lazy::new(|| var("QW_LAMBDA_DISABLE_SEARCH_CACHE").is_ok_and(|v| v.as_str() == "true")); diff --git a/quickwit/quickwit-lambda/src/searcher/handler.rs b/quickwit/quickwit-lambda/src/searcher/handler.rs index e57de93bdac..b20e373e5bf 100644 --- a/quickwit/quickwit-lambda/src/searcher/handler.rs +++ b/quickwit/quickwit-lambda/src/searcher/handler.rs @@ -26,8 +26,9 @@ use quickwit_search::SearchResponseRest; use quickwit_serve::SearchRequestQueryString; use tracing::{debug_span, error, info_span, instrument, Instrument}; -use super::environment::{DISABLE_SEARCH_CACHE, INDEX_ID}; +use super::environment::DISABLE_SEARCH_CACHE; use super::search::{search, SearchArgs}; +use crate::environment::INDEX_ID; use crate::logger; use crate::utils::LambdaContainerContext; diff --git a/quickwit/quickwit-lambda/src/searcher/search.rs b/quickwit/quickwit-lambda/src/searcher/search.rs index 3d0e0d249f9..e03516fe8ef 100644 --- a/quickwit/quickwit-lambda/src/searcher/search.rs +++ b/quickwit/quickwit-lambda/src/searcher/search.rs @@ -35,7 +35,8 @@ use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, Teleme use tokio::sync::OnceCell; use tracing::debug; -use super::environment::{CONFIGURATION_TEMPLATE, DISABLE_SEARCH_CACHE, INDEX_ID}; +use super::environment::{CONFIGURATION_TEMPLATE, DISABLE_SEARCH_CACHE}; +use crate::environment::INDEX_ID; use crate::utils::load_node_config; static LAMBDA_SEARCH_CACHE: OnceCell = OnceCell::const_new();