Skip to content

Commit

Permalink
Protect some futures in the control plane
Browse files Browse the repository at this point in the history
  • Loading branch information
guilload committed Mar 15, 2024
1 parent c7bffd6 commit 6aabd00
Showing 1 changed file with 18 additions and 6 deletions.
24 changes: 18 additions & 6 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,7 @@ impl Actor for ControlPlane {
self.model
.load_from_metastore(&mut self.metastore, ctx.progress())
.await
.context("failed to initialize the model")?;
.context("failed to initialize control plane model")?;

let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);

Expand Down Expand Up @@ -467,7 +467,10 @@ impl DeferableReplyHandler<CreateIndexRequest> for ControlPlane {
reply: impl FnOnce(Self::Reply) + Send + Sync + 'static,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
let response = match self.metastore.create_index(request).await {
let response = match ctx
.protect_future(self.metastore.create_index(request))
.await
{
Ok(response) => response,
Err(metastore_error) => {
reply(convert_metastore_error(metastore_error)?);
Expand Down Expand Up @@ -515,7 +518,10 @@ impl Handler<DeleteIndexRequest> for ControlPlane {
let index_uid: IndexUid = request.index_uid().clone();
debug!(%index_uid, "deleting index");

if let Err(metastore_error) = self.metastore.delete_index(request).await {
if let Err(metastore_error) = ctx
.protect_future(self.metastore.delete_index(request))
.await
{
return convert_metastore_error(metastore_error);
};
info!(%index_uid, "deleted index");
Expand Down Expand Up @@ -562,7 +568,7 @@ impl Handler<AddSourceRequest> for ControlPlane {
let source_id = source_config.source_id.clone();
debug!(%index_uid, source_id, "adding source");

if let Err(error) = self.metastore.add_source(request).await {
if let Err(error) = ctx.protect_future(self.metastore.add_source(request)).await {
return Ok(Err(ControlPlaneError::from(error)));
};
self.model
Expand Down Expand Up @@ -596,7 +602,10 @@ impl Handler<ToggleSourceRequest> for ControlPlane {
let enable = request.enable;
debug!(%index_uid, source_id, enable, "toggling source");

if let Err(error) = self.metastore.toggle_source(request).await {
if let Err(error) = ctx
.protect_future(self.metastore.toggle_source(request))
.await
{
return Ok(Err(ControlPlaneError::from(error)));
};
info!(%index_uid, source_id, enabled=enable, "toggled source");
Expand Down Expand Up @@ -629,7 +638,10 @@ impl Handler<DeleteSourceRequest> for ControlPlane {
source_id: source_id.clone(),
};

if let Err(metastore_error) = self.metastore.delete_source(request).await {
if let Err(metastore_error) = ctx
.protect_future(self.metastore.delete_source(request))
.await
{
// TODO If the metastore fails returns an error but somehow succeed deleting the source,
// the control plane will restart and the shards will be remaining on the ingesters.
//
Expand Down

0 comments on commit 6aabd00

Please sign in to comment.