diff --git a/examples/examples/z_queryable_shm.rs b/examples/examples/z_queryable_shm.rs index 75da0379e2..a5be1252a0 100644 --- a/examples/examples/z_queryable_shm.rs +++ b/examples/examples/z_queryable_shm.rs @@ -13,6 +13,7 @@ // use clap::Parser; use zenoh::{ + bytes::ZBytes, key_expr::KeyExpr, prelude::*, shm::{ @@ -63,18 +64,29 @@ async fn main() { println!("Press CTRL-C to quit..."); while let Ok(query) = queryable.recv_async().await { - print!( - ">> [Queryable] Received Query '{}' ('{}'", - query.selector(), - query.key_expr().as_str(), - ); - if let Some(query_payload) = query.payload() { - match query_payload.deserialize::<&zshm>() { - Ok(p) => print!(": '{}'", String::from_utf8_lossy(p)), - Err(e) => print!(": 'Not a ShmBufInner: {:?}'", e), + // Print overall query payload information + match query.payload() { + Some(payload) => { + let (payload_type, payload) = handle_bytes(payload); + print!( + ">> [Queryable] Received Query [{}] ('{}': '{}')", + payload_type, + query.selector(), + payload + ); + } + None => { + print!(">> Received Query '{}'", query.selector()); } + }; + + // Print attachment information + if let Some(att) = query.attachment() { + let (attachment_type, attachment) = handle_bytes(att); + print!(" ({}: {})", attachment_type, attachment); } - println!(")"); + + println!(); // Allocate an SHM buffer // NOTE: For allocation API please check z_alloc_shm.rs example @@ -119,3 +131,40 @@ fn parse_args() -> (Config, KeyExpr<'static>, String, bool) { let args = Args::parse(); (args.common.into(), args.key, args.payload, args.complete) } + +fn handle_bytes(bytes: &ZBytes) -> (&str, String) { + // Determine buffer type for indication purpose + let bytes_type = { + // if Zenoh is built without SHM support, the only buffer type it can receive is RAW + #[cfg(not(feature = "shared-memory"))] + { + "RAW" + } + + // if Zenoh is built with SHM support but without SHM API (that is unstable), it can + // receive buffers of any type, but there is no way to detect the buffer type + #[cfg(all(feature = "shared-memory", not(feature = "unstable")))] + { + "UNKNOWN" + } + + // if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type + #[cfg(all(feature = "shared-memory", feature = "unstable"))] + match bytes.deserialize::<&zshm>() { + Ok(_) => "SHM", + Err(_) => "RAW", + } + }; + + // In order to indicate the real underlying buffer type the code above is written ^^^ + // Sample is SHM-agnostic: Sample handling code works both with SHM and RAW data transparently. + // In other words, the common application compiled with "shared-memory" feature will be able to + // handle incoming SHM data without any changes in the application code. + // + // Refer to z_bytes.rs to see how to deserialize different types of message + let bytes_string = bytes + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)); + + (bytes_type, bytes_string) +} diff --git a/examples/examples/z_sub_shm.rs b/examples/examples/z_sub_shm.rs index e32c6140ac..f2922b1630 100644 --- a/examples/examples/z_sub_shm.rs +++ b/examples/examples/z_sub_shm.rs @@ -12,9 +12,12 @@ // ZettaScale Zenoh Team, // use clap::Parser; -use zenoh::{config::Config, key_expr::KeyExpr, prelude::*, shm::zshm}; +use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr, prelude::*}; use zenoh_examples::CommonArgs; +#[cfg(all(feature = "shared-memory", feature = "unstable"))] +use zenoh::shm::zshm; + #[tokio::main] async fn main() { // Initiate logging @@ -35,16 +38,23 @@ async fn main() { println!("Press CTRL-C to quit..."); while let Ok(sample) = subscriber.recv_async().await { + // Print overall payload information + let (payload_type, payload) = handle_bytes(sample.payload()); print!( - ">> [Subscriber] Received {} ('{}': ", + ">> [Subscriber] Received [{}] {} ('{}': '{}')", + payload_type, sample.kind(), sample.key_expr().as_str(), + payload ); - match sample.payload().deserialize::<&zshm>() { - Ok(payload) => print!("'{}'", String::from_utf8_lossy(payload)), - Err(e) => print!("'Not a ShmBufInner: {:?}'", e), + + // Print attachment information + if let Some(att) = sample.attachment() { + let (attachment_type, attachment) = handle_bytes(att); + print!(" ({}: {})", attachment_type, attachment); } - println!(")"); + + println!(); } // // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber @@ -81,3 +91,40 @@ fn parse_args() -> (Config, KeyExpr<'static>) { let args = SubArgs::parse(); (args.common.into(), args.key) } + +fn handle_bytes(bytes: &ZBytes) -> (&str, String) { + // Determine buffer type for indication purpose + let bytes_type = { + // if Zenoh is built without SHM support, the only buffer type it can receive is RAW + #[cfg(not(feature = "shared-memory"))] + { + "RAW" + } + + // if Zenoh is built with SHM support but without SHM API (that is unstable), it can + // receive buffers of any type, but there is no way to detect the buffer type + #[cfg(all(feature = "shared-memory", not(feature = "unstable")))] + { + "UNKNOWN" + } + + // if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type + #[cfg(all(feature = "shared-memory", feature = "unstable"))] + match bytes.deserialize::<&zshm>() { + Ok(_) => "SHM", + Err(_) => "RAW", + } + }; + + // In order to indicate the real underlying buffer type the code above is written ^^^ + // Sample is SHM-agnostic: Sample handling code works both with SHM and RAW data transparently. + // In other words, the common application compiled with "shared-memory" feature will be able to + // handle incoming SHM data without any changes in the application code. + // + // Refer to z_bytes.rs to see how to deserialize different types of message + let bytes_string = bytes + .deserialize::() + .unwrap_or_else(|e| format!("{}", e)); + + (bytes_type, bytes_string) +}