Skip to content

Commit

Permalink
Optimize and batch merge pipelines list splits queries
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed May 9, 2024
1 parent e8978df commit 3b66e6e
Show file tree
Hide file tree
Showing 11 changed files with 100 additions and 36 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ docker-compose-up:
COMPOSE_PROFILES=$(DOCKER_SERVICES) docker compose -f docker-compose.yml up -d --remove-orphans --wait

docker-compose-down:
docker compose -f docker-compose.yml down --remove-orphans
docker compose -p quickwit down --remove-orphans

docker-compose-logs:
docker compose logs -f -t
Expand Down
2 changes: 2 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
# first if they are already tagged latest and volumes if their content is
# incompatible with the latest version, as in case of postgres.

name: quickwit

networks:
default:
name: quickwit-network
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-config/src/source_config/serialize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ impl SourceConfigForSerialization {
/// TODO refactor #1065
fn validate_and_build(self) -> anyhow::Result<SourceConfig> {
if !RESERVED_SOURCE_IDS.contains(&self.source_id.as_str()) {
validate_identifier("Source ID", &self.source_id)?;
validate_identifier("source", &self.source_id)?;
}
let num_pipelines = NonZeroUsize::new(self.num_pipelines)
.ok_or_else(|| anyhow::anyhow!("`desired_num_pipelines` must be strictly positive"))?;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- Revert the corresponding `up` migration: drop the hash index on `node_id`,
-- then drop the column itself. Both statements are idempotent (`IF EXISTS`)
-- so re-running the migration is safe.
DROP INDEX IF EXISTS splits_node_id_idx;

ALTER TABLE splits
-- Fixed: PostgreSQL syntax is `DROP COLUMN IF EXISTS <name>`, not
-- `DROP IF EXISTS COLUMN <name>` — the original statement would fail to parse.
DROP COLUMN IF EXISTS node_id;
Original file line number Diff line number Diff line change
@@ -0,0 +1,12 @@
ALTER TABLE splits
ADD COLUMN node_id VARCHAR(253);

UPDATE
splits
SET
node_id = splits.split_metadata_json::json ->> 'node_id';

ALTER TABLE splits
ALTER COLUMN node_id SET NOT NULL;

CREATE INDEX IF NOT EXISTS splits_node_id_idx ON splits USING HASH (node_id);
20 changes: 18 additions & 2 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ use quickwit_proto::metastore::{
MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream,
PublishSplitsRequest, StageSplitsRequest, UpdateIndexRequest,
};
use quickwit_proto::types::{IndexUid, SplitId};
use quickwit_proto::types::{IndexUid, NodeId, SourceId, SplitId};
use time::OffsetDateTime;

use crate::checkpoint::IndexCheckpointDelta;
Expand Down Expand Up @@ -557,9 +557,15 @@ impl ListSplitsResponseExt for ListSplitsResponse {
#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
/// A query builder for listing splits within the metastore.
pub struct ListSplitsQuery {
/// A non-empty list of index UIDs to get splits from.
/// A non-empty list of index UIDs for which to fetch the splits.
pub index_uids: Vec<IndexUid>,

/// A non-empty list of source IDs for which to fetch the splits.
pub source_ids: Vec<SourceId>,

/// An optional node ID filter: when set, only splits with this node ID are fetched.
pub node_id: Option<NodeId>,

/// The maximum number of splits to retrieve.
pub limit: Option<usize>,

Expand Down Expand Up @@ -598,6 +604,8 @@ impl ListSplitsQuery {
pub fn for_index(index_uid: IndexUid) -> Self {
Self {
index_uids: vec![index_uid],
source_ids: Vec::new(),
node_id: None,
limit: None,
offset: None,
split_states: Vec::new(),
Expand All @@ -622,6 +630,8 @@ impl ListSplitsQuery {
}
Ok(Self {
index_uids,
source_ids: Vec::new(),
node_id: None,
limit: None,
offset: None,
split_states: Vec::new(),
Expand All @@ -635,6 +645,12 @@ impl ListSplitsQuery {
})
}

/// Sets the node ID filter.
pub fn with_node_id(mut self, node_id: NodeId) -> Self {
self.node_id = Some(node_id);
self
}

/// Sets the maximum number of splits to retrieve.
pub fn with_limit(mut self, n: usize) -> Self {
self.limit = Some(n);
Expand Down
68 changes: 39 additions & 29 deletions quickwit/quickwit-metastore/src/metastore/postgres/metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -559,24 +559,25 @@ impl MetastoreService for PostgresqlMetastore {
&mut self,
request: StageSplitsRequest,
) -> MetastoreResult<EmptyResponse> {
let split_metadata_list = request.deserialize_splits_metadata()?;
let splits_metadata = request.deserialize_splits_metadata()?;

if splits_metadata.is_empty() {
return Ok(Default::default());
}
let index_uid: IndexUid = request.index_uid().clone();
let mut split_ids = Vec::with_capacity(split_metadata_list.len());
let mut time_range_start_list = Vec::with_capacity(split_metadata_list.len());
let mut time_range_end_list = Vec::with_capacity(split_metadata_list.len());
let mut tags_list = Vec::with_capacity(split_metadata_list.len());
let mut split_metadata_json_list = Vec::with_capacity(split_metadata_list.len());
let mut delete_opstamps = Vec::with_capacity(split_metadata_list.len());
let mut maturity_timestamps = Vec::with_capacity(split_metadata_list.len());

for split_metadata in split_metadata_list {
let split_metadata_json = serde_json::to_string(&split_metadata).map_err(|error| {
MetastoreError::JsonSerializeError {
struct_name: "SplitMetadata".to_string(),
message: error.to_string(),
}
})?;
split_metadata_json_list.push(split_metadata_json);
let mut split_ids = Vec::with_capacity(splits_metadata.len());
let mut time_range_start_list = Vec::with_capacity(splits_metadata.len());
let mut time_range_end_list = Vec::with_capacity(splits_metadata.len());
let mut tags_list = Vec::with_capacity(splits_metadata.len());
let mut splits_metadata_json = Vec::with_capacity(splits_metadata.len());
let mut delete_opstamps = Vec::with_capacity(splits_metadata.len());
let mut maturity_timestamps = Vec::with_capacity(splits_metadata.len());
let mut source_ids = Vec::with_capacity(splits_metadata.len());
let mut node_ids = Vec::with_capacity(splits_metadata.len());

for split_metadata in splits_metadata {
let split_metadata_json = serde_utils::to_json_str(&split_metadata)?;
splits_metadata_json.push(split_metadata_json);

let time_range_start = split_metadata
.time_range
Expand All @@ -592,13 +593,16 @@ impl MetastoreService for PostgresqlMetastore {
tags_list.push(sqlx::types::Json(tags));
split_ids.push(split_metadata.split_id);
delete_opstamps.push(split_metadata.delete_opstamp as i64);
source_ids.push(split_metadata.source_id);
node_ids.push(split_metadata.node_id);
}
tracing::Span::current().record("split_ids", format!("{split_ids:?}"));

// TODO: Remove transaction.
run_with_tx!(self.connection_pool, tx, {
let upserted_split_ids: Vec<String> = sqlx::query_scalar(r#"
INSERT INTO splits
(split_id, time_range_start, time_range_end, tags, split_metadata_json, delete_opstamp, maturity_timestamp, split_state, index_uid)
(split_id, time_range_start, time_range_end, tags, split_metadata_json, delete_opstamp, maturity_timestamp, split_state, index_uid, source_id, node_id)
SELECT
split_id,
time_range_start,
Expand All @@ -607,11 +611,13 @@ impl MetastoreService for PostgresqlMetastore {
split_metadata_json,
delete_opstamp,
to_timestamp(maturity_timestamp),
$8 as split_state,
$9 as index_uid
$10 as split_state,
$11 as index_uid,
source_id,
node_id
FROM
UNNEST($1, $2, $3, $4, $5, $6, $7)
AS staged_splits (split_id, time_range_start, time_range_end, tags_json, split_metadata_json, delete_opstamp, maturity_timestamp)
UNNEST($1, $2, $3, $4, $5, $6, $7, $8, $9)
AS staged_splits (split_id, time_range_start, time_range_end, tags_json, split_metadata_json, delete_opstamp, maturity_timestamp, source_id, node_id)
ON CONFLICT(split_id) DO UPDATE
SET
time_range_start = excluded.time_range_start,
Expand All @@ -621,6 +627,8 @@ impl MetastoreService for PostgresqlMetastore {
delete_opstamp = excluded.delete_opstamp,
maturity_timestamp = excluded.maturity_timestamp,
index_uid = excluded.index_uid,
node_id = excluded.node_id,
source_id = excluded.source_id,
update_timestamp = CURRENT_TIMESTAMP,
create_timestamp = CURRENT_TIMESTAMP
WHERE splits.split_id = excluded.split_id AND splits.split_state = 'Staged'
Expand All @@ -630,9 +638,11 @@ impl MetastoreService for PostgresqlMetastore {
.bind(time_range_start_list)
.bind(time_range_end_list)
.bind(tags_list)
.bind(split_metadata_json_list)
.bind(splits_metadata_json)
.bind(delete_opstamps)
.bind(maturity_timestamps)
.bind(&source_ids)
.bind(&node_ids)
.bind(SplitState::Staged.as_str())
.bind(&index_uid)
.fetch_all(tx.as_mut())
Expand All @@ -651,7 +661,7 @@ impl MetastoreService for PostgresqlMetastore {
return Err(MetastoreError::FailedPrecondition { entity, message });
}
info!(
index_id=%index_uid.index_id,
%index_uid,
"staged `{}` splits successfully", split_ids.len()
);
Ok(EmptyResponse {})
Expand Down Expand Up @@ -818,7 +828,7 @@ impl MetastoreService for PostgresqlMetastore {
return Err(MetastoreError::FailedPrecondition { entity, message });
}
info!(
index_id=%index_uid.index_id,
%index_uid,
"published {} splits and marked {} for deletion successfully",
num_published_splits, num_marked_splits
);
Expand Down Expand Up @@ -936,14 +946,14 @@ impl MetastoreService for PostgresqlMetastore {
}));
}
info!(
index_id=%index_uid.index_id,
%index_uid,
"Marked {} splits for deletion, among which {} were newly marked.",
split_ids.len() - not_found_split_ids.len(),
num_marked_splits
);
if !not_found_split_ids.is_empty() {
warn!(
index_id=%index_uid.index_id,
%index_uid,
split_ids=?PrettySample::new(&not_found_split_ids, 5),
"{} splits were not found and could not be marked for deletion.",
not_found_split_ids.len()
Expand Down Expand Up @@ -1028,11 +1038,11 @@ impl MetastoreService for PostgresqlMetastore {
};
return Err(MetastoreError::FailedPrecondition { entity, message });
}
info!(index_id=%index_uid.index_id, "Deleted {} splits from index.", num_deleted_splits);
info!(%index_uid, "Deleted {} splits from index.", num_deleted_splits);

if !not_found_split_ids.is_empty() {
warn!(
index_id=%index_uid.index_id,
%index_uid,
split_ids=?PrettySample::new(&not_found_split_ids, 5),
"{} splits were not found and could not be deleted.",
not_found_split_ids.len()
Expand Down
2 changes: 2 additions & 0 deletions quickwit/quickwit-metastore/src/metastore/postgres/model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,8 @@ pub enum Splits {
Tags,
SplitMetadataJson,
IndexUid,
SourceId,
NodeId,
DeleteOpstamp,
}

Expand Down
6 changes: 5 additions & 1 deletion quickwit/quickwit-metastore/src/metastore/postgres/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -97,9 +97,13 @@ pub(super) fn append_query_filters(sql: &mut SelectStatement, query: &ListSplits
// Note: `ListSplitsQuery` builder enforces a non empty `index_uids` list.

sql.cond_where(
Expr::col(Splits::IndexUid).is_in(query.index_uids.iter().map(|val| val.to_string())),
Expr::col(Splits::IndexUid).is_in(&query.index_uids),
);

if let Some(node_id) = &query.node_id {
sql.cond_where(Expr::col(Splits::NodeId).eq(node_id));
};

if !query.split_states.is_empty() {
sql.cond_where(
Expr::col(Splits::SplitState)
Expand Down
11 changes: 9 additions & 2 deletions quickwit/quickwit-proto/src/types/index_uid.rs
Original file line number Diff line number Diff line change
Expand Up @@ -232,9 +232,16 @@ impl PartialEq<(&'static str, u128)> for IndexUid {
}
}

#[cfg(feature = "postgres")]
impl From<IndexUid> for sea_query::Value {
    // An `IndexUid` is bound to SQL queries as its string representation.
    fn from(index_uid: IndexUid) -> Self {
        Self::from(index_uid.to_string())
    }
}

#[cfg(feature = "postgres")]
impl From<&IndexUid> for sea_query::Value {
    // Borrowed variant: lets callers bind an `IndexUid` without giving up
    // ownership; still converts via the string representation.
    fn from(index_uid: &IndexUid) -> Self {
        Self::from(index_uid.to_string())
    }
}
7 changes: 7 additions & 0 deletions quickwit/quickwit-proto/src/types/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -264,6 +264,13 @@ impl ToOwned for NodeIdRef {
}
}

#[cfg(feature = "postgres")]
impl From<&NodeId> for sea_query::Value {
    // A `NodeId` is bound to SQL queries as its string representation.
    fn from(node_id: &NodeId) -> Self {
        Self::from(node_id.to_string())
    }
}

#[cfg(test)]
mod tests {
use super::*;
Expand Down

0 comments on commit 3b66e6e

Please sign in to comment.