Skip to content

Commit

Permalink
Simplify spin_once() implementation
Browse files Browse the repository at this point in the history
This both reduces duplicate code now, and simplifies the asio interface used, which would otherwise need replacing.

Signed-off-by: Brad Martin <[email protected]>
  • Loading branch information
Brad Martin committed Jan 24, 2025
1 parent 35f4aea commit df3dbb7
Show file tree
Hide file tree
Showing 2 changed files with 61 additions and 88 deletions.
134 changes: 54 additions & 80 deletions rclpy/src/rclpy/events_executor/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,14 @@ py::list EventsExecutor::get_nodes() const {return nodes_;}
// specific node and more than one node may be connected to an executor instance.
// https://github.com/ros2/rclpy/blob/06d78fb28a6d61ede793201ae75474f3e5432b47/rclpy/rclpy/executors.py#L184-L185

void EventsExecutor::spin(std::optional<double> timeout_sec)
void EventsExecutor::spin(std::optional<double> timeout_sec, bool stop_after_user_callback)
{
{
std::unique_lock<std::timed_mutex> spin_lock(spinning_mutex_, std::try_to_lock);
if (!spin_lock) {
throw std::runtime_error("Attempt to spin an already-spinning Executor");
}
stop_after_user_callback_ = stop_after_user_callback;
// Release the GIL while we block. Any callbacks on the io_context that want to touch Python
// will need to reacquire it though.
py::gil_scoped_release gil_release;
Expand All @@ -208,57 +209,26 @@ void EventsExecutor::spin(std::optional<double> timeout_sec)
io_context_.restart();
}

CheckForSignals();
}

