diff --git a/.env b/.env index ad6c2e8..d5574ee 100644 --- a/.env +++ b/.env @@ -2,7 +2,7 @@ RUST_LOG=info PORT=7878 ALLOWED_ORIGIN="*" KAFKA_ENABLED="Y" -KAFKA_QUEUES="menu browser form process window menu_item menu_tree role" +KAFKA_QUEUES="browser form process window menu_item menu_tree role" KAFKA_HOST="localhost:29092" KAFKA_GROUP="default" OPENSEARCH_URL="http://localhost:9200" diff --git a/README.md b/README.md index 47fe49e..7608818 100644 --- a/README.md +++ b/README.md @@ -102,6 +102,9 @@ cargo run --bin server A output generated by the Arduino UNO emulator can be like it: ```Shell +2024-06-20T19:28:59.081Z INFO [server] Kafka Consumer is enabled +2024-06-20T19:28:59.081Z INFO [server] Kafka queue to Subscribe: "localhost:29092" +2024-06-20T19:28:59.081Z INFO [server] Kafka Topics to Subscribe: ["browser", "form", "process", "window", "menu_item", "menu_tree", "role"] INFO [server] └──!NULL! └──api ├──[OPTIONS] -> server::options_response @@ -135,9 +138,10 @@ INFO [server] └──!NULL! ├──[OPTIONS] -> server::options_response └──[GET] -> server::get_windows -2024-06-20T19:28:59.081Z INFO [server] Kafka Consumer is enabled -2024-06-20T19:28:59.081Z INFO [server] Kafka queue: "localhost:29092" -2024-06-20T19:28:59.081Z INFO [server] Topics to Subscribed: ["menu", "browser", "form", "process", "window", "menu_item", "menu_tree", "role"] +2024-06-20T19:28:59.081Z INFO [server] Subscribed to kafka brokers successfully:: "localhost:29092" +2024-06-20T19:28:59.081Z INFO [server] Subscribed to kafka topics successfully: "browser form process window menu_item menu_tree role" +2024-06-20T19:28:59.081Z INFO [dictionary_rs::controller::kafka] Pre rebalance Assign(TPL {menu_tree/0: offset=Invalid metadata="", error=Ok(()); window/0: offset=Invalid metadata="", error=Ok(()); role/0: offset=Invalid metadata="", error=Ok(()); browser/0: offset=Invalid metadata="", error=Ok(()); process/0: offset=Invalid metadata="", error=Ok(()); form/0: offset=Invalid metadata="", error=Ok(()); menu_item/0: offset=Invalid metadata="", error=Ok(())}) +2024-06-20T19:28:59.081Z INFO [dictionary_rs::controller::kafka] Post rebalance Assign(TPL {menu_tree/0: offset=Invalid metadata="", error=Ok(()); window/0: offset=Invalid metadata="", error=Ok(()); role/0: offset=Invalid metadata="", error=Ok(()); browser/0: offset=Invalid metadata="", error=Ok(()); process/0: offset=Invalid metadata="", error=Ok(()); form/0: offset=Invalid metadata="", error=Ok(()); menu_item/0: offset=Invalid metadata="", error=Ok(())}) ``` # General Info diff --git a/docker/Dockerfile b/docker/Dockerfile index 0eda501..833cba5 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -27,7 +27,7 @@ ENV \ RUST_LOG="info" \ PORT="7878" \ KAFKA_ENABLED="Y" \ - KAFKA_QUEUES="menu browser form process window menu_item menu_tree role" \ + KAFKA_QUEUES="browser form process window menu_item menu_tree role" \ ALLOWED_ORIGIN="*" \ KAFKA_HOST="0.0.0.0:9092" \ KAFKA_GROUP="default" \ diff --git a/src/bin/server.rs b/src/bin/server.rs index da1fafc..7a57691 100644 --- a/src/bin/server.rs +++ b/src/bin/server.rs @@ -384,32 +384,32 @@ async fn get_windows<'a>(_req: &mut Request, _res: &mut Response) { } async fn consume_queue() { - let kafka_host = match env::var("KAFKA_HOST") { - Ok(value) => value, - Err(_) => { - log::info!("Variable `KAFKA_HOST` Not found from enviroment, loaded from local IP"); - "127.0.0.1:9092".to_owned() - }.to_owned(), - }; - log::info!("Kafka queue: {:?}", kafka_host.to_owned()); + let kafka_host: String = match env::var("KAFKA_HOST") { + Ok(value) => value, + Err(_) => { + log::info!("Variable `KAFKA_HOST` Not found from enviroment, loaded from local IP"); + "127.0.0.1:9092".to_owned() + }.to_owned(), + }; + log::info!("Kafka queue to Subscribe: {:?}", kafka_host.to_owned()); - let kafka_group = match env::var("KAFKA_GROUP") { - Ok(value) => value, - Err(_) => { - log::info!("Variable `KAFKA_GROUP` Not found from enviroment, loaded with `default` value"); - "default".to_owned() - }.to_owned(), - }; + let kafka_group: String = match env::var("KAFKA_GROUP") { + Ok(value) => value, + Err(_) => { + log::info!("Variable `KAFKA_GROUP` Not found from enviroment, loaded with `default` value"); + "default".to_owned() + }.to_owned(), + }; let kafka_queues: String = match env::var("KAFKA_QUEUES") { - Ok(value) => value.clone(), - Err(_) => { - log::info!("Variable `KAFKA_QUEUES` Not found from enviroment, loaded with `default` value"); + Ok(value) => value.clone(), + Err(_) => { + log::info!("Variable `KAFKA_QUEUES` Not found from enviroment, loaded with `default` value"); "menu process browser window".to_owned() }.to_owned() - }; + }; let topics: Vec<&str> = kafka_queues.split_whitespace().collect(); - log::info!("Topics to Subscribed: {:?}", topics.to_owned()); + log::info!("Kafka Topics to Subscribe: {:?}", topics.to_owned()); let consumer_result = create_consumer(&kafka_host, &kafka_group, &topics); match consumer_result { diff --git a/src/controller/kafka.rs b/src/controller/kafka.rs index a3bd6f9..2fa2fcf 100644 --- a/src/controller/kafka.rs +++ b/src/controller/kafka.rs @@ -54,18 +54,22 @@ pub fn create_consumer(brokers: &str, group_id: &str, topics: &[&str]) -> Result if consumer_value.is_err() { return Err(Error::new(ErrorKind::InvalidData.into(), consumer_value.err().unwrap())) } - let consumer = consumer_value.unwrap(); + + let consumer: StreamConsumer = consumer_value.unwrap(); + log::info!("Subscribed to kafka brokers successfully: {:?}", &brokers); + loop { match consumer.subscribe(&topics) { Ok(()) => { - log::info!("Subscribed to topics successfully: {:?}", topics.join(" ")); + log::info!("Subscribed to kafka topics successfully: {:?}", topics.join(" ")); break }, Err(e) => { - log::warn!("Can't subscribe to specified topics '{:?}': {}", topics, e); + log::warn!("Can't subscribe to kafka specified topics '{:?}': {}", topics, e); }, } - let waiting_time = Duration::from_secs(5); + + let waiting_time: Duration = Duration::from_secs(5); thread::sleep(waiting_time); } Ok(consumer)