From f8590d5f6ec9db7be9ccb4357d34c4b20a444ec0 Mon Sep 17 00:00:00 2001 From: Remi Dettai Date: Mon, 3 Jun 2024 12:10:24 +0200 Subject: [PATCH] Various Lambda fixes (#5016) * Various Lambda fixes on HEAD * Fix clippy lint * Logging improvements --- distribution/lambda/Makefile | 3 + distribution/lambda/cdk/cli.py | 8 ++- .../src/indexer/ingest/helpers.rs | 60 +++++++++++++------ .../quickwit-lambda/src/indexer/ingest/mod.rs | 7 ++- 4 files changed, 55 insertions(+), 23 deletions(-) diff --git a/distribution/lambda/Makefile b/distribution/lambda/Makefile index d6accd9334c..0c4decfce57 100644 --- a/distribution/lambda/Makefile +++ b/distribution/lambda/Makefile @@ -28,8 +28,11 @@ package: then pushd ../../quickwit/ rustc --version + # TODO: remove --disable-optimizations when upgrading to a release containing + # https://github.com/cargo-lambda/cargo-lambda/issues/649 (> 1.2.1) cargo lambda build \ -p quickwit-lambda \ + --disable-optimizations \ --release \ --output-format zip \ --target x86_64-unknown-linux-gnu diff --git a/distribution/lambda/cdk/cli.py b/distribution/lambda/cdk/cli.py index d4264df28fc..847a11fdeb3 100644 --- a/distribution/lambda/cdk/cli.py +++ b/distribution/lambda/cdk/cli.py @@ -255,6 +255,13 @@ def get_logs( last_event_id = "" last_event_found = True start_time = time.time() + while time.time() - start_time < timeout: + describe_resp = client.describe_log_groups(logGroupNamePrefix=log_group_name) + group_names = [group["logGroupName"] for group in describe_resp["logGroups"]] + if log_group_name in group_names: + break + print(f"log group not found, retrying...") + time.sleep(3) while time.time() - start_time < timeout: for page in paginator.paginate( logGroupName=log_group_name, @@ -268,7 +275,6 @@ def get_logs( last_event_id = event["eventId"] yield event["message"] if event["message"].startswith("REPORT"): - lower_time_bound = int(event["timestamp"]) last_event_id = "REPORT" break if last_event_id == "REPORT": diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs index f35931af591..098cc1841ff 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs @@ -42,11 +42,12 @@ use quickwit_indexing::IndexingPipeline; use quickwit_ingest::IngesterPool; use quickwit_janitor::{start_janitor_service, JanitorService}; use quickwit_metastore::{ - CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadata, IndexMetadataResponseExt, + AddSourceRequestExt, CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadata, + IndexMetadataResponseExt, }; use quickwit_proto::indexing::CpuCapacity; use quickwit_proto::metastore::{ - CreateIndexRequest, IndexMetadataRequest, MetastoreError, MetastoreService, + AddSourceRequest, CreateIndexRequest, IndexMetadataRequest, MetastoreError, MetastoreService, MetastoreServiceClient, ResetSourceCheckpointRequest, }; use quickwit_proto::types::PipelineUid; @@ -60,7 +61,7 @@ use crate::indexer::environment::{ DISABLE_JANITOR, DISABLE_MERGE, INDEX_CONFIG_URI, MAX_CHECKPOINTS, }; -const LAMBDA_SOURCE_ID: &str = "_ingest-lambda-source"; +const LAMBDA_SOURCE_ID: &str = "ingest-lambda-source"; /// The indexing service needs to update its cluster chitchat state so that the control plane is /// aware of the running tasks. We thus create a fake cluster to instantiate the indexing service @@ -154,29 +155,47 @@ pub(super) async fn configure_source( } /// Check if the index exists, creating or overwriting it if necessary +/// +/// If the index exists but without the Lambda source ([`LAMBDA_SOURCE_ID`]), +/// the source is added. pub(super) async fn init_index_if_necessary( metastore: &mut MetastoreServiceClient, storage_resolver: &StorageResolver, default_index_root_uri: &Uri, overwrite: bool, + source_config: &SourceConfig, ) -> anyhow::Result { let metadata_result = metastore .index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone())) .await; let metadata = match metadata_result { - Ok(_) if overwrite => { - info!( - index_id = *INDEX_ID, - "Overwrite enabled, clearing existing index", - ); - let mut index_service = IndexService::new(metastore.clone(), storage_resolver.clone()); - index_service.clear_index(&INDEX_ID).await?; - metastore - .index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone())) - .await? - .deserialize_index_metadata()? + Ok(metadata_resp) => { + let current_metadata = metadata_resp.deserialize_index_metadata()?; + let mut metadata_changed = false; + if overwrite { + info!(index_uid = %current_metadata.index_uid, "overwrite enabled, clearing existing index"); + let mut index_service = + IndexService::new(metastore.clone(), storage_resolver.clone()); + index_service.clear_index(&INDEX_ID).await?; + metadata_changed = true; + } + if !current_metadata.sources.contains_key(LAMBDA_SOURCE_ID) { + let add_source_request = AddSourceRequest::try_from_source_config( + current_metadata.index_uid.clone(), + source_config, + )?; + metastore.add_source(add_source_request).await?; + metadata_changed = true; + } + if metadata_changed { + metastore + .index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone())) + .await? + .deserialize_index_metadata()? + } else { + current_metadata + } } - Ok(metadata_resp) => metadata_resp.deserialize_index_metadata()?, Err(MetastoreError::NotFound(_)) => { info!( index_id = *INDEX_ID, @@ -191,10 +210,13 @@ pub(super) async fn init_index_if_necessary( index_config.index_id, ); } - let create_resp = metastore - .create_index(CreateIndexRequest::try_from_index_config(&index_config)?) - .await?; - info!("index created"); + let create_index_request = CreateIndexRequest::try_from_index_and_source_configs( + &index_config, + std::slice::from_ref(source_config), + )?; + let create_resp = metastore.create_index(create_index_request).await?; + + info!(index_uid = %create_resp.index_uid(), "index created"); create_resp.deserialize_index_metadata()? } Err(e) => bail!(e), diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs index 31c98ab73f9..6faf495b85f 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs @@ -58,17 +58,18 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result { let (config, storage_resolver, mut metastore) = load_node_config(CONFIGURATION_TEMPLATE).await?; + let source_config = + configure_source(args.input_path, args.input_format, args.vrl_script).await?; + let index_metadata = init_index_if_necessary( &mut metastore, &storage_resolver, &config.default_index_root_uri, args.overwrite, + &source_config, ) .await?; - let source_config = - configure_source(args.input_path, args.input_format, args.vrl_script).await?; - let mut services = vec![QuickwitService::Indexer]; if !*DISABLE_JANITOR { services.push(QuickwitService::Janitor);