Skip to content

Commit

Permalink
Fixup watcher docs, and watchlist doc examples (#1284)
Browse files Browse the repository at this point in the history
* Fixup watcher docs, and watchlist doc examples

Lots of missing references in doc builds because many of these are warnings.

Removed the `namespace_reflector` because it's a slowly changing thing to watch, and instead added a way to opt-in to `WATCHLIST=1` in pod and node watchers.

Was trying to do this on event_watcher also but apparently events on k3s does not support sendInitialEvents on 1.27 yet so left it out there.
While in the area i also updated event_watcher to use the new `Event` type to make the example better.

Signed-off-by: clux <[email protected]>

* clippy

Signed-off-by: clux <[email protected]>

---------

Signed-off-by: clux <[email protected]>
  • Loading branch information
clux authored Aug 24, 2023
1 parent f44ce00 commit 9685bc1
Show file tree
Hide file tree
Showing 11 changed files with 56 additions and 82 deletions.
4 changes: 0 additions & 4 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,3 @@ path = "secret_syncer.rs"
name = "pod_shell_crossterm"
path = "pod_shell_crossterm.rs"
required-features = ["ws"]

[[example]]
name = "namespace_reflector"
path = "namespace_reflector.rs"
6 changes: 6 additions & 0 deletions examples/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,12 @@ cargo run --example node_watcher
cargo run --example dynamic_watcher
```

The `node_` and `pod_` watcher also allows using [Kubernetes 1.27 Streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists) via `WATCHLIST=1`:

```sh
WATCHLIST=1 RUST_LOG=info,kube=debug cargo run --example pod_watcher
```

### controllers
Main example requires you creating the custom resource first:

Expand Down
24 changes: 15 additions & 9 deletions examples/event_watcher.rs
Original file line number Diff line number Diff line change
@@ -1,21 +1,19 @@
use futures::{pin_mut, TryStreamExt};
use k8s_openapi::api::core::v1::Event;
use k8s_openapi::api::{core::v1::ObjectReference, events::v1::Event};
use kube::{
api::Api,
runtime::{watcher, WatchStreamExt},
Client,
};
use tracing::*;
use tracing::info;

#[tokio::main]
async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

let events: Api<Event> = Api::all(client);
let wc = watcher::Config::default();

let ew = watcher(events, wc).applied_objects();
let ew = watcher(events, watcher::Config::default()).applied_objects();

pin_mut!(ew);
while let Some(event) = ew.try_next().await? {
Expand All @@ -27,10 +25,18 @@ async fn main() -> anyhow::Result<()> {
// This function lets the app handle an added/modified event from k8s
fn handle_event(ev: Event) -> anyhow::Result<()> {
info!(
"Event: \"{}\" via {} {}",
ev.message.unwrap().trim(),
ev.involved_object.kind.unwrap(),
ev.involved_object.name.unwrap()
"{}: {} ({})",
ev.regarding.map(fmt_obj_ref).unwrap_or_default(),
ev.reason.unwrap_or_default(),
ev.note.unwrap_or_default(),
);
Ok(())
}

fn fmt_obj_ref(oref: ObjectReference) -> String {
format!(
"{}/{}",
oref.kind.unwrap_or_default(),
oref.name.unwrap_or_default()
)
}
49 changes: 0 additions & 49 deletions examples/namespace_reflector.rs

This file was deleted.

8 changes: 7 additions & 1 deletion examples/node_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,13 @@ async fn main() -> anyhow::Result<()> {
let events: Api<Event> = Api::all(client.clone());
let nodes: Api<Node> = Api::all(client.clone());

let wc = watcher::Config::default().labels("beta.kubernetes.io/arch=amd64");
let use_watchlist = std::env::var("WATCHLIST").map(|s| s == "1").unwrap_or(false);
let wc = if use_watchlist {
// requires WatchList feature gate on 1.27 or later
watcher::Config::default().streaming_lists()
} else {
watcher::Config::default()
};
let obs = watcher(nodes, wc).default_backoff().applied_objects();

pin_mut!(obs);
Expand Down
9 changes: 8 additions & 1 deletion examples/pod_watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,15 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;
let api = Api::<Pod>::default_namespaced(client);
let use_watchlist = std::env::var("WATCHLIST").map(|s| s == "1").unwrap_or(false);
let wc = if use_watchlist {
// requires WatchList feature gate on 1.27 or later
watcher::Config::default().streaming_lists()
} else {
watcher::Config::default()
};

watcher(api, watcher::Config::default())
watcher(api, wc)
.applied_objects()
.default_backoff()
.try_for_each(|p| async move {
Expand Down
6 changes: 3 additions & 3 deletions kube-core/src/params.rs
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ impl ListParams {
} else {
// When there's a continue token, we don't want to set resourceVersion
if let Some(rv) = &self.resource_version {
if rv != "0" || (rv == "0" && self.limit.is_none()) {
if rv != "0" || self.limit.is_none() {
qp.append_pair("resourceVersion", rv.as_str());

match &self.version_match {
Expand Down Expand Up @@ -189,7 +189,7 @@ impl ListParams {

/// Sets an arbitary resource version match strategy
///
/// A non-default strategy such as `VersionMatch::Exact` or `VersionMatch::NotGreaterThan`
/// A non-default strategy such as `VersionMatch::Exact` or `VersionMatch::NotOlderThan`
/// requires an explicit `resource_version` set to pass request validation.
#[must_use]
pub fn matching(mut self, version_match: VersionMatch) -> Self {
Expand Down Expand Up @@ -472,7 +472,7 @@ impl WatchParams {

/// Constructor for doing Kubernetes 1.27 Streaming List watches
///
/// Enables [`VersionMatch::NotGreaterThan`] semantics and [`WatchParams::send_initial_events`].
/// Enables [`VersionMatch::NotOlderThan`] semantics and [`WatchParams::send_initial_events`].
pub fn streaming_lists() -> Self {
Self {
send_initial_events: true,
Expand Down
4 changes: 2 additions & 2 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -428,8 +428,8 @@ pub struct Config {
impl Config {
/// The debounce duration used to deduplicate reconciliation requests.
///
/// When set to a non-zero duration, debouncing is enabled in the [`Scheduler`] resulting
/// in __trailing edge debouncing__ of reqonciler requests.
/// When set to a non-zero duration, debouncing is enabled in the [`scheduler`](crate::scheduler())
/// resulting in __trailing edge debouncing__ of reqonciler requests.
/// This option can help to reduce the amount of unnecessary reconciler calls
/// when using multiple controller relations, or during rapid phase transitions.
///
Expand Down
2 changes: 0 additions & 2 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,8 +117,6 @@ where
S::Ok: Debug,
S::Error: Debug,
{
// `arc_with_non_send_sync` false positive: https://github.com/rust-lang/rust-clippy/issues/11076
#[allow(clippy::arc_with_non_send_sync)]
let stream = Arc::new(Mutex::new(stream.into_stream().peekable()));
(
SplitCase {
Expand Down
2 changes: 0 additions & 2 deletions kube-runtime/src/utils/stream_subscribe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,6 @@ impl<S: Stream> Stream for StreamSubscribe<S> {

match item {
Poll::Ready(Some(item)) => {
// `arc_with_non_send_sync` false positive: https://github.com/rust-lang/rust-clippy/issues/11076
#[allow(clippy::arc_with_non_send_sync)]
let item = Arc::new(item);
this.sender.send(Some(item.clone())).ok();
Poll::Ready(Some(item))
Expand Down
24 changes: 15 additions & 9 deletions kube-runtime/src/watcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -200,12 +200,19 @@ pub enum ListSemantic {
}

/// Configurable watcher listwatch semantics

#[derive(Clone, Default, Debug, PartialEq)]
pub enum InitialListStrategy {
/// List first, then watch from given resouce version
///
/// This is the old and default way of watching. The watcher will do a paginated list call first before watching.
/// When using this mode, you can configure the page_size on the watcher.
#[default]
ListWatch,
/// Kubernetes 1.27 Streaming Lists
/// https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
///
/// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
/// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
StreamingList,
}

Expand Down Expand Up @@ -233,10 +240,9 @@ pub struct Config {
///
/// Configures re-list for performance vs. consistency.
///
/// NB: This option only has an effect for [`WatcherMode::ListWatch`].
/// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
pub list_semantic: ListSemantic,

/// Kubernetes 1.27 Streaming Lists
/// Control how the watcher fetches the initial list of objects.
///
/// ListWatch: The watcher will fetch the initial list of objects using a list call.
Expand All @@ -245,8 +251,8 @@ pub struct Config {
/// StreamingList is more efficient than ListWatch, but it requires the server to support
/// streaming list bookmarks (opt-in feature gate in Kubernetes 1.27).
///
/// See https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists
/// See https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details
/// See [upstream documentation on streaming lists](https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists),
/// and the [KEP](https://github.com/kubernetes/enhancements/tree/master/keps/sig-api-machinery/3157-watch-list#design-details).
pub initial_list_strategy: InitialListStrategy,

/// Maximum number of objects retrieved per list operation resyncs.
Expand All @@ -256,7 +262,7 @@ pub struct Config {
///
/// Defaults to 500. Note that `None` represents unbounded.
///
/// NB: This option only has an effect for [`WatcherMode::ListWatch`].
/// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
pub page_size: Option<u32>,

/// Enables watch events with type "BOOKMARK".
Expand Down Expand Up @@ -325,7 +331,7 @@ impl Config {

/// Sets list semantic to configure re-list performance and consistency
///
/// NB: This option only has an effect for [`WatcherMode::ListWatch`].
/// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
#[must_use]
pub fn list_semantic(mut self, semantic: ListSemantic) -> Self {
self.list_semantic = semantic;
Expand All @@ -334,7 +340,7 @@ impl Config {

/// Sets list semantic to `Any` to improve list performance
///
/// NB: This option only has an effect for [`WatcherMode::ListWatch`].
/// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
#[must_use]
pub fn any_semantic(self) -> Self {
self.list_semantic(ListSemantic::Any)
Expand All @@ -355,7 +361,7 @@ impl Config {
/// This can reduce the memory consumption during resyncs, at the cost of requiring more
/// API roundtrips to complete.
///
/// NB: This option only has an effect for [`WatcherMode::ListWatch`].
/// NB: This option only has an effect for [`InitialListStrategy::ListWatch`].
#[must_use]
pub fn page_size(mut self, page_size: u32) -> Self {
self.page_size = Some(page_size);
Expand Down

0 comments on commit 9685bc1

Please sign in to comment.