Skip to content

Commit bcc7535

Browse files
committed
Compute identical routes from different source types once
1 parent 2ae8104 commit bcc7535

File tree

16 files changed

+254
-207
lines changed

16 files changed

+254
-207
lines changed

zenoh/src/net/routing/dispatcher/pubsub.rs

+9-44
Original file line numberDiff line numberDiff line change
@@ -11,11 +11,14 @@
1111
// Contributors:
1212
// ZettaScale Zenoh Team, <[email protected]>
1313
//
14-
use std::{collections::HashMap, sync::Arc};
14+
15+
#[zenoh_macros::unstable]
16+
use std::collections::HashMap;
17+
use std::sync::Arc;
1518

1619
use zenoh_core::zread;
1720
use zenoh_protocol::{
18-
core::{key_expr::keyexpr, Reliability, WhatAmI, WireExpr},
21+
core::{key_expr::keyexpr, Reliability, WireExpr},
1922
network::{
2023
declare::{ext, SubscriberId},
2124
Push,
@@ -177,57 +180,19 @@ pub(crate) fn undeclare_subscription(
177180
}
178181
}
179182

180-
fn compute_data_routes_(tables: &Tables, routes: &mut DataRoutes, expr: &mut RoutingExpr) {
181-
let indexes = tables.hat_code.get_data_routes_entries(tables);
182-
183-
let max_idx = indexes.routers.iter().max().unwrap();
184-
routes
185-
.routers
186-
.resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new()));
187-
188-
for idx in indexes.routers {
189-
routes.routers[idx as usize] =
190-
tables
191-
.hat_code
192-
.compute_data_route(tables, expr, idx, WhatAmI::Router);
193-
}
194-
195-
let max_idx = indexes.peers.iter().max().unwrap();
196-
routes
197-
.peers
198-
.resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new()));
199-
200-
for idx in indexes.peers {
201-
routes.peers[idx as usize] =
202-
tables
203-
.hat_code
204-
.compute_data_route(tables, expr, idx, WhatAmI::Peer);
205-
}
206-
207-
let max_idx = indexes.clients.iter().max().unwrap();
208-
routes
209-
.clients
210-
.resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new()));
211-
212-
for idx in indexes.clients {
213-
routes.clients[idx as usize] =
214-
tables
215-
.hat_code
216-
.compute_data_route(tables, expr, idx, WhatAmI::Client);
217-
}
218-
}
219-
220183
pub(crate) fn compute_data_routes(tables: &Tables, expr: &mut RoutingExpr) -> DataRoutes {
221184
let mut routes = DataRoutes::default();
222-
compute_data_routes_(tables, &mut routes, expr);
185+
tables
186+
.hat_code
187+
.compute_data_routes(tables, &mut routes, expr);
223188
routes
224189
}
225190

