From 33bf9ba598f3c0b045a385972f7aa9db39cc5048 Mon Sep 17 00:00:00 2001 From: William Wen <44139337+wenym1@users.noreply.github.com> Date: Tue, 21 Jan 2025 18:37:55 +0800 Subject: [PATCH] fix(meta): fix receiving scheduled command on blocked database (#20241) --- src/meta/src/barrier/schedule.rs | 34 +++++++++++++++++++++++++++----- 1 file changed, 29 insertions(+), 5 deletions(-) diff --git a/src/meta/src/barrier/schedule.rs b/src/meta/src/barrier/schedule.rs index 13653a382d4ea..299f34bf59aa1 100644 --- a/src/meta/src/barrier/schedule.rs +++ b/src/meta/src/barrier/schedule.rs @@ -64,6 +64,12 @@ enum QueueStatus { Blocked(String), } +impl QueueStatus { + fn is_blocked(&self) -> bool { + matches!(self, Self::Blocked(_)) + } +} + struct ScheduledQueueItem { command: Command, notifiers: Vec, @@ -94,8 +100,10 @@ impl StatusQueue { self.status = QueueStatus::Blocked(reason); } - fn mark_ready(&mut self) { + fn mark_ready(&mut self) -> bool { + let prev_blocked = self.status.is_blocked(); self.status = QueueStatus::Ready; + prev_blocked } fn validate_item(&mut self, scheduled: &ScheduledQueueItem) -> MetaResult<()> { @@ -395,7 +403,13 @@ impl ScheduledBarriers { let mut rx = self.inner.changed_tx.subscribe(); { let mut queue = self.inner.queue.lock(); + if queue.status.is_blocked() { + continue; + } for (database_id, queue) in &mut queue.queue { + if queue.status.is_blocked() { + continue; + } if let Some(item) = queue.queue.pop_front() { item.send_latency_timer.observe_duration(); break 'outer Scheduled { @@ -452,16 +466,26 @@ impl ScheduledBarriers { pub(super) fn mark_ready(&self, database_id: Option) { let mut queue = self.inner.queue.lock(); if let Some(database_id) = database_id { - queue + let database_queue = queue .queue .entry(database_id) - .or_insert_with(DatabaseScheduledQueue::new) - .mark_ready(); + .or_insert_with(DatabaseScheduledQueue::new); + if database_queue.mark_ready() && !database_queue.queue.is_empty() { + self.inner.changed_tx.send(()).ok(); + } } else { - queue.mark_ready(); + let prev_blocked = queue.mark_ready(); for queue in queue.queue.values_mut() { queue.mark_ready(); } + if prev_blocked + && queue + .queue + .values() + .any(|database_queue| !database_queue.queue.is_empty()) + { + self.inner.changed_tx.send(()).ok(); + } } }