Skip to content

Commit

Permalink
Update route precomputation strategy to precompute fewer routes (#1717
Browse files Browse the repository at this point in the history
)

* Only compute data/query routes on nonwild keys with subs/qabls

* Fix regression in resource cleaning

* Compute identical routes from different source types once
  • Loading branch information
OlivierHecart authored Jan 15, 2025
1 parent f90587b commit 1e3a496
Show file tree
Hide file tree
Showing 16 changed files with 347 additions and 250 deletions.
64 changes: 16 additions & 48 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,14 @@
// Contributors:
// ZettaScale Zenoh Team, <[email protected]>
//
use std::{collections::HashMap, sync::Arc};

#[zenoh_macros::unstable]
use std::collections::HashMap;
use std::sync::Arc;

use zenoh_core::zread;
use zenoh_protocol::{
core::{key_expr::keyexpr, Reliability, WhatAmI, WireExpr},
core::{key_expr::keyexpr, Reliability, WireExpr},
network::{
declare::{ext, SubscriberId},
Push,
Expand Down Expand Up @@ -177,61 +180,24 @@ pub(crate) fn undeclare_subscription(
}
}

fn compute_data_routes_(tables: &Tables, routes: &mut DataRoutes, expr: &mut RoutingExpr) {
let indexes = tables.hat_code.get_data_routes_entries(tables);

let max_idx = indexes.routers.iter().max().unwrap();
routes
.routers
.resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new()));

for idx in indexes.routers {
routes.routers[idx as usize] =
tables
.hat_code
.compute_data_route(tables, expr, idx, WhatAmI::Router);
}

let max_idx = indexes.peers.iter().max().unwrap();
routes
.peers
.resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new()));

for idx in indexes.peers {
routes.peers[idx as usize] =
tables
.hat_code
.compute_data_route(tables, expr, idx, WhatAmI::Peer);
}

let max_idx = indexes.clients.iter().max().unwrap();
routes
.clients
.resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new()));

for idx in indexes.clients {
routes.clients[idx as usize] =
tables
.hat_code
.compute_data_route(tables, expr, idx, WhatAmI::Client);
}
}

pub(crate) fn compute_data_routes(tables: &Tables, expr: &mut RoutingExpr) -> DataRoutes {
let mut routes = DataRoutes::default();
compute_data_routes_(tables, &mut routes, expr);
tables
.hat_code
.compute_data_routes(tables, &mut routes, expr);
routes
}

