Skip to content

Commit

Permalink
Remove race condition in actor framework
Browse files Browse the repository at this point in the history
The bug could surfaced on any actor looping alone,
like the control plane.

Closes #4248
  • Loading branch information
fulmicoton committed Dec 9, 2023
1 parent 735e384 commit 2ff4e8d
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 5 deletions.
14 changes: 10 additions & 4 deletions quickwit/quickwit-actors/src/spawn_builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -285,17 +285,23 @@ impl<A: Actor> ActorExecutionEnv<A> {
self.process_one_message(envelope).await?;
}
self.ctx.yield_now().await;

if self.inbox.is_empty() {
break;
}
}
self.actor.get_mut().on_drained_messages(&self.ctx).await?;
self.ctx.idle();
if self.ctx.mailbox().is_last_mailbox() {
// No one will be able to send us more messages.
// We can exit the actor.
return Err(ActorExitStatus::Success);
// We double check here that the mailbox does not contain any messages,
// as someone on different runtime thread could have added a last message
// and dropped the last mailbox right before this block.
// See #4248
if self.inbox.is_empty() {
// No one will be able to send us more messages.
// We can exit the actor.
info!(actor = self.ctx.actor_instance_id(), "no more messages");
return Err(ActorExitStatus::Success);
}
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,7 +117,7 @@ impl Ingester {
rate_limiter_settings: RateLimiterSettings,
replication_factor: usize,
) -> IngestV2Result<Self> {
let self_node_id: NodeId = cluster.self_node_id().clone().into();
let self_node_id: NodeId = cluster.self_node_id().into();
let mrecordlog = MultiRecordLog::open_with_prefs(
wal_dir_path,
mrecordlog::SyncPolicy::OnDelay(Duration::from_secs(5)),
Expand Down

0 comments on commit 2ff4e8d

Please sign in to comment.