void EventsExecutor::spin_once(std::optional<double> timeout_sec)
{
{
std::unique_lock<std::timed_mutex> spin_lock(spinning_mutex_, std::try_to_lock);
if (!spin_lock) {
throw std::runtime_error("Attempt to spin an already-spinning Executor");
}
// Release the GIL while we block. Any callbacks on the io_context that want to touch Python
// will need to reacquire it though.
py::gil_scoped_release gil_release;
// Don't let asio auto stop if there's nothing to do
const auto work = asio::make_work_guard(io_context_);
// We can't just use asio run_one*() here, because 'one' asio callback might include some
// internal housekeeping, and not a user-visible callback that was intended when the Executor
// was asked to dispatch 'once'.
ran_user_ = false;
if (timeout_sec) {
const auto timeout_ns = std::chrono::duration_cast<std::chrono::nanoseconds>(
std::chrono::duration<double>(*timeout_sec));
const auto end = std::chrono::steady_clock::now() + timeout_ns;
// Dispatch anything that's immediately ready, even with zero timeout
while (io_context_.poll_one() && !ran_user_) {
}
// Now possibly block until the end of the timeout period
while (!ran_user_ && io_context_.run_one_until(end)) {
}
} else {
while (io_context_.run_one() && !ran_user_) {
}
const int signum = signal_pending_.exchange(0);
if (signum) {
rclpy_context_.attr("try_shutdown")();
if (signum == SIGINT) {
PyErr_SetInterrupt();
return;
}
io_context_.restart();
}

CheckForSignals();
const bool ok = py::cast<bool>(rclpy_context_.attr("ok")());
if (!ok) {
Raise(py::module_::import("rclpy.executors").attr("ExternalShutdownException")());
}
}

void EventsExecutor::spin_until_future_complete(
py::handle future, std::optional<double> timeout_sec)
py::handle future, std::optional<double> timeout_sec, bool stop_after_user_callback)
{
future.attr("add_done_callback")(py::cpp_function([this](py::handle) {io_context_.stop();}));
spin(timeout_sec);
}

void EventsExecutor::spin_once_until_future_complete(
py::handle future, std::optional<double> timeout_sec)
{
future.attr("add_done_callback")(py::cpp_function([this](py::handle) {io_context_.stop();}));
spin_once(timeout_sec);
spin(timeout_sec, stop_after_user_callback);
}

EventsExecutor * EventsExecutor::enter() {return this;}
Expand Down Expand Up @@ -344,9 +314,9 @@ void EventsExecutor::HandleAddedSubscription(py::handle subscription)
const rcl_subscription_t * rcl_ptr = py::cast<const Subscription &>(handle).rcl_ptr();
const auto cb = std::bind(&EventsExecutor::HandleSubscriptionReady, this, subscription, pl::_1);
if (
RCL_RET_OK !=
rcl_subscription_set_on_new_message_callback(
rcl_ptr, RclEventCallbackTrampoline, rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with)))
RCL_RET_OK != rcl_subscription_set_on_new_message_callback(
rcl_ptr, RclEventCallbackTrampoline,
rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with)))
{
throw std::runtime_error(
std::string("Failed to set the on new message callback for subscription: ") +
Expand All @@ -368,7 +338,9 @@ void EventsExecutor::HandleRemovedSubscription(py::handle subscription)

void EventsExecutor::HandleSubscriptionReady(py::handle subscription, size_t number_of_events)
{
ran_user_ = true;
if (stop_after_user_callback_) {
io_context_.stop();
}
py::gil_scoped_acquire gil_acquire;

// Largely based on rclpy.Executor._take_subscription() and _execute_subcription().
Expand Down Expand Up @@ -444,8 +416,8 @@ void EventsExecutor::HandleTimerReady(py::handle timer, const rcl_timer_call_inf
if (py::cast<bool>(inspect_iscoroutine_(result))) {
// Create a Task to manage iteration of this coroutine later.
create_task(result);
} else {
ran_user_ = true;
} else if (stop_after_user_callback_) {
io_context_.stop();
}
}

Expand All @@ -456,9 +428,9 @@ void EventsExecutor::HandleAddedClient(py::handle client)
const rcl_client_t * rcl_ptr = py::cast<const Client &>(handle).rcl_ptr();
const auto cb = std::bind(&EventsExecutor::HandleClientReady, this, client, pl::_1);
if (
RCL_RET_OK !=
rcl_client_set_on_new_response_callback(
rcl_ptr, RclEventCallbackTrampoline, rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with)))
RCL_RET_OK != rcl_client_set_on_new_response_callback(
rcl_ptr, RclEventCallbackTrampoline,
rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with)))
{
throw std::runtime_error(
std::string("Failed to set the on new response callback for client: ") +
Expand All @@ -480,7 +452,9 @@ void EventsExecutor::HandleRemovedClient(py::handle client)

void EventsExecutor::HandleClientReady(py::handle client, size_t number_of_events)
{
ran_user_ = true;
if (stop_after_user_callback_) {
io_context_.stop();
}
py::gil_scoped_acquire gil_acquire;

// Largely based on rclpy.Executor._take_client() and _execute_client().
Expand Down Expand Up @@ -524,9 +498,9 @@ void EventsExecutor::HandleAddedService(py::handle service)
const rcl_service_t * rcl_ptr = py::cast<const Service &>(handle).rcl_ptr();
const auto cb = std::bind(&EventsExecutor::HandleServiceReady, this, service, pl::_1);
if (
RCL_RET_OK !=
rcl_service_set_on_new_request_callback(
rcl_ptr, RclEventCallbackTrampoline, rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with)))
RCL_RET_OK != rcl_service_set_on_new_request_callback(
rcl_ptr, RclEventCallbackTrampoline,
rcl_callback_manager_.MakeCallback(rcl_ptr, cb, with)))
{
throw std::runtime_error(
std::string("Failed to set the on new request callback for service: ") +
Expand All @@ -548,7 +522,9 @@ void EventsExecutor::HandleRemovedService(py::handle service)

void EventsExecutor::HandleServiceReady(py::handle service, size_t number_of_events)
{
ran_user_ = true;
if (stop_after_user_callback_) {
io_context_.stop();
}
py::gil_scoped_acquire gil_acquire;

// Largely based on rclpy.Executor._take_service() and _execute_service().
Expand Down Expand Up @@ -818,7 +794,9 @@ void EventsExecutor::HandleWaitableEventReady(
void EventsExecutor::HandleWaitableReady(
py::handle waitable, std::shared_ptr<WaitSet> wait_set, size_t number_of_events)
{
ran_user_ = true;
if (stop_after_user_callback_) {
io_context_.stop();
}
// Largely based on rclpy.Executor._take_waitable()
// https://github.com/ros2/rclpy/blob/a19180c238d4d97ed2b58868d8fb7fa3e3b621f2/rclpy/rclpy/executors.py#L447-L454
py::object is_ready = waitable.attr("is_ready");
Expand All @@ -842,7 +820,9 @@ void EventsExecutor::HandleWaitableReady(

void EventsExecutor::IterateTask(py::handle task)
{
ran_user_ = true;
if (stop_after_user_callback_) {
io_context_.stop();
}
py::gil_scoped_acquire gil_acquire;
// Calling this won't throw, but it may set the exception property on the task object.
task();
Expand Down Expand Up @@ -925,23 +905,6 @@ void EventsExecutor::Raise(py::object ex)
py::exec("raise ex", scope);
}

void EventsExecutor::CheckForSignals()
{
const int signum = signal_pending_.exchange(0);
if (signum) {
rclpy_context_.attr("try_shutdown")();
if (signum == SIGINT) {
PyErr_SetInterrupt();
return;
}
}

const bool ok = py::cast<bool>(rclpy_context_.attr("ok")());
if (!ok) {
Raise(py::module_::import("rclpy.executors").attr("ExternalShutdownException")());
}
}

// pybind11 module bindings

void define_events_executor(py::object module)
Expand All @@ -956,12 +919,23 @@ void define_events_executor(py::object module)
.def("wake", &EventsExecutor::wake)
.def("get_nodes", &EventsExecutor::get_nodes)
.def("spin", [](EventsExecutor & exec) {exec.spin();})
.def("spin_once", &EventsExecutor::spin_once, py::arg("timeout_sec") = py::none())
.def(
"spin_until_future_complete", &EventsExecutor::spin_until_future_complete, py::arg("future"),
"spin_once",
[](EventsExecutor & exec, std::optional<double> timeout_sec) {
exec.spin(timeout_sec, true);
},
py::arg("timeout_sec") = py::none())
.def(
"spin_once_until_future_complete", &EventsExecutor::spin_once_until_future_complete,
"spin_until_future_complete",
[](EventsExecutor & exec, py::handle future, std::optional<double> timeout_sec) {
exec.spin_until_future_complete(future, timeout_sec);
},
py::arg("future"), py::arg("timeout_sec") = py::none())
.def(
"spin_once_until_future_complete",
[](EventsExecutor & exec, py::handle future, std::optional<double> timeout_sec) {
exec.spin_until_future_complete(future, timeout_sec, true);
},
py::arg("future"), py::arg("timeout_sec") = py::none())
.def("__enter__", &EventsExecutor::enter)
.def("__exit__", &EventsExecutor::exit);
Expand Down
15 changes: 7 additions & 8 deletions rclpy/src/rclpy/events_executor/events_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,11 +70,10 @@ class EventsExecutor
void remove_node(pybind11::handle node);
void wake();
pybind11::list get_nodes() const;
void spin(std::optional<double> timeout_sec = {});
void spin_once(std::optional<double> timeout_sec = {});
void spin_until_future_complete(pybind11::handle future, std::optional<double> timeout_sec = {});
void spin_once_until_future_complete(
pybind11::handle future, std::optional<double> timeout_sec = {});
void spin(std::optional<double> timeout_sec = {}, bool stop_after_user_callback = false);
void spin_until_future_complete(
pybind11::handle future, std::optional<double> timeout_sec = {},
bool stop_after_user_callback = false);
EventsExecutor * enter();
void exit(pybind11::object, pybind11::object, pybind11::object);

Expand Down Expand Up @@ -175,9 +174,9 @@ class EventsExecutor
std::timed_mutex spinning_mutex_; ///< Held while a thread is spinning
std::atomic<int> signal_pending_{}; ///< Signal number of caught signal, 0 if none

/// This flag is used by spin_once() to determine if a user-visible callback has been dispatched
/// during its operation.
bool ran_user_{};
/// This flag is used by spin_once() to signal that the io_context_ should be stopped after a
/// single user-visible callback has been dispatched.
bool stop_after_user_callback_{};

// Collection of awaitable entities we're servicing
pybind11::set subscriptions_;
Expand Down

0 comments on commit df3dbb7

Please sign in to comment.