pub(crate) fn update_data_routes(tables: &Tables, res: &mut Arc<Resource>) {
if res.context.is_some() {
if res.context.is_some() && !res.expr().contains('*') && res.has_subs() {
let mut res_mut = res.clone();
let res_mut = get_mut_unchecked(&mut res_mut);
compute_data_routes_(
tables.hat_code.compute_data_routes(
tables,
&mut res_mut.context_mut().data_routes,
&mut RoutingExpr::new(res, ""),
);
res_mut.context_mut().valid_data_routes = true;
}
}

Expand All @@ -249,11 +215,13 @@ pub(crate) fn compute_matches_data_routes<'a>(
) -> Vec<(Arc<Resource>, DataRoutes)> {
let mut routes = vec![];
if res.context.is_some() {
let mut expr = RoutingExpr::new(res, "");
routes.push((res.clone(), compute_data_routes(tables, &mut expr)));
if !res.expr().contains('*') && res.has_subs() {
let mut expr = RoutingExpr::new(res, "");
routes.push((res.clone(), compute_data_routes(tables, &mut expr)));
}
for match_ in &res.context().matches {
let match_ = match_.upgrade().unwrap();
if !Arc::ptr_eq(&match_, res) {
if !Arc::ptr_eq(&match_, res) && !match_.expr().contains('*') && match_.has_subs() {
let mut expr = RoutingExpr::new(&match_, "");
let match_routes = compute_data_routes(tables, &mut expr);
routes.push((match_, match_routes));
Expand Down
56 changes: 10 additions & 46 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ use std::{
use async_trait::async_trait;
use tokio_util::sync::CancellationToken;
use zenoh_buffers::ZBuf;
use zenoh_config::WhatAmI;
#[cfg(feature = "stats")]
use zenoh_protocol::zenoh::reply::ReplyBody;
use zenoh_protocol::{
Expand Down Expand Up @@ -205,61 +204,24 @@ pub(crate) fn undeclare_queryable(
}
}

fn compute_query_routes_(tables: &Tables, routes: &mut QueryRoutes, expr: &mut RoutingExpr) {
let indexes = tables.hat_code.get_query_routes_entries(tables);

let max_idx = indexes.routers.iter().max().unwrap();
routes.routers.resize_with((*max_idx as usize) + 1, || {
Arc::new(QueryTargetQablSet::new())
});

for idx in indexes.routers {
routes.routers[idx as usize] =
tables
.hat_code
.compute_query_route(tables, expr, idx, WhatAmI::Router);
}

let max_idx = indexes.peers.iter().max().unwrap();
routes.peers.resize_with((*max_idx as usize) + 1, || {
Arc::new(QueryTargetQablSet::new())
});

for idx in indexes.peers {
routes.peers[idx as usize] =
tables
.hat_code
.compute_query_route(tables, expr, idx, WhatAmI::Peer);
}

let max_idx = indexes.clients.iter().max().unwrap();
routes.clients.resize_with((*max_idx as usize) + 1, || {
Arc::new(QueryTargetQablSet::new())
});

for idx in indexes.clients {
routes.clients[idx as usize] =
tables
.hat_code
.compute_query_route(tables, expr, idx, WhatAmI::Client);
}
}

pub(crate) fn compute_query_routes(tables: &Tables, res: &Arc<Resource>) -> QueryRoutes {
let mut routes = QueryRoutes::default();
compute_query_routes_(tables, &mut routes, &mut RoutingExpr::new(res, ""));
tables
.hat_code
.compute_query_routes(tables, &mut routes, &mut RoutingExpr::new(res, ""));
routes
}

pub(crate) fn update_query_routes(tables: &Tables, res: &Arc<Resource>) {
if res.context.is_some() {
if res.context.is_some() && !res.expr().contains('*') && res.has_qabls() {
let mut res_mut = res.clone();
let res_mut = get_mut_unchecked(&mut res_mut);
compute_query_routes_(
tables.hat_code.compute_query_routes(
tables,
&mut res_mut.context_mut().query_routes,
&mut RoutingExpr::new(res, ""),
);
res_mut.context_mut().valid_query_routes = true;
}
}

Expand All @@ -277,10 +239,12 @@ pub(crate) fn compute_matches_query_routes(
) -> Vec<(Arc<Resource>, QueryRoutes)> {
let mut routes = vec![];
if res.context.is_some() {
routes.push((res.clone(), compute_query_routes(tables, res)));
if !res.expr().contains('*') && res.has_qabls() {
routes.push((res.clone(), compute_query_routes(tables, res)));
}
for match_ in &res.context().matches {
let match_ = match_.upgrade().unwrap();
if !Arc::ptr_eq(&match_, res) {
if !Arc::ptr_eq(&match_, res) && !match_.expr().contains('*') && match_.has_qabls() {
let match_routes = compute_query_routes(tables, &match_);
routes.push((match_, match_routes));
}
Expand Down
15 changes: 8 additions & 7 deletions zenoh/src/net/routing/dispatcher/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,13 +75,6 @@ impl SessionContext {
}
}

/// Node ids, per source whatami, for which a route entry must be computed
/// and stored (see `compute_data_routes_` / `compute_query_routes_`).
#[derive(Default)]
pub(crate) struct RoutesIndexes {
    // Ids to fill when the source is a router.
    pub(crate) routers: Vec<NodeId>,
    // Ids to fill when the source is a peer.
    pub(crate) peers: Vec<NodeId>,
    // Ids to fill when the source is a client.
    pub(crate) clients: Vec<NodeId>,
}

#[derive(Default)]
pub(crate) struct DataRoutes {
pub(crate) routers: Vec<Arc<Route>>,
Expand Down Expand Up @@ -258,6 +251,14 @@ impl Resource {
}
}

/// Returns `true` if at least one session context on this resource has a
/// subscriber declared.
pub(crate) fn has_subs(&self) -> bool {
    for ctx in self.session_ctxs.values() {
        if ctx.subs.is_some() {
            return true;
        }
    }
    false
}

/// Returns `true` if at least one session context on this resource has a
/// queryable declared.
pub(crate) fn has_qabls(&self) -> bool {
    for ctx in self.session_ctxs.values() {
        if ctx.qabl.is_some() {
            return true;
        }
    }
    false
}

#[inline]
pub(crate) fn data_route(&self, whatami: WhatAmI, context: NodeId) -> Option<Arc<Route>> {
match &self.context {
Expand Down
38 changes: 19 additions & 19 deletions zenoh/src/net/routing/hat/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ use super::{
use crate::net::{
routing::{
dispatcher::{face::Face, interests::RemoteInterest},
router::{compute_data_routes, compute_query_routes, RoutesIndexes},
router::{compute_data_routes, compute_query_routes},
},
runtime::Runtime,
};
Expand Down Expand Up @@ -217,25 +217,34 @@ impl HatBaseTrait for HatCode {
let mut matches_query_routes = vec![];
let rtables = zread!(tables.tables);
for _match in subs_matches.drain(..) {
let mut expr = RoutingExpr::new(&_match, "");
matches_data_routes.push((_match.clone(), compute_data_routes(&rtables, &mut expr)));
let route = (!_match.expr().contains('*') && _match.has_subs()).then(|| {
let mut expr = RoutingExpr::new(&_match, "");
compute_data_routes(&rtables, &mut expr)
});
matches_data_routes.push((_match.clone(), route));
}
for _match in qabls_matches.drain(..) {
matches_query_routes.push((_match.clone(), compute_query_routes(&rtables, &_match)));
let route = (!_match.expr().contains('*') && _match.has_qabls())
.then(|| compute_query_routes(&rtables, &_match));
matches_query_routes.push((_match.clone(), route));
}
drop(rtables);

let mut wtables = zwrite!(tables.tables);
for (mut res, data_routes) in matches_data_routes {
get_mut_unchecked(&mut res)
.context_mut()
.update_data_routes(data_routes);
if let Some(data_routes) = data_routes {
get_mut_unchecked(&mut res)
.context_mut()
.update_data_routes(data_routes);
}
Resource::clean(&mut res);
}
for (mut res, query_routes) in matches_query_routes {
get_mut_unchecked(&mut res)
.context_mut()
.update_query_routes(query_routes);
if let Some(query_routes) = query_routes {
get_mut_unchecked(&mut res)
.context_mut()
.update_query_routes(query_routes);
}
Resource::clean(&mut res);
}
wtables.faces.remove(&face.id);
Expand Down Expand Up @@ -321,12 +330,3 @@ impl HatFace {
}

impl HatTrait for HatCode {}

/// Route entries for the client hat: a single node entry (id 0) for every
/// source whatami.
#[inline]
fn get_routes_entries() -> RoutesIndexes {
    let single_node = || vec![0];
    RoutesIndexes {
        routers: single_node(),
        peers: single_node(),
        clients: single_node(),
    }
}
19 changes: 14 additions & 5 deletions zenoh/src/net/routing/hat/client/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ use zenoh_protocol::{
};
use zenoh_sync::get_mut_unchecked;

use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace};
use super::{face_hat, face_hat_mut, HatCode, HatFace};
use crate::{
key_expr::KeyExpr,
net::routing::{
Expand All @@ -36,8 +36,8 @@ use crate::{
resource::{NodeId, Resource, SessionContext},
tables::{Route, RoutingExpr, Tables},
},
hat::{HatPubSubTrait, SendDeclare, Sources},
router::{update_data_routes_from, RoutesIndexes},
hat::{DataRoutes, HatPubSubTrait, SendDeclare, Sources},
router::update_data_routes_from,
RoutingContext,
},
};
Expand Down Expand Up @@ -402,8 +402,17 @@ impl HatPubSubTrait for HatCode {
Arc::new(route)
}

fn get_data_routes_entries(&self, _tables: &Tables) -> RoutesIndexes {
get_routes_entries()
fn compute_data_routes(
    &self,
    tables: &Tables,
    routes: &mut DataRoutes,
    expr: &mut RoutingExpr,
) {
    // The client hat has a single node entry (id 0) per whatami; routers and
    // peers share the same route, computed once.
    //
    // Overwrite slot 0 explicitly: `resize_with(1, ..)` is a no-op when the
    // vector already holds an entry, so refreshing a reused `DataRoutes`
    // (as `update_data_routes` does with the resource's cached routes) would
    // silently keep a stale route.
    let route = self.compute_data_route(tables, expr, 0, WhatAmI::Peer);
    routes.routers.clear();
    routes.routers.push(route.clone());
    routes.peers.clear();
    routes.peers.push(route);
    let route = self.compute_data_route(tables, expr, 0, WhatAmI::Client);
    routes.clients.clear();
    routes.clients.push(route);
}

#[zenoh_macros::unstable]
Expand Down
19 changes: 14 additions & 5 deletions zenoh/src/net/routing/hat/client/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use zenoh_protocol::{
};
use zenoh_sync::get_mut_unchecked;

use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace};
use super::{face_hat, face_hat_mut, HatCode, HatFace};
use crate::{
key_expr::KeyExpr,
net::routing::{
Expand All @@ -41,8 +41,8 @@ use crate::{
resource::{NodeId, Resource, SessionContext},
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
},
hat::{HatQueriesTrait, SendDeclare, Sources},
router::{update_query_routes_from, RoutesIndexes},
hat::{HatQueriesTrait, QueryRoutes, SendDeclare, Sources},
router::update_query_routes_from,
RoutingContext,
},
};
Expand Down Expand Up @@ -427,8 +427,17 @@ impl HatQueriesTrait for HatCode {
Arc::new(route)
}

fn get_query_routes_entries(&self, _tables: &Tables) -> RoutesIndexes {
get_routes_entries()
fn compute_query_routes(
    &self,
    tables: &Tables,
    routes: &mut QueryRoutes,
    expr: &mut RoutingExpr,
) {
    // The client hat has a single node entry (id 0) per whatami; routers and
    // peers share the same route, computed once.
    //
    // Overwrite slot 0 explicitly: `resize_with(1, ..)` is a no-op when the
    // vector already holds an entry, so refreshing a reused `QueryRoutes`
    // (as `update_query_routes` does with the resource's cached routes) would
    // silently keep a stale route.
    let route = self.compute_query_route(tables, expr, 0, WhatAmI::Peer);
    routes.routers.clear();
    routes.routers.push(route.clone());
    routes.peers.clear();
    routes.peers.push(route);
    let route = self.compute_query_route(tables, expr, 0, WhatAmI::Client);
    routes.clients.clear();
    routes.clients.push(route);
}

#[cfg(feature = "unstable")]
Expand Down
Loading

0 comments on commit 1e3a496

Please sign in to comment.