Added in a first attempt at a solution to sharing watchers between stores and controllers #1128
Conversation
Thanks a lot for taking the initiative on this and getting a reasonable starting PR out. There are definitely some good ideas here, even though it'll take some time to get something as ambitious as this over the line. I'll put comments on various bits here (starting with the easier stuff) over the coming days. Will need to think a bit about this as well.
```rust
pub struct Prism<K> {
    reflector: BoxStream<'static, watcher::Result<Event<K>>>,
    ready_state: ReadyState,
    sender: broadcast::Sender<Event<K>>,
}
```
I'll start here. This Prism/SharedStore pattern can work, but it feels a bit inside out. Ideally, this `Prism` logic can be a method on `WatchStreamExt` rather than a public top-level type, because then it is compatible with both `reflector` and `watcher`. That might actually be pretty close to what you have with `WatchStreamExt::subscribe` returning a `Prism`, AFAICT, since you impl `Stream` for it.
For the second problem, blocking until ready: I feel like that can maybe be done in `WatchStreamExt` as well. We already have `WatchStreamExt::backoff` there as a way to limit flow, so it should be possible to do something similar that waits until we have seen ONE element (a `Once` thing inside a struct that impls `Stream` can maybe embed a token) without having the user manage/pass ready tokens around. That way we could conceivably do something like:

```rust
let cm_stream = reflector(writer, watcher(...)).once_ready().await;
let cm_stream_cpy = cm_stream.subscribe();
```
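As a std-only sketch of the "don't hand the stream out until one element has been seen" idea (no kube or futures types; `Event` and `once_ready` here are made-up stand-ins, with an mpsc channel playing the role of the watch stream):

```rust
use std::sync::mpsc;
use std::thread;

// Hypothetical event type standing in for watcher::Event<K>.
#[derive(Debug, Clone, PartialEq)]
enum Event {
    Applied(String),
    Deleted(String),
}

// Analogue of `once_ready()`: block until the stream has yielded at least
// one event, then hand back that first event plus the still-live receiver
// for the rest of the stream. Errors if the stream ends first.
fn once_ready(rx: mpsc::Receiver<Event>) -> Result<(Event, mpsc::Receiver<Event>), mpsc::RecvError> {
    let first = rx.recv()?; // blocks until the watcher has seen one event
    Ok((first, rx))
}

fn main() {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        tx.send(Event::Applied("cm-1".to_string())).unwrap();
        tx.send(Event::Deleted("cm-1".to_string())).unwrap();
    });
    let (first, rest) = once_ready(rx).expect("stream ended before ready");
    assert_eq!(first, Event::Applied("cm-1".to_string()));
    assert_eq!(rest.recv().unwrap(), Event::Deleted("cm-1".to_string()));
}
```

The same shape carries over to the async version: the combinator consumes the stream, awaits one item internally, and only then yields a handle callers can subscribe to.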
(It would be a minor performance hit to do multiple `.once_ready().await`s sequentially, but I think this can be worked around with a `join_all` before creating subscriptions.)
Cheers for the suggestions; I'll look at moving some of this stuff around onto the `WatchStreamExt`. I'm still on a pretty steep learning curve with streams, so you'll have to bear with me on lots of boxed streams until I fully get my head wrapped around things properly 😅.

I have a concern with the `once_ready` suggestion in that it could be easy to forget, or not be discovered, until you get bitten by a bug caused by a race condition.
If we're still thinking of something like the `SharedStore` struct that provides a wrapper over `Store` creation, then that alleviates the concern somewhat. Easy mode would be `shared_store.store(lp).await`, and if more fine-grained control is wanted then it's perhaps not unreasonable to expect people to invest a bit more in learning the API.

The token approach does allow `Store` creation to remain synchronous rather than async (although I'm not sure how valuable that actually is).
Some more thoughts on `once_ready` (because it's not far from how we currently handle our stores, and trying to implement something like it reminded me of some of the issues we had to tackle). It would probably need to not only be async, but also return a `Result`, since the stream could end before the await completes successfully. Our implementation also takes a timeout so that the future won't wait indefinitely if the stream never successfully receives an event, which is another type of failure that needs to be handled.
One of my aims with the initial approach was to defer these problems away from setup to the point where you start consuming the streams.
Yeah, good point about the "missed `once_ready`" type mistake. Not sure what the best way to handle that would be. Possibly you could do a wrapper type for the stream to encapsulate the "post-init" state, but that's pretty hairy to do on streams (maybe even too hairy to be worth it).

As for the `Result` problem, you could still collect futures (of `once_ready` results) early without awaiting them, then do a `try_join!` on all the streams before subscribing, but I guess that does not plug into `timeout` easily? We might have to supply a helper that does a raw `select!` to do this nicely. It sounds brittle 😞
I'm just going to re-iterate the specific problem we're trying to solve so it's clear in my head. When you call `.state()` on a `Store` you may get a result that hasn't yet been initialised. We're currently trying to solve the init problem either by not running controllers until all the stores have reported ready, or by supplying a future that can be awaited until a store is ready. Once we have declared the store "ready" in some way, we know it's safe to use.

Perhaps we're tackling the problem in the wrong place? We could provide a wrapper over `Store` (`SafeStore`?), or (more radically) change the interface of `Store` itself so that calling `.state()` or `.get()` is async. They won't return a result until the `Store` has initialised, but once it has, they will always instantly poll ready with the answer.
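A synchronous analogue of that "block until initialised, then always answer instantly" behaviour, using only std primitives (`ReadyStore` and its method names are hypothetical stand-ins for the real `Store`, with a condvar playing the role of the async readiness future):

```rust
use std::collections::HashMap;
use std::sync::{Arc, Condvar, Mutex};
use std::thread;

// The pair under the mutex is (ready flag, cached state).
struct ReadyStore {
    inner: Mutex<(bool, HashMap<String, String>)>,
    cond: Condvar,
}

impl ReadyStore {
    fn new() -> Self {
        ReadyStore { inner: Mutex::new((false, HashMap::new())), cond: Condvar::new() }
    }

    // Called when the watch stream delivers its first event: populate the
    // state, flip the ready flag, and wake any blocked readers.
    fn apply_first_event(&self, objects: Vec<(String, String)>) {
        let mut guard = self.inner.lock().unwrap();
        guard.1.extend(objects);
        guard.0 = true;
        self.cond.notify_all();
    }

    // Blocks until the store is initialised; afterwards returns immediately.
    fn get(&self, key: &str) -> Option<String> {
        let guard = self.inner.lock().unwrap();
        let guard = self.cond.wait_while(guard, |(ready, _)| !*ready).unwrap();
        guard.1.get(key).cloned()
    }
}

fn main() {
    let store = Arc::new(ReadyStore::new());
    let reader = Arc::clone(&store);
    // The reader blocks inside get() until the first event lands.
    let handle = thread::spawn(move || reader.get("pod-a"));
    store.apply_first_event(vec![("pod-a".to_string(), "Running".to_string())]);
    assert_eq!(handle.join().unwrap(), Some("Running".to_string()));
}
```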
I've had a think about the approach I started here and realised that I was bundling everything together specifically to solve the race condition problem. If you can solve the race condition in a different way, you can unbundle everything.

Given that, I've pared things right back and deleted all of the Controller stuff for now to focus on the duplicate stores and the race condition (which are separate concerns that probably belong in their own PRs; keeping it all here for now for historical reasons while it's discussed).
I've added in a struct called `SafeStore` which exposes async versions of the `Store` methods. They will block until the `Store` has received the first event, and then always instantly return the value. Instances of `SafeStore` can be given to controllers with no worry about race conditions.

I've also added a `subscribable` method in `watch_ext` which returns a version of the watch stream that exposes a `subscribe_ok` method. This will allow controllers to hook into the same stream that a store uses (probably with an approach something like the one you discussed in Issue 1080).
As I mentioned above, `SharedStore` has dropped all notions of handling controllers. It now just provides `Store`s with the same caching mechanism as before. Additionally, I've added `subscribe` methods to the `SharedStore`, allowing controllers to pull their streams from it.
To revisit the original example I put above:
```rust
fn setup_pvc_controller(
    namespace: &str,
    pvc_shared_store: &mut SharedStore<PersistentVolumeClaim>,
    pod_shared_store: &mut SharedStore<Pod>,
    cm_shared_store: &mut SharedStore<ConfigMap>,
) {
    let pod_store = pod_shared_store.all(ListParams::default().labels("app=nginx"));
    let cm_store = cm_shared_store.all(ListParams::default().labels("app=other"));

    // Given that to set up a controller you'll always call both of these methods,
    // perhaps there should be a single call that returns both the store and the subscription?
    let pvc_store = pvc_shared_store.namespaced(namespace, ListParams::default().labels("app=nginx"));
    let pvc_stream = pvc_shared_store.namespaced_subscribe(namespace, ListParams::default().labels("app=nginx"));

    // Below syntax is entirely made up
    Controller::new_from(pvc_store, pvc_stream).run(....)
}
```
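The de-duplication that `SharedStore` is meant to provide could be sketched like this (std-only; `SharedStore` here is a drastically simplified stand-in that keys on a label-selector string instead of `ListParams`, and uses `Rc<Vec<String>>` in place of a real reflector store):

```rust
use std::collections::HashMap;
use std::rc::Rc;

// Stores are indexed by their selector, so requesting the same selector
// twice hands back the same underlying store rather than a duplicate cache.
#[derive(Default)]
struct SharedStore {
    stores: HashMap<String, Rc<Vec<String>>>,
}

impl SharedStore {
    fn all(&mut self, label_selector: &str) -> Rc<Vec<String>> {
        Rc::clone(
            self.stores
                .entry(label_selector.to_string())
                .or_insert_with(|| Rc::new(Vec::new())),
        )
    }
}

fn main() {
    let mut shared = SharedStore::default();
    let a = shared.all("app=nginx");
    let b = shared.all("app=nginx");
    let c = shared.all("app=other");
    assert!(Rc::ptr_eq(&a, &b)); // same selector -> same cached store
    assert!(!Rc::ptr_eq(&a, &c)); // different selector -> distinct store
}
```

The real implementation would additionally spawn the watch stream the first time a given `ListParams` is requested, which the sketch elides.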
```rust
#[pin_project]
/// todo - docs
#[must_use = "streams do nothing unless polled"]
pub struct StreamSubscribable<S>
where
    S: TryStream,
{
    #[pin]
    stream: S,
    sender: broadcast::Sender<S::Ok>,
}
```
I do like the setup of this now. It's fairly orthogonal to the rest of this PR and should have wide usability outside the PR's more ambitious scope, so it might be worth cleaning this file up (plus the watch_ext callee) in its own PR so we can reduce the scope here.

Only tests and docs (incl. a doc example) are missing. Tests can follow the other `WatchStreamExt` setup and check some reasonable edge cases: more than one auxiliary subscriber, verifying that sending an `Err` propagates the error everywhere, etc.
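Those edge cases could be exercised with a fan-out harness along these lines (plain `mpsc` channels standing in for `broadcast` subscribers; `fan_out` is a made-up helper, not the PR's API):

```rust
use std::sync::mpsc;

// Deliver every item of the primary stream, including errors, to each
// subscriber, mirroring what StreamSubscribable's broadcast sender does.
fn fan_out<T: Clone>(
    items: Vec<Result<T, String>>,
    subscribers: &[mpsc::Sender<Result<T, String>>],
) {
    for item in items {
        for sub in subscribers {
            sub.send(item.clone()).unwrap();
        }
    }
}

fn main() {
    let (tx1, rx1) = mpsc::channel();
    let (tx2, rx2) = mpsc::channel();
    fan_out(vec![Ok(1u32), Err("watch failed".to_string())], &[tx1, tx2]);
    // Both auxiliary subscribers see the Ok item AND the propagated Err.
    for rx in [rx1, rx2] {
        assert_eq!(rx.recv().unwrap(), Ok(1));
        assert_eq!(rx.recv().unwrap(), Err("watch failed".to_string()));
    }
}
```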
```rust
#[pin_project]
/// todo - docs
#[must_use = "streams do nothing unless polled"]
pub struct StreamSubscribable<S>
```
I would call this `StreamSubscribe` to be more in line with the normal, unconjugated naming of https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html
```rust
pub async fn get(&self, key: &ObjectRef<K>) -> Option<Arc<K>> {
    self.ready.ready().await;
    self.store.get(key)
}
```
I am a little worried about everything hinging on this future not being ready. This means we can pass around `SafeStore`s that are not actually ready. Given that "being ready" is a one-time thing to wait for, it feels to me that this should be done AS we transition INTO the `SafeStore`. This would also simplify our usage in more complicated bits, because we can just assume the invariant rather than have to account for the possibility of it with async (which is sometimes hard inside combinator-style async code).
Effectively what I want is an async `TryFrom`-style conversion (maybe via https://docs.rs/async-convert/latest/async_convert/trait.TryFrom.html - haven't really evaluated this crate).
I think the main thing I'm trying to address is decoupling setup from usage. If the await was on the transition into `SafeStore`, you would still need to start consuming the watcher stream before you could get a `SafeStore` to pass to a `Controller`. That would make wiring together lots of stores and controllers much more complicated.

Maybe a better alternative would be to return a `Result::Err` from the store if it's not ready; at least that way, if you do hit the race condition (or you forget to start consuming your watcher stream), you just get a `StoreNotReady` error rather than an `await` that could potentially hang forever.
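The fail-fast alternative could look roughly like this (all names here, including `StoreNotReady`, are hypothetical):

```rust
#[derive(Debug, PartialEq)]
enum StoreError {
    StoreNotReady,
}

// Minimal stand-in for a reflector store: `ready` flips once the first
// watch event has been applied.
struct Store {
    ready: bool,
    value: Option<String>,
}

impl Store {
    // Instead of suspending until initialised, surface an error the caller
    // must handle, so a forgotten watcher shows up as a bug immediately.
    fn get(&self) -> Result<Option<String>, StoreError> {
        if !self.ready {
            return Err(StoreError::StoreNotReady);
        }
        Ok(self.value.clone())
    }
}

fn main() {
    let uninit = Store { ready: false, value: None };
    assert_eq!(uninit.get(), Err(StoreError::StoreNotReady));

    let ready = Store { ready: true, value: Some("cm".to_string()) };
    assert_eq!(ready.get(), Ok(Some("cm".to_string())));
}
```

The trade-off versus the awaiting version is that callers must decide what to do on `StoreNotReady` (retry, propagate, requeue), but nothing can hang forever.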
I'll focus on tidying up the `StreamSubscribe` for now, then circle back around to think more about this (and have a look into the crate you've linked).
Motivation
We run quite a complex Kubernetes Operator, with lots of stores and controllers. As the project has grown we've shifted the controller logic out into many independent libraries with the eventual goal of having a binary which is just a runner for all of our small unit testable controller libraries.
While this approach has been largely successful, we're still grappling with a few concerns that we'd like to address:

1. We have more than one cache of the same data from Kubernetes in our service (Issue 1080). We attempted to address this by having a god object in the primary service, but it ended up knowing way too much about the things it was running, and it didn't solve the problem of controllers running on different streams to the stores.
2. Store startup is racy against controllers (Discussion 1124). Before you can run a controller you have to ensure that all stores passed to it have received at least one event, otherwise you run the risk of reconciling against incorrect (missing) data.
3. Running everything in a consistent manner. This one is probably less of a general problem and more specific to the way we've implemented things; but with all of these controllers and stores (especially given point 2), running all of these streams gets complicated.
Solution
N.B. The solution is still rough / incomplete in a few areas. I'm creating a draft PR now for feedback on whether this is something that could be considered valuable for the kube-runtime lib.

The struct currently known as `SharedStore` (probably not the right name, but it's a starting point) wraps a single `Api` and allows for the creation of stores and controllers from that `Api`. Here's how it addresses each of the concerns we had above:

1. Whenever a store is requested, it's stored inside the `SharedStore` and indexed by its `ListParams`; if the same store (e.g. same `ListParams`) is requested again (either for a controller or a store), then the existing store is returned. This approach keeps the details of what's being requested close to the call point, with the head of the application only needing to know what types of resources are being managed.
2. Each `SharedStore` takes a reference to a `ReadyToken` to allow communication between `SharedStore`s about their ready status. Each new store requested registers itself against the `ReadyToken` and marks itself as ready after receiving the first event. When everything is run (see point 3), the controllers will only begin running once every store has received its first event.
3. The `SharedStore` bundles up all of the streams for each store and controller and follows a pattern similar to `Controller::new(...).run(...)`, where it consumes itself and returns a stream. The stream will only poll the controller streams once the `ReadyToken` has marked itself as ready.

Here's a contrived example of how it could work: