Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Rework command for adding a new source to an index #1270

Merged
merged 1 commit into from
Apr 11, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
156 changes: 43 additions & 113 deletions quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ use anyhow::{bail, Context};
use clap::{arg, ArgMatches, Command};
use itertools::Itertools;
use quickwit_common::uri::Uri;
use quickwit_config::{SourceConfig, SourceParams};
use quickwit_config::SourceConfig;
use quickwit_indexing::check_source_connectivity;
use quickwit_metastore::checkpoint::SourceCheckpoint;
use quickwit_metastore::{quickwit_metastore_uri_resolver, IndexMetadata};
use quickwit_storage::load_file;
use serde_json::{Map, Value};
use serde_json::Value;
use tabled::{Table, Tabled};

use crate::{load_quickwit_config, make_table};
Expand All @@ -35,22 +35,20 @@ pub fn build_source_command<'a>() -> Command<'a> {
Command::new("source")
.about("Manages sources.")
.subcommand(
Command::new("add")
.about("Adds a new source.")
Command::new("create")
.about("Adds a new source to an index.")
guilload marked this conversation as resolved.
Show resolved Hide resolved
.args(&[
arg!(--config <CONFIG> "Quickwit config file").env("QW_CONFIG"),
arg!(--index <INDEX> "ID of the target index"),
arg!(--params <PARAMS> "Parameters for the source formatted as a JSON object passed inline or via a file. Parameters are source-specific. Please, refer to the source's documentation for more details."),
arg!(--source <SOURCE_ID> "ID of the source."),
arg!(--type <TYPE> "Type of the source. Available types are: `file` and `kafka`."),
arg!(--config <CONFIG> "Path to Quickwit config file").env("QW_CONFIG"),
arg!(--index <INDEX_ID> "ID of the target index"),
guilload marked this conversation as resolved.
Show resolved Hide resolved
arg!(--"source-config" <SOURCE_CONFIG> "Path to source config file. Please, refer to the documentation for more details."),
])
)
.subcommand(
Command::new("delete")
.about("Deletes a source.")
.about("Deletes a source from an index.")
.args(&[
arg!(--config <CONFIG> "Quickwit config file").env("QW_CONFIG"),
arg!(--index <INDEX> "ID of the target index"),
arg!(--index <INDEX_ID> "ID of the target index"),
arg!(--source <SOURCE_ID> "ID of the source."),
])
)
Expand All @@ -59,29 +57,26 @@ pub fn build_source_command<'a>() -> Command<'a> {
.about("Describes a source.")
.args(&[
arg!(--config <CONFIG> "Quickwit config file").env("QW_CONFIG"),
arg!(--index <INDEX> "ID of the target index"),
arg!(--index <INDEX_ID> "ID of the target index"),
arg!(--source <SOURCE_ID> "ID of the source."),
])
)
.subcommand(
Command::new("list")
.about("List the sources of an index.")
.about("Lists the sources of an index.")
.args(&[
arg!(--config <CONFIG> "Quickwit config file").env("QW_CONFIG"),
arg!(--index <INDEX> "ID of the target index"),
arg!(--index <INDEX_ID> "ID of the target index"),
])
)
.arg_required_else_help(true)
}

