Skip to content

Commit

Permalink
feat: enhance JSON format in response message
Browse files Browse the repository at this point in the history
  • Loading branch information
pcvolkmer committed May 15, 2024
1 parent a41f0b2 commit 4e65b30
Showing 1 changed file with 22 additions and 27 deletions.
49 changes: 22 additions & 27 deletions src/main.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
/*
* This file is part of ETL-Processor
*
* Copyright (c) 2023 Comprehensive Cancer Center Mainfranken
* Copyright (c) 2024 Comprehensive Cancer Center Mainfranken
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License as published
Expand All @@ -24,16 +24,16 @@ use std::str::FromStr;
use std::time::Duration;

use log::{debug, error, info, warn};
use rdkafka::{ClientConfig, ClientContext, Message, TopicPartitionList};
use rdkafka::consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer};
use rdkafka::error::KafkaResult;
use rdkafka::producer::{FutureProducer, FutureRecord};
use serde_json::json;
use rdkafka::{ClientConfig, ClientContext, Message, TopicPartitionList};
use serde_json::{json, Value};
use simple_logger::SimpleLogger;

use crate::AppError::{ConnectionError, HttpError, MissingConfig};
use crate::bwhc_client::{BwhcClient, HttpResponse};
use crate::resources::request::Request;
use crate::AppError::{ConnectionError, HttpError, MissingConfig};

mod bwhc_client;
mod resources;
Expand Down Expand Up @@ -90,16 +90,18 @@ enum KafkaResponsePayload {
impl KafkaResponsePayload {
fn to_payload(&self, request_id: &str) -> String {
match self {
KafkaResponsePayload::SuccessfulConnection(s) => format!(
"{{\"request_id\":\"{}\", \"status_code\":{}, \"status_body\":{}}}",
request_id,
s.status_code,
if s.status_body.trim().is_empty() {
String::from("{}")
} else {
s.status_body.clone()
KafkaResponsePayload::SuccessfulConnection(s) => json!({
"request_id": request_id,
"status_code": s.status_code,
"status_body" : {
"issues": if s.status_body.trim().is_empty() {
json!({})
} else {
serde_json::from_str::<Value>(&s.status_body).unwrap_or(json!({}))
}
}
),
})
.to_string(),
KafkaResponsePayload::NoConnection => json!({
"request_id": request_id,
"status_code": 900,
Expand All @@ -110,7 +112,7 @@ impl KafkaResponsePayload {
}]
}
})
.to_string(),
.to_string(),
}
}
}
Expand All @@ -135,12 +137,7 @@ async fn send_kafka_response(
};
}

async fn handle_message(
producer: &FutureProducer,
topic: &str,
key: &str,
payload: &str,
) {
async fn handle_message(producer: &FutureProducer, topic: &str, key: &str, payload: &str) {
if Request::can_parse(payload) {
if let Ok(request) = Request::from_str(payload) {
if request.has_consent() {
Expand All @@ -153,7 +150,7 @@ async fn handle_message(
key,
KafkaResponsePayload::SuccessfulConnection(response),
)
.await
.await
}
Err(_) => {
send_kafka_response(
Expand All @@ -163,7 +160,7 @@ async fn handle_message(
key,
KafkaResponsePayload::NoConnection,
)
.await
.await
}
}
} else {
Expand All @@ -176,7 +173,7 @@ async fn handle_message(
key,
KafkaResponsePayload::SuccessfulConnection(response),
)
.await
.await
}
Err(_) => {
send_kafka_response(
Expand All @@ -186,7 +183,7 @@ async fn handle_message(
key,
KafkaResponsePayload::NoConnection,
)
.await
.await
}
}
}
Expand Down Expand Up @@ -246,9 +243,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
match consumer.recv().await {
Ok(msg) => match msg.payload_view::<str>() {
Some(Ok(s)) => match msg.key_view::<str>() {
Some(Ok(key)) => {
handle_message(producer, dst_topic.as_str(), key, s).await
}
Some(Ok(key)) => handle_message(producer, dst_topic.as_str(), key, s).await,
_ => error!("Unable to use key!"),
},
_ => error!("Unable to use payload!"),
Expand Down

0 comments on commit 4e65b30

Please sign in to comment.