diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index eff22739d..e62483a45 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -157,16 +157,100 @@ enum State { /// Used to control whether the watcher receives the full object, or only the /// metadata -#[async_trait] -trait ApiMode { - type Value: Clone; +// #[async_trait] +// trait ApiMode { +// type Value: Clone; + +// async fn list(&self, lp: &ListParams) -> kube_client::Result>; +// async fn watch( +// &self, +// wp: &WatchParams, +// version: &str, +// ) -> kube_client::Result>>>; +// } + +#[cfg(test)] +mod tests { + use super::*; + use async_trait::async_trait; + use futures::stream::{self, BoxStream}; + use kube_client::{Result, ListParams, WatchParams, ObjectList, WatchEvent}; + use std::sync::{Arc, Mutex}; + + #[derive(Clone)] + struct TestResource { + // fields here + } - async fn list(&self, lp: &ListParams) -> kube_client::Result>; - async fn watch( - &self, - wp: &WatchParams, - version: &str, - ) -> kube_client::Result>>>; + struct TestApiMode { + list_response: Vec>, + watch_response: Vec>>, + list_call_count: Arc>, + watch_call_count: Arc>, + selectors: Arc>>, + } + + impl TestApiMode { + fn new(list_response: Vec>, watch_response: Vec>>) -> Self { + TestApiMode { + list_response, + watch_response, + list_call_count: Arc::new(Mutex::new(0)), + watch_call_count: Arc::new(Mutex::new(0)), + selectors: Arc::new(Mutex::new(Vec::new())), + } + } + } + + #[async_trait] + impl ApiMode for TestApiMode { + type Value = TestResource; + + async fn list(&self, lp: &ListParams) -> Result> { + let mut count = self.list_call_count.lock().unwrap(); + *count += 1; + + let mut selectors = self.selectors.lock().unwrap(); + selectors.push(lp.clone()); + + Ok(self.list_response.get(*count - 1).cloned().unwrap_or_else(|| ObjectList::default())) + } + + async fn watch(&self, wp: &WatchParams, _version: &str) -> Result>>> { + let mut count = self.watch_call_count.lock().unwrap(); + *count += 1; + Ok(stream::iter(self.watch_response.clone()).boxed()) + } + } + + #[tokio::test] + async fn test_watcher_behavior() { + // Simulate paginated list responses + let list_response = vec![ + ObjectList:: { items: vec![TestResource { /* fields */ }], metadata: Default::default() }, + ObjectList:: { items: vec![TestResource { /* fields */ }], metadata: Default::default() }, + ]; + let watch_response = vec![Ok(WatchEvent::Added(TestResource { /* fields */ }))]; + + let api_mode = TestApiMode::new(list_response, watch_response); + + // Create the watcher using the TestApiMode + // Verify the watcher behavior with assertions + + // Verify list call count + let list_call_count = api_mode.list_call_count.lock().unwrap(); + assert_eq!(*list_call_count, 2); + + // Verify watch call count + let watch_call_count = api_mode.watch_call_count.lock().unwrap(); + assert_eq!(*watch_call_count, 1); + + // Verify selectors consistency + let selectors = api_mode.selectors.lock().unwrap(); + assert!(selectors.iter().all(|lp| lp.selector == expected_selector)); + + // Additional assertions for union of list and watch events and desync handling + } } /// A wrapper around the `Api` of a `Resource` type that when used by the