Skip to content

Commit

Permalink
Rework command for adding a new source to an indexing
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Apr 8, 2022
1 parent 0a5b6da commit 34eb196
Show file tree
Hide file tree
Showing 7 changed files with 231 additions and 150 deletions.
156 changes: 43 additions & 113 deletions quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use anyhow::{bail, Context};
use clap::{arg, ArgMatches, Command};
use itertools::Itertools;
use quickwit_common::uri::Uri;
use quickwit_config::{SourceConfig, SourceParams};
use quickwit_config::SourceConfig;
use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::checkpoint::SourceCheckpoint;
use quickwit_metastore::{quickwit_metastore_uri_resolver, IndexMetadata};
use quickwit_storage::load_file;
use serde_json::{Map, Value};
use serde_json::Value;
use tabled::{Table, Tabled};

use crate::{load_quickwit_config, make_table};
Expand All @@ -35,22 +35,20 @@ pub fn build_source_command<'a>() -> Command<'a> {
Command::new("source")
.about("Manages sources.")
.subcommand(
Command::new("add")
.about("Adds a new source.")
Command::new("create")
.about("Adds a new source to an index.")
.args(&[
arg!(--config <CONFIG> "Quickwit config file").env("QW_CONFIG"),
arg!(--index <INDEX> "ID of the target index"),
arg!(--params <PARAMS> "Parameters for the source formatted as a JSON object passed inline or via a file. Parameters are source-specific. Please, refer to the source's documentation for more details."),
arg!(--source <SOURCE_ID> "ID of the source."),
arg!(--type <TYPE> "Type of the source. Available types are: `file` and `kafka`."),
arg!(--config <CONFIG> "Path to Quickwit config file").env("QW_CONFIG"),
arg!(--index <INDEX_ID> "ID of the target index"),
arg!(--"source-config" <SOURCE_CONFIG> "Path to source config file. Please, refer to the documentation for more details."),
])
)
.subcommand(
Command::new("delete")
.about("Deletes a source.")
.about("Deletes a source from an index.")
.args(&[
arg!(--config <CONFIG> "Quickwit config file").env("QW_CONFIG"),
arg!(--index <INDEX> "ID of the target index"),
arg!(--index <INDEX_ID> "ID of the target index"),
arg!(--source <SOURCE_ID> "ID of the source."),
])
)
Expand All @@ -59,29 +57,26 @@ pub fn build_source_command<'a>() -> Command<'a> {
.about("Describes a source.")
.args(&[
arg!(--config <CONFIG> "Quickwit config file").env("QW_CONFIG"),
arg!(--index <INDEX> "ID of the target index"),
arg!(--index <INDEX_ID> "ID of the target index"),
arg!(--source <SOURCE_ID> "ID of the source."),
])
)
.subcommand(
Command::new("list")
.about("List the sources of an index.")
.about("Lists the sources of an index.")
.args(&[
arg!(--config <CONFIG> "Quickwit config file").env("QW_CONFIG"),
arg!(--index <INDEX> "ID of the target index"),
arg!(--index <INDEX_ID> "ID of the target index"),
])
)
.arg_required_else_help(true)
}

