From 9c13551aa5d5438b19cabdeb0edc73f82fc1725d Mon Sep 17 00:00:00 2001 From: Dan Spencer Date: Fri, 27 Jan 2023 15:48:39 +0000 Subject: [PATCH 1/6] Added in a first attempt at a solution to sharing watchers between stores and controllers --- kube-core/src/params.rs | 2 +- kube-runtime/Cargo.toml | 1 + kube-runtime/src/lib.rs | 1 + kube-runtime/src/shared_store/mod.rs | 567 +++++++++++++++++++ kube-runtime/src/shared_store/prism.rs | 61 ++ kube-runtime/src/shared_store/ready_token.rs | 71 +++ 6 files changed, 702 insertions(+), 1 deletion(-) create mode 100644 kube-runtime/src/shared_store/mod.rs create mode 100644 kube-runtime/src/shared_store/prism.rs create mode 100644 kube-runtime/src/shared_store/ready_token.rs diff --git a/kube-core/src/params.rs b/kube-core/src/params.rs index 1ae0689e4..feef32f19 100644 --- a/kube-core/src/params.rs +++ b/kube-core/src/params.rs @@ -3,7 +3,7 @@ use crate::request::Error; use serde::Serialize; /// Common query parameters used in watch/list/delete calls on collections -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Eq, Hash, PartialEq)] pub struct ListParams { /// A selector to restrict the list of returned objects by their labels. /// diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 64538907a..6520df283 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -42,6 +42,7 @@ default-features = false [dev-dependencies] kube = { path = "../kube", features = ["derive", "client", "runtime"], version = "<1.0.0, >=0.60.0" } +maplit = "1.0.2" serde_json = "1.0.68" tokio = { version = "1.14.0", features = ["full", "test-util"] } rand = "0.8.0" diff --git a/kube-runtime/src/lib.rs b/kube-runtime/src/lib.rs index 7296ac019..2ff946237 100644 --- a/kube-runtime/src/lib.rs +++ b/kube-runtime/src/lib.rs @@ -26,6 +26,7 @@ pub mod scheduler; pub mod utils; pub mod wait; pub mod watcher; +pub mod shared_store; pub use controller::{applier, Controller}; pub use finalizer::finalizer; diff --git a/kube-runtime/src/shared_store/mod.rs b/kube-runtime/src/shared_store/mod.rs new file mode 100644 index 000000000..b81f7e659 --- /dev/null +++ b/kube-runtime/src/shared_store/mod.rs @@ -0,0 +1,567 @@ +mod prism; +mod ready_token; + +use crate::controller::{trigger_self, Action}; +use crate::shared_store::prism::Prism; +use crate::shared_store::ready_token::ReadyToken; +use crate::watcher::Event; +use crate::{ + applier, + reflector::{ + reflector, + store::{Store, Writer}, + ObjectRef, + }, + utils::{CancelableJoinHandle, StreamBackoff, WatchStreamExt}, + watcher::{self, watcher}, +}; +use futures::{stream, Stream, StreamExt, TryFuture, TryFutureExt}; +use kube_client::api::ListParams; +use kube_client::{Api, Resource}; +use serde::de::DeserializeOwned; +use std::collections::HashMap; +use std::{fmt::Debug, hash::Hash, sync::Arc}; +use stream::BoxStream; +use tokio::runtime::Handle; +use tracing::Instrument; + +// TODO - Not sure this is the right name? +pub struct SharedStore> +where + K: 'static + Resource + Clone + DeserializeOwned + Debug + Send, + K::DynamicType: Hash + Eq, + W: CreateWatcher, +{ + watcher_provider: W, + reflectors: HashMap, Prism)>, + controllers: Vec>>, + ready_token: ReadyToken, +} + +// TODO - not sure about the error type here, we might not need to box it? +type ControllerResult = Result<(ObjectRef, Action), Box>; + +impl SharedStore +where + K: 'static + Resource + Clone + DeserializeOwned + Debug + Send + Sync, + K::DynamicType: Default + Eq + Hash + Clone, +{ + pub fn new(api: Api, ready_token: &ReadyToken) -> Self { + Self { + watcher_provider: WatcherProvider::new(api), + reflectors: HashMap::new(), + controllers: Vec::new(), + ready_token: ready_token.clone(), + } + } +} + +impl SharedStore +where + K: Clone + Resource + DeserializeOwned + Debug + Send + Sync + 'static, + K::DynamicType: Default + Eq + Hash + Clone, + W: CreateWatcher + 'static, +{ + pub fn run(self) -> impl Stream> { + let reflectors = stream::select_all(self.reflectors.into_iter().map(|(_, (_, reflector))| reflector)); + + let have_controllers = !self.controllers.is_empty(); + let controllers = stream::select_all(self.controllers); + + // todo - make sure that if ANY stream dies we die + + stream::unfold( + (reflectors, controllers, self.ready_token), + move |(mut reflectors, mut controllers, ready_token)| async move { + tokio::select!( + result = reflectors.next() => { + result.map(|r| (ProviderResult::Reflector(r), (reflectors, controllers, ready_token))) + }, + result = controllers.next(), if have_controllers && ready_token.is_ready() => { + result.map(|r| (ProviderResult::Controller(r), (reflectors, controllers, ready_token))) + } + ) + }, + ) + } + + // todo - we still need to be able to setup "watches" and "owns" + pub fn controller( + &mut self, + list_params: ListParams, + mut reconciler: impl FnMut(Arc, Arc) -> ReconcilerFut + Send + 'static, + error_policy: impl Fn(Arc, &ReconcilerFut::Error, Arc) -> Action + Send + Sync + 'static, + context: Arc, + ) where + K::DynamicType: Debug + Unpin, + ReconcilerFut: TryFuture + Send + 'static, + ReconcilerFut::Error: std::error::Error + Send + 'static, + Ctx: Send + Sync + 'static, + { + let dyntype = K::DynamicType::default(); + + let (store, event_stream) = self.reflector(list_params.clone().into()); + let self_watcher = trigger_self(event_stream.map(Ok).applied_objects(), dyntype).boxed(); + + let mut trigger_selector = stream::SelectAll::new(); + trigger_selector.push(self_watcher); + + let trigger_backoff = Box::new(watcher::default_backoff()); + + let stream = applier( + move |obj, ctx| { + CancelableJoinHandle::spawn( + reconciler(obj, ctx).into_future().in_current_span(), + &Handle::current(), + ) + }, + error_policy, + context, + store, + StreamBackoff::new(trigger_selector, trigger_backoff), + ) + .map(|result| result.map_err(|e| Box::new(e) as Box)); + + self.controllers.push(stream.boxed()); + } + + pub fn store(&mut self, list_params: ListParams) -> Store { + self.reflector(list_params).0 + } + + fn reflector(&mut self, list_params: ListParams) -> (Store, impl Stream>) { + if let Some((store, prism)) = self.reflectors.get(&list_params) { + return (store.clone(), prism.subscribe()); + } + + let watcher = self.watcher_provider.watcher(list_params.clone()); + let store_writer = Writer::default(); + let store_reader = store_writer.as_reader(); + + let reflector = reflector(store_writer, watcher); + + let prism = Prism::new(reflector, self.ready_token.child()); + let event_stream = prism.subscribe(); + + self.reflectors + .insert(list_params.clone(), (store_reader.clone(), prism)); + + (store_reader, event_stream) + } +} + +pub trait CreateWatcher +where + K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, +{ + fn watcher(&self, list_params: ListParams) -> BoxStream<'static, watcher::Result>>; +} + +pub struct WatcherProvider +where + K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, +{ + api: Api, +} + +impl WatcherProvider +where + K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, +{ + fn new(api: Api) -> Self { + Self { api } + } +} + +impl CreateWatcher for WatcherProvider +where + K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, +{ + fn watcher(&self, list_params: ListParams) -> BoxStream<'static, watcher::Result>> { + watcher(self.api.clone(), list_params).boxed() + } +} + +pub enum ProviderResult { + Reflector(watcher::Result>), + Controller(Result<(ObjectRef, Action), Box>), +} + +#[cfg(test)] +#[allow(clippy::expect_used, clippy::expect_used)] +mod test { + use super::*; + use futures::stream; + use k8s_openapi::api::core::v1::{ConfigMap, Pod}; + use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; + use maplit::hashmap; + use std::collections::VecDeque; + use std::fmt::{Display, Formatter}; + use std::sync::Mutex; + use std::time::Duration; + + mod store { + use super::*; + + #[tokio::test] + async fn it_returns_a_store() { + let lp = ListParams::default(); + let mut provider = + TestProvider::::new(hashmap!(lp.clone().into() => vec![]), &ReadyToken::new()); + let store = provider.store(lp); + assert_eq!(store.state().len(), 0); + } + + #[tokio::test] + async fn it_returns_stores_that_updates_on_events() { + let lp = ListParams::default(); + let expected_state = vec![test_pod(1)]; + let mut kp = TestProvider::::new( + hashmap!(lp.clone().into() => vec![Event::Restarted(expected_state.clone())]), + &ReadyToken::new(), + ); + let store = kp.store(lp); + + kp.spawn().await; + + assert_eq!(store.cloned_state(), expected_state); + } + + #[tokio::test] + async fn it_returns_the_same_store_for_the_same_list_params() { + let lp = ListParams::default().labels("foo=bar"); + let expected_state = vec![test_pod(1)]; + let mut provider = TestProvider::::new( + hashmap!(lp.clone().into() => vec![Event::Restarted(expected_state.clone())]), + &ReadyToken::new(), + ); + + let store1 = provider.store(lp.clone()); + let store2 = provider.store(lp); + + provider.spawn().await; + + assert_eq!(store1.cloned_state(), expected_state); + assert_eq!(store2.cloned_state(), expected_state); + } + + #[tokio::test] + async fn it_returns_a_different_store_for_different_list_params() { + let lp1 = ListParams::default().labels("foo=bar"); + let lp2 = ListParams::default().labels("foo=baz"); + let expected_state1 = vec![test_pod(1)]; + let expected_state2 = vec![test_pod(2)]; + let mut kp = TestProvider::::new( + hashmap!( + lp1.clone().into() => vec![Event::Restarted(expected_state1.clone())], + lp2.clone().into() => vec![Event::Restarted(expected_state2.clone())], + ), + &ReadyToken::new(), + ); + + let store1 = kp.store(lp1); + let store2 = kp.store(lp2); + + kp.spawn().await; + + assert_eq!(store1.cloned_state(), expected_state1, "Store 1"); + assert_eq!(store2.cloned_state(), expected_state2, "Store 2"); + } + } + + mod controller { + use super::*; + use futures::stream::select_all; + use k8s_openapi::api::core::v1::ConfigMap; + + #[tokio::test] + async fn it_creates_a_controller() { + let lp = ListParams::default().labels("foo=baz"); + let expected_state = vec![test_pod(2)]; + let mut provider = TestProvider::::new( + hashmap!( + lp.clone().into() => vec![Event::Restarted(vec![expected_state[0].clone()])], + ), + &ReadyToken::new(), + ); + + let context = Arc::new(Mutex::new(vec![])); + provider.controller( + lp.clone(), + |pod, ctx| async move { + ctx.lock().unwrap().push(pod.clone()); + Ok::<_, TestError>(Action::await_change()) + }, + |_, _, _| Action::await_change(), + context.clone(), + ); + + let store = provider.store(lp.clone()); + + provider.spawn().await; + + assert_eq!(store.cloned_state(), expected_state, "Store"); + assert_eq!(context.cloned_state(), expected_state, "Context"); + } + + #[tokio::test] + async fn it_doesnt_run_the_controller_until_the_store_has_received_a_first_event() { + let lp = ListParams::default().labels("foo=baz"); + let mut kp = TestProvider::::new( + hashmap!( + lp.clone().into() => vec![], + + ), + &ReadyToken::new(), + ); + + let context = kp.test_controller(lp.clone()); + + kp.spawn().await; + + assert!(!context.reconciled()); + } + + #[tokio::test] + async fn many_controllers_wait_until_every_store_is_ready() { + let lp = ListParams::default().labels("foo=baz"); + let ready_token = ReadyToken::new(); + let mut kp1 = TestProvider::::new( + hashmap!( + lp.clone().into() => vec![], + ), + &ready_token, + ); + let mut kp2 = TestProvider::::new( + hashmap!( + lp.clone().into() => vec![], + ), + &ready_token, + ); + + let context1 = kp1.test_controller(lp.clone()); + let context2 = kp2.test_controller(lp.clone()); + + kp1.spawn().await; + kp2.spawn().await; + + assert!(!context1.reconciled()); + assert!(!context2.reconciled()); + } + + #[tokio::test] + async fn it_doesnt_run_a_controller_if_a_store_from_another_provider_isnt_ready() { + let lp = ListParams::default().labels("foo=baz"); + let ready_token = ReadyToken::new(); + let mut kp1 = TestProvider::::new( + hashmap!( + lp.clone().into() => vec![Event::Restarted(vec![test_pod(1).clone()])], + ), + &ready_token, + ); + let mut kp2 = TestProvider::::new( + hashmap!( + lp.clone().into() => vec![], + ), + &ready_token, + ); + + let context = kp1.test_controller(lp.clone()); + let _store = kp2.store(lp.clone()); + + kp1.spawn().await; + kp2.spawn().await; + + assert!(!ready_token.is_ready(), "Ready token"); + assert!(!context.reconciled(), "Context"); + } + + #[tokio::test] + async fn many_controllers_run_after_all_stores_are_ready() { + let lp = ListParams::default().labels("foo=baz"); + let ready_token = ReadyToken::new(); + let mut provider1 = TestProvider::::new( + hashmap!( + lp.clone().into() => vec![Event::Restarted(vec![test_pod(1).clone()])], + ), + &ready_token, + ); + let mut provider2 = TestProvider::::new( + hashmap!( + lp.clone().into() => vec![Event::Restarted(vec![test_cm(1)])], + ), + &ready_token, + ); + + let context1 = provider1.test_controller(lp.clone()); + let context2 = provider2.test_controller(lp.clone()); + + tokio::spawn(async move { + select_all(vec![ + provider1.run().map(|_| ()).boxed(), + provider2.run().map(|_| ()).boxed(), + ]) + .for_each(|_| async {}) + .await; + }); + + tokio::time::sleep(Duration::from_millis(10)).await; + + assert!(ready_token.is_ready(), "ReadyToken"); + assert!(context1.reconciled(), "Context 1"); + assert!(context2.reconciled(), "Context 2"); + } + } + + fn test_pod(postfix: usize) -> Pod { + Pod { + metadata: ObjectMeta { + name: Some(format!("test-pod-{}", postfix)), + namespace: Some("default".to_string()), + ..Default::default() + }, + ..Default::default() + } + } + + fn test_cm(postfix: usize) -> ConfigMap { + ConfigMap { + metadata: ObjectMeta { + name: Some(format!("test-pod-{}", postfix)), + namespace: Some("default".to_string()), + ..Default::default() + }, + ..Default::default() + } + } + + type TestProvider = SharedStore>; + + impl SharedStore> + where + K: 'static + Resource + Clone + DeserializeOwned + Debug + Send + Sync, + K::DynamicType: Clone + Debug + Default + Eq + Hash + Unpin, + { + fn new(events: HashMap>>, ready_token: &ReadyToken) -> Self { + Self { + watcher_provider: TestWatcherProvider { + events: Mutex::new(events.into_iter().map(|(k, v)| (k, v.into())).collect()), + }, + reflectors: HashMap::new(), + controllers: Vec::new(), + ready_token: ready_token.clone(), + } + } + + fn test_controller(&mut self, list_params: ListParams) -> TestContext { + let context = TestContext::new(); + self.controller( + list_params, + |_, ctx| async move { + *(ctx.reconciled.lock().unwrap()) = true; + Ok::<_, TestError>(Action::await_change()) + }, + |_, _, _| Action::await_change(), + Arc::new(context.clone()), + ); + context + } + + async fn spawn(self) { + tokio::spawn(async move { + self.run().for_each(|_| async {}).await; + }); + + // We have to sleep here to give the scheduling stuff inside KubeRS chance to + // schedule the events that are fed into it + tokio::time::sleep(Duration::from_millis(10)).await; + } + } + + #[derive(Clone, Debug)] + struct TestContext { + reconciled: Arc>, + } + + impl TestContext { + fn new() -> Self { + Self { + reconciled: Arc::new(Mutex::new(false)), + } + } + + fn reconciled(&self) -> bool { + *self.reconciled.lock().unwrap() + } + } + + struct TestWatcherProvider { + events: Mutex>>>, + } + + impl CreateWatcher for TestWatcherProvider + where + K: 'static + Resource + Clone + DeserializeOwned + Debug + Send, + K::DynamicType: Hash + Eq, + { + fn watcher(&self, list_params: ListParams) -> BoxStream<'static, watcher::Result>> { + let events = self + .events + .lock() + .unwrap() + .remove(&list_params.into()) + .expect("There can be only one stream per ListParams"); + + stream::unfold(events, |mut events| async move { + match events.pop_front() { + Some(event) => Some((Ok(event), events)), + // if there's nothing left we block to simulate waiting for a change + None => futures::future::pending().await, + } + }) + .boxed() + } + } + + trait ClonedState { + fn cloned_state(&self) -> Vec; + } + + impl ClonedState for Store + where + K: 'static + Resource + Clone + DeserializeOwned + Debug + Send + Sync, + K::DynamicType: Clone + Default + Eq + Hash, + { + fn cloned_state(&self) -> Vec { + self.state().into_iter().map(|k| (*k).clone()).collect::>() + } + } + + impl ClonedState for Arc>>> { + fn cloned_state(&self) -> Vec { + self.lock() + .unwrap() + .iter() + .map(|k| (**k).clone()) + .collect::>() + } + } + + enum TestError { + TestError, + } + + impl Debug for TestError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } + } + + impl Display for TestError { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + todo!() + } + } + + impl std::error::Error for TestError {} +} diff --git a/kube-runtime/src/shared_store/prism.rs b/kube-runtime/src/shared_store/prism.rs new file mode 100644 index 000000000..585c11d56 --- /dev/null +++ b/kube-runtime/src/shared_store/prism.rs @@ -0,0 +1,61 @@ +use crate::shared_store::ready_token::ReadyState; +use crate::watcher; +use crate::watcher::Event; +use futures::stream::BoxStream; +use futures::{stream, Stream, StreamExt}; +use std::fmt::Debug; +use std::pin::Pin; +use std::task::{Context, Poll}; +use tokio::sync::broadcast; + +/// Wraps a Reflector stream and enables it to split out so that additional subscribers can be added. +/// This allows Controllers to subscribe to the same stream of events as the Reflector, without +/// having to own the original stream. +/// +/// We only forward Ok events along the subscriptions and let the primary Reflector stream handle any +/// errors. +// todo - not sure about struct name? +pub struct Prism { + reflector: BoxStream<'static, watcher::Result>>, + ready_state: ReadyState, + sender: broadcast::Sender>, +} + +impl Prism { + pub fn new( + reflector: impl Stream>> + Send + 'static, + ready_state: ReadyState, + ) -> Self { + let (sender, _) = broadcast::channel(100); + + Prism { + reflector: reflector.boxed(), + sender, + ready_state, + } + } + + pub fn subscribe(&self) -> impl Stream> { + stream::unfold(self.sender.subscribe(), |mut rx| async move { + match rx.recv().await { + Ok(event) => Some((event, rx)), + Err(_) => None, + } + }) + } +} + +impl Stream for Prism { + type Item = watcher::Result>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let item = self.reflector.poll_next_unpin(cx); + + if let Poll::Ready(Some(Ok(event))) = &item { + self.ready_state.ready(); + self.sender.send((*event).clone()).ok(); + } + + item + } +} diff --git a/kube-runtime/src/shared_store/ready_token.rs b/kube-runtime/src/shared_store/ready_token.rs new file mode 100644 index 000000000..24d8e36e9 --- /dev/null +++ b/kube-runtime/src/shared_store/ready_token.rs @@ -0,0 +1,71 @@ +use parking_lot::RwLock; +use std::sync::Arc; + +/// A token that can be used to track readiness of multiple components. +#[derive(Clone, Default)] +pub struct ReadyToken(Arc>>); + +#[derive(Clone)] +pub struct ReadyState(Arc>); + +impl ReadyToken { + pub fn new() -> Self { + Self(Arc::new(RwLock::new(vec![]))) + } + + pub fn is_ready(&self) -> bool { + self.0.read().iter().all(|state| *state.0.read()) + } + + pub fn child(&self) -> ReadyState { + let state = ReadyState::new(); + self.0.write().push(state.clone()); + state + } +} + +impl ReadyState { + fn new() -> Self { + Self(Arc::new(RwLock::new(false))) + } + + pub fn ready(&mut self) { + *(self.0.write()) = true; + } +} + +#[cfg(test)] +mod test { + use super::*; + + #[test] + fn it_is_ready_if_it_has_no_ready_states_to_track() { + assert!(ReadyToken::new().is_ready()); + } + + #[test] + fn it_is_not_ready_if_it_has_a_state_that_is_not_ready() { + let rt = ReadyToken::new(); + let _ = rt.child(); + assert!(!rt.is_ready()); + } + + #[test] + fn it_is_ready_when_the_child_state_is_ready() { + let rt = ReadyToken::new(); + let mut child = rt.child(); + child.ready(); + assert!(rt.is_ready()); + } + + #[test] + fn it_works_for_multiple_child_states() { + let rt = ReadyToken::new(); + let mut child1 = rt.child(); + let mut child2 = rt.child(); + child1.ready(); + assert!(!rt.is_ready()); + child2.ready(); + assert!(rt.is_ready()); + } +} From c283f8c9cf5be0b600147b4ce0f3a79ad47518c9 Mon Sep 17 00:00:00 2001 From: Dan Spencer Date: Mon, 30 Jan 2023 10:55:08 +0000 Subject: [PATCH 2/6] Renamed prism to stream_subscribable and added a method for it to watch_ext --- kube-runtime/src/shared_store/mod.rs | 23 ++++--- kube-runtime/src/shared_store/prism.rs | 61 ------------------- kube-runtime/src/shared_store/ready_token.rs | 2 +- kube-runtime/src/utils/mod.rs | 3 +- kube-runtime/src/utils/stream_subscribable.rs | 48 +++++++++++++++ kube-runtime/src/utils/watch_ext.rs | 11 ++++ 6 files changed, 75 insertions(+), 73 deletions(-) delete mode 100644 kube-runtime/src/shared_store/prism.rs create mode 100644 kube-runtime/src/utils/stream_subscribable.rs diff --git a/kube-runtime/src/shared_store/mod.rs b/kube-runtime/src/shared_store/mod.rs index b81f7e659..b6c07cb07 100644 --- a/kube-runtime/src/shared_store/mod.rs +++ b/kube-runtime/src/shared_store/mod.rs @@ -1,9 +1,8 @@ -mod prism; mod ready_token; use crate::controller::{trigger_self, Action}; -use crate::shared_store::prism::Prism; use crate::shared_store::ready_token::ReadyToken; +use crate::utils::StreamSubscribable; use crate::watcher::Event; use crate::{ applier, @@ -15,7 +14,7 @@ use crate::{ utils::{CancelableJoinHandle, StreamBackoff, WatchStreamExt}, watcher::{self, watcher}, }; -use futures::{stream, Stream, StreamExt, TryFuture, TryFutureExt}; +use futures::{stream, Stream, StreamExt, TryFuture, TryFutureExt, TryStreamExt}; use kube_client::api::ListParams; use kube_client::{Api, Resource}; use serde::de::DeserializeOwned; @@ -33,7 +32,7 @@ where W: CreateWatcher, { watcher_provider: W, - reflectors: HashMap, Prism)>, + reflectors: HashMap, StreamSubscribable)>, controllers: Vec>>, ready_token: ReadyToken, } @@ -131,20 +130,24 @@ where fn reflector(&mut self, list_params: ListParams) -> (Store, impl Stream>) { if let Some((store, prism)) = self.reflectors.get(&list_params) { - return (store.clone(), prism.subscribe()); + return (store.clone(), prism.subscribe_ok()); } let watcher = self.watcher_provider.watcher(list_params.clone()); let store_writer = Writer::default(); let store_reader = store_writer.as_reader(); - let reflector = reflector(store_writer, watcher); + let ready_state = self.ready_token.child(); - let prism = Prism::new(reflector, self.ready_token.child()); - let event_stream = prism.subscribe(); + let reflector = reflector(store_writer, watcher).inspect_ok(move |_| ready_state.ready()); - self.reflectors - .insert(list_params.clone(), (store_reader.clone(), prism)); + let subscribable_reflector = reflector.subscribable(); + let event_stream = subscribable_reflector.subscribe_ok(); + + self.reflectors.insert( + list_params.clone(), + (store_reader.clone(), subscribable_reflector), + ); (store_reader, event_stream) } diff --git a/kube-runtime/src/shared_store/prism.rs b/kube-runtime/src/shared_store/prism.rs deleted file mode 100644 index 585c11d56..000000000 --- a/kube-runtime/src/shared_store/prism.rs +++ /dev/null @@ -1,61 +0,0 @@ -use crate::shared_store::ready_token::ReadyState; -use crate::watcher; -use crate::watcher::Event; -use futures::stream::BoxStream; -use futures::{stream, Stream, StreamExt}; -use std::fmt::Debug; -use std::pin::Pin; -use std::task::{Context, Poll}; -use tokio::sync::broadcast; - -/// Wraps a Reflector stream and enables it to split out so that additional subscribers can be added. -/// This allows Controllers to subscribe to the same stream of events as the Reflector, without -/// having to own the original stream. -/// -/// We only forward Ok events along the subscriptions and let the primary Reflector stream handle any -/// errors. -// todo - not sure about struct name? -pub struct Prism { - reflector: BoxStream<'static, watcher::Result>>, - ready_state: ReadyState, - sender: broadcast::Sender>, -} - -impl Prism { - pub fn new( - reflector: impl Stream>> + Send + 'static, - ready_state: ReadyState, - ) -> Self { - let (sender, _) = broadcast::channel(100); - - Prism { - reflector: reflector.boxed(), - sender, - ready_state, - } - } - - pub fn subscribe(&self) -> impl Stream> { - stream::unfold(self.sender.subscribe(), |mut rx| async move { - match rx.recv().await { - Ok(event) => Some((event, rx)), - Err(_) => None, - } - }) - } -} - -impl Stream for Prism { - type Item = watcher::Result>; - - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let item = self.reflector.poll_next_unpin(cx); - - if let Poll::Ready(Some(Ok(event))) = &item { - self.ready_state.ready(); - self.sender.send((*event).clone()).ok(); - } - - item - } -} diff --git a/kube-runtime/src/shared_store/ready_token.rs b/kube-runtime/src/shared_store/ready_token.rs index 24d8e36e9..b2bd207c8 100644 --- a/kube-runtime/src/shared_store/ready_token.rs +++ b/kube-runtime/src/shared_store/ready_token.rs @@ -29,7 +29,7 @@ impl ReadyState { Self(Arc::new(RwLock::new(false))) } - pub fn ready(&mut self) { + pub fn ready(&self) { *(self.0.write()) = true; } } diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index b93779d83..6dc13eb2d 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -3,11 +3,13 @@ mod backoff_reset_timer; mod event_flatten; mod stream_backoff; +mod stream_subscribable; mod watch_ext; pub use backoff_reset_timer::ResetTimerBackoff; pub use event_flatten::EventFlatten; pub use stream_backoff::StreamBackoff; +pub use stream_subscribable::StreamSubscribable; pub use watch_ext::WatchStreamExt; use futures::{ @@ -25,7 +27,6 @@ use std::{ use stream::IntoStream; use tokio::{runtime::Handle, task::JoinHandle}; - /// Allows splitting a `Stream` into several streams that each emit a disjoint subset of the input stream's items, /// like a streaming variant of pattern matching. /// diff --git a/kube-runtime/src/utils/stream_subscribable.rs b/kube-runtime/src/utils/stream_subscribable.rs new file mode 100644 index 000000000..468d0c595 --- /dev/null +++ b/kube-runtime/src/utils/stream_subscribable.rs @@ -0,0 +1,48 @@ +use crate::watcher; +use core::{ + pin::Pin, + task::{Context, Poll}, +}; +use futures::stream::BoxStream; +use futures::{stream, Stream, StreamExt}; +use std::fmt::Debug; +use tokio::sync::broadcast; + +pub struct StreamSubscribable { + reflector: BoxStream<'static, watcher::Result>>, + sender: broadcast::Sender>, +} + +impl StreamSubscribable { + pub fn new(reflector: impl Stream>> + Send + 'static) -> Self { + let (sender, _) = broadcast::channel(100); + + StreamSubscribable { + reflector: reflector.boxed(), + sender, + } + } + + pub fn subscribe_ok(&self) -> impl Stream> { + stream::unfold(self.sender.subscribe(), |mut rx| async move { + match rx.recv().await { + Ok(event) => Some((event, rx)), + Err(_) => None, + } + }) + } +} + +impl Stream for StreamSubscribable { + type Item = watcher::Result>; + + fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let item = self.reflector.poll_next_unpin(cx); + + if let Poll::Ready(Some(Ok(event))) = &item { + self.sender.send((*event).clone()).ok(); + } + + item + } +} diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 54a6d45bc..650db1849 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -4,6 +4,7 @@ use crate::{ }; use backoff::backoff::Backoff; +use crate::utils::stream_subscribable::StreamSubscribable; use futures::{Stream, TryStream}; /// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector) @@ -36,5 +37,15 @@ pub trait WatchStreamExt: Stream { { EventFlatten::new(self, true) } + + /// Create a subscribable stream from a [`watcher()`] stream + /// + /// This allows multiple consumers to subscribe to the same stream of events. + fn subscribable(self) -> StreamSubscribable + where + Self: Stream, watcher::Error>> + Send + Sized + 'static, + { + StreamSubscribable::new(self) + } } impl WatchStreamExt for St where St: Stream {} From 12b5e83b70750ceb160ca1e8111d5fc45d6e2e3e Mon Sep 17 00:00:00 2001 From: Dan Spencer Date: Mon, 30 Jan 2023 13:45:36 +0000 Subject: [PATCH 3/6] Moved the stream boxing out of stream_subscribable --- kube-runtime/src/shared_store/mod.rs | 15 ++++-- kube-runtime/src/utils/stream_subscribable.rs | 51 +++++++++++-------- kube-runtime/src/utils/watch_ext.rs | 2 +- 3 files changed, 44 insertions(+), 24 deletions(-) diff --git a/kube-runtime/src/shared_store/mod.rs b/kube-runtime/src/shared_store/mod.rs index b6c07cb07..4a87cdb1d 100644 --- a/kube-runtime/src/shared_store/mod.rs +++ b/kube-runtime/src/shared_store/mod.rs @@ -14,11 +14,12 @@ use crate::{ utils::{CancelableJoinHandle, StreamBackoff, WatchStreamExt}, watcher::{self, watcher}, }; -use futures::{stream, Stream, StreamExt, TryFuture, TryFutureExt, TryStreamExt}; +use futures::{stream, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt}; use kube_client::api::ListParams; use kube_client::{Api, Resource}; use serde::de::DeserializeOwned; use std::collections::HashMap; +use std::pin::Pin; use std::{fmt::Debug, hash::Hash, sync::Arc}; use stream::BoxStream; use tokio::runtime::Handle; @@ -32,7 +33,13 @@ where W: CreateWatcher, { watcher_provider: W, - reflectors: HashMap, StreamSubscribable)>, + reflectors: HashMap< + ListParams, + ( + Store, + StreamSubscribable>>>, + ), + >, controllers: Vec>>, ready_token: ReadyToken, } @@ -139,7 +146,9 @@ where let ready_state = self.ready_token.child(); - let reflector = reflector(store_writer, watcher).inspect_ok(move |_| ready_state.ready()); + let reflector = reflector(store_writer, watcher) + .inspect_ok(move |_| ready_state.ready()) + .boxed(); let subscribable_reflector = reflector.subscribable(); let event_stream = subscribable_reflector.subscribe_ok(); diff --git a/kube-runtime/src/utils/stream_subscribable.rs b/kube-runtime/src/utils/stream_subscribable.rs index 468d0c595..4f70bfc59 100644 --- a/kube-runtime/src/utils/stream_subscribable.rs +++ b/kube-runtime/src/utils/stream_subscribable.rs @@ -1,46 +1,57 @@ -use crate::watcher; use core::{ pin::Pin, task::{Context, Poll}, }; use futures::stream::BoxStream; -use futures::{stream, Stream, StreamExt}; -use std::fmt::Debug; +use futures::{stream, Stream, StreamExt, TryStream}; +use pin_project::pin_project; use tokio::sync::broadcast; -pub struct StreamSubscribable { - reflector: BoxStream<'static, watcher::Result>>, - sender: broadcast::Sender>, +#[pin_project] +/// todo - docs +#[must_use = "streams do nothing unless polled"] +pub struct StreamSubscribable +where + S: TryStream, +{ + #[pin] + stream: S, + sender: broadcast::Sender, } -impl StreamSubscribable { - pub fn new(reflector: impl Stream>> + Send + 'static) -> Self { +impl StreamSubscribable +where + S::Ok: Clone, +{ + pub fn new(stream: S) -> Self { let (sender, _) = broadcast::channel(100); - StreamSubscribable { - reflector: reflector.boxed(), - sender, - } + Self { stream, sender } } - pub fn subscribe_ok(&self) -> impl Stream> { + /// Subscribe to non-error events from this stream. + pub fn subscribe_ok(&self) -> impl Stream { stream::unfold(self.sender.subscribe(), |mut rx| async move { match rx.recv().await { - Ok(event) => Some((event, rx)), + Ok(obj) => Some((obj, rx)), Err(_) => None, } }) } } -impl Stream for StreamSubscribable { - type Item = watcher::Result>; +impl Stream for StreamSubscribable +where + S::Ok: Clone, +{ + type Item = Result; - fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - let item = self.reflector.poll_next_unpin(cx); + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let this = self.project(); + let item = this.stream.try_poll_next(cx); - if let Poll::Ready(Some(Ok(event))) = &item { - self.sender.send((*event).clone()).ok(); + if let Poll::Ready(Some(Ok(item))) = &item { + this.sender.send((*item).clone()).ok(); } item diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 650db1849..a6f30c30f 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -41,7 +41,7 @@ pub trait WatchStreamExt: Stream { /// Create a subscribable stream from a [`watcher()`] stream /// /// This allows multiple consumers to subscribe to the same stream of events. - fn subscribable(self) -> StreamSubscribable + fn subscribable(self) -> StreamSubscribable where Self: Stream, watcher::Error>> + Send + Sized + 'static, { From 727ffd989dd96d76330a1286b90919a842c99f30 Mon Sep 17 00:00:00 2001 From: Dan Spencer Date: Tue, 31 Jan 2023 14:33:32 +0000 Subject: [PATCH 4/6] Simplified a lot about the SharedStore to focus on just providing Stores --- kube-runtime/Cargo.toml | 2 + kube-runtime/src/shared_store/mod.rs | 313 +++--------------- kube-runtime/src/shared_store/ready_token.rs | 71 ++-- kube-runtime/src/shared_store/safe_store.rs | 96 ++++++ kube-runtime/src/utils/stream_subscribable.rs | 5 +- kube-runtime/src/utils/watch_ext.rs | 3 +- 6 files changed, 178 insertions(+), 312 deletions(-) create mode 100644 kube-runtime/src/shared_store/safe_store.rs diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 6520df283..01a078a55 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -35,6 +35,7 @@ json-patch = "0.3.0" serde_json = "1.0.68" thiserror = "1.0.29" backoff = "0.4.0" +once_cell = "1.8.0" [dependencies.k8s-openapi] version = "0.17.0" @@ -47,6 +48,7 @@ serde_json = "1.0.68" tokio = { version = "1.14.0", features = ["full", "test-util"] } rand = "0.8.0" schemars = "0.8.6" +async-trait = "0.1.51" [dev-dependencies.k8s-openapi] version = "0.17.0" diff --git a/kube-runtime/src/shared_store/mod.rs b/kube-runtime/src/shared_store/mod.rs index 4a87cdb1d..9bdd6eeb1 100644 --- a/kube-runtime/src/shared_store/mod.rs +++ b/kube-runtime/src/shared_store/mod.rs @@ -1,7 +1,9 @@ mod ready_token; +mod safe_store; use crate::controller::{trigger_self, Action}; use crate::shared_store::ready_token::ReadyToken; +use crate::shared_store::safe_store::SafeStore; use crate::utils::StreamSubscribable; use crate::watcher::Event; use crate::{ @@ -15,15 +17,14 @@ use crate::{ watcher::{self, watcher}, }; use futures::{stream, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt}; +use k8s_openapi::NamespaceResourceScope; use kube_client::api::ListParams; -use kube_client::{Api, Resource}; +use kube_client::{Api, Client, Resource}; use serde::de::DeserializeOwned; use std::collections::HashMap; -use std::pin::Pin; -use std::{fmt::Debug, hash::Hash, sync::Arc}; +use std::fmt::Debug; +use std::hash::Hash; use stream::BoxStream; -use tokio::runtime::Handle; -use tracing::Instrument; // TODO - Not sure this is the right name? pub struct SharedStore> @@ -33,31 +34,20 @@ where W: CreateWatcher, { watcher_provider: W, - reflectors: HashMap< - ListParams, - ( - Store, - StreamSubscribable>>>, - ), - >, - controllers: Vec>>, - ready_token: ReadyToken, + reflectors: HashMap, SubscribableBoxStream)>, } -// TODO - not sure about the error type here, we might not need to box it? -type ControllerResult = Result<(ObjectRef, Action), Box>; +type SubscribableBoxStream = StreamSubscribable>>>; impl SharedStore where K: 'static + Resource + Clone + DeserializeOwned + Debug + Send + Sync, K::DynamicType: Default + Eq + Hash + Clone, { - pub fn new(api: Api, ready_token: &ReadyToken) -> Self { + pub fn new(api: Api) -> Self { Self { watcher_provider: WatcherProvider::new(api), reflectors: HashMap::new(), - controllers: Vec::new(), - ready_token: ready_token.clone(), } } } @@ -68,74 +58,15 @@ where K::DynamicType: Default + Eq + Hash + Clone, W: CreateWatcher + 'static, { - pub fn run(self) -> impl Stream> { - let reflectors = stream::select_all(self.reflectors.into_iter().map(|(_, (_, reflector))| reflector)); - - let have_controllers = !self.controllers.is_empty(); - let controllers = stream::select_all(self.controllers); - - // todo - make sure that if ANY stream dies we die - - stream::unfold( - (reflectors, controllers, self.ready_token), - move |(mut reflectors, mut controllers, ready_token)| async move { - tokio::select!( - result = reflectors.next() => { - result.map(|r| (ProviderResult::Reflector(r), (reflectors, controllers, ready_token))) - }, - result = controllers.next(), if have_controllers && ready_token.is_ready() => { - result.map(|r| (ProviderResult::Controller(r), (reflectors, controllers, ready_token))) - } - ) - }, - ) - } - - // todo - we still need to be able to setup "watches" and "owns" - pub fn controller( - &mut self, - list_params: ListParams, - mut reconciler: impl FnMut(Arc, Arc) -> ReconcilerFut + Send + 'static, - error_policy: impl Fn(Arc, &ReconcilerFut::Error, Arc) -> Action + Send + Sync + 'static, - context: Arc, - ) where - K::DynamicType: Debug + Unpin, - ReconcilerFut: TryFuture + Send + 'static, - ReconcilerFut::Error: std::error::Error + Send + 'static, - Ctx: Send + Sync + 'static, - { - let dyntype = K::DynamicType::default(); - - let (store, event_stream) = self.reflector(list_params.clone().into()); - let self_watcher = trigger_self(event_stream.map(Ok).applied_objects(), dyntype).boxed(); - - let mut trigger_selector = stream::SelectAll::new(); - trigger_selector.push(self_watcher); - - let trigger_backoff = Box::new(watcher::default_backoff()); - - let stream = applier( - move |obj, ctx| { - CancelableJoinHandle::spawn( - reconciler(obj, ctx).into_future().in_current_span(), - &Handle::current(), - ) - }, - error_policy, - context, - store, - StreamBackoff::new(trigger_selector, trigger_backoff), - ) - .map(|result| result.map_err(|e| Box::new(e) as Box)); - - self.controllers.push(stream.boxed()); + pub fn run(self) -> impl Stream>> { + stream::select_all(self.reflectors.into_iter().map(|(_, (_, reflector))| reflector)) } - pub fn store(&mut self, list_params: ListParams) -> Store { + pub fn store(&mut self, list_params: ListParams) -> SafeStore { self.reflector(list_params).0 } - fn reflector(&mut self, list_params: ListParams) -> (Store, impl Stream>) { + fn reflector(&mut self, list_params: ListParams) -> (SafeStore, impl Stream>) { if let Some((store, prism)) = self.reflectors.get(&list_params) { return (store.clone(), prism.subscribe_ok()); } @@ -144,21 +75,19 @@ where let store_writer = Writer::default(); let store_reader = store_writer.as_reader(); - let ready_state = self.ready_token.child(); + let safe_store = SafeStore::new(store_reader); - let reflector = reflector(store_writer, watcher) - .inspect_ok(move |_| ready_state.ready()) - .boxed(); + // todo - maybe we want a "safe_reflector" ? + let safe_store_clone = safe_store.clone(); + let reflector = reflector(store_writer, watcher).inspect_ok(move |_| safe_store_clone.make_ready()); - let subscribable_reflector = reflector.subscribable(); + let subscribable_reflector = reflector.boxed().subscribable(); let event_stream = subscribable_reflector.subscribe_ok(); - self.reflectors.insert( - list_params.clone(), - (store_reader.clone(), subscribable_reflector), - ); + self.reflectors + .insert(list_params.clone(), (safe_store.clone(), subscribable_reflector)); - (store_reader, event_stream) + (safe_store, event_stream) } } @@ -203,27 +132,20 @@ pub enum ProviderResult { #[allow(clippy::expect_used, clippy::expect_used)] mod test { use super::*; + use async_trait::async_trait; use futures::stream; use k8s_openapi::api::core::v1::{ConfigMap, Pod}; use k8s_openapi::apimachinery::pkg::apis::meta::v1::ObjectMeta; use maplit::hashmap; use std::collections::VecDeque; use std::fmt::{Display, Formatter}; + use std::sync::Arc; use std::sync::Mutex; use std::time::Duration; mod store { use super::*; - #[tokio::test] - async fn it_returns_a_store() { - let lp = ListParams::default(); - let mut provider = - TestProvider::::new(hashmap!(lp.clone().into() => vec![]), &ReadyToken::new()); - let store = provider.store(lp); - assert_eq!(store.state().len(), 0); - } - #[tokio::test] async fn it_returns_stores_that_updates_on_events() { let lp = ListParams::default(); @@ -236,7 +158,7 @@ mod test { kp.spawn().await; - assert_eq!(store.cloned_state(), expected_state); + assert_eq!(store.cloned_state().await, expected_state); } #[tokio::test] @@ -253,8 +175,8 @@ mod test { provider.spawn().await; - assert_eq!(store1.cloned_state(), expected_state); - assert_eq!(store2.cloned_state(), expected_state); + assert_eq!(store1.cloned_state().await, expected_state); + assert_eq!(store2.cloned_state().await, expected_state); } #[tokio::test] @@ -276,152 +198,8 @@ mod test { kp.spawn().await; - assert_eq!(store1.cloned_state(), expected_state1, "Store 1"); - assert_eq!(store2.cloned_state(), expected_state2, "Store 2"); - } - } - - mod controller { - use super::*; - use futures::stream::select_all; - use k8s_openapi::api::core::v1::ConfigMap; - - #[tokio::test] - async fn it_creates_a_controller() { - let lp = ListParams::default().labels("foo=baz"); - let expected_state = vec![test_pod(2)]; - let mut provider = TestProvider::::new( - hashmap!( - lp.clone().into() => vec![Event::Restarted(vec![expected_state[0].clone()])], - ), - &ReadyToken::new(), - ); - - let context = Arc::new(Mutex::new(vec![])); - provider.controller( - lp.clone(), - |pod, ctx| async move { - ctx.lock().unwrap().push(pod.clone()); - Ok::<_, TestError>(Action::await_change()) - }, - |_, _, _| Action::await_change(), - context.clone(), - ); - - let store = provider.store(lp.clone()); - - provider.spawn().await; - - assert_eq!(store.cloned_state(), expected_state, "Store"); - assert_eq!(context.cloned_state(), expected_state, "Context"); - } - - #[tokio::test] - async fn it_doesnt_run_the_controller_until_the_store_has_received_a_first_event() { - let lp = ListParams::default().labels("foo=baz"); - let mut kp = TestProvider::::new( - hashmap!( - lp.clone().into() => vec![], - - ), - &ReadyToken::new(), - ); - - let context = kp.test_controller(lp.clone()); - - kp.spawn().await; - - assert!(!context.reconciled()); - } - - #[tokio::test] - async fn many_controllers_wait_until_every_store_is_ready() { - let lp = ListParams::default().labels("foo=baz"); - let ready_token = ReadyToken::new(); - let mut kp1 = TestProvider::::new( - hashmap!( - lp.clone().into() => vec![], - ), - &ready_token, - ); - let mut kp2 = TestProvider::::new( - hashmap!( - lp.clone().into() => vec![], - ), - &ready_token, - ); - - let context1 = kp1.test_controller(lp.clone()); - let context2 = kp2.test_controller(lp.clone()); - - kp1.spawn().await; - kp2.spawn().await; - - assert!(!context1.reconciled()); - assert!(!context2.reconciled()); - } - - #[tokio::test] - async fn it_doesnt_run_a_controller_if_a_store_from_another_provider_isnt_ready() { - let lp = ListParams::default().labels("foo=baz"); - let ready_token = ReadyToken::new(); - let mut kp1 = TestProvider::::new( - hashmap!( - lp.clone().into() => vec![Event::Restarted(vec![test_pod(1).clone()])], - ), - &ready_token, - ); - let mut kp2 = TestProvider::::new( - hashmap!( - lp.clone().into() => vec![], - ), - &ready_token, - ); - - let context = kp1.test_controller(lp.clone()); - let _store = kp2.store(lp.clone()); - - kp1.spawn().await; - kp2.spawn().await; - - assert!(!ready_token.is_ready(), "Ready token"); - assert!(!context.reconciled(), "Context"); - } - - #[tokio::test] - async fn many_controllers_run_after_all_stores_are_ready() { - let lp = ListParams::default().labels("foo=baz"); - let ready_token = ReadyToken::new(); - let mut provider1 = TestProvider::::new( - hashmap!( - lp.clone().into() => vec![Event::Restarted(vec![test_pod(1).clone()])], - ), - &ready_token, - ); - let mut provider2 = TestProvider::::new( - hashmap!( - lp.clone().into() => vec![Event::Restarted(vec![test_cm(1)])], - ), - &ready_token, - ); - - let context1 = provider1.test_controller(lp.clone()); - let context2 = provider2.test_controller(lp.clone()); - - tokio::spawn(async move { - select_all(vec![ - provider1.run().map(|_| ()).boxed(), - provider2.run().map(|_| ()).boxed(), - ]) - .for_each(|_| async {}) - .await; - }); - - tokio::time::sleep(Duration::from_millis(10)).await; - - assert!(ready_token.is_ready(), "ReadyToken"); - assert!(context1.reconciled(), "Context 1"); - assert!(context2.reconciled(), "Context 2"); + assert_eq!(store1.cloned_state().await, expected_state1, "Store 1"); + assert_eq!(store2.cloned_state().await, expected_state2, "Store 2"); } } @@ -460,25 +238,9 @@ mod test { events: Mutex::new(events.into_iter().map(|(k, v)| (k, v.into())).collect()), }, reflectors: HashMap::new(), - controllers: Vec::new(), - ready_token: ready_token.clone(), } } - fn test_controller(&mut self, list_params: ListParams) -> TestContext { - let context = TestContext::new(); - self.controller( - list_params, - |_, ctx| async move { - *(ctx.reconciled.lock().unwrap()) = true; - Ok::<_, TestError>(Action::await_change()) - }, - |_, _, _| Action::await_change(), - Arc::new(context.clone()), - ); - context - } - async fn spawn(self) { tokio::spawn(async move { self.run().for_each(|_| async {}).await; @@ -535,22 +297,29 @@ mod test { } } + #[async_trait] trait ClonedState { - fn cloned_state(&self) -> Vec; + async fn cloned_state(&self) -> Vec; } - impl ClonedState for Store + #[async_trait] + impl ClonedState for SafeStore where K: 'static + Resource + Clone + DeserializeOwned + Debug + Send + Sync, K::DynamicType: Clone + Default + Eq + Hash, { - fn cloned_state(&self) -> Vec { - self.state().into_iter().map(|k| (*k).clone()).collect::>() + async fn cloned_state(&self) -> Vec { + self.state() + .await + .into_iter() + .map(|k| (*k).clone()) + .collect::>() } } - impl ClonedState for Arc>>> { - fn cloned_state(&self) -> Vec { + #[async_trait] + impl ClonedState for Arc>>> { + async fn cloned_state(&self) -> Vec { self.lock() .unwrap() .iter() diff --git a/kube-runtime/src/shared_store/ready_token.rs b/kube-runtime/src/shared_store/ready_token.rs index b2bd207c8..cd244e454 100644 --- a/kube-runtime/src/shared_store/ready_token.rs +++ b/kube-runtime/src/shared_store/ready_token.rs @@ -1,71 +1,70 @@ -use parking_lot::RwLock; -use std::sync::Arc; +use tokio_util::sync::{CancellationToken, WaitForCancellationFuture}; +// Todo - Don't use a CancellationToken to track this, either find something better or write it /// A token that can be used to track readiness of multiple components. #[derive(Clone, Default)] -pub struct ReadyToken(Arc>>); - -#[derive(Clone)] -pub struct ReadyState(Arc>); +pub struct ReadyToken(CancellationToken); impl ReadyToken { pub fn new() -> Self { - Self(Arc::new(RwLock::new(vec![]))) + Self(CancellationToken::new()) } pub fn is_ready(&self) -> bool { - self.0.read().iter().all(|state| *state.0.read()) + self.0.is_cancelled() } - pub fn child(&self) -> ReadyState { - let state = ReadyState::new(); - self.0.write().push(state.clone()); - state + pub fn ready(&self) -> WaitForCancellationFuture { + self.0.cancelled() } -} -impl ReadyState { - fn new() -> Self { - Self(Arc::new(RwLock::new(false))) - } - - pub fn ready(&self) { - *(self.0.write()) = true; + pub fn make_ready(&self) { + self.0.cancel() } } #[cfg(test)] mod test { use super::*; + use futures::FutureExt; #[test] - fn it_is_ready_if_it_has_no_ready_states_to_track() { - assert!(ReadyToken::new().is_ready()); + fn it_is_not_ready_on_init() { + let rt = ReadyToken::new(); + + assert!(!rt.is_ready()); } #[test] - fn it_is_not_ready_if_it_has_a_state_that_is_not_ready() { + fn it_is_ready_after_make_ready() { let rt = ReadyToken::new(); - let _ = rt.child(); - assert!(!rt.is_ready()); + rt.make_ready(); + + assert!(rt.is_ready()); } #[test] - fn it_is_ready_when_the_child_state_is_ready() { + fn it_does_not_resolve_ready_until_ready() { let rt = ReadyToken::new(); - let mut child = rt.child(); - child.ready(); - assert!(rt.is_ready()); + + let mut fut = rt.ready().boxed(); + assert!((&mut fut).now_or_never().is_none()); + + rt.make_ready(); + assert!((&mut fut).now_or_never().is_some()) } #[test] - fn it_works_for_multiple_child_states() { + fn it_allows_multiple_futures_to_wait_to_be_ready() { let rt = ReadyToken::new(); - let mut child1 = rt.child(); - let mut child2 = rt.child(); - child1.ready(); - assert!(!rt.is_ready()); - child2.ready(); - assert!(rt.is_ready()); + + let mut fut1 = rt.ready().boxed(); + let mut fut2 = rt.ready().boxed(); + assert!((&mut fut1).now_or_never().is_none()); + assert!((&mut fut2).now_or_never().is_none()); + + rt.make_ready(); + assert!((&mut fut1).now_or_never().is_some()); + assert!((&mut fut2).now_or_never().is_some()); } } diff --git a/kube-runtime/src/shared_store/safe_store.rs b/kube-runtime/src/shared_store/safe_store.rs new file mode 100644 index 000000000..2597683e3 --- /dev/null +++ b/kube-runtime/src/shared_store/safe_store.rs @@ -0,0 +1,96 @@ +use crate::reflector::{ObjectRef, Store}; +use crate::shared_store::ready_token::ReadyToken; +use kube_client::Resource; +use std::hash::Hash; +use std::sync::Arc; + +/// A wrapper around a Store that exposes async versions of the Store's methods. These will always +/// return immediately if the store is ready, or wait until the store is ready if it is not. +#[derive(Clone)] +pub struct SafeStore +where + K::DynamicType: Hash + Eq, +{ + store: Store, + ready: ReadyToken, +} + +impl SafeStore +where + K::DynamicType: Eq + Hash + Clone, +{ + pub fn new(store: Store) -> SafeStore { + Self { + store, + ready: ReadyToken::new(), + } + } + + pub fn make_ready(&self) { + self.ready.make_ready() + } + + pub async fn get(&self, key: &ObjectRef) -> Option> { + self.ready.ready().await; + self.store.get(key) + } + + pub async fn state(&self) -> Vec> { + self.ready.ready().await; + self.store.state() + } + + pub fn store(&self) -> Store { + self.store.clone() + } +} + +#[cfg(test)] +mod test { + use super::*; + use crate::shared_store::Writer; + use crate::watcher::Event; + use futures::FutureExt; + use k8s_openapi::api::core::v1::Pod; + + #[test] + fn it_does_not_return_get_until_ready() { + let store = test_store(); + let ss = SafeStore::::new(store.clone()); + let obj_ref = &ObjectRef::new("test"); + + let mut fut = ss.get(obj_ref).boxed(); + assert!((&mut fut).now_or_never().is_none()); + + ss.make_ready(); + assert_eq!( + (&mut fut).now_or_never().expect("Should have resolved"), + store.get(obj_ref) + ); + } + + #[test] + fn it_does_not_return_state_until_ready() { + let store = test_store(); + let ss = SafeStore::::new(store.clone()); + + let mut fut = ss.state().boxed(); + assert!((&mut fut).now_or_never().is_none()); + + ss.make_ready(); + assert_eq!( + (&mut fut).now_or_never().expect("Should have resolved"), + store.state() + ); + } + + fn test_store() -> Store { + let mut store_writer = Writer::default(); + store_writer.apply_watcher_event(&Event::Restarted(vec![{ + let mut pod = Pod::default(); + pod.meta_mut().name = Some("test".to_string()); + pod + }])); + store_writer.as_reader() + } +} diff --git a/kube-runtime/src/utils/stream_subscribable.rs b/kube-runtime/src/utils/stream_subscribable.rs index 4f70bfc59..1202c82ab 100644 --- a/kube-runtime/src/utils/stream_subscribable.rs +++ b/kube-runtime/src/utils/stream_subscribable.rs @@ -2,8 +2,7 @@ use core::{ pin::Pin, task::{Context, Poll}, }; -use futures::stream::BoxStream; -use futures::{stream, Stream, StreamExt, TryStream}; +use futures::{stream, Stream, TryFutureExt, TryStream}; use pin_project::pin_project; use tokio::sync::broadcast; @@ -29,7 +28,7 @@ where Self { stream, sender } } - /// Subscribe to non-error events from this stream. + /// Subscribe to success events from this stream pub fn subscribe_ok(&self) -> impl Stream { stream::unfold(self.sender.subscribe(), |mut rx| async move { match rx.recv().await { diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index a6f30c30f..678def3b7 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -3,9 +3,10 @@ use crate::{ watcher, }; use backoff::backoff::Backoff; +use std::future::Future; use crate::utils::stream_subscribable::StreamSubscribable; -use futures::{Stream, TryStream}; +use futures::{Stream, StreamExt, TryStream}; /// Extension trait for streams returned by [`watcher`](watcher()) or [`reflector`](crate::reflector::reflector) pub trait WatchStreamExt: Stream { From ccbbd265ff4e8404f985ec6b7538372a642f5674 Mon Sep 17 00:00:00 2001 From: Dan Spencer Date: Tue, 31 Jan 2023 16:56:32 +0000 Subject: [PATCH 5/6] Added subscribe methods to the shared store --- kube-runtime/src/shared_store/mod.rs | 211 +++++++++++++++++++-------- 1 file changed, 154 insertions(+), 57 deletions(-) diff --git a/kube-runtime/src/shared_store/mod.rs b/kube-runtime/src/shared_store/mod.rs index 9bdd6eeb1..2ffd10cc2 100644 --- a/kube-runtime/src/shared_store/mod.rs +++ b/kube-runtime/src/shared_store/mod.rs @@ -1,13 +1,11 @@ mod ready_token; mod safe_store; -use crate::controller::{trigger_self, Action}; -use crate::shared_store::ready_token::ReadyToken; +use crate::controller::Action; use crate::shared_store::safe_store::SafeStore; use crate::utils::StreamSubscribable; use crate::watcher::Event; use crate::{ - applier, reflector::{ reflector, store::{Store, Writer}, @@ -26,8 +24,7 @@ use std::fmt::Debug; use std::hash::Hash; use stream::BoxStream; -// TODO - Not sure this is the right name? -pub struct SharedStore> +pub struct SharedStore> where K: 'static + Resource + Clone + DeserializeOwned + Debug + Send, K::DynamicType: Hash + Eq, @@ -44,9 +41,9 @@ where K: 'static + Resource + Clone + DeserializeOwned + Debug + Send + Sync, K::DynamicType: Default + Eq + Hash + Clone, { - pub fn new(api: Api) -> Self { + pub fn new(client: Client) -> Self { Self { - watcher_provider: WatcherProvider::new(api), + watcher_provider: WatcherFactory::new(client), reflectors: HashMap::new(), } } @@ -62,16 +59,78 @@ where stream::select_all(self.reflectors.into_iter().map(|(_, (_, reflector))| reflector)) } - pub fn store(&mut self, list_params: ListParams) -> SafeStore { - self.reflector(list_params).0 + pub fn namespaced(&mut self, namespace: &str, list_params: ListParams) -> SafeStore + where + K: Resource, + { + if let Some((store, _)) = self.reflectors.get(&list_params) { + return store.clone(); + } + + let watcher = self.watcher_provider.namespaced(namespace, list_params.clone()); + + self.reflector(watcher, list_params) + } + + pub fn all(&mut self, list_params: ListParams) -> SafeStore { + if let Some((store, _)) = self.reflectors.get(&list_params) { + return store.clone(); + } + + let watcher = self.watcher_provider.all(list_params.clone()); + + self.reflector(watcher, list_params) + } + + pub fn subscribe_namespaced( + &mut self, + namespace: &str, + list_params: ListParams, + ) -> impl Stream> + where + K: Resource, + { + if let Some((_, reflector)) = self.reflectors.get(&list_params) { + return reflector.subscribe_ok(); + } + + let watcher = self.watcher_provider.namespaced(namespace, list_params.clone()); + + self.reflector(watcher, list_params.clone()); + + // todo -We can safely unwrap here because we know we just created it ... but it's horrible, so we should fix it + self.reflectors + .get(&list_params) + .expect("reflector must exist") + .1 + .subscribe_ok() + } + + pub fn subscribe_all(&mut self, list_params: ListParams) -> impl Stream> + where + K: Resource, + { + if let Some((_, reflector)) = self.reflectors.get(&list_params) { + return reflector.subscribe_ok(); + } + + let watcher = self.watcher_provider.all(list_params.clone()); + + self.reflector(watcher, list_params.clone()); + + // todo -We can safely unwrap here because we know we just created it ... but it's horrible, so we should fix it + self.reflectors + .get(&list_params) + .expect("reflector must exist") + .1 + .subscribe_ok() } - fn reflector(&mut self, list_params: ListParams) -> (SafeStore, impl Stream>) { + fn reflector(&mut self, pending_watcher: PendingWatcher, list_params: ListParams) -> SafeStore { if let Some((store, prism)) = self.reflectors.get(&list_params) { - return (store.clone(), prism.subscribe_ok()); + return store.clone(); } - let watcher = self.watcher_provider.watcher(list_params.clone()); let store_writer = Writer::default(); let store_reader = store_writer.as_reader(); @@ -79,7 +138,8 @@ where // todo - maybe we want a "safe_reflector" ? let safe_store_clone = safe_store.clone(); - let reflector = reflector(store_writer, watcher).inspect_ok(move |_| safe_store_clone.make_ready()); + let reflector = + reflector(store_writer, pending_watcher.run()).inspect_ok(move |_| safe_store_clone.make_ready()); let subscribable_reflector = reflector.boxed().subscribable(); let event_stream = subscribable_reflector.subscribe_ok(); @@ -87,39 +147,65 @@ where self.reflectors .insert(list_params.clone(), (safe_store.clone(), subscribable_reflector)); - (safe_store, event_stream) + safe_store } } -pub trait CreateWatcher -where - K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, -{ - fn watcher(&self, list_params: ListParams) -> BoxStream<'static, watcher::Result>>; +pub trait CreateWatcher { + fn all(&self, list_params: ListParams) -> PendingWatcher; + + fn namespaced(&self, namespace: &str, list_params: ListParams) -> PendingWatcher + where + K: Resource; } -pub struct WatcherProvider -where - K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, -{ - api: Api, +pub struct PendingWatcher(BoxStream<'static, watcher::Result>>); + +impl PendingWatcher { + fn new(stream: S) -> Self + where + S: Stream>> + Send + 'static, + { + Self(stream.boxed()) + } + + fn run(self) -> impl Stream>> { + self.0 + } } -impl WatcherProvider -where - K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, -{ - fn new(api: Api) -> Self { - Self { api } +pub struct WatcherFactory { + client: Client, + _phantom: std::marker::PhantomData, +} + +impl WatcherFactory { + fn new(client: Client) -> Self { + Self { + client, + _phantom: std::marker::PhantomData, + } } } -impl CreateWatcher for WatcherProvider +impl CreateWatcher for WatcherFactory where K: Resource + Clone + DeserializeOwned + Debug + Send + 'static, + ::DynamicType: Default, { - fn watcher(&self, list_params: ListParams) -> BoxStream<'static, watcher::Result>> { - watcher(self.api.clone(), list_params).boxed() + fn all(&self, list_params: ListParams) -> PendingWatcher { + // is it worth catching the APIs within the provider? + PendingWatcher::new(watcher(Api::all(self.client.clone()), list_params)) + } + + fn namespaced(&self, namespace: &str, list_params: ListParams) -> PendingWatcher + where + K: Resource, + { + PendingWatcher::new(watcher( + Api::namespaced(self.client.clone(), namespace), + list_params, + )) } } @@ -150,13 +236,12 @@ mod test { async fn it_returns_stores_that_updates_on_events() { let lp = ListParams::default(); let expected_state = vec![test_pod(1)]; - let mut kp = TestProvider::::new( + let mut ss = TestProvider::::new( hashmap!(lp.clone().into() => vec![Event::Restarted(expected_state.clone())]), - &ReadyToken::new(), ); - let store = kp.store(lp); + let store = ss.all(lp); - kp.spawn().await; + ss.spawn().await; assert_eq!(store.cloned_state().await, expected_state); } @@ -165,15 +250,14 @@ mod test { async fn it_returns_the_same_store_for_the_same_list_params() { let lp = ListParams::default().labels("foo=bar"); let expected_state = vec![test_pod(1)]; - let mut provider = TestProvider::::new( + let mut ss = TestProvider::::new( hashmap!(lp.clone().into() => vec![Event::Restarted(expected_state.clone())]), - &ReadyToken::new(), ); - let store1 = provider.store(lp.clone()); - let store2 = provider.store(lp); + let store1 = ss.all(lp.clone()); + let store2 = ss.all(lp); - provider.spawn().await; + ss.spawn().await; assert_eq!(store1.cloned_state().await, expected_state); assert_eq!(store2.cloned_state().await, expected_state); @@ -185,18 +269,15 @@ mod test { let lp2 = ListParams::default().labels("foo=baz"); let expected_state1 = vec![test_pod(1)]; let expected_state2 = vec![test_pod(2)]; - let mut kp = TestProvider::::new( - hashmap!( - lp1.clone().into() => vec![Event::Restarted(expected_state1.clone())], - lp2.clone().into() => vec![Event::Restarted(expected_state2.clone())], - ), - &ReadyToken::new(), - ); + let mut ss = TestProvider::::new(hashmap!( + lp1.clone().into() => vec![Event::Restarted(expected_state1.clone())], + lp2.clone().into() => vec![Event::Restarted(expected_state2.clone())], + )); - let store1 = kp.store(lp1); - let store2 = kp.store(lp2); + let store1 = ss.all(lp1); + let store2 = ss.all(lp2); - kp.spawn().await; + ss.spawn().await; assert_eq!(store1.cloned_state().await, expected_state1, "Store 1"); assert_eq!(store2.cloned_state().await, expected_state2, "Store 2"); @@ -232,7 +313,7 @@ mod test { K: 'static + Resource + Clone + DeserializeOwned + Debug + Send + Sync, K::DynamicType: Clone + Debug + Default + Eq + Hash + Unpin, { - fn new(events: HashMap>>, ready_token: &ReadyToken) -> Self { + fn new(events: HashMap>>) -> Self { Self { watcher_provider: TestWatcherProvider { events: Mutex::new(events.into_iter().map(|(k, v)| (k, v.into())).collect()), @@ -278,7 +359,24 @@ mod test { K: 'static + Resource + Clone + DeserializeOwned + Debug + Send, K::DynamicType: Hash + Eq, { - fn watcher(&self, list_params: ListParams) -> BoxStream<'static, watcher::Result>> { + fn all(&self, list_params: ListParams) -> PendingWatcher { + self.watcher(list_params) + } + + fn namespaced(&self, _namespace: &str, list_params: ListParams) -> PendingWatcher + where + K: Resource, + { + self.watcher(list_params) + } + } + + impl TestWatcherProvider + where + K: 'static + Resource + Clone + DeserializeOwned + Debug + Send, + K::DynamicType: Hash + Eq, + { + fn watcher(&self, list_params: ListParams) -> PendingWatcher { let events = self .events .lock() @@ -286,14 +384,13 @@ mod test { .remove(&list_params.into()) .expect("There can be only one stream per ListParams"); - stream::unfold(events, |mut events| async move { + PendingWatcher::new(stream::unfold(events, |mut events| async move { match events.pop_front() { Some(event) => Some((Ok(event), events)), // if there's nothing left we block to simulate waiting for a change None => futures::future::pending().await, } - }) - .boxed() + })) } } From dd4bcb586817ce0ccac77012a8f8492d9bdf1506 Mon Sep 17 00:00:00 2001 From: Dan Spencer Date: Wed, 1 Feb 2023 08:25:16 +0000 Subject: [PATCH 6/6] Fixed issue with SharedStore not correctly handling different scopes --- kube-runtime/src/shared_store/mod.rs | 97 +++++++++++++------ kube-runtime/src/utils/stream_subscribable.rs | 2 +- 2 files changed, 68 insertions(+), 31 deletions(-) diff --git a/kube-runtime/src/shared_store/mod.rs b/kube-runtime/src/shared_store/mod.rs index 2ffd10cc2..b205faa65 100644 --- a/kube-runtime/src/shared_store/mod.rs +++ b/kube-runtime/src/shared_store/mod.rs @@ -31,7 +31,7 @@ where W: CreateWatcher, { watcher_provider: W, - reflectors: HashMap, SubscribableBoxStream)>, + reflectors: HashMap<(Option, ListParams), (SafeStore, SubscribableBoxStream)>, } type SubscribableBoxStream = StreamSubscribable>>>; @@ -63,23 +63,26 @@ where where K: Resource, { - if let Some((store, _)) = self.reflectors.get(&list_params) { + if let Some((store, _)) = self + .reflectors + .get(&(Some(namespace.to_string()), list_params.clone())) + { return store.clone(); } let watcher = self.watcher_provider.namespaced(namespace, list_params.clone()); - self.reflector(watcher, list_params) + self.reflector(watcher, Some(namespace.to_string()), list_params) } pub fn all(&mut self, list_params: ListParams) -> SafeStore { - if let Some((store, _)) = self.reflectors.get(&list_params) { + if let Some((store, _)) = self.reflectors.get(&(None, list_params.clone())) { return store.clone(); } let watcher = self.watcher_provider.all(list_params.clone()); - self.reflector(watcher, list_params) + self.reflector(watcher, None, list_params) } pub fn subscribe_namespaced( @@ -90,17 +93,20 @@ where where K: Resource, { - if let Some((_, reflector)) = self.reflectors.get(&list_params) { + if let Some((_, reflector)) = self + .reflectors + .get(&(Some(namespace.to_string()), list_params.clone())) + { return reflector.subscribe_ok(); } let watcher = self.watcher_provider.namespaced(namespace, list_params.clone()); - self.reflector(watcher, list_params.clone()); + self.reflector(watcher, Some(namespace.to_string()), list_params.clone()); - // todo -We can safely unwrap here because we know we just created it ... but it's horrible, so we should fix it + // todo - We can safely unwrap here because we know we just created it ... but it's horrible, so we should fix it self.reflectors - .get(&list_params) + .get(&(Some(namespace.to_string()), list_params)) .expect("reflector must exist") .1 .subscribe_ok() @@ -110,27 +116,28 @@ where where K: Resource, { - if let Some((_, reflector)) = self.reflectors.get(&list_params) { + if let Some((_, reflector)) = self.reflectors.get(&(None, list_params.clone())) { return reflector.subscribe_ok(); } let watcher = self.watcher_provider.all(list_params.clone()); - self.reflector(watcher, list_params.clone()); + self.reflector(watcher, None, list_params.clone()); - // todo -We can safely unwrap here because we know we just created it ... but it's horrible, so we should fix it + // todo - We can safely unwrap here because we know we just created it ... but it's horrible, so we should fix it self.reflectors - .get(&list_params) + .get(&(None, list_params)) .expect("reflector must exist") .1 .subscribe_ok() } - fn reflector(&mut self, pending_watcher: PendingWatcher, list_params: ListParams) -> SafeStore { - if let Some((store, prism)) = self.reflectors.get(&list_params) { - return store.clone(); - } - + fn reflector( + &mut self, + pending_watcher: PendingWatcher, + scope: Option, + list_params: ListParams, + ) -> SafeStore { let store_writer = Writer::default(); let store_reader = store_writer.as_reader(); @@ -145,7 +152,7 @@ where let event_stream = subscribable_reflector.subscribe_ok(); self.reflectors - .insert(list_params.clone(), (safe_store.clone(), subscribable_reflector)); + .insert((scope, list_params), (safe_store.clone(), subscribable_reflector)); safe_store } @@ -237,7 +244,7 @@ mod test { let lp = ListParams::default(); let expected_state = vec![test_pod(1)]; let mut ss = TestProvider::::new( - hashmap!(lp.clone().into() => vec![Event::Restarted(expected_state.clone())]), + hashmap!((None, lp.clone().into()) => vec![Event::Restarted(expected_state.clone())]), ); let store = ss.all(lp); @@ -251,7 +258,7 @@ mod test { let lp = ListParams::default().labels("foo=bar"); let expected_state = vec![test_pod(1)]; let mut ss = TestProvider::::new( - hashmap!(lp.clone().into() => vec![Event::Restarted(expected_state.clone())]), + hashmap!((None, lp.clone().into()) => vec![Event::Restarted(expected_state.clone())]), ); let store1 = ss.all(lp.clone()); @@ -270,8 +277,8 @@ mod test { let expected_state1 = vec![test_pod(1)]; let expected_state2 = vec![test_pod(2)]; let mut ss = TestProvider::::new(hashmap!( - lp1.clone().into() => vec![Event::Restarted(expected_state1.clone())], - lp2.clone().into() => vec![Event::Restarted(expected_state2.clone())], + (None, lp1.clone().into()) => vec![Event::Restarted(expected_state1.clone())], + (None, lp2.clone().into()) => vec![Event::Restarted(expected_state2.clone())], )); let store1 = ss.all(lp1); @@ -282,6 +289,36 @@ mod test { assert_eq!(store1.cloned_state().await, expected_state1, "Store 1"); assert_eq!(store2.cloned_state().await, expected_state2, "Store 2"); } + + #[tokio::test] + async fn it_returns_different_stores_by_scope() { + let lp = ListParams::default().labels("foo=bar"); + let ns = "ns1"; + let expected_state1 = vec![test_pod(1)]; + let expected_state2 = vec![test_pod(2)]; + let mut ss = TestProvider::::new(hashmap!( + (None, lp.clone().into()) => vec![Event::Restarted(expected_state1.clone())], + (Some(ns.to_string()), lp.clone().into()) => vec![Event::Restarted(expected_state2.clone())], + )); + + let cluster_store1 = ss.all(lp.clone()); + let ns_store1 = ss.namespaced(ns, lp.clone()); + let cluster_store2 = ss.all(lp); + + ss.spawn().await; + + assert_eq!( + cluster_store1.cloned_state().await, + expected_state1, + "ClusterStore 1" + ); + assert_eq!(ns_store1.cloned_state().await, expected_state2, "NS Store 1"); + assert_eq!( + cluster_store2.cloned_state().await, + expected_state1, + "Cluster Store 2" + ); + } } fn test_pod(postfix: usize) -> Pod { @@ -313,7 +350,7 @@ mod test { K: 'static + Resource + Clone + DeserializeOwned + Debug + Send + Sync, K::DynamicType: Clone + Debug + Default + Eq + Hash + Unpin, { - fn new(events: HashMap>>) -> Self { + fn new(events: HashMap<(Option, ListParams), Vec>>) -> Self { Self { watcher_provider: TestWatcherProvider { events: Mutex::new(events.into_iter().map(|(k, v)| (k, v.into())).collect()), @@ -351,7 +388,7 @@ mod test { } struct TestWatcherProvider { - events: Mutex>>>, + events: Mutex, ListParams), VecDeque>>>, } impl CreateWatcher for TestWatcherProvider @@ -360,14 +397,14 @@ mod test { K::DynamicType: Hash + Eq, { fn all(&self, list_params: ListParams) -> PendingWatcher { - self.watcher(list_params) + self.watcher(None, list_params) } - fn namespaced(&self, _namespace: &str, list_params: ListParams) -> PendingWatcher + fn namespaced(&self, namespace: &str, list_params: ListParams) -> PendingWatcher where K: Resource, { - self.watcher(list_params) + self.watcher(Some(namespace.to_string()), list_params) } } @@ -376,12 +413,12 @@ mod test { K: 'static + Resource + Clone + DeserializeOwned + Debug + Send, K::DynamicType: Hash + Eq, { - fn watcher(&self, list_params: ListParams) -> PendingWatcher { + fn watcher(&self, scope: Option, list_params: ListParams) -> PendingWatcher { let events = self .events .lock() .unwrap() - .remove(&list_params.into()) + .remove(&(scope, list_params.into())) .expect("There can be only one stream per ListParams"); PendingWatcher::new(stream::unfold(events, |mut events| async move { diff --git a/kube-runtime/src/utils/stream_subscribable.rs b/kube-runtime/src/utils/stream_subscribable.rs index 1202c82ab..d3f038075 100644 --- a/kube-runtime/src/utils/stream_subscribable.rs +++ b/kube-runtime/src/utils/stream_subscribable.rs @@ -2,7 +2,7 @@ use core::{ pin::Pin, task::{Context, Poll}, }; -use futures::{stream, Stream, TryFutureExt, TryStream}; +use futures::{stream, Stream, TryStream}; use pin_project::pin_project; use tokio::sync::broadcast;