diff --git a/quickwit/quickwit-cli/src/source.rs b/quickwit/quickwit-cli/src/source.rs index 1a1948fdd99..e2a1c9df94c 100644 --- a/quickwit/quickwit-cli/src/source.rs +++ b/quickwit/quickwit-cli/src/source.rs @@ -744,7 +744,7 @@ mod tests { source_id: "foo-source".to_string(), num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, - source_params: SourceParams::file("path/to/file"), + source_params: SourceParams::file_from_str("path/to/file").unwrap(), transform_config: None, input_format: SourceInputFormat::Json, }]; diff --git a/quickwit/quickwit-cli/src/tool.rs b/quickwit/quickwit-cli/src/tool.rs index b936cbc4897..c7ab1911205 100644 --- a/quickwit/quickwit-cli/src/tool.rs +++ b/quickwit/quickwit-cli/src/tool.rs @@ -173,7 +173,7 @@ pub fn build_tool_command() -> Command { pub struct LocalIngestDocsArgs { pub config_uri: Uri, pub index_id: IndexId, - pub input_path_opt: Option, + pub input_path_opt: Option, pub input_format: SourceInputFormat, pub overwrite: bool, pub vrl_script: Option, @@ -251,9 +251,7 @@ impl ToolCliCommand { .remove_one::("index") .expect("`index` should be a required arg."); let input_path_opt = if let Some(input_path) = matches.remove_one::("input-path") { - Uri::from_str(&input_path)? - .filepath() - .map(|path| path.to_path_buf()) + Some(Uri::from_str(&input_path)?) } else { None }; @@ -410,8 +408,8 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result< get_resolvers(&config.storage_configs, &config.metastore_configs); let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?; - let source_params = if let Some(filepath) = args.input_path_opt.as_ref() { - SourceParams::file(filepath) + let source_params = if let Some(uri) = args.input_path_opt.as_ref() { + SourceParams::file_from_uri(uri.clone()) } else { SourceParams::stdin() }; diff --git a/quickwit/quickwit-cli/tests/cli.rs b/quickwit/quickwit-cli/tests/cli.rs index 628c695be78..15c4ff3d8be 100644 --- a/quickwit/quickwit-cli/tests/cli.rs +++ b/quickwit/quickwit-cli/tests/cli.rs @@ -22,6 +22,7 @@ mod helpers; use std::path::Path; +use std::str::FromStr; use anyhow::Result; use clap::error::ErrorKind; @@ -37,6 +38,7 @@ use quickwit_cli::tool::{ }; use quickwit_common::fs::get_cache_directory_path; use quickwit_common::rand::append_random_suffix; +use quickwit_common::uri::Uri; use quickwit_config::{RetentionPolicy, SourceInputFormat, CLI_SOURCE_ID}; use quickwit_metastore::{ ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, MetastoreServiceStreamSplitsExt, @@ -61,11 +63,11 @@ async fn create_logs_index(test_env: &TestEnv) -> anyhow::Result<()> { create_index_cli(args).await } -async fn local_ingest_docs(input_path: &Path, test_env: &TestEnv) -> anyhow::Result<()> { +async fn local_ingest_docs(uri: Uri, test_env: &TestEnv) -> anyhow::Result<()> { let args = LocalIngestDocsArgs { config_uri: test_env.resource_files.config.clone(), index_id: test_env.index_id.clone(), - input_path_opt: Some(input_path.to_path_buf()), + input_path_opt: Some(uri), input_format: SourceInputFormat::Json, overwrite: false, clear_cache: true, @@ -74,6 +76,10 @@ async fn local_ingest_docs(input_path: &Path, test_env: &TestEnv) -> anyhow::Res local_ingest_docs_cli(args).await } +async fn local_ingest_log_docs(test_env: &TestEnv) -> anyhow::Result<()> { + local_ingest_docs(test_env.resource_files.log_docs.clone(), test_env).await +} + #[test] fn test_cmd_help() { let cmd = build_cli(); @@ -252,6 +258,7 @@ async fn test_ingest_docs_cli() { // Ensure cache directory is empty. let cache_directory_path = get_cache_directory_path(&test_env.data_dir_path); + let data_dir_uri = Uri::from_str(test_env.data_dir_path.to_str().unwrap()).unwrap(); assert!(cache_directory_path.read_dir().unwrap().next().is_none()); @@ -259,7 +266,7 @@ async fn test_ingest_docs_cli() { let args = LocalIngestDocsArgs { config_uri: test_env.resource_files.config, index_id: test_env.index_id, - input_path_opt: Some(test_env.data_dir_path.join("file-does-not-exist.json")), + input_path_opt: Some(data_dir_uri.join("file-does-not-exist.json").unwrap()), input_format: SourceInputFormat::Json, overwrite: false, clear_cache: true, @@ -332,9 +339,7 @@ async fn test_cmd_search_aggregation() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) - .await - .unwrap(); + local_ingest_log_docs(&test_env).await.unwrap(); let aggregation: Value = json!( { @@ -432,9 +437,7 @@ async fn test_cmd_search_with_snippets() -> Result<()> { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) - .await - .unwrap(); + local_ingest_log_docs(&test_env).await.unwrap(); // search with snippets let args = SearchIndexArgs { @@ -487,9 +490,7 @@ async fn test_search_index_cli() { sort_by_score: false, }; - local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) - .await - .unwrap(); + local_ingest_log_docs(&test_env).await.unwrap(); let args = create_search_args("level:info"); @@ -600,9 +601,7 @@ async fn test_delete_index_cli_dry_run() { .unwrap(); assert!(metastore.index_exists(&index_id).await.unwrap()); - local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) - .await - .unwrap(); + local_ingest_log_docs(&test_env).await.unwrap(); // On non-empty index let args = create_delete_args(true); @@ -626,9 +625,7 @@ async fn test_delete_index_cli() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) - .await - .unwrap(); + local_ingest_log_docs(&test_env).await.unwrap(); let args = DeleteIndexArgs { client_args: test_env.default_client_args(), @@ -652,9 +649,7 @@ async fn test_garbage_collect_cli_no_grace() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); let index_uid = test_env.index_metadata().await.unwrap().index_uid; - local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) - .await - .unwrap(); + local_ingest_log_docs(&test_env).await.unwrap(); let metastore = MetastoreResolver::unconfigured() .resolve(&test_env.metastore_uri) @@ -762,9 +757,7 @@ async fn test_garbage_collect_index_cli() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); let index_uid = test_env.index_metadata().await.unwrap().index_uid; - local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) - .await - .unwrap(); + local_ingest_log_docs(&test_env).await.unwrap(); let refresh_metastore = |metastore| async { // In this test we rely on the file backed metastore and @@ -914,9 +907,7 @@ async fn test_all_local_index() { .unwrap(); assert!(metadata_file_exists); - local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env) - .await - .unwrap(); + local_ingest_log_docs(&test_env).await.unwrap(); let query_response = reqwest::get(format!( "http://127.0.0.1:{}/api/v1/{}/search?query=level:info", @@ -970,16 +961,21 @@ async fn test_all_with_s3_localstack_cli() { test_env.start_server().await.unwrap(); create_logs_index(&test_env).await.unwrap(); - let s3_path = upload_test_file( + let s3_uri = upload_test_file( test_env.storage_resolver.clone(), - test_env.resource_files.log_docs.clone(), + test_env + .resource_files + .log_docs + .filepath() + .unwrap() + .to_path_buf(), "quickwit-integration-tests", "sources/", &append_random_suffix("test-all--cli-s3-localstack"), ) .await; - local_ingest_docs(&s3_path, &test_env).await.unwrap(); + local_ingest_docs(s3_uri, &test_env).await.unwrap(); // Cli search let args = SearchIndexArgs { diff --git a/quickwit/quickwit-cli/tests/helpers.rs b/quickwit/quickwit-cli/tests/helpers.rs index 0a52f6b9792..a3e18bc847a 100644 --- a/quickwit/quickwit-cli/tests/helpers.rs +++ b/quickwit/quickwit-cli/tests/helpers.rs @@ -17,7 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::borrow::Borrow; use std::fs; use std::path::PathBuf; use std::str::FromStr; @@ -114,8 +113,8 @@ pub struct TestResourceFiles { pub index_config: Uri, pub index_config_without_uri: Uri, pub index_config_with_retention: Uri, - pub log_docs: PathBuf, - pub wikipedia_docs: PathBuf, + pub log_docs: Uri, + pub wikipedia_docs: Uri, } /// A struct to hold few info about the test environment. @@ -269,8 +268,8 @@ pub async fn create_test_env( index_config: uri_from_path(index_config_path), index_config_without_uri: uri_from_path(index_config_without_uri_path), index_config_with_retention: uri_from_path(index_config_with_retention_path), - log_docs: log_docs_path, - wikipedia_docs: wikipedia_docs_path, + log_docs: uri_from_path(log_docs_path), + wikipedia_docs: uri_from_path(wikipedia_docs_path), }; Ok(TestEnv { @@ -297,15 +296,14 @@ pub async fn upload_test_file( bucket: &str, prefix: &str, filename: &str, -) -> PathBuf { +) -> Uri { let test_data = tokio::fs::read(local_src_path).await.unwrap(); - let mut src_location: PathBuf = [r"s3://", bucket, prefix].iter().collect(); - let storage_uri = Uri::from_str(src_location.to_string_lossy().borrow()).unwrap(); + let src_location = format!("s3://{}/{}", bucket, prefix); + let storage_uri = Uri::from_str(&src_location).unwrap(); let storage = storage_resolver.resolve(&storage_uri).await.unwrap(); storage .put(&PathBuf::from(filename), Box::new(test_data)) .await .unwrap(); - src_location.push(filename); - src_location + storage_uri.join(filename).unwrap() } diff --git a/quickwit/quickwit-config/src/source_config/mod.rs b/quickwit/quickwit-config/src/source_config/mod.rs index 2354b608d8e..04eb90ec8a1 100644 --- a/quickwit/quickwit-config/src/source_config/mod.rs +++ b/quickwit/quickwit-config/src/source_config/mod.rs @@ -20,9 +20,10 @@ pub(crate) mod serialize; use std::num::NonZeroUsize; -use std::path::{Path, PathBuf}; +use std::path::Path; use std::str::FromStr; +use anyhow::Context; use bytes::Bytes; use quickwit_common::is_false; use quickwit_common::uri::Uri; @@ -230,12 +231,26 @@ pub enum SourceParams { } impl SourceParams { - pub fn file>(filepath: P) -> Self { - Self::File(FileSourceParams::file(filepath)) + pub fn file_from_uri(uri: Uri) -> Self { + Self::File(FileSourceParams { + filepath: Some(uri), + }) + } + + pub fn file_from_str>(filepath: P) -> anyhow::Result { + FileSourceParams::from_str(filepath.as_ref()).map(Self::File) + } + + pub fn file_from_path>(filepath: P) -> anyhow::Result { + let path_str = filepath + .as_ref() + .to_str() + .context("failed to convert path to string")?; + Self::file_from_str(path_str) } pub fn stdin() -> Self { - Self::File(FileSourceParams::stdin()) + Self::File(FileSourceParams { filepath: None }) } pub fn void() -> Self { @@ -250,37 +265,33 @@ pub struct FileSourceParams { #[schema(value_type = String)] #[serde(skip_serializing_if = "Option::is_none")] #[serde(default)] - #[serde(deserialize_with = "absolute_filepath_from_str")] - pub filepath: Option, //< If None read from stdin. + #[serde(deserialize_with = "uri_from_str")] + pub filepath: Option, //< If None read from stdin. +} + +impl FromStr for FileSourceParams { + type Err = anyhow::Error; + + fn from_str(filepath: &str) -> anyhow::Result { + let uri = Uri::from_str(filepath)?; + Ok(Self { + filepath: Some(uri), + }) + } } -/// Deserializing as an URI first to validate the input. -/// -/// TODO: we might want to replace `PathBuf` with `Uri` directly in -/// `FileSourceParams` -fn absolute_filepath_from_str<'de, D>(deserializer: D) -> Result, D::Error> +/// Deserializing as an URI +fn uri_from_str<'de, D>(deserializer: D) -> Result, D::Error> where D: Deserializer<'de> { let filepath_opt: Option = Deserialize::deserialize(deserializer)?; if let Some(filepath) = filepath_opt { let uri = Uri::from_str(&filepath).map_err(D::Error::custom)?; - Ok(Some(PathBuf::from(uri.as_str()))) + Ok(Some(uri)) } else { Ok(None) } } -impl FileSourceParams { - pub fn file>(filepath: P) -> Self { - FileSourceParams { - filepath: Some(filepath.as_ref().to_path_buf()), - } - } - - pub fn stdin() -> Self { - FileSourceParams { filepath: None } - } -} - #[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)] #[serde(deny_unknown_fields)] pub struct KafkaSourceParams { @@ -809,10 +820,7 @@ mod tests { "#; let file_params = serde_yaml::from_str::(yaml).unwrap(); let uri = Uri::from_str("source-path.json").unwrap(); - assert_eq!( - file_params.filepath.unwrap().as_path(), - Path::new(uri.as_str()) - ); + assert_eq!(file_params.filepath.unwrap(), uri); } } diff --git a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs index 0c54c44fd5d..f6951e464bd 100644 --- a/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs +++ b/quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs @@ -639,7 +639,7 @@ mod tests { source_id: "test-source".to_string(), num_pipelines: NonZeroUsize::MIN, enabled: true, - source_params: SourceParams::file(PathBuf::from(test_file)), + source_params: SourceParams::file_from_str(test_file).unwrap(), transform_config: None, input_format: SourceInputFormat::Json, }; @@ -758,7 +758,7 @@ mod tests { source_id: "test-source".to_string(), num_pipelines: NonZeroUsize::MIN, enabled: true, - source_params: SourceParams::file(PathBuf::from(test_file)), + source_params: SourceParams::file_from_str(test_file).unwrap(), transform_config: None, input_format: SourceInputFormat::Json, }; @@ -965,7 +965,7 @@ mod tests { source_id: "test-source".to_string(), num_pipelines: NonZeroUsize::MIN, enabled: true, - source_params: SourceParams::file(PathBuf::from(test_file)), + source_params: SourceParams::file_from_str(test_file).unwrap(), transform_config: None, input_format: SourceInputFormat::Json, }; diff --git a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs index a6eb686fc94..250689164e3 100644 --- a/quickwit/quickwit-indexing/src/source/doc_file_reader.rs +++ b/quickwit/quickwit-indexing/src/source/doc_file_reader.rs @@ -17,7 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::ffi::OsStr; use std::io; use std::path::Path; @@ -76,18 +75,18 @@ pub struct DocFileReader { } impl DocFileReader { - pub async fn from_path( + pub async fn from_uri( storage_resolver: &StorageResolver, - filepath: &Path, + uri: &Uri, offset: usize, ) -> anyhow::Result { - let (dir_uri, file_name) = dir_and_filename(filepath)?; + let (dir_uri, file_name) = dir_and_filename(uri)?; let storage = storage_resolver.resolve(&dir_uri).await?; let file_size = storage.file_num_bytes(file_name).await?.try_into().unwrap(); // If it's a gzip file, we can't seek to a specific offset. `SkipReader` // starts from the beginning of the file, decompresses and skips the // first `offset` bytes. - let reader = if filepath.extension() == Some(OsStr::new("gz")) { + let reader = if uri.extension() == Some("gz") { let stream = storage.get_slice_stream(file_name, 0..file_size).await?; let decompressed_stream = Box::new(GzipDecoder::new(BufReader::new(stream))); DocFileReader { @@ -130,17 +129,14 @@ impl DocFileReader { } } -pub(crate) fn dir_and_filename(filepath: &Path) -> anyhow::Result<(Uri, &Path)> { +pub(crate) fn dir_and_filename(filepath: &Uri) -> anyhow::Result<(Uri, &Path)> { let dir_uri: Uri = filepath .parent() - .context("Parent directory could not be resolved")? - .to_str() - .context("Path cannot be turned to string")? - .parse()?; + .context("Parent directory could not be resolved")?; let file_name = filepath .file_name() .context("Path does not appear to be a file")?; - Ok((dir_uri, file_name.as_ref())) + Ok((dir_uri, file_name)) } #[cfg(test)] @@ -205,7 +201,7 @@ pub mod file_test_helpers { #[cfg(test)] mod tests { use std::io::Cursor; - use std::path::PathBuf; + use std::str::FromStr; use file_test_helpers::generate_index_doc_file; @@ -259,10 +255,10 @@ mod tests { } } - async fn aux_test_full_read(file: impl Into, expected_lines: usize) { + async fn aux_test_full_read(file: impl AsRef, expected_lines: usize) { let storage_resolver = StorageResolver::for_test(); - let file_path = file.into(); - let mut doc_reader = DocFileReader::from_path(&storage_resolver, &file_path, 0) + let uri = Uri::from_str(file.as_ref()).unwrap(); + let mut doc_reader = DocFileReader::from_uri(&storage_resolver, &uri, 0) .await .unwrap(); let mut parsed_lines = 0; @@ -283,14 +279,14 @@ mod tests { } async fn aux_test_resumed_read( - file: impl Into, + file: impl AsRef, expected_lines: usize, stop_at_line: usize, ) { let storage_resolver = StorageResolver::for_test(); - let file_path = file.into(); + let uri = Uri::from_str(file.as_ref()).unwrap(); // read the first part of the file - let mut first_part_reader = DocFileReader::from_path(&storage_resolver, &file_path, 0) + let mut first_part_reader = DocFileReader::from_uri(&storage_resolver, &uri, 0) .await .unwrap(); let mut resume_offset = 0; @@ -307,7 +303,7 @@ mod tests { } // read the second part of the file let mut second_part_reader = - DocFileReader::from_path(&storage_resolver, &file_path, resume_offset) + DocFileReader::from_uri(&storage_resolver, &uri, resume_offset) .await .unwrap(); while let Some(rec) = second_part_reader.next_record().await.unwrap() { @@ -320,18 +316,20 @@ mod tests { #[tokio::test] async fn test_resume_read() { let dummy_doc_file = generate_index_doc_file(false, 1000).await; - aux_test_resumed_read(dummy_doc_file.path(), 1000, 1).await; - aux_test_resumed_read(dummy_doc_file.path(), 1000, 40).await; - aux_test_resumed_read(dummy_doc_file.path(), 1000, 999).await; - aux_test_resumed_read(dummy_doc_file.path(), 1000, 1000).await; + let dummy_doc_file_uri = dummy_doc_file.path().to_str().unwrap(); + aux_test_resumed_read(dummy_doc_file_uri, 1000, 1).await; + aux_test_resumed_read(dummy_doc_file_uri, 1000, 40).await; + aux_test_resumed_read(dummy_doc_file_uri, 1000, 999).await; + aux_test_resumed_read(dummy_doc_file_uri, 1000, 1000).await; } #[tokio::test] async fn test_resume_read_gz() { let dummy_doc_file = generate_index_doc_file(true, 1000).await; - aux_test_resumed_read(dummy_doc_file.path(), 1000, 1).await; - aux_test_resumed_read(dummy_doc_file.path(), 1000, 40).await; - aux_test_resumed_read(dummy_doc_file.path(), 1000, 999).await; - aux_test_resumed_read(dummy_doc_file.path(), 1000, 1000).await; + let dummy_doc_file_uri = dummy_doc_file.path().to_str().unwrap(); + aux_test_resumed_read(dummy_doc_file_uri, 1000, 1).await; + aux_test_resumed_read(dummy_doc_file_uri, 1000, 40).await; + aux_test_resumed_read(dummy_doc_file_uri, 1000, 999).await; + aux_test_resumed_read(dummy_doc_file_uri, 1000, 1000).await; } } diff --git a/quickwit/quickwit-indexing/src/source/file_source.rs b/quickwit/quickwit-indexing/src/source/file_source.rs index ac00147bb29..2c92c6674b4 100644 --- a/quickwit/quickwit-indexing/src/source/file_source.rs +++ b/quickwit/quickwit-indexing/src/source/file_source.rs @@ -17,7 +17,6 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::borrow::Borrow; use std::fmt; use std::time::Duration; @@ -122,7 +121,7 @@ impl TypedSourceFactory for FileSourceFactory { source_runtime: SourceRuntime, params: FileSourceParams, ) -> anyhow::Result { - let Some(filepath) = ¶ms.filepath else { + let Some(uri) = ¶ms.filepath else { return Ok(FileSource { source_id: source_runtime.source_id().to_string(), reader: DocFileReader::from_stdin(), @@ -131,7 +130,7 @@ impl TypedSourceFactory for FileSourceFactory { }); }; - let partition_id = PartitionId::from(filepath.to_string_lossy().borrow()); + let partition_id = PartitionId::from(uri.as_str()); let checkpoint = source_runtime.fetch_checkpoint().await?; let offset = checkpoint .position_for_partition(&partition_id) @@ -142,10 +141,7 @@ impl TypedSourceFactory for FileSourceFactory { }) .transpose()? .unwrap_or(0); - - let reader = - DocFileReader::from_path(&source_runtime.storage_resolver, filepath, offset).await?; - + let reader = DocFileReader::from_uri(&source_runtime.storage_resolver, uri, offset).await?; Ok(FileSource { source_id: source_runtime.source_id().to_string(), reader, @@ -160,10 +156,11 @@ impl TypedSourceFactory for FileSourceFactory { #[cfg(test)] mod tests { - use std::borrow::Borrow; use std::num::NonZeroUsize; + use std::str::FromStr; use quickwit_actors::{Command, Universe}; + use quickwit_common::uri::Uri; use quickwit_config::{SourceConfig, SourceInputFormat, SourceParams}; use quickwit_metastore::checkpoint::{PartitionId, SourceCheckpointDelta}; use quickwit_proto::types::{IndexUid, Position}; @@ -186,9 +183,9 @@ mod tests { let universe = Universe::with_accelerated_time(); let (doc_processor_mailbox, indexer_inbox) = universe.create_test_mailbox(); let params = if gzip { - FileSourceParams::file("data/test_corpus.json.gz") + FileSourceParams::from_str("data/test_corpus.json.gz").unwrap() } else { - FileSourceParams::file("data/test_corpus.json") + FileSourceParams::from_str("data/test_corpus.json").unwrap() }; let source_config = SourceConfig { source_id: "test-file-source".to_string(), @@ -237,13 +234,8 @@ mod tests { let universe = Universe::with_accelerated_time(); let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox(); let temp_file = generate_dummy_doc_file(gzip, 20_000).await; - let params = FileSourceParams::file(temp_file.path()); - let filepath = params - .filepath - .as_ref() - .unwrap() - .to_string_lossy() - .to_string(); + let filepath = temp_file.path().to_str().unwrap(); + let params = FileSourceParams::from_str(filepath).unwrap(); let source_config = SourceConfig { source_id: "test-file-source".to_string(), @@ -278,11 +270,12 @@ mod tests { let batch1 = indexer_msgs[0].downcast_ref::().unwrap(); let batch2 = indexer_msgs[1].downcast_ref::().unwrap(); let command = indexer_msgs[2].downcast_ref::().unwrap(); + let uri = Uri::from_str(filepath).unwrap(); assert_eq!( format!("{:?}", &batch1.checkpoint_delta), format!( "∆({}:{})", - filepath, "(00000000000000000000..00000000000000500010]" + uri, "(00000000000000000000..00000000000000500010]" ) ); assert_eq!( @@ -314,8 +307,8 @@ mod tests { let universe = Universe::with_accelerated_time(); let (doc_processor_mailbox, doc_processor_inbox) = universe.create_test_mailbox(); let temp_file = generate_index_doc_file(gzip, 100).await; - let temp_file_path = temp_file.path(); - let params = FileSourceParams::file(temp_file_path); + let temp_file_path = temp_file.path().to_str().unwrap(); + let params = FileSourceParams::from_str(temp_file_path).unwrap(); let source_config = SourceConfig { source_id: "test-file-source".to_string(), num_pipelines: NonZeroUsize::new(1).unwrap(), @@ -324,7 +317,7 @@ mod tests { transform_config: None, input_format: SourceInputFormat::Json, }; - let partition_id = PartitionId::from(temp_file_path.to_string_lossy().borrow()); + let partition_id = PartitionId::from(temp_file_path); let source_checkpoint_delta = SourceCheckpointDelta::from_partition_delta( partition_id, Position::Beginning, diff --git a/quickwit/quickwit-indexing/src/source/mod.rs b/quickwit/quickwit-indexing/src/source/mod.rs index ddd4b00aff7..98f28a5e6e9 100644 --- a/quickwit/quickwit-indexing/src/source/mod.rs +++ b/quickwit/quickwit-indexing/src/source/mod.rs @@ -677,7 +677,7 @@ mod tests { source_id: "file".to_string(), num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, - source_params: SourceParams::file("file-does-not-exist.json"), + source_params: SourceParams::file_from_str("file-does-not-exist.json").unwrap(), transform_config: None, input_format: SourceInputFormat::Json, }; @@ -692,7 +692,7 @@ mod tests { source_id: "file".to_string(), num_pipelines: NonZeroUsize::new(1).unwrap(), enabled: true, - source_params: SourceParams::file("data/test_corpus.json"), + source_params: SourceParams::file_from_str("data/test_corpus.json").unwrap(), transform_config: None, input_format: SourceInputFormat::Json, }; diff --git a/quickwit/quickwit-lambda/src/indexer/handler.rs b/quickwit/quickwit-lambda/src/indexer/handler.rs index 1282b0e54a4..c2027b25955 100644 --- a/quickwit/quickwit-lambda/src/indexer/handler.rs +++ b/quickwit/quickwit-lambda/src/indexer/handler.rs @@ -34,7 +34,7 @@ async fn indexer_handler(event: LambdaEvent) -> Result { let payload = serde_json::from_value::(event.payload)?; let ingest_res = ingest(IngestArgs { - input_path: payload.uri(), + input_path: payload.uri()?, input_format: quickwit_config::SourceInputFormat::Json, overwrite: false, vrl_script: None, diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs index 098cc1841ff..d168c072ee0 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest/helpers.rs @@ -19,7 +19,7 @@ use std::collections::HashSet; use std::num::NonZeroUsize; -use std::path::{Path, PathBuf}; +use std::path::Path; use anyhow::{bail, Context}; use chitchat::transport::ChannelTransport; @@ -138,12 +138,12 @@ pub(super) async fn send_telemetry() { /// Convert the incomming file path to a source config pub(super) async fn configure_source( - input_path: PathBuf, + input_uri: Uri, input_format: SourceInputFormat, vrl_script: Option, ) -> anyhow::Result { let transform_config = vrl_script.map(|vrl_script| TransformConfig::new(vrl_script, None)); - let source_params = SourceParams::file(input_path.clone()); + let source_params = SourceParams::file_from_uri(input_uri); Ok(SourceConfig { source_id: LAMBDA_SOURCE_ID.to_owned(), num_pipelines: NonZeroUsize::new(1).expect("1 is always non-zero."), diff --git a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs index 6faf495b85f..13dd7f9d1b5 100644 --- a/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs +++ b/quickwit/quickwit-lambda/src/indexer/ingest/mod.rs @@ -20,7 +20,6 @@ mod helpers; use std::collections::HashSet; -use std::path::PathBuf; use anyhow::bail; use helpers::{ @@ -31,6 +30,7 @@ use quickwit_actors::Universe; use quickwit_cli::start_actor_runtimes; use quickwit_cli::tool::start_statistics_reporting_loop; use quickwit_common::runtimes::RuntimesConfig; +use quickwit_common::uri::Uri; use quickwit_config::service::QuickwitService; use quickwit_config::SourceInputFormat; use quickwit_index_management::clear_cache_directory; @@ -43,7 +43,7 @@ use crate::utils::load_node_config; #[derive(Debug, Eq, PartialEq)] pub struct IngestArgs { - pub input_path: PathBuf, + pub input_path: Uri, pub input_format: SourceInputFormat, pub overwrite: bool, pub vrl_script: Option, diff --git a/quickwit/quickwit-lambda/src/indexer/model.rs b/quickwit/quickwit-lambda/src/indexer/model.rs index fe6ae14aea4..1b3c8a9a803 100644 --- a/quickwit/quickwit-lambda/src/indexer/model.rs +++ b/quickwit/quickwit-lambda/src/indexer/model.rs @@ -17,9 +17,10 @@ // You should have received a copy of the GNU Affero General Public License // along with this program. If not, see . -use std::path::PathBuf; +use std::str::FromStr; use aws_lambda_events::event::s3::S3Event; +use quickwit_common::uri::Uri; use serde::{Deserialize, Serialize}; #[derive(Clone, Debug, Deserialize, Serialize)] @@ -31,17 +32,17 @@ pub enum IndexerEvent { } impl IndexerEvent { - pub fn uri(&self) -> PathBuf { - match &self { - IndexerEvent::Custom { source_uri } => PathBuf::from(source_uri), + pub fn uri(&self) -> anyhow::Result { + let path = match self { + IndexerEvent::Custom { source_uri } => source_uri.clone(), IndexerEvent::S3(event) => [ "s3://", event.records[0].s3.bucket.name.as_ref().unwrap(), event.records[0].s3.object.key.as_ref().unwrap(), ] - .iter() - .collect(), - } + .join(""), + }; + Uri::from_str(&path) } } @@ -58,14 +59,14 @@ mod tests { }); let parsed_cust_event: IndexerEvent = serde_json::from_value(cust_event).unwrap(); assert_eq!( - parsed_cust_event.uri(), - PathBuf::from("s3://quickwit-test/test.json"), + parsed_cust_event.uri().unwrap(), + Uri::from_str("s3://quickwit-test/test.json").unwrap(), ); } #[test] fn test_s3_event_uri() { - let cust_event = json!({ + let s3_event = json!({ "Records": [ { "eventVersion": "2.0", @@ -103,10 +104,10 @@ mod tests { } ] }); - let parsed_cust_event: IndexerEvent = serde_json::from_value(cust_event).unwrap(); + let s3_event: IndexerEvent = serde_json::from_value(s3_event).unwrap(); assert_eq!( - parsed_cust_event.uri(), - PathBuf::from("s3://quickwit-test/test.json"), + s3_event.uri().unwrap(), + Uri::from_str("s3://quickwit-test/test.json").unwrap(), ); } }