Skip to content

Commit

Permalink
Allow index ptns to not match any indexes.
Browse files Browse the repository at this point in the history
Closes #4042
  • Loading branch information
fulmicoton committed Oct 29, 2023
1 parent 1a58f9f commit 192fa48
Show file tree
Hide file tree
Showing 16 changed files with 102 additions and 81 deletions.
3 changes: 1 addition & 2 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -390,8 +390,7 @@ mod tests {
use quickwit_actors::{AskError, Observe, SupervisorMetrics};
use quickwit_config::{IndexConfig, SourceParams, INGEST_SOURCE_ID};
use quickwit_metastore::{
CreateIndexRequestExt, IndexMetadata,
ListIndexesMetadataResponseExt,
CreateIndexRequestExt, IndexMetadata, ListIndexesMetadataResponseExt,
};
use quickwit_proto::control_plane::GetOrCreateOpenShardsSubrequest;
use quickwit_proto::ingest::{Shard, ShardState};
Expand Down
8 changes: 3 additions & 5 deletions quickwit/quickwit-control-plane/src/control_plane_model.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,13 @@ use fnv::{FnvHashMap, FnvHashSet};
use itertools::Itertools;
use quickwit_common::Progress;
use quickwit_config::{SourceConfig, INGEST_SOURCE_ID};
use quickwit_metastore::{
IndexMetadata, ListIndexesMetadataResponseExt, list_all_indexes_request,
};
use quickwit_metastore::{IndexMetadata, ListIndexesMetadataResponseExt};
use quickwit_proto::control_plane::ControlPlaneResult;
use quickwit_proto::ingest::{Shard, ShardState};
use quickwit_proto::metastore;
use quickwit_proto::metastore::{
EntityKind, ListIndexesMetadataRequest, ListShardsSubrequest, MetastoreError, MetastoreService,
MetastoreServiceClient
MetastoreServiceClient,
};
use quickwit_proto::types::{IndexId, IndexUid, NodeId, NodeIdRef, ShardId, SourceId};
use serde::Serialize;
Expand Down Expand Up @@ -120,7 +118,7 @@ impl ControlPlaneModel {
let now = Instant::now();
self.clear();
let index_metadatas = progress
.protect_future(metastore.list_indexes_metadata(list_all_indexes_request()))
.protect_future(metastore.list_indexes_metadata(ListIndexesMetadataRequest::all()))
.await?
.deserialize_indexes_metadata()?;
let num_indexes = index_metadatas.len();
Expand Down
7 changes: 2 additions & 5 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,7 @@ use quickwit_config::{
use quickwit_ingest::{
DropQueueRequest, IngestApiService, IngesterPool, ListQueuesRequest, QUEUES_DIR_NAME,
};
use quickwit_metastore::{
IndexMetadata, IndexMetadataResponseExt,
ListIndexesMetadataResponseExt,
};
use quickwit_metastore::{IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt};
use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, IndexingError, IndexingPipelineId,
IndexingTask, PipelineMetrics,
Expand Down Expand Up @@ -675,7 +672,7 @@ impl IndexingService {
let indexes_metadatas = self
.metastore
.clone()
.list_indexes_metadata(quickwit_metastore::list_all_indexes_request())
.list_indexes_metadata(ListIndexesMetadataRequest::all())
.await?
.deserialize_indexes_metadata()?;
let index_ids: HashSet<String> = indexes_metadatas
Expand Down
6 changes: 2 additions & 4 deletions quickwit/quickwit-janitor/src/actors/delete_task_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,7 @@ use quickwit_actors::{Actor, ActorContext, ActorExitStatus, ActorHandle, Handler
use quickwit_common::pubsub::EventBroker;
use quickwit_common::temp_dir::{self};
use quickwit_config::IndexConfig;
use quickwit_metastore::{
IndexMetadataResponseExt, ListIndexesMetadataResponseExt,
};
use quickwit_metastore::{IndexMetadataResponseExt, ListIndexesMetadataResponseExt};
use quickwit_proto::metastore::{
IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient,
};
Expand Down Expand Up @@ -116,7 +114,7 @@ impl DeleteTaskService {
) -> anyhow::Result<()> {
let mut index_config_by_index_id: HashMap<IndexUid, IndexConfig> = self
.metastore
.list_indexes_metadata(quickwit_metastore::list_all_indexes_request())
.list_indexes_metadata(ListIndexesMetadataRequest::all())
.await?
.deserialize_indexes_metadata()?
.into_iter()
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-janitor/src/actors/garbage_collector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,7 +89,7 @@ impl GarbageCollector {

let indexes = match self
.metastore
.list_indexes_metadata(quickwit_metastore::list_all_indexes_request())
.list_indexes_metadata(ListIndexesMetadataRequest::all())
.await
.and_then(|list_indexes_metadata_response| {
list_indexes_metadata_response.deserialize_indexes_metadata()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ use async_trait::async_trait;
use itertools::Itertools;
use quickwit_actors::{Actor, ActorContext, Handler};
use quickwit_config::IndexConfig;
use quickwit_metastore::{list_all_indexes_request, ListIndexesMetadataResponseExt};
use quickwit_metastore::ListIndexesMetadataResponseExt;
use quickwit_proto::metastore::{
ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient,
};
Expand Down Expand Up @@ -85,7 +85,7 @@ impl RetentionPolicyExecutor {

let index_metadatas = match self
.metastore
.list_indexes_metadata(list_all_indexes_request())
.list_indexes_metadata(ListIndexesMetadataRequest::all())
.await
.and_then(|response| response.deserialize_indexes_metadata())
{
Expand Down
6 changes: 3 additions & 3 deletions quickwit/quickwit-metastore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,9 +49,9 @@ pub(crate) use metastore::index_metadata::serialize::{IndexMetadataV0_6, Version
pub use metastore::postgresql_metastore::PostgresqlMetastore;
pub use metastore::{
file_backed_metastore, AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata,
IndexMetadataResponseExt, ListIndexesMetadataResponseExt,
ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt,
MetastoreServiceExt, PublishSplitsRequestExt, StageSplitsRequestExt, list_all_indexes_request
IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery,
ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt, PublishSplitsRequestExt,
StageSplitsRequestExt,
};
pub use metastore_factory::{MetastoreFactory, UnsupportedMetastore};
pub use metastore_resolver::MetastoreResolver;
Expand Down
12 changes: 3 additions & 9 deletions quickwit/quickwit-metastore/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -37,9 +37,9 @@ use quickwit_config::{IndexConfig, SourceConfig};
use quickwit_doc_mapper::tag_pruning::TagFilterAst;
use quickwit_proto::metastore::{
serde_utils, AddSourceRequest, CreateIndexRequest, DeleteTask, IndexMetadataRequest,
IndexMetadataResponse, ListIndexesMetadataRequest, ListIndexesMetadataResponse,
ListSplitsRequest, ListSplitsResponse, MetastoreError, MetastoreResult, MetastoreService,
MetastoreServiceClient, PublishSplitsRequest, StageSplitsRequest,
IndexMetadataResponse, ListIndexesMetadataResponse, ListSplitsRequest, ListSplitsResponse,
MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient,
PublishSplitsRequest, StageSplitsRequest,
};
use quickwit_proto::types::{IndexUid, SplitId};
use time::OffsetDateTime;
Expand Down Expand Up @@ -74,12 +74,6 @@ pub trait MetastoreServiceExt: MetastoreService {

impl MetastoreServiceExt for MetastoreServiceClient {}

/// Builds a [`ListIndexesMetadataRequest`] whose single index pattern is the `*` wildcard.
pub fn list_all_indexes_request() -> ListIndexesMetadataRequest {
    let index_ptns = vec![String::from("*")];
    ListIndexesMetadataRequest { index_ptns }
}

/// Helper trait to build a [`CreateIndexRequest`] and deserialize its payload.
pub trait CreateIndexRequestExt {
/// Creates a new [`CreateIndexRequest`] from an [`IndexConfig`].
Expand Down
24 changes: 11 additions & 13 deletions quickwit/quickwit-metastore/src/metastore/postgresql_metastore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -60,10 +60,9 @@ use crate::metastore::postgresql_model::{PgDeleteTask, PgIndex, PgSplit, Splits,
use crate::metastore::{instrument_metastore, FilterRange, PublishSplitsRequestExt};
use crate::{
AddSourceRequestExt, CreateIndexRequestExt, IndexMetadata, IndexMetadataResponseExt,
ListIndexesMetadataRequestExt, ListIndexesMetadataResponseExt, ListIndexesQuery,
ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreFactory,
MetastoreResolverError, MetastoreServiceExt, Split, SplitMaturity, SplitMetadata, SplitState,
StageSplitsRequestExt,
ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt,
MetastoreFactory, MetastoreResolverError, MetastoreServiceExt, Split, SplitMaturity,
SplitMetadata, SplitState, StageSplitsRequestExt,
};

static MIGRATOR: Migrator = sqlx::migrate!("migrations/postgresql");
Expand Down Expand Up @@ -823,10 +822,9 @@ impl MetastoreService for PostgresqlMetastore {
.iter()
.map(|index_uid| index_uid.index_id().to_string())
.collect();
let list_indexes_metadata_request =
ListIndexesMetadataRequest::try_from_list_indexes_query(
ListIndexesQuery::IndexIdPatterns(index_ids_str.clone()),
)?;
let list_indexes_metadata_request = ListIndexesMetadataRequest {
index_ptns: index_ids_str.clone(),
};
let found_index_ids: HashSet<String> = self
.list_indexes_metadata(list_indexes_metadata_request)
.await?
Expand Down Expand Up @@ -1386,7 +1384,7 @@ fn build_index_id_patterns_sql_query(index_id_patterns: &[String]) -> anyhow::Re
if index_id_patterns.is_empty() {
anyhow::bail!("The list of index id patterns may not be empty.");
}
if &index_id_patterns == &["*"] {
if index_id_patterns == ["*"] {
return Ok("SELECT * FROM indexes".to_string());
}
// TODO Fix me.
Expand Down Expand Up @@ -1854,11 +1852,11 @@ mod tests {
#[test]
fn test_index_id_pattern_like_query() {
assert_eq!(
&build_index_id_patterns_sql_query(vec!["*-index-*-last*".to_string()]).unwrap(),
&build_index_id_patterns_sql_query(&["*-index-*-last*".to_string()]).unwrap(),
"SELECT * FROM indexes WHERE index_id LIKE '%-index-%-last%'"
);
assert_eq!(
&build_index_id_patterns_sql_query(vec![
&build_index_id_patterns_sql_query(&[
"*-index-*-last*".to_string(),
"another-index".to_string()
])
Expand All @@ -1867,7 +1865,7 @@ mod tests {
'another-index'"
);
assert_eq!(
&build_index_id_patterns_sql_query(vec![
&build_index_id_patterns_sql_query(&[
"*-index-*-last**".to_string(),
"another-index".to_string(),
"*".to_string()
Expand All @@ -1876,7 +1874,7 @@ mod tests {
"SELECT * FROM indexes"
);
assert_eq!(
build_index_id_patterns_sql_query(vec!["*-index-*-&-last**".to_string()])
build_index_id_patterns_sql_query(&["*-index-*-&-last**".to_string()])
.unwrap_err()
.to_string(),
"internal error: failed to build list indexes query; cause: `index ID pattern \
Expand Down
22 changes: 7 additions & 15 deletions quickwit/quickwit-metastore/src/tests/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,9 +48,8 @@ use crate::checkpoint::{
};
use crate::{
AddSourceRequestExt, CreateIndexRequestExt, IndexMetadataResponseExt,
ListIndexesMetadataRequestExt, ListIndexesMetadataResponseExt, ListSplitsQuery,
ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt, Split, SplitMaturity,
SplitMetadata, SplitState, StageSplitsRequestExt,
ListIndexesMetadataResponseExt, ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt,
MetastoreServiceExt, Split, SplitMaturity, SplitMetadata, SplitState, StageSplitsRequestExt,
};

#[async_trait]
Expand Down Expand Up @@ -317,15 +316,12 @@ pub async fn test_metastore_list_indexes<MetastoreToTest: MetastoreService + Def
let index_uri_4 = format!("ram:///indexes/{index_id_4}");
let index_config_4 = IndexConfig::for_test(&index_id_4, &index_uri_4);

let list_index_metadata_query = crate::ListIndexesQuery::IndexIdPatterns(vec![
let index_ptns = vec![
format!("prefix-*-{index_id_fragment}-suffix-*"),
format!("prefix*{index_id_fragment}*suffix-*"),
]);
];
let indexes_count = metastore
.list_indexes_metadata(
ListIndexesMetadataRequest::try_from_list_indexes_query(list_index_metadata_query)
.unwrap(),
)
.list_indexes_metadata(ListIndexesMetadataRequest { index_ptns })
.await
.unwrap()
.deserialize_indexes_metadata()
Expand Down Expand Up @@ -358,13 +354,9 @@ pub async fn test_metastore_list_indexes<MetastoreToTest: MetastoreService + Def
.index_uid
.into();

let list_indexes_query = crate::ListIndexesQuery::IndexIdPatterns(vec![format!(
"prefix-*-{index_id_fragment}-suffix-*"
)]);
let index_ptns = vec![format!("prefix-*-{index_id_fragment}-suffix-*")];
let indexes_count = metastore
.list_indexes_metadata(
ListIndexesMetadataRequest::try_from_list_indexes_query(list_indexes_query).unwrap(),
)
.list_indexes_metadata(ListIndexesMetadataRequest { index_ptns })
.await
.unwrap()
.deserialize_indexes_metadata()
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-proto/src/metastore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -298,3 +298,11 @@ pub mod serde_utils {
})
}
}

impl ListIndexesMetadataRequest {
pub fn all() -> ListIndexesMetadataRequest {
ListIndexesMetadataRequest {
index_ptns: vec!["*".to_string()],
}
}
}
10 changes: 4 additions & 6 deletions quickwit/quickwit-search/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,8 @@ use tokio::task::JoinError;
#[allow(missing_docs)]
#[derive(Error, Debug, Serialize, Deserialize, Clone)]
pub enum SearchError {
#[error("could not find indexes matching the IDs or patterns `{index_id_patterns:?}`")]
IndexesNotFound { index_id_patterns: Vec<String> },
#[error("could not find indexes matching the IDs `{index_ids:?}`")]
IndexesNotFound { index_ids: Vec<String> },
#[error("internal error: `{0}`")]
Internal(String),
#[error("invalid aggregation request: {0}")]
Expand Down Expand Up @@ -104,13 +104,11 @@ impl From<MetastoreError> for SearchError {
match metastore_error {
MetastoreError::NotFound(EntityKind::Index { index_id }) => {
SearchError::IndexesNotFound {
index_id_patterns: vec![index_id],
index_ids: vec![index_id],
}
}
MetastoreError::NotFound(EntityKind::Indexes { index_ids }) => {
SearchError::IndexesNotFound {
index_id_patterns: index_ids,
}
SearchError::IndexesNotFound { index_ids }
}
_ => SearchError::Internal(metastore_error.to_string()),
}
Expand Down
57 changes: 49 additions & 8 deletions quickwit/quickwit-search/src/root.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,8 @@ use quickwit_config::{build_doc_mapper, IndexConfig};
use quickwit_doc_mapper::tag_pruning::extract_tags_from_query;
use quickwit_doc_mapper::{DocMapper, DYNAMIC_FIELD_NAME};
use quickwit_metastore::{
IndexMetadata, IndexMetadataResponseExt,
ListIndexesMetadataResponseExt, ListSplitsRequestExt, ListSplitsResponseExt,
SplitMetadata,
IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsRequestExt,
ListSplitsResponseExt, SplitMetadata,
};
use quickwit_proto::metastore::{
IndexMetadataRequest, ListIndexesMetadataRequest, ListSplitsRequest, MetastoreService,
Expand Down Expand Up @@ -706,6 +705,46 @@ fn finalize_aggregation_if_any(
Ok(Some(aggregation_result_json))
}

/// Checks that every explicitly requested index was found.
///
/// An index pattern (i.e. one containing a `*` wildcard) that matches nothing is not an
/// error. A specific index ID, however, must be present in `index_metadatas`.
///
/// We put this check here and not in the metastore to make sure the logic is independent
/// of the metastore implementation, and because different use cases could require
/// different behaviors. This specification was principally motivated by #4042.
///
/// # Errors
///
/// Returns [`SearchError::IndexesNotFound`] listing, in sorted order, the index IDs
/// that have no corresponding entry in `index_metadatas`.
fn check_all_index_metadata_found(
    index_metadatas: &[IndexMetadata],
    index_ptns: &[String],
) -> crate::Result<()> {
    // Only wildcard-free entries are exact index IDs that must be matched.
    let mut index_ids: HashSet<&str> = index_ptns
        .iter()
        .map(String::as_str)
        .filter(|index_ptn| !index_ptn.contains('*'))
        .collect();

    if index_ids.is_empty() {
        // All of the patterns are wildcard patterns.
        return Ok(());
    }

    // Whatever remains after this loop was requested by exact ID but not returned.
    for index_metadata in index_metadatas {
        index_ids.remove(index_metadata.index_uid.index_id());
    }

    if index_ids.is_empty() {
        return Ok(());
    }

    // `HashSet` iteration order is arbitrary: sort so that the error is deterministic.
    let mut missing_index_ids: Vec<String> = index_ids
        .into_iter()
        .map(|missing_index_id| missing_index_id.to_string())
        .collect();
    missing_index_ids.sort_unstable();

    Err(SearchError::IndexesNotFound {
        index_ids: missing_index_ids,
    })
}

/// Performs a distributed search.
/// 1. Sends leaf request over gRPC to multiple leaf nodes.
/// 2. Merges the search results.
Expand All @@ -721,17 +760,19 @@ pub async fn root_search(
info!(searcher_context = ?searcher_context, search_request = ?search_request);
let start_instant = tokio::time::Instant::now();
let list_indexes_metadatas_request = ListIndexesMetadataRequest {
index_ptns: search_request.index_id_patterns.clone()
index_ptns: search_request.index_id_patterns.clone(),
};
let indexes_metadata = metastore
let indexes_metadata: Vec<IndexMetadata> = metastore
.list_indexes_metadata(list_indexes_metadatas_request)
.await?
.deserialize_indexes_metadata()?;

check_all_index_metadata_found(&indexes_metadata[..], &search_request.index_id_patterns[..])?;

if indexes_metadata.is_empty() {
return Err(SearchError::IndexesNotFound {
index_id_patterns: search_request.index_id_patterns,
});
return Ok(SearchResponse::default());
}

let index_uids = indexes_metadata
.iter()
.map(|index_metadata| index_metadata.index_uid.clone())
Expand Down
Loading

0 comments on commit 192fa48

Please sign in to comment.