Skip to content

Commit

Permalink
Wait for merges to complete
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Feb 22, 2024
1 parent 94cabff commit 558e2ca
Show file tree
Hide file tree
Showing 16 changed files with 154 additions and 72 deletions.
2 changes: 2 additions & 0 deletions distribution/lambda/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -108,6 +108,7 @@ bench-index:
done

bench-search-term:
export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true
mem_sizes=( 1024 2048 4096 8192 )
for mem_size in "$${mem_sizes[@]}"
do
Expand All @@ -117,6 +118,7 @@ bench-search-term:
done

bench-search-histogram:
export QW_LAMBDA_LOG_SPAN_BOUNDARIES=true
mem_sizes=( 1024 2048 4096 8192 )
for mem_size in "$${mem_sizes[@]}"
do
Expand Down
11 changes: 7 additions & 4 deletions distribution/lambda/cdk/stacks/examples/mock_data_stack.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
from constructs import Construct
import yaml

from ..services.quickwit_service import QuickwitService
from ..services import quickwit_service

SEARCHER_FUNCTION_NAME_EXPORT_NAME = "mock-data-searcher-function-name"
INDEX_STORE_BUCKET_NAME_EXPORT_NAME = "mock-data-index-store-bucket-name"
Expand All @@ -28,7 +28,7 @@ def __init__(
scope: Construct,
construct_id: str,
index_id: str,
qw_svc: QuickwitService,
qw_svc: quickwit_service.QuickwitService,
**kwargs,
):
super().__init__(scope, construct_id, **kwargs)
Expand Down Expand Up @@ -83,7 +83,7 @@ def __init__(
scope: Construct,
construct_id: str,
index_id: str,
qw_svc: QuickwitService,
qw_svc: quickwit_service.QuickwitService,
api_key: str,
**kwargs,
) -> None:
Expand Down Expand Up @@ -149,12 +149,15 @@ def __init__(
"mock-data-index-config",
path=index_config_local_path,
)
qw_svc = QuickwitService(
lambda_env = quickwit_service.extract_local_env()
qw_svc = quickwit_service.QuickwitService(
self,
"Quickwit",
index_id=index_id,
index_config_bucket=index_config.s3_bucket_name,
index_config_key=index_config.s3_object_key,
indexer_environment=lambda_env,
searcher_environment=lambda_env,
indexer_package_location=indexer_package_location,
searcher_package_location=searcher_package_location,
)
Expand Down
4 changes: 3 additions & 1 deletion distribution/lambda/cdk/stacks/services/indexer_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@ def __init__(
"QW_LAMBDA_INDEX_CONFIG_URI": f"s3://{index_config_bucket}/{index_config_key}",
**environment,
},
timeout=aws_cdk.Duration.minutes(15),
# use a strict timeout and retry policy to avoid unexpected costs
timeout=aws_cdk.Duration.minutes(1),
retry_attempts=0,
reserved_concurrent_executions=1,
memory_size=memory_size,
ephemeral_storage_size=aws_cdk.Size.gibibytes(10),
Expand Down
8 changes: 6 additions & 2 deletions distribution/lambda/cdk/stacks/services/quickwit_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,12 @@


def extract_local_env() -> dict[str, str]:
"""Extracts local environment variables that start with QW_LAMBDA_"""
return {k: os.environ[k] for k in os.environ.keys() if k.startswith("QW_LAMBDA_")}
"""Extracts local environment variables QW_LAMBDA_* and QW_DISABLE_TELEMETRY"""
return {
k: os.environ[k]
for k in os.environ.keys()
if (k.startswith("QW_LAMBDA_") or k == "QW_DISABLE_TELEMETRY")
}


class QuickwitService(Construct):
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-lambda/src/bin/indexer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use quickwit_lambda::logger;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
logger::setup_lambda_tracer(tracing::Level::DEBUG)?;
logger::setup_lambda_tracer(tracing::Level::INFO)?;
let func = service_fn(handler);
lambda_runtime::run(func)
.await
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-lambda/src/bin/searcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ use quickwit_lambda::searcher::handler;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
logger::setup_lambda_tracer(tracing::Level::DEBUG)?;
logger::setup_lambda_tracer(tracing::Level::INFO)?;
let func = service_fn(handler);
run(func).await.map_err(|e| anyhow::anyhow!(e))
}
34 changes: 34 additions & 0 deletions quickwit/quickwit-lambda/src/environment.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
// Copyright (C) 2024 Quickwit, Inc.
//
// Quickwit is offered under the AGPL v3.0 and as commercial software.
// For commercial licensing, contact us at [email protected].
//
// AGPL:
// This program is free software: you can redistribute it and/or modify
// it under the terms of the GNU Affero General Public License as
// published by the Free Software Foundation, either version 3 of the
// License, or (at your option) any later version.
//
// This program is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Affero General Public License for more details.
//
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::env::var;

use once_cell::sync::Lazy;

pub static INDEX_ID: Lazy<String> =
Lazy::new(|| var("QW_LAMBDA_INDEX_ID").expect("QW_LAMBDA_INDEX_ID must be set"));

