Skip to content

Commit

Permalink
fix(828): ensuring valid JSON response from REST API
Browse files Browse the repository at this point in the history
Signed-off-by: gabrik <[email protected]>
  • Loading branch information
gabrik committed Mar 14, 2024
1 parent cc68ffb commit 51cf8a8
Showing 1 changed file with 55 additions and 29 deletions.
84 changes: 55 additions & 29 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,10 @@
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use async_std::prelude::FutureExt;
use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine};
use base64::Engine;
use futures::StreamExt;
use http_types::Method;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::convert::TryFrom;
use std::str::FromStr;
Expand All @@ -46,47 +47,68 @@ lazy_static::lazy_static! {
}
const RAW_KEY: &str = "_raw";

fn payload_to_json(payload: Payload) -> String {
payload
.deserialize::<String>()
.unwrap_or_else(|_| format!(r#""{}""#, b64_std_engine.encode(payload.contiguous())))
#[derive(Serialize, Deserialize)]
struct JSONSample {
key: String,
value: serde_json::Value,
encoding: String,
time: Option<String>,
}

fn sample_to_json(sample: Sample) -> String {
format!(
r#"{{ "key": "{}", "value": {}, "encoding": "{}", "time": "{}" }}"#,
sample.key_expr.as_str(),
payload_to_json(sample.payload),
sample.encoding,
if let Some(ts) = sample.timestamp {
ts.to_string()
} else {
"None".to_string()
pub fn base64_encode(data: &[u8]) -> String {
use base64::engine::general_purpose;
general_purpose::STANDARD.encode(data)
}

fn payload_to_json(payload: Payload, encoding: &Encoding) -> serde_json::Value {
match payload.len() {
// If the value is empty return a JSON null
0 => serde_json::Value::Null,
// if it is not check the encoding
_ => {
match encoding {
// If it is a JSON try to deserialize as json, if it fails fallback to base64
&Encoding::APPLICATION_JSON | &Encoding::TEXT_JSON | &Encoding::TEXT_JSON5 => {
serde_json::from_slice::<serde_json::Value>(&payload.contiguous()).unwrap_or(
serde_json::Value::String(base64_encode(&payload.contiguous())),
)
}
// otherwise convert to base64 and JSON String
_ => serde_json::Value::String(base64_encode(&payload.contiguous())),
}
}
)
}
}

fn result_to_json(sample: Result<Sample, Value>) -> String {
fn sample_to_json(sample: Sample) -> JSONSample {
JSONSample {
key: sample.key_expr.as_str().to_string(),
value: payload_to_json(sample.payload, &sample.encoding),
encoding: sample.encoding.to_string(),
time: sample.timestamp.map(|ts| ts.to_string()),
}
}

fn result_to_json(sample: Result<Sample, Value>) -> JSONSample {
match sample {
Ok(sample) => sample_to_json(sample),
Err(err) => {
format!(
r#"{{ "key": "ERROR", "value": {}, "encoding": "{}"}}"#,
payload_to_json(err.payload),
err.encoding,
)
}
Err(err) => JSONSample {
key: "ERROR".into(),
value: payload_to_json(err.payload, &err.encoding),
encoding: err.encoding.to_string(),
time: None,
},
}
}

async fn to_json(results: flume::Receiver<Reply>) -> String {
let values = results
.stream()
.filter_map(move |reply| async move { Some(result_to_json(reply.sample)) })
.collect::<Vec<String>>()
.await
.join(",\n");
format!("[\n{values}\n]\n")
.collect::<Vec<JSONSample>>()
.await;

serde_json::to_string(&values).unwrap_or("[]".into())
}

async fn to_json_response(results: flume::Receiver<Reply>) -> Response {
Expand Down Expand Up @@ -321,8 +343,12 @@ async fn query(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
.unwrap();
loop {
let sample = sub.recv_async().await.unwrap();
let kind = sample.kind.clone();
let json_sample =
serde_json::to_string(&sample_to_json(sample)).unwrap_or("{}".into());

match sender
.send(&sample.kind.to_string(), sample_to_json(sample), None)
.send(&kind.to_string(), json_sample, None)
.timeout(std::time::Duration::new(10, 0))
.await
{
Expand Down

0 comments on commit 51cf8a8

Please sign in to comment.