diff --git a/rclcpp/CHANGELOG.rst b/rclcpp/CHANGELOG.rst index e71a1ab69a..f368a5eb3f 100644 --- a/rclcpp/CHANGELOG.rst +++ b/rclcpp/CHANGELOG.rst @@ -2,6 +2,20 @@ Changelog for package rclcpp ^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +15.1.0 (2022-03-01) +------------------- +* spin_all with a zero timeout. (`#1878 `_) +* Add RMW listener APIs (`#1579 `_) +* Remove fastrtps customization on tests (`#1887 `_) +* Install headers to include/${PROJECT_NAME} (`#1888 `_) +* Use ament_generate_version_header (`#1886 `_) +* use universal reference to support rvalue. (`#1883 `_) +* fix one subscription can wait_for_message twice (`#1870 `_) +* Add return value version of get_parameter_or (`#1813 `_) +* Cleanup time source object lifetimes (`#1867 `_) +* add is_spinning() method to executor base class +* Contributors: Alberto Soragna, Chen Lihui, Chris Lalancette, Kenji Miyake, Miguel Company, Shane Loretz, Tomoya Fujita, iRobot ROS + 15.0.0 (2022-01-14) ------------------- * Cleanup the TypeAdapt tests (`#1858 `_) diff --git a/rclcpp/CMakeLists.txt b/rclcpp/CMakeLists.txt index a18cf19c40..9fa483e4b2 100644 --- a/rclcpp/CMakeLists.txt +++ b/rclcpp/CMakeLists.txt @@ -191,7 +191,7 @@ endif() target_include_directories(${PROJECT_NAME} PUBLIC "$" "$" - "$") + "$") target_link_libraries(${PROJECT_NAME} ${CMAKE_THREAD_LIBS_INIT}) # specific order: dependents before dependencies ament_target_dependencies(${PROJECT_NAME} @@ -221,11 +221,14 @@ install( RUNTIME DESTINATION bin ) -# specific order: dependents before dependencies -ament_export_include_directories(include) +# Export old-style CMake variables +ament_export_include_directories("include/${PROJECT_NAME}") ament_export_libraries(${PROJECT_NAME}) + +# Export modern CMake targets ament_export_targets(${PROJECT_NAME}) +# specific order: dependents before dependencies ament_export_dependencies(ament_index_cpp) ament_export_dependencies(libstatistics_collector) ament_export_dependencies(rcl) @@ -251,7 +254,7 @@ ament_package() install( DIRECTORY include/ ${CMAKE_CURRENT_BINARY_DIR}/include/ - DESTINATION include + DESTINATION include/${PROJECT_NAME} ) if(TEST cppcheck) @@ -259,4 +262,4 @@ if(TEST cppcheck) set_tests_properties(cppcheck PROPERTIES TIMEOUT 500) endif() -ament_cmake_gen_version_h() +ament_generate_version_header(${PROJECT_NAME}) diff --git a/rclcpp/include/rclcpp/callback_group.hpp b/rclcpp/include/rclcpp/callback_group.hpp index efa8352206..94bceced81 100644 --- a/rclcpp/include/rclcpp/callback_group.hpp +++ b/rclcpp/include/rclcpp/callback_group.hpp @@ -138,6 +138,14 @@ class CallbackGroup const CallbackGroupType & type() const; + RCLCPP_PUBLIC + void collect_all_ptrs( + std::function sub_func, + std::function service_func, + std::function client_func, + std::function timer_func, + std::function waitable_func) const; + /// Return a reference to the 'associated with executor' atomic boolean. /** * When a callback group is added to an executor this boolean is checked diff --git a/rclcpp/include/rclcpp/client.hpp b/rclcpp/include/rclcpp/client.hpp index a1deef73f6..e169b890bb 100644 --- a/rclcpp/include/rclcpp/client.hpp +++ b/rclcpp/include/rclcpp/client.hpp @@ -273,7 +273,7 @@ class ClientBase } }; - std::lock_guard lock(listener_mutex_); + std::lock_guard lock(callback_mutex_); // Set it temporarily to the new callback, while we replace the old one. // This two-step setting, prevents a gap where the old std::function has @@ -295,7 +295,7 @@ class ClientBase void clear_on_new_response_callback() { - std::lock_guard lock(listener_mutex_); + std::lock_guard lock(callback_mutex_); if (on_new_response_callback_) { set_on_new_response_callback(nullptr, nullptr); on_new_response_callback_ = nullptr; @@ -330,7 +330,7 @@ class ClientBase std::atomic in_use_by_wait_set_{false}; - std::recursive_mutex listener_mutex_; + std::recursive_mutex callback_mutex_; std::function on_new_response_callback_{nullptr}; }; diff --git a/rclcpp/include/rclcpp/create_publisher.hpp b/rclcpp/include/rclcpp/create_publisher.hpp index c633c74e2e..f6088a33c3 100644 --- a/rclcpp/include/rclcpp/create_publisher.hpp +++ b/rclcpp/include/rclcpp/create_publisher.hpp @@ -92,7 +92,7 @@ template< typename NodeT> std::shared_ptr create_publisher( - NodeT & node, + NodeT && node, const std::string & topic_name, const rclcpp::QoS & qos, const rclcpp::PublisherOptionsWithAllocator & options = ( diff --git a/rclcpp/include/rclcpp/detail/cpp_callback_trampoline.hpp b/rclcpp/include/rclcpp/detail/cpp_callback_trampoline.hpp index 7ea991c0de..be857e1b58 100644 --- a/rclcpp/include/rclcpp/detail/cpp_callback_trampoline.hpp +++ b/rclcpp/include/rclcpp/detail/cpp_callback_trampoline.hpp @@ -38,7 +38,7 @@ namespace detail * captures, and various kinds of std::bind instances. * * The interior of this function is likely to be executed within a C runtime, - * so no exceptins should be thrown at this point, and doing so results in + * so no exceptions should be thrown at this point, and doing so results in * undefined behavior. * * \tparam UserDataT Deduced type based on what is passed for user data, diff --git a/rclcpp/include/rclcpp/executor.hpp b/rclcpp/include/rclcpp/executor.hpp index f3f6e9bfe4..a2dd124ff0 100644 --- a/rclcpp/include/rclcpp/executor.hpp +++ b/rclcpp/include/rclcpp/executor.hpp @@ -305,7 +305,9 @@ class Executor * If the time that waitables take to be executed is longer than the period on which new waitables * become ready, this method will execute work repeatedly until `max_duration` has elapsed. * - * \param[in] max_duration The maximum amount of time to spend executing work. Must be positive. + * \param[in] max_duration The maximum amount of time to spend executing work, must be >= 0. + * `0` is potentially block forever until no more work is available. + * \throw throw std::invalid_argument if max_duration is less than 0. * Note that spin_all() may take longer than this time as it only returns once max_duration has * been exceeded. */ diff --git a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp index ea742a05e6..f4e3e7c224 100644 --- a/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp +++ b/rclcpp/include/rclcpp/executors/static_single_threaded_executor.hpp @@ -99,9 +99,10 @@ class StaticSingleThreadedExecutor : public rclcpp::Executor /// Static executor implementation of spin all /** - * This non-blocking function will execute entities until - * timeout or no more work available. If new entities get ready - * while executing work available, they will be executed + * This non-blocking function will execute entities until timeout (must be >= 0) + * or no more work available. + * If timeout is `0`, potentially it blocks forever until no more work is available. + * If new entities get ready while executing work available, they will be executed * as long as the timeout hasn't expired. * * Example: diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp index db36690788..e16ac2ecef 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_base.hpp @@ -103,7 +103,7 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable * The application should provide a generic callback function that will be then * forwarded by the waitable to all of its entities. * Before forwarding, a different value for the identifier argument will be - * bond to the function. + * bound to the function. * This implies that the provided callback can use the identifier to behave * differently depending on which entity triggered the waitable to become ready. * @@ -149,11 +149,11 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable } }; - std::lock_guard lock(listener_mutex_); + std::lock_guard lock(callback_mutex_); on_new_message_callback_ = new_callback; if (unread_count_ > 0) { - if (qos_profile_.history() == HistoryPolicy::KeepLast || qos_profile_.depth() == 0) { + if (qos_profile_.history() == HistoryPolicy::KeepAll) { on_new_message_callback_(unread_count_); } else { // Use qos profile depth as upper bound for unread_count_ @@ -167,12 +167,12 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable void clear_on_ready_callback() override { - std::lock_guard lock(listener_mutex_); + std::lock_guard lock(callback_mutex_); on_new_message_callback_ = nullptr; } protected: - std::recursive_mutex listener_mutex_; + std::recursive_mutex callback_mutex_; std::function on_new_message_callback_ {nullptr}; size_t unread_count_{0}; rclcpp::GuardCondition gc_; @@ -180,6 +180,17 @@ class SubscriptionIntraProcessBase : public rclcpp::Waitable virtual void trigger_guard_condition() = 0; + void + invoke_on_new_message() + { + std::lock_guard lock(this->callback_mutex_); + if (this->on_new_message_callback_) { + this->on_new_message_callback_(1); + } else { + this->unread_count_++; + } + } + private: std::string topic_name_; QoS qos_profile_; diff --git a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp index 3f7dc48fa6..9fcbc043dc 100644 --- a/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp +++ b/rclcpp/include/rclcpp/experimental/subscription_intra_process_buffer.hpp @@ -125,7 +125,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff buffer_->add_shared(convert_ros_message_to_subscribed_type_unique_ptr(*message)); trigger_guard_condition(); } - invoke_on_new_message_listener(); + this->invoke_on_new_message(); } void @@ -138,7 +138,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff buffer_->add_unique(convert_ros_message_to_subscribed_type_unique_ptr(*message)); trigger_guard_condition(); } - invoke_on_new_message_listener(); + this->invoke_on_new_message(); } void @@ -146,7 +146,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff { buffer_->add_shared(std::move(message)); trigger_guard_condition(); - invoke_on_new_message_listener(); + this->invoke_on_new_message(); } void @@ -154,7 +154,7 @@ class SubscriptionIntraProcessBuffer : public SubscriptionROSMsgIntraProcessBuff { buffer_->add_unique(std::move(message)); trigger_guard_condition(); - invoke_on_new_message_listener(); + this->invoke_on_new_message(); } bool diff --git a/rclcpp/include/rclcpp/memory_strategy.hpp b/rclcpp/include/rclcpp/memory_strategy.hpp index 0993c223c9..fb5ba2a63f 100644 --- a/rclcpp/include/rclcpp/memory_strategy.hpp +++ b/rclcpp/include/rclcpp/memory_strategy.hpp @@ -100,52 +100,52 @@ class RCLCPP_PUBLIC MemoryStrategy static rclcpp::SubscriptionBase::SharedPtr get_subscription_by_handle( - std::shared_ptr subscriber_handle, + const std::shared_ptr & subscriber_handle, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::ServiceBase::SharedPtr get_service_by_handle( - std::shared_ptr service_handle, + const std::shared_ptr & service_handle, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::ClientBase::SharedPtr get_client_by_handle( - std::shared_ptr client_handle, + const std::shared_ptr & client_handle, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::TimerBase::SharedPtr get_timer_by_handle( - std::shared_ptr timer_handle, + const std::shared_ptr & timer_handle, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::node_interfaces::NodeBaseInterface::SharedPtr get_node_by_group( - rclcpp::CallbackGroup::SharedPtr group, + const rclcpp::CallbackGroup::SharedPtr & group, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::CallbackGroup::SharedPtr get_group_by_subscription( - rclcpp::SubscriptionBase::SharedPtr subscription, + const rclcpp::SubscriptionBase::SharedPtr & subscription, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::CallbackGroup::SharedPtr get_group_by_service( - rclcpp::ServiceBase::SharedPtr service, + const rclcpp::ServiceBase::SharedPtr & service, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::CallbackGroup::SharedPtr get_group_by_client( - rclcpp::ClientBase::SharedPtr client, + const rclcpp::ClientBase::SharedPtr & client, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::CallbackGroup::SharedPtr get_group_by_timer( - rclcpp::TimerBase::SharedPtr timer, + const rclcpp::TimerBase::SharedPtr & timer, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); static rclcpp::CallbackGroup::SharedPtr get_group_by_waitable( - rclcpp::Waitable::SharedPtr waitable, + const rclcpp::Waitable::SharedPtr & waitable, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes); }; diff --git a/rclcpp/include/rclcpp/qos_event.hpp b/rclcpp/include/rclcpp/qos_event.hpp index c25789b86b..9d6f0964de 100644 --- a/rclcpp/include/rclcpp/qos_event.hpp +++ b/rclcpp/include/rclcpp/qos_event.hpp @@ -179,7 +179,7 @@ class QOSEventHandlerBase : public Waitable } }; - std::lock_guard lock(listener_mutex_); + std::lock_guard lock(callback_mutex_); // Set it temporarily to the new callback, while we replace the old one. // This two-step setting, prevents a gap where the old std::function has @@ -201,7 +201,7 @@ class QOSEventHandlerBase : public Waitable void clear_on_ready_callback() override { - std::lock_guard lock(listener_mutex_); + std::lock_guard lock(callback_mutex_); if (on_new_event_callback_) { set_on_new_event_callback(nullptr, nullptr); on_new_event_callback_ = nullptr; @@ -215,7 +215,7 @@ class QOSEventHandlerBase : public Waitable rcl_event_t event_handle_; size_t wait_set_event_index_; - std::recursive_mutex listener_mutex_; + std::recursive_mutex callback_mutex_; std::function on_new_event_callback_{nullptr}; }; diff --git a/rclcpp/include/rclcpp/service.hpp b/rclcpp/include/rclcpp/service.hpp index f1f8484436..47704a4c09 100644 --- a/rclcpp/include/rclcpp/service.hpp +++ b/rclcpp/include/rclcpp/service.hpp @@ -183,7 +183,7 @@ class ServiceBase } }; - std::lock_guard lock(listener_mutex_); + std::lock_guard lock(callback_mutex_); // Set it temporarily to the new callback, while we replace the old one. // This two-step setting, prevents a gap where the old std::function has @@ -205,7 +205,7 @@ class ServiceBase void clear_on_new_request_callback() { - std::lock_guard lock(listener_mutex_); + std::lock_guard lock(callback_mutex_); if (on_new_request_callback_) { set_on_new_request_callback(nullptr, nullptr); on_new_request_callback_ = nullptr; @@ -236,7 +236,7 @@ class ServiceBase std::atomic in_use_by_wait_set_{false}; - std::recursive_mutex listener_mutex_; + std::recursive_mutex callback_mutex_; std::function on_new_request_callback_{nullptr}; }; diff --git a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp index d0bb9c7df2..88698179d4 100644 --- a/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp +++ b/rclcpp/include/rclcpp/strategies/allocator_memory_strategy.hpp @@ -164,30 +164,22 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy if (!group || !group->can_be_taken_from().load()) { continue; } - group->find_subscription_ptrs_if( + + group->collect_all_ptrs( [this](const rclcpp::SubscriptionBase::SharedPtr & subscription) { subscription_handles_.push_back(subscription->get_subscription_handle()); - return false; - }); - group->find_service_ptrs_if( + }, [this](const rclcpp::ServiceBase::SharedPtr & service) { service_handles_.push_back(service->get_service_handle()); - return false; - }); - group->find_client_ptrs_if( + }, [this](const rclcpp::ClientBase::SharedPtr & client) { client_handles_.push_back(client->get_client_handle()); - return false; - }); - group->find_timer_ptrs_if( + }, [this](const rclcpp::TimerBase::SharedPtr & timer) { timer_handles_.push_back(timer->get_timer_handle()); - return false; - }); - group->find_waitable_ptrs_if( + }, [this](const rclcpp::Waitable::SharedPtr & waitable) { waitable_handles_.push_back(waitable); - return false; }); } @@ -204,7 +196,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy bool add_handles_to_wait_set(rcl_wait_set_t * wait_set) override { - for (auto subscription : subscription_handles_) { + for (const std::shared_ptr & subscription : subscription_handles_) { if (rcl_wait_set_add_subscription(wait_set, subscription.get(), NULL) != RCL_RET_OK) { RCUTILS_LOG_ERROR_NAMED( "rclcpp", @@ -213,7 +205,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } } - for (auto client : client_handles_) { + for (const std::shared_ptr & client : client_handles_) { if (rcl_wait_set_add_client(wait_set, client.get(), NULL) != RCL_RET_OK) { RCUTILS_LOG_ERROR_NAMED( "rclcpp", @@ -222,7 +214,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } } - for (auto service : service_handles_) { + for (const std::shared_ptr & service : service_handles_) { if (rcl_wait_set_add_service(wait_set, service.get(), NULL) != RCL_RET_OK) { RCUTILS_LOG_ERROR_NAMED( "rclcpp", @@ -231,7 +223,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy } } - for (auto timer : timer_handles_) { + for (const std::shared_ptr & timer : timer_handles_) { if (rcl_wait_set_add_timer(wait_set, timer.get(), NULL) != RCL_RET_OK) { RCUTILS_LOG_ERROR_NAMED( "rclcpp", @@ -244,7 +236,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy detail::add_guard_condition_to_rcl_wait_set(*wait_set, *guard_condition); } - for (auto waitable : waitable_handles_) { + for (const std::shared_ptr & waitable : waitable_handles_) { waitable->add_to_wait_set(wait_set); } return true; @@ -402,7 +394,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy { auto it = waitable_handles_.begin(); while (it != waitable_handles_.end()) { - auto waitable = *it; + std::shared_ptr & waitable = *it; if (waitable) { // Find the group for this handle and see if it can be serviced auto group = get_group_by_waitable(waitable, weak_groups_to_nodes); @@ -438,7 +430,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy size_t number_of_ready_subscriptions() const override { size_t number_of_subscriptions = subscription_handles_.size(); - for (auto waitable : waitable_handles_) { + for (const std::shared_ptr & waitable : waitable_handles_) { number_of_subscriptions += waitable->get_number_of_ready_subscriptions(); } return number_of_subscriptions; @@ -447,7 +439,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy size_t number_of_ready_services() const override { size_t number_of_services = service_handles_.size(); - for (auto waitable : waitable_handles_) { + for (const std::shared_ptr & waitable : waitable_handles_) { number_of_services += waitable->get_number_of_ready_services(); } return number_of_services; @@ -456,7 +448,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy size_t number_of_ready_events() const override { size_t number_of_events = 0; - for (auto waitable : waitable_handles_) { + for (const std::shared_ptr & waitable : waitable_handles_) { number_of_events += waitable->get_number_of_ready_events(); } return number_of_events; @@ -465,7 +457,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy size_t number_of_ready_clients() const override { size_t number_of_clients = client_handles_.size(); - for (auto waitable : waitable_handles_) { + for (const std::shared_ptr & waitable : waitable_handles_) { number_of_clients += waitable->get_number_of_ready_clients(); } return number_of_clients; @@ -474,7 +466,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy size_t number_of_guard_conditions() const override { size_t number_of_guard_conditions = guard_conditions_.size(); - for (auto waitable : waitable_handles_) { + for (const std::shared_ptr & waitable : waitable_handles_) { number_of_guard_conditions += waitable->get_number_of_ready_guard_conditions(); } return number_of_guard_conditions; @@ -483,7 +475,7 @@ class AllocatorMemoryStrategy : public memory_strategy::MemoryStrategy size_t number_of_ready_timers() const override { size_t number_of_timers = timer_handles_.size(); - for (auto waitable : waitable_handles_) { + for (const std::shared_ptr & waitable : waitable_handles_) { number_of_timers += waitable->get_number_of_ready_timers(); } return number_of_timers; diff --git a/rclcpp/include/rclcpp/subscription_base.hpp b/rclcpp/include/rclcpp/subscription_base.hpp index eff4b64052..26650cd121 100644 --- a/rclcpp/include/rclcpp/subscription_base.hpp +++ b/rclcpp/include/rclcpp/subscription_base.hpp @@ -341,7 +341,7 @@ class SubscriptionBase : public std::enable_shared_from_this } }; - std::lock_guard lock(listener_mutex_); + std::lock_guard lock(callback_mutex_); // Set it temporarily to the new callback, while we replace the old one. // This two-step setting, prevents a gap where the old std::function has @@ -363,7 +363,7 @@ class SubscriptionBase : public std::enable_shared_from_this void clear_on_new_message_callback() { - std::lock_guard lock(listener_mutex_); + std::lock_guard lock(callback_mutex_); if (on_new_message_callback_) { set_on_new_message_callback(nullptr, nullptr); @@ -545,7 +545,7 @@ class SubscriptionBase : public std::enable_shared_from_this std::unordered_map> qos_events_in_use_by_wait_set_; - std::recursive_mutex listener_mutex_; + std::recursive_mutex callback_mutex_; std::function on_new_message_callback_{nullptr}; }; diff --git a/rclcpp/include/rclcpp/timer.hpp b/rclcpp/include/rclcpp/timer.hpp index 52df423d83..8bfab30eef 100644 --- a/rclcpp/include/rclcpp/timer.hpp +++ b/rclcpp/include/rclcpp/timer.hpp @@ -113,7 +113,8 @@ class TimerBase /// Check how long the timer has until its next scheduled callback. /** - * \return A std::chrono::duration representing the relative time until the next callback. + * \return A std::chrono::duration representing the relative time until the next callback + * or std::chrono::nanoseconds::max() if the timer is canceled. * \throws std::runtime_error if the rcl_timer_get_time_until_next_call returns a failure */ RCLCPP_PUBLIC @@ -148,11 +149,40 @@ class TimerBase bool exchange_in_use_by_wait_set_state(bool in_use_state); + /// Set a callback to be called when the timer is reset + /** + * You should aim to make this callback fast and not blocking. + * If you need to do a lot of work or wait for some other event, you should + * spin it off to another thread. + * + * Calling it again will override any previously set callback. + * + * This function is thread-safe. + * + * If you want more information available in the callback, + * you may use a lambda with captures or std::bind. + * + * \param[in] callback functor to be called whenever timer is reset + */ + void + set_on_reset_callback(std::function callback); + + /// Unset the callback registered for reset timer + void + clear_on_reset_callback(); + protected: Clock::SharedPtr clock_; std::shared_ptr timer_handle_; std::atomic in_use_by_wait_set_{false}; + + RCLCPP_PUBLIC + void + set_on_reset_callback(rcl_event_callback_t callback, const void * user_data); + + std::recursive_mutex callback_mutex_; + std::function on_reset_callback_{nullptr}; }; diff --git a/rclcpp/include/rclcpp/timers_manager.hpp b/rclcpp/include/rclcpp/timers_manager.hpp index 37228a7fc0..81cbdc9a56 100644 --- a/rclcpp/include/rclcpp/timers_manager.hpp +++ b/rclcpp/include/rclcpp/timers_manager.hpp @@ -441,6 +441,13 @@ class TimersManager std::make_heap(owned_heap_.begin(), owned_heap_.end(), timer_greater); } + void clear_callbacks() + { + for (TimerPtr & t : owned_heap_) { + t->clear_on_reset_callback(); + } + } + /** * @brief Friend declaration to allow the `validate_and_lock()` function to access the * underlying heap container diff --git a/rclcpp/include/rclcpp/wait_for_message.hpp b/rclcpp/include/rclcpp/wait_for_message.hpp index 20c9df4db6..25c45ad782 100644 --- a/rclcpp/include/rclcpp/wait_for_message.hpp +++ b/rclcpp/include/rclcpp/wait_for_message.hpp @@ -18,6 +18,8 @@ #include #include +#include "rcpputils/scope_exit.hpp" + #include "rclcpp/node.hpp" #include "rclcpp/visibility_control.hpp" #include "rclcpp/wait_set.hpp" @@ -54,6 +56,7 @@ bool wait_for_message( rclcpp::WaitSet wait_set; wait_set.add_subscription(subscription); + RCPPUTILS_SCOPE_EXIT(wait_set.remove_subscription(subscription); ); wait_set.add_guard_condition(gc); auto ret = wait_set.wait(time_to_wait); if (ret.kind() != rclcpp::WaitResultKind::Ready) { diff --git a/rclcpp/package.xml b/rclcpp/package.xml index 52cd61807b..43f6ae2527 100644 --- a/rclcpp/package.xml +++ b/rclcpp/package.xml @@ -2,7 +2,7 @@ rclcpp - 15.0.0 + 15.1.0 The ROS client library in C++. Ivan Paunovic Jacob Perron diff --git a/rclcpp/src/rclcpp/callback_group.cpp b/rclcpp/src/rclcpp/callback_group.cpp index 6b38578ba9..4b11156cf9 100644 --- a/rclcpp/src/rclcpp/callback_group.cpp +++ b/rclcpp/src/rclcpp/callback_group.cpp @@ -40,6 +40,51 @@ CallbackGroup::type() const return type_; } +void CallbackGroup::collect_all_ptrs( + std::function sub_func, + std::function service_func, + std::function client_func, + std::function timer_func, + std::function waitable_func) const +{ + std::lock_guard lock(mutex_); + + for (const rclcpp::SubscriptionBase::WeakPtr & weak_ptr : subscription_ptrs_) { + rclcpp::SubscriptionBase::SharedPtr ref_ptr = weak_ptr.lock(); + if (ref_ptr) { + sub_func(ref_ptr); + } + } + + for (const rclcpp::ServiceBase::WeakPtr & weak_ptr : service_ptrs_) { + rclcpp::ServiceBase::SharedPtr ref_ptr = weak_ptr.lock(); + if (ref_ptr) { + service_func(ref_ptr); + } + } + + for (const rclcpp::ClientBase::WeakPtr & weak_ptr : client_ptrs_) { + rclcpp::ClientBase::SharedPtr ref_ptr = weak_ptr.lock(); + if (ref_ptr) { + client_func(ref_ptr); + } + } + + for (const rclcpp::TimerBase::WeakPtr & weak_ptr : timer_ptrs_) { + rclcpp::TimerBase::SharedPtr ref_ptr = weak_ptr.lock(); + if (ref_ptr) { + timer_func(ref_ptr); + } + } + + for (const rclcpp::Waitable::WeakPtr & weak_ptr : waitable_ptrs_) { + rclcpp::Waitable::SharedPtr ref_ptr = weak_ptr.lock(); + if (ref_ptr) { + waitable_func(ref_ptr); + } + } +} + std::atomic_bool & CallbackGroup::get_associated_with_executor_atomic() { diff --git a/rclcpp/src/rclcpp/executor.cpp b/rclcpp/src/rclcpp/executor.cpp index 6ed3ace557..73b7b8d80d 100644 --- a/rclcpp/src/rclcpp/executor.cpp +++ b/rclcpp/src/rclcpp/executor.cpp @@ -414,8 +414,8 @@ void Executor::spin_some(std::chrono::nanoseconds max_duration) void Executor::spin_all(std::chrono::nanoseconds max_duration) { - if (max_duration <= 0ns) { - throw std::invalid_argument("max_duration must be positive"); + if (max_duration < 0ns) { + throw std::invalid_argument("max_duration must be greater than or equal to 0"); } return this->spin_some_impl(max_duration, true); } diff --git a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp index da30945a71..ae80f3ca5d 100644 --- a/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp +++ b/rclcpp/src/rclcpp/executors/static_single_threaded_executor.cpp @@ -72,8 +72,8 @@ StaticSingleThreadedExecutor::spin_some(std::chrono::nanoseconds max_duration) void StaticSingleThreadedExecutor::spin_all(std::chrono::nanoseconds max_duration) { - if (max_duration <= std::chrono::nanoseconds(0)) { - throw std::invalid_argument("max_duration must be positive"); + if (max_duration < std::chrono::nanoseconds(0)) { + throw std::invalid_argument("max_duration must be greater than or equal to 0"); } return this->spin_some_impl(max_duration, true); } diff --git a/rclcpp/src/rclcpp/memory_strategy.cpp b/rclcpp/src/rclcpp/memory_strategy.cpp index bd32bdb169..cb69dc0d26 100644 --- a/rclcpp/src/rclcpp/memory_strategy.cpp +++ b/rclcpp/src/rclcpp/memory_strategy.cpp @@ -19,7 +19,7 @@ using rclcpp::memory_strategy::MemoryStrategy; rclcpp::SubscriptionBase::SharedPtr MemoryStrategy::get_subscription_by_handle( - std::shared_ptr subscriber_handle, + const std::shared_ptr & subscriber_handle, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { for (const auto & pair : weak_groups_to_nodes) { @@ -40,7 +40,7 @@ MemoryStrategy::get_subscription_by_handle( rclcpp::ServiceBase::SharedPtr MemoryStrategy::get_service_by_handle( - std::shared_ptr service_handle, + const std::shared_ptr & service_handle, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { for (const auto & pair : weak_groups_to_nodes) { @@ -61,7 +61,7 @@ MemoryStrategy::get_service_by_handle( rclcpp::ClientBase::SharedPtr MemoryStrategy::get_client_by_handle( - std::shared_ptr client_handle, + const std::shared_ptr & client_handle, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { for (const auto & pair : weak_groups_to_nodes) { @@ -82,7 +82,7 @@ MemoryStrategy::get_client_by_handle( rclcpp::TimerBase::SharedPtr MemoryStrategy::get_timer_by_handle( - std::shared_ptr timer_handle, + const std::shared_ptr & timer_handle, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { for (const auto & pair : weak_groups_to_nodes) { @@ -103,7 +103,7 @@ MemoryStrategy::get_timer_by_handle( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr MemoryStrategy::get_node_by_group( - rclcpp::CallbackGroup::SharedPtr group, + const rclcpp::CallbackGroup::SharedPtr & group, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { if (!group) { @@ -121,7 +121,7 @@ MemoryStrategy::get_node_by_group( rclcpp::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_subscription( - rclcpp::SubscriptionBase::SharedPtr subscription, + const rclcpp::SubscriptionBase::SharedPtr & subscription, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { for (const auto & pair : weak_groups_to_nodes) { @@ -143,7 +143,7 @@ MemoryStrategy::get_group_by_subscription( rclcpp::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_service( - rclcpp::ServiceBase::SharedPtr service, + const rclcpp::ServiceBase::SharedPtr & service, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { for (const auto & pair : weak_groups_to_nodes) { @@ -165,7 +165,7 @@ MemoryStrategy::get_group_by_service( rclcpp::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_client( - rclcpp::ClientBase::SharedPtr client, + const rclcpp::ClientBase::SharedPtr & client, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { for (const auto & pair : weak_groups_to_nodes) { @@ -187,7 +187,7 @@ MemoryStrategy::get_group_by_client( rclcpp::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_timer( - rclcpp::TimerBase::SharedPtr timer, + const rclcpp::TimerBase::SharedPtr & timer, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { for (const auto & pair : weak_groups_to_nodes) { @@ -209,7 +209,7 @@ MemoryStrategy::get_group_by_timer( rclcpp::CallbackGroup::SharedPtr MemoryStrategy::get_group_by_waitable( - rclcpp::Waitable::SharedPtr waitable, + const rclcpp::Waitable::SharedPtr & waitable, const WeakCallbackGroupsToNodesMap & weak_groups_to_nodes) { for (const auto & pair : weak_groups_to_nodes) { diff --git a/rclcpp/src/rclcpp/timer.cpp b/rclcpp/src/rclcpp/timer.cpp index 70414f38c6..e29a8ac2b3 100644 --- a/rclcpp/src/rclcpp/timer.cpp +++ b/rclcpp/src/rclcpp/timer.cpp @@ -19,9 +19,12 @@ #include #include +#include "rmw/impl/cpp/demangle.hpp" + #include "rclcpp/contexts/default_context.hpp" +#include "rclcpp/detail/cpp_callback_trampoline.hpp" #include "rclcpp/exceptions.hpp" - +#include "rclcpp/logging.hpp" #include "rcutils/logging_macros.h" using rclcpp::TimerBase; @@ -71,7 +74,9 @@ TimerBase::TimerBase( } TimerBase::~TimerBase() -{} +{ + clear_on_reset_callback(); +} void TimerBase::cancel() @@ -116,10 +121,21 @@ TimerBase::is_ready() std::chrono::nanoseconds TimerBase::time_until_trigger() { + bool is_canceled = false; + rcl_ret_t ret = rcl_timer_is_canceled(timer_handle_.get(), &is_canceled); + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret, "Couldn't get timer cancelled state"); + } + if (is_canceled) { + return std::chrono::nanoseconds::max(); + } + int64_t time_until_next_call = 0; - rcl_ret_t ret = rcl_timer_get_time_until_next_call( + ret = rcl_timer_get_time_until_next_call( timer_handle_.get(), &time_until_next_call); - if (ret != RCL_RET_OK) { + if (ret == RCL_RET_TIMER_CANCELED) { + return std::chrono::nanoseconds::max(); + } else if (ret != RCL_RET_OK) { rclcpp::exceptions::throw_from_rcl_error(ret, "Timer could not get time until next call"); } return std::chrono::nanoseconds(time_until_next_call); @@ -136,3 +152,73 @@ TimerBase::exchange_in_use_by_wait_set_state(bool in_use_state) { return in_use_by_wait_set_.exchange(in_use_state); } + +void +TimerBase::set_on_reset_callback(std::function callback) +{ + if (!callback) { + throw std::invalid_argument( + "The callback passed to set_on_reset_callback " + "is not callable."); + } + + auto new_callback = + [callback, this](size_t reset_calls) { + try { + (void)reset_calls; + callback(); + } catch (const std::exception & exception) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "rclcpp::TimerBase@" << this << + " caught " << rmw::impl::cpp::demangle(exception) << + " exception in user-provided callback for the 'on reset' callback: " << + exception.what()); + } catch (...) { + RCLCPP_ERROR_STREAM( + rclcpp::get_logger("rclcpp"), + "rclcpp::TimerBase@" << this << + " caught unhandled exception in user-provided callback " << + "for the 'on reset' callback"); + } + }; + + std::lock_guard lock(callback_mutex_); + + // Set it temporarily to the new callback, while we replace the old one. + // This two-step setting, prevents a gap where the old std::function has + // been replaced but rcl hasn't been told about the new one yet. + set_on_reset_callback( + rclcpp::detail::cpp_callback_trampoline, + static_cast(&new_callback)); + + // Store the std::function to keep it in scope, also overwrites the existing one. + on_reset_callback_ = new_callback; + + // Set it again, now using the permanent storage. + set_on_reset_callback( + rclcpp::detail::cpp_callback_trampoline, + static_cast(&on_reset_callback_)); +} + +void +TimerBase::clear_on_reset_callback() +{ + std::lock_guard lock(callback_mutex_); + + if (on_reset_callback_) { + set_on_reset_callback(nullptr, nullptr); + on_reset_callback_ = nullptr; + } +} + +void +TimerBase::set_on_reset_callback(rcl_event_callback_t callback, const void * user_data) +{ + rcl_guard_condition_t * timer_gc = rcl_timer_get_guard_condition(timer_handle_.get()); + rcl_ret_t ret = rcl_guard_condition_set_on_trigger_callback(timer_gc, callback, user_data); + + if (ret != RCL_RET_OK) { + rclcpp::exceptions::throw_from_rcl_error(ret, "Failed to set on reset callback"); + } +} diff --git a/rclcpp/src/rclcpp/timers_manager.cpp b/rclcpp/src/rclcpp/timers_manager.cpp index a25fe66b87..38ab311e77 100644 --- a/rclcpp/src/rclcpp/timers_manager.cpp +++ b/rclcpp/src/rclcpp/timers_manager.cpp @@ -49,6 +49,14 @@ void TimersManager::add_timer(rclcpp::TimerBase::SharedPtr timer) timers_updated_ = timers_updated_ || added; } + timer->set_on_reset_callback([this](){ + { + std::unique_lock lock(timers_mutex_); + timers_updated_ = true; + } + timers_cv_.notify_one(); + }); + if (added) { // Notify that a timer has been added timers_cv_.notify_one(); @@ -268,6 +276,9 @@ void TimersManager::clear() { // Lock mutex and then clear all data structures std::unique_lock lock(timers_mutex_); + + TimersHeap locked_heap = weak_timers_heap_.validate_and_lock(); + locked_heap.clear_callbacks(); weak_timers_heap_.clear(); timers_updated_ = true; @@ -291,4 +302,5 @@ void TimersManager::remove_timer(TimerPtr timer) // Notify timers thread such that it can re-compute its timeout timers_cv_.notify_one(); } + timer->clear_on_reset_callback(); } diff --git a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp index c068e8f640..e1b916e9c0 100644 --- a/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp +++ b/rclcpp/test/rclcpp/strategies/test_allocator_memory_strategy.cpp @@ -497,14 +497,8 @@ TEST_F(TestAllocatorMemoryStrategy, add_remove_waitables) { TEST_F(TestAllocatorMemoryStrategy, number_of_entities_with_subscription) { RclWaitSetSizes expected_sizes = {}; expected_sizes.size_of_subscriptions = 1; - const std::string implementation_identifier = rmw_get_implementation_identifier(); - if (implementation_identifier == "rmw_cyclonedds_cpp" || - implementation_identifier == "rmw_connextdds") - { - // For cyclonedds and connext, a subscription will also add an event and waitable - expected_sizes.size_of_events += 1; - expected_sizes.size_of_waitables += 1; - } + expected_sizes.size_of_events = 1; + expected_sizes.size_of_waitables = 1; auto node_with_subscription = create_node_with_subscription("subscription_node"); EXPECT_TRUE(TestNumberOfEntitiesAfterCollection(node_with_subscription, expected_sizes)); } diff --git a/rclcpp/test/rclcpp/test_executor.cpp b/rclcpp/test/rclcpp/test_executor.cpp index 0b007d519b..bdbb0a1079 100644 --- a/rclcpp/test/rclcpp/test_executor.cpp +++ b/rclcpp/test/rclcpp/test_executor.cpp @@ -248,7 +248,7 @@ TEST_F(TestExecutor, spin_all_invalid_duration) { RCLCPP_EXPECT_THROW_EQ( dummy.spin_all(std::chrono::nanoseconds(-1)), - std::invalid_argument("max_duration must be positive")); + std::invalid_argument("max_duration must be greater than or equal to 0")); } TEST_F(TestExecutor, spin_some_in_spin_some) { diff --git a/rclcpp/test/rclcpp/test_publisher.cpp b/rclcpp/test/rclcpp/test_publisher.cpp index 1509e3f487..6671fa1176 100644 --- a/rclcpp/test/rclcpp/test_publisher.cpp +++ b/rclcpp/test/rclcpp/test_publisher.cpp @@ -156,6 +156,17 @@ TEST_F(TestPublisher, various_creation_signatures) { rclcpp::create_publisher(node, "topic", 42, options); (void)publisher; } + { + auto publisher = rclcpp::create_publisher( + node->get_node_topics_interface(), "topic", 42, rclcpp::PublisherOptions()); + (void)publisher; + } + { + auto node_topics_interface = node->get_node_topics_interface(); + auto publisher = rclcpp::create_publisher( + node_topics_interface, "topic", 42, rclcpp::PublisherOptions()); + (void)publisher; + } { auto node_parameters = node->get_node_parameters_interface(); auto node_topics = node->get_node_topics_interface(); diff --git a/rclcpp/test/rclcpp/test_qos_event.cpp b/rclcpp/test/rclcpp/test_qos_event.cpp index d7ab0775f5..221e2bdf0a 100644 --- a/rclcpp/test/rclcpp/test_qos_event.cpp +++ b/rclcpp/test/rclcpp/test_qos_event.cpp @@ -39,9 +39,6 @@ class TestQosEvent : public ::testing::Test void SetUp() { - is_fastrtps = - std::string(rmw_get_implementation_identifier()).find("rmw_fastrtps") != std::string::npos; - node = std::make_shared("test_qos_event", "/ns"); message_callback = [node = node.get()](test_msgs::msg::Empty::ConstSharedPtr /*msg*/) { @@ -56,7 +53,6 @@ class TestQosEvent : public ::testing::Test static constexpr char topic_name[] = "test_topic"; rclcpp::Node::SharedPtr node; - bool is_fastrtps; std::function message_callback; }; @@ -103,12 +99,8 @@ TEST_F(TestQosEvent, test_publisher_constructor) "Offered incompatible qos - total %d (delta %d), last_policy_kind: %d", event.total_count, event.total_count_change, event.last_policy_kind); }; - try { - publisher = node->create_publisher( - topic_name, 10, options); - } catch (const rclcpp::UnsupportedEventTypeException & /*exc*/) { - EXPECT_TRUE(is_fastrtps); - } + publisher = node->create_publisher( + topic_name, 10, options); } /* @@ -153,12 +145,8 @@ TEST_F(TestQosEvent, test_subscription_constructor) "Requested incompatible qos - total %d (delta %d), last_policy_kind: %d", event.total_count, event.total_count_change, event.last_policy_kind); }; - try { - subscription = node->create_subscription( - topic_name, 10, message_callback, options); - } catch (const rclcpp::UnsupportedEventTypeException & /*exc*/) { - EXPECT_TRUE(is_fastrtps); - } + subscription = node->create_subscription( + topic_name, 10, message_callback, options); } /* @@ -212,22 +200,17 @@ TEST_F(TestQosEvent, test_default_incompatible_qos_callbacks) ex.add_node(node->get_node_base_interface()); // This future won't complete on fastrtps, so just timeout immediately - const auto timeout = (is_fastrtps) ? std::chrono::milliseconds(5) : std::chrono::seconds(10); + const auto timeout = std::chrono::seconds(10); ex.spin_until_future_complete(log_msgs_future, timeout); - if (is_fastrtps) { - EXPECT_EQ("", pub_log_msg); - EXPECT_EQ("", sub_log_msg); - } else { - EXPECT_EQ( - "New subscription discovered on topic '/ns/test_topic', requesting incompatible QoS. " - "No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY", - pub_log_msg); - EXPECT_EQ( - "New publisher discovered on topic '/ns/test_topic', offering incompatible QoS. " - "No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY", - sub_log_msg); - } + EXPECT_EQ( + "New subscription discovered on topic '/ns/test_topic', requesting incompatible QoS. " + "No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY", + pub_log_msg); + EXPECT_EQ( + "New publisher discovered on topic '/ns/test_topic', offering incompatible QoS. " + "No messages will be sent to it. Last incompatible policy: DURABILITY_QOS_POLICY", + sub_log_msg); rcutils_logging_set_output_handler(original_output_handler); } diff --git a/rclcpp/test/rclcpp/test_timer.cpp b/rclcpp/test/rclcpp/test_timer.cpp index 0928fcaa5d..7a6599dfe4 100644 --- a/rclcpp/test/rclcpp/test_timer.cpp +++ b/rclcpp/test/rclcpp/test_timer.cpp @@ -111,11 +111,13 @@ TEST_F(TestTimer, test_is_canceled_reset) // reset shouldn't affect state (not canceled yet) timer->reset(); + EXPECT_LE(timer->time_until_trigger().count(), std::chrono::nanoseconds::max().count()); EXPECT_FALSE(timer->is_canceled()); // cancel after reset timer->cancel(); EXPECT_TRUE(timer->is_canceled()); + EXPECT_EQ(timer->time_until_trigger().count(), std::chrono::nanoseconds::max().count()); // reset and cancel timer->reset(); diff --git a/rclcpp/test/rclcpp/test_wait_for_message.cpp b/rclcpp/test/rclcpp/test_wait_for_message.cpp index f0071698ca..dadc06f2da 100644 --- a/rclcpp/test/rclcpp/test_wait_for_message.cpp +++ b/rclcpp/test/rclcpp/test_wait_for_message.cpp @@ -72,3 +72,38 @@ TEST(TestUtilities, wait_for_message_indefinitely) { ASSERT_FALSE(received); } + +TEST(TestUtilities, wait_for_message_twice_one_sub) { + rclcpp::init(0, nullptr); + + auto node = std::make_shared("wait_for_message_node3"); + + using MsgT = test_msgs::msg::Strings; + auto pub = node->create_publisher("wait_for_message_topic", 10); + auto sub = node->create_subscription( + "wait_for_message_topic", 1, [](const std::shared_ptr) {}); + + MsgT out1; + MsgT out2; + auto received = false; + auto wait = std::async( + [&]() { + auto ret = rclcpp::wait_for_message(out1, sub, node->get_node_options().context(), 5s); + EXPECT_TRUE(ret); + ret = rclcpp::wait_for_message(out2, sub, node->get_node_options().context(), 5s); + EXPECT_TRUE(ret); + received = true; + }); + + for (auto i = 0u; i < 10 && received == false; ++i) { + pub->publish(*get_messages_strings()[0]); + std::this_thread::sleep_for(1s); + } + + ASSERT_NO_THROW(wait.get()); + ASSERT_TRUE(received); + EXPECT_EQ(out1, *get_messages_strings()[0]); + EXPECT_EQ(out2, *get_messages_strings()[0]); + + rclcpp::shutdown(); +} diff --git a/rclcpp_action/CHANGELOG.rst b/rclcpp_action/CHANGELOG.rst index 8b9312df43..cdb217533e 100644 --- a/rclcpp_action/CHANGELOG.rst +++ b/rclcpp_action/CHANGELOG.rst @@ -3,6 +3,12 @@ Changelog for package rclcpp_action ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +15.1.0 (2022-03-01) +------------------- +* Add RMW listener APIs (`#1579 `_) +* Install headers to include/${PROJECT_NAME} (`#1888 `_) +* Contributors: Shane Loretz, iRobot ROS + 15.0.0 (2022-01-14) ------------------- * Fix include order and relative paths for cpplint (`#1859 `_) diff --git a/rclcpp_action/CMakeLists.txt b/rclcpp_action/CMakeLists.txt index e9bd327643..353e179244 100644 --- a/rclcpp_action/CMakeLists.txt +++ b/rclcpp_action/CMakeLists.txt @@ -34,7 +34,7 @@ add_library(${PROJECT_NAME} target_include_directories(${PROJECT_NAME} PUBLIC "$" - "$") + "$") ament_target_dependencies(${PROJECT_NAME} "action_msgs" @@ -51,7 +51,7 @@ target_compile_definitions(${PROJECT_NAME} install( DIRECTORY include/ - DESTINATION include) + DESTINATION include/${PROJECT_NAME}) install( TARGETS ${PROJECT_NAME} @@ -61,11 +61,14 @@ install( RUNTIME DESTINATION bin ) -# specific order: dependents before dependencies -ament_export_include_directories(include) +# Export old-style CMake variables +ament_export_include_directories("include/${PROJECT_NAME}") ament_export_libraries(${PROJECT_NAME}) + +# Export modern CMake targets ament_export_targets(${PROJECT_NAME}) +# specific order: dependents before dependencies ament_export_dependencies(ament_cmake) ament_export_dependencies(action_msgs) ament_export_dependencies(rclcpp) diff --git a/rclcpp_action/include/rclcpp_action/create_server.hpp b/rclcpp_action/include/rclcpp_action/create_server.hpp index 3aa0e4dea7..2f525da51d 100644 --- a/rclcpp_action/include/rclcpp_action/create_server.hpp +++ b/rclcpp_action/include/rclcpp_action/create_server.hpp @@ -60,6 +60,7 @@ create_server( rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock_interface, rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging_interface, rclcpp::node_interfaces::NodeWaitablesInterface::SharedPtr node_waitables_interface, + rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers_interface, const std::string & name, typename Server::GoalCallback handle_goal, typename Server::CancelCallback handle_cancel, @@ -100,6 +101,7 @@ create_server( node_base_interface, node_clock_interface, node_logging_interface, + node_timers_interface, name, options, handle_goal, @@ -143,6 +145,7 @@ create_server( node->get_node_clock_interface(), node->get_node_logging_interface(), node->get_node_waitables_interface(), + node->get_node_timers_interface(), name, handle_goal, handle_cancel, diff --git a/rclcpp_action/include/rclcpp_action/server.hpp b/rclcpp_action/include/rclcpp_action/server.hpp index 18535b13d4..e896c459e7 100644 --- a/rclcpp_action/include/rclcpp_action/server.hpp +++ b/rclcpp_action/include/rclcpp_action/server.hpp @@ -77,6 +77,7 @@ class ServerBase : public rclcpp::Waitable GoalService, ResultService, CancelService, + ExpireTimer, }; RCLCPP_ACTION_PUBLIC @@ -183,6 +184,7 @@ class ServerBase : public rclcpp::Waitable rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base, rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock, rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging, + rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers, const std::string & name, const rosidl_action_type_support_t * type_support, const rcl_action_server_options_t & options); @@ -312,9 +314,12 @@ class ServerBase : public rclcpp::Waitable protected: // Mutex to protect the callbacks storage. - std::recursive_mutex listener_mutex_; + std::recursive_mutex callbacks_mutex_; // Storage for std::function callbacks to keep them in scope std::unordered_map> entity_type_to_on_ready_callback_; + // Timer for expiring goals + rclcpp::TimerBase::SharedPtr timer_; + std::function timer_expired_callback_{nullptr}; /// Set a callback to be called when the specified entity is ready RCLCPP_ACTION_PUBLIC @@ -381,6 +386,7 @@ class Server : public ServerBase, public std::enable_shared_from_this(), options), diff --git a/rclcpp_action/package.xml b/rclcpp_action/package.xml index 73ec8bfee7..213f81fb7c 100644 --- a/rclcpp_action/package.xml +++ b/rclcpp_action/package.xml @@ -2,7 +2,7 @@ rclcpp_action - 15.0.0 + 15.1.0 Adds action APIs for C++. Ivan Paunovic Jacob Perron diff --git a/rclcpp_action/src/server.cpp b/rclcpp_action/src/server.cpp index 7af05f1e5e..3574bb5c5a 100644 --- a/rclcpp_action/src/server.cpp +++ b/rclcpp_action/src/server.cpp @@ -27,6 +27,7 @@ #include "action_msgs/msg/goal_status_array.hpp" #include "action_msgs/srv/cancel_goal.hpp" +#include "rclcpp/create_timer.hpp" #include "rclcpp/exceptions.hpp" #include "rclcpp_action/server.hpp" @@ -83,6 +84,7 @@ ServerBase::ServerBase( rclcpp::node_interfaces::NodeBaseInterface::SharedPtr node_base, rclcpp::node_interfaces::NodeClockInterface::SharedPtr node_clock, rclcpp::node_interfaces::NodeLoggingInterface::SharedPtr node_logging, + rclcpp::node_interfaces::NodeTimersInterface::SharedPtr node_timers, const std::string & name, const rosidl_action_type_support_t * type_support, const rcl_action_server_options_t & options @@ -110,8 +112,28 @@ ServerBase::ServerBase( rcl_node_t * rcl_node = node_base->get_rcl_node_handle(); rcl_clock_t * rcl_clock = pimpl_->clock_->get_clock_handle(); + timer_ = create_timer( + node_base, + node_timers, + node_clock->get_clock(), + rclcpp::Duration(std::chrono::nanoseconds(options.result_timeout.nanoseconds)), + [this]() { + std::lock_guard lock(callbacks_mutex_); + std::cout << "if (timer_expired_callback_)" << std::endl; + if (timer_expired_callback_) { + std::cout << "calling timer_expired_callback_" << std::endl; + timer_expired_callback_(1); + } + }); + rcl_ret_t ret = rcl_action_server_init( - pimpl_->action_server_.get(), rcl_node, rcl_clock, type_support, name.c_str(), &options); + pimpl_->action_server_.get(), + rcl_node, + rcl_clock, + type_support, + name.c_str(), + &options, + timer_->get_timer_handle().get()); if (RCL_RET_OK != ret) { rclcpp::exceptions::throw_from_rcl_error(ret); @@ -235,6 +257,12 @@ ServerBase::take_data_by_entity_id(int id) break; } + case EntityType::ExpireTimer: + { + pimpl_->goal_expired_ = true; + break; + } + default: throw std::runtime_error("ServerBase::take_data. Unknown entity type."); break; @@ -565,6 +593,7 @@ ServerBase::execute_check_expired_goals() // Allocate expecting only one goal to expire at a time rcl_action_goal_info_t expired_goals[1]; size_t num_expired = 1; + std::cout << "execute_check_expired_goals" << std::endl; // Loop in case more than 1 goal expired while (num_expired > 0u) { @@ -586,6 +615,8 @@ ServerBase::execute_check_expired_goals() pimpl_->goal_handles_.erase(uuid); } } + + pimpl_->goal_expired_ = false; } void @@ -725,6 +756,7 @@ ServerBase::set_on_ready_callback(std::function callback) set_callback_to_entity(EntityType::GoalService, callback); set_callback_to_entity(EntityType::ResultService, callback); set_callback_to_entity(EntityType::CancelService, callback); + set_callback_to_entity(EntityType::ExpireTimer, callback); } void @@ -753,6 +785,11 @@ ServerBase::set_callback_to_entity( } }; + if (entity_type == EntityType::ExpireTimer) { + std::lock_guard lock(callbacks_mutex_); + timer_expired_callback_ = new_callback; + return; + } // Set it temporarily to the new callback, while we replace the old one. // This two-step setting, prevents a gap where the old std::function has @@ -762,7 +799,7 @@ ServerBase::set_callback_to_entity( rclcpp::detail::cpp_callback_trampoline, static_cast(&new_callback)); - std::lock_guard lock(listener_mutex_); + std::lock_guard lock(callbacks_mutex_); // Store the std::function to keep it in scope, also overwrites the existing one. auto it = entity_type_to_on_ready_callback_.find(entity_type); @@ -836,12 +873,13 @@ ServerBase::set_on_ready_callback( void ServerBase::clear_on_ready_callback() { - std::lock_guard lock(listener_mutex_); + std::lock_guard lock(callbacks_mutex_); if (on_ready_callback_set_) { set_on_ready_callback(EntityType::GoalService, nullptr, nullptr); set_on_ready_callback(EntityType::ResultService, nullptr, nullptr); set_on_ready_callback(EntityType::CancelService, nullptr, nullptr); + timer_expired_callback_ = nullptr; on_ready_callback_set_ = false; } diff --git a/rclcpp_components/CHANGELOG.rst b/rclcpp_components/CHANGELOG.rst index 6373079eb5..6c4bae3687 100644 --- a/rclcpp_components/CHANGELOG.rst +++ b/rclcpp_components/CHANGELOG.rst @@ -2,6 +2,14 @@ Changelog for package rclcpp_components ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +15.1.0 (2022-03-01) +------------------- +* small improvements to node_main.cpp.in +* Install headers to include/${PROJECT_NAME} (`#1888 `_) +* Use spin() in component_manager_isolated.hpp (`#1881 `_) +* add use_global_arguments for node options of component nodes (`#1776 `_) +* Contributors: Alberto Soragna, Shane Loretz, gezp + 15.0.0 (2022-01-14) ------------------- * Add rclcpp_components::component (`#1855 `_) diff --git a/rclcpp_components/CMakeLists.txt b/rclcpp_components/CMakeLists.txt index c7dec3e65f..0b9cda7803 100644 --- a/rclcpp_components/CMakeLists.txt +++ b/rclcpp_components/CMakeLists.txt @@ -24,7 +24,7 @@ find_package(rcpputils REQUIRED) add_library(component INTERFACE) target_include_directories(component INTERFACE "$" - "$") + "$") target_link_libraries(component INTERFACE class_loader::class_loader rclcpp::rclcpp) @@ -36,7 +36,7 @@ add_library( ) target_include_directories(component_manager PUBLIC "$" - "$") + "$") ament_target_dependencies(component_manager "ament_index_cpp" "class_loader" @@ -159,7 +159,7 @@ install( # Install include directories install( DIRECTORY include/ - DESTINATION include + DESTINATION include/${PROJECT_NAME} ) # Install cmake @@ -168,10 +168,14 @@ install( DESTINATION share/${PROJECT_NAME} ) -# specific order: dependents before dependencies -ament_export_include_directories(include) +# Export old-style CMake variables +ament_export_include_directories("include/${PROJECT_NAME}") ament_export_libraries(component_manager) + +# Export modern CMake targets ament_export_targets(export_${PROJECT_NAME}) + +# specific order: dependents before dependencies ament_export_dependencies(ament_index_cpp) ament_export_dependencies(class_loader) ament_export_dependencies(composition_interfaces) diff --git a/rclcpp_components/include/rclcpp_components/component_manager_isolated.hpp b/rclcpp_components/include/rclcpp_components/component_manager_isolated.hpp index 8ff9a60665..ea77532312 100644 --- a/rclcpp_components/include/rclcpp_components/component_manager_isolated.hpp +++ b/rclcpp_components/include/rclcpp_components/component_manager_isolated.hpp @@ -38,7 +38,6 @@ class ComponentManagerIsolated : public rclcpp_components::ComponentManager { std::shared_ptr executor; std::thread thread; - std::promise promise; }; public: @@ -46,9 +45,7 @@ class ComponentManagerIsolated : public rclcpp_components::ComponentManager { if (node_wrappers_.size()) { for (auto & executor_wrapper : dedicated_executor_wrappers_) { - executor_wrapper.second.promise.set_value(); - executor_wrapper.second.executor->cancel(); - executor_wrapper.second.thread.join(); + cancel_executor(executor_wrapper.second); } node_wrappers_.clear(); } @@ -67,8 +64,8 @@ class ComponentManagerIsolated : public rclcpp_components::ComponentManager exec->add_node(node_wrappers_[node_id].get_node_base_interface()); executor_wrapper.executor = exec; executor_wrapper.thread = std::thread( - [exec, cancel_token = executor_wrapper.promise.get_future()]() { - exec->spin_until_future_complete(cancel_token); + [exec]() { + exec->spin(); }); dedicated_executor_wrappers_[node_id] = std::move(executor_wrapper); } @@ -81,14 +78,36 @@ class ComponentManagerIsolated : public rclcpp_components::ComponentManager { auto executor_wrapper = dedicated_executor_wrappers_.find(node_id); if (executor_wrapper != dedicated_executor_wrappers_.end()) { - executor_wrapper->second.promise.set_value(); - executor_wrapper->second.executor->cancel(); - executor_wrapper->second.thread.join(); + cancel_executor(executor_wrapper->second); dedicated_executor_wrappers_.erase(executor_wrapper); } } private: + /// Stops a spinning executor avoiding race conditions. + /** + * @param executor_wrapper executor to stop and its associated thread + */ + void cancel_executor(DedicatedExecutorWrapper & executor_wrapper) + { + // We can't immediately call the cancel() API on an executor because if it is not + // already spinning, this operation will have no effect. + // We rely on the assumption that this class creates executors and then immediately makes them + // spin in a separate thread, i.e. the time gap between when the executor is created and when + // it starts to spin is small (although it's not negligible). + + while (!executor_wrapper.executor->is_spinning()) { + // This is an arbitrarily small delay to avoid busy looping + rclcpp::sleep_for(std::chrono::milliseconds(1)); + } + + // After the while loop we are sure that the executor is now spinning, so + // the call to cancel() will work. + executor_wrapper.executor->cancel(); + // Wait for the thread task to return + executor_wrapper.thread.join(); + } + std::unordered_map dedicated_executor_wrappers_; }; diff --git a/rclcpp_components/package.xml b/rclcpp_components/package.xml index b1d03eaa51..c5941ac74b 100644 --- a/rclcpp_components/package.xml +++ b/rclcpp_components/package.xml @@ -2,7 +2,7 @@ rclcpp_components - 15.0.0 + 15.1.0 Package containing tools for dynamically loadable components Ivan Paunovic Jacob Perron diff --git a/rclcpp_components/src/node_main.cpp.in b/rclcpp_components/src/node_main.cpp.in index bfebc4aa7d..487331b251 100644 --- a/rclcpp_components/src/node_main.cpp.in +++ b/rclcpp_components/src/node_main.cpp.in @@ -39,11 +39,20 @@ int main(int argc, char * argv[]) RCLCPP_DEBUG(logger, "Load library %s", library_name.c_str()); auto loader = new class_loader::ClassLoader(library_name); auto classes = loader->getAvailableClasses(); - for (auto clazz : classes) { + for (const auto & clazz : classes) { std::string name = clazz.c_str(); - if (!(name.compare(class_name))) { + if (name.compare(class_name) == 0) { RCLCPP_DEBUG(logger, "Instantiate class %s", clazz.c_str()); - auto node_factory = loader->createInstance(clazz); + std::shared_ptr node_factory = nullptr; + try { + node_factory = loader->createInstance(clazz); + } catch (const std::exception & ex) { + RCLCPP_ERROR(logger, "Failed to load library %s", ex.what()); + return 1; + } catch (...) { + RCLCPP_ERROR(logger, "Failed to load library"); + return 1; + } auto wrapper = node_factory->create_node_instance(options); auto node = wrapper.get_node_base_interface(); node_wrappers.push_back(wrapper); diff --git a/rclcpp_lifecycle/CHANGELOG.rst b/rclcpp_lifecycle/CHANGELOG.rst index 3a355582c6..ee0f49a0a1 100644 --- a/rclcpp_lifecycle/CHANGELOG.rst +++ b/rclcpp_lifecycle/CHANGELOG.rst @@ -3,6 +3,12 @@ Changelog for package rclcpp_lifecycle ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ +15.1.0 (2022-03-01) +------------------- +* Install headers to include/${PROJECT_NAME} (`#1888 `_) +* LifecycleNode::on_deactivate deactivate all managed entities. (`#1885 `_) +* Contributors: Shane Loretz, Tomoya Fujita + 15.0.0 (2022-01-14) ------------------- * Automatically transition lifecycle entities when node transitions (`#1863 `_) diff --git a/rclcpp_lifecycle/CMakeLists.txt b/rclcpp_lifecycle/CMakeLists.txt index d635d54133..a823d44a68 100644 --- a/rclcpp_lifecycle/CMakeLists.txt +++ b/rclcpp_lifecycle/CMakeLists.txt @@ -27,7 +27,7 @@ add_library(rclcpp_lifecycle target_include_directories(${PROJECT_NAME} PUBLIC "$" - "$") + "$") # specific order: dependents before dependencies ament_target_dependencies(rclcpp_lifecycle "rclcpp" @@ -155,14 +155,18 @@ if(BUILD_TESTING) endif() endif() -# specific order: dependents before dependencies -ament_export_include_directories(include) +# Export old-style CMake variables +ament_export_include_directories("include/${PROJECT_NAME}") ament_export_libraries(${PROJECT_NAME}) + +# Export modern CMake targets ament_export_targets(${PROJECT_NAME}) + +# specific order: dependents before dependencies ament_export_dependencies(rclcpp) ament_export_dependencies(rcl_lifecycle) ament_export_dependencies(lifecycle_msgs) ament_package() install(DIRECTORY include/ - DESTINATION include) + DESTINATION include/${PROJECT_NAME}) diff --git a/rclcpp_lifecycle/package.xml b/rclcpp_lifecycle/package.xml index e708c8c79a..6a537dfcb4 100644 --- a/rclcpp_lifecycle/package.xml +++ b/rclcpp_lifecycle/package.xml @@ -2,7 +2,7 @@ rclcpp_lifecycle - 15.0.0 + 15.1.0 Package containing a prototype for lifecycle implementation Ivan Paunovic Jacob Perron