diff --git a/zenoh/src/api/bytes.rs b/zenoh/src/api/bytes.rs index 1496492379..ef94d83116 100644 --- a/zenoh/src/api/bytes.rs +++ b/zenoh/src/api/bytes.rs @@ -26,7 +26,10 @@ use zenoh_buffers::{ ZBuf, ZBufReader, ZBufWriter, ZSlice, }; use zenoh_codec::{RCodec, WCodec, Zenoh080}; -use zenoh_protocol::{core::Parameters, zenoh::ext::AttachmentType}; +use zenoh_protocol::{ + core::{Encoding as EncodingProto, Parameters}, + zenoh::ext::AttachmentType, +}; use zenoh_result::{ZError, ZResult}; #[cfg(feature = "shared-memory")] use zenoh_shm::{ @@ -37,6 +40,8 @@ use zenoh_shm::{ ShmBufInner, }; +use super::{encoding::Encoding, value::Value}; + /// Trait to encode a type `T` into a [`Value`]. pub trait Serialize { type Output; @@ -1220,6 +1225,174 @@ impl<'s> TryFrom<&'s mut ZBytes> for Parameters<'s> { } } +// Encoding +impl Serialize for ZSerde { + type Output = ZBytes; + + fn serialize(self, s: Encoding) -> Self::Output { + let e: EncodingProto = s.into(); + let codec = Zenoh080::new(); + let mut buffer = ZBuf::empty(); + let mut writer = buffer.writer(); + // SAFETY: we are serializing slices on a ZBuf, so serialization will never + // fail unless we run out of memory. In that case, Rust memory allocator + // will panic before the serializer has any chance to fail. + unsafe { + codec.write(&mut writer, &e).unwrap_unchecked(); + } + ZBytes::from(buffer) + } +} + +impl From for ZBytes { + fn from(t: Encoding) -> Self { + ZSerde.serialize(t) + } +} + +impl Serialize<&Encoding> for ZSerde { + type Output = ZBytes; + + fn serialize(self, s: &Encoding) -> Self::Output { + ZSerde.serialize(s.clone()) + } +} + +impl From<&Encoding> for ZBytes { + fn from(t: &Encoding) -> Self { + ZSerde.serialize(t) + } +} + +impl Serialize<&mut Encoding> for ZSerde { + type Output = ZBytes; + + fn serialize(self, s: &mut Encoding) -> Self::Output { + ZSerde.serialize(&*s) + } +} + +impl From<&mut Encoding> for ZBytes { + fn from(t: &mut Encoding) -> Self { + ZSerde.serialize(t) + } +} + +impl<'a> Deserialize<'a, Encoding> for ZSerde { + type Input = &'a ZBytes; + type Error = zenoh_buffers::reader::DidntRead; + + fn deserialize(self, v: Self::Input) -> Result { + let codec = Zenoh080::new(); + let mut reader = v.0.reader(); + let e: EncodingProto = codec.read(&mut reader)?; + Ok(e.into()) + } +} + +impl TryFrom for Encoding { + type Error = zenoh_buffers::reader::DidntRead; + + fn try_from(value: ZBytes) -> Result { + ZSerde.deserialize(&value) + } +} + +impl TryFrom<&ZBytes> for Encoding { + type Error = zenoh_buffers::reader::DidntRead; + + fn try_from(value: &ZBytes) -> Result { + ZSerde.deserialize(value) + } +} + +impl TryFrom<&mut ZBytes> for Encoding { + type Error = zenoh_buffers::reader::DidntRead; + + fn try_from(value: &mut ZBytes) -> Result { + ZSerde.deserialize(&*value) + } +} + +// Value +impl Serialize for ZSerde { + type Output = ZBytes; + + fn serialize(self, s: Value) -> Self::Output { + ZSerde.serialize((s.payload(), s.encoding())) + } +} + +impl From for ZBytes { + fn from(t: Value) -> Self { + ZSerde.serialize(t) + } +} + +impl Serialize<&Value> for ZSerde { + type Output = ZBytes; + + fn serialize(self, s: &Value) -> Self::Output { + ZSerde.serialize(s.clone()) + } +} + +impl From<&Value> for ZBytes { + fn from(t: &Value) -> Self { + ZSerde.serialize(t) + } +} + +impl Serialize<&mut Value> for ZSerde { + type Output = ZBytes; + + fn serialize(self, s: &mut Value) -> Self::Output { + ZSerde.serialize(&*s) + } +} + +impl From<&mut Value> for ZBytes { + fn from(t: &mut Value) -> Self { + ZSerde.serialize(t) + } +} + +impl<'a> Deserialize<'a, Value> for ZSerde { + type Input = &'a ZBytes; + type Error = ZError; + + fn deserialize(self, v: Self::Input) -> Result { + let (payload, encoding) = v + .deserialize::<(ZBytes, Encoding)>() + .map_err(|e| zerror!("{:?}", e))?; + Ok(Value::new(payload, encoding)) + } +} + +impl TryFrom for Value { + type Error = ZError; + + fn try_from(value: ZBytes) -> Result { + ZSerde.deserialize(&value) + } +} + +impl TryFrom<&ZBytes> for Value { + type Error = ZError; + + fn try_from(value: &ZBytes) -> Result { + ZSerde.deserialize(value) + } +} + +impl TryFrom<&mut ZBytes> for Value { + type Error = ZError; + + fn try_from(value: &mut ZBytes) -> Result { + ZSerde.deserialize(&*value) + } +} + // JSON impl Serialize for ZSerde { type Output = Result; diff --git a/zenoh/src/api/session.rs b/zenoh/src/api/session.rs index c1d7290e72..1c4ae2086f 100644 --- a/zenoh/src/api/session.rs +++ b/zenoh/src/api/session.rs @@ -1701,7 +1701,7 @@ impl Session { } } (query.callback)(Reply { - result: Err(Value::from("Timeout").into()), + result: Err(Value::new("Timeout", Encoding::ZENOH_STRING).into()), replier_id: Some(zid.into()), }); } diff --git a/zenoh/src/api/value.rs b/zenoh/src/api/value.rs index 4d482da0b5..006767e427 100644 --- a/zenoh/src/api/value.rs +++ b/zenoh/src/api/value.rs @@ -59,18 +59,6 @@ impl Value { } } -impl From for Value -where - T: Into, -{ - fn from(t: T) -> Self { - Value { - payload: t.into(), - encoding: Encoding::default(), - } - } -} - impl From> for Value where T: Into,