Skip to content

Commit

Permalink
Added metrics server for daemon (#210)
Browse files Browse the repository at this point in the history
* feat: added metrics server for daemon

* docs: updated config examples

* test: updated daemon manifest

* feat: updated daemon bootstrap and added pod monitor

* chore: updated daemon bootstrap
  • Loading branch information
paulobressan authored Dec 6, 2024
1 parent 313ebb5 commit 481704f
Show file tree
Hide file tree
Showing 8 changed files with 86 additions and 11 deletions.
1 change: 1 addition & 0 deletions bootstrap/daemon/config.tf
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ resource "kubernetes_config_map_v1" "fabric_daemon_config" {
prometheus_delay_sec = var.prometheus_delay_sec
prometheus_query_step = var.prometheus_query_step
mode = var.mode
metrics_port = local.metrics_port
}
)}"
}
Expand Down
3 changes: 3 additions & 0 deletions bootstrap/daemon/daemon.toml.tftpl
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ cluster_id = "${cluster_id}"
delay_sec = ${prometheus_delay_sec}
mode = "${mode}"

[metrics]
addr = "0.0.0.0:${metrics_port}"

[prometheus]
url = "${prometheus_url}"
query_step = "${prometheus_query_step}"
Expand Down
1 change: 1 addition & 0 deletions bootstrap/daemon/main.tf
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Module-local constants for the fabric daemon bootstrap.
locals {
configmap_name = "fabric-daemon-config"
port = 5000
# Port for the daemon's metrics listener.
# NOTE(review): presumably the value rendered into the `[metrics] addr` entry of the daemon config template — confirm against config.tf / daemon.toml.tftpl.
metrics_port = 9946
}

