Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Client interests #863

Merged
merged 3 commits into from
Mar 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 7 additions & 2 deletions zenoh/src/net/routing/dispatcher/face.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::any::Any;
use std::collections::HashMap;
use std::fmt;
use std::sync::Arc;
use zenoh_protocol::network::declare::{FinalInterest, InterestId};
use zenoh_protocol::network::declare::{FinalInterest, Interest, InterestId};
use zenoh_protocol::network::{ext, Declare, DeclareBody};
use zenoh_protocol::zenoh::RequestBody;
use zenoh_protocol::{
Expand All @@ -41,6 +41,7 @@ pub struct FaceState {
#[cfg(feature = "stats")]
pub(crate) stats: Option<Arc<TransportStats>>,
pub(crate) primitives: Arc<dyn crate::net::primitives::EPrimitives + Send + Sync>,
pub(crate) local_interests: HashMap<InterestId, (Interest, Option<Arc<Resource>>, bool)>,
pub(crate) remote_key_interests: HashMap<InterestId, Option<Arc<Resource>>>,
pub(crate) local_mappings: HashMap<ExprId, Arc<Resource>>,
pub(crate) remote_mappings: HashMap<ExprId, Arc<Resource>>,
Expand Down Expand Up @@ -70,6 +71,7 @@ impl FaceState {
#[cfg(feature = "stats")]
stats,
primitives,
local_interests: HashMap::new(),
remote_key_interests: HashMap::new(),
local_mappings: HashMap::new(),
remote_mappings: HashMap::new(),
Expand Down Expand Up @@ -265,7 +267,10 @@ impl Primitives for Face {
}
}
zenoh_protocol::network::DeclareBody::FinalInterest(m) => {
log::warn!("Received unsupported {m:?}")
get_mut_unchecked(&mut self.state.clone())
.local_interests
.entry(m.id)
.and_modify(|interest| interest.2 = true);
}
zenoh_protocol::network::DeclareBody::UndeclareInterest(m) => {
unregister_expr_interest(&self.tables, &mut self.state.clone(), m.id);
Expand Down
6 changes: 5 additions & 1 deletion zenoh/src/net/routing/hat/client/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,9 @@ use std::{
sync::{atomic::AtomicU32, Arc},
};
use zenoh_config::WhatAmI;
use zenoh_protocol::network::declare::{queryable::ext::QueryableInfo, QueryableId, SubscriberId};
use zenoh_protocol::network::declare::{
queryable::ext::QueryableInfo, InterestId, QueryableId, SubscriberId,
};
use zenoh_protocol::network::Oam;
use zenoh_result::ZResult;
use zenoh_sync::get_mut_unchecked;
Expand Down Expand Up @@ -282,6 +284,7 @@ impl HatContext {

struct HatFace {
next_id: AtomicU32, // @TODO: manage rollover and uniqueness
remote_sub_interests: HashMap<InterestId, Option<Arc<Resource>>>,
local_subs: HashMap<Arc<Resource>, SubscriberId>,
remote_subs: HashMap<SubscriberId, Arc<Resource>>,
local_qabls: HashMap<Arc<Resource>, (QueryableId, QueryableInfo)>,
Expand All @@ -292,6 +295,7 @@ impl HatFace {
fn new() -> Self {
Self {
next_id: AtomicU32::new(0),
remote_sub_interests: HashMap::new(),
local_subs: HashMap::new(),
remote_subs: HashMap::new(),
local_qabls: HashMap::new(),
Expand Down
159 changes: 132 additions & 27 deletions zenoh/src/net/routing/hat/client/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,14 @@ use crate::net::routing::dispatcher::tables::{Route, RoutingExpr};
use crate::net::routing::hat::HatPubSubTrait;
use crate::net::routing::router::RoutesIndexes;
use crate::net::routing::{RoutingContext, PREFIX_LIVELINESS};
use crate::KeyExpr;
use std::borrow::Cow;
use std::collections::{HashMap, HashSet};
use std::sync::atomic::Ordering;
use std::sync::Arc;
use zenoh_protocol::core::key_expr::OwnedKeyExpr;
use zenoh_protocol::network::declare::{InterestId, SubscriberId};
use zenoh_protocol::network::declare::{Interest, InterestId, SubscriberId};
use zenoh_protocol::network::{DeclareInterest, UndeclareInterest};
use zenoh_protocol::{
core::{Reliability, WhatAmI},
network::declare::{
Expand Down Expand Up @@ -244,24 +246,96 @@ pub(super) fn pubsub_new_face(tables: &mut Tables, face: &mut Arc<FaceState>) {
impl HatPubSubTrait for HatCode {
fn declare_sub_interest(
&self,
_tables: &mut Tables,
_face: &mut Arc<FaceState>,
_id: InterestId,
_res: Option<&mut Arc<Resource>>,
_current: bool,
_future: bool,
tables: &mut Tables,
face: &mut Arc<FaceState>,
id: InterestId,
res: Option<&mut Arc<Resource>>,
current: bool,
future: bool,
_aggregate: bool,
) {
todo!()
face_hat_mut!(face)
.remote_sub_interests
.insert(id, res.as_ref().map(|res| (*res).clone()));
for dst_face in tables
.faces
.values_mut()
.filter(|f| f.whatami != WhatAmI::Client)
{
let id = face_hat!(dst_face).next_id.fetch_add(1, Ordering::SeqCst);
let mut interest = Interest::KEYEXPRS + Interest::SUBSCRIBERS;
if current {
interest += Interest::CURRENT;
}
if future {
interest += Interest::FUTURE;
}
get_mut_unchecked(dst_face).local_interests.insert(
id,
(interest, res.as_ref().map(|res| (*res).clone()), !current),
);
let wire_expr = res.as_ref().map(|res| Resource::decl_key(res, dst_face));
dst_face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::DeclareInterest(DeclareInterest {
id,
interest,
wire_expr,
}),
},
res.as_ref().map(|res| res.expr()).unwrap_or_default(),
));
}
}

fn undeclare_sub_interest(
&self,
_tables: &mut Tables,
_face: &mut Arc<FaceState>,
_id: InterestId,
tables: &mut Tables,
face: &mut Arc<FaceState>,
id: InterestId,
) {
todo!()
if let Some(interest) = face_hat_mut!(face).remote_sub_interests.remove(&id) {
if !tables.faces.values().any(|f| {
f.whatami == WhatAmI::Client
&& face_hat!(f)
.remote_sub_interests
.values()
.any(|i| *i == interest)
}) {
for dst_face in tables
.faces
.values_mut()
.filter(|f| f.whatami != WhatAmI::Client)
{
for id in dst_face
.local_interests
.keys()
.cloned()
.collect::<Vec<InterestId>>()
{
let (int, res, _) = dst_face.local_interests.get(&id).unwrap();
if int.subscribers() && (*res == interest) {
dst_face.primitives.send_declare(RoutingContext::with_expr(
Declare {
ext_qos: ext::QoSType::DECLARE,
ext_tstamp: None,
ext_nodeid: ext::NodeIdType::DEFAULT,
body: DeclareBody::UndeclareInterest(UndeclareInterest {
id,
ext_wire_expr: WireExprType::null(),
}),
},
res.as_ref().map(|res| res.expr()).unwrap_or_default(),
));
get_mut_unchecked(dst_face).local_interests.remove(&id);
}
}
}
}
}
}

fn declare_subscription(
Expand Down Expand Up @@ -323,12 +397,51 @@ impl HatPubSubTrait for HatCode {
}
};

if let Some(face) = tables.faces.values().find(|f| f.whatami != WhatAmI::Client) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
(face.clone(), key_expr.to_owned(), NodeId::default()),
);
for face in tables
.faces
.values()
.filter(|f| f.whatami != WhatAmI::Client)
{
if face
.local_interests
.values()
.any(|(interest, res, finalized)| {
*finalized
&& interest.subscribers()
&& res
.as_ref()
.map(|res| {
KeyExpr::try_from(res.expr())
.and_then(|intres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| intres.includes(&putres))
})
.unwrap_or(false)
})
.unwrap_or(true)
})
{
if face_hat!(face).remote_subs.values().any(|sub| {
KeyExpr::try_from(sub.expr())
.and_then(|subres| {
KeyExpr::try_from(expr.full_expr())
.map(|putres| subres.intersects(&putres))
})
.unwrap_or(false)
}) {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
(face.clone(), key_expr.to_owned(), NodeId::default()),
);
}
} else {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, face.id);
route.insert(
face.id,
(face.clone(), key_expr.to_owned(), NodeId::default()),
);
}
}

let res = Resource::get_resource(expr.prefix, expr.suffix);
Expand All @@ -342,15 +455,7 @@ impl HatPubSubTrait for HatCode {
let mres = mres.upgrade().unwrap();

for (sid, context) in &mres.session_ctxs {
if context.subs.is_some()
&& match tables.whatami {
WhatAmI::Router => context.face.whatami != WhatAmI::Router,
_ => {
source_type == WhatAmI::Client
|| context.face.whatami == WhatAmI::Client
}
}
{
if context.subs.is_some() && context.face.whatami == WhatAmI::Client {
route.entry(*sid).or_insert_with(|| {
let key_expr = Resource::get_best_key(expr.prefix, expr.suffix, *sid);
(context.face.clone(), key_expr.to_owned(), NodeId::default())
Expand Down
65 changes: 36 additions & 29 deletions zenoh/src/publication.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::{
handlers::{Callback, DefaultHandler, IntoHandler},
Id,
};
use std::fmt;
use std::future::Ready;
use zenoh_core::{zread, AsyncResolve, Resolvable, Resolve, SyncResolve};
use zenoh_protocol::network::push::ext;
Expand Down Expand Up @@ -157,7 +158,7 @@ impl SyncResolve for PutBuilder<'_, '_> {
let publisher = Publisher {
session,
#[cfg(feature = "unstable")]
eid: 0, // This is a one shot Publisher
id: 0, // This is a one shot Publisher
key_expr: key_expr?,
congestion_control,
priority,
Expand Down Expand Up @@ -193,6 +194,22 @@ use std::pin::Pin;
use std::task::{Context, Poll};
use zenoh_result::Error;

pub(crate) struct PublisherState {
pub(crate) id: Id,
pub(crate) remote_id: Id,
pub(crate) key_expr: KeyExpr<'static>,
pub(crate) destination: Locality,
}

impl fmt::Debug for PublisherState {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
f.debug_struct("Publisher")
.field("id", &self.id)
.field("key_expr", &self.key_expr)
.finish()
}
}

#[zenoh_macros::unstable]
#[derive(Clone)]
pub enum PublisherRef<'a> {
Expand Down Expand Up @@ -254,8 +271,7 @@ impl std::fmt::Debug for PublisherRef<'_> {
#[derive(Debug, Clone)]
pub struct Publisher<'a> {
pub(crate) session: SessionRef<'a>,
#[cfg(feature = "unstable")]
pub(crate) eid: EntityId,
pub(crate) id: Id,
pub(crate) key_expr: KeyExpr<'a>,
pub(crate) congestion_control: CongestionControl,
pub(crate) priority: Priority,
Expand Down Expand Up @@ -283,7 +299,7 @@ impl<'a> Publisher<'a> {
pub fn id(&self) -> EntityGlobalId {
EntityGlobalId {
zid: self.session.zid(),
eid: self.eid,
eid: self.id,
}
}

Expand Down Expand Up @@ -588,11 +604,9 @@ impl Resolvable for PublisherUndeclaration<'_> {
impl SyncResolve for PublisherUndeclaration<'_> {
fn res_sync(mut self) -> <Self as Resolvable>::To {
let Publisher {
session, key_expr, ..
session, id: eid, ..
} = &self.publisher;
session
.undeclare_publication_intent(key_expr.clone())
.res_sync()?;
session.undeclare_publisher_inner(*eid)?;
self.publisher.key_expr = unsafe { keyexpr::from_str_unchecked("") }.into();
Ok(())
}
Expand All @@ -609,10 +623,7 @@ impl AsyncResolve for PublisherUndeclaration<'_> {
impl Drop for Publisher<'_> {
fn drop(&mut self) {
if !self.key_expr.is_empty() {
let _ = self
.session
.undeclare_publication_intent(self.key_expr.clone())
.res_sync();
let _ = self.session.undeclare_publisher_inner(self.id);
}
}
}
Expand Down Expand Up @@ -841,23 +852,19 @@ impl<'a, 'b> SyncResolve for PublisherBuilder<'a, 'b> {
}
}
}
self.session
.declare_publication_intent(key_expr.clone())
.res_sync()?;
#[cfg(feature = "unstable")]
let eid = self.session.runtime.next_id();
let publisher = Publisher {
session: self.session,
#[cfg(feature = "unstable")]
eid,
key_expr,
congestion_control: self.congestion_control,
priority: self.priority,
is_express: self.is_express,
destination: self.destination,
};
log::trace!("publish({:?})", publisher.key_expr);
Ok(publisher)
let session = self.session;
session
.declare_publisher_inner(key_expr.clone(), self.destination)
.map(|eid| Publisher {
session,
#[cfg(feature = "unstable")]
id: eid,
key_expr,
congestion_control: self.congestion_control,
priority: self.priority,
is_express: self.is_express,
destination: self.destination,
})
}
}

Expand Down
Loading
Loading