Skip to content

Commit

Permalink
Remove timeout from shard positions callbacks. (#4588)
Browse files Browse the repository at this point in the history
Closes #4505
  • Loading branch information
fulmicoton authored Feb 16, 2024
1 parent ec5e0b1 commit 1533c0f
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 24 deletions.
99 changes: 81 additions & 18 deletions quickwit/quickwit-common/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ struct InnerEventBroker {
impl EventBroker {
// The point of this private method is to allow the public subscribe method to have only one
// generic argument and avoid the ugly `::<E, _>` syntax.
fn subscribe_aux<E, S>(&self, subscriber: S) -> EventSubscriptionHandle
fn subscribe_aux<E, S>(&self, subscriber: S, with_timeout: bool) -> EventSubscriptionHandle
where
E: Event,
S: EventSubscriber<E> + Send + Sync + 'static,
Expand All @@ -96,6 +96,7 @@ impl EventBroker {
let subscription = EventSubscription {
subscriber_name,
subscriber: Arc::new(TokioMutex::new(Box::new(subscriber))),
with_timeout,
};
let typed_subscriptions = subscriptions
.get_mut::<EventSubscriptions<E>>()
Expand All @@ -119,10 +120,31 @@ impl EventBroker {
}

/// Subscribes to an event type.
///
/// The callback should be as light as possible.
///
/// # Disclaimer
///
/// If the callback takes more than `EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT` to execute,
/// the callback future will be aborted.
#[must_use]
pub fn subscribe<E>(&self, subscriber: impl EventSubscriber<E>) -> EventSubscriptionHandle
where E: Event {
self.subscribe_aux(subscriber)
self.subscribe_aux(subscriber, true)
}

/// Subscribes to an event type.
///
/// The callback should be as light as possible.
#[must_use]
pub fn subscribe_without_timeout<E>(
&self,
subscriber: impl EventSubscriber<E>,
) -> EventSubscriptionHandle
where
E: Event,
{
self.subscribe_aux(subscriber, false)
}

/// Publishes an event.
Expand All @@ -133,24 +155,9 @@ impl EventBroker {
.subscriptions
.lock()
.expect("lock should not be poisoned");

if let Some(typed_subscriptions) = subscriptions.get::<EventSubscriptions<E>>() {
for subscription in typed_subscriptions.values() {
let event = event.clone();
let subscriber_name = subscription.subscriber_name;
let subscriber_clone = subscription.subscriber.clone();
let handle_event_fut = async move {
if tokio::time::timeout(EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT, async {
subscriber_clone.lock().await.handle_event(event).await
})
.await
.is_err()
{
let event_name = std::any::type_name::<E>();
warn!("{subscriber_name}'s handler for {event_name} timed out");
}
};
tokio::spawn(handle_event_fut);
subscription.trigger(event.clone());
}
}
}
Expand All @@ -161,6 +168,62 @@ struct EventSubscription<E> {
// to access it.
subscriber_name: &'static str,
subscriber: Arc<TokioMutex<Box<dyn EventSubscriber<E>>>>,
with_timeout: bool,
}

impl<E: Event> EventSubscription<E> {
/// Call the callback associated with the subscription.
fn trigger(&self, event: E) {
if self.with_timeout {
self.trigger_abort_on_timeout(event);
} else {
self.trigger_just_log_on_timeout(event)
}
}

/// Spawns a task to run the given subscription.
///
/// Just logs a warning if it took more than `EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT`
/// for the future to execute.
fn trigger_just_log_on_timeout(&self, event: E) {
let subscriber_name = self.subscriber_name;
let subscriber = self.subscriber.clone();
// This task is just here to log a warning if the callback takes too long to execute.
let log_timeout_task_handle = tokio::task::spawn(async move {
tokio::time::sleep(EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT).await;
let event_name = std::any::type_name::<E>();
warn!(
"{subscriber_name}'s handler for {event_name} did not finished within {}ms",
EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT.as_millis()
);
});
tokio::task::spawn(async move {
subscriber.lock().await.handle_event(event).await;
// The callback has terminated, let's abort the timeout task.
log_timeout_task_handle.abort();
});
}

/// Spawns a task to run the given subscription.
///
/// Aborts the future execution and logs a warning if it takes more than
/// `EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT`.
fn trigger_abort_on_timeout(&self, event: E) {
let subscriber_name = self.subscriber_name;
let subscriber = self.subscriber.clone();
let fut = async move {
if tokio::time::timeout(EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT, async {
subscriber.lock().await.handle_event(event).await
})
.await
.is_err()
{
let event_name = std::any::type_name::<E>();
warn!("{subscriber_name}'s handler for {event_name} timed out, abort");
}
};
tokio::task::spawn(fut);
}
}

#[derive(Clone)]
Expand Down
3 changes: 2 additions & 1 deletion quickwit/quickwit-indexing/src/models/shard_positions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,8 +140,9 @@ impl ShardPositionsService {
let shard_positions_service = ShardPositionsService::new(event_broker.clone(), cluster);
let (shard_positions_service_mailbox, _) =
spawn_ctx.spawn_builder().spawn(shard_positions_service);
// This subscription is in charge of updating the shard positions model.
event_broker
.subscribe::<LocalShardPositionsUpdate>(move |update| {
.subscribe_without_timeout::<LocalShardPositionsUpdate>(move |update| {
if shard_positions_service_mailbox
.try_send_message(update)
.is_err()
Expand Down
4 changes: 2 additions & 2 deletions quickwit/quickwit-ingest/src/ingest_v2/ingester.rs
Original file line number Diff line number Diff line change
Expand Up @@ -400,9 +400,9 @@ impl Ingester {

pub fn subscribe(&self, event_broker: &EventBroker) {
let weak_ingester_state = self.state.weak();

// This subscription is the one in charge of truncating the mrecordlog.
event_broker
.subscribe::<ShardPositionsUpdate>(weak_ingester_state)
.subscribe_without_timeout::<ShardPositionsUpdate>(weak_ingester_state)
.forever();
}

Expand Down
5 changes: 2 additions & 3 deletions quickwit/quickwit-serve/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -826,12 +826,11 @@ async fn setup_control_plane(
metastore,
);
let subscriber = ControlPlaneEventSubscriber::new(control_plane_mailbox.downgrade());

event_broker
.subscribe::<LocalShardsUpdate>(subscriber.clone())
.subscribe_without_timeout::<LocalShardsUpdate>(subscriber.clone())
.forever();
event_broker
.subscribe::<ShardPositionsUpdate>(subscriber)
.subscribe_without_timeout::<ShardPositionsUpdate>(subscriber)
.forever();

Ok(control_plane_mailbox)
Expand Down

0 comments on commit 1533c0f

Please sign in to comment.