Skip to content

Commit

Permalink
do not construct Push when unnecessary
Browse files Browse the repository at this point in the history
  • Loading branch information
yellowhatter committed Nov 22, 2024
1 parent f965e1e commit e5eab3b
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 23 deletions.
36 changes: 34 additions & 2 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1972,7 +1972,38 @@ impl SessionInner {
let primitives = zread!(self.state).primitives()?;
primitives.opt_send_push(
&wire_expr,
|| Push {
|| { (
push::ext::QoSType::new(
priority.into(),
congestion_control,
is_express,
),
match kind {
SampleKind::Put => PushBody::Put(Put {
timestamp,
encoding: encoding.clone().into(),
#[cfg(feature = "unstable")]
ext_sinfo: source_info.into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
#[cfg(feature = "shared-memory")]
ext_shm: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
payload: payload.clone().into(),
}),
SampleKind::Delete => PushBody::Del(Del {
timestamp,
#[cfg(feature = "unstable")]
ext_sinfo: source_info.into(),
#[cfg(not(feature = "unstable"))]
ext_sinfo: None,
ext_attachment: attachment.clone().map(|a| a.into()),
ext_unknown: vec![],
}),
},
)
/*Push {
wire_expr: wire_expr.to_owned(),
ext_qos: push::ext::QoSType::new(
priority.into(),
Expand Down Expand Up @@ -2005,7 +2036,8 @@ impl SessionInner {
ext_unknown: vec![],
}),
},
},
}*/
},
#[cfg(feature = "unstable")]
reliability,
#[cfg(not(feature = "unstable"))]
Expand Down
4 changes: 2 additions & 2 deletions zenoh/src/net/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,13 @@ pub use demux::*;
pub use mux::*;
use zenoh_protocol::{
core::{Reliability, WireExpr},
network::{interest::Interest, Declare, Push, Request, Response, ResponseFinal},
network::{interest::Interest, push, Declare, Push, Request, Response, ResponseFinal}, zenoh::PushBody,
};

use super::routing::RoutingContext;

pub trait OptPrimitives: Send + Sync {
fn opt_send_push<F: FnOnce()->Push>(&self,wire_expr: &WireExpr<'_>, fn_msg: F, reliability: Reliability);
fn opt_send_push<F: FnOnce()->(push::ext::QoSType, PushBody)>(&self,wire_expr: &WireExpr<'_>, fn_msg: F, reliability: Reliability);
}

pub trait Primitives: Send + Sync {
Expand Down
7 changes: 3 additions & 4 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,9 @@ use tokio_util::sync::CancellationToken;
use zenoh_protocol::{
core::{ExprId, Reliability, WhatAmI, ZenohIdProto},
network::{
interest::{InterestId, InterestMode, InterestOptions},
Mapping, Push, Request, RequestId, Response, ResponseFinal,
interest::{InterestId, InterestMode, InterestOptions}, push, Mapping, Push, Request, RequestId, Response, ResponseFinal
},
zenoh::RequestBody,
zenoh::{PushBody, RequestBody},
};
use zenoh_sync::get_mut_unchecked;
use zenoh_task::TaskController;
Expand Down Expand Up @@ -217,7 +216,7 @@ impl Face {

impl OptPrimitives for Face {
#[inline]
fn opt_send_push<F: FnOnce()->Push>(&self,wire_expr: &zenoh_protocol::core::WireExpr<'_>, fn_msg: F, reliability: Reliability) {
fn opt_send_push<F: FnOnce()->(push::ext::QoSType, PushBody)>(&self,wire_expr: &zenoh_protocol::core::WireExpr<'_>, fn_msg: F, reliability: Reliability) {
opt_route_data(&self.tables, &self.state, wire_expr, fn_msg, reliability);
}
}
Expand Down
29 changes: 14 additions & 15 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,7 @@ use zenoh_core::zread;
use zenoh_protocol::{
core::{key_expr::keyexpr, Reliability, WhatAmI, WireExpr},
network::{
declare::{ext, SubscriberId},
Push,
declare::{ext, SubscriberId}, push, Push
},
zenoh::PushBody,
};
Expand Down Expand Up @@ -524,7 +523,7 @@ pub fn route_data(
}


pub fn opt_route_data<F: FnOnce()->Push>(
pub fn opt_route_data<F: FnOnce()->(push::ext::QoSType, PushBody)>(
tables_ref: &Arc<TablesLock>,
face: &FaceState,
wire_expr: &WireExpr<'_>,
Expand All @@ -537,14 +536,13 @@ pub fn opt_route_data<F: FnOnce()->Push>(
.cloned()
{
Some(prefix) => {
let mut msg = fn_msg();
tracing::trace!(
"{} Route data for res {}{}",
face,
prefix.expr(),
msg.wire_expr.suffix.as_ref()
wire_expr.suffix.as_ref()
);
let mut expr = RoutingExpr::new(&prefix, msg.wire_expr.suffix.as_ref());
let mut expr = RoutingExpr::new(&prefix, wire_expr.suffix.as_ref());

#[cfg(feature = "stats")]
let admin = expr.full_expr().starts_with("@/");
Expand All @@ -558,10 +556,11 @@ pub fn opt_route_data<F: FnOnce()->Push>(
if tables.hat_code.ingress_filter(&tables, face, &mut expr) {
let res = Resource::get_resource(&prefix, expr.suffix);

let route = get_data_route(&tables, face, &res, &mut expr, msg.ext_nodeid.node_id);
let route = get_data_route(&tables, face, &res, &mut expr, push::ext::NodeIdType::DEFAULT.node_id);

if !route.is_empty() {
treat_timestamp!(&tables.hlc, msg.payload, tables.drop_future_timestamp);
let (ext_qos, mut payload) = fn_msg();
treat_timestamp!(&tables.hlc, payload, tables.drop_future_timestamp);

if route.len() == 1 {
let (outface, key_expr, context) = route.values().next().unwrap();
Expand All @@ -580,10 +579,10 @@ pub fn opt_route_data<F: FnOnce()->Push>(
outface.primitives.send_push(
Push {
wire_expr: key_expr.into(),
ext_qos: msg.ext_qos,
ext_tstamp: msg.ext_tstamp,
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType { node_id: *context },
payload: msg.payload,
payload,
},
reliability,
)
Expand Down Expand Up @@ -611,10 +610,10 @@ pub fn opt_route_data<F: FnOnce()->Push>(
outface.primitives.send_push(
Push {
wire_expr: key_expr,
ext_qos: msg.ext_qos,
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType { node_id: context },
payload: msg.payload.clone(),
payload: payload.clone(),
},
reliability,
)
Expand All @@ -638,10 +637,10 @@ pub fn opt_route_data<F: FnOnce()->Push>(
outface.primitives.send_push(
Push {
wire_expr: key_expr.into(),
ext_qos: msg.ext_qos,
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType { node_id: *context },
payload: msg.payload.clone(),
payload: payload.clone(),
},
reliability,
)
Expand Down

0 comments on commit e5eab3b

Please sign in to comment.