Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Remove unsupported Put/Del in Request/Response #839

Merged
merged 1 commit into from
Mar 18, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 0 additions & 6 deletions commons/zenoh-codec/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,8 +80,6 @@ where
fn write(self, writer: &mut W, x: &RequestBody) -> Self::Output {
match x {
RequestBody::Query(b) => self.write(&mut *writer, b),
RequestBody::Put(b) => self.write(&mut *writer, b),
RequestBody::Del(b) => self.write(&mut *writer, b),
}
}
}
Expand All @@ -98,8 +96,6 @@ where
let codec = Zenoh080Header::new(header);
let body = match imsg::mid(codec.header) {
id::QUERY => RequestBody::Query(codec.read(&mut *reader)?),
id::PUT => RequestBody::Put(codec.read(&mut *reader)?),
id::DEL => RequestBody::Del(codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};

Expand All @@ -118,7 +114,6 @@ where
match x {
ResponseBody::Reply(b) => self.write(&mut *writer, b),
ResponseBody::Err(b) => self.write(&mut *writer, b),
ResponseBody::Put(b) => self.write(&mut *writer, b),
}
}
}
Expand All @@ -136,7 +131,6 @@ where
let body = match imsg::mid(codec.header) {
id::REPLY => ResponseBody::Reply(codec.read(&mut *reader)?),
id::ERR => ResponseBody::Err(codec.read(&mut *reader)?),
id::PUT => ResponseBody::Put(codec.read(&mut *reader)?),
_ => return Err(DidntRead),
};

Expand Down
22 changes: 2 additions & 20 deletions commons/zenoh-protocol/src/zenoh/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,8 +77,6 @@ impl From<Del> for PushBody {
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum RequestBody {
Query(Query),
Put(Put),
Del(Del),
}

impl RequestBody {
Expand All @@ -88,10 +86,8 @@ impl RequestBody {

let mut rng = rand::thread_rng();

match rng.gen_range(0..3) {
match rng.gen_range(0..1) {
0 => RequestBody::Query(Query::rand()),
1 => RequestBody::Put(Put::rand()),
2 => RequestBody::Del(Del::rand()),
_ => unreachable!(),
}
}
Expand All @@ -103,24 +99,11 @@ impl From<Query> for RequestBody {
}
}

impl From<Put> for RequestBody {
fn from(p: Put) -> RequestBody {
RequestBody::Put(p)
}
}

impl From<Del> for RequestBody {
fn from(d: Del) -> RequestBody {
RequestBody::Del(d)
}
}

// Response
#[derive(Debug, Clone, PartialEq, Eq)]
pub enum ResponseBody {
Reply(Reply),
Err(Err),
Put(Put),
}

impl ResponseBody {
Expand All @@ -129,10 +112,9 @@ impl ResponseBody {
use rand::Rng;
let mut rng = rand::thread_rng();

match rng.gen_range(0..3) {
match rng.gen_range(0..2) {
0 => ResponseBody::Reply(Reply::rand()),
1 => ResponseBody::Err(Err::rand()),
2 => ResponseBody::Put(Put::rand()),
_ => unreachable!(),
}
}
Expand Down
8 changes: 1 addition & 7 deletions io/zenoh-transport/src/shm.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,12 +140,9 @@ pub fn map_zmsg_to_shminfo(msg: &mut NetworkMessage) -> ZResult<bool> {
},
NetworkBody::Request(Request { payload, .. }) => match payload {
RequestBody::Query(b) => b.map_to_shminfo(),
RequestBody::Put(b) => b.map_to_shminfo(),
RequestBody::Del(_) => Ok(false),
},
NetworkBody::Response(Response { payload, .. }) => match payload {
ResponseBody::Reply(b) => b.map_to_shminfo(),
ResponseBody::Put(b) => b.map_to_shminfo(),
ResponseBody::Err(b) => b.map_to_shminfo(),
},
NetworkBody::ResponseFinal(_) | NetworkBody::Declare(_) | NetworkBody::OAM(_) => Ok(false),
Expand Down Expand Up @@ -194,13 +191,10 @@ pub fn map_zmsg_to_shmbuf(
},
NetworkBody::Request(Request { payload, .. }) => match payload {
RequestBody::Query(b) => b.map_to_shmbuf(shmr),
RequestBody::Put(b) => b.map_to_shmbuf(shmr),
RequestBody::Del(_) => Ok(false),
},
NetworkBody::Response(Response { payload, .. }) => match payload {
ResponseBody::Put(b) => b.map_to_shmbuf(shmr),
ResponseBody::Err(b) => b.map_to_shmbuf(shmr),
ResponseBody::Reply(b) => b.map_to_shmbuf(shmr),
ResponseBody::Err(b) => b.map_to_shmbuf(shmr),
},
NetworkBody::ResponseFinal(_) | NetworkBody::Declare(_) | NetworkBody::OAM(_) => Ok(false),
}
Expand Down
3 changes: 0 additions & 3 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -244,9 +244,6 @@ impl Primitives for Face {
msg.ext_nodeid.node_id,
);
}
_ => {
log::error!("{} Unsupported request!", self);
}
}
}

