Skip to content

Commit

Permalink
fix: GC executed at the beginning and add test
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Gil <[email protected]>
  • Loading branch information
pando85 committed Dec 6, 2024
1 parent 369264d commit 70a69af
Showing 1 changed file with 102 additions and 15 deletions.
117 changes: 102 additions & 15 deletions kube-runtime/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -307,6 +307,17 @@ impl Recorder {
pub async fn publish(&self, ev: Event, reference: &ObjectReference) -> Result<(), kube_client::Error> {
let now = Utc::now();

// gc past events older than now + CACHE_TTL
self.cache.write().await.retain(|_, v| {
if let Some(series) = v.series.as_ref() {
series.last_observed_time.0 + CACHE_TTL > now
} else if let Some(event_time) = v.event_time.as_ref() {
event_time.0 + CACHE_TTL > now
} else {
true
}
});

let key = self.get_event_key(&ev, reference);
let event = match self.cache.read().await.get(&key) {
Some(e) => {
Expand Down Expand Up @@ -337,31 +348,28 @@ impl Recorder {
{
let mut cache = self.cache.write().await;
cache.insert(key, event);

// gc past events older than now + CACHE_TTL
cache.retain(|_, v| {
if let Some(series) = v.series.as_ref() {
series.last_observed_time.0 + CACHE_TTL > now
} else if let Some(event_time) = v.event_time.as_ref() {
event_time.0 + CACHE_TTL > now
} else {
true
}
});
}
Ok(())
}
}

#[cfg(test)]
mod test {
use k8s_openapi::api::{
core::v1::{ComponentStatus, Service},
events::v1::Event as K8sEvent,
use std::{collections::HashMap, sync::Arc};

use k8s_openapi::{
api::{
core::v1::{ComponentStatus, Service},
events::v1::Event as K8sEvent,
},
apimachinery::pkg::apis::meta::v1::MicroTime,
chrono::{Duration, Utc},
};
use kube::{Api, Client, Resource};
use kube_client::api::ObjectMeta;
use tokio::sync::RwLock;

use super::{Event, EventType, Recorder};
use super::{Event, EventKey, EventType, Recorder, Reference, Reporter};

#[tokio::test]
#[ignore = "needs cluster (creates an event for the default kubernetes service)"]
Expand Down Expand Up @@ -468,4 +476,83 @@ mod test {
assert!(found_event.series.is_some());
Ok(())
}

#[tokio::test]
#[ignore = "needs cluster (creates an event for the default kubernetes service)"]
async fn event_recorder_cache_retain() -> Result<(), Box<dyn std::error::Error>> {
let client = Client::try_default().await?;

let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
let s = svcs.get("kubernetes").await?; // always a kubernetes service in default

let reference = s.object_ref(&());
let reporter: Reporter = "kube".into();
let ev = Event {
type_: EventType::Normal,
reason: "TestCacheTtl".into(),
note: Some("Sending kubernetes to detention".into()),
action: "Test event - plz ignore".into(),
secondary: None,
};
let key = EventKey {
event_type: ev.type_,
action: ev.action.clone(),
reason: ev.reason.clone(),
reporting_controller: reporter.controller.clone(),
regarding: Reference(reference.clone()),
reporting_instance: None,
related: None,
};

let now = Utc::now();
let past = now - Duration::minutes(10);
let event = K8sEvent {
action: Some(ev.action.clone()),
reason: Some(ev.reason.clone()),
event_time: Some(MicroTime(past)),
regarding: Some(reference.clone()),
note: ev.note.clone().map(Into::into),
metadata: ObjectMeta {
namespace: reference.namespace.clone(),
name: Some(format!(
"{}.{:x}",
reference.name.as_ref().unwrap_or(&reporter.controller),
past.timestamp_nanos_opt().unwrap_or_else(|| past.timestamp())
)),
..Default::default()
},
reporting_controller: Some(reporter.controller.clone()),
reporting_instance: Some(
reporter
.instance
.clone()
.unwrap_or_else(|| reporter.controller.clone()),
),
type_: Some("Normal".into()),
..Default::default()
};

let cache = Arc::new(RwLock::new(HashMap::new()));

cache.write().await.insert(key.clone(), event.clone());

let recorder = Recorder {
client: client.clone(),
reporter: reporter.controller.into(),
cache,
};

recorder.publish(ev, &s.object_ref(&())).await?;
let events: Api<K8sEvent> = Api::namespaced(client, "default");

let event_list = events.list(&Default::default()).await?;
let found_event = event_list
.into_iter()
.find(|e| std::matches!(e.reason.as_deref(), Some("TestCacheTtl")))
.unwrap();
assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention");
assert!(found_event.series.is_none());

Ok(())
}
}

0 comments on commit 70a69af

Please sign in to comment.