Skip to content

Commit

Permalink
WIP
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Apr 27, 2024
1 parent 32edb28 commit 2e3bdb4
Show file tree
Hide file tree
Showing 17 changed files with 965 additions and 151 deletions.
36 changes: 32 additions & 4 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use quickwit_proto::metastore::{
serde_utils, AddSourceRequest, CreateIndexRequest, CreateIndexResponse, DeleteIndexRequest,
DeleteShardsRequest, DeleteSourceRequest, EmptyResponse, FindIndexTemplateMatchesRequest,
IndexTemplateMatch, MetastoreError, MetastoreResult, MetastoreService, MetastoreServiceClient,
ToggleSourceRequest,
ToggleSourceRequest, UpdateIndexRequest,
};
use quickwit_proto::types::{IndexUid, NodeId, ShardId, SourceUid};
use serde::Serialize;
Expand Down Expand Up @@ -570,11 +570,39 @@ impl DeferableReplyHandler<CreateIndexRequest> for ControlPlane {
} else {
reply(Ok(response));
}

Ok(())
}
}

// This handler is a metastore call proxied through the control plane: we must first forward the
// request to the metastore, and then act on the event.
#[async_trait]
impl Handler<UpdateIndexRequest> for ControlPlane {
type Reply = ControlPlaneResult<IndexMetadataResponse>;

async fn handle(
&mut self,
request: UpdateIndexRequest,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let index_uid: IndexUid = request.index_uid().clone();
debug!(%index_uid, "updating index");

let response = match ctx
.protect_future(self.metastore.update_index(request))
.await
{
Ok(response) => response,
Err(metastore_error) => {
return convert_metastore_error(metastore_error);
}
};
// TODO: Handle doc mapping and/or indexing settings update here.
info!(%index_uid, "updated index");
Ok(Ok(response))
}
}

// This handler is a metastore call proxied through the control plane: we must first forward the
// request to the metastore, and then act on the event.
#[async_trait]
Expand Down Expand Up @@ -681,9 +709,9 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
};
info!(%index_uid, source_id, enabled=enable, "toggled source");

let mutation_occured = self.model.toggle_source(&index_uid, &source_id, enable)?;
let mutation_occurred = self.model.toggle_source(&index_uid, &source_id, enable)?;

if mutation_occured {
if mutation_occurred {
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
}
Ok(Ok(EmptyResponse {}))
Expand Down
77 changes: 51 additions & 26 deletions quickwit/quickwit-indexing/src/actors/indexing_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,13 +43,17 @@ use quickwit_ingest::{
DropQueueRequest, GetPartitionId, IngestApiService, IngesterPool, ListQueuesRequest,
QUEUES_DIR_NAME,
};
use quickwit_metastore::{IndexMetadata, IndexMetadataResponseExt, ListIndexesMetadataResponseExt};
use quickwit_metastore::{
IndexMetadata, IndexMetadataResponseExt, IndexesMetadataResponseExt,
ListIndexesMetadataResponseExt,
};
use quickwit_proto::indexing::{
ApplyIndexingPlanRequest, ApplyIndexingPlanResponse, IndexingError, IndexingPipelineId,
IndexingTask, PipelineMetrics,
};
use quickwit_proto::metastore::{
IndexMetadataRequest, ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient,
IndexMetadataRequest, IndexMetadataSubrequest, IndexesMetadataRequest,
ListIndexesMetadataRequest, MetastoreService, MetastoreServiceClient,
};
use quickwit_proto::types::{IndexId, IndexUid, PipelineUid};
use quickwit_storage::StorageResolver;
Expand Down Expand Up @@ -366,20 +370,46 @@ impl IndexingService {
}

async fn index_metadata(
&self,
&mut self,
ctx: &ActorContext<Self>,
index_id: &str,
) -> Result<IndexMetadata, IndexingError> {
let _protect_guard = ctx.protect_zone();
let index_metadata_response = self
.metastore
.clone()
.index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string()))
let index_metadata_response = ctx
.protect_future(
self.metastore
.index_metadata(IndexMetadataRequest::for_index_id(index_id.to_string())),
)
.await?;
let index_metadata = index_metadata_response.deserialize_index_metadata()?;
Ok(index_metadata)
}

async fn indexes_metadata(
&mut self,
ctx: &ActorContext<Self>,
indexing_pipeline_ids: &[IndexingPipelineId],
) -> Result<Vec<IndexMetadata>, IndexingError> {
let index_metadata_subrequests: Vec<IndexMetadataSubrequest> = indexing_pipeline_ids
.iter()
// Remove duplicate subrequests
.unique_by(|pipeline_id| &pipeline_id.index_uid)
.map(|pipeline_id| IndexMetadataSubrequest {
index_id: None,
index_uid: Some(pipeline_id.index_uid.clone()),
})
.collect();
let indexes_metadata_request = IndexesMetadataRequest {
subrequests: index_metadata_subrequests,
};
let indexes_metadata_response = ctx
.protect_future(self.metastore.indexes_metadata(indexes_metadata_request))
.await?;
let indexes_metadata = indexes_metadata_response
.deserialize_indexes_metadata()
.await?;
Ok(indexes_metadata)
}

