Skip to content

Commit

Permalink
fix(828): ensuring valid JSON response from REST API (#831)
Browse files Browse the repository at this point in the history
* fix(828): ensuring valid JSON response from REST API

Signed-off-by: gabrik <[email protected]>

* fix(828): improved JSON format conversion

Signed-off-by: gabrik <[email protected]>

* chore: addressing comments

Signed-off-by: gabrik <[email protected]>

* fix(828): added 'into_string' for StringOrBase64

Signed-off-by: gabrik <[email protected]>

* chore: address comments

Signed-off-by: gabrik <[email protected]>

---------

Signed-off-by: gabrik <[email protected]>
  • Loading branch information
gabrik authored Mar 14, 2024
1 parent d808ba2 commit 4d8ec6c
Show file tree
Hide file tree
Showing 2 changed files with 64 additions and 29 deletions.
85 changes: 56 additions & 29 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
//!
//! [Click here for Zenoh's documentation](../zenoh/index.html)
use async_std::prelude::FutureExt;
use base64::{engine::general_purpose::STANDARD as b64_std_engine, Engine};
use base64::Engine;
use futures::StreamExt;
use http_types::Method;
use serde::{Deserialize, Serialize};
use std::borrow::Cow;
use std::convert::TryFrom;
use std::str::FromStr;
use std::sync::Arc;
use tide::http::Mime;
use tide::sse::Sender;
use tide::{Request, Response, Server, StatusCode};
use zenoh::payload::StringOrBase64;
use zenoh::plugins::{RunningPluginTrait, ZenohPlugin};
use zenoh::prelude::r#async::*;
use zenoh::query::{QueryConsolidation, Reply};
Expand All @@ -46,47 +48,68 @@ lazy_static::lazy_static! {
}
const RAW_KEY: &str = "_raw";

fn payload_to_json(payload: Payload) -> String {
payload
.deserialize::<String>()
.unwrap_or_else(|_| format!(r#""{}""#, b64_std_engine.encode(payload.contiguous())))
#[derive(Serialize, Deserialize)]
struct JSONSample {
key: String,
value: serde_json::Value,
encoding: String,
time: Option<String>,
}

fn sample_to_json(sample: Sample) -> String {
format!(
r#"{{ "key": "{}", "value": {}, "encoding": "{}", "time": "{}" }}"#,
sample.key_expr.as_str(),
payload_to_json(sample.payload),
sample.encoding,
if let Some(ts) = sample.timestamp {
ts.to_string()
} else {
"None".to_string()
pub fn base64_encode(data: &[u8]) -> String {
use base64::engine::general_purpose;
general_purpose::STANDARD.encode(data)
}

fn payload_to_json(payload: Payload, encoding: &Encoding) -> serde_json::Value {
match payload.is_empty() {
// If the value is empty return a JSON null
true => serde_json::Value::Null,
// if it is not check the encoding
false => {
match encoding {
// If it is a JSON try to deserialize as json, if it fails fallback to base64
&Encoding::APPLICATION_JSON | &Encoding::TEXT_JSON | &Encoding::TEXT_JSON5 => {
serde_json::from_slice::<serde_json::Value>(&payload.contiguous()).unwrap_or(
serde_json::Value::String(StringOrBase64::from(payload).into_string()),
)
}
// otherwise convert to JSON string
_ => serde_json::Value::String(StringOrBase64::from(payload).into_string()),
}
}
)
}
}

fn result_to_json(sample: Result<Sample, Value>) -> String {
fn sample_to_json(sample: Sample) -> JSONSample {
JSONSample {
key: sample.key_expr.as_str().to_string(),
value: payload_to_json(sample.payload, &sample.encoding),
encoding: sample.encoding.to_string(),
time: sample.timestamp.map(|ts| ts.to_string()),
}
}

fn result_to_json(sample: Result<Sample, Value>) -> JSONSample {
match sample {
Ok(sample) => sample_to_json(sample),
Err(err) => {
format!(
r#"{{ "key": "ERROR", "value": {}, "encoding": "{}"}}"#,
payload_to_json(err.payload),
err.encoding,
)
}
Err(err) => JSONSample {
key: "ERROR".into(),
value: payload_to_json(err.payload, &err.encoding),
encoding: err.encoding.to_string(),
time: None,
},
}
}

async fn to_json(results: flume::Receiver<Reply>) -> String {
let values = results
.stream()
.filter_map(move |reply| async move { Some(result_to_json(reply.sample)) })
.collect::<Vec<String>>()
.await
.join(",\n");
format!("[\n{values}\n]\n")
.collect::<Vec<JSONSample>>()
.await;

serde_json::to_string(&values).unwrap_or("[]".into())
}

async fn to_json_response(results: flume::Receiver<Reply>) -> Response {
Expand Down Expand Up @@ -321,8 +344,12 @@ async fn query(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
.unwrap();
loop {
let sample = sub.recv_async().await.unwrap();
let kind = sample.kind;
let json_sample =
serde_json::to_string(&sample_to_json(sample)).unwrap_or("{}".into());

match sender
.send(&sample.kind.to_string(), sample_to_json(sample), None)
.send(&kind.to_string(), json_sample, None)
.timeout(std::time::Duration::new(10, 0))
.await
{
Expand Down
8 changes: 8 additions & 0 deletions zenoh/src/payload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -563,6 +563,14 @@ pub enum StringOrBase64 {
Base64(String),
}

impl StringOrBase64 {
pub fn into_string(self) -> String {
match self {
StringOrBase64::String(s) | StringOrBase64::Base64(s) => s,
}
}
}

impl Deref for StringOrBase64 {
type Target = String;

Expand Down

0 comments on commit 4d8ec6c

Please sign in to comment.