Skip to content

Commit

Permalink
Merge pull request #55 from EdwinBetanc0urt/bugfix/remove-menu-topic-…
Browse files Browse the repository at this point in the history
…deprecated

fix: Remove deprecated `menu` topic.
  • Loading branch information
yamelsenih authored Sep 30, 2024
2 parents 2d01993 + ff87a1f commit f5ddcca
Show file tree
Hide file tree
Showing 5 changed files with 37 additions and 29 deletions.
2 changes: 1 addition & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@ RUST_LOG=info
PORT=7878
ALLOWED_ORIGIN="*"
KAFKA_ENABLED="Y"
KAFKA_QUEUES="menu browser form process window menu_item menu_tree role"
KAFKA_QUEUES="browser form process window menu_item menu_tree role"
KAFKA_HOST="localhost:29092"
KAFKA_GROUP="default"
OPENSEARCH_URL="http://localhost:9200"
Expand Down
10 changes: 7 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,9 @@ cargo run --bin server
A output generated by the Arduino UNO emulator can be like it:

```Shell
2024-06-20T19:28:59.081Z INFO [server] Kafka Consumer is enabled
2024-06-20T19:28:59.081Z INFO [server] Kafka queue to Subscribe: "localhost:29092"
2024-06-20T19:28:59.081Z INFO [server] Kafka Topics to Subscribe: ["browser", "form", "process", "window", "menu_item", "menu_tree", "role"]
INFO [server] └──!NULL!
└──api
├──[OPTIONS] -> server::options_response
Expand Down Expand Up @@ -135,9 +138,10 @@ INFO [server] └──!NULL!
├──[OPTIONS] -> server::options_response
└──[GET] -> server::get_windows

2024-06-20T19:28:59.081Z INFO [server] Kafka Consumer is enabled
2024-06-20T19:28:59.081Z INFO [server] Kafka queue: "localhost:29092"
2024-06-20T19:28:59.081Z INFO [server] Topics to Subscribed: ["menu", "browser", "form", "process", "window", "menu_item", "menu_tree", "role"]
2024-06-20T19:28:59.081Z INFO [server] Subscribed to kafka brokers successfully:: "localhost:29092"
2024-06-20T19:28:59.081Z INFO [server] Subscribed to kafka topics successfully: "browser form process window menu_item menu_tree role"
2024-06-20T19:28:59.081Z INFO [dictionary_rs::controller::kafka] Pre rebalance Assign(TPL {menu_tree/0: offset=Invalid metadata="", error=Ok(()); window/0: offset=Invalid metadata="", error=Ok(()); role/0: offset=Invalid metadata="", error=Ok(()); browser/0: offset=Invalid metadata="", error=Ok(()); process/0: offset=Invalid metadata="", error=Ok(()); form/0: offset=Invalid metadata="", error=Ok(()); menu_item/0: offset=Invalid metadata="", error=Ok(())})
2024-06-20T19:28:59.081Z INFO [dictionary_rs::controller::kafka] Post rebalance Assign(TPL {menu_tree/0: offset=Invalid metadata="", error=Ok(()); window/0: offset=Invalid metadata="", error=Ok(()); role/0: offset=Invalid metadata="", error=Ok(()); browser/0: offset=Invalid metadata="", error=Ok(()); process/0: offset=Invalid metadata="", error=Ok(()); form/0: offset=Invalid metadata="", error=Ok(()); menu_item/0: offset=Invalid metadata="", error=Ok(())})
```

# General Info
Expand Down
2 changes: 1 addition & 1 deletion docker/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ ENV \
RUST_LOG="info" \
PORT="7878" \
KAFKA_ENABLED="Y" \
KAFKA_QUEUES="menu browser form process window menu_item menu_tree role" \
KAFKA_QUEUES="browser form process window menu_item menu_tree role" \
ALLOWED_ORIGIN="*" \
KAFKA_HOST="0.0.0.0:9092" \
KAFKA_GROUP="default" \
Expand Down
40 changes: 20 additions & 20 deletions src/bin/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -384,32 +384,32 @@ async fn get_windows<'a>(_req: &mut Request, _res: &mut Response) {
}

async fn consume_queue() {
let kafka_host = match env::var("KAFKA_HOST") {
Ok(value) => value,
Err(_) => {
log::info!("Variable `KAFKA_HOST` Not found from enviroment, loaded from local IP");
"127.0.0.1:9092".to_owned()
}.to_owned(),
};
log::info!("Kafka queue: {:?}", kafka_host.to_owned());
let kafka_host: String = match env::var("KAFKA_HOST") {
Ok(value) => value,
Err(_) => {
log::info!("Variable `KAFKA_HOST` Not found from enviroment, loaded from local IP");
"127.0.0.1:9092".to_owned()
}.to_owned(),
};
log::info!("Kafka queue to Subscribe: {:?}", kafka_host.to_owned());

let kafka_group = match env::var("KAFKA_GROUP") {
Ok(value) => value,
Err(_) => {
log::info!("Variable `KAFKA_GROUP` Not found from enviroment, loaded with `default` value");
"default".to_owned()
}.to_owned(),
};
let kafka_group: String = match env::var("KAFKA_GROUP") {
Ok(value) => value,
Err(_) => {
log::info!("Variable `KAFKA_GROUP` Not found from enviroment, loaded with `default` value");
"default".to_owned()
}.to_owned(),
};
let kafka_queues: String = match env::var("KAFKA_QUEUES") {
Ok(value) => value.clone(),
Err(_) => {
log::info!("Variable `KAFKA_QUEUES` Not found from enviroment, loaded with `default` value");
Ok(value) => value.clone(),
Err(_) => {
log::info!("Variable `KAFKA_QUEUES` Not found from enviroment, loaded with `default` value");
"menu process browser window".to_owned()
}.to_owned()
};
};

let topics: Vec<&str> = kafka_queues.split_whitespace().collect();
log::info!("Topics to Subscribed: {:?}", topics.to_owned());
log::info!("Kafka Topics to Subscribe: {:?}", topics.to_owned());

let consumer_result = create_consumer(&kafka_host, &kafka_group, &topics);
match consumer_result {
Expand Down
12 changes: 8 additions & 4 deletions src/controller/kafka.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,18 +54,22 @@ pub fn create_consumer(brokers: &str, group_id: &str, topics: &[&str]) -> Result
if consumer_value.is_err() {
return Err(Error::new(ErrorKind::InvalidData.into(), consumer_value.err().unwrap()))
}
let consumer = consumer_value.unwrap();

let consumer: StreamConsumer<CustomContext> = consumer_value.unwrap();
log::info!("Subscribed to kafka brokers successfully: {:?}", &brokers);

loop {
match consumer.subscribe(&topics) {
Ok(()) => {
log::info!("Subscribed to topics successfully: {:?}", topics.join(" "));
log::info!("Subscribed to kafka topics successfully: {:?}", topics.join(" "));
break
},
Err(e) => {
log::warn!("Can't subscribe to specified topics '{:?}': {}", topics, e);
log::warn!("Can't subscribe to kafka specified topics '{:?}': {}", topics, e);
},
}
let waiting_time = Duration::from_secs(5);

let waiting_time: Duration = Duration::from_secs(5);
thread::sleep(waiting_time);
}
Ok(consumer)
Expand Down

0 comments on commit f5ddcca

Please sign in to comment.