diff --git a/quickwit/quickwit-common/src/pubsub.rs b/quickwit/quickwit-common/src/pubsub.rs index 816eeff1303..c318d95fb93 100644 --- a/quickwit/quickwit-common/src/pubsub.rs +++ b/quickwit/quickwit-common/src/pubsub.rs @@ -69,10 +69,13 @@ struct InnerEventBroker { } impl EventBroker { - /// Subscribes to an event type. - #[must_use] - pub fn subscribe(&self, subscriber: impl EventSubscriber) -> EventSubscriptionHandle - where E: Event { + // The point of this private method is to allow the public subscribe method to have only one + // generic argument and avoid the ugly `::` syntax. + fn subscribe_aux(&self, subscriber: S) -> EventSubscriptionHandle + where + E: Event, + S: EventSubscriber + Send + Sync + 'static, + { let mut subscriptions = self .inner .subscriptions @@ -87,7 +90,9 @@ impl EventBroker { .subscription_sequence .fetch_add(1, Ordering::Relaxed); + let subscriber_name = std::any::type_name::(); let subscription = EventSubscription { + subscriber_name, subscriber: Arc::new(TokioMutex::new(Box::new(subscriber))), }; let typed_subscriptions = subscriptions @@ -111,6 +116,13 @@ impl EventBroker { } } + /// Subscribes to an event type. + #[must_use] + pub fn subscribe(&self, subscriber: impl EventSubscriber) -> EventSubscriptionHandle + where E: Event { + self.subscribe_aux(subscriber) + } + /// Publishes an event. pub fn publish(&self, event: E) where E: Event { @@ -123,6 +135,7 @@ impl EventBroker { if let Some(typed_subscriptions) = subscriptions.get::>() { for subscription in typed_subscriptions.values() { let event = event.clone(); + let subscriber_name = subscription.subscriber_name; let subscriber_clone = subscription.subscriber.clone(); let handle_event_fut = async move { if tokio::time::timeout(Duration::from_secs(1), async { @@ -131,7 +144,8 @@ impl EventBroker { .await .is_err() { - warn!("`{}` event handler timed out", std::any::type_name::()); + let event_name = std::any::type_name::(); + warn!("{}'s handler for {event_name} timed out", subscriber_name); } }; tokio::spawn(handle_event_fut); @@ -141,6 +155,9 @@ impl EventBroker { } struct EventSubscription { + // We put that in the subscription in order to avoid having to take the lock + // to access it. + subscriber_name: &'static str, subscriber: Arc>>>, } diff --git a/quickwit/quickwit-control-plane/src/control_plane.rs b/quickwit/quickwit-control-plane/src/control_plane.rs index a92e57314be..eebf521f3c2 100644 --- a/quickwit/quickwit-control-plane/src/control_plane.rs +++ b/quickwit/quickwit-control-plane/src/control_plane.rs @@ -165,7 +165,7 @@ impl ControlPlane { /// This method includes debouncing logic. Every call will be followed by a cooldown period. fn rebuild_plan_debounced(&mut self, ctx: &ActorContext) { self.rebuild_plan_debouncer - .self_send_with_cooldown::<_, RebuildPlan>(ctx); + .self_send_with_cooldown::(ctx); } /// Deletes a set of shards from the metastore and the control plane model. diff --git a/quickwit/quickwit-control-plane/src/debouncer.rs b/quickwit/quickwit-control-plane/src/debouncer.rs index 93ea9b2cf12..7b7e8566c8a 100644 --- a/quickwit/quickwit-control-plane/src/debouncer.rs +++ b/quickwit/quickwit-control-plane/src/debouncer.rs @@ -117,9 +117,10 @@ impl Debouncer { .schedule_event(callback, self.cooldown_period); } - pub fn self_send_with_cooldown(&self, ctx: &ActorContext) - where - A: Actor + Handler + DeferableReplyHandler, + pub fn self_send_with_cooldown( + &self, + ctx: &ActorContext + DeferableReplyHandler>, + ) where M: Default + std::fmt::Debug + Send + Sync + 'static, { let cooldown_state = self.accept_transition(Transition::Emit); @@ -194,7 +195,7 @@ mod tests { _message: DebouncedIncrement, ctx: &ActorContext, ) -> Result { - self.debouncer.self_send_with_cooldown::<_, Increment>(ctx); + self.debouncer.self_send_with_cooldown::(ctx); Ok(()) } }