Skip to content

Commit

Permalink
Refactor code
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Jan 17, 2024
1 parent a1301d5 commit 6d9f2f6
Show file tree
Hide file tree
Showing 12 changed files with 502 additions and 1,357 deletions.
12 changes: 8 additions & 4 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,8 @@ impl Primitives for Face {
unregister_expr(&self.tables, &mut self.state.clone(), m.id);
}
zenoh_protocol::network::DeclareBody::DeclareSubscriber(m) => {
ctrl_lock.declare_subscription(
declare_subscription(
&ctrl_lock,
&self.tables,
&mut self.state.clone(),
&m.wire_expr,
Expand All @@ -145,15 +146,17 @@ impl Primitives for Face {
);
}
zenoh_protocol::network::DeclareBody::UndeclareSubscriber(m) => {
ctrl_lock.forget_subscription(
undeclare_subscription(
&ctrl_lock,
&self.tables,
&mut self.state.clone(),
&m.ext_wire_expr.wire_expr,
msg.ext_nodeid.node_id,
);
}
zenoh_protocol::network::DeclareBody::DeclareQueryable(m) => {
ctrl_lock.declare_queryable(
declare_queryable(
&ctrl_lock,
&self.tables,
&mut self.state.clone(),
&m.wire_expr,
Expand All @@ -162,7 +165,8 @@ impl Primitives for Face {
);
}
zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => {
ctrl_lock.forget_queryable(
undeclare_queryable(
&ctrl_lock,
&self.tables,
&mut self.state.clone(),
&m.ext_wire_expr.wire_expr,
Expand Down
108 changes: 106 additions & 2 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@ use super::face::FaceState;
use super::resource::{DataRoutes, Direction, PullCaches, Resource};
use super::tables::{NodeId, Route, RoutingExpr, Tables, TablesLock};
use crate::net::routing::dispatcher::face::Face;
use crate::net::routing::hat::HatTrait;
use crate::net::routing::RoutingContext;
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
use std::sync::RwLock;
use std::sync::{Arc, MutexGuard};
use zenoh_core::zread;
use zenoh_protocol::core::key_expr::OwnedKeyExpr;
use zenoh_protocol::core::key_expr::{keyexpr, OwnedKeyExpr};
use zenoh_protocol::network::declare::subscriber::ext::SubscriberInfo;
use zenoh_protocol::network::declare::Mode;
use zenoh_protocol::{
core::{WhatAmI, WireExpr},
Expand All @@ -30,6 +32,108 @@ use zenoh_protocol::{
};
use zenoh_sync::get_mut_unchecked;

pub(crate) fn declare_subscription(
hat_code: &MutexGuard<'_, Box<dyn HatTrait + Send + Sync>>,
tables: &TablesLock,
face: &mut Arc<FaceState>,
expr: &WireExpr,
sub_info: &SubscriberInfo,
node_id: NodeId,
) {
log::debug!("Declare subscription {}", face);
let rtables = zread!(tables.tables);
match rtables
.get_mapping(face, &expr.scope, expr.mapping)
.cloned()
{
Some(mut prefix) => {
let res = Resource::get_resource(&prefix, &expr.suffix);
let (mut res, mut wtables) =
if res.as_ref().map(|r| r.context.is_some()).unwrap_or(false) {
drop(rtables);
let wtables = zwrite!(tables.tables);
(res.unwrap(), wtables)
} else {
let mut fullexpr = prefix.expr();
fullexpr.push_str(expr.suffix.as_ref());
let mut matches = keyexpr::new(fullexpr.as_str())
.map(|ke| Resource::get_matches(&rtables, ke))
.unwrap_or_default();
drop(rtables);
let mut wtables = zwrite!(tables.tables);
let mut res =
Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref());
matches.push(Arc::downgrade(&res));
Resource::match_resource(&wtables, &mut res, matches);
(res, wtables)
};

hat_code.declare_subscription(&mut wtables, face, &mut res, sub_info, node_id);

disable_matches_data_routes(&mut wtables, &mut res);
drop(wtables);

let rtables = zread!(tables.tables);
let matches_data_routes = compute_matches_data_routes(&rtables, &res);
drop(rtables);

let wtables = zwrite!(tables.tables);
for (mut res, data_routes, matching_pulls) in matches_data_routes {
get_mut_unchecked(&mut res)
.context_mut()
.update_data_routes(data_routes);
get_mut_unchecked(&mut res)
.context_mut()
.update_matching_pulls(matching_pulls);
}
drop(wtables);
}
None => log::error!("Declare subscription for unknown scope {}!", expr.scope),
}
}

pub(crate) fn undeclare_subscription(
hat_code: &MutexGuard<'_, Box<dyn HatTrait + Send + Sync>>,
tables: &TablesLock,
face: &mut Arc<FaceState>,
expr: &WireExpr,
node_id: NodeId,
) {
log::debug!("Undeclare subscription {}", face);
let rtables = zread!(tables.tables);
match rtables.get_mapping(face, &expr.scope, expr.mapping) {
Some(prefix) => match Resource::get_resource(prefix, expr.suffix.as_ref()) {
Some(mut res) => {
drop(rtables);
let mut wtables = zwrite!(tables.tables);

hat_code.undeclare_subscription(&mut wtables, face, &mut res, node_id);

disable_matches_data_routes(&mut wtables, &mut res);
drop(wtables);

let rtables = zread!(tables.tables);
let matches_data_routes = compute_matches_data_routes(&rtables, &res);
drop(rtables);

let wtables = zwrite!(tables.tables);
for (mut res, data_routes, matching_pulls) in matches_data_routes {
get_mut_unchecked(&mut res)
.context_mut()
.update_data_routes(data_routes);
get_mut_unchecked(&mut res)
.context_mut()
.update_matching_pulls(matching_pulls);
}
Resource::clean(&mut res);
drop(wtables);
}
None => log::error!("Undeclare unknown subscription!"),
},
None => log::error!("Undeclare subscription with unknown scope!"),
}
}

fn compute_data_routes_(tables: &Tables, routes: &mut DataRoutes, expr: &mut RoutingExpr) {
let indexes = tables.hat_code.get_data_routes_entries(tables);

Expand Down
100 changes: 99 additions & 1 deletion zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
use crate::net::routing::hat::HatTrait;
use crate::net::routing::RoutingContext;

//
Expand All @@ -19,8 +20,10 @@ use super::tables::NodeId;
use super::tables::{RoutingExpr, Tables, TablesLock};
use async_trait::async_trait;
use std::collections::HashMap;
use std::sync::{Arc, Weak};
use std::sync::{Arc, MutexGuard, Weak};
use zenoh_config::WhatAmI;
use zenoh_protocol::core::key_expr::keyexpr;
use zenoh_protocol::network::declare::queryable::ext::QueryableInfo;
use zenoh_protocol::{
core::{Encoding, WireExpr},
network::{
Expand All @@ -38,6 +41,101 @@ pub(crate) struct Query {
src_qid: RequestId,
}

pub(crate) fn declare_queryable(
hat_code: &MutexGuard<'_, Box<dyn HatTrait + Send + Sync>>,
tables: &TablesLock,
face: &mut Arc<FaceState>,
expr: &WireExpr,
qabl_info: &QueryableInfo,
node_id: NodeId,
) {
log::debug!("Register queryable {}", face);
let rtables = zread!(tables.tables);
match rtables
.get_mapping(face, &expr.scope, expr.mapping)
.cloned()
{
Some(mut prefix) => {
let res = Resource::get_resource(&prefix, &expr.suffix);
let (mut res, mut wtables) =
if res.as_ref().map(|r| r.context.is_some()).unwrap_or(false) {
drop(rtables);
let wtables = zwrite!(tables.tables);
(res.unwrap(), wtables)
} else {
let mut fullexpr = prefix.expr();
fullexpr.push_str(expr.suffix.as_ref());
let mut matches = keyexpr::new(fullexpr.as_str())
.map(|ke| Resource::get_matches(&rtables, ke))
.unwrap_or_default();
drop(rtables);
let mut wtables = zwrite!(tables.tables);
let mut res =
Resource::make_resource(&mut wtables, &mut prefix, expr.suffix.as_ref());
matches.push(Arc::downgrade(&res));
Resource::match_resource(&wtables, &mut res, matches);
(res, wtables)
};

hat_code.declare_queryable(&mut wtables, face, &mut res, qabl_info, node_id);

disable_matches_query_routes(&mut wtables, &mut res);
drop(wtables);

let rtables = zread!(tables.tables);
let matches_query_routes = compute_matches_query_routes(&rtables, &res);
drop(rtables);

let wtables = zwrite!(tables.tables);
for (mut res, query_routes) in matches_query_routes {
get_mut_unchecked(&mut res)
.context_mut()
.update_query_routes(query_routes);
}
drop(wtables);
}
None => log::error!("Declare queryable for unknown scope {}!", expr.scope),
}
}

pub(crate) fn undeclare_queryable(
hat_code: &MutexGuard<'_, Box<dyn HatTrait + Send + Sync>>,
tables: &TablesLock,
face: &mut Arc<FaceState>,
expr: &WireExpr,
node_id: NodeId,
) {
let rtables = zread!(tables.tables);
match rtables.get_mapping(face, &expr.scope, expr.mapping) {
Some(prefix) => match Resource::get_resource(prefix, expr.suffix.as_ref()) {
Some(mut res) => {
drop(rtables);
let mut wtables = zwrite!(tables.tables);

hat_code.undeclare_queryable(&mut wtables, face, &mut res, node_id);

disable_matches_query_routes(&mut wtables, &mut res);
drop(wtables);

let rtables = zread!(tables.tables);
let matches_query_routes = compute_matches_query_routes(&rtables, &res);
drop(rtables);

let wtables = zwrite!(tables.tables);
for (mut res, query_routes) in matches_query_routes {
get_mut_unchecked(&mut res)
.context_mut()
.update_query_routes(query_routes);
}
Resource::clean(&mut res);
drop(wtables);
}
None => log::error!("Undeclare unknown queryable!"),
},
None => log::error!("Undeclare queryable with unknown scope!"),
}
}

fn compute_query_routes_(tables: &Tables, routes: &mut QueryRoutes, expr: &mut RoutingExpr) {
let indexes = tables.hat_code.get_query_routes_entries(tables);

Expand Down
Loading

0 comments on commit 6d9f2f6

Please sign in to comment.