#[derive(Debug, PartialEq)]
pub struct AddSourceArgs {
pub struct CreateSourceArgs {
pub config_uri: Uri,
pub index_id: String,
pub source_id: String,
pub source_type: String,
/// Can be an inline JSON object or a path to a file holding a JSON object.
pub params: String,
pub source_config_uri: Uri,
}

#[derive(Debug, PartialEq)]
Expand All @@ -106,7 +101,7 @@ pub struct ListSourcesArgs {

#[derive(Debug, PartialEq)]
pub enum SourceCliCommand {
AddSource(AddSourceArgs),
CreateSource(CreateSourceArgs),
DeleteSource(DeleteSourceArgs),
DescribeSource(DescribeSourceArgs),
ListSources(ListSourcesArgs),
Expand All @@ -115,7 +110,7 @@ pub enum SourceCliCommand {
impl SourceCliCommand {
pub async fn execute(self) -> anyhow::Result<()> {
match self {
Self::AddSource(args) => add_source_cli(args).await,
Self::CreateSource(args) => create_source_cli(args).await,
Self::DeleteSource(args) => delete_source_cli(args).await,
Self::DescribeSource(args) => describe_source_cli(args).await,
Self::ListSources(args) => list_sources_cli(args).await,
Expand All @@ -127,15 +122,15 @@ impl SourceCliCommand {
.subcommand()
.ok_or_else(|| anyhow::anyhow!("Failed to parse source subcommand arguments."))?;
match subcommand {
"add" => Self::parse_add_args(submatches).map(Self::AddSource),
"create" => Self::parse_create_args(submatches).map(Self::CreateSource),
"delete" => Self::parse_delete_args(submatches).map(Self::DeleteSource),
"describe" => Self::parse_describe_args(submatches).map(Self::DescribeSource),
"list" => Self::parse_list_args(submatches).map(Self::ListSources),
_ => bail!("Source subcommand `{}` is not implemented.", subcommand),
}
}

fn parse_add_args(matches: &ArgMatches) -> anyhow::Result<AddSourceArgs> {
fn parse_create_args(matches: &ArgMatches) -> anyhow::Result<CreateSourceArgs> {
let config_uri = matches
.value_of("config")
.map(Uri::try_new)
Expand All @@ -144,24 +139,14 @@ impl SourceCliCommand {
.value_of("index")
.map(String::from)
.expect("`index` is a required arg.");
let source_id = matches
.value_of("source")
.map(String::from)
.expect("`source` is a required arg.");
let source_type = matches
.value_of("type")
.map(String::from)
.expect("`type` is a required arg.");
let params = matches
.value_of("params")
.map(String::from)
.expect("`params` is a required arg.");
Ok(AddSourceArgs {
let source_config_uri = matches
.value_of("source-config")
.map(Uri::try_new)
.expect("`source-config` is a required arg.")?;
Ok(CreateSourceArgs {
config_uri,
index_id,
source_id,
source_type,
params,
source_config_uri,
})
}

Expand Down Expand Up @@ -221,27 +206,21 @@ impl SourceCliCommand {
}
}

async fn add_source_cli(args: AddSourceArgs) -> anyhow::Result<()> {
let config = load_quickwit_config(&args.config_uri, None).await?;
async fn create_source_cli(args: CreateSourceArgs) -> anyhow::Result<()> {
let qw_config = load_quickwit_config(&args.config_uri, None).await?;
let metastore = quickwit_metastore_uri_resolver()
.resolve(&config.metastore_uri())
.resolve(&qw_config.metastore_uri())
.await?;
let params = sniff_params(&args.params).await?;
let mut source_params_json: Map<String, Value> = Map::new();
source_params_json.insert("source_type".to_string(), Value::String(args.source_type));
source_params_json.insert("params".to_string(), Value::Object(params));
let source_params: SourceParams = serde_json::from_value(Value::Object(source_params_json))?;
let source = SourceConfig {
source_id: args.source_id.clone(),
source_params,
};
source.validate()?;
let source_config_content = load_file(&args.source_config_uri).await?;
let source =
SourceConfig::load(&args.source_config_uri, source_config_content.as_slice()).await?;
let source_id = source.source_id.clone();
check_source_connectivity(&source).await?;

metastore.add_source(&args.index_id, source).await?;
println!(
"Source `{}` successfully created for index `{}`.",
args.source_id, args.index_id
source_id, args.index_id
);
Ok(())
}
Expand Down Expand Up @@ -392,20 +371,6 @@ fn flatten_json(value: Value) -> Vec<(String, Value)> {
acc
}

/// Tries to read a JSON object from a string, assuming the string is an inline JSON object or a
/// path to a file holding a JSON object.
async fn sniff_params(params: &str) -> anyhow::Result<Map<String, Value>> {
if let Ok(Value::Object(values)) = serde_json::from_str(params) {
return Ok(values);
}
let params_uri = Uri::try_new(params)?;
let params_bytes = load_file(&params_uri).await?;
if let Ok(Value::Object(values)) = serde_json::from_slice(params_bytes.as_slice()) {
return Ok(values);
}
bail!("Failed to parse JSON object from `{}`.", params)
}

async fn resolve_index(metastore_uri: &str, index_id: &str) -> anyhow::Result<IndexMetadata> {
let metastore_uri_resolver = quickwit_metastore_uri_resolver();
let metastore = metastore_uri_resolver.resolve(metastore_uri).await?;
Expand All @@ -415,10 +380,8 @@ async fn resolve_index(metastore_uri: &str, index_id: &str) -> anyhow::Result<In

#[cfg(test)]
mod tests {
use std::path::Path;

use quickwit_config::SourceParams;
use quickwit_metastore::checkpoint::{PartitionId, Position};
use quickwit_storage::{quickwit_storage_uri_resolver, PutPayload};
use serde_json::json;

use super::*;
Expand All @@ -441,61 +404,28 @@ mod tests {
);
}

#[tokio::test]
async fn test_sniff_params() {
sniff_params("").await.unwrap_err();
sniff_params("foo").await.unwrap_err();
sniff_params("null").await.unwrap_err();
sniff_params("0").await.unwrap_err();
sniff_params("[]").await.unwrap_err();

assert!(sniff_params(r#"{"foo": 0}"#)
.await
.unwrap()
.contains_key("foo"));

let storage = quickwit_storage_uri_resolver()
.resolve("ram:///tmp")
.unwrap();
let payload: Box<dyn PutPayload> = Box::new(r#"{"bar": 1}"#.to_string().into_bytes());
storage
.put(Path::new("params.json"), payload)
.await
.unwrap();

assert!(sniff_params("ram:///tmp/params.json")
.await
.unwrap()
.contains_key("bar"));
}

#[test]
fn test_parse_add_source_args() {
fn test_parse_create_source_args() {
let app = build_cli().no_binary_name(true);
let matches = app
.try_get_matches_from(vec![
"source",
"add",
"create",
"--index",
"hdfs-logs",
"--source",
"hdfs-logs-source",
"--type",
"kafka",
"--params",
"{}",
"--source-config",
"/source-conf.yaml",
"--config",
"/conf.yaml",
])
.unwrap();
let command = CliCommand::parse_cli_args(&matches).unwrap();
let expected_command = CliCommand::Source(SourceCliCommand::AddSource(AddSourceArgs {
config_uri: Uri::try_new("file:///conf.yaml").unwrap(),
index_id: "hdfs-logs".to_string(),
source_id: "hdfs-logs-source".to_string(),
source_type: "kafka".to_string(),
params: "{}".to_string(),
}));
let expected_command =
CliCommand::Source(SourceCliCommand::CreateSource(CreateSourceArgs {
config_uri: Uri::try_new("file:///conf.yaml").unwrap(),
index_id: "hdfs-logs".to_string(),
source_config_uri: Uri::try_new("file:///source-conf.yaml").unwrap(),
}));
assert_eq!(command, expected_command);
}

Expand Down
47 changes: 47 additions & 0 deletions quickwit-common/src/uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::env;
use std::ffi::OsStr;
use std::fmt::Display;
use std::path::{Component, Path, PathBuf};

Expand All @@ -28,6 +29,26 @@ const FILE_PROTOCOL: &str = "file";

const PROTOCOL_SEPARATOR: &str = "://";

#[derive(Debug, PartialEq)]
pub enum Extension {
Json,
Toml,
Unknown(String),
Yaml,
}

impl Extension {
fn maybe_new(extension: &str) -> Option<Self> {
match extension {
"json" => Some(Self::Json),
"toml" => Some(Self::Toml),
"yaml" | "yml" => Some(Self::Yaml),
"" => None,
unknown => Some(Self::Unknown(unknown.to_string())),
}
}
}

/// Encapsulates the URI type.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct Uri {
Expand Down Expand Up @@ -82,6 +103,14 @@ impl Uri {
})
}

/// Returns the URI extension.
pub fn extension(&self) -> Option<Extension> {
Path::new(&self.uri)
.extension()
.and_then(OsStr::to_str)
.and_then(Extension::maybe_new)
}

/// Returns the uri protocol.
pub fn protocol(&self) -> &str {
&self.uri[..self.protocol_idx]
Expand Down Expand Up @@ -225,6 +254,24 @@ mod tests {
"s3://home/homer/docs/../dognuts"
);

assert!(Uri::try_new("s3://").unwrap().extension().is_none());

assert_eq!(
Uri::try_new("s3://config.json")
.unwrap()
.extension()
.unwrap(),
Extension::Json
);

assert_eq!(
Uri::try_new("s3://config.foo")
.unwrap()
.extension()
.unwrap(),
Extension::Unknown("foo".to_string())
);

Ok(())
}
}
10 changes: 10 additions & 0 deletions quickwit-config/resources/tests/source_config/kafka-source.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"source_id": "hdfs-logs-kafka-source",
"source_type": "kafka",
"params": {
"topic": "cloudera-cluster-logs",
"client_params": {
"bootstrap.servers": "host:9092"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
source_id: hdfs-logs-kinesis-source
source_type: kinesis
params:
stream_name: emr-cluster-logs
Loading

0 comments on commit 34eb196

Please sign in to comment.