Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix source path in Lambda distrib #5327

Merged
merged 2 commits into from
Aug 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,10 @@ test-all: docker-compose-up
test-failpoints:
@$(MAKE) -C $(QUICKWIT_SRC) test-failpoints

test-lambda: DOCKER_SERVICES=localstack
test-lambda: docker-compose-up
@$(MAKE) -C $(QUICKWIT_SRC) test-lambda

# This will build and push all custom cross images for cross-compilation.
# You will need to login into Docker Hub with the `quickwit` account.
IMAGE_TAGS = x86_64-unknown-linux-gnu aarch64-unknown-linux-gnu x86_64-unknown-linux-musl aarch64-unknown-linux-musl
Expand Down Expand Up @@ -104,4 +108,3 @@ build-rustdoc:
.PHONY: build-ui
build-ui:
$(MAKE) -C $(QUICKWIT_SRC) build-ui

8 changes: 8 additions & 0 deletions quickwit/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -36,6 +36,14 @@ test-all:
test-failpoints:
cargo nextest run --test failpoints --features fail/failpoints

test-lambda:
AWS_ACCESS_KEY_ID=ignored \
AWS_SECRET_ACCESS_KEY=ignored \
AWS_REGION=us-east-1 \
QW_S3_ENDPOINT=http://localhost:4566 \
QW_S3_FORCE_PATH_STYLE_ACCESS=1 \
cargo nextest run --all-features -p quickwit-lambda --retries 1

# TODO: to be replaced by https://github.com/quickwit-oss/quickwit/issues/237
TARGET ?= x86_64-unknown-linux-gnu
.PHONY: build
Expand Down
3 changes: 3 additions & 0 deletions quickwit/quickwit-lambda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@ path = "src/bin/indexer.rs"
name = "searcher"
path = "src/bin/searcher.rs"

[features]
s3-localstack-tests = []

[dependencies]
anyhow = { workspace = true }
aws_lambda_events = "0.15.0"
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-lambda/src/indexer/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,8 +26,8 @@ pub const CONFIGURATION_TEMPLATE: &str = r#"
version: 0.8
node_id: lambda-indexer
cluster_id: lambda-ephemeral
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/${QW_LAMBDA_METASTORE_PREFIX:-index}
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/${QW_LAMBDA_INDEX_PREFIX:-index}
data_dir: /tmp
"#;