async fn handle_supervise(&mut self) -> Result<(), ActorExitStatus> {
self.indexing_pipelines
.retain(|pipeline_uid, pipeline_handle| {
Expand All @@ -399,7 +429,7 @@ impl IndexingService {
// and are themselves in charge of supervising the pipeline actors.
error!(
pipeline_uid=%pipeline_uid,
"Indexing pipeline exited with failure. This should never happen."
"indexing pipeline exited with failure: this should never happen, please report"
);
self.counters.num_failed_pipelines += 1;
self.counters.num_running_pipelines -= 1;
Expand Down Expand Up @@ -489,9 +519,9 @@ impl IndexingService {
let pipeline_uids_to_remove: Vec<PipelineUid> = self
.indexing_pipelines
.keys()
.cloned()
.filter(|pipeline_uid| !pipeline_uids_in_plan.contains(pipeline_uid))
.collect::<Vec<_>>();
.cloned()
.collect();

// Shut down currently running pipelines that are missing in the new plan.
self.shutdown_pipelines(&pipeline_uids_to_remove).await;
Expand All @@ -508,15 +538,15 @@ impl IndexingService {
let pipeline_uid = indexing_task.pipeline_uid();
!self.indexing_pipelines.contains_key(&pipeline_uid)
})
.flat_map(|indexing_task| {
.map(|indexing_task| {
let pipeline_uid = indexing_task.pipeline_uid();
let index_uid = indexing_task.index_uid().clone();
Some(IndexingPipelineId {
IndexingPipelineId {
node_id: self.node_id.clone(),
index_uid,
source_id: indexing_task.source_id.clone(),
pipeline_uid,
})
}
})
.collect();
self.spawn_pipelines(&pipeline_ids_to_add, ctx).await
Expand Down Expand Up @@ -580,13 +610,9 @@ impl IndexingService {
ctx: &ActorContext<Self>,
) -> Result<Vec<IndexingPipelineId>, IndexingError> {
// We fetch the new indexes metadata.
let indexes_metadata_futures = added_pipeline_ids
.iter()
// No need to emit two request for the same `index_uid`
.unique_by(|pipeline_id| pipeline_id.index_uid.clone())
.map(|pipeline_id| self.index_metadata(ctx, &pipeline_id.index_uid.index_id));
let indexes_metadata = try_join_all(indexes_metadata_futures).await?;
let indexes_metadata_by_index_id: HashMap<IndexUid, IndexMetadata> = indexes_metadata
let indexes_metadata = self.indexes_metadata(ctx, added_pipeline_ids).await?;

let per_index_uid_indexes_metadata: HashMap<IndexUid, IndexMetadata> = indexes_metadata
.into_iter()
.map(|index_metadata| (index_metadata.index_uid.clone(), index_metadata))
.collect();
Expand All @@ -596,7 +622,7 @@ impl IndexingService {
// Add new pipelines.
for new_pipeline_id in added_pipeline_ids {
if let Some(index_metadata) =
indexes_metadata_by_index_id.get(&new_pipeline_id.index_uid)
per_index_uid_indexes_metadata.get(&new_pipeline_id.index_uid)
{
if let Some(source_config) = index_metadata.sources.get(&new_pipeline_id.source_id)
{
Expand All @@ -618,13 +644,12 @@ impl IndexingService {
}
} else {
error!(
"Failed to spawn pipeline: index {} no longer exists.",
&new_pipeline_id.index_uid.to_string()
"failed to spawn pipeline: index `{}` no longer exists",
new_pipeline_id.index_uid
);
failed_spawning_pipeline_ids.push(new_pipeline_id.clone());
}
}

Ok(failed_spawning_pipeline_ids)
}

Expand All @@ -640,7 +665,7 @@ impl IndexingService {
for &pipeline_uid_to_remove in pipeline_uids {
match self.detach_pipeline(pipeline_uid_to_remove).await {
Ok(pipeline_handle) => {
// Killing the pipeline ensure that all pipeline actors will stop.
// Killing the pipeline ensures that all the pipeline actors will stop.
pipeline_handle.kill().await;
}
Err(error) => {
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-metastore/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,8 @@ pub(crate) use metastore::index_metadata::serialize::{IndexMetadataV0_8, Version
pub use metastore::postgres::PostgresqlMetastore;
pub use metastore::{
file_backed, AddSourceRequestExt, CreateIndexRequestExt, CreateIndexResponseExt, IndexMetadata,
IndexMetadataResponseExt, ListIndexesMetadataResponseExt, ListSplitsQuery,
ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt,
IndexMetadataResponseExt, IndexesMetadataResponseExt, ListIndexesMetadataResponseExt,
ListSplitsQuery, ListSplitsRequestExt, ListSplitsResponseExt, MetastoreServiceExt,
MetastoreServiceStreamSplitsExt, PublishSplitsRequestExt, StageSplitsRequestExt,
UpdateIndexRequestExt,
};
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,14 @@ use quickwit_proto::metastore::{
DeleteIndexTemplatesRequest, DeleteQuery, DeleteShardsRequest, DeleteSourceRequest,
DeleteSplitsRequest, DeleteTask, EmptyResponse, FindIndexTemplateMatchesRequest,
FindIndexTemplateMatchesResponse, GetIndexTemplateRequest, GetIndexTemplateResponse,
IndexMetadataRequest, IndexMetadataResponse, LastDeleteOpstampRequest,
LastDeleteOpstampResponse, ListDeleteTasksRequest, ListDeleteTasksResponse,
ListIndexTemplatesRequest, ListIndexTemplatesResponse, ListIndexesMetadataRequest,
ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse, ListSplitsRequest,
ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest, MetastoreResult,
MetastoreService, MetastoreServiceClient, MetastoreServiceStream, OpenShardsRequest,
OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest, StageSplitsRequest,
ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
IndexMetadataRequest, IndexMetadataResponse, IndexesMetadataRequest, IndexesMetadataResponse,
LastDeleteOpstampRequest, LastDeleteOpstampResponse, ListDeleteTasksRequest,
ListDeleteTasksResponse, ListIndexTemplatesRequest, ListIndexTemplatesResponse,
ListIndexesMetadataRequest, ListIndexesMetadataResponse, ListShardsRequest, ListShardsResponse,
ListSplitsRequest, ListSplitsResponse, ListStaleSplitsRequest, MarkSplitsForDeletionRequest,
MetastoreResult, MetastoreService, MetastoreServiceClient, MetastoreServiceStream,
OpenShardsRequest, OpenShardsResponse, PublishSplitsRequest, ResetSourceCheckpointRequest,
StageSplitsRequest, ToggleSourceRequest, UpdateIndexRequest, UpdateSplitsDeleteOpstampRequest,
UpdateSplitsDeleteOpstampResponse,
};

Expand Down Expand Up @@ -86,6 +86,19 @@ impl MetastoreService for ControlPlaneMetastore {
Ok(response)
}

// Technically, proxying this call via the control plane is not necessary at the moment because
// it does not modify attributes the control plane cares about (retention policy, search
// settings). However, it would be easy to forget to do so when we add the ability to update the
// doc mapping or merge policy of an index, so we've already set up the proxy here since calling
// `update_index` is very infrequent anyway.
async fn update_index(
&mut self,
request: UpdateIndexRequest,
) -> MetastoreResult<IndexMetadataResponse> {
let response = self.control_plane.update_index(request).await?;
Ok(response)
}

async fn delete_index(
&mut self,
request: DeleteIndexRequest,
Expand Down Expand Up @@ -117,21 +130,20 @@ impl MetastoreService for ControlPlaneMetastore {

// Other metastore API calls.

async fn update_index(
&mut self,
request: UpdateIndexRequest,
) -> MetastoreResult<IndexMetadataResponse> {
let response = self.metastore.update_index(request).await?;
Ok(response)
}

async fn index_metadata(
&mut self,
request: IndexMetadataRequest,
) -> MetastoreResult<IndexMetadataResponse> {
self.metastore.index_metadata(request).await
}

async fn indexes_metadata(
&mut self,
request: IndexesMetadataRequest,
) -> MetastoreResult<IndexesMetadataResponse> {
self.metastore.indexes_metadata(request).await
}

async fn list_indexes_metadata(
&mut self,
request: ListIndexesMetadataRequest,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -213,18 +213,14 @@ impl FileBackedIndex {
&self.metadata
}

/// Replaces the search settings in the index config, returning whether a mutation occurred.
pub fn set_search_settings(&mut self, search_settings: SearchSettings) -> bool {
let is_mutation = self.metadata.index_config.search_settings != search_settings;
self.metadata.index_config.search_settings = search_settings;
is_mutation
}

/// Replaces the retention policy in the index config, returning whether a mutation occurred.
pub fn set_retention_policy(&mut self, retention_policy_opt: Option<RetentionPolicy>) -> bool {
let is_mutation = self.metadata.index_config.retention_policy_opt != retention_policy_opt;
self.metadata.index_config.retention_policy_opt = retention_policy_opt;
is_mutation
self.metadata.set_retention_policy(retention_policy_opt)
}

/// Replaces the search settings in the index config, returning whether a mutation occurred.
pub fn set_search_settings(&mut self, search_settings: SearchSettings) -> bool {
self.metadata.set_search_settings(search_settings)
}

/// Stages a single split.
Expand Down
Loading

0 comments on commit 2e3bdb4

Please sign in to comment.