Skip to content

Commit

Permalink
Renaming
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Jan 15, 2024
1 parent 57a5d8f commit 62049b1
Show file tree
Hide file tree
Showing 5 changed files with 66 additions and 63 deletions.
12 changes: 6 additions & 6 deletions zenoh/src/net/primitives/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use super::Primitives;
use crate::net::routing::{
dispatcher::face::Face,
interceptor::{InterceptTrait, InterceptsChain},
interceptor::{InterceptorTrait, InterceptorsChain},
RoutingContext,
};
use std::any::Any;
Expand All @@ -27,29 +27,29 @@ use zenoh_transport::TransportPeerEventHandler;
pub struct DeMux {
face: Face,
pub(crate) transport: Option<TransportUnicast>,
pub(crate) intercept: InterceptsChain,
pub(crate) interceptor: InterceptorsChain,
}

impl DeMux {
pub(crate) fn new(
face: Face,
transport: Option<TransportUnicast>,
intercept: InterceptsChain,
interceptor: InterceptorsChain,
) -> Self {
Self {
face,
transport,
intercept,
interceptor,
}
}
}

impl TransportPeerEventHandler for DeMux {
#[inline]
fn handle_message(&self, mut msg: NetworkMessage) -> ZResult<()> {
if !self.intercept.intercepts.is_empty() {
if !self.interceptor.interceptors.is_empty() {
let ctx = RoutingContext::new_in(msg, self.face.clone());
let ctx = match self.intercept.intercept(ctx) {
let ctx = match self.interceptor.intercept(ctx) {
Some(ctx) => ctx,
None => return Ok(()),
};
Expand Down
32 changes: 16 additions & 16 deletions zenoh/src/net/primitives/mux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@ use std::sync::Arc;
use super::{EPrimitives, Primitives};
use crate::net::routing::{
dispatcher::{face::Face, tables::TablesLock},
interceptor::{InterceptTrait, InterceptsChain},
interceptor::{InterceptorTrait, InterceptorsChain},
RoutingContext,
};
use zenoh_protocol::network::{
Expand All @@ -28,21 +28,21 @@ pub struct Mux {
pub handler: TransportUnicast,
pub(crate) fid: usize,
pub(crate) tables: Arc<TablesLock>,
pub(crate) intercept: InterceptsChain,
pub(crate) interceptor: InterceptorsChain,
}

impl Mux {
pub(crate) fn new(
handler: TransportUnicast,
fid: usize,
tables: Arc<TablesLock>,
intercept: InterceptsChain,
interceptor: InterceptorsChain,
) -> Mux {
Mux {
handler,
fid,
tables,
intercept,
interceptor,
}
}
}
Expand All @@ -65,7 +65,7 @@ impl Primitives for Mux {
state: face.clone(),
},
);
if let Some(ctx) = self.intercept.intercept(ctx) {
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -88,7 +88,7 @@ impl Primitives for Mux {
state: face.clone(),
},
);
if let Some(ctx) = self.intercept.intercept(ctx) {
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -111,7 +111,7 @@ impl Primitives for Mux {
state: face.clone(),
},
);
if let Some(ctx) = self.intercept.intercept(ctx) {
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -134,7 +134,7 @@ impl Primitives for Mux {
state: face.clone(),
},
);
if let Some(ctx) = self.intercept.intercept(ctx) {
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -157,7 +157,7 @@ impl Primitives for Mux {
state: face.clone(),
},
);
if let Some(ctx) = self.intercept.intercept(ctx) {
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -181,7 +181,7 @@ impl EPrimitives for Mux {
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
if let Some(ctx) = self.intercept.intercept(ctx) {
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -198,7 +198,7 @@ impl EPrimitives for Mux {
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
if let Some(ctx) = self.intercept.intercept(ctx) {
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -215,7 +215,7 @@ impl EPrimitives for Mux {
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
if let Some(ctx) = self.intercept.intercept(ctx) {
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -232,7 +232,7 @@ impl EPrimitives for Mux {
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
if let Some(ctx) = self.intercept.intercept(ctx) {
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -249,7 +249,7 @@ impl EPrimitives for Mux {
prefix: ctx.prefix,
full_expr: ctx.full_expr,
};
if let Some(ctx) = self.intercept.intercept(ctx) {
if let Some(ctx) = self.interceptor.intercept(ctx) {
let _ = self.handler.schedule(ctx.msg);
}
}
Expand All @@ -263,15 +263,15 @@ pub struct McastMux {
pub handler: TransportMulticast,
pub(crate) fid: usize,
pub(crate) tables: Arc<TablesLock>,
pub(crate) intercept: InterceptsChain,
pub(crate) intercept: InterceptorsChain,
}

impl McastMux {
pub(crate) fn new(
handler: TransportMulticast,
fid: usize,
tables: Arc<TablesLock>,
intercept: InterceptsChain,
intercept: InterceptorsChain,
) -> McastMux {
McastMux {
handler,
Expand Down
8 changes: 4 additions & 4 deletions zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,8 @@ pub use super::queries::*;
pub use super::resource::*;
use crate::net::routing::hat;
use crate::net::routing::hat::HatTrait;
use crate::net::routing::interceptor::interceptors;
use crate::net::routing::interceptor::Interceptor;
use crate::net::routing::interceptor::interceptor_factories;
use crate::net::routing::interceptor::InterceptorFactory;
use std::any::Any;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
Expand Down Expand Up @@ -69,7 +69,7 @@ pub struct Tables {
pub(crate) faces: HashMap<usize, Arc<FaceState>>,
pub(crate) mcast_groups: Vec<Arc<FaceState>>,
pub(crate) mcast_faces: Vec<Arc<FaceState>>,
pub(crate) interceptors: Vec<Interceptor>,
pub(crate) interceptors: Vec<InterceptorFactory>,
pub(crate) pull_caches_lock: Mutex<()>,
pub(crate) hat: Box<dyn Any + Send + Sync>,
pub(crate) hat_code: Arc<dyn HatTrait + Send + Sync>, // TODO make this a Box
Expand All @@ -96,7 +96,7 @@ impl Tables {
faces: HashMap::new(),
mcast_groups: vec![],
mcast_faces: vec![],
interceptors: interceptors(config),
interceptors: interceptor_factories(config),
pull_caches_lock: Mutex::new(()),
hat: hat_code.new_tables(router_peers_failover_brokering),
hat_code: hat_code.into(),
Expand Down
55 changes: 29 additions & 26 deletions zenoh/src/net/routing/interceptor/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,58 +22,61 @@ use zenoh_config::Config;
use zenoh_protocol::network::NetworkMessage;
use zenoh_transport::{multicast::TransportMulticast, unicast::TransportUnicast};

pub(crate) trait InterceptTrait {
pub(crate) trait InterceptorTrait {
fn intercept(
&self,
ctx: RoutingContext<NetworkMessage>,
) -> Option<RoutingContext<NetworkMessage>>;
}

pub(crate) type Intercept = Box<dyn InterceptTrait + Send + Sync>;
pub(crate) type IngressIntercept = Intercept;
pub(crate) type EgressIntercept = Intercept;
pub(crate) type Interceptor = Box<dyn InterceptorTrait + Send + Sync>;
pub(crate) type IngressInterceptor = Interceptor;
pub(crate) type EgressInterceptor = Interceptor;

pub(crate) trait InterceptorTrait {
pub(crate) trait InterceptorFactoryTrait {
fn new_transport_unicast(
&self,
transport: &TransportUnicast,
) -> (Option<IngressIntercept>, Option<EgressIntercept>);
fn new_transport_multicast(&self, transport: &TransportMulticast) -> Option<EgressIntercept>;
fn new_peer_multicast(&self, transport: &TransportMulticast) -> Option<IngressIntercept>;
) -> (Option<IngressInterceptor>, Option<EgressInterceptor>);
fn new_transport_multicast(&self, transport: &TransportMulticast) -> Option<EgressInterceptor>;
fn new_peer_multicast(&self, transport: &TransportMulticast) -> Option<IngressInterceptor>;
}

pub(crate) type Interceptor = Box<dyn InterceptorTrait + Send + Sync>;
pub(crate) type InterceptorFactory = Box<dyn InterceptorFactoryTrait + Send + Sync>;

pub(crate) fn interceptors(_config: &Config) -> Vec<Interceptor> {
pub(crate) fn interceptor_factories(_config: &Config) -> Vec<InterceptorFactory> {
// Add interceptors here
// TODO build the list of intercetors with the correct order from the config
// vec![Box::new(LoggerInterceptor {})]
vec![]
}

pub(crate) struct InterceptsChain {
pub(crate) intercepts: Vec<Intercept>,
pub(crate) struct InterceptorsChain {
pub(crate) interceptors: Vec<Interceptor>,
}

impl InterceptsChain {
impl InterceptorsChain {
#[allow(dead_code)]
pub(crate) fn empty() -> Self {
Self { intercepts: vec![] }
Self {
interceptors: vec![],
}
}
}

impl From<Vec<Intercept>> for InterceptsChain {
fn from(intercepts: Vec<Intercept>) -> Self {
InterceptsChain { intercepts }
impl From<Vec<Interceptor>> for InterceptorsChain {
fn from(interceptors: Vec<Interceptor>) -> Self {
InterceptorsChain { interceptors }
}
}

impl InterceptTrait for InterceptsChain {
impl InterceptorTrait for InterceptorsChain {
fn intercept(
&self,
mut ctx: RoutingContext<NetworkMessage>,
) -> Option<RoutingContext<NetworkMessage>> {
for intercept in &self.intercepts {
match intercept.intercept(ctx) {
for interceptor in &self.interceptors {
match interceptor.intercept(ctx) {
Some(newctx) => ctx = newctx,
None => {
log::trace!("Msg intercepted!");
Expand All @@ -87,7 +90,7 @@ impl InterceptTrait for InterceptsChain {

pub(crate) struct IngressMsgLogger {}

impl InterceptTrait for IngressMsgLogger {
impl InterceptorTrait for IngressMsgLogger {
fn intercept(
&self,
ctx: RoutingContext<NetworkMessage>,
Expand All @@ -105,7 +108,7 @@ impl InterceptTrait for IngressMsgLogger {
}
pub(crate) struct EgressMsgLogger {}

impl InterceptTrait for EgressMsgLogger {
impl InterceptorTrait for EgressMsgLogger {
fn intercept(
&self,
ctx: RoutingContext<NetworkMessage>,
Expand All @@ -117,24 +120,24 @@ impl InterceptTrait for EgressMsgLogger {

pub(crate) struct LoggerInterceptor {}

impl InterceptorTrait for LoggerInterceptor {
impl InterceptorFactoryTrait for LoggerInterceptor {
fn new_transport_unicast(
&self,
transport: &TransportUnicast,
) -> (Option<IngressIntercept>, Option<EgressIntercept>) {
) -> (Option<IngressInterceptor>, Option<EgressInterceptor>) {
log::debug!("New transport unicast {:?}", transport);
(
Some(Box::new(IngressMsgLogger {})),
Some(Box::new(EgressMsgLogger {})),
)
}

fn new_transport_multicast(&self, transport: &TransportMulticast) -> Option<EgressIntercept> {
fn new_transport_multicast(&self, transport: &TransportMulticast) -> Option<EgressInterceptor> {
log::debug!("New transport multicast {:?}", transport);
Some(Box::new(EgressMsgLogger {}))
}

fn new_peer_multicast(&self, transport: &TransportMulticast) -> Option<IngressIntercept> {
fn new_peer_multicast(&self, transport: &TransportMulticast) -> Option<IngressInterceptor> {
log::debug!("New peer multicast {:?}", transport);
Some(Box::new(IngressMsgLogger {}))
}
Expand Down
Loading

0 comments on commit 62049b1

Please sign in to comment.