diff --git a/rclpy/src/rclpy/events_executor/events_executor.cpp b/rclpy/src/rclpy/events_executor/events_executor.cpp index 570eeefb1..47898f122 100644 --- a/rclpy/src/rclpy/events_executor/events_executor.cpp +++ b/rclpy/src/rclpy/events_executor/events_executor.cpp @@ -182,13 +182,14 @@ py::list EventsExecutor::get_nodes() const {return nodes_;} // specific node and more than one node may be connected to an executor instance. // https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L184-L185 -void EventsExecutor::spin(std::optional timeout_sec) +void EventsExecutor::spin(std::optional timeout_sec, bool stop_after_user_callback) { { std::unique_lock spin_lock(spinning_mutex_, std::try_to_lock); if (!spin_lock) { throw std::runtime_error("Attempt to spin an already-spinning Executor"); } + stop_after_user_callback_ = stop_after_user_callback; // Release the GIL while we block. Any callbacks on the io_context that want to touch Python // will need to reacquire it though. py::gil_scoped_release gil_release; @@ -208,57 +209,26 @@ void EventsExecutor::spin(std::optional timeout_sec) io_context_.restart(); } - CheckForSignals(); -} - -void EventsExecutor::spin_once(std::optional timeout_sec) -{ - { - std::unique_lock spin_lock(spinning_mutex_, std::try_to_lock); - if (!spin_lock) { - throw std::runtime_error("Attempt to spin an already-spinning Executor"); - } - // Release the GIL while we block. Any callbacks on the io_context that want to touch Python - // will need to reacquire it though. - py::gil_scoped_release gil_release; - // Don't let asio auto stop if there's nothing to do - const auto work = asio::make_work_guard(io_context_); - // We can't just use asio run_one*() here, because 'one' asio callback might include some - // internal housekeeping, and not a user-visible callback that was intended when the Executor - // was asked to dispatch 'once'. - ran_user_ = false; - if (timeout_sec) { - const auto timeout_ns = std::chrono::duration_cast( - std::chrono::duration(*timeout_sec)); - const auto end = std::chrono::steady_clock::now() + timeout_ns; - // Dispatch anything that's immediately ready, even with zero timeout - while (io_context_.poll_one() && !ran_user_) { - } - // Now possibly block until the end of the timeout period - while (!ran_user_ && io_context_.run_one_until(end)) { - } - } else { - while (io_context_.run_one() && !ran_user_) { - } + const int signum = signal_pending_.exchange(0); + if (signum) { + rclpy_context_.attr("try_shutdown")(); + if (signum == SIGINT) { + PyErr_SetInterrupt(); + return; } - io_context_.restart(); } - CheckForSignals(); + const bool ok = py::cast(rclpy_context_.attr("ok")()); + if (!ok) { + Raise(py::module_::import("rclpy.executors").attr("ExternalShutdownException")()); + } } void EventsExecutor::spin_until_future_complete( - py::handle future, std::optional timeout_sec) + py::handle future, std::optional timeout_sec, bool stop_after_user_callback) { future.attr("add_done_callback")(py::cpp_function([this](py::handle) {io_context_.stop();})); - spin(timeout_sec); -} - -void EventsExecutor::spin_once_until_future_complete( - py::handle future, std::optional timeout_sec) -{ - future.attr("add_done_callback")(py::cpp_function([this](py::handle) {io_context_.stop();})); - spin_once(timeout_sec); + spin(timeout_sec, stop_after_user_callback); } EventsExecutor * EventsExecutor::enter() {return this;} @@ -344,9 +314,9 @@ void EventsExecutor::HandleAddedSubscription(py::handle subscription) const rcl_subscription_t * rcl_ptr = py::cast(handle).rcl_ptr(); const auto cb = std::bind(&EventsExecutor::HandleSubscriptionReady, this, subscription, pl::_1); if ( - RCL_RET_OK != - rcl_subscription_set_on_new_message_callback( - rcl_ptr, RclEventCallbackTrampoline, rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) + RCL_RET_OK != rcl_subscription_set_on_new_message_callback( + rcl_ptr, RclEventCallbackTrampoline, + rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) { throw std::runtime_error( std::string("Failed to set the on new message callback for subscription: ") + @@ -368,7 +338,9 @@ void EventsExecutor::HandleRemovedSubscription(py::handle subscription) void EventsExecutor::HandleSubscriptionReady(py::handle subscription, size_t number_of_events) { - ran_user_ = true; + if (stop_after_user_callback_) { + io_context_.stop(); + } py::gil_scoped_acquire gil_acquire; // Largely based on rclpy.Executor._take_subscription() and _execute_subcription(). @@ -444,8 +416,8 @@ void EventsExecutor::HandleTimerReady(py::handle timer, const rcl_timer_call_inf if (py::cast(inspect_iscoroutine_(result))) { // Create a Task to manage iteration of this coroutine later. create_task(result); - } else { - ran_user_ = true; + } else if (stop_after_user_callback_) { + io_context_.stop(); } } @@ -456,9 +428,9 @@ void EventsExecutor::HandleAddedClient(py::handle client) const rcl_client_t * rcl_ptr = py::cast(handle).rcl_ptr(); const auto cb = std::bind(&EventsExecutor::HandleClientReady, this, client, pl::_1); if ( - RCL_RET_OK != - rcl_client_set_on_new_response_callback( - rcl_ptr, RclEventCallbackTrampoline, rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) + RCL_RET_OK != rcl_client_set_on_new_response_callback( + rcl_ptr, RclEventCallbackTrampoline, + rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) { throw std::runtime_error( std::string("Failed to set the on new response callback for client: ") + @@ -480,7 +452,9 @@ void EventsExecutor::HandleRemovedClient(py::handle client) void EventsExecutor::HandleClientReady(py::handle client, size_t number_of_events) { - ran_user_ = true; + if (stop_after_user_callback_) { + io_context_.stop(); + } py::gil_scoped_acquire gil_acquire; // Largely based on rclpy.Executor._take_client() and _execute_client(). @@ -524,9 +498,9 @@ void EventsExecutor::HandleAddedService(py::handle service) const rcl_service_t * rcl_ptr = py::cast(handle).rcl_ptr(); const auto cb = std::bind(&EventsExecutor::HandleServiceReady, this, service, pl::_1); if ( - RCL_RET_OK != - rcl_service_set_on_new_request_callback( - rcl_ptr, RclEventCallbackTrampoline, rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) + RCL_RET_OK != rcl_service_set_on_new_request_callback( + rcl_ptr, RclEventCallbackTrampoline, + rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with))) { throw std::runtime_error( std::string("Failed to set the on new request callback for service: ") + @@ -548,7 +522,9 @@ void EventsExecutor::HandleRemovedService(py::handle service) void EventsExecutor::HandleServiceReady(py::handle service, size_t number_of_events) { - ran_user_ = true; + if (stop_after_user_callback_) { + io_context_.stop(); + } py::gil_scoped_acquire gil_acquire; // Largely based on rclpy.Executor._take_service() and _execute_service(). @@ -818,7 +794,9 @@ void EventsExecutor::HandleWaitableEventReady( void EventsExecutor::HandleWaitableReady( py::handle waitable, std::shared_ptr wait_set, size_t number_of_events) { - ran_user_ = true; + if (stop_after_user_callback_) { + io_context_.stop(); + } // Largely based on rclpy.Executor._take_waitable() // https://github.com/ros2/rclpy/blob/a19180c238d4d97ed2b58868d8fb7fa3e3b621f2/rclpy/rclpy/executors.py#L447-L454 py::object is_ready = waitable.attr("is_ready"); @@ -842,7 +820,9 @@ void EventsExecutor::HandleWaitableReady( void EventsExecutor::IterateTask(py::handle task) { - ran_user_ = true; + if (stop_after_user_callback_) { + io_context_.stop(); + } py::gil_scoped_acquire gil_acquire; // Calling this won't throw, but it may set the exception property on the task object. task(); @@ -925,23 +905,6 @@ void EventsExecutor::Raise(py::object ex) py::exec("raise ex", scope); } -void EventsExecutor::CheckForSignals() -{ - const int signum = signal_pending_.exchange(0); - if (signum) { - rclpy_context_.attr("try_shutdown")(); - if (signum == SIGINT) { - PyErr_SetInterrupt(); - return; - } - } - - const bool ok = py::cast(rclpy_context_.attr("ok")()); - if (!ok) { - Raise(py::module_::import("rclpy.executors").attr("ExternalShutdownException")()); - } -} - // pybind11 module bindings void define_events_executor(py::object module) @@ -956,12 +919,23 @@ void define_events_executor(py::object module) .def("wake", &EventsExecutor::wake) .def("get_nodes", &EventsExecutor::get_nodes) .def("spin", [](EventsExecutor & exec) {exec.spin();}) - .def("spin_once", &EventsExecutor::spin_once, py::arg("timeout_sec") = py::none()) .def( - "spin_until_future_complete", &EventsExecutor::spin_until_future_complete, py::arg("future"), + "spin_once", + [](EventsExecutor & exec, std::optional timeout_sec) { + exec.spin(timeout_sec, true); + }, py::arg("timeout_sec") = py::none()) .def( - "spin_once_until_future_complete", &EventsExecutor::spin_once_until_future_complete, + "spin_until_future_complete", + [](EventsExecutor & exec, py::handle future, std::optional timeout_sec) { + exec.spin_until_future_complete(future, timeout_sec); + }, + py::arg("future"), py::arg("timeout_sec") = py::none()) + .def( + "spin_once_until_future_complete", + [](EventsExecutor & exec, py::handle future, std::optional timeout_sec) { + exec.spin_until_future_complete(future, timeout_sec, true); + }, py::arg("future"), py::arg("timeout_sec") = py::none()) .def("__enter__", &EventsExecutor::enter) .def("__exit__", &EventsExecutor::exit); diff --git a/rclpy/src/rclpy/events_executor/events_executor.hpp b/rclpy/src/rclpy/events_executor/events_executor.hpp index 901335545..4f4f25f3b 100644 --- a/rclpy/src/rclpy/events_executor/events_executor.hpp +++ b/rclpy/src/rclpy/events_executor/events_executor.hpp @@ -70,11 +70,10 @@ class EventsExecutor void remove_node(pybind11::handle node); void wake(); pybind11::list get_nodes() const; - void spin(std::optional timeout_sec = {}); - void spin_once(std::optional timeout_sec = {}); - void spin_until_future_complete(pybind11::handle future, std::optional timeout_sec = {}); - void spin_once_until_future_complete( - pybind11::handle future, std::optional timeout_sec = {}); + void spin(std::optional timeout_sec = {}, bool stop_after_user_callback = false); + void spin_until_future_complete( + pybind11::handle future, std::optional timeout_sec = {}, + bool stop_after_user_callback = false); EventsExecutor * enter(); void exit(pybind11::object, pybind11::object, pybind11::object); @@ -175,9 +174,9 @@ class EventsExecutor std::timed_mutex spinning_mutex_; ///< Held while a thread is spinning std::atomic signal_pending_{}; ///< Signal number of caught signal, 0 if none - /// This flag is used by spin_once() to determine if a user-visible callback has been dispatched - /// during its operation. - bool ran_user_{}; + /// This flag is used by spin_once() to signal that the io_context_ should be stopped after a + /// single user-visible callback has been dispatched. + bool stop_after_user_callback_{}; // Collection of awaitable entities we're servicing pybind11::set subscriptions_;