pub static LOG_SPAN_BOUNDARIES: Lazy<bool> =
Lazy::new(|| var("QW_LAMBDA_LOG_SPAN_BOUNDARIES").is_ok_and(|v| v.as_str() == "true"));

pub static OPENTELEMETRY_URL: Lazy<Option<String>> =
Lazy::new(|| var("QW_LAMBDA_OPENTELEMETRY_URL").ok());

pub static OPENTELEMETRY_AUTHORIZATION: Lazy<Option<String>> =
Lazy::new(|| var("QW_LAMBDA_OPENTELEMETRY_AUTHORIZATION").ok());
7 changes: 4 additions & 3 deletions quickwit/quickwit-lambda/src/indexer/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use once_cell::sync::Lazy;

pub const CONFIGURATION_TEMPLATE: &str = "version: 0.6
node_id: lambda-indexer
cluster_id: lambda-ephemeral
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index
data_dir: /tmp
Expand All @@ -32,8 +33,8 @@ pub static INDEX_CONFIG_URI: Lazy<String> = Lazy::new(|| {
var("QW_LAMBDA_INDEX_CONFIG_URI").expect("QW_LAMBDA_INDEX_CONFIG_URI must be set")
});

pub static INDEX_ID: Lazy<String> =
Lazy::new(|| var("QW_LAMBDA_INDEX_ID").expect("QW_LAMBDA_INDEX_ID must be set"));

pub static DISABLE_MERGE: Lazy<bool> =
Lazy::new(|| var("QW_LAMBDA_DISABLE_MERGE").is_ok_and(|v| v.as_str() == "true"));

pub static DISABLE_JANITOR: Lazy<bool> =
Lazy::new(|| var("QW_LAMBDA_DISABLE_JANITOR").is_ok_and(|v| v.as_str() == "true"));
4 changes: 3 additions & 1 deletion quickwit/quickwit-lambda/src/indexer/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,10 @@ use lambda_runtime::{Error, LambdaEvent};
use serde_json::Value;
use tracing::{debug_span, error, info, info_span, Instrument};

use super::environment::{DISABLE_MERGE, INDEX_CONFIG_URI, INDEX_ID};
use super::environment::{DISABLE_JANITOR, DISABLE_MERGE, INDEX_CONFIG_URI};
use super::ingest::{ingest, IngestArgs};
use super::model::IndexerEvent;
use crate::environment::INDEX_ID;
use crate::logger;
use crate::utils::LambdaContainerContext;

Expand All @@ -45,6 +46,7 @@ async fn indexer_handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
env.INDEX_CONFIG_URI = *INDEX_CONFIG_URI,
env.INDEX_ID = *INDEX_ID,
env.DISABLE_MERGE = *DISABLE_MERGE,
env.DISABLE_JANITOR = *DISABLE_JANITOR,
cold = container_ctx.cold,
container_id = container_ctx.container_id,
))
Expand Down
75 changes: 51 additions & 24 deletions quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,21 @@ use quickwit_storage::StorageResolver;
use quickwit_telemetry::payload::{QuickwitFeature, QuickwitTelemetryInfo, TelemetryEvent};
use tracing::{debug, info, instrument};

use crate::indexer::environment::{DISABLE_MERGE, INDEX_CONFIG_URI, INDEX_ID};
use crate::environment::INDEX_ID;
use crate::indexer::environment::{DISABLE_JANITOR, DISABLE_MERGE, INDEX_CONFIG_URI};

