Skip to content

Commit

Permalink
Merge branch 'master' into matching_state
Browse files Browse the repository at this point in the history
  • Loading branch information
OlivierHecart committed Oct 10, 2023
2 parents 403a15d + 5edd0eb commit f23bce5
Show file tree
Hide file tree
Showing 5 changed files with 93 additions and 39 deletions.
6 changes: 3 additions & 3 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

38 changes: 25 additions & 13 deletions plugins/zenoh-plugin-rest/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -296,7 +296,7 @@ fn with_extended_string<R, F: FnMut(&mut String) -> R>(
result
}

async fn query(req: Request<(Arc<Session>, String)>) -> tide::Result<Response> {
async fn query(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Response> {
log::trace!("Incoming GET request: {:?}", req);

let first_accept = match req.header("accept") {
Expand Down Expand Up @@ -374,6 +374,7 @@ async fn query(req: Request<(Arc<Session>, String)>) -> tide::Result<Response> {
},
))
} else {
let body = req.body_bytes().await.unwrap_or_default();
let url = req.url();
let key_expr = match path_to_key_expr(url.path(), &req.state().1) {
Ok(ke) => ke,
Expand All @@ -397,14 +398,15 @@ async fn query(req: Request<(Arc<Session>, String)>) -> tide::Result<Response> {
QueryConsolidation::from(zenoh::query::ConsolidationMode::Latest)
};
let raw = selector.decode().any(|(k, _)| k.as_ref() == RAW_KEY);
match req
.state()
.0
.get(&selector)
.consolidation(consolidation)
.res()
.await
{
let mut query = req.state().0.get(&selector).consolidation(consolidation);
if !body.is_empty() {
let encoding: Encoding = req
.content_type()
.map(|m| m.to_string().into())
.unwrap_or_default();
query = query.with_value(Value::from(body).encoding(encoding));
}
match query.res().await {
Ok(receiver) => {
if raw {
Ok(to_raw_response(receiver).await)
Expand Down Expand Up @@ -439,7 +441,7 @@ async fn write(mut req: Request<(Arc<Session>, String)>) -> tide::Result<Respons
};
let encoding: Encoding = req
.content_type()
.map(|m| m.essence().to_owned().into())
.map(|m| m.to_string().into())
.unwrap_or_default();

// @TODO: Define the right congestion control value
Expand Down Expand Up @@ -481,16 +483,26 @@ pub async fn run(runtime: Runtime, conf: Config) -> ZResult<()> {
app.with(
tide::security::CorsMiddleware::new()
.allow_methods(
"GET, PUT, PATCH, DELETE"
"GET, POST, PUT, PATCH, DELETE"
.parse::<http_types::headers::HeaderValue>()
.unwrap(),
)
.allow_origin(tide::security::Origin::from("*"))
.allow_credentials(false),
);

app.at("/").get(query).put(write).patch(write).delete(write);
app.at("*").get(query).put(write).patch(write).delete(write);
app.at("/")
.get(query)
.post(query)
.put(write)
.patch(write)
.delete(write);
app.at("*")
.get(query)
.post(query)
.put(write)
.patch(write)
.delete(write);

if let Err(e) = app.listen(conf.http_port).await {
log::error!("Unable to start http server for REST: {:?}", e);
Expand Down
2 changes: 1 addition & 1 deletion zenoh-ext/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub use publication_cache::{PublicationCache, PublicationCacheBuilder};
pub use querying_subscriber::{
FetchingSubscriber, FetchingSubscriberBuilder, QueryingSubscriberBuilder,
};
pub use session_ext::SessionExt;
pub use session_ext::{ArcSessionExt, SessionExt};
pub use subscriber_ext::SubscriberBuilderExt;
pub use subscriber_ext::SubscriberForward;

Expand Down
40 changes: 25 additions & 15 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,14 @@ use std::future::Ready;
use zenoh::prelude::r#async::*;
use zenoh::queryable::{Query, Queryable};
use zenoh::subscriber::FlumeSubscriber;
use zenoh::Session;
use zenoh::SessionRef;
use zenoh_core::{AsyncResolve, Resolvable, SyncResolve};
use zenoh_result::{bail, ZResult};
use zenoh_util::core::ResolveFuture;

/// The builder of PublicationCache, allowing to configure it.
pub struct PublicationCacheBuilder<'a, 'b, 'c> {
session: &'a Session,
session: SessionRef<'a>,
pub_key_expr: ZResult<KeyExpr<'b>>,
queryable_prefix: Option<ZResult<KeyExpr<'c>>>,
queryable_origin: Locality,
Expand All @@ -38,7 +38,7 @@ pub struct PublicationCacheBuilder<'a, 'b, 'c> {

impl<'a, 'b, 'c> PublicationCacheBuilder<'a, 'b, 'c> {
pub(crate) fn new(
session: &'a Session,
session: SessionRef<'a>,
pub_key_expr: ZResult<KeyExpr<'b>>,
) -> PublicationCacheBuilder<'a, 'b, 'c> {
PublicationCacheBuilder {
Expand Down Expand Up @@ -136,18 +136,28 @@ impl<'a> PublicationCache<'a> {
}

// declare the local subscriber that will store the local publications
let local_sub = conf
.session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?;

// declare the queryable that will answer to queries on cache
let queryable = conf
.session
.declare_queryable(&queryable_key_expr)
.allowed_origin(conf.queryable_origin)
.res_sync()?;
let (local_sub, queryable) = match conf.session.clone() {
SessionRef::Borrow(session) => (
session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?,
session
.declare_queryable(&queryable_key_expr)
.allowed_origin(conf.queryable_origin)
.res_sync()?,
),
SessionRef::Shared(session) => (
session
.declare_subscriber(&key_expr)
.allowed_origin(Locality::SessionLocal)
.res_sync()?,
session
.declare_queryable(&queryable_key_expr)
.allowed_origin(conf.queryable_origin)
.res_sync()?,
),
};

// take local ownership of stuff to be moved into task
let sub_recv = local_sub.receiver.clone();
Expand Down
46 changes: 39 additions & 7 deletions zenoh-ext/src/session_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ use super::PublicationCacheBuilder;
use std::convert::TryInto;
use std::sync::Arc;
use zenoh::prelude::KeyExpr;
use zenoh::Session;
use zenoh::{Session, SessionRef};

/// Some extensions to the [`zenoh::Session`](zenoh::Session)
pub trait SessionExt {
Expand All @@ -37,19 +37,51 @@ impl SessionExt for Session {
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
PublicationCacheBuilder::new(self, pub_key_expr.try_into().map_err(Into::into))
PublicationCacheBuilder::new(
SessionRef::Borrow(self),
pub_key_expr.try_into().map_err(Into::into),
)
}
}

impl SessionExt for Arc<Session> {
fn declare_publication_cache<'a, 'b, 'c, TryIntoKeyExpr>(
&'a self,
pub trait ArcSessionExt {
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&self,
pub_key_expr: TryIntoKeyExpr,
) -> PublicationCacheBuilder<'a, 'b, 'c>
) -> PublicationCacheBuilder<'static, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>;
}

impl ArcSessionExt for Arc<Session> {
/// Examples:
/// ```
/// # async_std::task::block_on(async {
/// use zenoh::prelude::r#async::*;
/// use zenoh::config::ModeDependentValue::Unique;
/// use zenoh_ext::ArcSessionExt;
///
/// let mut config = config::default();
/// config.timestamping.set_enabled(Some(Unique(true)));
/// let session = zenoh::open(config).res().await.unwrap().into_arc();
/// let publication_cache = session.declare_publication_cache("key/expression").res().await.unwrap();
/// async_std::task::spawn(async move {
/// publication_cache.key_expr();
/// }).await;
/// # })
/// ```
fn declare_publication_cache<'b, 'c, TryIntoKeyExpr>(
&self,
pub_key_expr: TryIntoKeyExpr,
) -> PublicationCacheBuilder<'static, 'b, 'c>
where
TryIntoKeyExpr: TryInto<KeyExpr<'b>>,
<TryIntoKeyExpr as TryInto<KeyExpr<'b>>>::Error: Into<zenoh_result::Error>,
{
PublicationCacheBuilder::new(self, pub_key_expr.try_into().map_err(Into::into))
PublicationCacheBuilder::new(
SessionRef::Shared(self.clone()),
pub_key_expr.try_into().map_err(Into::into),
)
}
}

0 comments on commit f23bce5

Please sign in to comment.