Skip to content

Commit

Permalink
Add support for coroutines to timer handling
Browse files Browse the repository at this point in the history
Signed-off-by: Brad Martin <[email protected]>
  • Loading branch information
Brad Martin committed Jan 21, 2025
1 parent 7cc7f5f commit 3502872
Show file tree
Hide file tree
Showing 3 changed files with 57 additions and 56 deletions.
25 changes: 12 additions & 13 deletions rclpy/src/rclpy/events_executor/events_executor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ namespace events_executor

EventsExecutor::EventsExecutor(py::object context)
: rclpy_context_(context),
asyncio_run_(py::module_::import("asyncio").attr("run")),
inspect_iscoroutine_(py::module_::import("inspect").attr("iscoroutine")),
rclpy_task_(py::module_::import("rclpy.task").attr("Task")),
signals_(io_context_),
rcl_callback_manager_(io_context_.get_executor()),
Expand Down Expand Up @@ -411,13 +411,18 @@ void EventsExecutor::HandleRemovedTimer(py::handle timer) {timers_manager_.Remov

void EventsExecutor::HandleTimerReady(py::handle timer)
{
ran_user_ = true;
py::gil_scoped_acquire gil_acquire;

try {
// Unlike most rclpy objects this doesn't document whether it's a Callable or might be a
// Coroutine. Let's hope it's the former.
timer.attr("callback")();
// The type markup claims this can't be a coroutine, but this seems to be a lie because the unit
// test does exactly that.
py::object result = timer.attr("callback")();
if (py::cast<bool>(inspect_iscoroutine_(result))) {
// Create a Task to manage iteration of this coroutine later.
create_task(result);
} else {
ran_user_ = true;
}
} catch (const py::error_already_set & e) {
HandleCallbackExceptionInNodeEntity(e, timer, "timers");
throw;
Expand Down Expand Up @@ -810,14 +815,8 @@ void EventsExecutor::HandleWaitableReady(
throw std::runtime_error("Failed to make Waitable ready");
}
py::object data = take_data();
try {
// execute() is an async method, we need to use asyncio to run it
// TODO(bmartin427) Don't run all of this immediately, blocking everything else
asyncio_run_(execute(data));
} catch (const py::error_already_set & e) {
HandleCallbackExceptionInNodeEntity(e, waitable, "waitables");
throw;
}
// execute() is an async method, we need a Task to run it
create_task(execute(data));
}
}

Expand Down
4 changes: 2 additions & 2 deletions rclpy/src/rclpy/events_executor/events_executor.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class EventsExecutor
// rclpy Executor API methods:
pybind11::object get_context() const {return rclpy_context_;}
pybind11::object create_task(
pybind11::object callback, pybind11::args args, const pybind11::kwargs & kwargs);
pybind11::object callback, pybind11::args args = {}, const pybind11::kwargs & kwargs = {});
bool shutdown(std::optional<double> timeout_sec = {});
bool add_node(pybind11::object node);
void remove_node(pybind11::handle node);
Expand Down Expand Up @@ -160,7 +160,7 @@ class EventsExecutor
const pybind11::object rclpy_context_;

// Imported python objects we depend on
const pybind11::object asyncio_run_;
const pybind11::object inspect_iscoroutine_;
const pybind11::object rclpy_task_;

asio::io_context io_context_;
Expand Down
84 changes: 43 additions & 41 deletions rclpy/test/test_executor.py
Original file line number Diff line number Diff line change
Expand Up @@ -211,32 +211,34 @@ def test_executor_spin_non_blocking(self) -> None:

def test_execute_coroutine_timer(self) -> None:
self.assertIsNotNone(self.node.handle)
# TODO(bmartin427) EventsExecutor doesn't yet properly handle coroutines
executor = SingleThreadedExecutor(context=self.context)
executor.add_node(self.node)

called1 = False
called2 = False

async def coroutine() -> None:
nonlocal called1
nonlocal called2
called1 = True
await asyncio.sleep(0)
called2 = True

tmr = self.node.create_timer(0.1, coroutine)
try:
executor.spin_once(timeout_sec=1.23)
self.assertTrue(called1)
self.assertFalse(called2)
for cls in [SingleThreadedExecutor, EventsExecutor]:
executor = cls(context=self.context)
executor.add_node(self.node)

called1 = False
executor.spin_once(timeout_sec=0)
self.assertFalse(called1)
self.assertTrue(called2)
finally:
self.node.destroy_timer(tmr)
called2 = False

async def coroutine() -> None:
nonlocal called1
nonlocal called2
called1 = True
await asyncio.sleep(0)
called2 = True

# TODO(bmartin427) The type markup on Node.create_timer() says you can't pass a
# coroutine here.
tmr = self.node.create_timer(0.1, coroutine)
try:
executor.spin_once(timeout_sec=1.23)
self.assertTrue(called1)
self.assertFalse(called2)

called1 = False
executor.spin_once(timeout_sec=0)
self.assertFalse(called1)
self.assertTrue(called2)
finally:
self.node.destroy_timer(tmr)

def test_execute_coroutine_guard_condition(self) -> None:
self.assertIsNotNone(self.node.handle)
Expand Down Expand Up @@ -410,27 +412,27 @@ def __await__(self):
yield
return

# TODO(bmartin427) EventsExecutor doesn't yet properly handle async callbacks
trigger = TriggerAwait()
did_callback = False
did_return = False
for cls in [SingleThreadedExecutor, EventsExecutor]:
trigger = TriggerAwait()
did_callback = False
did_return = False

async def timer_callback() -> None:
nonlocal trigger, did_callback, did_return
did_callback = True
await trigger
did_return = True
async def timer_callback() -> None:
nonlocal trigger, did_callback, did_return
did_callback = True
await trigger
did_return = True

timer = self.node.create_timer(0.1, timer_callback)
timer = self.node.create_timer(0.1, timer_callback)

executor = SingleThreadedExecutor(context=self.context)
rclpy.spin_once(self.node, timeout_sec=0.5, executor=executor)
self.assertTrue(did_callback)
executor = cls(context=self.context)
rclpy.spin_once(self.node, timeout_sec=0.5, executor=executor)
self.assertTrue(did_callback)

timer.cancel()
trigger.do_yield = False
rclpy.spin_once(self.node, timeout_sec=0, executor=executor)
self.assertTrue(did_return)
timer.cancel()
trigger.do_yield = False
rclpy.spin_once(self.node, timeout_sec=0, executor=executor)
self.assertTrue(did_return)

def test_executor_add_node(self) -> None:
self.assertIsNotNone(self.node.handle)
Expand Down

0 comments on commit 3502872

Please sign in to comment.