Skip to content

Commit

Permalink
samplefields used
Browse files Browse the repository at this point in the history
  • Loading branch information
milyin committed Mar 24, 2024
1 parent f52140a commit a629c76
Show file tree
Hide file tree
Showing 2 changed files with 24 additions and 15 deletions.
16 changes: 8 additions & 8 deletions plugins/zenoh-plugin-example/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::sync::{
use zenoh::plugins::{RunningPluginTrait, ZenohPlugin};
use zenoh::prelude::r#async::*;
use zenoh::runtime::Runtime;
use zenoh::sample::SampleFields;
use zenoh::sample_builder::SampleBuilderTrait;
use zenoh_core::zlock;
use zenoh_plugin_trait::{plugin_long_version, plugin_version, Plugin, PluginControl};
Expand Down Expand Up @@ -175,15 +176,14 @@ async fn run(runtime: Runtime, selector: KeyExpr<'_>, flag: Arc<AtomicBool>) {
info!("Handling query '{}'", query.selector());
for (key_expr, sample) in stored.iter() {
if query.selector().key_expr.intersects(unsafe{keyexpr::from_str_unchecked(key_expr)}) {
let SampleFields { key_expr, timestamp, attachment, source_info, payload, kind, .. } = sample.clone().into();
let reply = query
.reply_sample(sample.key_expr().clone().into_owned())
.with_timestamp_opt(sample.timestamp().cloned());
#[cfg(feature = "unstable")]
let reply = reply
.with_attachment_opt(sample.attachment())
.with_source_info(sample.source_info());
match sample.kind() {
SampleKind::Put => reply.put(sample.payload().clone()).res().await.unwrap(),
.reply_sample(key_expr)
.with_timestamp_opt(timestamp)
.with_attachment_opt(attachment)
.with_source_info(source_info);
match kind {
SampleKind::Put => reply.put(payload).res().await.unwrap(),
SampleKind::Delete => reply.delete().res().await.unwrap(),
}
}
Expand Down
23 changes: 16 additions & 7 deletions zenoh-ext/src/publication_cache.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use std::convert::TryInto;
use std::future::Ready;
use zenoh::prelude::r#async::*;
use zenoh::queryable::{Query, Queryable};
use zenoh::sample::SampleFields;
use zenoh::sample_builder::SampleBuilderTrait;
use zenoh::subscriber::FlumeSubscriber;
use zenoh::SessionRef;
Expand Down Expand Up @@ -118,14 +119,22 @@ pub struct PublicationCache<'a> {
}

async fn reply_sample(query: &Query, sample: &Sample) {
let SampleFields {
key_expr,
timestamp,
attachment,
source_info,
payload,
kind,
..
} = sample.clone().into();
let reply = query
.reply_sample(sample.key_expr().clone().into_owned())
.with_timestamp_opt(sample.timestamp().cloned());
let reply = reply
.with_attachment_opt(sample.attachment().cloned())
.with_source_info(sample.source_info().clone());
if let Err(e) = match sample.kind() {
SampleKind::Put => reply.put(sample.payload().clone()).res_async().await,
.reply_sample(key_expr)
.with_timestamp_opt(timestamp)
.with_attachment_opt(attachment)
.with_source_info(source_info);
if let Err(e) = match kind {
SampleKind::Put => reply.put(payload).res_async().await,
SampleKind::Delete => reply.delete().res_async().await,
} {
log::warn!("Error replying to query: {}", e);
Expand Down

0 comments on commit a629c76

Please sign in to comment.