Skip to content

Commit

Permalink
feat(runtime): Add series implementation for event recorder
Browse files Browse the repository at this point in the history
Signed-off-by: Alexander Gil <[email protected]>
  • Loading branch information
pando85 committed Dec 3, 2024
1 parent 3ee4ae5 commit e09e462
Show file tree
Hide file tree
Showing 3 changed files with 186 additions and 76 deletions.
7 changes: 4 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,19 @@ form_urlencoded = "1.2.0"
futures = { version = "0.3.17", default-features = false }
hashbrown = "0.15.0"
home = "0.5.4"
hostname = "0.3"
http = "1.1.0"
http-body = "1.0.1"
http-body-util = "0.1.2"
hyper = "1.2.0"
hyper-util = "0.1.9"
hyper-openssl = "0.10.2"
hyper-rustls = { version = "0.27.1", default-features = false }
hyper-socks2 = { version = "0.9.0", default-features = false }
hyper-timeout = "0.5.1"
hyper-util = "0.1.9"
json-patch = "3"
jsonptr = "0.6"
jsonpath-rust = "0.7.3"
jsonptr = "0.6"
k8s-openapi = { version = "0.23.0", default-features = false }
openssl = "0.10.36"
parking_lot = "0.12.0"
Expand All @@ -74,8 +75,8 @@ schemars = "0.8.6"
secrecy = "0.10.2"
serde = "1.0.130"
serde_json = "1.0.68"
serde-value = "0.7.0"
serde_yaml = "0.9.19"
serde-value = "0.7.0"
syn = "2.0.38"
tame-oauth = "0.10.0"
tempfile = "3.1.0"
Expand Down
1 change: 1 addition & 0 deletions kube-runtime/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ hashbrown.workspace = true
k8s-openapi.workspace = true
async-broadcast.workspace = true
async-stream.workspace = true
hostname.workspace = true

[dev-dependencies]
kube = { path = "../kube", features = ["derive", "client", "runtime"], version = "<1.0.0, >=0.60.0" }
Expand Down
254 changes: 181 additions & 73 deletions kube-runtime/src/events.rs
Original file line number Diff line number Diff line change
@@ -1,13 +1,23 @@
//! Publishes events for objects for kubernetes >= 1.19
use std::hash::{Hash, Hasher};
use std::{collections::HashMap, sync::Arc};

use k8s_openapi::{
api::{core::v1::ObjectReference, events::v1::Event as K8sEvent},
api::{
core::v1::ObjectReference,
events::v1::{Event as K8sEvent, EventSeries},
},
apimachinery::pkg::apis::meta::v1::{MicroTime, ObjectMeta},
chrono::Utc,
chrono::{Duration, Utc},
};
use kube_client::ResourceExt;
use kube_client::{
api::{Api, PostParams},
api::{Api, Patch, PatchParams, PostParams},
Client,
};
use tokio::sync::RwLock;

const EVENT_FINISH_TIME: Duration = Duration::minutes(6);

/// Minimal event type for publishing through [`Recorder::publish`].
///
Expand Down Expand Up @@ -64,6 +74,34 @@ pub enum EventType {
Warning,
}

/// ObjectReference with Hash and Eq implementations

Check failure on line 77 in kube-runtime/src/events.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

item in documentation is missing backticks

error: item in documentation is missing backticks --> kube-runtime/src/events.rs:77:5 | 77 | /// ObjectReference with Hash and Eq implementations | ^^^^^^^^^^^^^^^ | = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#doc_markdown note: the lint level is defined here --> kube-runtime/src/lib.rs:12:9 | 12 | #![deny(clippy::pedantic)] | ^^^^^^^^^^^^^^^^ = note: `#[deny(clippy::doc_markdown)]` implied by `#[deny(clippy::pedantic)]` help: try | 77 | /// `ObjectReference` with Hash and Eq implementations | ~~~~~~~~~~~~~~~~~
#[derive(Clone, Debug, PartialEq)]
pub struct Reference(ObjectReference);

impl Eq for Reference {}

impl Hash for Reference {
fn hash<H: Hasher>(&self, state: &mut H) {
self.0.api_version.hash(state);
self.0.kind.hash(state);
self.0.name.hash(state);
self.0.namespace.hash(state);
self.0.uid.hash(state);
}
}

/// Isomorphic key for caching similar events
#[derive(Clone, Debug, PartialEq, Eq, Hash)]
pub struct EventKey {
pub event_type: EventType,
pub action: String,
pub reason: String,
pub reporting_controller: String,
pub reporting_instance: Option<String>,
pub regarding: Reference,
pub related: Option<Reference>,
}

