From 8140c0727817dd3c9e628ff2ac9807d880b7976d Mon Sep 17 00:00:00 2001 From: Corentin Regal Date: Wed, 25 Oct 2023 09:16:49 +0200 Subject: [PATCH] Fix pop_queue_message_into_pending Signed-off-by: Corentin Regal --- kube-runtime/src/controller/runner.rs | 12 ++++++++++++ kube-runtime/src/scheduler.rs | 3 +++ 2 files changed, 15 insertions(+) diff --git a/kube-runtime/src/controller/runner.rs b/kube-runtime/src/controller/runner.rs index bf4d9ef9d..95ed6f7e4 100644 --- a/kube-runtime/src/controller/runner.rs +++ b/kube-runtime/src/controller/runner.rs @@ -444,6 +444,18 @@ mod tests { advance(Duration::from_secs(3)).await; assert!(poll!(runner.as_mut()).is_pending()); + // Send the third message again and check it's ran + sched_tx + .send(ScheduleRequest { + message: 3, + run_at: Instant::now(), + }) + .await + .unwrap(); + advance(Duration::from_secs(3)).await; + assert!(poll!(runner.as_mut()).is_pending()); + assert_eq!(*count.lock().unwrap(), 4); + let (mut sched_tx, sched_rx) = mpsc::unbounded(); let mut runner = Box::pin( Runner::new(scheduler(sched_rx), 1, |_| { diff --git a/kube-runtime/src/scheduler.rs b/kube-runtime/src/scheduler.rs index 7ee19ae5b..f972dec34 100644 --- a/kube-runtime/src/scheduler.rs +++ b/kube-runtime/src/scheduler.rs @@ -131,6 +131,9 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> { pub fn pop_queue_message_into_pending(&mut self, cx: &mut Context<'_>) { while let Poll::Ready(Some(msg)) = self.queue.poll_expired(cx) { let msg = msg.into_inner(); + self.scheduled.remove_entry(&msg).expect( + "Expired message was popped from the Scheduler queue, but was not in the metadata map", + ); self.pending.insert(msg); } }