Expand Down
1 change: 0 additions & 1 deletion quickwit/quickwit-lambda/src/indexer/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@ async fn indexer_handler(event: LambdaEvent<Value>) -> Result<Value, Error> {
let ingest_res = ingest(IngestArgs {
input_path: payload.uri()?,
input_format: quickwit_config::SourceInputFormat::Json,
overwrite: false,
vrl_script: None,
// TODO: instead of clearing the cache, we use a cache and set its max
// size with indexer_config.split_store_max_num_bytes
Expand Down
17 changes: 2 additions & 15 deletions quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,6 @@ use quickwit_config::{
load_index_config_from_user_config, ConfigFormat, IndexConfig, NodeConfig, SourceConfig,
SourceInputFormat, SourceParams, TransformConfig,
};
use quickwit_index_management::IndexService;
use quickwit_indexing::actors::{IndexingService, MergePipeline, MergeSchedulerService};
use quickwit_indexing::models::{DetachIndexingPipeline, DetachMergePipeline, SpawnPipeline};
use quickwit_indexing::IndexingPipeline;
Expand Down Expand Up @@ -154,15 +153,14 @@ pub(super) async fn configure_source(
})
}

/// Check if the index exists, creating or overwriting it if necessary
/// Check if the index exists, creating it if necessary
///
/// If the index exists but without the Lambda source ([`LAMBDA_SOURCE_ID`]),
/// the source is added.
pub(super) async fn init_index_if_necessary(
metastore: &mut MetastoreServiceClient,
storage_resolver: &StorageResolver,
default_index_root_uri: &Uri,
overwrite: bool,
source_config: &SourceConfig,
) -> anyhow::Result<IndexMetadata> {
let metadata_result = metastore
Expand All @@ -171,23 +169,12 @@ pub(super) async fn init_index_if_necessary(
let metadata = match metadata_result {
Ok(metadata_resp) => {
let current_metadata = metadata_resp.deserialize_index_metadata()?;
let mut metadata_changed = false;
if overwrite {
info!(index_uid = %current_metadata.index_uid, "overwrite enabled, clearing existing index");
let mut index_service =
IndexService::new(metastore.clone(), storage_resolver.clone());
index_service.clear_index(&INDEX_ID).await?;
metadata_changed = true;
}
if !current_metadata.sources.contains_key(LAMBDA_SOURCE_ID) {
let add_source_request = AddSourceRequest::try_from_source_config(
current_metadata.index_uid.clone(),
source_config,
)?;
metastore.add_source(add_source_request).await?;
metadata_changed = true;
}
if metadata_changed {
metastore
.index_metadata(IndexMetadataRequest::for_index_id(INDEX_ID.clone()))
.await?
Expand Down Expand Up @@ -305,7 +292,7 @@ pub(super) async fn spawn_pipelines(

/// Prune old Lambda file checkpoints if there are too many
///
/// Without pruning checkpoints accumulate indifinitely. This is particularly
/// Without pruning checkpoints accumulate indefinitely. This is particularly
/// problematic when indexing a lot of small files, as the metastore will grow
/// large even for a small index.
///
Expand Down
130 changes: 128 additions & 2 deletions quickwit/quickwit-lambda/src/indexer/ingest/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,6 @@ use crate::utils::load_node_config;
pub struct IngestArgs {
pub input_path: Uri,
pub input_format: SourceInputFormat,
pub overwrite: bool,
pub vrl_script: Option<String>,
pub clear_cache: bool,
}
Expand All @@ -65,7 +64,6 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
&mut metastore,
&storage_resolver,
&config.default_index_root_uri,
args.overwrite,
&source_config,
)
.await?;
Expand Down Expand Up @@ -123,3 +121,131 @@ pub async fn ingest(args: IngestArgs) -> anyhow::Result<IndexingStatistics> {
}
Ok(statistics)
}

#[cfg(all(test, feature = "s3-localstack-tests"))]
mod tests {
use std::path::PathBuf;
use std::str::FromStr;

use quickwit_common::new_coolid;
use quickwit_storage::StorageResolver;

use super::*;

async fn put_object(
storage_resolver: StorageResolver,
bucket: &str,
prefix: &str,
filename: &str,
data: Vec<u8>,
) -> Uri {
let src_location = format!("s3://{}/{}", bucket, prefix);
let storage_uri = Uri::from_str(&src_location).unwrap();
let storage = storage_resolver.resolve(&storage_uri).await.unwrap();
storage
.put(&PathBuf::from(filename), Box::new(data))
.await
.unwrap();
storage_uri.join(filename).unwrap()
}

#[tokio::test]
async fn test_ingest() -> anyhow::Result<()> {
quickwit_common::setup_logging_for_tests();
let bucket = "quickwit-integration-tests";
let prefix = new_coolid("lambda-ingest-test");
let storage_resolver = StorageResolver::unconfigured();

let index_config = br#"
version: 0.8
index_id: lambda-test
doc_mapping:
field_mappings:
- name: timestamp
type: datetime
input_formats:
- unix_timestamp
fast: true
timestamp_field: timestamp
"#;
let config_uri = put_object(
storage_resolver.clone(),
bucket,
&prefix,
"index-config.yaml",
index_config.to_vec(),
)
.await;

// TODO use dependency injection instead of lazy static for env configs
std::env::set_var("QW_LAMBDA_METASTORE_BUCKET", bucket);
std::env::set_var("QW_LAMBDA_INDEX_BUCKET", bucket);
std::env::set_var("QW_LAMBDA_METASTORE_PREFIX", &prefix);
std::env::set_var("QW_LAMBDA_INDEX_PREFIX", &prefix);
std::env::set_var("QW_LAMBDA_INDEX_CONFIG_URI", config_uri.as_str());
std::env::set_var("QW_LAMBDA_INDEX_ID", "lambda-test");

// first ingestion creates the index metadata
let test_data_1 = br#"{"timestamp": 1724140899, "field1": "value1"}"#;
let test_data_1_uri = put_object(
storage_resolver.clone(),
bucket,
&prefix,
"data.json",
test_data_1.to_vec(),
)
.await;

{
let args = IngestArgs {
input_path: test_data_1_uri.clone(),
input_format: SourceInputFormat::Json,
vrl_script: None,
clear_cache: true,
};
let stats = ingest(args).await?;
assert_eq!(stats.num_invalid_docs, 0);
assert_eq!(stats.num_docs, 1);
}

tokio::time::sleep(std::time::Duration::from_secs(1)).await;

{
// ingesting the same data again is a no-op
let args = IngestArgs {
input_path: test_data_1_uri,
input_format: SourceInputFormat::Json,
vrl_script: None,
clear_cache: true,
};
let stats = ingest(args).await?;
assert_eq!(stats.num_invalid_docs, 0);
assert_eq!(stats.num_docs, 0);
}

{
// second ingestion should not fail when metadata already exists
let test_data = br#"{"timestamp": 1724149900, "field1": "value2"}"#;
let test_data_uri = put_object(
storage_resolver.clone(),
bucket,
&prefix,
"data2.json",
test_data.to_vec(),
)
.await;

let args = IngestArgs {
input_path: test_data_uri,
input_format: SourceInputFormat::Json,
vrl_script: None,
clear_cache: true,
};
let stats = ingest(args).await?;
assert_eq!(stats.num_invalid_docs, 0);
assert_eq!(stats.num_docs, 1);
}

Ok(())
}
}
1 change: 1 addition & 0 deletions quickwit/quickwit-lambda/src/indexer/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ impl IndexerEvent {
IndexerEvent::S3(event) => [
"s3://",
event.records[0].s3.bucket.name.as_ref().unwrap(),
"/",
rdettai marked this conversation as resolved.
Show resolved Hide resolved
event.records[0].s3.object.key.as_ref().unwrap(),
]
.join(""),
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-lambda/src/searcher/environment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
pub(crate) const CONFIGURATION_TEMPLATE: &str = r#"
version: 0.8
node_id: lambda-searcher
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/index#polling_interval=${QW_LAMBDA_SEARCHER_METASTORE_POLLING_INTERVAL_SECONDS:-60}s
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/index
metastore_uri: s3://${QW_LAMBDA_METASTORE_BUCKET}/${QW_LAMBDA_METASTORE_PREFIX:-index}#polling_interval=${QW_LAMBDA_SEARCHER_METASTORE_POLLING_INTERVAL_SECONDS:-60}s
default_index_root_uri: s3://${QW_LAMBDA_INDEX_BUCKET}/${QW_LAMBDA_INDEX_PREFIX:-index}
data_dir: /tmp
searcher:
partial_request_cache_capacity: ${QW_LAMBDA_PARTIAL_REQUEST_CACHE_CAPACITY:-64M}
Expand Down
Loading