-
Notifications
You must be signed in to change notification settings - Fork 35
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
5 changed files
with
238 additions
and
34 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,42 @@ | ||
// | ||
// Copyright (c) 2024 ZettaScale Technology | ||
// | ||
// This program and the accompanying materials are made available under the | ||
// terms of the Eclipse Public License 2.0 which is available at | ||
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 | ||
// which is available at https://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 | ||
// | ||
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
|
||
use zenoh::{ | ||
config::Config, | ||
internal::{plugins::PluginsManager, runtime::RuntimeBuilder}, | ||
}; | ||
use zenoh_config::ModeDependentValue; | ||
|
||
pub fn init_env() { | ||
std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp"); | ||
} | ||
|
||
pub async fn create_bridge() { | ||
let mut plugins_mgr = PluginsManager::static_plugins_only(); | ||
plugins_mgr.declare_static_plugin::<zenoh_plugin_ros2dds::ROS2Plugin, &str>("ros2dds", true); | ||
let mut config = Config::default(); | ||
config.insert_json5("plugins/ros2dds", "{}").unwrap(); | ||
config | ||
.timestamping | ||
.set_enabled(Some(ModeDependentValue::Unique(true))) | ||
.unwrap(); | ||
config.adminspace.set_enabled(true).unwrap(); | ||
config.plugins_loading.set_enabled(true).unwrap(); | ||
let mut runtime = RuntimeBuilder::new(config) | ||
.plugins_manager(plugins_mgr) | ||
.build() | ||
.await | ||
.unwrap(); | ||
runtime.start().await.unwrap(); | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,185 @@ | ||
// | ||
// Copyright (c) 2024 ZettaScale Technology | ||
// | ||
// This program and the accompanying materials are made available under the | ||
// terms of the Eclipse Public License 2.0 which is available at | ||
// http://www.eclipse.org/legal/epl-2.0, or the Apache License, Version 2.0 | ||
// which is available at https://www.apache.org/licenses/LICENSE-2.0. | ||
// | ||
// SPDX-License-Identifier: EPL-2.0 OR Apache-2.0 | ||
// | ||
// Contributors: | ||
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
|
||
pub mod common; | ||
|
||
use std::time::Duration; | ||
|
||
use futures::StreamExt; | ||
use r2r::{self, QosProfile}; | ||
use serde_derive::{Deserialize, Serialize}; | ||
use zenoh::Wait; | ||
|
||
// The test service | ||
const TEST_SERVICE_Z2R: &str = "test_service_z2r"; | ||
const TEST_SERVICE_R2Z: &str = "test_service_r2Z"; | ||
|
||
#[derive(Serialize, Deserialize, PartialEq, Clone)] | ||
struct AddTwoIntsRequest { | ||
a: i64, | ||
b: i64, | ||
} | ||
#[derive(Serialize, Deserialize, PartialEq, Clone)] | ||
struct AddTwoIntsReply { | ||
sum: i64, | ||
} | ||
|
||
#[test] | ||
fn test_ros_client_zenoh_service() { | ||
// Run the bridge for MQTT and Zenoh | ||
let rt = tokio::runtime::Runtime::new().unwrap(); | ||
|
||
let (sender, receiver) = std::sync::mpsc::channel(); | ||
|
||
rt.block_on(async { | ||
common::init_env(); | ||
// Create zenoh-bridge-ros2dds | ||
tokio::spawn(common::create_bridge()); | ||
|
||
let a = 1; | ||
let b = 2; | ||
|
||
// Zenoh service | ||
let session = zenoh::open(zenoh::Config::default()).await.unwrap(); | ||
let _queryable = session | ||
.declare_queryable(TEST_SERVICE_R2Z) | ||
.callback(|query| { | ||
let request: AddTwoIntsRequest = | ||
cdr::deserialize(&query.payload().unwrap().to_bytes()).unwrap(); | ||
let response = AddTwoIntsReply { | ||
sum: request.a + request.b, | ||
}; | ||
let data = cdr::serialize::<_, _, cdr::CdrLe>(&response, cdr::Infinite).unwrap(); | ||
query.reply(TEST_SERVICE_R2Z, data).wait().unwrap(); | ||
}) | ||
.await | ||
.unwrap(); | ||
|
||
// ROS client | ||
let ctx = r2r::Context::create().unwrap(); | ||
let mut node = r2r::Node::create(ctx, "ros_client", "").unwrap(); | ||
let client = node | ||
.create_client::<r2r::example_interfaces::srv::AddTwoInts::Service>( | ||
&format!("/{}", TEST_SERVICE_R2Z), | ||
QosProfile::default(), | ||
) | ||
.unwrap(); | ||
|
||
// Node spin | ||
let _handler = tokio::task::spawn_blocking(move || loop { | ||
node.spin_once(std::time::Duration::from_millis(100)); | ||
}); | ||
|
||
// Wait for the environment to be ready | ||
tokio::time::sleep(Duration::from_secs(1)).await; | ||
|
||
// Send the request and then process the response | ||
let my_req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; | ||
let resp = client.request(&my_req).unwrap().await.unwrap(); | ||
assert_eq!(resp.sum, a + b); | ||
|
||
// Tell the main test thread, we're completed | ||
sender.send(()).unwrap(); | ||
}); | ||
|
||
let test_result = receiver.recv_timeout(Duration::from_secs(5)); | ||
// Stop the tokio runtime | ||
// Note that we should shutdown the runtime before doing any check that might panic the test. | ||
// Otherwise, the tasks inside the runtime will never be completed. | ||
rt.shutdown_background(); | ||
match test_result { | ||
Ok(_) => { | ||
println!("Test passed"); | ||
} | ||
Err(_) => { | ||
panic!("Test failed due to timeout....."); | ||
} | ||
} | ||
} | ||
|
||
#[test] | ||
fn test_zenoh_client_ros_service() { | ||
// Run the bridge for MQTT and Zenoh | ||
let rt = tokio::runtime::Runtime::new().unwrap(); | ||
|
||
let (sender, receiver) = std::sync::mpsc::channel(); | ||
|
||
rt.spawn(async move { | ||
common::init_env(); | ||
// Create zenoh-bridge-ros2dds | ||
tokio::spawn(common::create_bridge()); | ||
|
||
let a = 1; | ||
let b = 2; | ||
|
||
// ROS service | ||
let ctx = r2r::Context::create().unwrap(); | ||
let mut node = r2r::Node::create(ctx, "ros_service", "").unwrap(); | ||
let mut service = node | ||
.create_service::<r2r::example_interfaces::srv::AddTwoInts::Service>( | ||
&format!("/{}", TEST_SERVICE_Z2R), | ||
QosProfile::default(), | ||
) | ||
.unwrap(); | ||
// Processing the requests and send back responses | ||
tokio::spawn(async move { | ||
while let Some(req) = service.next().await { | ||
let resp = r2r::example_interfaces::srv::AddTwoInts::Response { | ||
sum: req.message.a + req.message.b, | ||
}; | ||
req.respond(resp).unwrap(); | ||
} | ||
}); | ||
|
||
// Node spin | ||
let _handler = tokio::task::spawn_blocking(move || loop { | ||
node.spin_once(std::time::Duration::from_millis(100)); | ||
}); | ||
|
||
// Zenoh client | ||
let session = zenoh::open(zenoh::Config::default()).await.unwrap(); | ||
let client = session.declare_querier(TEST_SERVICE_Z2R).await.unwrap(); | ||
|
||
// Wait for the environment to be ready | ||
tokio::time::sleep(Duration::from_secs(1)).await; | ||
|
||
// Send request to ROS service | ||
let req = r2r::example_interfaces::srv::AddTwoInts::Request { a, b }; | ||
let buf = cdr::serialize::<_, _, cdr::CdrLe>(&req, cdr::size::Infinite).unwrap(); | ||
let recv_handler = client.get().payload(buf).await.unwrap(); | ||
|
||
// Process the response | ||
let reply = recv_handler.recv().unwrap(); | ||
let reader = reply.result().unwrap().payload().reader(); | ||
let result: Result<i64, _> = cdr::deserialize_from(reader, cdr::size::Infinite); | ||
assert_eq!(result.unwrap(), a + b); | ||
|
||
// Tell the main test thread, we're completed | ||
sender.send(()).unwrap(); | ||
}); | ||
|
||
let test_result = receiver.recv_timeout(Duration::from_secs(5)); | ||
// Stop the tokio runtime | ||
// Note that we should shutdown the runtime before doing any check that might panic the test. | ||
// Otherwise, the tasks inside the runtime will never be completed. | ||
rt.shutdown_background(); | ||
match test_result { | ||
Ok(_) => { | ||
println!("Test passed"); | ||
} | ||
Err(_) => { | ||
panic!("Test failed due to timeout....."); | ||
} | ||
} | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -12,51 +12,25 @@ | |
// ZettaScale Zenoh Team, <[email protected]> | ||
// | ||
|
||
pub mod common; | ||
|
||
use std::{sync::mpsc::channel, time::Duration}; | ||
|
||
use futures::StreamExt; | ||
use r2r::{self, QosProfile}; | ||
use zenoh::{ | ||
config::Config, | ||
internal::{plugins::PluginsManager, runtime::RuntimeBuilder}, | ||
}; | ||
use zenoh_config::ModeDependentValue; | ||
|
||
// The test topic | ||
const TEST_TOPIC: &str = "test_topic"; | ||
// The test TEST_PAYLOAD | ||
const TEST_PAYLOAD: &str = "Hello World"; | ||
|
||
fn init_env() { | ||
std::env::set_var("RMW_IMPLEMENTATION", "rmw_cyclonedds_cpp"); | ||
} | ||
|
||
async fn create_bridge() { | ||
let mut plugins_mgr = PluginsManager::static_plugins_only(); | ||
plugins_mgr.declare_static_plugin::<zenoh_plugin_ros2dds::ROS2Plugin, &str>("ros2dds", true); | ||
let mut config = Config::default(); | ||
config.insert_json5("plugins/ros2dds", "{}").unwrap(); | ||
config | ||
.timestamping | ||
.set_enabled(Some(ModeDependentValue::Unique(true))) | ||
.unwrap(); | ||
config.adminspace.set_enabled(true).unwrap(); | ||
config.plugins_loading.set_enabled(true).unwrap(); | ||
let mut runtime = RuntimeBuilder::new(config) | ||
.plugins_manager(plugins_mgr) | ||
.build() | ||
.await | ||
.unwrap(); | ||
runtime.start().await.unwrap(); | ||
} | ||
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_zenoh_pub_ros_sub() { | ||
init_env(); | ||
common::init_env(); | ||
let (tx, rx) = channel(); | ||
|
||
// Create zenoh-bridge-ros2dds | ||
tokio::spawn(create_bridge()); | ||
tokio::spawn(common::create_bridge()); | ||
|
||
// ROS subscriber | ||
let ctx = r2r::Context::create().unwrap(); | ||
|
@@ -94,9 +68,9 @@ async fn test_zenoh_pub_ros_sub() { | |
|
||
#[tokio::test(flavor = "multi_thread")] | ||
async fn test_ros_pub_zenoh_sub() { | ||
init_env(); | ||
common::init_env(); | ||
// Create zenoh-bridge-ros2dds | ||
tokio::spawn(create_bridge()); | ||
tokio::spawn(common::create_bridge()); | ||
|
||
// Zenoh subscriber | ||
let session = zenoh::open(zenoh::Config::default()).await.unwrap(); | ||
|