Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add ES Resolve index API handler #5096

Closed
wants to merge 2 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions quickwit/quickwit-lambda/src/searcher/api.rs
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,7 @@ fn es_compat_api(
.or(es_compat_stats_handler(metastore.clone()))
.or(es_compat_index_cat_indices_handler(metastore.clone()))
.or(es_compat_cat_indices_handler(metastore.clone()))
.or(es_compat_resolve_index_handler(metastore.clone()))
}

fn index_api(
Expand Down
8 changes: 8 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,14 @@ pub(crate) fn elastic_field_capabilities_filter() -> impl Filter<
.and(json_or_empty())
}

#[utoipa::path(get, tag = "Metadata", path = "/_resolve/index/{index}")]
pub(crate) fn elastic_resolve_index_filter(
) -> impl Filter<Extract = (Vec<String>,), Error = Rejection> + Clone {
warp::path!("_elastic" / "_resolve" / "index" / String)
.and_then(extract_index_id_patterns)
.and(warp::get())
}

#[utoipa::path(get, tag = "Count", path = "/{index}/_count")]
pub(crate) fn elastic_index_count_filter(
) -> impl Filter<Extract = (Vec<String>, SearchQueryParamsCount, SearchBody), Error = Rejection> + Clone
Expand Down
5 changes: 3 additions & 2 deletions quickwit/quickwit-serve/src/elasticsearch_api/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,8 +38,8 @@ pub use rest_handler::{
es_compat_cat_indices_handler, es_compat_cluster_info_handler, es_compat_delete_index_handler,
es_compat_index_cat_indices_handler, es_compat_index_count_handler,
es_compat_index_field_capabilities_handler, es_compat_index_multi_search_handler,
es_compat_index_search_handler, es_compat_index_stats_handler, es_compat_scroll_handler,
es_compat_search_handler, es_compat_stats_handler,
es_compat_index_search_handler, es_compat_index_stats_handler, es_compat_resolve_index_handler,
es_compat_scroll_handler, es_compat_search_handler, es_compat_stats_handler,
};
use serde::{Deserialize, Serialize};
use warp::{Filter, Rejection};
Expand Down Expand Up @@ -79,6 +79,7 @@ pub fn elastic_api_handlers(
.or(es_compat_stats_handler(metastore.clone()))
.or(es_compat_index_cat_indices_handler(metastore.clone()))
.or(es_compat_cat_indices_handler(metastore.clone()))
.or(es_compat_resolve_index_handler(metastore.clone()))
// Register newly created handlers here.
}

Expand Down
20 changes: 20 additions & 0 deletions quickwit/quickwit-serve/src/elasticsearch_api/model/cat_indices.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,26 @@ impl From<SplitMetadata> for ElasticsearchCatIndexResponse {
}
}

#[derive(Debug, Clone, Default, Serialize)]
pub struct ElasticsearchResolveIndexResponse {
pub indices: Vec<ElasticsearchResolveIndexEntryResponse>,
}

#[derive(Debug, Clone, Default, Serialize)]
pub struct ElasticsearchResolveIndexEntryResponse {
pub name: String,
pub attributes: Vec<Status>,
}

impl From<IndexMetadata> for ElasticsearchResolveIndexEntryResponse {
fn from(index_metadata: IndexMetadata) -> Self {
ElasticsearchResolveIndexEntryResponse {
name: index_metadata.index_config.index_id.to_string(),
attributes: vec![Status::Open],
}
}
}

