Skip to content

Commit

Permalink
simplify pop_queue_message_into_pending; improve tests
Browse files Browse the repository at this point in the history
Signed-off-by: Sanskar Jaiswal <[email protected]>
  • Loading branch information
aryan9600 committed Sep 2, 2023
1 parent 90303c8 commit 9e3334b
Show file tree
Hide file tree
Showing 2 changed files with 31 additions and 26 deletions.
35 changes: 25 additions & 10 deletions kube-runtime/src/controller/runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,16 +117,7 @@ where
|| !*this.is_ready_to_execute
{
match scheduler.as_mut().hold().poll_next_unpin(cx) {
Poll::Pending => break Poll::Pending,
Poll::Ready(None) => {
break if has_active_slots {
// We're done listening for new messages, but still have some that
// haven't finished quite yet
Poll::Pending
} else {
Poll::Ready(None)
};
}
Poll::Pending | Poll::Ready(None) => break Poll::Pending,
// The above future never returns Poll::Ready(Some(_)).
_ => unreachable!(),
};
Expand Down Expand Up @@ -450,5 +441,29 @@ mod tests {
assert!(poll!(runner.as_mut()).is_pending());
// Assert that we run the third request when we have the capacity to
assert_eq!(*count.lock().unwrap(), 3);
advance(Duration::from_secs(3)).await;
assert!(poll!(runner.as_mut()).is_pending());

let (mut sched_tx, sched_rx) = mpsc::unbounded();
let mut runner = Box::pin(
Runner::new(scheduler(sched_rx), 1, |_| {
DurationalFuture::new(Duration::from_secs(2))
})
.for_each(|_| async {}),

Check warning on line 452 in kube-runtime/src/controller/runner.rs

View check run for this annotation

Codecov / codecov/patch

kube-runtime/src/controller/runner.rs#L452

Added line #L452 was not covered by tests
);

sched_tx
.send(ScheduleRequest {
message: 1,
run_at: Instant::now(),
})
.await
.unwrap();
assert!(poll!(runner.as_mut()).is_pending());

// Drop the sender to test that we stop the runner when the requests
// stream finishes.
drop(sched_tx);
assert_eq!(poll!(runner.as_mut()), Poll::Pending);
}
}
22 changes: 6 additions & 16 deletions kube-runtime/src/scheduler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,15 +128,10 @@ impl<'a, T: Hash + Eq + Clone, R> SchedulerProj<'a, T, R> {
}

/// Attempt to retrieve a message from queue and mark it as pending.
pub fn pop_queue_message_into_pending(&mut self, cx: &mut Context<'_>) -> Poll<T> {
loop {
match self.queue.poll_expired(cx) {
Poll::Ready(Some(msg)) => {
let msg = msg.into_inner();
self.pending.insert(msg);
}
Poll::Ready(None) | Poll::Pending => break Poll::Pending,
}
pub fn pop_queue_message_into_pending(&mut self, cx: &mut Context<'_>) {
while let Poll::Ready(Some(msg)) = self.queue.poll_expired(cx) {
let msg = msg.into_inner();
self.pending.insert(msg);
}
}
}
Expand All @@ -153,7 +148,6 @@ where
{
type Item = T;

#[allow(clippy::match_wildcard_for_single_variants)]
fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let this = self.get_mut();
let mut scheduler = this.scheduler.as_mut().project();
Expand All @@ -166,12 +160,8 @@ where
}
}

match scheduler.pop_queue_message_into_pending(cx) {
Poll::Pending => Poll::Pending,
// Since the above method never returns anything other than Poll::Pending
// we don't need to handle any other variant.
_ => unreachable!(),
}
scheduler.pop_queue_message_into_pending(cx);
Poll::Pending
}
}

Expand Down

0 comments on commit 9e3334b

Please sign in to comment.