Skip to content

Commit

Permalink
fix(meta): fix receiving scheduled command on blocked database (#20241)
Browse files Browse the repository at this point in the history
  • Loading branch information
wenym1 authored Jan 21, 2025
1 parent 509ec7a commit 33bf9ba
Showing 1 changed file with 29 additions and 5 deletions.
34 changes: 29 additions & 5 deletions src/meta/src/barrier/schedule.rs
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,12 @@ enum QueueStatus {
Blocked(String),
}

impl QueueStatus {
fn is_blocked(&self) -> bool {
matches!(self, Self::Blocked(_))
}
}

struct ScheduledQueueItem {
command: Command,
notifiers: Vec<Notifier>,
Expand Down Expand Up @@ -94,8 +100,10 @@ impl<T> StatusQueue<T> {
self.status = QueueStatus::Blocked(reason);
}

fn mark_ready(&mut self) {
fn mark_ready(&mut self) -> bool {
let prev_blocked = self.status.is_blocked();
self.status = QueueStatus::Ready;
prev_blocked
}

fn validate_item(&mut self, scheduled: &ScheduledQueueItem) -> MetaResult<()> {
Expand Down Expand Up @@ -395,7 +403,13 @@ impl ScheduledBarriers {
let mut rx = self.inner.changed_tx.subscribe();
{
let mut queue = self.inner.queue.lock();
if queue.status.is_blocked() {
continue;
}
for (database_id, queue) in &mut queue.queue {
if queue.status.is_blocked() {
continue;
}
if let Some(item) = queue.queue.pop_front() {
item.send_latency_timer.observe_duration();
break 'outer Scheduled {
Expand Down Expand Up @@ -452,16 +466,26 @@ impl ScheduledBarriers {
pub(super) fn mark_ready(&self, database_id: Option<DatabaseId>) {
let mut queue = self.inner.queue.lock();
if let Some(database_id) = database_id {
queue
let database_queue = queue
.queue
.entry(database_id)
.or_insert_with(DatabaseScheduledQueue::new)
.mark_ready();
.or_insert_with(DatabaseScheduledQueue::new);
if database_queue.mark_ready() && !database_queue.queue.is_empty() {
self.inner.changed_tx.send(()).ok();
}
} else {
queue.mark_ready();
let prev_blocked = queue.mark_ready();
for queue in queue.queue.values_mut() {
queue.mark_ready();
}
if prev_blocked
&& queue
.queue
.values()
.any(|database_queue| !database_queue.queue.is_empty())
{
self.inner.changed_tx.send(()).ok();
}
}
}

Expand Down

0 comments on commit 33bf9ba

Please sign in to comment.