Skip to content

Commit

Permalink
fix: Minor suggestion from first PR review
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Gil <[email protected]>
  • Loading branch information
pando85 committed Dec 5, 2024
1 parent e09e462 commit bebbb10
Showing 1 changed file with 16 additions and 19 deletions.
35 changes: 16 additions & 19 deletions kube-runtime/src/events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ use kube_client::{
};
use tokio::sync::RwLock;

const EVENT_FINISH_TIME: Duration = Duration::minutes(6);
const CACHE_TTL: Duration = Duration::minutes(6);

/// Minimal event type for publishing through [`Recorder::publish`].
///
Expand Down Expand Up @@ -92,7 +92,7 @@ impl Hash for Reference {

/// Isomorphic key for caching similar events
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct EventKey {
struct EventKey {
pub event_type: EventType,
pub action: String,
pub reason: String,
Expand Down Expand Up @@ -214,7 +214,7 @@ impl From<&str> for Reporter {
pub struct Recorder {
client: Client,
reporter: Reporter,
events_cache: Arc<RwLock<HashMap<EventKey, K8sEvent>>>,
cache: Arc<RwLock<HashMap<EventKey, K8sEvent>>>,
}

impl Recorder {
Expand All @@ -225,11 +225,11 @@ impl Recorder {
/// Cluster scoped objects will publish events in the "default" namespace.
#[must_use]
pub fn new(client: Client, reporter: Reporter) -> Self {
let events_cache = Arc::new(RwLock::new(HashMap::new()));
let cache = Arc::default();
Self {
client,
reporter,
events_cache,
cache,
}
}

Expand Down Expand Up @@ -267,7 +267,7 @@ impl Recorder {
name: Some(format!(
"{}.{}",
reference.name.as_ref().unwrap_or(&self.reporter.controller),
now.timestamp()
now.timestamp_nanos_opt().unwrap_or_else(|| now.timestamp())
)),
..Default::default()
},
Expand Down Expand Up @@ -302,17 +302,12 @@ impl Recorder {
let now = Utc::now();

let key = self.get_event_key(&ev, reference);
let event = match self.events_cache.read().await.get(&key) {
let event = match self.cache.read().await.get(&key) {
Some(e) => {
let series = match &e.series {
Some(series) => EventSeries {
count: series.count + 1,
last_observed_time: MicroTime(now),
},
None => EventSeries {
count: 2,
last_observed_time: MicroTime(now),
},
let count = if let Some(s) = &e.series { s.count + 1 } else { 2 };
let series = EventSeries {
count,
last_observed_time: MicroTime(now),
};
let mut event = e.clone();
event.series = Some(series);
Expand All @@ -334,13 +329,15 @@ impl Recorder {
};

{
let mut cache = self.events_cache.write().await;
let mut cache = self.cache.write().await;
cache.insert(key, event);

// gc past events older than now + CACHE_TTL
cache.retain(|_, v| {
if let Some(series) = v.series.as_ref() {
series.last_observed_time.0 + EVENT_FINISH_TIME > now
series.last_observed_time.0 + CACHE_TTL > now
} else if let Some(event_time) = v.event_time.as_ref() {
event_time.0 + EVENT_FINISH_TIME > now
event_time.0 + CACHE_TTL > now
} else {
true
}
Expand Down

0 comments on commit bebbb10

Please sign in to comment.