Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature: Added unit test for watcher #1529

Open
wants to merge 2 commits into
base: main
Choose a base branch
from
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
102 changes: 93 additions & 9 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -157,16 +157,100 @@

/// Used to control whether the watcher receives the full object, or only the
/// metadata
#[async_trait]
trait ApiMode {
type Value: Clone;
// #[async_trait]
// trait ApiMode {
// type Value: Clone;

// async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>;
// async fn watch(
// &self,
// wp: &WatchParams,
// version: &str,
// ) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>;
// }
Comment on lines +160 to +170
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was this commented as a mistake?


#[cfg(test)]
mod tests {
use super::*;
use async_trait::async_trait;
use futures::stream::{self, BoxStream};
use kube_client::{Result, ListParams, WatchParams, ObjectList, WatchEvent};
use std::sync::{Arc, Mutex};

#[derive(Clone)]
struct TestResource {
// fields here
}
Comment on lines +180 to +183
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wonder if we actually need a TestResource? Can we have our TestApiMode be generic over a K type that represents a k8s resource? Is the TestResource used to simplify this?


async fn list(&self, lp: &ListParams) -> kube_client::Result<ObjectList<Self::Value>>;
async fn watch(
&self,
wp: &WatchParams,
version: &str,
) -> kube_client::Result<BoxStream<'static, kube_client::Result<WatchEvent<Self::Value>>>>;
struct TestApiMode {
list_response: Vec<ObjectList<TestResource>>,
watch_response: Vec<Result<WatchEvent<TestResource>>>,
list_call_count: Arc<Mutex<usize>>,
watch_call_count: Arc<Mutex<usize>>,
selectors: Arc<Mutex<Vec<ListParams>>>,
}

impl TestApiMode {
fn new(list_response: Vec<ObjectList<TestResource>>, watch_response: Vec<Result<WatchEvent<TestResource>>>) -> Self {
TestApiMode {
list_response,
watch_response,
list_call_count: Arc::new(Mutex::new(0)),
watch_call_count: Arc::new(Mutex::new(0)),
selectors: Arc::new(Mutex::new(Vec::new())),
}
}
}

#[async_trait]
impl ApiMode for TestApiMode {
type Value = TestResource;

async fn list(&self, lp: &ListParams) -> Result<ObjectList<Self::Value>> {
let mut count = self.list_call_count.lock().unwrap();
*count += 1;

let mut selectors = self.selectors.lock().unwrap();
selectors.push(lp.clone());

Ok(self.list_response.get(*count - 1).cloned().unwrap_or_else(|| ObjectList::default()))
}

async fn watch(&self, wp: &WatchParams, _version: &str) -> Result<BoxStream<'static, Result<WatchEvent<Self::Value>>>> {
let mut count = self.watch_call_count.lock().unwrap();
*count += 1;
Ok(stream::iter(self.watch_response.clone()).boxed())
}
}

#[tokio::test]
async fn test_watcher_behavior() {
// Simulate paginated list responses
let list_response = vec![
ObjectList::<TestResource> { items: vec![TestResource { /* fields */ }], metadata: Default::default() },
ObjectList::<TestResource> { items: vec![TestResource { /* fields */ }], metadata: Default::default() },
];
let watch_response = vec![Ok(WatchEvent::Added(TestResource { /* fields */ }))];

let api_mode = TestApiMode::new(list_response, watch_response);

// Create the watcher using the TestApiMode
// Verify the watcher behavior with assertions

// Verify list call count
let list_call_count = api_mode.list_call_count.lock().unwrap();
assert_eq!(*list_call_count, 2);

// Verify watch call count
let watch_call_count = api_mode.watch_call_count.lock().unwrap();
assert_eq!(*watch_call_count, 1);

// Verify selectors consistency
let selectors = api_mode.selectors.lock().unwrap();
assert!(selectors.iter().all(|lp| lp.selector == expected_selector));

// Additional assertions for union of list and watch events and desync handling
}
}

/// A wrapper around the `Api` of a `Resource` type that when used by the
Expand Down Expand Up @@ -409,7 +493,7 @@
}

#[async_trait]
impl<K> ApiMode for FullObject<'_, K>

Check failure on line 496 in kube-runtime/src/watcher.rs

View workflow job for this annotation

GitHub Actions / msrv

cannot find trait `ApiMode` in this scope
where
K: Clone + Debug + DeserializeOwned + Send + 'static,
{
Expand All @@ -435,7 +519,7 @@
}

#[async_trait]
impl<K> ApiMode for MetaOnly<'_, K>

Check failure on line 522 in kube-runtime/src/watcher.rs

View workflow job for this annotation

GitHub Actions / msrv

cannot find trait `ApiMode` in this scope
where
K: Clone + Debug + DeserializeOwned + Send + 'static,
{
Expand Down Expand Up @@ -465,7 +549,7 @@
state: State<A::Value>,
) -> (Option<Result<Event<A::Value>>>, State<A::Value>)
where
A: ApiMode,

Check failure on line 552 in kube-runtime/src/watcher.rs

View workflow job for this annotation

GitHub Actions / msrv

cannot find trait `ApiMode` in this scope
A::Value: Resource + 'static,
{
match state {
Expand Down Expand Up @@ -668,7 +752,7 @@
mut state: State<A::Value>,
) -> (Result<Event<A::Value>>, State<A::Value>)
where
A: ApiMode,

Check failure on line 755 in kube-runtime/src/watcher.rs

View workflow job for this annotation

GitHub Actions / msrv

cannot find trait `ApiMode` in this scope
A::Value: Resource + 'static,
{
loop {
Expand Down
Loading