Expand Down
15 changes: 0 additions & 15 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,13 +460,6 @@ macro_rules! inc_req_stats {
if let Some(stats) = $face.stats.as_ref() {
use zenoh_buffers::buffer::Buffer;
match &$body {
RequestBody::Put(p) => {
stats.[<$txrx _z_put_msgs>].[<inc_ $space>](1);
stats.[<$txrx _z_put_pl_bytes>].[<inc_ $space>](p.payload.len());
}
RequestBody::Del(_) => {
stats.[<$txrx _z_del_msgs>].[<inc_ $space>](1);
}
RequestBody::Query(q) => {
stats.[<$txrx _z_query_msgs>].[<inc_ $space>](1);
stats.[<$txrx _z_query_pl_bytes>].[<inc_ $space>](
Expand All @@ -491,14 +484,6 @@ macro_rules! inc_res_stats {
if let Some(stats) = $face.stats.as_ref() {
use zenoh_buffers::buffer::Buffer;
match &$body {
ResponseBody::Put(p) => {
stats.[<$txrx _z_put_msgs>].[<inc_ $space>](1);
let mut n = p.payload.len();
if let Some(a) = p.ext_attachment.as_ref() {
n += a.buffer.len();
}
stats.[<$txrx _z_put_pl_bytes>].[<inc_ $space>](n);
}
ResponseBody::Reply(r) => {
stats.[<$txrx _z_reply_msgs>].[<inc_ $space>](1);
let mut n = 0;
Expand Down
96 changes: 49 additions & 47 deletions zenoh/src/net/runtime/adminspace.rs
Original file line number Diff line number Diff line change
Expand Up @@ -388,58 +388,60 @@ impl Primitives for AdminSpace {

fn send_request(&self, msg: Request) {
trace!("recv Request {:?}", msg);
if let RequestBody::Query(query) = msg.payload {
let primitives = zlock!(self.primitives).as_ref().unwrap().clone();
{
let conf = self.context.runtime.state.config.lock();
if !conf.adminspace.permissions().read {
log::error!(
match msg.payload {
RequestBody::Query(query) => {
let primitives = zlock!(self.primitives).as_ref().unwrap().clone();
{
let conf = self.context.runtime.state.config.lock();
if !conf.adminspace.permissions().read {
log::error!(
"Received GET on '{}' but adminspace.permissions.read=false in configuration",
msg.wire_expr
);
primitives.send_response_final(ResponseFinal {
rid: msg.id,
ext_qos: ext::QoSType::RESPONSE_FINAL,
ext_tstamp: None,
});
return;
}
}

let key_expr = match self.key_expr_to_string(&msg.wire_expr) {
Ok(key_expr) => key_expr.into_owned(),
Err(e) => {
log::error!("Unknown KeyExpr: {}", e);
primitives.send_response_final(ResponseFinal {
rid: msg.id,
ext_qos: ext::QoSType::RESPONSE_FINAL,
ext_tstamp: None,
});
return;
primitives.send_response_final(ResponseFinal {
rid: msg.id,
ext_qos: ext::QoSType::RESPONSE_FINAL,
ext_tstamp: None,
});
return;
}
}
};

let zid = self.zid;
let parameters = query.parameters.to_owned();
let query = Query {
inner: Arc::new(QueryInner {
key_expr: key_expr.clone(),
parameters,
value: query
.ext_body
.map(|b| Value::from(b.payload).with_encoding(b.encoding)),
qid: msg.id,
zid,
primitives,
#[cfg(feature = "unstable")]
attachment: query.ext_attachment.map(Into::into),
}),
eid: self.queryable_id,
};

for (key, handler) in &self.handlers {
if key_expr.intersects(key) {
handler(&self.context, query.clone());
let key_expr = match self.key_expr_to_string(&msg.wire_expr) {
Ok(key_expr) => key_expr.into_owned(),
Err(e) => {
log::error!("Unknown KeyExpr: {}", e);
primitives.send_response_final(ResponseFinal {
rid: msg.id,
ext_qos: ext::QoSType::RESPONSE_FINAL,
ext_tstamp: None,
});
return;
}
};

let zid = self.zid;
let parameters = query.parameters.to_owned();
let query = Query {
inner: Arc::new(QueryInner {
key_expr: key_expr.clone(),
parameters,
value: query
.ext_body
.map(|b| Value::from(b.payload).with_encoding(b.encoding)),
qid: msg.id,
zid,
primitives,
#[cfg(feature = "unstable")]
attachment: query.ext_attachment.map(Into::into),
}),
eid: self.queryable_id,
};

for (key, handler) in &self.handlers {
if key_expr.intersects(key) {
handler(&self.context, query.clone());
}
}
}
}
Expand Down
7 changes: 0 additions & 7 deletions zenoh/src/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2087,19 +2087,12 @@ impl Primitives for Session {
#[cfg(feature = "unstable")]
m.ext_attachment.map(Into::into),
),
RequestBody::Put(_) => (),
RequestBody::Del(_) => (),
}
}

fn send_response(&self, msg: Response) {
trace!("recv Response {:?}", msg);
match msg.payload {
ResponseBody::Put(_) => {
log::warn!(
"Received a ResponseBody::Put, but this isn't supported yet. Dropping message."
)
}
ResponseBody::Err(e) => {
let mut state = zwrite!(self.state);
match state.queries.get_mut(&msg.rid) {
Expand Down