diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index 9642d48a198..72ef0c4995d 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -165,7 +165,7 @@ impl Actor for ControlPlane { self.model .load_from_metastore(&mut self.metastore, ctx.progress()) .await - .context("failed to initialize the model")?; + .context("failed to initialize control plane model")?; let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx); @@ -467,7 +467,10 @@ impl DeferableReplyHandler for ControlPlane { reply: impl FnOnce(Self::Reply) + Send + Sync + 'static, ctx: &ActorContext, ) -> Result<(), ActorExitStatus> { - let response = match self.metastore.create_index(request).await { + let response = match ctx + .protect_future(self.metastore.create_index(request)) + .await + { Ok(response) => response, Err(metastore_error) => { reply(convert_metastore_error(metastore_error)?); @@ -515,7 +518,10 @@ impl Handler for ControlPlane { let index_uid: IndexUid = request.index_uid().clone(); debug!(%index_uid, "deleting index"); - if let Err(metastore_error) = self.metastore.delete_index(request).await { + if let Err(metastore_error) = ctx + .protect_future(self.metastore.delete_index(request)) + .await + { return convert_metastore_error(metastore_error); }; info!(%index_uid, "deleted index"); @@ -562,7 +568,7 @@ impl Handler for ControlPlane { let source_id = source_config.source_id.clone(); debug!(%index_uid, source_id, "adding source"); - if let Err(error) = self.metastore.add_source(request).await { + if let Err(error) = ctx.protect_future(self.metastore.add_source(request)).await { return Ok(Err(ControlPlaneError::from(error))); }; self.model @@ -596,7 +602,10 @@ impl Handler for ControlPlane { let enable = request.enable; debug!(%index_uid, source_id, enable, "toggling source"); - if let Err(error) = self.metastore.toggle_source(request).await { + if let Err(error) = ctx + .protect_future(self.metastore.toggle_source(request)) + .await + { return Ok(Err(ControlPlaneError::from(error))); }; info!(%index_uid, source_id, enabled=enable, "toggled source"); @@ -629,7 +638,10 @@ impl Handler for ControlPlane { source_id: source_id.clone(), }; - if let Err(metastore_error) = self.metastore.delete_source(request).await { + if let Err(metastore_error) = ctx + .protect_future(self.metastore.delete_source(request)) + .await + { // TODO If the metastore fails returns an error but somehow succeed deleting the source, // the control plane will restart and the shards will be remaining on the ingesters. //