Skip to content

Commit

Permalink
Release tables locks before propagating subscribers and queryables de…
Browse files Browse the repository at this point in the history
…clarations to void dead locks (#1150)

* Send simple sub and qabl declarations using a given function

* Send simple sub and qabl declarations after releasing tables lock

* Send simple sub and qabl declarations after releasing tables lock (missing places)
  • Loading branch information
OlivierHecart authored Jun 17, 2024
1 parent 7adad94 commit 93f93d2
Show file tree
Hide file tree
Showing 20 changed files with 1,194 additions and 647 deletions.
24 changes: 22 additions & 2 deletions zenoh/src/net/primitives/demux.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,9 +72,21 @@ impl TransportPeerEventHandler for DeMux {
NetworkBody::ResponseFinal(m) => self.face.send_response_final(m),
NetworkBody::OAM(m) => {
if let Some(transport) = self.transport.as_ref() {
let mut declares = vec![];
let ctrl_lock = zlock!(self.face.tables.ctrl_lock);
let mut tables = zwrite!(self.face.tables.tables);
ctrl_lock.handle_oam(&mut tables, &self.face.tables, m, transport)?
ctrl_lock.handle_oam(
&mut tables,
&self.face.tables,
m,
transport,
&mut |p, m| declares.push((p.clone(), m)),
)?;
drop(tables);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
}
}
Expand All @@ -89,9 +101,17 @@ impl TransportPeerEventHandler for DeMux {
fn closing(&self) {
self.face.send_close();
if let Some(transport) = self.transport.as_ref() {
let mut declares = vec![];
let ctrl_lock = zlock!(self.face.tables.ctrl_lock);
let mut tables = zwrite!(self.face.tables.tables);
let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport);
let _ = ctrl_lock.closing(&mut tables, &self.face.tables, transport, &mut |p, m| {
declares.push((p.clone(), m))
});
drop(tables);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
}

Expand Down
25 changes: 24 additions & 1 deletion zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -195,50 +195,73 @@ impl Primitives for Face {
unregister_expr(&self.tables, &mut self.state.clone(), m.id);
}
zenoh_protocol::network::DeclareBody::DeclareSubscriber(m) => {
let mut declares = vec![];
declare_subscription(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
&m.wire_expr,
&m.ext_info,
msg.ext_nodeid.node_id,
&mut |p, m| declares.push((p.clone(), m)),
);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
zenoh_protocol::network::DeclareBody::UndeclareSubscriber(m) => {
let mut declares = vec![];
undeclare_subscription(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
&m.ext_wire_expr.wire_expr,
msg.ext_nodeid.node_id,
&mut |p, m| declares.push((p.clone(), m)),
);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
zenoh_protocol::network::DeclareBody::DeclareQueryable(m) => {
let mut declares = vec![];
declare_queryable(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
&m.wire_expr,
&m.ext_info,
msg.ext_nodeid.node_id,
&mut |p, m| declares.push((p.clone(), m)),
);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
zenoh_protocol::network::DeclareBody::UndeclareQueryable(m) => {
let mut declares = vec![];
undeclare_queryable(
ctrl_lock.as_ref(),
&self.tables,
&mut self.state.clone(),
&m.ext_wire_expr.wire_expr,
msg.ext_nodeid.node_id,
&mut |p, m| declares.push((p.clone(), m)),
);
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
zenoh_protocol::network::DeclareBody::DeclareToken(_m) => todo!(),
zenoh_protocol::network::DeclareBody::UndeclareToken(_m) => todo!(),
zenoh_protocol::network::DeclareBody::DeclareInterest(_m) => todo!(),
zenoh_protocol::network::DeclareBody::FinalInterest(_m) => todo!(),
zenoh_protocol::network::DeclareBody::UndeclareInterest(_m) => todo!(),
}
drop(ctrl_lock);
}

#[inline]
Expand Down
21 changes: 18 additions & 3 deletions zenoh/src/net/routing/dispatcher/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
use super::face::FaceState;
use super::resource::{DataRoutes, Direction, PullCaches, Resource};
use super::tables::{NodeId, Route, RoutingExpr, Tables, TablesLock};
use crate::net::routing::hat::HatTrait;
use crate::net::routing::hat::{HatTrait, SendDeclare};
use std::borrow::Cow;
use std::collections::HashMap;
use std::sync::Arc;
Expand All @@ -37,6 +37,7 @@ pub(crate) fn declare_subscription(
expr: &WireExpr,
sub_info: &SubscriberInfo,
node_id: NodeId,
send_declare: &mut SendDeclare,
) {
tracing::debug!("Declare subscription {}", face);
let rtables = zread!(tables.tables);
Expand Down Expand Up @@ -66,7 +67,14 @@ pub(crate) fn declare_subscription(
(res, wtables)
};

hat_code.declare_subscription(&mut wtables, face, &mut res, sub_info, node_id);
hat_code.declare_subscription(
&mut wtables,
face,
&mut res,
sub_info,
node_id,
send_declare,
);

disable_matches_data_routes(&mut wtables, &mut res);
drop(wtables);
Expand Down Expand Up @@ -96,6 +104,7 @@ pub(crate) fn undeclare_subscription(
face: &mut Arc<FaceState>,
expr: &WireExpr,
node_id: NodeId,
send_declare: &mut SendDeclare,
) {
tracing::debug!("Undeclare subscription {}", face);
let rtables = zread!(tables.tables);
Expand All @@ -105,7 +114,13 @@ pub(crate) fn undeclare_subscription(
drop(rtables);
let mut wtables = zwrite!(tables.tables);

hat_code.undeclare_subscription(&mut wtables, face, &mut res, node_id);
hat_code.undeclare_subscription(
&mut wtables,
face,
&mut res,
node_id,
send_declare,
);

disable_matches_data_routes(&mut wtables, &mut res);
drop(wtables);
Expand Down
15 changes: 12 additions & 3 deletions zenoh/src/net/routing/dispatcher/queries.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::face::FaceState;
use super::resource::{QueryRoute, QueryRoutes, QueryTargetQablSet, Resource};
use super::tables::NodeId;
use super::tables::{RoutingExpr, Tables, TablesLock};
use crate::net::routing::hat::HatTrait;
use crate::net::routing::hat::{HatTrait, SendDeclare};
use crate::net::routing::RoutingContext;
use async_trait::async_trait;
use std::collections::HashMap;
Expand Down Expand Up @@ -56,6 +56,7 @@ pub(crate) fn declare_queryable(
expr: &WireExpr,
qabl_info: &QueryableInfo,
node_id: NodeId,
send_declare: &mut SendDeclare,
) {
tracing::debug!("Register queryable {}", face);
let rtables = zread!(tables.tables);
Expand Down Expand Up @@ -85,7 +86,14 @@ pub(crate) fn declare_queryable(
(res, wtables)
};

hat_code.declare_queryable(&mut wtables, face, &mut res, qabl_info, node_id);
hat_code.declare_queryable(
&mut wtables,
face,
&mut res,
qabl_info,
node_id,
send_declare,
);

disable_matches_query_routes(&mut wtables, &mut res);
drop(wtables);
Expand All @@ -112,6 +120,7 @@ pub(crate) fn undeclare_queryable(
face: &mut Arc<FaceState>,
expr: &WireExpr,
node_id: NodeId,
send_declare: &mut SendDeclare,
) {
let rtables = zread!(tables.tables);
match rtables.get_mapping(face, &expr.scope, expr.mapping) {
Expand All @@ -120,7 +129,7 @@ pub(crate) fn undeclare_queryable(
drop(rtables);
let mut wtables = zwrite!(tables.tables);

hat_code.undeclare_queryable(&mut wtables, face, &mut res, node_id);
hat_code.undeclare_queryable(&mut wtables, face, &mut res, node_id, send_declare);

disable_matches_query_routes(&mut wtables, &mut res);
drop(wtables);
Expand Down
8 changes: 7 additions & 1 deletion zenoh/src/net/routing/dispatcher/tables.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,13 @@ pub fn close_face(tables: &TablesLock, face: &Weak<FaceState>) {
tracing::debug!("Close {}", face);
face.task_controller.terminate_all(Duration::from_secs(10));
finalize_pending_queries(tables, &mut face);
zlock!(tables.ctrl_lock).close_face(tables, &mut face);
let mut declares = vec![];
let ctrl_lock = zlock!(tables.ctrl_lock);
ctrl_lock.close_face(tables, &mut face, &mut |p, m| declares.push((p.clone(), m)));
drop(ctrl_lock);
for (p, m) in declares {
p.send_declare(m);
}
}
None => tracing::error!("Face already closed!"),
}
Expand Down
25 changes: 17 additions & 8 deletions zenoh/src/net/routing/hat/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,7 @@ use super::{
face::FaceState,
tables::{NodeId, Resource, RoutingExpr, Tables, TablesLock},
},
HatBaseTrait, HatTrait,
HatBaseTrait, HatTrait, SendDeclare,
};
use std::{
any::Any,
Expand Down Expand Up @@ -97,9 +97,10 @@ impl HatBaseTrait for HatCode {
tables: &mut Tables,
_tables_ref: &Arc<TablesLock>,
face: &mut Face,
send_declare: &mut SendDeclare,
) -> ZResult<()> {
pubsub_new_face(tables, &mut face.state);
queries_new_face(tables, &mut face.state);
pubsub_new_face(tables, &mut face.state, send_declare);
queries_new_face(tables, &mut face.state, send_declare);
Ok(())
}

Expand All @@ -109,13 +110,19 @@ impl HatBaseTrait for HatCode {
_tables_ref: &Arc<TablesLock>,
face: &mut Face,
_transport: &TransportUnicast,
send_declare: &mut SendDeclare,
) -> ZResult<()> {
pubsub_new_face(tables, &mut face.state);
queries_new_face(tables, &mut face.state);
pubsub_new_face(tables, &mut face.state, send_declare);
queries_new_face(tables, &mut face.state, send_declare);
Ok(())
}

fn close_face(&self, tables: &TablesLock, face: &mut Arc<FaceState>) {
fn close_face(
&self,
tables: &TablesLock,
face: &mut Arc<FaceState>,
send_declare: &mut SendDeclare,
) {
let mut wtables = zwrite!(tables.tables);
let mut face_clone = face.clone();
let face = get_mut_unchecked(face);
Expand All @@ -139,7 +146,7 @@ impl HatBaseTrait for HatCode {
.drain()
{
get_mut_unchecked(&mut res).session_ctxs.remove(&face.id);
undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res);
undeclare_client_subscription(&mut wtables, &mut face_clone, &mut res, send_declare);

if res.context.is_some() {
for match_ in &res.context().matches {
Expand Down Expand Up @@ -167,7 +174,7 @@ impl HatBaseTrait for HatCode {
.drain()
{
get_mut_unchecked(&mut res).session_ctxs.remove(&face.id);
undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res);
undeclare_client_queryable(&mut wtables, &mut face_clone, &mut res, send_declare);

if res.context.is_some() {
for match_ in &res.context().matches {
Expand Down Expand Up @@ -229,6 +236,7 @@ impl HatBaseTrait for HatCode {
_tables_ref: &Arc<TablesLock>,
_oam: Oam,
_transport: &TransportUnicast,
_send_declare: &mut SendDeclare,
) -> ZResult<()> {
Ok(())
}
Expand All @@ -248,6 +256,7 @@ impl HatBaseTrait for HatCode {
_tables: &mut Tables,
_tables_ref: &Arc<TablesLock>,
_transport: &TransportUnicast,
_send_declare: &mut SendDeclare,
) -> ZResult<()> {
Ok(())
}
Expand Down
Loading

0 comments on commit 93f93d2

Please sign in to comment.