Skip to content

Commit

Permalink
Replace FileSourceParams path with URI
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jun 26, 2024
1 parent 6d6fdb1 commit 9933781
Show file tree
Hide file tree
Showing 13 changed files with 139 additions and 147 deletions.
2 changes: 1 addition & 1 deletion quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ mod tests {
source_id: "foo-source".to_string(),
num_pipelines: NonZeroUsize::new(1).unwrap(),
enabled: true,
source_params: SourceParams::file("path/to/file"),
source_params: SourceParams::file_from_str("path/to/file").unwrap(),
transform_config: None,
input_format: SourceInputFormat::Json,
}];
Expand Down
10 changes: 4 additions & 6 deletions quickwit/quickwit-cli/src/tool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ pub fn build_tool_command() -> Command {
pub struct LocalIngestDocsArgs {
pub config_uri: Uri,
pub index_id: IndexId,
pub input_path_opt: Option<PathBuf>,
pub input_path_opt: Option<Uri>,
pub input_format: SourceInputFormat,
pub overwrite: bool,
pub vrl_script: Option<String>,
Expand Down Expand Up @@ -251,9 +251,7 @@ impl ToolCliCommand {
.remove_one::<String>("index")
.expect("`index` should be a required arg.");
let input_path_opt = if let Some(input_path) = matches.remove_one::<String>("input-path") {
Uri::from_str(&input_path)?
.filepath()
.map(|path| path.to_path_buf())
Some(Uri::from_str(&input_path)?)
} else {
None
};
Expand Down Expand Up @@ -410,8 +408,8 @@ pub async fn local_ingest_docs_cli(args: LocalIngestDocsArgs) -> anyhow::Result<
get_resolvers(&config.storage_configs, &config.metastore_configs);
let mut metastore = metastore_resolver.resolve(&config.metastore_uri).await?;

let source_params = if let Some(filepath) = args.input_path_opt.as_ref() {
SourceParams::file(filepath)
let source_params = if let Some(uri) = args.input_path_opt.as_ref() {
SourceParams::file_from_uri(uri.clone())
} else {
SourceParams::stdin()
};
Expand Down
56 changes: 26 additions & 30 deletions quickwit/quickwit-cli/tests/cli.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
mod helpers;

use std::path::Path;
use std::str::FromStr;

use anyhow::Result;
use clap::error::ErrorKind;
Expand All @@ -37,6 +38,7 @@ use quickwit_cli::tool::{
};
use quickwit_common::fs::get_cache_directory_path;
use quickwit_common::rand::append_random_suffix;
use quickwit_common::uri::Uri;
use quickwit_config::{RetentionPolicy, SourceInputFormat, CLI_SOURCE_ID};
use quickwit_metastore::{
ListSplitsRequestExt, MetastoreResolver, MetastoreServiceExt, MetastoreServiceStreamSplitsExt,
Expand All @@ -61,11 +63,11 @@ async fn create_logs_index(test_env: &TestEnv) -> anyhow::Result<()> {
create_index_cli(args).await
}

async fn local_ingest_docs(input_path: &Path, test_env: &TestEnv) -> anyhow::Result<()> {
async fn local_ingest_docs(uri: Uri, test_env: &TestEnv) -> anyhow::Result<()> {
let args = LocalIngestDocsArgs {
config_uri: test_env.resource_files.config.clone(),
index_id: test_env.index_id.clone(),
input_path_opt: Some(input_path.to_path_buf()),
input_path_opt: Some(uri),
input_format: SourceInputFormat::Json,
overwrite: false,
clear_cache: true,
Expand All @@ -74,6 +76,10 @@ async fn local_ingest_docs(input_path: &Path, test_env: &TestEnv) -> anyhow::Res
local_ingest_docs_cli(args).await
}

/// Ingests the test environment's `log_docs` resource file into the test
/// index by delegating to `local_ingest_docs`.
async fn local_ingest_log_docs(test_env: &TestEnv) -> anyhow::Result<()> {
    // `local_ingest_docs` takes the URI by value, so hand it a clone.
    let log_docs_uri = test_env.resource_files.log_docs.clone();
    local_ingest_docs(log_docs_uri, test_env).await
}

#[test]
fn test_cmd_help() {
let cmd = build_cli();
Expand Down Expand Up @@ -252,14 +258,15 @@ async fn test_ingest_docs_cli() {

// Ensure cache directory is empty.
let cache_directory_path = get_cache_directory_path(&test_env.data_dir_path);
let data_dir_uri = Uri::from_str(test_env.data_dir_path.to_str().unwrap()).unwrap();

assert!(cache_directory_path.read_dir().unwrap().next().is_none());

// Ingest a non-existing file should fail.
let args = LocalIngestDocsArgs {
config_uri: test_env.resource_files.config,
index_id: test_env.index_id,
input_path_opt: Some(test_env.data_dir_path.join("file-does-not-exist.json")),
input_path_opt: Some(data_dir_uri.join("file-does-not-exist.json").unwrap()),
input_format: SourceInputFormat::Json,
overwrite: false,
clear_cache: true,
Expand Down Expand Up @@ -332,9 +339,7 @@ async fn test_cmd_search_aggregation() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();

local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

let aggregation: Value = json!(
{
Expand Down Expand Up @@ -432,9 +437,7 @@ async fn test_cmd_search_with_snippets() -> Result<()> {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();

local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

// search with snippets
let args = SearchIndexArgs {
Expand Down Expand Up @@ -487,9 +490,7 @@ async fn test_search_index_cli() {
sort_by_score: false,
};

local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

let args = create_search_args("level:info");

Expand Down Expand Up @@ -600,9 +601,7 @@ async fn test_delete_index_cli_dry_run() {
.unwrap();
assert!(metastore.index_exists(&index_id).await.unwrap());

local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

// On non-empty index
let args = create_delete_args(true);
Expand All @@ -626,9 +625,7 @@ async fn test_delete_index_cli() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();

local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

let args = DeleteIndexArgs {
client_args: test_env.default_client_args(),
Expand All @@ -652,9 +649,7 @@ async fn test_garbage_collect_cli_no_grace() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();
let index_uid = test_env.index_metadata().await.unwrap().index_uid;
local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

let metastore = MetastoreResolver::unconfigured()
.resolve(&test_env.metastore_uri)
Expand Down Expand Up @@ -762,9 +757,7 @@ async fn test_garbage_collect_index_cli() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();
let index_uid = test_env.index_metadata().await.unwrap().index_uid;
local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

let refresh_metastore = |metastore| async {
// In this test we rely on the file backed metastore and
Expand Down Expand Up @@ -914,9 +907,7 @@ async fn test_all_local_index() {
.unwrap();
assert!(metadata_file_exists);

local_ingest_docs(test_env.resource_files.log_docs.as_path(), &test_env)
.await
.unwrap();
local_ingest_log_docs(&test_env).await.unwrap();

let query_response = reqwest::get(format!(
"http://127.0.0.1:{}/api/v1/{}/search?query=level:info",
Expand Down Expand Up @@ -970,16 +961,21 @@ async fn test_all_with_s3_localstack_cli() {
test_env.start_server().await.unwrap();
create_logs_index(&test_env).await.unwrap();

let s3_path = upload_test_file(
let s3_uri = upload_test_file(
test_env.storage_resolver.clone(),
test_env.resource_files.log_docs.clone(),
test_env
.resource_files
.log_docs
.filepath()
.unwrap()
.to_path_buf(),
"quickwit-integration-tests",
"sources/",
&append_random_suffix("test-all--cli-s3-localstack"),
)
.await;

local_ingest_docs(&s3_path, &test_env).await.unwrap();
local_ingest_docs(s3_uri, &test_env).await.unwrap();

// Cli search
let args = SearchIndexArgs {
Expand Down
18 changes: 8 additions & 10 deletions quickwit/quickwit-cli/tests/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::borrow::Borrow;
use std::fs;
use std::path::PathBuf;
use std::str::FromStr;
Expand Down Expand Up @@ -114,8 +113,8 @@ pub struct TestResourceFiles {
pub index_config: Uri,
pub index_config_without_uri: Uri,
pub index_config_with_retention: Uri,
pub log_docs: PathBuf,
pub wikipedia_docs: PathBuf,
pub log_docs: Uri,
pub wikipedia_docs: Uri,
}

/// A struct to hold few info about the test environment.
Expand Down Expand Up @@ -269,8 +268,8 @@ pub async fn create_test_env(
index_config: uri_from_path(index_config_path),
index_config_without_uri: uri_from_path(index_config_without_uri_path),
index_config_with_retention: uri_from_path(index_config_with_retention_path),
log_docs: log_docs_path,
wikipedia_docs: wikipedia_docs_path,
log_docs: uri_from_path(log_docs_path),
wikipedia_docs: uri_from_path(wikipedia_docs_path),
};

Ok(TestEnv {
Expand All @@ -297,15 +296,14 @@ pub async fn upload_test_file(
bucket: &str,
prefix: &str,
filename: &str,
) -> PathBuf {
) -> Uri {
let test_data = tokio::fs::read(local_src_path).await.unwrap();
let mut src_location: PathBuf = [r"s3://", bucket, prefix].iter().collect();
let storage_uri = Uri::from_str(src_location.to_string_lossy().borrow()).unwrap();
let src_location = format!("s3://{}/{}", bucket, prefix);
let storage_uri = Uri::from_str(&src_location).unwrap();
let storage = storage_resolver.resolve(&storage_uri).await.unwrap();
storage
.put(&PathBuf::from(filename), Box::new(test_data))
.await
.unwrap();
src_location.push(filename);
src_location
storage_uri.join(filename).unwrap()
}
64 changes: 36 additions & 28 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,9 +20,10 @@
pub(crate) mod serialize;

use std::num::NonZeroUsize;
use std::path::{Path, PathBuf};
use std::path::Path;
use std::str::FromStr;

use anyhow::Context;
use bytes::Bytes;
use quickwit_common::is_false;
use quickwit_common::uri::Uri;
Expand Down Expand Up @@ -230,12 +231,26 @@ pub enum SourceParams {
}

impl SourceParams {
pub fn file<P: AsRef<Path>>(filepath: P) -> Self {
Self::File(FileSourceParams::file(filepath))
pub fn file_from_uri(uri: Uri) -> Self {
Self::File(FileSourceParams {
filepath: Some(uri),
})
}

/// Builds a `File` source from a string.
///
/// The string is parsed through the `FromStr` implementation of
/// `FileSourceParams`; any parsing error is returned unchanged.
pub fn file_from_str<P: AsRef<str>>(filepath: P) -> anyhow::Result<Self> {
    let file_params = FileSourceParams::from_str(filepath.as_ref())?;
    Ok(Self::File(file_params))
}

/// Builds a `File` source from a filesystem path.
///
/// The path is first converted to a string slice and then handed to
/// `file_from_str`.
///
/// # Errors
///
/// Returns an error if the path cannot be converted to a string, or if
/// `file_from_str` rejects the resulting string.
pub fn file_from_path<P: AsRef<Path>>(filepath: P) -> anyhow::Result<Self> {
    let path: &Path = filepath.as_ref();
    path.to_str()
        .context("failed to convert path to string")
        .and_then(|path_str| Self::file_from_str(path_str))
}

pub fn stdin() -> Self {
Self::File(FileSourceParams::stdin())
Self::File(FileSourceParams { filepath: None })
}

pub fn void() -> Self {
Expand All @@ -250,37 +265,33 @@ pub struct FileSourceParams {
#[schema(value_type = String)]
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
#[serde(deserialize_with = "absolute_filepath_from_str")]
pub filepath: Option<PathBuf>, //< If None read from stdin.
#[serde(deserialize_with = "uri_from_str")]
pub filepath: Option<Uri>, //< If None read from stdin.
}

impl FromStr for FileSourceParams {
    type Err = anyhow::Error;

    /// Parses `filepath` as a `Uri` and stores it as the source location.
    ///
    /// The resulting params always carry `Some(uri)`; parsing never
    /// produces the `None` (stdin) form.
    fn from_str(filepath: &str) -> anyhow::Result<Self> {
        let filepath = Some(Uri::from_str(filepath)?);
        Ok(Self { filepath })
    }
}

/// Deserializing as an URI first to validate the input.
///
/// TODO: we might want to replace `PathBuf` with `Uri` directly in
/// `FileSourceParams`
fn absolute_filepath_from_str<'de, D>(deserializer: D) -> Result<Option<PathBuf>, D::Error>
/// Deserializes an optional string and parses it as a URI.
fn uri_from_str<'de, D>(deserializer: D) -> Result<Option<Uri>, D::Error>
where D: Deserializer<'de> {
let filepath_opt: Option<String> = Deserialize::deserialize(deserializer)?;
if let Some(filepath) = filepath_opt {
let uri = Uri::from_str(&filepath).map_err(D::Error::custom)?;
Ok(Some(PathBuf::from(uri.as_str())))
Ok(Some(uri))
} else {
Ok(None)
}
}

impl FileSourceParams {
pub fn file<P: AsRef<Path>>(filepath: P) -> Self {
FileSourceParams {
filepath: Some(filepath.as_ref().to_path_buf()),
}
}

pub fn stdin() -> Self {
FileSourceParams { filepath: None }
}
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct KafkaSourceParams {
Expand Down Expand Up @@ -809,10 +820,7 @@ mod tests {
"#;
let file_params = serde_yaml::from_str::<FileSourceParams>(yaml).unwrap();
let uri = Uri::from_str("source-path.json").unwrap();
assert_eq!(
file_params.filepath.unwrap().as_path(),
Path::new(uri.as_str())
);
assert_eq!(file_params.filepath.unwrap(), uri);
}
}

Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-indexing/src/actors/indexing_pipeline.rs
Original file line number Diff line number Diff line change
Expand Up @@ -639,7 +639,7 @@ mod tests {
source_id: "test-source".to_string(),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::file(PathBuf::from(test_file)),
source_params: SourceParams::file_from_str(test_file).unwrap(),
transform_config: None,
input_format: SourceInputFormat::Json,
};
Expand Down Expand Up @@ -758,7 +758,7 @@ mod tests {
source_id: "test-source".to_string(),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::file(PathBuf::from(test_file)),
source_params: SourceParams::file_from_str(test_file).unwrap(),
transform_config: None,
input_format: SourceInputFormat::Json,
};
Expand Down Expand Up @@ -965,7 +965,7 @@ mod tests {
source_id: "test-source".to_string(),
num_pipelines: NonZeroUsize::MIN,
enabled: true,
source_params: SourceParams::file(PathBuf::from(test_file)),
source_params: SourceParams::file_from_str(test_file).unwrap(),
transform_config: None,
input_format: SourceInputFormat::Json,
};
Expand Down
Loading

0 comments on commit 9933781

Please sign in to comment.