226191
pub(crate) fn update_data_routes(tables: &Tables, res: &mut Arc<Resource>) {
227192
if res.context.is_some() && !res.expr().contains('*') && res.has_subs() {
228193
let mut res_mut = res.clone();
229194
let res_mut = get_mut_unchecked(&mut res_mut);
230-
compute_data_routes_(
195+
tables.hat_code.compute_data_routes(
231196
tables,
232197
&mut res_mut.context_mut().data_routes,
233198
&mut RoutingExpr::new(res, ""),

zenoh/src/net/routing/dispatcher/queries.rs

+4-43
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@ use std::{
2020
use async_trait::async_trait;
2121
use tokio_util::sync::CancellationToken;
2222
use zenoh_buffers::ZBuf;
23-
use zenoh_config::WhatAmI;
2423
#[cfg(feature = "stats")]
2524
use zenoh_protocol::zenoh::reply::ReplyBody;
2625
use zenoh_protocol::{
@@ -205,57 +204,19 @@ pub(crate) fn undeclare_queryable(
205204
}
206205
}
207206

208-
fn compute_query_routes_(tables: &Tables, routes: &mut QueryRoutes, expr: &mut RoutingExpr) {
209-
let indexes = tables.hat_code.get_query_routes_entries(tables);
210-
211-
let max_idx = indexes.routers.iter().max().unwrap();
212-
routes.routers.resize_with((*max_idx as usize) + 1, || {
213-
Arc::new(QueryTargetQablSet::new())
214-
});
215-
216-
for idx in indexes.routers {
217-
routes.routers[idx as usize] =
218-
tables
219-
.hat_code
220-
.compute_query_route(tables, expr, idx, WhatAmI::Router);
221-
}
222-
223-
let max_idx = indexes.peers.iter().max().unwrap();
224-
routes.peers.resize_with((*max_idx as usize) + 1, || {
225-
Arc::new(QueryTargetQablSet::new())
226-
});
227-
228-
for idx in indexes.peers {
229-
routes.peers[idx as usize] =
230-
tables
231-
.hat_code
232-
.compute_query_route(tables, expr, idx, WhatAmI::Peer);
233-
}
234-
235-
let max_idx = indexes.clients.iter().max().unwrap();
236-
routes.clients.resize_with((*max_idx as usize) + 1, || {
237-
Arc::new(QueryTargetQablSet::new())
238-
});
239-
240-
for idx in indexes.clients {
241-
routes.clients[idx as usize] =
242-
tables
243-
.hat_code
244-
.compute_query_route(tables, expr, idx, WhatAmI::Client);
245-
}
246-
}
247-
248207
pub(crate) fn compute_query_routes(tables: &Tables, res: &Arc<Resource>) -> QueryRoutes {
249208
let mut routes = QueryRoutes::default();
250-
compute_query_routes_(tables, &mut routes, &mut RoutingExpr::new(res, ""));
209+
tables
210+
.hat_code
211+
.compute_query_routes(tables, &mut routes, &mut RoutingExpr::new(res, ""));
251212
routes
252213
}
253214

254215
pub(crate) fn update_query_routes(tables: &Tables, res: &Arc<Resource>) {
255216
if res.context.is_some() && !res.expr().contains('*') && res.has_qabls() {
256217
let mut res_mut = res.clone();
257218
let res_mut = get_mut_unchecked(&mut res_mut);
258-
compute_query_routes_(
219+
tables.hat_code.compute_query_routes(
259220
tables,
260221
&mut res_mut.context_mut().query_routes,
261222
&mut RoutingExpr::new(res, ""),

zenoh/src/net/routing/dispatcher/resource.rs

-7
Original file line numberDiff line numberDiff line change
@@ -75,13 +75,6 @@ impl SessionContext {
7575
}
7676
}
7777

78-
#[derive(Default)]
79-
pub(crate) struct RoutesIndexes {
80-
pub(crate) routers: Vec<NodeId>,
81-
pub(crate) peers: Vec<NodeId>,
82-
pub(crate) clients: Vec<NodeId>,
83-
}
84-
8578
#[derive(Default)]
8679
pub(crate) struct DataRoutes {
8780
pub(crate) routers: Vec<Arc<Route>>,

zenoh/src/net/routing/hat/client/mod.rs

+1-10
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ use super::{
4949
use crate::net::{
5050
routing::{
5151
dispatcher::{face::Face, interests::RemoteInterest},
52-
router::{compute_data_routes, compute_query_routes, RoutesIndexes},
52+
router::{compute_data_routes, compute_query_routes},
5353
},
5454
runtime::Runtime,
5555
};
@@ -330,12 +330,3 @@ impl HatFace {
330330
}
331331

332332
impl HatTrait for HatCode {}
333-
334-
#[inline]
335-
fn get_routes_entries() -> RoutesIndexes {
336-
RoutesIndexes {
337-
routers: vec![0],
338-
peers: vec![0],
339-
clients: vec![0],
340-
}
341-
}

zenoh/src/net/routing/hat/client/pubsub.rs

+14-5
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ use zenoh_protocol::{
2626
};
2727
use zenoh_sync::get_mut_unchecked;
2828

29-
use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace};
29+
use super::{face_hat, face_hat_mut, HatCode, HatFace};
3030
use crate::{
3131
key_expr::KeyExpr,
3232
net::routing::{
@@ -36,8 +36,8 @@ use crate::{
3636
resource::{NodeId, Resource, SessionContext},
3737
tables::{Route, RoutingExpr, Tables},
3838
},
39-
hat::{HatPubSubTrait, SendDeclare, Sources},
40-
router::{update_data_routes_from, RoutesIndexes},
39+
hat::{DataRoutes, HatPubSubTrait, SendDeclare, Sources},
40+
router::update_data_routes_from,
4141
RoutingContext,
4242
},
4343
};
@@ -402,8 +402,17 @@ impl HatPubSubTrait for HatCode {
402402
Arc::new(route)
403403
}
404404

405-
fn get_data_routes_entries(&self, _tables: &Tables) -> RoutesIndexes {
406-
get_routes_entries()
405+
fn compute_data_routes(
406+
&self,
407+
tables: &Tables,
408+
routes: &mut DataRoutes,
409+
expr: &mut RoutingExpr,
410+
) {
411+
let route = self.compute_data_route(tables, expr, 0, WhatAmI::Peer);
412+
routes.routers.resize_with(1, || route.clone());
413+
routes.peers.resize_with(1, || route.clone());
414+
let route = self.compute_data_route(tables, expr, 0, WhatAmI::Client);
415+
routes.clients.resize_with(1, || route.clone());
407416
}
408417

409418
#[zenoh_macros::unstable]

zenoh/src/net/routing/hat/client/queries.rs

+14-5
Original file line numberDiff line numberDiff line change
@@ -32,7 +32,7 @@ use zenoh_protocol::{
3232
};
3333
use zenoh_sync::get_mut_unchecked;
3434

35-
use super::{face_hat, face_hat_mut, get_routes_entries, HatCode, HatFace};
35+
use super::{face_hat, face_hat_mut, HatCode, HatFace};
3636
use crate::{
3737
key_expr::KeyExpr,
3838
net::routing::{
@@ -41,8 +41,8 @@ use crate::{
4141
resource::{NodeId, Resource, SessionContext},
4242
tables::{QueryTargetQabl, QueryTargetQablSet, RoutingExpr, Tables},
4343
},
44-
hat::{HatQueriesTrait, SendDeclare, Sources},
45-
router::{update_query_routes_from, RoutesIndexes},
44+
hat::{HatQueriesTrait, QueryRoutes, SendDeclare, Sources},
45+
router::update_query_routes_from,
4646
RoutingContext,
4747
},
4848
};
@@ -427,8 +427,17 @@ impl HatQueriesTrait for HatCode {
427427
Arc::new(route)
428428
}
429429

430-
fn get_query_routes_entries(&self, _tables: &Tables) -> RoutesIndexes {
431-
get_routes_entries()
430+
fn compute_query_routes(
431+
&self,
432+
tables: &Tables,
433+
routes: &mut QueryRoutes,
434+
expr: &mut RoutingExpr,
435+
) {
436+
let route = self.compute_query_route(tables, expr, 0, WhatAmI::Peer);
437+
routes.routers.resize_with(1, || route.clone());
438+
routes.peers.resize_with(1, || route.clone());
439+
let route = self.compute_query_route(tables, expr, 0, WhatAmI::Client);
440+
routes.clients.resize_with(1, || route.clone());
432441
}
433442

434443
#[cfg(feature = "unstable")]

zenoh/src/net/routing/hat/linkstate_peer/mod.rs

+1-18
Original file line numberDiff line numberDiff line change
@@ -58,7 +58,7 @@ use crate::net::{
5858
routing::{
5959
dispatcher::{face::Face, interests::RemoteInterest},
6060
hat::TREES_COMPUTATION_DELAY_MS,
61-
router::{compute_data_routes, compute_query_routes, RoutesIndexes},
61+
router::{compute_data_routes, compute_query_routes},
6262
},
6363
runtime::Runtime,
6464
};
@@ -554,20 +554,3 @@ fn get_peer(tables: &Tables, face: &Arc<FaceState>, nodeid: NodeId) -> Option<Ze
554554
}
555555

556556
impl HatTrait for HatCode {}
557-
558-
#[inline]
559-
fn get_routes_entries(tables: &Tables) -> RoutesIndexes {
560-
let indexes = hat!(tables)
561-
.linkstatepeers_net
562-
.as_ref()
563-
.unwrap()
564-
.graph
565-
.node_indices()
566-
.map(|i| i.index() as NodeId)
567-
.collect::<Vec<NodeId>>();
568-
RoutesIndexes {
569-
routers: indexes.clone(),
570-
peers: indexes,
571-
clients: vec![0],
572-
}
573-
}

zenoh/src/net/routing/hat/linkstate_peer/pubsub.rs

+33-6
Original file line numberDiff line numberDiff line change
@@ -31,8 +31,8 @@ use zenoh_protocol::{
3131
use zenoh_sync::get_mut_unchecked;
3232

3333
use super::{
34-
face_hat, face_hat_mut, get_peer, get_routes_entries, hat, hat_mut, network::Network, res_hat,
35-
res_hat_mut, HatCode, HatContext, HatFace, HatTables,
34+
face_hat, face_hat_mut, get_peer, hat, hat_mut, network::Network, res_hat, res_hat_mut,
35+
HatCode, HatContext, HatFace, HatTables,
3636
};
3737
#[cfg(feature = "unstable")]
3838
use crate::key_expr::KeyExpr;
@@ -44,8 +44,7 @@ use crate::net::routing::{
4444
resource::{NodeId, Resource, SessionContext},
4545
tables::{Route, RoutingExpr, Tables},
4646
},
47-
hat::{CurrentFutureTrait, HatPubSubTrait, SendDeclare, Sources},
48-
router::RoutesIndexes,
47+
hat::{CurrentFutureTrait, DataRoutes, HatPubSubTrait, SendDeclare, Sources},
4948
RoutingContext,
5049
};
5150

@@ -986,8 +985,36 @@ impl HatPubSubTrait for HatCode {
986985
Arc::new(route)
987986
}
988987

989-
fn get_data_routes_entries(&self, tables: &Tables) -> RoutesIndexes {
990-
get_routes_entries(tables)
988+
fn compute_data_routes(
989+
&self,
990+
tables: &Tables,
991+
routes: &mut DataRoutes,
992+
expr: &mut RoutingExpr,
993+
) {
994+
let indexes = hat!(tables)
995+
.linkstatepeers_net
996+
.as_ref()
997+
.unwrap()
998+
.graph
999+
.node_indices()
1000+
.map(|i| i.index() as NodeId)
1001+
.collect::<Vec<NodeId>>();
1002+
let max_idx = indexes.iter().max().unwrap();
1003+
routes
1004+
.routers
1005+
.resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new()));
1006+
routes
1007+
.peers
1008+
.resize_with((*max_idx as usize) + 1, || Arc::new(HashMap::new()));
1009+
for idx in indexes {
1010+
let route = self.compute_data_route(tables, expr, idx, WhatAmI::Peer);
1011+
routes.routers[idx as usize] = route.clone();
1012+
routes.peers[idx as usize] = route;
1013+
}
1014+
1015+
routes.clients.resize_with(1, || {
1016+
self.compute_data_route(tables, expr, 0, WhatAmI::Client)
1017+
});
9911018
}
9921019

9931020
#[zenoh_macros::unstable]

0 commit comments

Comments
 (0)