variable "namespace" {
Expand Down
27 changes: 27 additions & 0 deletions bootstrap/daemon/monitor.tf
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
# Declares a PodMonitor custom resource (monitoring.coreos.com/v1) named
# "fabric-daemon" in the target namespace, pointing at the daemon's
# "/metrics" endpoint.
resource "kubernetes_manifest" "daemon_monitor" {
manifest = {
apiVersion = "monitoring.coreos.com/v1"
kind = "PodMonitor"
metadata = {
labels = {
"app.kubernetes.io/component" = "o11y"
"app.kubernetes.io/part-of" = "demeter"
}
name = "fabric-daemon"
namespace = var.namespace
}
spec = {
# Selects pods carrying the label role = "fabric-daemon".
selector = {
matchLabels = {
role = "fabric-daemon"
}
}
# NOTE(review): "metrics" looks like a named container port; the pod spec
# that declares it is not visible here — confirm the name matches.
podMetricsEndpoints = [
{
port = "metrics",
path = "/metrics"
}
]
}
}
}
3 changes: 3 additions & 0 deletions examples/config/daemon.toml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,9 @@ cluster_id = "625e6681-8a74-4454-b5ad-861b45c6a42e"
delay_sec = 60
mode = "full"

[metrics]
addr="0.0.0.0:9947"

[prometheus]
url = "http://localhost:9090/api/v1"
query_step = "1m"
Expand Down
32 changes: 24 additions & 8 deletions src/bin/daemon.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use std::{collections::HashMap, env, time::Duration};
use std::{collections::HashMap, env, sync::Arc, time::Duration};

use anyhow::Result;
use dotenv::dotenv;
use fabric::drivers::{cache::CacheConfig, monitor::MonitorConfig, usage::UsageConfig};
use fabric::{
driven::prometheus::metrics::MetricsDriven,
drivers::{cache::CacheConfig, monitor::MonitorConfig, usage::UsageConfig},
};
use serde::{de::Visitor, Deserialize, Deserializer};
use tokio::try_join;
use tracing::Level;
Expand All @@ -23,23 +26,30 @@ async fn main() -> Result<()> {
.init();

let config = Config::new()?;
let metrics_driven = Arc::new(MetricsDriven::new()?);

let metrics = fabric::drivers::metrics::server(&config.metrics.addr, metrics_driven.clone());

match config.mode {
Mode::Usage => {
let cache = fabric::drivers::cache::subscribe(config.clone().into());
let schedule = fabric::drivers::usage::schedule(config.clone().into());
let usage = fabric::drivers::usage::schedule(config.clone().into());

try_join!(cache, schedule)?;
try_join!(cache, usage, metrics)?;
}
Mode::Monitor => {
fabric::drivers::monitor::subscribe(config.clone().into()).await?;
let monitor =
fabric::drivers::monitor::subscribe(config.clone().into(), metrics_driven.clone());

try_join!(monitor, metrics)?;
}
Mode::Full => {
let cache = fabric::drivers::cache::subscribe(config.clone().into());
let schedule = fabric::drivers::usage::schedule(config.clone().into());
let subscribe = fabric::drivers::monitor::subscribe(config.clone().into());
let usage = fabric::drivers::usage::schedule(config.clone().into());
let monitor =
fabric::drivers::monitor::subscribe(config.clone().into(), metrics_driven.clone());

try_join!(cache, schedule, subscribe)?;
try_join!(cache, usage, monitor, metrics)?;
}
};

Expand All @@ -59,11 +69,17 @@ struct Prometheus {
query_step: String,
}

/// Deserialized settings for the daemon's metrics server.
#[derive(Debug, Deserialize, Clone)]
struct Metrics {
/// Address string for the metrics server.
/// NOTE(review): presumably a `host:port` bind address such as `"0.0.0.0:9946"` — confirm against the config files.
addr: String,
}

#[derive(Debug, Deserialize, Clone)]
struct Config {
db_path: String,
cluster_id: String,
prometheus: Prometheus,
metrics: Metrics,
#[serde(deserialize_with = "deserialize_duration")]
#[serde(rename(deserialize = "delay_sec"))]
delay: Duration,
Expand Down
27 changes: 24 additions & 3 deletions src/drivers/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,11 @@ use std::{borrow::Borrow, collections::HashMap, sync::Arc};
use tracing::{error, info, warn};

use crate::{
domain::{event::Event, project, resource},
driven::k8s::K8sCluster,
domain::{error::Error, event::Event, project, resource},
driven::{k8s::K8sCluster, prometheus::metrics::MetricsDriven},
};

pub async fn subscribe(config: MonitorConfig) -> Result<()> {
pub async fn subscribe(config: MonitorConfig, metrics: Arc<MetricsDriven>) -> Result<()> {
let cluster = Arc::new(K8sCluster::new().await?);

let mut client_config = ClientConfig::new();
Expand Down Expand Up @@ -60,22 +60,37 @@ pub async fn subscribe(config: MonitorConfig) -> Result<()> {
Event::ProjectCreated(evt) => {
project::cluster::apply_manifest(cluster.clone(), evt.clone())
.await
.inspect_err(|err| {
handle_error_metric(metrics.clone(), "project", err)
})
}
Event::ProjectDeleted(evt) => {
project::cluster::delete_manifest(cluster.clone(), evt.clone())
.await
.inspect_err(|err| {
handle_error_metric(metrics.clone(), "project", err)
})
}
Event::ResourceCreated(evt) => {
resource::cluster::apply_manifest(cluster.clone(), evt.clone())
.await
.inspect_err(|err| {
handle_error_metric(metrics.clone(), "resource", err)
})
}
Event::ResourceUpdated(evt) => {
resource::cluster::patch_manifest(cluster.clone(), evt.clone())
.await
.inspect_err(|err| {
handle_error_metric(metrics.clone(), "resource", err)
})
}
Event::ResourceDeleted(evt) => {
resource::cluster::delete_manifest(cluster.clone(), evt.clone())
.await
.inspect_err(|err| {
handle_error_metric(metrics.clone(), "resource", err)
})
}
_ => {
info!(event = event.key(), "bypass event");
Expand Down Expand Up @@ -112,3 +127,9 @@ pub struct MonitorConfig {
pub topic: String,
pub kafka: HashMap<String, String>,
}

fn handle_error_metric(metrics: Arc<MetricsDriven>, domain: &str, error: &Error) {
if let Error::Unexpected(err) = error {
metrics.domain_error("monitor", domain, &err.to_string());
}
}
3 changes: 3 additions & 0 deletions test/fabric.manifest.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,9 @@ data:
[prometheus]
url = "http://prometheus:9090/api/v1"
query_step = "1m"
[metrics]
addr="0.0.0.0:9946"
kind: ConfigMap
metadata:
name: daemon-config
Expand Down

0 comments on commit 481704f

Please sign in to comment.