Skip to content

Commit

Permalink
Move object backed source to file source
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jun 26, 2024
1 parent 12c2d39 commit 7183c49
Show file tree
Hide file tree
Showing 15 changed files with 534 additions and 408 deletions.
4 changes: 2 additions & 2 deletions quickwit/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 10 additions & 4 deletions quickwit/quickwit-cli/src/source.rs
Original file line number Diff line number Diff line change
Expand Up @@ -754,10 +754,16 @@ mod tests {
enabled: "true".to_string(),
}];
let expected_uri = Uri::from_str("path/to/file").unwrap();
let expected_params = vec![ParamsRow {
key: "filepath".to_string(),
value: JsonValue::String(expected_uri.to_string()),
}];
let expected_params = vec![
ParamsRow {
key: "filepath".to_string(),
value: JsonValue::String(expected_uri.to_string()),
},
ParamsRow {
key: "mode".to_string(),
value: JsonValue::String("file_uri".to_string()),
},
];
let expected_checkpoint = vec![
CheckpointRow {
partition_id: "shard-000".to_string(),
Expand Down
15 changes: 8 additions & 7 deletions quickwit/quickwit-config/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ use serde::de::DeserializeOwned;
use serde::Serialize;
use serde_json::Value as JsonValue;
pub use source_config::{
load_source_config_from_user_config, FileSourceParams, KafkaSourceParams, KinesisSourceParams,
PubSubSourceParams, PulsarSourceAuth, PulsarSourceParams, QueueMessageType, QueueParams,
RegionOrEndpoint, SourceConfig, SourceInputFormat, SourceParams, SqsSourceParams,
TransformConfig, VecSourceParams, VoidSourceParams, CLI_SOURCE_ID, INGEST_API_SOURCE_ID,
INGEST_V2_SOURCE_ID,
load_source_config_from_user_config, FileSourceMessageFormat, FileSourceParams, FileSourceSqs,
FileSourceUri, KafkaSourceParams, KinesisSourceParams, PubSubSourceParams, PulsarSourceAuth,
PulsarSourceParams, RegionOrEndpoint, SourceConfig, SourceInputFormat, SourceParams,
SqsSourceParams, TransformConfig, VecSourceParams, VoidSourceParams, CLI_SOURCE_ID,
INGEST_API_SOURCE_ID, INGEST_V2_SOURCE_ID,
};
use tracing::warn;

Expand Down Expand Up @@ -113,9 +113,10 @@ pub fn disable_ingest_v1() -> bool {
IndexTemplateV0_8,
SourceInputFormat,
SourceParams,
FileSourceMessageFormat,
FileSourceSqs,
FileSourceParams,
QueueParams,
QueueMessageType,
FileSourceUri,
SqsSourceParams,
PubSubSourceParams,
KafkaSourceParams,
Expand Down
120 changes: 66 additions & 54 deletions quickwit/quickwit-config/src/source_config/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -233,17 +233,15 @@ pub enum SourceParams {

impl SourceParams {
pub fn file_from_uri(uri: Uri) -> Self {
Self::File(FileSourceParams {
filepath: Some(uri),
})
Self::File(FileSourceParams::FileUri(FileSourceUri { filepath: uri }))
}

pub fn file_from_str<P: AsRef<str>>(filepath: P) -> anyhow::Result<Self> {
FileSourceParams::from_str(filepath.as_ref()).map(Self::File)
Uri::from_str(filepath.as_ref()).map(Self::file_from_uri)
}

pub fn stdin() -> Self {
Self::File(FileSourceParams { filepath: None })
Self::File(FileSourceParams::Stdin)
}

pub fn void() -> Self {
Expand All @@ -268,37 +266,45 @@ impl SourceParams {
}
}

/// Format of the messages received by an SQS-backed file source (see
/// `FileSourceSqs::message_format`).
///
/// Variants are (de)serialized in snake case: `s3_notification` and `raw_uri`.
#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum FileSourceMessageFormat {
/// An S3 notification. See
/// <https://docs.aws.amazon.com/AmazonS3/latest/userguide/notification-content-structure.html>
S3Notification,
/// A string with the URI of the file (e.g. `s3://bucket/key`)
RawUri,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(deny_unknown_fields)]
pub struct FileSourceParams {
/// Path of the file to read. Assume stdin if None.
#[schema(value_type = String)]
#[serde(skip_serializing_if = "Option::is_none")]
#[serde(default)]
#[serde(deserialize_with = "uri_from_str")]
pub filepath: Option<Uri>, //< If None read from stdin.
pub struct FileSourceSqs {
pub queue_url: String,
/// Polling wait time in seconds for receiving messages. Leave default value.
#[serde(default = "default_wait_time_seconds")]
pub wait_time_seconds: u8,
pub message_format: FileSourceMessageFormat,
}

impl FromStr for FileSourceParams {
type Err = anyhow::Error;
/// Serde default for `wait_time_seconds`: the polling wait time, in seconds,
/// applied when the field is omitted from the configuration.
fn default_wait_time_seconds() -> u8 {
    const DEFAULT_WAIT_TIME_SECONDS: u8 = 20;
    DEFAULT_WAIT_TIME_SECONDS
}

fn from_str(filepath: &str) -> anyhow::Result<Self> {
let uri = Uri::from_str(filepath)?;
Ok(Self {
filepath: Some(uri),
})
}
/// Parameters of a file source that reads the file designated by a URI.
///
/// This is the payload of the `FileSourceParams::FileUri` variant.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
pub struct FileSourceUri {
/// URI of the file to read.
#[schema(value_type = String)]
pub filepath: Uri,
}

/// Deserializing as an URI
fn uri_from_str<'de, D>(deserializer: D) -> Result<Option<Uri>, D::Error>
where D: Deserializer<'de> {
let filepath_opt: Option<String> = Deserialize::deserialize(deserializer)?;
if let Some(filepath) = filepath_opt {
let uri = Uri::from_str(&filepath).map_err(D::Error::custom)?;
Ok(Some(uri))
} else {
Ok(None)
/// Parameters of the `file` source.
///
/// The variant is selected by the `mode` field of the serialized form, whose
/// value is the snake-case variant name: `file_uri`, `stdin` or `sqs`.
#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "snake_case", tag = "mode")]
pub enum FileSourceParams {
/// Read the file designated by the given URI.
FileUri(FileSourceUri),
/// Read from the standard input; this mode carries no parameters.
Stdin,
/// Receive messages from an SQS queue; the messages follow the configured
/// `message_format` (an S3 notification or a raw file URI).
Sqs(FileSourceSqs),
}

impl FileSourceParams {
    /// Builds the `FileUri` variant from a file path or URI string.
    ///
    /// Returns an error if `filepath` cannot be parsed as a `Uri`.
    pub fn from_filepath<P: AsRef<str>>(filepath: P) -> anyhow::Result<Self> {
        let filepath = Uri::from_str(filepath.as_ref())?;
        Ok(Self::FileUri(FileSourceUri { filepath }))
    }
}

Expand Down Expand Up @@ -344,34 +350,12 @@ pub struct PubSubSourceParams {
pub max_messages_per_pull: Option<i32>,
}

#[derive(Clone, Copy, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
#[serde(rename_all = "snake_case")]
pub enum QueueMessageType {
S3Notification,
// GcsNotification,
// RawData,
RawUri,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
pub struct QueueParams {
pub message_type: QueueMessageType,
// pub deduplication_window_duration_sec: usize,
// pub deduplication_window_max_messages: usize,
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
pub struct SqsSourceParams {
pub queue_url: String,
/// Polling wait time in seconds for receiving messages. Leave default value.
#[serde(default = "default_wait_time_seconds")]
pub wait_time_seconds: u8,
#[serde(flatten)]
pub queue_params: QueueParams,
}

fn default_wait_time_seconds() -> u8 {
20
}

#[derive(Clone, Debug, Eq, PartialEq, Serialize, Deserialize, utoipa::ToSchema)]
Expand Down Expand Up @@ -853,14 +837,42 @@ mod tests {
}

#[test]
fn test_file_source_params_serialization() {
fn test_file_source_params_deserialization() {
{
let yaml = r#"
mode: file_uri
filepath: source-path.json
"#;
let file_params = serde_yaml::from_str::<FileSourceParams>(yaml).unwrap();
let uri = Uri::from_str("source-path.json").unwrap();
assert_eq!(file_params.filepath.unwrap(), uri);
assert_eq!(
file_params,
FileSourceParams::FileUri(FileSourceUri { filepath: uri })
);
}
{
let yaml = r#"
mode: stdin
"#;
let file_params = serde_yaml::from_str::<FileSourceParams>(yaml).unwrap();
assert_eq!(file_params, FileSourceParams::Stdin);
}
{
let yaml = r#"
mode: sqs
queue_url: https://sqs.us-east-1.amazonaws.com/123456789012/queue-name
message_format: s3_notification
"#;
let file_params = serde_yaml::from_str::<FileSourceParams>(yaml).unwrap();
assert_eq!(
file_params,
FileSourceParams::Sqs(FileSourceSqs {
queue_url: "https://sqs.us-east-1.amazonaws.com/123456789012/queue-name"
.to_string(),
wait_time_seconds: default_wait_time_seconds(),
message_format: FileSourceMessageFormat::S3Notification,
})
);
}
}

Expand Down
7 changes: 5 additions & 2 deletions quickwit/quickwit-config/src/source_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ use quickwit_proto::types::SourceId;
use serde::{Deserialize, Serialize};

use super::{TransformConfig, RESERVED_SOURCE_IDS};
use crate::{validate_identifier, ConfigFormat, SourceConfig, SourceInputFormat, SourceParams};
use crate::{
validate_identifier, ConfigFormat, FileSourceParams, SourceConfig, SourceInputFormat,
SourceParams,
};

type SourceConfigForSerialization = SourceConfigV0_8;

Expand Down Expand Up @@ -80,7 +83,7 @@ impl SourceConfigForSerialization {
match &self.source_params {
// We want to forbid source_config with no filepath
SourceParams::File(file_params) => {
if file_params.filepath.is_none() {
if matches!(file_params, FileSourceParams::Stdin) {
bail!(
"source `{}` of type `file` must contain a filepath",
self.source_id
Expand Down
Loading

0 comments on commit 7183c49

Please sign in to comment.