/// Information about the reporting controller.
///
/// ```
Expand Down Expand Up @@ -99,7 +137,8 @@ pub struct Reporter {
///
/// in the manifest of your controller.
///
/// NB: If no `instance` is provided, then `reporting_instance == reporting_controller` in the `Event`.
/// Note: If `instance` is not provided, the hostname is used. If the hostname is also
/// unavailable, `reporting_instance` defaults to `reporting_controller` in the `Event`.
pub instance: Option<String>,
}

Expand All @@ -115,9 +154,10 @@ impl From<String> for Reporter {

impl From<&str> for Reporter {
fn from(es: &str) -> Self {
let instance = hostname::get().ok().and_then(|h| h.into_string().ok());
Self {
controller: es.into(),
instance: None,
instance,
}
}
}
Expand Down Expand Up @@ -168,13 +208,13 @@ impl From<&str> for Reporter {
/// ```yaml
/// - apiGroups: ["events.k8s.io"]
/// resources: ["events"]
/// verbs: ["create"]
/// verbs: ["create", "patch"]
/// ```
#[derive(Clone)]
pub struct Recorder {
events: Api<K8sEvent>,
client: Client,
reporter: Reporter,
reference: ObjectReference,
events_cache: Arc<RwLock<HashMap<EventKey, K8sEvent>>>,
}

impl Recorder {
Expand All @@ -184,74 +224,136 @@ impl Recorder {
///
/// Cluster scoped objects will publish events in the "default" namespace.
#[must_use]
pub fn new(client: Client, reporter: Reporter, reference: ObjectReference) -> Self {
let default_namespace = "kube-system".to_owned(); // default does not work on k8s < 1.22
let events = Api::namespaced(client, reference.namespace.as_ref().unwrap_or(&default_namespace));
pub fn new(client: Client, reporter: Reporter) -> Self {
let events_cache = Arc::new(RwLock::new(HashMap::new()));
Self {
events,
client,
reporter,
reference,
events_cache,
}
}

/// Builds unique event key based on reportingController, reportingInstance, regarding, reason
/// and note
fn get_event_key(&self, ev: &Event, regarding: &ObjectReference) -> EventKey {
EventKey {
event_type: ev.type_,
action: ev.action.clone(),
reason: ev.reason.clone(),
reporting_controller: self.reporter.controller.clone(),
reporting_instance: self.reporter.instance.clone(),
regarding: Reference(regarding.clone()),
related: ev.secondary.clone().map(Reference),
}
}

// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io
// for more detail on the fields
// and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125
fn generate_event(&self, ev: &Event, reference: &ObjectReference) -> K8sEvent {
let now = Utc::now();
K8sEvent {
action: Some(ev.action.clone()),
reason: Some(ev.reason.clone()),
deprecated_count: None,
deprecated_first_timestamp: None,
deprecated_last_timestamp: None,
deprecated_source: None,
event_time: Some(MicroTime(now)),
regarding: Some(reference.clone()),
note: ev.note.clone().map(Into::into),
metadata: ObjectMeta {
namespace: reference.namespace.clone(),
name: Some(format!(
"{}.{}",
reference.name.as_ref().unwrap_or(&self.reporter.controller),
now.timestamp()
)),
..Default::default()
},
reporting_controller: Some(self.reporter.controller.clone()),
reporting_instance: Some(
self.reporter
.instance
.clone()
.unwrap_or_else(|| self.reporter.controller.clone()),
),
series: None,
type_: match ev.type_ {
EventType::Normal => Some("Normal".into()),
EventType::Warning => Some("Warning".into()),
},
related: ev.secondary.clone(),
}
}

/// Publish a new Kubernetes' event.
///
/// # Access control
///
/// The event object is created in the same namespace of the [`ObjectReference`]
/// you specified in [`Recorder::new`].
/// The event object is created in the same namespace of the [`ObjectReference`].
/// Make sure that your controller has `create` permissions in the required namespaces
/// for the `event` resource in the API group `events.k8s.io`.
///
/// # Errors
///
/// Returns an [`Error`](`kube_client::Error`) if the event is rejected by Kubernetes.
pub async fn publish(&self, ev: Event) -> Result<(), kube_client::Error> {
// See https://kubernetes.io/docs/reference/generated/kubernetes-api/v1.22/#event-v1-events-k8s-io
// for more detail on the fields
// and what's expected: https://kubernetes.io/docs/reference/using-api/deprecation-guide/#event-v125
self.events
.create(&PostParams::default(), &K8sEvent {
action: Some(ev.action),
reason: Some(ev.reason),
deprecated_count: None,
deprecated_first_timestamp: None,
deprecated_last_timestamp: None,
deprecated_source: None,
event_time: Some(MicroTime(Utc::now())),
regarding: Some(self.reference.clone()),
note: ev.note.map(Into::into),
metadata: ObjectMeta {
namespace: self.reference.namespace.clone(),
generate_name: Some(format!("{}-", self.reporter.controller)),
..Default::default()
},
reporting_controller: Some(self.reporter.controller.clone()),
reporting_instance: Some(
self.reporter
.instance
.clone()
.unwrap_or_else(|| self.reporter.controller.clone()),
),
series: None,
type_: match ev.type_ {
EventType::Normal => Some("Normal".into()),
EventType::Warning => Some("Warning".into()),
},
related: ev.secondary,
})
.await?;
pub async fn publish(&self, ev: Event, reference: &ObjectReference) -> Result<(), kube_client::Error> {
let now = Utc::now();

let key = self.get_event_key(&ev, reference);
let event = match self.events_cache.read().await.get(&key) {
Some(e) => {
let series = match &e.series {
Some(series) => EventSeries {
count: series.count + 1,
last_observed_time: MicroTime(now),
},
None => EventSeries {
count: 2,
last_observed_time: MicroTime(now),
},
};
let mut event = e.clone();
event.series = Some(series);
event
}
None => self.generate_event(&ev, reference),
};

let events = Api::namespaced(
self.client.clone(),
reference.namespace.as_ref().unwrap_or(&"default".to_string()),
);
if event.series.is_some() {
events
.patch(&event.name_any(), &PatchParams::default(), &Patch::Merge(&event))
.await?;
} else {
events.create(&PostParams::default(), &event).await?;
};

{
let mut cache = self.events_cache.write().await;
cache.insert(key, event);
cache.retain(|_, v| {
if let Some(series) = v.series.as_ref() {
series.last_observed_time.0 + EVENT_FINISH_TIME > now
} else if let Some(event_time) = v.event_time.as_ref() {
event_time.0 + EVENT_FINISH_TIME > now
} else {
true
}
});
}
Ok(())
}
}

#[cfg(test)]
mod test {
use k8s_openapi::api::{
core::v1::{Event as K8sEvent, Service},
rbac::v1::ClusterRole,
};
use kube_client::{Api, Client, Resource};
use k8s_openapi::api::{core::v1::Service, events::v1::Event as K8sEvent, rbac::v1::ClusterRole};
use kube::{Api, Client, Resource};

use super::{Event, EventType, Recorder};

Expand All @@ -262,15 +364,18 @@ mod test {

let svcs: Api<Service> = Api::namespaced(client.clone(), "default");
let s = svcs.get("kubernetes").await?; // always a kubernetes service in default
let recorder = Recorder::new(client.clone(), "kube".into(), s.object_ref(&()));
let recorder = Recorder::new(client.clone(), "kube".into());
recorder
.publish(Event {
type_: EventType::Normal,
reason: "VeryCoolService".into(),
note: Some("Sending kubernetes to detention".into()),
action: "Test event - plz ignore".into(),
secondary: None,
})
.publish(
Event {
type_: EventType::Normal,
reason: "VeryCoolService".into(),
note: Some("Sending kubernetes to detention".into()),
action: "Test event - plz ignore".into(),
secondary: None,
},
&s.object_ref(&()),
)
.await?;
let events: Api<K8sEvent> = Api::namespaced(client, "default");

Expand All @@ -279,7 +384,7 @@ mod test {
.into_iter()
.find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolService")))
.unwrap();
assert_eq!(found_event.message.unwrap(), "Sending kubernetes to detention");
assert_eq!(found_event.note.unwrap(), "Sending kubernetes to detention");

Ok(())
}
Expand All @@ -291,15 +396,18 @@ mod test {

let svcs: Api<ClusterRole> = Api::all(client.clone());
let s = svcs.get("system:basic-user").await?; // always get this default ClusterRole
let recorder = Recorder::new(client.clone(), "kube".into(), s.object_ref(&()));
let recorder = Recorder::new(client.clone(), "kube".into());
recorder
.publish(Event {
type_: EventType::Normal,
reason: "VeryCoolServiceNoNamespace".into(),
note: Some("Sending kubernetes to detention without namespace".into()),
action: "Test event - plz ignore".into(),
secondary: None,
})
.publish(
Event {
type_: EventType::Normal,
reason: "VeryCoolServiceNoNamespace".into(),
note: Some("Sending kubernetes to detention without namespace".into()),
action: "Test event - plz ignore".into(),
secondary: None,
},
&s.object_ref(&()),
)
.await?;
let events: Api<K8sEvent> = Api::namespaced(client, "kube-system");

Expand All @@ -309,7 +417,7 @@ mod test {
.find(|e| std::matches!(e.reason.as_deref(), Some("VeryCoolServiceNoNamespace")))
.unwrap();
assert_eq!(
found_event.message.unwrap(),
found_event.note.unwrap(),
"Sending kubernetes to detention without namespace"
);

Expand Down

0 comments on commit e09e462

Please sign in to comment.