/// The indexing service needs to update its cluster chitchat state so that the control plane is
/// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service
/// and avoid impacting potential control plane running on the cluster.
pub(super) async fn create_empty_cluster(config: &NodeConfig) -> anyhow::Result<Cluster> {
pub(super) async fn create_empty_cluster(
config: &NodeConfig,
services: &[QuickwitService],
) -> anyhow::Result<Cluster> {
let self_node = ClusterMember {
node_id: NodeId::new(config.node_id.clone()),
generation_id: quickwit_cluster::GenerationId::now(),
is_ready: false,
enabled_services: HashSet::new(),
enabled_services: HashSet::from_iter(services.to_owned()),
gossip_advertise_addr: config.gossip_advertise_addr,
grpc_advertise_addr: config.grpc_advertise_addr,
indexing_tasks: Vec::new(),
Expand Down Expand Up @@ -198,9 +202,8 @@ pub(super) async fn spawn_services(
node_config: &NodeConfig,
runtime_config: RuntimesConfig,
) -> anyhow::Result<(
Mailbox<IndexingService>,
ActorHandle<IndexingService>,
Mailbox<JanitorService>,
Option<Mailbox<JanitorService>>,
)> {
let event_broker = EventBroker::default();

Expand All @@ -211,7 +214,7 @@ pub(super) async fn spawn_services(
universe.spawn_builder().spawn(merge_scheduler_service);

// spawn indexer service
let indexing_server = IndexingService::new(
let indexing_service = IndexingService::new(
node_config.node_id.clone(),
node_config.data_dir_path.clone(),
node_config.indexer_config.clone(),
Expand All @@ -225,25 +228,26 @@ pub(super) async fn spawn_services(
event_broker.clone(),
)
.await?;
let (indexing_server_mailbox, indexing_server_handle) =
universe.spawn_builder().spawn(indexing_server);
let (_, indexing_service_handle) = universe.spawn_builder().spawn(indexing_service);

let janitor_service_mailbox = start_janitor_service(
universe,
node_config,
metastore,
SearchJobPlacer::default(),
storage_resolver,
event_broker,
true,
)
.await?;

Ok((
indexing_server_mailbox,
indexing_server_handle,
janitor_service_mailbox,
))
// spawn janitor service
let janitor_service_opt = if *DISABLE_JANITOR {
None
} else {
Some(
start_janitor_service(
universe,
node_config,
metastore,
SearchJobPlacer::default(),
storage_resolver,
event_broker,
true,
)
.await?,
)
};
Ok((indexing_service_handle, janitor_service_opt))
}

pub(super) async fn spawn_pipelines(
Expand All @@ -267,3 +271,26 @@ pub(super) async fn spawn_pipelines(
.await?;
Ok((indexing_pipeline_handle, merge_pipeline_handle))
}

pub(super) async fn wait_for_merges(
merge_pipeline_handle: ActorHandle<MergePipeline>,
) -> anyhow::Result<()> {
// TODO: find a way to stop the MergePlanner actor in the MergePipeline,
// otherwise a new merge might be scheduled after this loop. That shouldn't
// have any concrete impact as the merge will be immediately cancelled, but
// it might generate errors during the universe shutdown (i.e "Failed to
// acquire permit")
loop {
let state = merge_pipeline_handle.state();
let obs = merge_pipeline_handle.observe().await;
debug!(state=?state, ongoing=obs.num_ongoing_merges, "merge pipeline state");
if obs.num_ongoing_merges == 0 {
break;
}
// We tolerate a relatively low refresh rate because the indexer
// typically runs for longuer periods of times and merges happen only
// occasionally.
tokio::time::sleep(tokio::time::Duration::from_millis(500)).await;
}
Ok(())
}
49 changes: 25 additions & 24 deletions quickwit/quickwit-lambda/src/indexer/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ use quickwit_indexing::models::IndexingStatistics;
use tracing::{debug, info};

use crate::indexer::environment::CONFIGURATION_TEMPLATE;
use crate::indexer::ingest::helpers::wait_for_merges;
use crate::utils::load_node_config;

#[derive(Debug, Eq, PartialEq)]
Expand Down Expand Up @@ -68,42 +69,42 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
)
.await?;

let cluster = create_empty_cluster(&config).await?;
let services = [QuickwitService::Indexer, QuickwitService::Janitor];
let cluster = create_empty_cluster(&config, &services).await?;
let universe = Universe::new();
let runtimes_config = RuntimesConfig::default();

start_actor_runtimes(
start_actor_runtimes(runtimes_config, &HashSet::from_iter(services))?;

let (indexing_service_handle, _janitor_service_guard) = spawn_services(
&universe,
cluster,
metastore.clone(),
storage_resolver.clone(),
&config,
runtimes_config,
&HashSet::from_iter([QuickwitService::Indexer, QuickwitService::Janitor]),
)?;

let (indexing_server_mailbox, indexing_server_handle, janitor_service_mailbox) =
spawn_services(
&universe,
cluster,
metastore.clone(),
storage_resolver.clone(),
&config,
runtimes_config,
)
.await?;
)
.await?;

let (indexing_pipeline_handle, merge_pipeline_handle) =
spawn_pipelines(&indexing_server_mailbox, source_config).await?;
spawn_pipelines(indexing_service_handle.mailbox(), source_config).await?;

debug!("wait for indexing statistics");
debug!("wait for indexing to complete");
let statistics = start_statistics_reporting_loop(indexing_pipeline_handle, false).await?;

debug!("indexing completed, tear down actors");
merge_pipeline_handle.quit().await;
universe
.send_exit_with_success(&janitor_service_mailbox)
.await?;
debug!("wait for merges to complete");
wait_for_merges(merge_pipeline_handle).await?;

debug!("indexing completed, tearing down actors");
// TODO: is it really necessary to terminate the indexing service?
// Quitting the universe should be enough.
universe
.send_exit_with_success(&indexing_server_mailbox)
.send_exit_with_success(indexing_service_handle.mailbox())
.await?;
indexing_server_handle.join().await;
indexing_service_handle.join().await;
debug!("quitting universe");
universe.quit().await;
debug!("universe.quit() awaited");

if args.clear_cache {
info!("clearing local cache directory");
Expand Down
1 change: 1 addition & 0 deletions quickwit/quickwit-lambda/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

mod environment;
pub mod indexer;
pub mod logger;
pub mod searcher;
Expand Down
Loading

0 comments on commit 558e2ca

Please sign in to comment.