diff --git a/CMakeLists.txt b/CMakeLists.txt index ffcf6ff..2a80d9e 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -139,6 +139,7 @@ elseif("$ENV{ROS_VERSION}" STREQUAL "2") ros2_foxglove_bridge/src/param_utils.cpp ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp ros2_foxglove_bridge/src/parameter_interface.cpp + ros2_foxglove_bridge/src/generic_client.cpp ) target_include_directories(foxglove_bridge_component PUBLIC @@ -203,7 +204,7 @@ elseif(ROS_BUILD_TYPE STREQUAL "ament_cmake") target_link_libraries(serialization_test foxglove_bridge_base) ament_add_gtest(smoke_test ros2_foxglove_bridge/tests/smoke_test.cpp) - ament_target_dependencies(smoke_test rclcpp rclcpp_components std_msgs) + ament_target_dependencies(smoke_test rclcpp rclcpp_components std_msgs std_srvs) target_link_libraries(smoke_test foxglove_bridge_base) endif() endif() diff --git a/ros2_foxglove_bridge/include/foxglove_bridge/generic_client.hpp b/ros2_foxglove_bridge/include/foxglove_bridge/generic_client.hpp new file mode 100644 index 0000000..3a1ee65 --- /dev/null +++ b/ros2_foxglove_bridge/include/foxglove_bridge/generic_client.hpp @@ -0,0 +1,52 @@ +#pragma once + +#include + +#include +#include +#include + +namespace foxglove_bridge { + +class GenericClient : public rclcpp::ClientBase { +public: + using SharedRequest = std::shared_ptr; + using SharedResponse = std::shared_ptr; + using Promise = std::promise; + using PromiseWithRequest = std::promise>; + using SharedPromise = std::shared_ptr; + using SharedPromiseWithRequest = std::shared_ptr; + using SharedFuture = std::shared_future; + using SharedFutureWithRequest = std::shared_future>; + using CallbackType = std::function; + using CallbackWithRequestType = std::function; + + RCLCPP_SMART_PTR_DEFINITIONS(GenericClient) + + GenericClient(rclcpp::node_interfaces::NodeBaseInterface* node_base, + rclcpp::node_interfaces::NodeGraphInterface::SharedPtr node_graph, + std::string service_name, std::string service_type, + rcl_client_options_t& client_options); + virtual ~GenericClient() {} + + std::shared_ptr create_response() override; + std::shared_ptr create_request_header() override; + void handle_response(std::shared_ptr request_header, + std::shared_ptr response) override; + SharedFuture async_send_request(SharedRequest request); + SharedFuture async_send_request(SharedRequest request, CallbackType&& cb); + +private: + RCLCPP_DISABLE_COPY(GenericClient) + + std::map> pending_requests_; + std::mutex pending_requests_mutex_; + std::shared_ptr _typeSupportLib; + std::shared_ptr _typeIntrospectionLib; + const rosidl_service_type_support_t* _serviceTypeSupportHdl; + const rosidl_message_type_support_t* _requestTypeSupportHdl; + const rosidl_message_type_support_t* _responseTypeSupportHdl; + const rosidl_service_type_support_t* _typeIntrospectionHdl; +}; + +} // namespace foxglove_bridge diff --git a/ros2_foxglove_bridge/include/foxglove_bridge/param_utils.hpp b/ros2_foxglove_bridge/include/foxglove_bridge/param_utils.hpp index aea155c..cf8001e 100644 --- a/ros2_foxglove_bridge/include/foxglove_bridge/param_utils.hpp +++ b/ros2_foxglove_bridge/include/foxglove_bridge/param_utils.hpp @@ -16,6 +16,7 @@ constexpr char PARAM_CERTFILE[] = "certfile"; constexpr char PARAM_KEYFILE[] = "keyfile"; constexpr char PARAM_MAX_QOS_DEPTH[] = "max_qos_depth"; constexpr char PARAM_TOPIC_WHITELIST[] = "topic_whitelist"; +constexpr char PARAM_SERVICE_WHITELIST[] = "service_whitelist"; constexpr char PARAM_PARAMETER_WHITELIST[] = "param_whitelist"; constexpr int64_t DEFAULT_PORT = 8765; diff --git a/ros2_foxglove_bridge/src/generic_client.cpp b/ros2_foxglove_bridge/src/generic_client.cpp new file mode 100644 index 0000000..05247cb --- /dev/null +++ b/ros2_foxglove_bridge/src/generic_client.cpp @@ -0,0 +1,209 @@ +#include +#include + +#include +#include +#include +#include +#include + +#include + +namespace { + +// Copy of github.com/ros2/rclcpp/blob/33dae5d67/rclcpp/src/rclcpp/typesupport_helpers.cpp#L69-L92 +static std::tuple extract_type_identifier( + const std::string& full_type) { + char type_separator = '/'; + auto sep_position_back = full_type.find_last_of(type_separator); + auto sep_position_front = full_type.find_first_of(type_separator); + if (sep_position_back == std::string::npos || sep_position_back == 0 || + sep_position_back == full_type.length() - 1) { + throw std::runtime_error( + "Message type is not of the form package/type and cannot be processed"); + } + + std::string package_name = full_type.substr(0, sep_position_front); + std::string middle_module = ""; + if (sep_position_back - sep_position_front > 0) { + middle_module = + full_type.substr(sep_position_front + 1, sep_position_back - sep_position_front - 1); + } + std::string type_name = full_type.substr(sep_position_back + 1); + + return std::make_tuple(package_name, middle_module, type_name); +} +} // namespace + +namespace foxglove_bridge { + +constexpr char TYPESUPPORT_INTROSPECTION_LIB_NAME[] = "rosidl_typesupport_introspection_cpp"; +constexpr char TYPESUPPORT_LIB_NAME[] = "rosidl_typesupport_cpp"; +using rosidl_typesupport_introspection_cpp::MessageMembers; +using rosidl_typesupport_introspection_cpp::ServiceMembers; + +std::shared_ptr allocate_message(const MessageMembers* members) { + uint8_t* buf = static_cast(malloc(members->size_of_)); + memset(buf, 0, members->size_of_); + members->init_function(buf, rosidl_runtime_cpp::MessageInitialization::ALL); + return std::shared_ptr(buf); +} + +std::string getTypeIntrospectionSymbolName(const std::string& serviceType) { + const auto [pkgName, middleModule, typeName] = extract_type_identifier(serviceType); + + return std::string(TYPESUPPORT_INTROSPECTION_LIB_NAME) + "__get_service_type_support_handle__" + + pkgName + "__" + (middleModule.empty() ? "srv" : middleModule) + "__" + typeName; +} + +/** + * The default symbol names for getting type support handles for services are missing from the + * rosidl_typesupport_cpp shared libraries, see + * https://github.com/ros2/rosidl_typesupport/issues/122 + * + * We can however, as a (hacky) workaround, use other symbols defined in the shared library. + * With `nm -C -D /opt/ros/humble/lib/libtest_msgs__rosidl_typesupport_cpp.so` we see that there is + * `rosidl_service_type_support_t const* + * rosidl_typesupport_cpp::get_service_type_support_handle()` which + * mangled becomes + * `_ZN22rosidl_typesupport_cpp31get_service_type_support_handleIN9test_msgs3srv10BasicTypesEEEPK29rosidl_service_type_support_tv` + * This is the same for galactic, humble and rolling (tested with gcc / clang) + * + * This function produces the mangled symbol name for a given service type. + * + * \param[in] serviceType The service type, e.g. "test_msgs/srv/BasicTypes" + * \return Symbol name for getting the service type support handle + */ +std::string getServiceTypeSupportHandleSymbolName(const std::string& serviceType) { + const auto [pkgName, middleModule, typeName] = extract_type_identifier(serviceType); + const auto lengthPrefixedString = [](const std::string& s) { + return std::to_string(s.size()) + s; + }; + + return "_ZN" + lengthPrefixedString(TYPESUPPORT_LIB_NAME) + + lengthPrefixedString("get_service_type_support_handle") + "IN" + + lengthPrefixedString(pkgName) + + lengthPrefixedString(middleModule.empty() ? "srv" : middleModule) + + lengthPrefixedString(typeName) + "EEEPK" + + lengthPrefixedString("rosidl_service_type_support_t") + "v"; +} + +GenericClient::GenericClient(rclcpp::node_interfaces::NodeBaseInterface* nodeBase, + rclcpp::node_interfaces::NodeGraphInterface::SharedPtr nodeGraph, + std::string serviceName, std::string serviceType, + rcl_client_options_t& client_options) + : rclcpp::ClientBase(nodeBase, nodeGraph) { + const auto [pkgName, middleModule, typeName] = extract_type_identifier(serviceType); + const auto requestTypeName = serviceType + "_Request"; + const auto responseTypeName = serviceType + "_Response"; + + _typeSupportLib = rclcpp::get_typesupport_library(serviceType, TYPESUPPORT_LIB_NAME); + _typeIntrospectionLib = + rclcpp::get_typesupport_library(serviceType, TYPESUPPORT_INTROSPECTION_LIB_NAME); + if (!_typeSupportLib || !_typeIntrospectionLib) { + throw std::runtime_error("Failed to load shared library for service type " + serviceType); + } + + const auto typesupportSymbolName = getServiceTypeSupportHandleSymbolName(serviceType); + if (!_typeSupportLib->has_symbol(typesupportSymbolName)) { + throw std::runtime_error("Failed to find symbol '" + typesupportSymbolName + "' in " + + _typeSupportLib->get_library_path()); + } + + const rosidl_service_type_support_t* (*get_ts)() = nullptr; + _serviceTypeSupportHdl = + (reinterpret_cast(_typeSupportLib->get_symbol(typesupportSymbolName)))(); + + const auto typeinstrospection_symbol_name = getTypeIntrospectionSymbolName(serviceType); + + // This will throw runtime_error if the symbol was not found. + _typeIntrospectionHdl = (reinterpret_cast( + _typeIntrospectionLib->get_symbol(typeinstrospection_symbol_name)))(); + + _requestTypeSupportHdl = + rclcpp::get_typesupport_handle(requestTypeName, TYPESUPPORT_LIB_NAME, *_typeSupportLib); + _responseTypeSupportHdl = + rclcpp::get_typesupport_handle(responseTypeName, TYPESUPPORT_LIB_NAME, *_typeSupportLib); + + rcl_ret_t ret = rcl_client_init(this->get_client_handle().get(), this->get_rcl_node_handle(), + _serviceTypeSupportHdl, serviceName.c_str(), &client_options); + if (ret != RCL_RET_OK) { + if (ret == RCL_RET_SERVICE_NAME_INVALID) { + auto rcl_node_handle = this->get_rcl_node_handle(); + // this will throw on any validation problem + rcl_reset_error(); + rclcpp::expand_topic_or_service_name(serviceName, rcl_node_get_name(rcl_node_handle), + rcl_node_get_namespace(rcl_node_handle), true); + } + rclcpp::exceptions::throw_from_rcl_error(ret, "could not create client"); + } +} + +std::shared_ptr GenericClient::create_response() { + auto srv_members = static_cast(_typeIntrospectionHdl->data); + return allocate_message(srv_members->response_members_); +} + +std::shared_ptr GenericClient::create_request_header() { + return std::shared_ptr(new rmw_request_id_t); +} + +void GenericClient::handle_response(std::shared_ptr request_header, + std::shared_ptr response) { + std::unique_lock lock(pending_requests_mutex_); + int64_t sequence_number = request_header->sequence_number; + + auto ser_response = std::make_shared(); + rmw_ret_t r = rmw_serialize(response.get(), _responseTypeSupportHdl, + &ser_response->get_rcl_serialized_message()); + if (r != RMW_RET_OK) { + RCUTILS_LOG_ERROR_NAMED("foxglove_bridge", "Failed to serialize service response. Ignoring..."); + return; + } + + // TODO(esteve) this should throw instead since it is not expected to happen in the first place + if (this->pending_requests_.count(sequence_number) == 0) { + RCUTILS_LOG_ERROR_NAMED("foxglove_bridge", "Received invalid sequence number. Ignoring..."); + return; + } + auto tuple = this->pending_requests_[sequence_number]; + auto call_promise = std::get<0>(tuple); + auto callback = std::get<1>(tuple); + auto future = std::get<2>(tuple); + this->pending_requests_.erase(sequence_number); + // Unlock here to allow the service to be called recursively from one of its callbacks. + lock.unlock(); + + call_promise->set_value(ser_response); + callback(future); +} + +GenericClient::SharedFuture GenericClient::async_send_request(SharedRequest request) { + return async_send_request(request, [](SharedFuture) {}); +} + +GenericClient::SharedFuture GenericClient::async_send_request(SharedRequest request, + CallbackType&& cb) { + std::lock_guard lock(pending_requests_mutex_); + int64_t sequence_number; + + auto srv_members = static_cast(_typeIntrospectionHdl->data); + auto buf = allocate_message(srv_members->request_members_); + + const rmw_serialized_message_t* sm = &request->get_rcl_serialized_message(); + if (const auto ret = rmw_deserialize(sm, _requestTypeSupportHdl, buf.get()) != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret, "failed to desirialize request"); + } + rcl_ret_t ret = rcl_send_request(get_client_handle().get(), buf.get(), &sequence_number); + if (RCL_RET_OK != ret) { + rclcpp::exceptions::throw_from_rcl_error(ret, "failed to send request"); + } + + SharedPromise call_promise = std::make_shared(); + SharedFuture f(call_promise->get_future()); + pending_requests_[sequence_number] = + std::make_tuple(call_promise, std::forward(cb), f); + return f; +} + +} // namespace foxglove_bridge diff --git a/ros2_foxglove_bridge/src/message_definition_cache.cpp b/ros2_foxglove_bridge/src/message_definition_cache.cpp index 8b52674..6a4ebd4 100644 --- a/ros2_foxglove_bridge/src/message_definition_cache.cpp +++ b/ros2_foxglove_bridge/src/message_definition_cache.cpp @@ -16,8 +16,9 @@ namespace foxglove { -// Match datatype names (foo_msgs/Bar or foo_msgs/msg/Bar) -static const std::regex PACKAGE_TYPENAME_REGEX{R"(^([a-zA-Z0-9_]+)/(?:msg/)?([a-zA-Z0-9_]+)$)"}; +// Match datatype names (foo_msgs/Bar or foo_msgs/msg/Bar or foo_msgs/srv/Bar) +static const std::regex PACKAGE_TYPENAME_REGEX{ + R"(^([a-zA-Z0-9_]+)/(?:msg/|srv/)?([a-zA-Z0-9_]+)$)"}; // Match field types from .msg definitions ("foo_msgs/Bar" in "foo_msgs/Bar[] bar") static const std::regex MSG_FIELD_TYPE_REGEX{R"((?:^|\n)\s*([a-zA-Z0-9_/]+)(?:\[[^\]]*\])?\s+)"}; diff --git a/ros2_foxglove_bridge/src/param_utils.cpp b/ros2_foxglove_bridge/src/param_utils.cpp index 75b110f..99e006d 100644 --- a/ros2_foxglove_bridge/src/param_utils.cpp +++ b/ros2_foxglove_bridge/src/param_utils.cpp @@ -80,6 +80,15 @@ void declareParameters(rclcpp::Node* node) { node->declare_parameter(PARAM_TOPIC_WHITELIST, std::vector({".*"}), topicWhiteListDescription); + auto serviceWhiteListDescription = rcl_interfaces::msg::ParameterDescriptor{}; + serviceWhiteListDescription.name = PARAM_SERVICE_WHITELIST; + serviceWhiteListDescription.type = rcl_interfaces::msg::ParameterType::PARAMETER_STRING_ARRAY; + serviceWhiteListDescription.description = + "List of regular expressions (ECMAScript) of whitelisted service names."; + serviceWhiteListDescription.read_only = true; + node->declare_parameter(PARAM_SERVICE_WHITELIST, std::vector({".*"}), + serviceWhiteListDescription); + auto paramWhiteListDescription = rcl_interfaces::msg::ParameterDescriptor{}; paramWhiteListDescription.name = PARAM_PARAMETER_WHITELIST; paramWhiteListDescription.type = rcl_interfaces::msg::ParameterType::PARAMETER_STRING_ARRAY; diff --git a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp index d34e40d..b138091 100644 --- a/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp +++ b/ros2_foxglove_bridge/src/ros2_foxglove_bridge.cpp @@ -10,6 +10,7 @@ #define ASIO_STANDALONE #include +#include #include #include #include @@ -47,6 +48,8 @@ class FoxgloveBridge : public rclcpp::Node { _maxQosDepth = static_cast(this->get_parameter(PARAM_MAX_QOS_DEPTH).as_int()); const auto topicWhiteList = this->get_parameter(PARAM_TOPIC_WHITELIST).as_string_array(); _topicWhitelistPatterns = parseRegexStrings(this, topicWhiteList); + const auto serviceWhiteList = this->get_parameter(PARAM_SERVICE_WHITELIST).as_string_array(); + _serviceWhitelistPatterns = parseRegexStrings(this, serviceWhiteList); const auto paramWhiteList = this->get_parameter(PARAM_PARAMETER_WHITELIST).as_string_array(); const auto paramWhitelistPatterns = parseRegexStrings(this, paramWhiteList); _useSimTime = this->get_parameter("use_sim_time").as_bool(); @@ -58,6 +61,7 @@ class FoxgloveBridge : public rclcpp::Node { foxglove::CAPABILITY_CLIENT_PUBLISH, foxglove::CAPABILITY_PARAMETERS, foxglove::CAPABILITY_PARAMETERS_SUBSCRIBE, + foxglove::CAPABILITY_SERVICES, }; if (_useSimTime) { serverCapabilities.push_back(foxglove::CAPABILITY_TIME); @@ -89,6 +93,8 @@ class FoxgloveBridge : public rclcpp::Node { std::bind(&FoxgloveBridge::parameterChangeHandler, this, _1, _2, _3)); _server->setParameterSubscriptionHandler( std::bind(&FoxgloveBridge::parameterSubscriptionHandler, this, _1, _2, _3)); + _server->setServiceRequestHandler( + std::bind(&FoxgloveBridge::serviceRequestHandler, this, _1, _2)); _paramInterface->setParamUpdateCallback(std::bind(&FoxgloveBridge::parameterUpdates, this, _1)); @@ -109,6 +115,7 @@ class FoxgloveBridge : public rclcpp::Node { _subscriptionCallbackGroup = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant); _clientPublishCallbackGroup = this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive); + _servicesCallbackGroup = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant); if (_useSimTime) { _clockSubscription = this->create_subscription( @@ -132,6 +139,7 @@ class FoxgloveBridge : public rclcpp::Node { void rosgraphPollThread() { updateAdvertisedTopics(); + updateAdvertisedServices(); auto graphEvent = this->get_graph_event(); while (rclcpp::ok()) { @@ -140,6 +148,7 @@ class FoxgloveBridge : public rclcpp::Node { if (triggered) { RCLCPP_DEBUG(this->get_logger(), "rosgraph change detected"); updateAdvertisedTopics(); + updateAdvertisedServices(); // Graph changes tend to come in batches, so wait a bit before checking again std::this_thread::sleep_for(500ms); } @@ -265,6 +274,97 @@ class FoxgloveBridge : public rclcpp::Node { } } + void updateAdvertisedServices() { + if (!rclcpp::ok()) { + return; + } + + // Get the current list of visible services and datatypes from the ROS graph + const auto serviceNamesAndTypes = + this->get_node_graph_interface()->get_service_names_and_types(); + + std::lock_guard lock(_servicesMutex); + + // Remove advertisements for services that have been removed + std::vector servicesToRemove; + for (const auto& service : _advertisedServices) { + const auto it = std::find_if(serviceNamesAndTypes.begin(), serviceNamesAndTypes.end(), + [service](const auto& serviceNameAndTypes) { + return serviceNameAndTypes.first == service.second.name; + }); + if (it == serviceNamesAndTypes.end()) { + servicesToRemove.push_back(service.first); + } + } + for (auto serviceId : servicesToRemove) { + _advertisedServices.erase(serviceId); + } + _server->removeServices(servicesToRemove); + + // Advertise new services + std::vector newServices; + for (const auto& serviceNamesAndType : serviceNamesAndTypes) { + const auto& serviceName = serviceNamesAndType.first; + const auto& datatypes = serviceNamesAndType.second; + + // Ignore the service if it's already advertised + if (std::find_if(_advertisedServices.begin(), _advertisedServices.end(), + [serviceName](const auto& idWithService) { + return idWithService.second.name == serviceName; + }) != _advertisedServices.end()) { + continue; + } + + // Ignore the service if it is not on the service whitelist + if (std::find_if(_serviceWhitelistPatterns.begin(), _serviceWhitelistPatterns.end(), + [&serviceName](const auto& regex) { + return std::regex_match(serviceName, regex); + }) == _serviceWhitelistPatterns.end()) { + continue; + } + + foxglove::ServiceWithoutId service; + service.name = serviceName; + service.type = datatypes.front(); + + try { + auto [format, reqSchema] = _messageDefinitionCache.get_full_text(service.type + "_Request"); + auto resSchema = _messageDefinitionCache.get_full_text(service.type + "_Response").second; + switch (format) { + case foxglove::MessageDefinitionFormat::MSG: + service.requestSchema = reqSchema; + service.responseSchema = resSchema; + break; + case foxglove::MessageDefinitionFormat::IDL: + RCLCPP_WARN(this->get_logger(), + "IDL message definition format cannot be communicated over ws-protocol. " + "Service \"%s\" (%s) may not decode correctly in clients", + service.name.c_str(), service.type.c_str()); + service.requestSchema = reqSchema; + service.responseSchema = resSchema; + break; + } + } catch (const foxglove::DefinitionNotFoundError& err) { + RCLCPP_WARN(this->get_logger(), "Could not find definition for type %s: %s", + service.type.c_str(), err.what()); + // We still advertise the service, but with an emtpy schema + service.requestSchema = ""; + service.responseSchema = ""; + } catch (const std::exception& err) { + RCLCPP_WARN(this->get_logger(), "Failed to add service \"%s\" (%s): %s", + service.name.c_str(), service.type.c_str(), err.what()); + continue; + } + + newServices.push_back(service); + } + + const auto serviceIds = _server->addServices(newServices); + for (size_t i = 0; i < serviceIds.size(); ++i) { + _advertisedServices.emplace(serviceIds[i], newServices[i]); + } + } + private: struct PairHash { template @@ -276,15 +376,20 @@ class FoxgloveBridge : public rclcpp::Node { std::unique_ptr _server; foxglove::MessageDefinitionCache _messageDefinitionCache; std::vector _topicWhitelistPatterns; + std::vector _serviceWhitelistPatterns; std::shared_ptr _paramInterface; std::unordered_map _advertisedTopics; + std::unordered_map _advertisedServices; std::unordered_map _channelToTopicAndDatatype; std::unordered_map _subscriptions; PublicationsByClient _clientAdvertisedTopics; + std::unordered_map _serviceClients; rclcpp::CallbackGroup::SharedPtr _subscriptionCallbackGroup; rclcpp::CallbackGroup::SharedPtr _clientPublishCallbackGroup; + rclcpp::CallbackGroup::SharedPtr _servicesCallbackGroup; std::mutex _subscriptionsMutex; std::mutex _clientAdvertisementsMutex; + std::mutex _servicesMutex; std::unique_ptr _rosgraphPollThread; size_t _maxQosDepth = DEFAULT_MAX_QOS_DEPTH; std::shared_ptr> _clockSubscription; @@ -621,6 +726,57 @@ class FoxgloveBridge : public rclcpp::Node { msg->get_rcl_serialized_message().buffer_length}; _server->sendMessage(clientHandle, channel.id, static_cast(timestamp), payload); } + + void serviceRequestHandler(const foxglove::ServiceRequest& request, + foxglove::ConnHandle clientHandle) { + RCLCPP_DEBUG(this->get_logger(), "Received a request for service %d", request.serviceId); + + std::lock_guard lock(_servicesMutex); + const auto serviceIt = _advertisedServices.find(request.serviceId); + if (serviceIt == _advertisedServices.end()) { + RCLCPP_ERROR(this->get_logger(), "Service with id '%d' does not exist", request.serviceId); + return; + } + + auto clientIt = _serviceClients.find(request.serviceId); + if (clientIt == _serviceClients.end()) { + try { + auto clientOptions = rcl_client_get_default_options(); + auto genClient = GenericClient::make_shared( + this->get_node_base_interface().get(), this->get_node_graph_interface(), + serviceIt->second.name, serviceIt->second.type, clientOptions); + clientIt = _serviceClients.emplace(request.serviceId, std::move(genClient)).first; + this->get_node_services_interface()->add_client(clientIt->second, _servicesCallbackGroup); + } catch (const std::exception& ex) { + RCLCPP_ERROR(get_logger(), "Failed to create service client for service %d (%s): %s", + request.serviceId, serviceIt->second.name.c_str(), ex.what()); + return; + } + } + + auto client = clientIt->second; + if (!client->wait_for_service(1s)) { + RCLCPP_ERROR(get_logger(), "Service %d (%s) is not available", request.serviceId, + serviceIt->second.name.c_str()); + return; + } + + auto reqMessage = std::make_shared(request.data.size()); + auto& rclSerializedMsg = reqMessage->get_rcl_serialized_message(); + std::memcpy(rclSerializedMsg.buffer, request.data.data(), request.data.size()); + rclSerializedMsg.buffer_length = request.data.size(); + + auto responseReceivedCallback = [this, request, + clientHandle](GenericClient::SharedFuture future) { + const auto serializedResponseMsg = future.get()->get_rcl_serialized_message(); + foxglove::ServiceRequest response{request.serviceId, request.callId, request.encoding, + std::vector(serializedResponseMsg.buffer_length)}; + std::memcpy(response.data.data(), serializedResponseMsg.buffer, + serializedResponseMsg.buffer_length); + _server->sendServiceResponse(clientHandle, response); + }; + client->async_send_request(reqMessage, responseReceivedCallback); + } }; } // namespace foxglove_bridge diff --git a/ros2_foxglove_bridge/tests/smoke_test.cpp b/ros2_foxglove_bridge/tests/smoke_test.cpp index f74ef66..a52d2ed 100644 --- a/ros2_foxglove_bridge/tests/smoke_test.cpp +++ b/ros2_foxglove_bridge/tests/smoke_test.cpp @@ -7,6 +7,7 @@ #include #include #include +#include #include #include @@ -71,6 +72,58 @@ class ParameterTest : public ::testing::Test { std::shared_ptr> _wsClient; }; +class ServiceTest : public ::testing::Test { +public: + inline static const std::string SERVICE_NAME = "/foo_service"; + +protected: + void SetUp() override { + _node = rclcpp::Node::make_shared("node"); + _service = _node->create_service( + SERVICE_NAME, [&](std::shared_ptr req, + std::shared_ptr res) { + res->message = "hello"; + res->success = req->data; + }); + + _executor.add_node(_node); + _executorThread = std::thread([this]() { + _executor.spin(); + }); + } + + void TearDown() override { + _executor.cancel(); + _executorThread.join(); + } + + rclcpp::executors::SingleThreadedExecutor _executor; + rclcpp::Node::SharedPtr _node; + rclcpp::ServiceBase::SharedPtr _service; + std::thread _executorThread; + std::shared_ptr> _wsClient; +}; + +template +std::shared_ptr serializeMsg(const T* msg) { + using rosidl_typesupport_cpp::get_message_type_support_handle; + auto typeSupportHdl = get_message_type_support_handle(); + auto result = std::make_shared(); + rmw_ret_t ret = rmw_serialize(msg, typeSupportHdl, &result->get_rcl_serialized_message()); + EXPECT_EQ(ret, RMW_RET_OK); + return result; +} + +template +std::shared_ptr deserializeMsg(const rcl_serialized_message_t* msg) { + using rosidl_typesupport_cpp::get_message_type_support_handle; + auto typeSupportHdl = get_message_type_support_handle(); + auto result = std::make_shared(); + rmw_ret_t ret = rmw_deserialize(msg, typeSupportHdl, result.get()); + EXPECT_EQ(ret, RMW_RET_OK); + return result; +} + TEST(SmokeTest, testConnection) { foxglove::Client wsClient; EXPECT_EQ(std::future_status::ready, wsClient.connect(URI).wait_for(DEFAULT_TIMEOUT)); @@ -301,6 +354,58 @@ TEST_F(ParameterTest, testGetParametersParallel) { } } +TEST_F(ServiceTest, testCallServiceParallel) { + // Connect a few clients (in parallel) and make sure that they can all call the service + auto clients = { + std::make_shared>(), + std::make_shared>(), + std::make_shared>(), + }; + + auto serviceFuture = foxglove::waitForService(*clients.begin(), SERVICE_NAME); + for (auto client : clients) { + ASSERT_EQ(std::future_status::ready, client->connect(URI).wait_for(std::chrono::seconds(5))); + } + ASSERT_EQ(std::future_status::ready, serviceFuture.wait_for(std::chrono::seconds(5))); + const foxglove::Service service = serviceFuture.get(); + + std_srvs::srv::SetBool::Request requestMsg; + requestMsg.data = true; + const auto serializedRequest = serializeMsg(&requestMsg); + const auto& serRequestMsg = serializedRequest->get_rcl_serialized_message(); + + foxglove::ServiceRequest request; + request.serviceId = service.id; + request.callId = 123lu; + request.encoding = "cdr"; + request.data.resize(serRequestMsg.buffer_length); + std::memcpy(request.data.data(), serRequestMsg.buffer, serRequestMsg.buffer_length); + + std::vector> futures; + for (auto client : clients) { + futures.push_back(foxglove::waitForServiceResponse(client)); + client->sendServiceRequest(request); + } + + for (auto& future : futures) { + ASSERT_EQ(std::future_status::ready, future.wait_for(std::chrono::seconds(5))); + foxglove::ServiceResponse response; + EXPECT_NO_THROW(response = future.get()); + EXPECT_EQ(response.serviceId, request.serviceId); + EXPECT_EQ(response.callId, request.callId); + EXPECT_EQ(response.encoding, request.encoding); + + rclcpp::SerializedMessage serializedResponseMsg(response.data.size()); + auto& serMsg = serializedResponseMsg.get_rcl_serialized_message(); + std::memcpy(serMsg.buffer, response.data.data(), response.data.size()); + serMsg.buffer_length = response.data.size(); + const auto resMsg = deserializeMsg(&serMsg); + + EXPECT_EQ(resMsg->message, "hello"); + EXPECT_EQ(resMsg->success, requestMsg.data); + } +} + // Run all the tests that were declared with TEST() int main(int argc, char** argv) { testing::InitGoogleTest(&argc, argv);