#[derive(Debug, PartialEq)]
pub struct AddSourceArgs {
pub struct CreateSourceArgs {
pub config_uri: Uri,
pub index_id: String,
pub source_id: String,
pub source_type: String,
/// Can be an inline JSON object or a path to a file holding a JSON object.
pub params: String,
pub source_config_uri: Uri,
}

#[derive(Debug, PartialEq)]
Expand All @@ -106,7 +101,7 @@ pub struct ListSourcesArgs {

#[derive(Debug, PartialEq)]
pub enum SourceCliCommand {
AddSource(AddSourceArgs),
CreateSource(CreateSourceArgs),
DeleteSource(DeleteSourceArgs),
DescribeSource(DescribeSourceArgs),
ListSources(ListSourcesArgs),
Expand All @@ -115,7 +110,7 @@ pub enum SourceCliCommand {
impl SourceCliCommand {
pub async fn execute(self) -> anyhow::Result<()> {
match self {
Self::AddSource(args) => add_source_cli(args).await,
Self::CreateSource(args) => create_source_cli(args).await,
Self::DeleteSource(args) => delete_source_cli(args).await,
Self::DescribeSource(args) => describe_source_cli(args).await,
Self::ListSources(args) => list_sources_cli(args).await,
Expand All @@ -127,15 +122,15 @@ impl SourceCliCommand {
.subcommand()
.ok_or_else(|| anyhow::anyhow!("Failed to parse source subcommand arguments."))?;
match subcommand {
"add" => Self::parse_add_args(submatches).map(Self::AddSource),
"create" => Self::parse_create_args(submatches).map(Self::CreateSource),
"delete" => Self::parse_delete_args(submatches).map(Self::DeleteSource),
"describe" => Self::parse_describe_args(submatches).map(Self::DescribeSource),
"list" => Self::parse_list_args(submatches).map(Self::ListSources),
_ => bail!("Source subcommand `{}` is not implemented.", subcommand),
}
}

fn parse_add_args(matches: &ArgMatches) -> anyhow::Result<AddSourceArgs> {
fn parse_create_args(matches: &ArgMatches) -> anyhow::Result<CreateSourceArgs> {
let config_uri = matches
.value_of("config")
.map(Uri::try_new)
Expand All @@ -144,24 +139,14 @@ impl SourceCliCommand {
.value_of("index")
.map(String::from)
.expect("`index` is a required arg.");
let source_id = matches
.value_of("source")
.map(String::from)
.expect("`source` is a required arg.");
let source_type = matches
.value_of("type")
.map(String::from)
.expect("`type` is a required arg.");
let params = matches
.value_of("params")
.map(String::from)
.expect("`params` is a required arg.");
Ok(AddSourceArgs {
let source_config_uri = matches
.value_of("source-config")
.map(Uri::try_new)
.expect("`source-config` is a required arg.")?;
Ok(CreateSourceArgs {
config_uri,
index_id,
source_id,
source_type,
params,
source_config_uri,
})
}

Expand Down Expand Up @@ -221,27 +206,21 @@ impl SourceCliCommand {
}
}

async fn add_source_cli(args: AddSourceArgs) -> anyhow::Result<()> {
let config = load_quickwit_config(&args.config_uri, None).await?;
async fn create_source_cli(args: CreateSourceArgs) -> anyhow::Result<()> {
let qw_config = load_quickwit_config(&args.config_uri, None).await?;
let metastore = quickwit_metastore_uri_resolver()
.resolve(&config.metastore_uri())
.resolve(&qw_config.metastore_uri())
.await?;
let params = sniff_params(&args.params).await?;
let mut source_params_json: Map<String, Value> = Map::new();
source_params_json.insert("source_type".to_string(), Value::String(args.source_type));
source_params_json.insert("params".to_string(), Value::Object(params));
let source_params: SourceParams = serde_json::from_value(Value::Object(source_params_json))?;
let source = SourceConfig {
source_id: args.source_id.clone(),
source_params,
};
source.validate()?;
let source_config_content = load_file(&args.source_config_uri).await?;
let source =
SourceConfig::load(&args.source_config_uri, source_config_content.as_slice()).await?;
let source_id = source.source_id.clone();
check_source_connectivity(&source).await?;

metastore.add_source(&args.index_id, source).await?;
println!(
"Source `{}` successfully created for index `{}`.",
args.source_id, args.index_id
source_id, args.index_id
);
Ok(())
}
Expand Down Expand Up @@ -392,20 +371,6 @@ fn flatten_json(value: Value) -> Vec<(String, Value)> {
acc
}

/// Resolves `params` into a JSON object.
///
/// `params` is first parsed as an inline JSON document. When that does not yield a JSON object,
/// `params` is interpreted as a URI, the file it designates is loaded, and the file content is
/// parsed instead.
///
/// # Errors
///
/// Returns an error if `params` is not an inline JSON object and either it is not a valid URI,
/// the file cannot be loaded, or the file content is not a JSON object.
async fn sniff_params(params: &str) -> anyhow::Result<Map<String, Value>> {
    // Inline form. Invalid JSON and non-object JSON (scalars, arrays, `null`) both fall through
    // to the file form below.
    if let Ok(Value::Object(inline_object)) = serde_json::from_str::<Value>(params) {
        return Ok(inline_object);
    }
    let file_uri = Uri::try_new(params)?;
    let file_content = load_file(&file_uri).await?;
    match serde_json::from_slice::<Value>(file_content.as_slice()) {
        Ok(Value::Object(file_object)) => Ok(file_object),
        _ => bail!("Failed to parse JSON object from `{}`.", params),
    }
}

async fn resolve_index(metastore_uri: &str, index_id: &str) -> anyhow::Result<IndexMetadata> {
let metastore_uri_resolver = quickwit_metastore_uri_resolver();
let metastore = metastore_uri_resolver.resolve(metastore_uri).await?;
Expand All @@ -415,10 +380,8 @@ async fn resolve_index(metastore_uri: &str, index_id: &str) -> anyhow::Result<In

#[cfg(test)]
mod tests {
use std::path::Path;

use quickwit_config::SourceParams;
use quickwit_metastore::checkpoint::{PartitionId, Position};
use quickwit_storage::{quickwit_storage_uri_resolver, PutPayload};
use serde_json::json;

use super::*;
Expand All @@ -441,61 +404,28 @@ mod tests {
);
}

#[tokio::test]
async fn test_sniff_params() {
    // Each of these inputs is expected to be rejected.
    for invalid_params in ["", "foo", "null", "0", "[]"] {
        sniff_params(invalid_params).await.unwrap_err();
    }

    // Inline JSON object.
    let inline_params = sniff_params(r#"{"foo": 0}"#).await.unwrap();
    assert!(inline_params.contains_key("foo"));

    // JSON object stored in a file: write it to RAM storage first, then pass its URI.
    let ram_storage = quickwit_storage_uri_resolver()
        .resolve("ram:///tmp")
        .unwrap();
    let file_content: Box<dyn PutPayload> = Box::new(r#"{"bar": 1}"#.to_string().into_bytes());
    ram_storage
        .put(Path::new("params.json"), file_content)
        .await
        .unwrap();

    let file_params = sniff_params("ram:///tmp/params.json").await.unwrap();
    assert!(file_params.contains_key("bar"));
}

#[test]
fn test_parse_add_source_args() {
fn test_parse_create_source_args() {
let app = build_cli().no_binary_name(true);
let matches = app
.try_get_matches_from(vec![
"source",
"add",
"create",
"--index",
"hdfs-logs",
"--source",
"hdfs-logs-source",
"--type",
"kafka",
"--params",
"{}",
"--source-config",
"/source-conf.yaml",
"--config",
"/conf.yaml",
])
.unwrap();
let command = CliCommand::parse_cli_args(&matches).unwrap();
let expected_command = CliCommand::Source(SourceCliCommand::AddSource(AddSourceArgs {
config_uri: Uri::try_new("file:///conf.yaml").unwrap(),
index_id: "hdfs-logs".to_string(),
source_id: "hdfs-logs-source".to_string(),
source_type: "kafka".to_string(),
params: "{}".to_string(),
}));
let expected_command =
CliCommand::Source(SourceCliCommand::CreateSource(CreateSourceArgs {
config_uri: Uri::try_new("file:///conf.yaml").unwrap(),
index_id: "hdfs-logs".to_string(),
source_config_uri: Uri::try_new("file:///source-conf.yaml").unwrap(),
}));
assert_eq!(command, expected_command);
}

Expand Down
47 changes: 47 additions & 0 deletions quickwit-common/src/uri.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::env;
use std::ffi::OsStr;
use std::fmt::Display;
use std::path::{Component, Path, PathBuf};

Expand All @@ -28,6 +29,26 @@ const FILE_PROTOCOL: &str = "file";

const PROTOCOL_SEPARATOR: &str = "://";

/// Kind of file extension carried by a URI.
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum Extension {
    /// The `json` extension.
    Json,
    /// The `toml` extension.
    Toml,
    /// Any other non-empty extension; holds the extension text as it was written.
    Unknown(String),
    /// The `yaml` or `yml` extension.
    Yaml,
}

impl Extension {
    /// Classifies a raw extension string (without the leading dot).
    ///
    /// Returns `None` for the empty string. `json`, `toml`, and `yaml`/`yml` map to their
    /// dedicated variants; matching is exact and case-sensitive. Every other value is wrapped
    /// in [`Extension::Unknown`].
    fn maybe_new(extension: &str) -> Option<Self> {
        if extension.is_empty() {
            return None;
        }
        let kind = match extension {
            "json" => Self::Json,
            "toml" => Self::Toml,
            "yaml" | "yml" => Self::Yaml,
            other => Self::Unknown(other.to_string()),
        };
        Some(kind)
    }
}

/// Encapsulates the URI type.
#[derive(Debug, PartialEq, Eq, Hash, Clone)]
pub struct Uri {
Expand Down Expand Up @@ -82,6 +103,14 @@ impl Uri {
})
}

/// Returns the URI extension.
pub fn extension(&self) -> Option<Extension> {
Path::new(&self.uri)
.extension()
.and_then(OsStr::to_str)
.and_then(Extension::maybe_new)
}

/// Returns the uri protocol.
pub fn protocol(&self) -> &str {
&self.uri[..self.protocol_idx]
Expand Down Expand Up @@ -225,6 +254,24 @@ mod tests {
"s3://home/homer/docs/../dognuts"
);

assert!(Uri::try_new("s3://").unwrap().extension().is_none());

assert_eq!(
Uri::try_new("s3://config.json")
.unwrap()
.extension()
.unwrap(),
Extension::Json
);

assert_eq!(
Uri::try_new("s3://config.foo")
.unwrap()
.extension()
.unwrap(),
Extension::Unknown("foo".to_string())
);

Ok(())
}
}
10 changes: 10 additions & 0 deletions quickwit-config/resources/tests/source_config/kafka-source.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
{
"source_id": "hdfs-logs-kafka-source",
"source_type": "kafka",
"params": {
"topic": "cloudera-cluster-logs",
"client_params": {
"bootstrap.servers": "host:9092"
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
source_id: hdfs-logs-kinesis-source
source_type: kinesis
params:
stream_name: emr-cluster-logs
Loading