diff --git a/quickwit/quickwit-common/src/pubsub.rs b/quickwit/quickwit-common/src/pubsub.rs index 0ead9318091..f749b6a9e3f 100644 --- a/quickwit/quickwit-common/src/pubsub.rs +++ b/quickwit/quickwit-common/src/pubsub.rs @@ -73,7 +73,7 @@ struct InnerEventBroker { impl EventBroker { // The point of this private method is to allow the public subscribe method to have only one // generic argument and avoid the ugly `::` syntax. - fn subscribe_aux(&self, subscriber: S) -> EventSubscriptionHandle + fn subscribe_aux(&self, subscriber: S, with_timeout: bool) -> EventSubscriptionHandle where E: Event, S: EventSubscriber + Send + Sync + 'static, @@ -96,6 +96,7 @@ impl EventBroker { let subscription = EventSubscription { subscriber_name, subscriber: Arc::new(TokioMutex::new(Box::new(subscriber))), + with_timeout, }; let typed_subscriptions = subscriptions .get_mut::>() @@ -119,10 +120,31 @@ impl EventBroker { } /// Subscribes to an event type. + /// + /// The callback should be as light as possible. + /// + /// # Disclaimer + /// + /// If the callback takes more than `EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT` to execute, + /// the callback future will be aborted. #[must_use] pub fn subscribe(&self, subscriber: impl EventSubscriber) -> EventSubscriptionHandle where E: Event { - self.subscribe_aux(subscriber) + self.subscribe_aux(subscriber, true) + } + + /// Subscribes to an event type without aborting slow callbacks. + /// + /// The callback should be as light as possible: it is never aborted, but a warning is logged if it is still running after `EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT`. + #[must_use] + pub fn subscribe_without_timeout( + &self, + subscriber: impl EventSubscriber, + ) -> EventSubscriptionHandle + where + E: Event, + { + self.subscribe_aux(subscriber, false) } /// Publishes an event. 
@@ -133,24 +155,9 @@ impl EventBroker { .subscriptions .lock() .expect("lock should not be poisoned"); - if let Some(typed_subscriptions) = subscriptions.get::>() { for subscription in typed_subscriptions.values() { - let event = event.clone(); - let subscriber_name = subscription.subscriber_name; - let subscriber_clone = subscription.subscriber.clone(); - let handle_event_fut = async move { - if tokio::time::timeout(EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT, async { - subscriber_clone.lock().await.handle_event(event).await - }) - .await - .is_err() - { - let event_name = std::any::type_name::(); - warn!("{subscriber_name}'s handler for {event_name} timed out"); - } - }; - tokio::spawn(handle_event_fut); + subscription.trigger(event.clone()); } } } @@ -161,6 +168,62 @@ struct EventSubscription { // to access it. subscriber_name: &'static str, subscriber: Arc>>>, + with_timeout: bool, +} + +impl EventSubscription { + /// Calls the callback associated with the subscription. + fn trigger(&self, event: E) { + if self.with_timeout { + self.trigger_abort_on_timeout(event); + } else { + self.trigger_just_log_on_timeout(event) + } + } + + /// Spawns a task to run the given subscription. + /// + /// Just logs a warning if it takes more than `EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT` + /// for the future to execute. + fn trigger_just_log_on_timeout(&self, event: E) { + let subscriber_name = self.subscriber_name; + let subscriber = self.subscriber.clone(); + // This task is just here to log a warning if the callback takes too long to execute. 
+ let log_timeout_task_handle = tokio::task::spawn(async move { + tokio::time::sleep(EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT).await; + let event_name = std::any::type_name::(); + warn!( + "{subscriber_name}'s handler for {event_name} did not finished within {}ms", + EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT.as_millis() + ); + }); + tokio::task::spawn(async move { + subscriber.lock().await.handle_event(event).await; + // The callback has terminated, let's abort the timeout task. + log_timeout_task_handle.abort(); + }); + } + + /// Spawns a task to run the given subscription. + /// + /// Aborts the future execution and logs a warning if it takes more than + /// `EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT`. + fn trigger_abort_on_timeout(&self, event: E) { + let subscriber_name = self.subscriber_name; + let subscriber = self.subscriber.clone(); + let fut = async move { + if tokio::time::timeout(EVENT_SUBSCRIPTION_CALLBACK_TIMEOUT, async { + subscriber.lock().await.handle_event(event).await + }) + .await + .is_err() + { + let event_name = std::any::type_name::(); + warn!("{subscriber_name}'s handler for {event_name} timed out, abort"); + } + }; + tokio::task::spawn(fut); + } } #[derive(Clone)] diff --git a/quickwit/quickwit-indexing/src/models/shard_positions.rs b/quickwit/quickwit-indexing/src/models/shard_positions.rs index b2d9cfd4b7c..e1c41b72131 100644 --- a/quickwit/quickwit-indexing/src/models/shard_positions.rs +++ b/quickwit/quickwit-indexing/src/models/shard_positions.rs @@ -140,8 +140,9 @@ impl ShardPositionsService { let shard_positions_service = ShardPositionsService::new(event_broker.clone(), cluster); let (shard_positions_service_mailbox, _) = spawn_ctx.spawn_builder().spawn(shard_positions_service); + // This subscription is in charge of updating the shard positions model. 
event_broker - .subscribe::(move |update| { + .subscribe_without_timeout::(move |update| { if shard_positions_service_mailbox .try_send_message(update) .is_err() diff --git a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs index 2783f8a2f1f..05ef6f5f301 100644 --- a/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs +++ b/quickwit/quickwit-ingest/src/ingest_v2/ingester.rs @@ -400,9 +400,9 @@ impl Ingester { pub fn subscribe(&self, event_broker: &EventBroker) { let weak_ingester_state = self.state.weak(); - + // This subscription is the one in charge of truncating the mrecordlog. event_broker - .subscribe::(weak_ingester_state) + .subscribe_without_timeout::(weak_ingester_state) .forever(); } diff --git a/quickwit/quickwit-serve/src/lib.rs b/quickwit/quickwit-serve/src/lib.rs index 9fad78a13c8..0750d2be424 100644 --- a/quickwit/quickwit-serve/src/lib.rs +++ b/quickwit/quickwit-serve/src/lib.rs @@ -826,12 +826,11 @@ async fn setup_control_plane( metastore, ); let subscriber = ControlPlaneEventSubscriber::new(control_plane_mailbox.downgrade()); - event_broker - .subscribe::(subscriber.clone()) + .subscribe_without_timeout::(subscriber.clone()) .forever(); event_broker - .subscribe::(subscriber) + .subscribe_without_timeout::(subscriber) .forever(); Ok(control_plane_mailbox)