diff --git a/Makefile b/Makefile index f86b2995679..4617ff44b68 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,7 @@ docker-compose-up: COMPOSE_PROFILES=$(DOCKER_SERVICES) docker compose -f docker-compose.yml up -d --remove-orphans --wait docker-compose-down: - docker compose -f docker-compose.yml down --remove-orphans + docker compose -p quickwit down --remove-orphans docker-compose-logs: docker compose logs -f -t diff --git a/docker-compose.yml b/docker-compose.yml index 4fdf9dc2903..2cfeb7580c9 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -15,6 +15,8 @@ # first if they are already tagged latest and volumes if their content is # incompatible with the latest version, as in case of postgres. +name: quickwit + networks: default: name: quickwit-network diff --git a/quickwit/quickwit-config/src/source_config/serialize.rs b/quickwit/quickwit-config/src/source_config/serialize.rs index ae2c00bb414..9e97ab719b7 100644 --- a/quickwit/quickwit-config/src/source_config/serialize.rs +++ b/quickwit/quickwit-config/src/source_config/serialize.rs @@ -74,7 +74,7 @@ impl SourceConfigForSerialization { /// TODO refactor #1065 fn validate_and_build(self) -> anyhow::Result<SourceConfig> { if !RESERVED_SOURCE_IDS.contains(&self.source_id.as_str()) { - validate_identifier("Source ID", &self.source_id)?; + validate_identifier("source", &self.source_id)?; } let num_pipelines = NonZeroUsize::new(self.num_pipelines) .ok_or_else(|| anyhow::anyhow!("`desired_num_pipelines` must be strictly positive"))?; diff --git a/quickwit/quickwit-metastore/migrations/postgresql/19_add-split-node-id-field.down.sql b/quickwit/quickwit-metastore/migrations/postgresql/19_add-split-node-id-field.down.sql new file mode 100644 index 00000000000..ad1a00f3a46 --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/19_add-split-node-id-field.down.sql @@ -0,0 +1,4 @@ +DROP INDEX IF EXISTS splits_node_id_idx; + +ALTER TABLE splits + DROP COLUMN IF EXISTS node_id; diff --git 
a/quickwit/quickwit-metastore/migrations/postgresql/19_add-split-node-id-field.up.sql b/quickwit/quickwit-metastore/migrations/postgresql/19_add-split-node-id-field.up.sql new file mode 100644 index 00000000000..4fb953976ae --- /dev/null +++ b/quickwit/quickwit-metastore/migrations/postgresql/19_add-split-node-id-field.up.sql @@ -0,0 +1,12 @@ +ALTER TABLE splits + ADD COLUMN node_id VARCHAR(253); + +UPDATE + splits +SET + node_id = splits.split_metadata_json::json ->> 'node_id'; + +ALTER TABLE splits + ALTER COLUMN node_id SET NOT NULL; + +CREATE INDEX IF NOT EXISTS splits_node_id_idx ON splits USING HASH (node_id); diff --git a/quickwit/quickwit-metastore/src/metastore/mod.rs b/quickwit/quickwit-metastore/src/metastore/mod.rs index 8fe8c173ca5..a104cb843c7 100644 --- a/quickwit/quickwit-metastore/src/metastore/mod.rs +++ b/quickwit/quickwit-metastore/src/metastore/mod.rs @@ -40,7 +40,7 @@ use quickwit_proto::metastore::{ MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream, PublishSplitsRequest, StageSplitsRequest, UpdateIndexRequest, }; -use quickwit_proto::types::{IndexUid, SplitId}; +use quickwit_proto::types::{IndexUid, NodeId, SourceId, SplitId}; use time::OffsetDateTime; use crate::checkpoint::IndexCheckpointDelta; @@ -557,9 +557,15 @@ impl ListSplitsResponseExt for ListSplitsResponse { #[derive(Debug, Clone, serde::Serialize, serde::Deserialize)] /// A query builder for listing splits within the metastore. pub struct ListSplitsQuery { - /// A non-empty list of index UIDs to get splits from. + /// A non-empty list of index UIDs for which to fetch the splits. pub index_uids: Vec<IndexUid>, + /// A non-empty list of source IDs for which to fetch the splits. + pub source_ids: Vec<SourceId>, + + /// An optional node ID filter. If set, only splits with this node ID are fetched. + pub node_id: Option<NodeId>, + /// The maximum number of splits to retrieve. 
pub limit: Option<usize>, @@ -598,6 +604,8 @@ impl ListSplitsQuery { pub fn for_index(index_uid: IndexUid) -> Self { Self { index_uids: vec![index_uid], + source_ids: Vec::new(), + node_id: None, limit: None, offset: None, split_states: Vec::new(), @@ -622,6 +630,8 @@ impl ListSplitsQuery { } Ok(Self { index_uids, + source_ids: Vec::new(), + node_id: None, limit: None, offset: None, split_states: Vec::new(), @@ -635,6 +645,12 @@ impl ListSplitsQuery { }) } + /// Sets the node ID filter. + pub fn with_node_id(mut self, node_id: NodeId) -> Self { + self.node_id = Some(node_id); + self + } + /// Sets the maximum number of splits to retrieve. pub fn with_limit(mut self, n: usize) -> Self { self.limit = Some(n); diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs index 207e8a8437d..6b55fad2f6e 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs @@ -559,24 +559,25 @@ impl MetastoreService for PostgresqlMetastore { &mut self, request: StageSplitsRequest, ) -> MetastoreResult<EmptyResponse> { - let split_metadata_list = request.deserialize_splits_metadata()?; + let splits_metadata = request.deserialize_splits_metadata()?; + + if splits_metadata.is_empty() { + return Ok(Default::default()); + } let index_uid: IndexUid = request.index_uid().clone(); - let mut split_ids = Vec::with_capacity(split_metadata_list.len()); - let mut time_range_start_list = Vec::with_capacity(split_metadata_list.len()); - let mut time_range_end_list = Vec::with_capacity(split_metadata_list.len()); - let mut tags_list = Vec::with_capacity(split_metadata_list.len()); - let mut split_metadata_json_list = Vec::with_capacity(split_metadata_list.len()); - let mut delete_opstamps = Vec::with_capacity(split_metadata_list.len()); - let mut maturity_timestamps = Vec::with_capacity(split_metadata_list.len()); - - for split_metadata in 
split_metadata_list { - let split_metadata_json = serde_json::to_string(&split_metadata).map_err(|error| { - MetastoreError::JsonSerializeError { - struct_name: "SplitMetadata".to_string(), - message: error.to_string(), - } - })?; - split_metadata_json_list.push(split_metadata_json); + let mut split_ids = Vec::with_capacity(splits_metadata.len()); + let mut time_range_start_list = Vec::with_capacity(splits_metadata.len()); + let mut time_range_end_list = Vec::with_capacity(splits_metadata.len()); + let mut tags_list = Vec::with_capacity(splits_metadata.len()); + let mut splits_metadata_json = Vec::with_capacity(splits_metadata.len()); + let mut delete_opstamps = Vec::with_capacity(splits_metadata.len()); + let mut maturity_timestamps = Vec::with_capacity(splits_metadata.len()); + let mut source_ids = Vec::with_capacity(splits_metadata.len()); + let mut node_ids = Vec::with_capacity(splits_metadata.len()); + + for split_metadata in splits_metadata { + let split_metadata_json = serde_utils::to_json_str(&split_metadata)?; + splits_metadata_json.push(split_metadata_json); let time_range_start = split_metadata .time_range @@ -592,13 +593,16 @@ impl MetastoreService for PostgresqlMetastore { tags_list.push(sqlx::types::Json(tags)); split_ids.push(split_metadata.split_id); delete_opstamps.push(split_metadata.delete_opstamp as i64); + source_ids.push(split_metadata.source_id); + node_ids.push(split_metadata.node_id); } tracing::Span::current().record("split_ids", format!("{split_ids:?}")); + // TODO: Remove transaction. 
run_with_tx!(self.connection_pool, tx, { let upserted_split_ids: Vec<String> = sqlx::query_scalar(r#" INSERT INTO splits - (split_id, time_range_start, time_range_end, tags, split_metadata_json, delete_opstamp, maturity_timestamp, split_state, index_uid) + (split_id, time_range_start, time_range_end, tags, split_metadata_json, delete_opstamp, maturity_timestamp, split_state, index_uid, source_id, node_id) SELECT split_id, time_range_start, @@ -607,11 +611,13 @@ impl MetastoreService for PostgresqlMetastore { split_metadata_json, delete_opstamp, to_timestamp(maturity_timestamp), - $8 as split_state, - $9 as index_uid + $10 as split_state, + $11 as index_uid, + source_id, + node_id FROM - UNNEST($1, $2, $3, $4, $5, $6, $7) - AS staged_splits (split_id, time_range_start, time_range_end, tags_json, split_metadata_json, delete_opstamp, maturity_timestamp) + UNNEST($1, $2, $3, $4, $5, $6, $7, $8, $9) + AS staged_splits (split_id, time_range_start, time_range_end, tags_json, split_metadata_json, delete_opstamp, maturity_timestamp, source_id, node_id) ON CONFLICT(split_id) DO UPDATE SET time_range_start = excluded.time_range_start, @@ -621,6 +627,8 @@ impl MetastoreService for PostgresqlMetastore { delete_opstamp = excluded.delete_opstamp, maturity_timestamp = excluded.maturity_timestamp, index_uid = excluded.index_uid, + node_id = excluded.node_id, + source_id = excluded.source_id, update_timestamp = CURRENT_TIMESTAMP, create_timestamp = CURRENT_TIMESTAMP WHERE splits.split_id = excluded.split_id AND splits.split_state = 'Staged' @@ -630,9 +638,11 @@ impl MetastoreService for PostgresqlMetastore { .bind(time_range_start_list) .bind(time_range_end_list) .bind(tags_list) - .bind(split_metadata_json_list) + .bind(splits_metadata_json) .bind(delete_opstamps) .bind(maturity_timestamps) + .bind(&source_ids) + .bind(&node_ids) .bind(SplitState::Staged.as_str()) .bind(&index_uid) .fetch_all(tx.as_mut()) @@ -651,7 +661,7 @@ impl MetastoreService for PostgresqlMetastore { return 
Err(MetastoreError::FailedPrecondition { entity, message }); } info!( - index_id=%index_uid.index_id, + %index_uid, "staged `{}` splits successfully", split_ids.len() ); Ok(EmptyResponse {}) @@ -818,7 +828,7 @@ impl MetastoreService for PostgresqlMetastore { return Err(MetastoreError::FailedPrecondition { entity, message }); } info!( - index_id=%index_uid.index_id, + %index_uid, "published {} splits and marked {} for deletion successfully", num_published_splits, num_marked_splits ); @@ -936,14 +946,14 @@ impl MetastoreService for PostgresqlMetastore { })); } info!( - index_id=%index_uid.index_id, + %index_uid, "Marked {} splits for deletion, among which {} were newly marked.", split_ids.len() - not_found_split_ids.len(), num_marked_splits ); if !not_found_split_ids.is_empty() { warn!( - index_id=%index_uid.index_id, + %index_uid, split_ids=?PrettySample::new(&not_found_split_ids, 5), "{} splits were not found and could not be marked for deletion.", not_found_split_ids.len() @@ -1028,11 +1038,11 @@ impl MetastoreService for PostgresqlMetastore { }; return Err(MetastoreError::FailedPrecondition { entity, message }); } - info!(index_id=%index_uid.index_id, "Deleted {} splits from index.", num_deleted_splits); + info!(%index_uid, "Deleted {} splits from index.", num_deleted_splits); if !not_found_split_ids.is_empty() { warn!( - index_id=%index_uid.index_id, + %index_uid, split_ids=?PrettySample::new(&not_found_split_ids, 5), "{} splits were not found and could not be deleted.", not_found_split_ids.len() diff --git a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs index 341ffcbe9fa..c017176485b 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/model.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/model.rs @@ -81,6 +81,8 @@ pub enum Splits { Tags, SplitMetadataJson, IndexUid, + SourceId, + NodeId, DeleteOpstamp, } diff --git 
a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs index 1f972749c46..e7fdc9cba24 100644 --- a/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs +++ b/quickwit/quickwit-metastore/src/metastore/postgres/utils.rs @@ -97,9 +97,13 @@ pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplits // Note: `ListSplitsQuery` builder enforces a non empty `index_uids` list. sql.cond_where( - Expr::col(Splits::IndexUid).is_in(query.index_uids.iter().map(|val| val.to_string())), + Expr::col(Splits::IndexUid).is_in(&query.index_uids), ); + if let Some(node_id) = &query.node_id { + sql.cond_where(Expr::col(Splits::NodeId).eq(node_id)); + }; + if !query.split_states.is_empty() { sql.cond_where( Expr::col(Splits::SplitState) diff --git a/quickwit/quickwit-proto/src/types/index_uid.rs b/quickwit/quickwit-proto/src/types/index_uid.rs index a437038cdad..9382261227d 100644 --- a/quickwit/quickwit-proto/src/types/index_uid.rs +++ b/quickwit/quickwit-proto/src/types/index_uid.rs @@ -232,9 +232,16 @@ impl PartialEq<(&'static str, u128)> for IndexUid { } } +#[cfg(feature = "postgres")] +impl From<IndexUid> for sea_query::Value { + fn from(index_uid: IndexUid) -> Self { + index_uid.to_string().into() + } +} + #[cfg(feature = "postgres")] impl From<&IndexUid> for sea_query::Value { - fn from(val: &IndexUid) -> Self { - val.to_string().into() + fn from(index_uid: &IndexUid) -> Self { + index_uid.to_string().into() + } } diff --git a/quickwit/quickwit-proto/src/types/mod.rs b/quickwit/quickwit-proto/src/types/mod.rs index 18e034e6285..998239db68c 100644 --- a/quickwit/quickwit-proto/src/types/mod.rs +++ b/quickwit/quickwit-proto/src/types/mod.rs @@ -264,6 +264,13 @@ impl ToOwned for NodeIdRef { } } +#[cfg(feature = "postgres")] +impl From<&NodeId> for sea_query::Value { + fn from(node_id: &NodeId) -> Self { + node_id.to_string().into() + } +} + #[cfg(test)] mod tests { use super::*;