From 4e65b30f89f8051b19f76ff14a03cf1634279004 Mon Sep 17 00:00:00 2001 From: Paul-Christian Volkmer Date: Wed, 15 May 2024 16:27:26 +0200 Subject: [PATCH] feat: enhance JSON format in response message --- src/main.rs | 49 ++++++++++++++++++++++--------------------------- 1 file changed, 22 insertions(+), 27 deletions(-) diff --git a/src/main.rs b/src/main.rs index c6d5f7b..a2bc1f0 100644 --- a/src/main.rs +++ b/src/main.rs @@ -1,7 +1,7 @@ /* * This file is part of ETL-Processor * - * Copyright (c) 2023 Comprehensive Cancer Center Mainfranken + * Copyright (c) 2024 Comprehensive Cancer Center Mainfranken * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License as published @@ -24,16 +24,16 @@ use std::str::FromStr; use std::time::Duration; use log::{debug, error, info, warn}; -use rdkafka::{ClientConfig, ClientContext, Message, TopicPartitionList}; use rdkafka::consumer::{Consumer, ConsumerContext, Rebalance, StreamConsumer}; use rdkafka::error::KafkaResult; use rdkafka::producer::{FutureProducer, FutureRecord}; -use serde_json::json; +use rdkafka::{ClientConfig, ClientContext, Message, TopicPartitionList}; +use serde_json::{json, Value}; use simple_logger::SimpleLogger; -use crate::AppError::{ConnectionError, HttpError, MissingConfig}; use crate::bwhc_client::{BwhcClient, HttpResponse}; use crate::resources::request::Request; +use crate::AppError::{ConnectionError, HttpError, MissingConfig}; mod bwhc_client; mod resources; @@ -90,16 +90,18 @@ enum KafkaResponsePayload { impl KafkaResponsePayload { fn to_payload(&self, request_id: &str) -> String { match self { - KafkaResponsePayload::SuccessfulConnection(s) => format!( - "{{\"request_id\":\"{}\", \"status_code\":{}, \"status_body\":{}}}", - request_id, - s.status_code, - if s.status_body.trim().is_empty() { - String::from("{}") - } else { - s.status_body.clone() + KafkaResponsePayload::SuccessfulConnection(s) => json!({ + "request_id": request_id, + "status_code": s.status_code, + "status_body" : { + "issues": if s.status_body.trim().is_empty() { + json!({}) + } else { + serde_json::from_str::(&s.status_body).unwrap_or(json!({})) + } } - ), + }) + .to_string(), KafkaResponsePayload::NoConnection => json!({ "request_id": request_id, "status_code": 900, @@ -110,7 +112,7 @@ impl KafkaResponsePayload { }] } }) - .to_string(), + .to_string(), } } } @@ -135,12 +137,7 @@ async fn send_kafka_response( }; } -async fn handle_message( - producer: &FutureProducer, - topic: &str, - key: &str, - payload: &str, -) { +async fn handle_message(producer: &FutureProducer, topic: &str, key: &str, payload: &str) { if Request::can_parse(payload) { if let Ok(request) = Request::from_str(payload) { if request.has_consent() { @@ -153,7 +150,7 @@ async fn handle_message( key, KafkaResponsePayload::SuccessfulConnection(response), ) - .await + .await } Err(_) => { send_kafka_response( @@ -163,7 +160,7 @@ async fn handle_message( key, KafkaResponsePayload::NoConnection, ) - .await + .await } } } else { @@ -176,7 +173,7 @@ async fn handle_message( key, KafkaResponsePayload::SuccessfulConnection(response), ) - .await + .await } Err(_) => { send_kafka_response( @@ -186,7 +183,7 @@ async fn handle_message( key, KafkaResponsePayload::NoConnection, ) - .await + .await } } } @@ -246,9 +243,7 @@ async fn main() -> Result<(), Box> { match consumer.recv().await { Ok(msg) => match msg.payload_view::() { Some(Ok(s)) => match msg.key_view::() { - Some(Ok(key)) => { - handle_message(producer, dst_topic.as_str(), key, s).await - } + Some(Ok(key)) => handle_message(producer, dst_topic.as_str(), key, s).await, _ => error!("Unable to use key!"), }, _ => error!("Unable to use payload!"),