-
Notifications
You must be signed in to change notification settings - Fork 173
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Make SHM sub\queryable examples more robust
- Loading branch information
1 parent
aace7f1
commit a36081a
Showing
2 changed files
with
112 additions
and
16 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,9 +12,12 @@ | |
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
use clap::Parser; | ||
use zenoh::{config::Config, key_expr::KeyExpr, prelude::*, shm::zshm}; | ||
use zenoh::{bytes::ZBytes, config::Config, key_expr::KeyExpr, prelude::*}; | ||
use zenoh_examples::CommonArgs; | ||
|
||
#[cfg(all(feature = "shared-memory", feature = "unstable"))] | ||
use zenoh::shm::zshm; | ||
|
||
#[tokio::main] | ||
async fn main() { | ||
// Initiate logging | ||
|
@@ -35,16 +38,23 @@ async fn main() { | |
|
||
println!("Press CTRL-C to quit..."); | ||
while let Ok(sample) = subscriber.recv_async().await { | ||
// Print overall payload information | ||
let (payload_type, payload) = handle_bytes(sample.payload()); | ||
print!( | ||
">> [Subscriber] Received {} ('{}': ", | ||
">> [Subscriber] Received [{}] {} ('{}': '{}')", | ||
payload_type, | ||
sample.kind(), | ||
sample.key_expr().as_str(), | ||
payload | ||
); | ||
match sample.payload().deserialize::<&zshm>() { | ||
Ok(payload) => print!("'{}'", String::from_utf8_lossy(payload)), | ||
Err(e) => print!("'Not a ShmBufInner: {:?}'", e), | ||
|
||
// Print attachment information | ||
if let Some(att) = sample.attachment() { | ||
let (attachment_type, attachment) = handle_bytes(att); | ||
print!(" ({}: {})", attachment_type, attachment); | ||
} | ||
println!(")"); | ||
|
||
println!(); | ||
} | ||
|
||
// // Try to get a mutable reference to the SHM buffer. If this subscriber is the only subscriber | ||
|
@@ -81,3 +91,40 @@ fn parse_args() -> (Config, KeyExpr<'static>) { | |
let args = SubArgs::parse(); | ||
(args.common.into(), args.key) | ||
} | ||
|
||
fn handle_bytes(bytes: &ZBytes) -> (&str, String) { | ||
// Determine buffer type for indication purpose | ||
let bytes_type = { | ||
// if Zenoh is built without SHM support, the only buffer type it can receive is RAW | ||
#[cfg(not(feature = "shared-memory"))] | ||
{ | ||
"RAW" | ||
} | ||
|
||
// if Zenoh is built with SHM support but without SHM API (that is unstable), it can | ||
// receive buffers of any type, but there is no way to detect the buffer type | ||
#[cfg(all(feature = "shared-memory", not(feature = "unstable")))] | ||
{ | ||
"UNKNOWN" | ||
} | ||
|
||
// if Zenoh is built with SHM support and with SHM API we can detect the exact buffer type | ||
#[cfg(all(feature = "shared-memory", feature = "unstable"))] | ||
match bytes.deserialize::<&zshm>() { | ||
Ok(_) => "SHM", | ||
Err(_) => "RAW", | ||
} | ||
}; | ||
|
||
// In order to indicate the real underlying buffer type the code above is written ^^^ | ||
// Sample is SHM-agnostic: Sample handling code works both with SHM and RAW data transparently. | ||
// In other words, the common application compiled with "shared-memory" feature will be able to | ||
// handle incoming SHM data without any changes in the application code. | ||
// | ||
// Refer to z_bytes.rs to see how to deserialize different types of message | ||
let bytes_string = bytes | ||
.deserialize::<String>() | ||
.unwrap_or_else(|e| format!("{}", e)); | ||
|
||
(bytes_type, bytes_string) | ||
} |