fn serialize_u64_as_string<S>(value: &u64, serializer: S) -> Result<S::Ok, S::Error>
where S: Serializer {
serializer.serialize_str(&value.to_string())
Expand Down
5 changes: 4 additions & 1 deletion quickwit/quickwit-serve/src/elasticsearch_api/model/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,10 @@ mod stats;

pub use bulk_body::BulkAction;
pub use bulk_query_params::ElasticBulkOptions;
pub use cat_indices::{CatIndexQueryParams, ElasticsearchCatIndexResponse};
pub use cat_indices::{
CatIndexQueryParams, ElasticsearchCatIndexResponse, ElasticsearchResolveIndexEntryResponse,
ElasticsearchResolveIndexResponse,
};
pub use error::{ElasticsearchError, ErrorCauseException};
pub use field_capability::{
build_list_field_request_for_es_api, convert_to_es_field_capabilities_response,
Expand Down
45 changes: 36 additions & 9 deletions quickwit/quickwit-serve/src/elasticsearch_api/rest_handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,11 +50,13 @@ use super::filter::{
elastic_field_capabilities_filter, elastic_index_cat_indices_filter,
elastic_index_count_filter, elastic_index_field_capabilities_filter,
elastic_index_search_filter, elastic_index_stats_filter, elastic_multi_search_filter,
elastic_scroll_filter, elastic_stats_filter, elasticsearch_filter,
elastic_resolve_index_filter, elastic_scroll_filter, elastic_stats_filter,
elasticsearch_filter,
};
use super::model::{
build_list_field_request_for_es_api, convert_to_es_field_capabilities_response,
CatIndexQueryParams, DeleteQueryParams, ElasticsearchCatIndexResponse, ElasticsearchError,
ElasticsearchResolveIndexEntryResponse, ElasticsearchResolveIndexResponse,
ElasticsearchStatsResponse, FieldCapabilityQueryParams, FieldCapabilityRequestBody,
FieldCapabilityResponse, MultiSearchHeader, MultiSearchQueryParams, MultiSearchResponse,
MultiSearchSingleResponse, ScrollQueryParams, SearchBody, SearchQueryParams,
Expand Down Expand Up @@ -133,44 +135,54 @@ pub fn es_compat_delete_index_handler(

/// GET _elastic/_stats
pub fn es_compat_stats_handler(
search_service: MetastoreServiceClient,
metastore_service: MetastoreServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_stats_filter()
.and(with_arg(search_service))
.and(with_arg(metastore_service))
.then(es_compat_stats)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// GET _elastic/{index}/_stats
pub fn es_compat_index_stats_handler(
search_service: MetastoreServiceClient,
metastore_service: MetastoreServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_index_stats_filter()
.and(with_arg(search_service))
.and(with_arg(metastore_service))
.then(es_compat_index_stats)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// GET _elastic/_cat/indices
pub fn es_compat_cat_indices_handler(
search_service: MetastoreServiceClient,
metastore_service: MetastoreServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_cat_indices_filter()
.and(with_arg(search_service))
.and(with_arg(metastore_service))
.then(es_compat_cat_indices)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// GET _elastic/_cat/indices/{index}
pub fn es_compat_index_cat_indices_handler(
search_service: MetastoreServiceClient,
metastore_service: MetastoreServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_index_cat_indices_filter()
.and(with_arg(search_service))
.and(with_arg(metastore_service))
.then(es_compat_index_cat_indices)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// GET _elastic/_resolve/index/{index}
pub fn es_compat_resolve_index_handler(
metastore_service: MetastoreServiceClient,
) -> impl Filter<Extract = (impl warp::Reply,), Error = Rejection> + Clone {
elastic_resolve_index_filter()
.and(with_arg(metastore_service))
.then(es_compat_resolve_index)
.map(|result| make_elastic_api_response(result, BodyFormat::default()))
}

/// GET or POST _elastic/{index}/_search
pub fn es_compat_index_search_handler(
search_service: Arc<dyn SearchService>,
Expand Down Expand Up @@ -542,6 +554,21 @@ async fn es_compat_index_cat_indices(
Ok(search_response_rest)
}

async fn es_compat_resolve_index(
index_id_patterns: Vec<String>,
mut metastore: MetastoreServiceClient,
) -> Result<ElasticsearchResolveIndexResponse, ElasticsearchError> {
let indexes_metadata = resolve_index_patterns(&index_id_patterns, &mut metastore).await?;
let mut indices: Vec<ElasticsearchResolveIndexEntryResponse> = indexes_metadata
.into_iter()
.map(|metadata| metadata.into())
.collect();

indices.sort_by(|a, b| a.name.cmp(&b.name));

Ok(ElasticsearchResolveIndexResponse { indices })
}

async fn es_compat_index_field_capabilities(
index_id_patterns: Vec<String>,
search_params: FieldCapabilityQueryParams,
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1245,8 +1245,8 @@ pub mod lambda_search_api {
es_compat_cat_indices_handler, es_compat_index_cat_indices_handler,
es_compat_index_count_handler, es_compat_index_field_capabilities_handler,
es_compat_index_multi_search_handler, es_compat_index_search_handler,
es_compat_index_stats_handler, es_compat_scroll_handler, es_compat_search_handler,
es_compat_stats_handler,
es_compat_index_stats_handler, es_compat_resolve_index_handler, es_compat_scroll_handler,
es_compat_search_handler, es_compat_stats_handler,
};
pub use crate::index_api::get_index_metadata_handler;
pub use crate::rest::recover_fn;
Expand Down
Loading