From 28fb7734a288983f7d114d11157900b9ef42050e Mon Sep 17 00:00:00 2001 From: Alexander Gil Date: Sun, 1 Dec 2024 14:17:43 +0100 Subject: [PATCH 1/9] feat(runtime): Add series implementation for event recorder Signed-off-by: Alexander Gil --- Cargo.toml | 7 +- kube-runtime/Cargo.toml | 1 + kube-runtime/src/events.rs | 254 ++++++++++++++++++++++++++----------- 3 files changed, 186 insertions(+), 76 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 14fc4e283..f82c7afdf 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -48,18 +48,19 @@ form_urlencoded = "1.2.0" futures = { version = "0.3.17", default-features = false } hashbrown = "0.15.0" home = "0.5.4" +hostname = "0.3" http = "1.1.0" http-body = "1.0.1" http-body-util = "0.1.2" hyper = "1.2.0" -hyper-util = "0.1.9" hyper-openssl = "0.10.2" hyper-rustls = { version = "0.27.1", default-features = false } hyper-socks2 = { version = "0.9.0", default-features = false } hyper-timeout = "0.5.1" +hyper-util = "0.1.9" json-patch = "3" -jsonptr = "0.6" jsonpath-rust = "0.7.3" +jsonptr = "0.6" k8s-openapi = { version = "0.23.0", default-features = false } openssl = "0.10.36" parking_lot = "0.12.0" @@ -74,8 +75,8 @@ schemars = "0.8.6" secrecy = "0.10.2" serde = "1.0.130" serde_json = "1.0.68" -serde-value = "0.7.0" serde_yaml = "0.9.19" +serde-value = "0.7.0" syn = "2.0.38" tame-oauth = "0.10.0" tempfile = "3.1.0" diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index a89492384..9006bd819 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -49,6 +49,7 @@ hashbrown.workspace = true k8s-openapi.workspace = true async-broadcast.workspace = true async-stream.workspace = true +hostname.workspace = true [dev-dependencies] kube = { path = "../kube", features = ["derive", "client", "runtime"], version = "<1.0.0, >=0.60.0" } diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index 5e0d9dd6f..f8473cab6 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -1,13 +1,23 @@ //! Publishes events for objects for kubernetes >= 1.19 +use std::hash::{Hash, Hasher}; +use std::{collections::HashMap, sync::Arc}; + use k8s_openapi::{ - api::{core::v1::ObjectReference, events::v1::Event as K8sEvent}, + api::{ + core::v1::ObjectReference, + events::v1::{Event as K8sEvent, EventSeries}, + }, apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta}, - chrono::Utc, + chrono::{Duration, Utc}, }; +use kube_client::ResourceExt; use kube_client::{ - api::{Api, PostParams}, + api::{Api, Patch, PatchParams, PostParams}, Client, }; +use tokio::sync::RwLock; + +const EVENT_FINISH_TIME: Duration = Duration::minutes(6); /// Minimal event type for publishing through [`Recorder::publish`]. /// @@ -64,6 +74,34 @@ pub enum EventType { Warning, } +/// ObjectReference with Hash and Eq implementations +#[derive(Clone, Debug, PartialEq)] +pub struct Reference(ObjectReference); + +impl Eq for Reference {} + +impl Hash for Reference { + fn hash(&self, state: &mut H) { + self.0.api_version.hash(state); + self.0.kind.hash(state); + self.0.name.hash(state); + self.0.namespace.hash(state); + self.0.uid.hash(state); + } +} + +/// Isomorphic key for caching similar events +#[derive(Clone, Debug, PartialEq, Eq, Hash)] +pub struct EventKey { + pub event_type: EventType, + pub action: String, + pub reason: String, + pub reporting_controller: String, + pub reporting_instance: Option, + pub regarding: Reference, + pub related: Option, +} + /// Information about the reporting controller. /// /// ``` @@ -99,7 +137,8 @@ pub struct Reporter { /// /// in the manifest of your controller. /// - /// NB: If no `instance` is provided, then `reporting_instance == reporting_controller` in the `Event`. + /// Note: If `instance` is not provided, the hostname is used. If the hostname is also + /// unavailable, `reporting_instance` defaults to `reporting_controller` in the `Event`. pub instance: Option, } @@ -115,9 +154,10 @@ impl From for Reporter { impl From<&str> for Reporter { fn from(es: &str) -> Self { + let instance = hostname::get().ok().and_then(|h| h.into_string().ok()); Self { controller: es.into(), - instance: None, + instance, } } } @@ -168,13 +208,13 @@ impl From<&str> for Reporter { /// ```yaml /// - apiGroups: ["events.k8s.io"] /// resources: ["events"] -/// verbs: ["create"] +/// verbs: ["create", "patch"] /// ``` #[derive(Clone)] pub struct Recorder { - events: Api, + client: Client, reporter: Reporter, - reference: ObjectReference, + events_cache: Arc>>, } impl Recorder { @@ -184,13 +224,66 @@ impl Recorder { /// /// Cluster scoped objects will publish events in the "default" namespace. #[must_use] - pub fn new(client: Client, reporter: Reporter, reference: ObjectReference) -> Self { - let default_namespace = "kube-system".to_owned(); // default does not work on k8s < 1.22 - let events = Api::namespaced(client, reference.namespace.as_ref().unwrap_or(&default_namespace)); + pub fn new(client: Client, reporter: Reporter) -> Self { + let events_cache = Arc::new(RwLock::new(HashMap::new())); Self { - events, + client, reporter, - reference, + events_cache, + } + } + + /// Builds unique event key based on reportingController, reportingInstance, regarding, reason + /// and note + fn get_event_key(&self, ev: &Event, regarding: &ObjectReference) -> EventKey { + EventKey { + event_type: ev.type_, + action: ev.action.clone(), + reason: ev.reason.clone(), + reporting_controller: self.reporter.controller.clone(), + reporting_instance: self.reporter.instance.clone(), + regarding: Reference(regarding.clone()), + related: ev.secondary.clone().map(Reference), + } + } + + // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io + // for more detail on the fields + // and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125 + fn generate_event(&self, ev: &Event, reference: &ObjectReference) -> K8sEvent { + let now = Utc::now(); + K8sEvent { + action: Some(ev.action.clone()), + reason: Some(ev.reason.clone()), + deprecated_count: None, + deprecated_first_timestamp: None, + deprecated_last_timestamp: None, + deprecated_source: None, + event_time: Some(MicroTime(now)), + regarding: Some(reference.clone()), + note: ev.note.clone().map(Into::into), + metadata: ObjectMeta { + namespace: reference.namespace.clone(), + name: Some(format!( + "{}.{}", + reference.name.as_ref().unwrap_or(&self.reporter.controller), + now.timestamp() + )), + ..Default::default() + }, + reporting_controller: Some(self.reporter.controller.clone()), + reporting_instance: Some( + self.reporter + .instance + .clone() + .unwrap_or_else(|| self.reporter.controller.clone()), + ), + series: None, + type_: match ev.type_ { + EventType::Normal => Some("Normal".into()), + EventType::Warning => Some("Warning".into()), + }, + related: ev.secondary.clone(), } } @@ -198,60 +291,69 @@ impl Recorder { /// /// # Access control /// - /// The event object is created in the same namespace of the [`ObjectReference`] - /// you specified in [`Recorder::new`]. + /// The event object is created in the same namespace of the [`ObjectReference`]. /// Make sure that your controller has `create` permissions in the required namespaces /// for the `event` resource in the API group `events.k8s.io`. /// /// # Errors /// /// Returns an [`Error`](`kube_client::Error`) if the event is rejected by Kubernetes. - pub async fn publish(&self, ev: Event) -> Result<(), kube_client::Error> { - // See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io - // for more detail on the fields - // and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125 - self.events - .create(&PostParams::default(), &K8sEvent { - action: Some(ev.action), - reason: Some(ev.reason), - deprecated_count: None, - deprecated_first_timestamp: None, - deprecated_last_timestamp: None, - deprecated_source: None, - event_time: Some(MicroTime(Utc::now())), - regarding: Some(self.reference.clone()), - note: ev.note.map(Into::into), - metadata: ObjectMeta { - namespace: self.reference.namespace.clone(), - generate_name: Some(format!("{}-", self.reporter.controller)), - ..Default::default() - }, - reporting_controller: Some(self.reporter.controller.clone()), - reporting_instance: Some( - self.reporter - .instance - .clone() - .unwrap_or_else(|| self.reporter.controller.clone()), - ), - series: None, - type_: match ev.type_ { - EventType::Normal => Some("Normal".into()), - EventType::Warning => Some("Warning".into()), - }, - related: ev.secondary, - }) - .await?; + pub async fn publish(&self, ev: Event, reference: &ObjectReference) -> Result<(), kube_client::Error> { + let now = Utc::now(); + + let key = self.get_event_key(&ev, reference); + let event = match self.events_cache.read().await.get(&key) { + Some(e) => { + let series = match &e.series { + Some(series) => EventSeries { + count: series.count + 1, + last_observed_time: MicroTime(now), + }, + None => EventSeries { + count: 2, + last_observed_time: MicroTime(now), + }, + }; + let mut event = e.clone(); + event.series = Some(series); + event + } + None => self.generate_event(&ev, reference), + }; + + let events = Api::namespaced( + self.client.clone(), + reference.namespace.as_ref().unwrap_or(&"default".to_string()), + ); + if event.series.is_some() { + events + .patch(&event.name_any(), &PatchParams::default(), &Patch::Merge(&event)) + .await?; + } else { + events.create(&PostParams::default(), &event).await?; + }; + + { + let mut cache = self.events_cache.write().await; + cache.insert(key, event); + cache.retain(|_, v| { + if let Some(series) = v.series.as_ref() { + series.last_observed_time.0 + EVENT_FINISH_TIME > now + } else if let Some(event_time) = v.event_time.as_ref() { + event_time.0 + EVENT_FINISH_TIME > now + } else { + true + } + }); + } Ok(()) } } #[cfg(test)] mod test { - use k8s_openapi::api::{ - core::v1::{Event as K8sEvent, Service}, - rbac::v1::ClusterRole, - }; - use kube_client::{Api, Client, Resource}; + use k8s_openapi::api::{core::v1::Service, events::v1::Event as K8sEvent, rbac::v1::ClusterRole}; + use kube::{Api, Client, Resource}; use super::{Event, EventType, Recorder}; @@ -262,15 +364,18 @@ mod test { let svcs: Api = Api::namespaced(client.clone(), "default"); let s = svcs.get("kubernetes").await?; // always a kubernetes service in default - let recorder = Recorder::new(client.clone(), "kube".into(), s.object_ref(&())); + let recorder = Recorder::new(client.clone(), "kube".into()); recorder - .publish(Event { - type_: EventType::Normal, - reason: "VeryCoolService".into(), - note: Some("Sending kubernetes to detention".into()), - action: "Test event - plz ignore".into(), - secondary: None, - }) + .publish( + Event { + type_: EventType::Normal, + reason: "VeryCoolService".into(), + note: Some("Sending kubernetes to detention".into()), + action: "Test event - plz ignore".into(), + secondary: None, + }, + &s.object_ref(&()), + ) .await?; let events: Api = Api::namespaced(client, "default"); @@ -279,7 +384,7 @@ mod test { .into_iter() .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService"))) .unwrap(); - assert_eq!(found_event.message.unwrap(), "Sending kubernetes to detention"); + assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention"); Ok(()) } @@ -291,15 +396,18 @@ mod test { let svcs: Api = Api::all(client.clone()); let s = svcs.get("system:basic-user").await?; // always get this default ClusterRole - let recorder = Recorder::new(client.clone(), "kube".into(), s.object_ref(&())); + let recorder = Recorder::new(client.clone(), "kube".into()); recorder - .publish(Event { - type_: EventType::Normal, - reason: "VeryCoolServiceNoNamespace".into(), - note: Some("Sending kubernetes to detention without namespace".into()), - action: "Test event - plz ignore".into(), - secondary: None, - }) + .publish( + Event { + type_: EventType::Normal, + reason: "VeryCoolServiceNoNamespace".into(), + note: Some("Sending kubernetes to detention without namespace".into()), + action: "Test event - plz ignore".into(), + secondary: None, + }, + &s.object_ref(&()), + ) .await?; let events: Api = Api::namespaced(client, "kube-system"); @@ -309,7 +417,7 @@ mod test { .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace"))) .unwrap(); assert_eq!( - found_event.message.unwrap(), + found_event.note.unwrap(), "Sending kubernetes to detention without namespace" ); From 1f4ad64524acc2df732fadd10d92fde6da1fc917 Mon Sep 17 00:00:00 2001 From: Alexander Gil Date: Thu, 5 Dec 2024 09:08:51 +0100 Subject: [PATCH 2/9] fix: Minor suggestion from first PR review Signed-off-by: Alexander Gil --- kube-runtime/src/events.rs | 35 ++++++++++++++++------------------- 1 file changed, 16 insertions(+), 19 deletions(-) diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index f8473cab6..9c5faefb5 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -17,7 +17,7 @@ use kube_client::{ }; use tokio::sync::RwLock; -const EVENT_FINISH_TIME: Duration = Duration::minutes(6); +const CACHE_TTL: Duration = Duration::minutes(6); /// Minimal event type for publishing through [`Recorder::publish`]. /// @@ -92,7 +92,7 @@ impl Hash for Reference { /// Isomorphic key for caching similar events #[derive(Clone, Debug, PartialEq, Eq, Hash)] -pub struct EventKey { +struct EventKey { pub event_type: EventType, pub action: String, pub reason: String, @@ -214,7 +214,7 @@ impl From<&str> for Reporter { pub struct Recorder { client: Client, reporter: Reporter, - events_cache: Arc>>, + cache: Arc>>, } impl Recorder { @@ -225,11 +225,11 @@ impl Recorder { /// Cluster scoped objects will publish events in the "default" namespace. #[must_use] pub fn new(client: Client, reporter: Reporter) -> Self { - let events_cache = Arc::new(RwLock::new(HashMap::new())); + let cache = Arc::default(); Self { client, reporter, - events_cache, + cache, } } @@ -267,7 +267,7 @@ impl Recorder { name: Some(format!( "{}.{}", reference.name.as_ref().unwrap_or(&self.reporter.controller), - now.timestamp() + now.timestamp_nanos_opt().unwrap_or_else(|| now.timestamp()) )), ..Default::default() }, @@ -302,17 +302,12 @@ impl Recorder { let now = Utc::now(); let key = self.get_event_key(&ev, reference); - let event = match self.events_cache.read().await.get(&key) { + let event = match self.cache.read().await.get(&key) { Some(e) => { - let series = match &e.series { - Some(series) => EventSeries { - count: series.count + 1, - last_observed_time: MicroTime(now), - }, - None => EventSeries { - count: 2, - last_observed_time: MicroTime(now), - }, + let count = if let Some(s) = &e.series { s.count + 1 } else { 2 }; + let series = EventSeries { + count, + last_observed_time: MicroTime(now), }; let mut event = e.clone(); event.series = Some(series); @@ -334,13 +329,15 @@ impl Recorder { }; { - let mut cache = self.events_cache.write().await; + let mut cache = self.cache.write().await; cache.insert(key, event); + + // gc past events older than now + CACHE_TTL cache.retain(|_, v| { if let Some(series) = v.series.as_ref() { - series.last_observed_time.0 + EVENT_FINISH_TIME > now + series.last_observed_time.0 + CACHE_TTL > now } else if let Some(event_time) = v.event_time.as_ref() { - event_time.0 + EVENT_FINISH_TIME > now + event_time.0 + CACHE_TTL > now } else { true } From a80a0dd728079e9dc07f0592bc3ec4f0fbf69511 Mon Sep 17 00:00:00 2001 From: Alexander Gil Date: Thu, 5 Dec 2024 17:31:16 +0100 Subject: [PATCH 3/9] feat: Print `timestamp_nanos` in hexadecimal Following the same pattern than the go-client. Signed-off-by: Alexander Gil --- kube-runtime/src/events.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index 9c5faefb5..e07f2ce34 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -74,7 +74,9 @@ pub enum EventType { Warning, } -/// ObjectReference with Hash and Eq implementations +/// [`ObjectReference`] with Hash and Eq implementations +/// +/// [`ObjectReference`]: k8s_openapi::api::core::v1::ObjectReference #[derive(Clone, Debug, PartialEq)] pub struct Reference(ObjectReference); @@ -265,7 +267,7 @@ impl Recorder { metadata: ObjectMeta { namespace: reference.namespace.clone(), name: Some(format!( - "{}.{}", + "{}.{:x}", reference.name.as_ref().unwrap_or(&self.reporter.controller), now.timestamp_nanos_opt().unwrap_or_else(|| now.timestamp()) )), From 490cfd81b31fd7a6c9111cc57227f2ae1b53b426 Mon Sep 17 00:00:00 2001 From: Alexander Gil Date: Thu, 5 Dec 2024 18:09:55 +0100 Subject: [PATCH 4/9] fix: Test in events without namespace Signed-off-by: Alexander Gil --- kube-runtime/src/events.rs | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index e07f2ce34..3f663ff0b 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -351,7 +351,10 @@ impl Recorder { #[cfg(test)] mod test { - use k8s_openapi::api::{core::v1::Service, events::v1::Event as K8sEvent, rbac::v1::ClusterRole}; + use k8s_openapi::api::{ + core::v1::{ComponentStatus, Service}, + events::v1::Event as K8sEvent, + }; use kube::{Api, Client, Resource}; use super::{Event, EventType, Recorder}; @@ -393,8 +396,8 @@ mod test { async fn event_recorder_attaches_events_without_namespace() -> Result<(), Box> { let client = Client::try_default().await?; - let svcs: Api = Api::all(client.clone()); - let s = svcs.get("system:basic-user").await?; // always get this default ClusterRole + let component_status_api: Api = Api::all(client.clone()); + let s = component_status_api.get("scheduler").await?; let recorder = Recorder::new(client.clone(), "kube".into()); recorder .publish( @@ -408,7 +411,7 @@ mod test { &s.object_ref(&()), ) .await?; - let events: Api = Api::namespaced(client, "kube-system"); + let events: Api = Api::namespaced(client, "default"); let event_list = events.list(&Default::default()).await?; let found_event = event_list From 69c61bf8b1a7fc28ed05a303dfb4bea160ab5843 Mon Sep 17 00:00:00 2001 From: Alexander Gil Date: Fri, 6 Dec 2024 09:19:34 +0100 Subject: [PATCH 5/9] fix: Doc test for `Recorder` Signed-off-by: Alexander Gil --- kube-runtime/src/events.rs | 22 +++++++++++++--------- 1 file changed, 13 insertions(+), 9 deletions(-) diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index 3f663ff0b..94ae341ca 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -180,6 +180,8 @@ impl From<&str> for Reporter { /// instance: std::env::var("CONTROLLER_POD_NAME").ok(), /// }; /// +/// let recorder = Recorder::new(client, reporter); +/// /// // references can be made manually using `ObjectMeta` and `ApiResource`/`Resource` info /// let reference = ObjectReference { /// // [...] @@ -187,15 +189,17 @@ impl From<&str> for Reporter { /// }; /// // or for k8s-openapi / kube-derive types, use Resource::object_ref: /// // let reference = myobject.object_ref(); -/// -/// let recorder = Recorder::new(client, reporter, reference); -/// recorder.publish(Event { -/// action: "Scheduling".into(), -/// reason: "Pulling".into(), -/// note: Some("Pulling image `nginx`".into()), -/// type_: EventType::Normal, -/// secondary: None, -/// }).await?; +/// recorder +/// .publish( +/// Event { +/// action: "Scheduling".into(), +/// reason: "Pulling".into(), +/// note: Some("Pulling image `nginx`".into()), +/// type_: EventType::Normal, +/// secondary: None, +/// }, +/// &reference, +/// ).await?; /// # Ok(()) /// # } /// ``` From 4b927227e71b72680fd2f724d305701bcf87c759 Mon Sep 17 00:00:00 2001 From: Alexander Gil Date: Fri, 6 Dec 2024 09:34:25 +0100 Subject: [PATCH 6/9] test: Extend tests for aggregating isomorphic events Signed-off-by: Alexander Gil --- kube-runtime/src/events.rs | 39 ++++++++++++++++++++++++++++++++++++++ 1 file changed, 39 insertions(+) diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index 94ae341ca..7895283dc 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -392,6 +392,26 @@ mod test { .unwrap(); assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention"); + recorder + .publish( + Event { + type_: EventType::Normal, + reason: "VeryCoolService".into(), + note: Some("Sending kubernetes to detention twice".into()), + action: "Test event - plz ignore".into(), + secondary: None, + }, + &s.object_ref(&()), + ) + .await?; + + let event_list = events.list(&Default::default()).await?; + let found_event = event_list + .into_iter() + .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService"))) + .unwrap(); + assert!(found_event.series.is_some()); + Ok(()) } @@ -427,6 +447,25 @@ mod test { "Sending kubernetes to detention without namespace" ); + recorder + .publish( + Event { + type_: EventType::Normal, + reason: "VeryCoolServiceNoNamespace".into(), + note: Some("Sending kubernetes to detention without namespace twice".into()), + action: "Test event - plz ignore".into(), + secondary: None, + }, + &s.object_ref(&()), + ) + .await?; + + let event_list = events.list(&Default::default()).await?; + let found_event = event_list + .into_iter() + .find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace"))) + .unwrap(); + assert!(found_event.series.is_some()); Ok(()) } } From 38ed567c99073518182940c8ee06c729ad8e7496 Mon Sep 17 00:00:00 2001 From: Alexander Gil Date: Fri, 6 Dec 2024 10:25:34 +0100 Subject: [PATCH 7/9] fix: GC executed at the beginning and add test Signed-off-by: Alexander Gil --- kube-runtime/src/events.rs | 117 ++++++++++++++++++++++++++++++++----- 1 file changed, 102 insertions(+), 15 deletions(-) diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index 7895283dc..9c4b68987 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -307,6 +307,17 @@ impl Recorder { pub async fn publish(&self, ev: Event, reference: &ObjectReference) -> Result<(), kube_client::Error> { let now = Utc::now(); + // gc past events older than now + CACHE_TTL + self.cache.write().await.retain(|_, v| { + if let Some(series) = v.series.as_ref() { + series.last_observed_time.0 + CACHE_TTL > now + } else if let Some(event_time) = v.event_time.as_ref() { + event_time.0 + CACHE_TTL > now + } else { + true + } + }); + let key = self.get_event_key(&ev, reference); let event = match self.cache.read().await.get(&key) { Some(e) => { @@ -337,17 +348,6 @@ impl Recorder { { let mut cache = self.cache.write().await; cache.insert(key, event); - - // gc past events older than now + CACHE_TTL - cache.retain(|_, v| { - if let Some(series) = v.series.as_ref() { - series.last_observed_time.0 + CACHE_TTL > now - } else if let Some(event_time) = v.event_time.as_ref() { - event_time.0 + CACHE_TTL > now - } else { - true - } - }); } Ok(()) } @@ -355,13 +355,21 @@ impl Recorder { #[cfg(test)] mod test { - use k8s_openapi::api::{ - core::v1::{ComponentStatus, Service}, - events::v1::Event as K8sEvent, + use std::{collections::HashMap, sync::Arc}; + + use k8s_openapi::{ + api::{ + core::v1::{ComponentStatus, Service}, + events::v1::Event as K8sEvent, + }, + apimachinery::pkg::apis::meta::v1::MicroTime, + chrono::{Duration, Utc}, }; use kube::{Api, Client, Resource}; + use kube_client::api::ObjectMeta; + use tokio::sync::RwLock; - use super::{Event, EventType, Recorder}; + use super::{Event, EventKey, EventType, Recorder, Reference, Reporter}; #[tokio::test] #[ignore = "needs cluster (creates an event for the default kubernetes service)"] @@ -468,4 +476,83 @@ mod test { assert!(found_event.series.is_some()); Ok(()) } + + #[tokio::test] + #[ignore = "needs cluster (creates an event for the default kubernetes service)"] + async fn event_recorder_cache_retain() -> Result<(), Box> { + let client = Client::try_default().await?; + + let svcs: Api = Api::namespaced(client.clone(), "default"); + let s = svcs.get("kubernetes").await?; // always a kubernetes service in default + + let reference = s.object_ref(&()); + let reporter: Reporter = "kube".into(); + let ev = Event { + type_: EventType::Normal, + reason: "TestCacheTtl".into(), + note: Some("Sending kubernetes to detention".into()), + action: "Test event - plz ignore".into(), + secondary: None, + }; + let key = EventKey { + event_type: ev.type_, + action: ev.action.clone(), + reason: ev.reason.clone(), + reporting_controller: reporter.controller.clone(), + regarding: Reference(reference.clone()), + reporting_instance: None, + related: None, + }; + + let now = Utc::now(); + let past = now - Duration::minutes(10); + let event = K8sEvent { + action: Some(ev.action.clone()), + reason: Some(ev.reason.clone()), + event_time: Some(MicroTime(past)), + regarding: Some(reference.clone()), + note: ev.note.clone().map(Into::into), + metadata: ObjectMeta { + namespace: reference.namespace.clone(), + name: Some(format!( + "{}.{:x}", + reference.name.as_ref().unwrap_or(&reporter.controller), + past.timestamp_nanos_opt().unwrap_or_else(|| past.timestamp()) + )), + ..Default::default() + }, + reporting_controller: Some(reporter.controller.clone()), + reporting_instance: Some( + reporter + .instance + .clone() + .unwrap_or_else(|| reporter.controller.clone()), + ), + type_: Some("Normal".into()), + ..Default::default() + }; + + let cache = Arc::new(RwLock::new(HashMap::new())); + + cache.write().await.insert(key.clone(), event.clone()); + + let recorder = Recorder { + client: client.clone(), + reporter: reporter.controller.into(), + cache, + }; + + recorder.publish(ev, &s.object_ref(&())).await?; + let events: Api = Api::namespaced(client, "default"); + + let event_list = events.list(&Default::default()).await?; + let found_event = event_list + .into_iter() + .find(|e| std::matches!(e.reason.as_deref(), Some("TestCacheTtl"))) + .unwrap(); + assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention"); + assert!(found_event.series.is_none()); + + Ok(()) + } } From dd650b21d9b59042683f441584a50ffd5da97b64 Mon Sep 17 00:00:00 2001 From: Alexander Gil Date: Fri, 6 Dec 2024 12:52:58 +0100 Subject: [PATCH 8/9] fix: Change import definitions according to rustfmt +nightly Signed-off-by: Alexander Gil --- kube-runtime/src/events.rs | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index 9c4b68987..204aa0c4d 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -1,6 +1,9 @@ //! Publishes events for objects for kubernetes >= 1.19 -use std::hash::{Hash, Hasher}; -use std::{collections::HashMap, sync::Arc}; +use std::{ + collections::HashMap, + hash::{Hash, Hasher}, + sync::Arc, +}; use k8s_openapi::{ api::{ @@ -10,10 +13,9 @@ use k8s_openapi::{ apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta}, chrono::{Duration, Utc}, }; -use kube_client::ResourceExt; use kube_client::{ api::{Api, Patch, PatchParams, PostParams}, - Client, + Client, ResourceExt, }; use tokio::sync::RwLock; From 9e3032367661c345dc2c755b4703a7fa993fc18d Mon Sep 17 00:00:00 2001 From: Alexander Gil Date: Tue, 10 Dec 2024 13:11:30 +0100 Subject: [PATCH 9/9] docs: Change event key description removing Isomorphic word Signed-off-by: Alexander Gil --- kube-runtime/src/events.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/kube-runtime/src/events.rs b/kube-runtime/src/events.rs index 204aa0c4d..5fc1eeb69 100644 --- a/kube-runtime/src/events.rs +++ b/kube-runtime/src/events.rs @@ -94,7 +94,7 @@ impl Hash for Reference { } } -/// Isomorphic key for caching similar events +/// Cache key for event deduplication #[derive(Clone, Debug, PartialEq, Eq, Hash)] struct EventKey { pub event_type: EventType,