Skip to content

Commit

Permalink
feat: added group id consumer
Browse files Browse the repository at this point in the history
  • Loading branch information
paulobressan committed Jul 12, 2024
1 parent 91be4a7 commit 7e7e219
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 4 deletions.
13 changes: 12 additions & 1 deletion src/bin/daemon.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ use std::env;

use anyhow::Result;
use dotenv::dotenv;
use fabric::drivers::monitor::MonitorConfig;
use serde::Deserialize;
use tracing::Level;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
Expand All @@ -22,12 +23,13 @@ async fn main() -> Result<()> {

let config = Config::new()?;

fabric::drivers::monitor::subscribe(&config.brokers).await
fabric::drivers::monitor::subscribe(config.into()).await
}

#[derive(Debug, Deserialize)]
struct Config {
brokers: String,
consumer_name: String,
}
impl Config {
pub fn new() -> Result<Self> {
Expand All @@ -43,3 +45,12 @@ impl Config {
Ok(config)
}
}

impl From<Config> for MonitorConfig {
fn from(value: Config) -> Self {
Self {
brokers: value.brokers,
consumer_name: value.consumer_name,
}
}
}
11 changes: 8 additions & 3 deletions src/drivers/monitor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@ use crate::{
driven::k8s::K8sCluster,
};

pub async fn subscribe(brokers: &str) -> Result<()> {
pub async fn subscribe(config: MonitorConfig) -> Result<()> {
let k8s_cluster = Arc::new(K8sCluster::new().await?);

let topic = String::from("events");

let consumer: StreamConsumer = ClientConfig::new()
.set("bootstrap.servers", brokers)
.set("group.id", "clusters")
.set("bootstrap.servers", &config.brokers)
.set("group.id", &config.consumer_name)
.create()?;

consumer.subscribe(&[&topic])?;
Expand Down Expand Up @@ -50,3 +50,8 @@ pub async fn subscribe(brokers: &str) -> Result<()> {
};
}
}

pub struct MonitorConfig {
pub brokers: String,
pub consumer_name: String,
}

0 comments on commit 7e7e219

Please sign in to comment.