Skip to content

Commit

Permalink
Fix visibility actor drop
Browse files Browse the repository at this point in the history
  • Loading branch information
rdettai committed Jul 3, 2024
1 parent 95beb16 commit bfd360e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 12 deletions.
7 changes: 5 additions & 2 deletions quickwit/quickwit-actors/src/actor_context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -339,8 +339,11 @@ impl<A: Actor> ActorContext<A> {
self.self_mailbox.try_send_message(msg)
}

/// Schedules a message that will be sent to the high-priority
/// queue of the actor Mailbox once `after_duration` has elapsed.
/// Schedules a message that will be sent to the high-priority queue of the
/// actor Mailbox once `after_duration` has elapsed.
///
/// Note that this holds a reference to the actor mailbox until the message
/// is actually sent.
pub fn schedule_self_msg<M>(&self, after_duration: Duration, message: M)
where
A: DeferableReplyHandler<M>,
Expand Down
34 changes: 24 additions & 10 deletions quickwit/quickwit-indexing/src/source/queue_sources/visibility.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
// You should have received a copy of the GNU Affero General Public License
// along with this program. If not, see <http://www.gnu.org/licenses/>.

use std::sync::Arc;
use std::sync::{Arc, Weak};
use std::time::{Duration, Instant};

use anyhow::{anyhow, Context};
Expand Down Expand Up @@ -67,8 +67,9 @@ struct VisibilityTask {
ack_id: String,
extension_count: u64,
current_deadline: Instant,
stop_extension_loop: bool,
last_extension_requested: bool,
visibility_settings: VisibilitySettings,
ref_count: Weak<()>,
}

// A handle to the visibility actor. When dropped, the actor exits and the
Expand All @@ -77,6 +78,7 @@ pub(super) struct VisibilityTaskHandle {
mailbox: Mailbox<VisibilityTask>,
actor_handle: ActorHandle<VisibilityTask>,
ack_id: String,
_ref_count: Arc<()>,
}

/// Spawns actor that ensures that the visibility of a given message
Expand All @@ -90,19 +92,23 @@ pub(super) fn spawn_visibility_task(
current_deadline: Instant,
visibility_settings: VisibilitySettings,
) -> VisibilityTaskHandle {
let ref_count = Arc::new(());
let weak_ref = Arc::downgrade(&ref_count);
let task = VisibilityTask {
queue,
ack_id: ack_id.clone(),
extension_count: 0,
current_deadline,
stop_extension_loop: false,
last_extension_requested: false,
visibility_settings,
ref_count: weak_ref,
};
let (_mailbox, _actor_handle) = ctx.spawn_actor().spawn(task);
let (mailbox, actor_handle) = ctx.spawn_actor().spawn(task);
VisibilityTaskHandle {
mailbox: _mailbox,
actor_handle: _actor_handle,
mailbox,
actor_handle,
ack_id,
_ref_count: ref_count,
}
}

Expand Down Expand Up @@ -189,7 +195,10 @@ impl Handler<Loop> for VisibilityTask {
_message: Loop,
ctx: &ActorContext<Self>,
) -> Result<(), ActorExitStatus> {
if self.stop_extension_loop {
if self.ref_count.strong_count() == 0 {
return Ok(());
}
if self.last_extension_requested {
return Ok(());
}
self.extend_visibility(ctx, self.visibility_settings.deadline_for_default_extension)
Expand All @@ -216,7 +225,7 @@ impl Handler<RequestLastExtension> for VisibilityTask {
ctx: &ActorContext<Self>,
) -> Result<Self::Reply, ActorExitStatus> {
let last_deadline = Instant::now() + message.extension;
self.stop_extension_loop = true;
self.last_extension_requested = true;
if last_deadline > self.current_deadline {
Ok(self.extend_visibility(ctx, message.extension).await)
} else {
Expand Down Expand Up @@ -266,7 +275,8 @@ mod tests {
// assert that the background task performs extensions
assert!(!handle.extension_failed());
tokio::time::sleep_until(initial_deadline.into()).await;
assert!(initial_deadline < queue.next_visibility_deadline(&ack_id).unwrap());
let next_deadline = queue.next_visibility_deadline(&ack_id).unwrap();
assert!(initial_deadline < next_deadline);
assert!(!handle.extension_failed());
// request last extension
handle
Expand Down Expand Up @@ -313,7 +323,11 @@ mod tests {
// assert that visibility is not extended after drop
drop(handle);
tokio::time::sleep_until(initial_deadline.into()).await;
assert_eq!(queue.next_visibility_deadline(&ack_id), None);
// the message is either already expired or about to expire
if let Some(next_deadline) = queue.next_visibility_deadline(&ack_id) {
assert_eq!(next_deadline, initial_deadline);
}
// assert_eq!(q, None);
universe.assert_quit().await;
}
}

0 comments on commit bfd360e

Please sign in to comment.