Skip to content

Commit

Permalink
Revert "Optimize RoutingContext keyexpr for Query and Response messag…
Browse files Browse the repository at this point in the history
…es (#1266)"

This reverts commit e38fc16.
  • Loading branch information
Mallets authored Jul 26, 2024
1 parent e38fc16 commit 2532c11
Show file tree
Hide file tree
Showing 8 changed files with 269 additions and 233 deletions.
12 changes: 6 additions & 6 deletions zenoh/src/api/session.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2799,18 +2799,18 @@ impl crate::net::primitives::EPrimitives for Session {
}

#[inline]
fn send_request(&self, msg: Request) {
(self as &dyn Primitives).send_request(msg)
fn send_request(&self, ctx: crate::net::routing::RoutingContext<Request>) {
(self as &dyn Primitives).send_request(ctx.msg)
}

#[inline]
fn send_response(&self, msg: Response) {
(self as &dyn Primitives).send_response(msg)
fn send_response(&self, ctx: crate::net::routing::RoutingContext<Response>) {
(self as &dyn Primitives).send_response(ctx.msg)
}

#[inline]
fn send_response_final(&self, msg: ResponseFinal) {
(self as &dyn Primitives).send_response_final(msg)
fn send_response_final(&self, ctx: crate::net::routing::RoutingContext<ResponseFinal>) {
(self as &dyn Primitives).send_response_final(ctx.msg)
}

fn as_any(&self) -> &dyn std::any::Any {
Expand Down
12 changes: 6 additions & 6 deletions zenoh/src/net/primitives/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ pub(crate) trait EPrimitives: Send + Sync {

fn send_push(&self, msg: Push);

fn send_request(&self, msg: Request);
fn send_request(&self, ctx: RoutingContext<Request>);

fn send_response(&self, msg: Response);
fn send_response(&self, ctx: RoutingContext<Response>);

fn send_response_final(&self, msg: ResponseFinal);
fn send_response_final(&self, ctx: RoutingContext<ResponseFinal>);
}

#[derive(Default)]
Expand Down Expand Up @@ -82,11 +82,11 @@ impl EPrimitives for DummyPrimitives {

fn send_push(&self, _msg: Push) {}

fn send_request(&self, _msg: Request) {}
fn send_request(&self, _ctx: RoutingContext<Request>) {}

fn send_response(&self, _msg: Response) {}
fn send_response(&self, _ctx: RoutingContext<Response>) {}

fn send_response_final(&self, _msg: ResponseFinal) {}
fn send_response_final(&self, _ctx: RoutingContext<ResponseFinal>) {}

fn as_any(&self) -> &dyn Any {
self
Expand Down
246 changes: 126 additions & 120 deletions zenoh/src/net/primitives/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,75 +271,78 @@ impl EPrimitives for Mux {
}
}

fn send_request(&self, msg: Request) {
let msg = NetworkMessage {
body: NetworkBody::Request(msg),
#[cfg(feature = "stats")]
size: None,
fn send_request(&self, ctx: RoutingContext<Request>) {
let ctx = RoutingContext {
msg: NetworkMessage {
body: NetworkBody::Request(ctx.msg),
#[cfg(feature = "stats")]
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix
.as_ref()
.and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap()));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
}

fn send_response(&self, msg: Response) {
let msg = NetworkMessage {
body: NetworkBody::Response(msg),
#[cfg(feature = "stats")]
size: None,
fn send_response(&self, ctx: RoutingContext<Response>) {
let ctx = RoutingContext {
msg: NetworkMessage {
body: NetworkBody::Response(ctx.msg),
#[cfg(feature = "stats")]
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix
.as_ref()
.and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap()));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
}

fn send_response_final(&self, msg: ResponseFinal) {
let msg = NetworkMessage {
body: NetworkBody::ResponseFinal(msg),
#[cfg(feature = "stats")]
size: None,
fn send_response_final(&self, ctx: RoutingContext<ResponseFinal>) {
let ctx = RoutingContext {
msg: NetworkMessage {
body: NetworkBody::ResponseFinal(ctx.msg),
#[cfg(feature = "stats")]
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get().and_then(|f| f.upgrade()) {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(&face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix
.as_ref()
.and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap()));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
}

Expand Down Expand Up @@ -589,75 +592,78 @@ impl EPrimitives for McastMux {
}
}

fn send_request(&self, msg: Request) {
let msg = NetworkMessage {
body: NetworkBody::Request(msg),
#[cfg(feature = "stats")]
size: None,
fn send_request(&self, ctx: RoutingContext<Request>) {
let ctx = RoutingContext {
msg: NetworkMessage {
body: NetworkBody::Request(ctx.msg),
#[cfg(feature = "stats")]
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix
.as_ref()
.and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap()));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
}

fn send_response(&self, msg: Response) {
let msg = NetworkMessage {
body: NetworkBody::Response(msg),
#[cfg(feature = "stats")]
size: None,
fn send_response(&self, ctx: RoutingContext<Response>) {
let ctx = RoutingContext {
msg: NetworkMessage {
body: NetworkBody::Response(ctx.msg),
#[cfg(feature = "stats")]
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix
.as_ref()
.and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap()));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
}

fn send_response_final(&self, msg: ResponseFinal) {
let msg = NetworkMessage {
body: NetworkBody::ResponseFinal(msg),
#[cfg(feature = "stats")]
size: None,
fn send_response_final(&self, ctx: RoutingContext<ResponseFinal>) {
let ctx = RoutingContext {
msg: NetworkMessage {
body: NetworkBody::ResponseFinal(ctx.msg),
#[cfg(feature = "stats")]
size: None,
},
inface: ctx.inface,
outface: ctx.outface,
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
if self.interceptor.interceptors.is_empty() {
let _ = self.handler.schedule(msg);
} else if let Some(face) = self.face.get() {
let ctx = RoutingContext::new_out(msg, face.clone());
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix.as_ref().and_then(|p| p.get_egress_cache(face));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
} else {
tracing::error!("Uninitialized multiplexer!");
let prefix = ctx
.wire_expr()
.and_then(|we| (!we.has_suffix()).then(|| ctx.prefix()))
.flatten()
.cloned();
let cache = prefix
.as_ref()
.and_then(|p| p.get_egress_cache(ctx.outface.get().unwrap()));
if let Some(ctx) = self.interceptor.intercept(ctx, cache) {
let _ = self.handler.schedule(ctx.msg);
}
}

Expand Down
Loading

0 comments on commit 2532c11

Please sign in to comment.