Skip to content

Commit

Permalink
Perf improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Jan 5, 2024
1 parent 7cd513a commit b530da2
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 46 deletions.
27 changes: 17 additions & 10 deletions zenoh/src/net/primitives/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,11 @@
// ZettaScale Zenoh Team, <[email protected]>
//
use super::Primitives;
use crate::net::routing::{dispatcher::face::Face, interceptor::IngressIntercept, RoutingContext};
use crate::net::routing::{
dispatcher::face::Face,
interceptor::{InterceptTrait, InterceptsChain},
RoutingContext,
};
use std::any::Any;
use zenoh_link::Link;
use zenoh_protocol::network::{NetworkBody, NetworkMessage};
Expand All @@ -22,14 +26,14 @@ use zenoh_transport::{TransportPeerEventHandler, TransportUnicast};
pub struct DeMux {
face: Face,
pub(crate) transport: Option<TransportUnicast>,
pub(crate) intercept: IngressIntercept,
pub(crate) intercept: InterceptsChain,
}

impl DeMux {
pub(crate) fn new(
face: Face,
transport: Option<TransportUnicast>,
intercept: IngressIntercept,
intercept: InterceptsChain,
) -> Self {
Self {
face,
Expand All @@ -41,14 +45,17 @@ impl DeMux {

impl TransportPeerEventHandler for DeMux {
#[inline]
fn handle_message(&self, msg: NetworkMessage) -> ZResult<()> {
let ctx = RoutingContext::with_face(msg, self.face.clone());
let ctx = match self.intercept.intercept(ctx) {
Some(ctx) => ctx,
None => return Ok(()),
};
fn handle_message(&self, mut msg: NetworkMessage) -> ZResult<()> {
if !self.intercept.intercepts.is_empty() {
let ctx = RoutingContext::new_in(msg, self.face.clone());
let ctx = match self.intercept.intercept(ctx) {
Some(ctx) => ctx,
None => return Ok(()),
};
msg = ctx.msg;
}

match ctx.msg.body {
match msg.body {
NetworkBody::Push(m) => self.face.send_push(m),
NetworkBody::Declare(m) => self.face.send_declare(m),
NetworkBody::Request(m) => self.face.send_request(m),
Expand Down
40 changes: 25 additions & 15 deletions zenoh/src/net/primitives/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
use super::{EPrimitives, Primitives};
use crate::net::routing::{
dispatcher::{face::Face, tables::TablesLock},
interceptor::EgressIntercept,
interceptor::{InterceptTrait, InterceptsChain},
RoutingContext,
};
use zenoh_protocol::network::{
Expand All @@ -28,15 +28,15 @@ pub struct Mux {
pub handler: TransportUnicast,
pub(crate) fid: usize,
pub(crate) tables: Arc<TablesLock>,
pub(crate) intercept: EgressIntercept,
pub(crate) intercept: InterceptsChain,
}

impl Mux {
pub(crate) fn new(
handler: TransportUnicast,
fid: usize,
tables: Arc<TablesLock>,
intercept: EgressIntercept,
intercept: InterceptsChain,
) -> Mux {
Mux {
handler,
Expand All @@ -58,7 +58,7 @@ impl Primitives for Mux {
let face = tables.faces.get(&self.fid).cloned();
drop(tables);
if let Some(face) = face {
let ctx = RoutingContext::with_face(
let ctx = RoutingContext::new_in(
msg,
Face {
tables: self.tables.clone(),
Expand All @@ -81,7 +81,7 @@ impl Primitives for Mux {
let face = tables.faces.get(&self.fid).cloned();
drop(tables);
if let Some(face) = face {
let ctx = RoutingContext::with_face(
let ctx = RoutingContext::new_in(
msg,
Face {
tables: self.tables.clone(),
Expand All @@ -104,7 +104,7 @@ impl Primitives for Mux {
let face = tables.faces.get(&self.fid).cloned();
drop(tables);
if let Some(face) = face {
let ctx = RoutingContext::with_face(
let ctx = RoutingContext::new_in(
msg,
Face {
tables: self.tables.clone(),
Expand All @@ -127,7 +127,7 @@ impl Primitives for Mux {
let face = tables.faces.get(&self.fid).cloned();
drop(tables);
if let Some(face) = face {
let ctx = RoutingContext::with_face(
let ctx = RoutingContext::new_in(
msg,
Face {
tables: self.tables.clone(),
Expand All @@ -150,7 +150,7 @@ impl Primitives for Mux {
let face = tables.faces.get(&self.fid).cloned();
drop(tables);
if let Some(face) = face {
let ctx = RoutingContext::with_face(
let ctx = RoutingContext::new_in(
msg,
Face {
tables: self.tables.clone(),
Expand All @@ -177,6 +177,7 @@ impl EPrimitives for Mux {
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
Expand All @@ -193,6 +194,7 @@ impl EPrimitives for Mux {
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
Expand All @@ -209,6 +211,7 @@ impl EPrimitives for Mux {
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
Expand All @@ -225,6 +228,7 @@ impl EPrimitives for Mux {
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
Expand All @@ -241,6 +245,7 @@ impl EPrimitives for Mux {
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
Expand All @@ -258,15 +263,15 @@ pub struct McastMux {
pub handler: TransportMulticast,
pub(crate) fid: usize,
pub(crate) tables: Arc<TablesLock>,
pub(crate) intercept: EgressIntercept,
pub(crate) intercept: InterceptsChain,
}

impl McastMux {
pub(crate) fn new(
handler: TransportMulticast,
fid: usize,
tables: Arc<TablesLock>,
intercept: EgressIntercept,
intercept: InterceptsChain,
) -> McastMux {
McastMux {
handler,
Expand All @@ -285,7 +290,7 @@ impl Primitives for McastMux {
size: None,
};
if let Some(face) = zread!(self.tables.tables).faces.get(&self.fid).cloned() {
let ctx = RoutingContext::with_face(
let ctx = RoutingContext::new_in(
msg,
Face {
tables: self.tables.clone(),
Expand All @@ -305,7 +310,7 @@ impl Primitives for McastMux {
size: None,
};
if let Some(face) = zread!(self.tables.tables).faces.get(&self.fid).cloned() {
let ctx = RoutingContext::with_face(
let ctx = RoutingContext::new_in(
msg,
Face {
tables: self.tables.clone(),
Expand All @@ -325,7 +330,7 @@ impl Primitives for McastMux {
size: None,
};
if let Some(face) = zread!(self.tables.tables).faces.get(&self.fid).cloned() {
let ctx = RoutingContext::with_face(
let ctx = RoutingContext::new_in(
msg,
Face {
tables: self.tables.clone(),
Expand All @@ -345,7 +350,7 @@ impl Primitives for McastMux {
size: None,
};
if let Some(face) = zread!(self.tables.tables).faces.get(&self.fid).cloned() {
let ctx = RoutingContext::with_face(
let ctx = RoutingContext::new_in(
msg,
Face {
tables: self.tables.clone(),
Expand All @@ -365,7 +370,7 @@ impl Primitives for McastMux {
size: None,
};
if let Some(face) = zread!(self.tables.tables).faces.get(&self.fid).cloned() {
let ctx = RoutingContext::with_face(
let ctx = RoutingContext::new_in(
msg,
Face {
tables: self.tables.clone(),
Expand All @@ -392,6 +397,7 @@ impl EPrimitives for McastMux {
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
Expand All @@ -408,6 +414,7 @@ impl EPrimitives for McastMux {
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
Expand All @@ -424,6 +431,7 @@ impl EPrimitives for McastMux {
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
Expand All @@ -440,6 +448,7 @@ impl EPrimitives for McastMux {
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
Expand All @@ -456,6 +465,7 @@ impl EPrimitives for McastMux {
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
Expand Down
15 changes: 14 additions & 1 deletion zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,19 @@ impl FaceState {
}
}

#[inline]
#[allow(clippy::trivially_copy_pass_by_ref)]
pub(crate) fn get_sent_mapping(
&self,
prefixid: &ExprId,
mapping: Mapping,
) -> Option<&std::sync::Arc<Resource>> {
match mapping {
Mapping::Sender => self.local_mappings.get(prefixid),
Mapping::Receiver => self.remote_mappings.get(prefixid),
}
}

pub(crate) fn get_next_local_id(&self) -> ExprId {
let mut id = 1;
while self.local_mappings.get(&id).is_some() || self.remote_mappings.get(&id).is_some() {
Expand Down Expand Up @@ -159,7 +172,7 @@ impl Primitives for Face {
#[inline]
fn send_push(&self, msg: Push) {
full_reentrant_route_data(
&self.tables.tables,
&self.tables,
&self.state,
&msg.wire_expr,
msg.ext_qos,
Expand Down
28 changes: 19 additions & 9 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,8 @@
//
use super::face::FaceState;
use super::resource::{DataRoutes, Direction, PullCaches, Resource};
use super::tables::{NodeId, Route, RoutingExpr, Tables};
use super::tables::{NodeId, Route, RoutingExpr, Tables, TablesLock};
use crate::net::routing::dispatcher::face::Face;
use crate::net::routing::RoutingContext;
use std::sync::Arc;
use std::sync::RwLock;
Expand Down Expand Up @@ -199,14 +200,14 @@ macro_rules! inc_stats {

#[allow(clippy::too_many_arguments)]
pub fn full_reentrant_route_data(
tables_ref: &RwLock<Tables>,
tables_ref: &Arc<TablesLock>,
face: &FaceState,
expr: &WireExpr,
ext_qos: ext::QoSType,
mut payload: PushBody,
routing_context: NodeId,
) {
let tables = zread!(tables_ref);
let tables = zread!(tables_ref.tables);
match tables.get_mapping(face, &expr.scope, expr.mapping).cloned() {
Some(prefix) => {
log::trace!(
Expand Down Expand Up @@ -249,15 +250,18 @@ pub fn full_reentrant_route_data(
inc_stats!(face, tx, admin, payload)
}

outface.primitives.send_push(RoutingContext::with_expr(
outface.primitives.send_push(RoutingContext::new_out(
Push {
wire_expr: key_expr.into(),
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType { node_id: *context },
payload,
},
expr.full_expr().to_string(),
Face {
tables: tables_ref.clone(),
state: outface.clone(),
},
))
}
} else {
Expand Down Expand Up @@ -287,15 +291,18 @@ pub fn full_reentrant_route_data(
inc_stats!(face, tx, admin, payload)
}

outface.primitives.send_push(RoutingContext::with_expr(
outface.primitives.send_push(RoutingContext::new_out(
Push {
wire_expr: key_expr,
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType { node_id: context },
payload: payload.clone(),
},
expr.full_expr().to_string(),
Face {
tables: tables_ref.clone(),
state: outface.clone(),
},
))
}
} else {
Expand All @@ -317,15 +324,18 @@ pub fn full_reentrant_route_data(
inc_stats!(face, tx, admin, payload)
}

outface.primitives.send_push(RoutingContext::with_expr(
outface.primitives.send_push(RoutingContext::new_out(
Push {
wire_expr: key_expr.into(),
ext_qos,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType { node_id: *context },
payload: payload.clone(),
},
expr.full_expr().to_string(),
Face {
tables: tables_ref.clone(),
state: outface.clone(),
},
))
}
}
Expand Down
Loading

0 comments on commit b530da2

Please sign in to comment.