Skip to content

Commit

Permalink
test compilation
Browse files Browse the repository at this point in the history
  • Loading branch information
fulmicoton committed May 25, 2024
1 parent 5637d77 commit 127414b
Show file tree
Hide file tree
Showing 4 changed files with 580 additions and 398 deletions.
68 changes: 31 additions & 37 deletions quickwit/quickwit-control-plane/src/control_plane.rs
Original file line number Diff line number Diff line change
Expand Up @@ -459,9 +459,13 @@ impl Handler<ControlPlanLoop> for ControlPlane {
if self.disable_control_loop {
return Ok(());
}
self.ingest_controller
if let Err(metastore_error) = self
.ingest_controller
.rebalance_shards(&mut self.model, ctx.mailbox(), ctx.progress())
.await;
.await
{
return convert_metastore_error::<()>(metastore_error).map(|_| ());
}
self.indexing_scheduler.control_running_plan(&self.model);
ctx.schedule_self_msg(CONTROL_PLAN_LOOP_INTERVAL, ControlPlanLoop);
Ok(())
Expand All @@ -482,23 +486,7 @@ fn convert_metastore_error<T>(
) -> Result<ControlPlaneResult<T>, ActorExitStatus> {
// If true, we know that the transactions has not been recorded in the Metastore.
// If false, we simply are not sure whether the transaction has been recorded or not.
let is_transaction_certainly_aborted = match &metastore_error {
MetastoreError::AlreadyExists(_)
| MetastoreError::FailedPrecondition { .. }
| MetastoreError::Forbidden { .. }
| MetastoreError::InvalidArgument { .. }
| MetastoreError::JsonDeserializeError { .. }
| MetastoreError::JsonSerializeError { .. }
| MetastoreError::NotFound(_)
| MetastoreError::TooManyRequests => true,

MetastoreError::Connection { .. }
| MetastoreError::Db { .. }
| MetastoreError::Internal { .. }
| MetastoreError::Io { .. }
| MetastoreError::Timeout { .. }
| MetastoreError::Unavailable(_) => false,
};
let is_transaction_certainly_aborted = metastore_error.is_transaction_certainly_aborted();
if is_transaction_certainly_aborted {
// If the metastore transaction is certain to have been aborted,
// this is actually a good thing.
Expand Down Expand Up @@ -777,29 +765,23 @@ impl Handler<GetOrCreateOpenShardsRequest> for ControlPlane {
request: GetOrCreateOpenShardsRequest,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
// TODO This seems incorrect... Index creation could yield a metastore error... In that
// case, we end up with an inconsistent state.
if let Err(control_plane_error) = self
.auto_create_indexes(&request.subrequests, ctx.progress())
.await
{
return Ok(Err(control_plane_error));
}
let response = match self
match self
.ingest_controller
.get_or_create_open_shards(request, &mut self.model, ctx.progress())
.await
{
Ok(response) => response,
Err(ControlPlaneError::Metastore(metastore_error)) => {
return convert_metastore_error(metastore_error);
Ok(resp) => {
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
Ok(Ok(resp))
}
Err(control_plane_error) => {
return Ok(Err(control_plane_error));
}
};
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
Ok(Ok(response))
Err(metastore_error) => convert_metastore_error(metastore_error),
}
}
}

Expand Down Expand Up @@ -829,9 +811,13 @@ impl Handler<LocalShardsUpdate> for ControlPlane {
local_shards_update: LocalShardsUpdate,
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
self.ingest_controller
if let Err(metastore_error) = self
.ingest_controller
.handle_local_shards_update(local_shards_update, &mut self.model, ctx.progress())
.await;
.await
{
return convert_metastore_error(metastore_error);
}
let _rebuild_plan_waiter = self.rebuild_plan_debounced(ctx);
Ok(Ok(()))
}
Expand Down Expand Up @@ -923,9 +909,13 @@ impl Handler<IndexerJoined> for ControlPlane {
message.0.node_id()
);
// TODO: Update shard table.
self.ingest_controller
if let Err(metastore_error) = self
.ingest_controller
.rebalance_shards(&mut self.model, ctx.mailbox(), ctx.progress())
.await;
.await
{
return convert_metastore_error::<()>(metastore_error).map(|_| ());
}
self.indexing_scheduler.rebuild_plan(&self.model);
Ok(())
}
Expand All @@ -949,9 +939,13 @@ impl Handler<IndexerLeft> for ControlPlane {
message.0.node_id()
);
// TODO: Update shard table.
self.ingest_controller
if let Err(metastore_error) = self
.ingest_controller
.rebalance_shards(&mut self.model, ctx.mailbox(), ctx.progress())
.await;
.await
{
return convert_metastore_error::<()>(metastore_error).map(|_| ());
}
self.indexing_scheduler.rebuild_plan(&self.model);
Ok(())
}
Expand Down
Loading

0 comments on commit 127414b

Please sign in to comment.