Skip to content

Commit

Permalink
Add PlayerClock::wakeup() to interrupt sleeping (#1869)
Browse files Browse the repository at this point in the history
* Add PlayerClock::wakeup() to interrupt sleeping

Signed-off-by: Christophe Bedard <[email protected]>

* Use PlayerClock::wakeup() in new play_next() impl

Signed-off-by: Christophe Bedard <[email protected]>

---------

Signed-off-by: Christophe Bedard <[email protected]>
  • Loading branch information
christophebedard authored Dec 3, 2024
1 parent 4602b2c commit c8feaea
Show file tree
Hide file tree
Showing 5 changed files with 67 additions and 9 deletions.
13 changes: 13 additions & 0 deletions rosbag2_cpp/include/rosbag2_cpp/clocks/player_clock.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,19 @@ class PlayerClock
ROSBAG2_CPP_PUBLIC
virtual bool sleep_until(rclcpp::Time time) = 0;

/**
* \return whether the clock is currently sleeping.
*/
ROSBAG2_CPP_PUBLIC
virtual bool is_sleeping() = 0;

/**
* \brief Wake up the clock if it is sleeping.
* \note This will wake any waiting `sleep_until`.
*/
ROSBAG2_CPP_PUBLIC
virtual void wakeup() = 0;

/**
* Change the rate of the flow of time for the clock.
* \param rate new rate of clock playback
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,12 @@ class TimeControllerClock : public PlayerClock
ROSBAG2_CPP_PUBLIC
bool sleep_until(rclcpp::Time until) override;

ROSBAG2_CPP_PUBLIC
bool is_sleeping() override;

ROSBAG2_CPP_PUBLIC
void wakeup() override;

/**
* Change the rate of the flow of time for the clock.
*
Expand Down
16 changes: 16 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/clocks/time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class TimeControllerClockImpl
std::condition_variable cv RCPPUTILS_TSA_GUARDED_BY(state_mutex);
double rate RCPPUTILS_TSA_GUARDED_BY(state_mutex) = 1.0;
bool paused RCPPUTILS_TSA_GUARDED_BY(state_mutex) = false;
bool is_sleeping RCPPUTILS_TSA_GUARDED_BY(state_mutex) = false;
TimeReference reference RCPPUTILS_TSA_GUARDED_BY(state_mutex);
};

Expand Down Expand Up @@ -165,12 +166,16 @@ bool TimeControllerClock::sleep_until(rcutils_time_point_value_t until)
{
rcpputils::unique_lock<std::mutex> lock(impl_->state_mutex);
if (impl_->paused) {
impl_->is_sleeping = true;
impl_->cv.wait_for(lock, impl_->sleep_time_while_paused);
impl_->is_sleeping = false;
} else {
const auto steady_until = impl_->ros_to_steady(until);
// wait only if necessary for performance
if (steady_until > impl_->now_fn()) {
impl_->is_sleeping = true;
impl_->cv.wait_until(lock, steady_until);
impl_->is_sleeping = false;
}
}
if (impl_->paused) {
Expand All @@ -187,6 +192,17 @@ bool TimeControllerClock::sleep_until(rclcpp::Time until)
return sleep_until(until.nanoseconds());
}

bool TimeControllerClock::is_sleeping()
{
std::lock_guard<std::mutex> lock(impl_->state_mutex);
return impl_->is_sleeping;
}

void TimeControllerClock::wakeup()
{
impl_->cv.notify_all();
}

bool TimeControllerClock::set_rate(double rate)
{
if (rate <= 0) {
Expand Down
26 changes: 26 additions & 0 deletions rosbag2_cpp/test/rosbag2_cpp/test_time_controller_clock.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,32 @@ TEST_F(TimeControllerClockTest, interrupted_sleep_returns_false_immediately)
EXPECT_FALSE(thread_sleep_result);
}

TEST_F(TimeControllerClockTest, wakeup_sleep_returns_immediately)
{
// Use a bigger value for sleep_time_while_paused
auto sleep_time_while_paused = std::chrono::seconds(30);
rosbag2_cpp::TimeControllerClock clock(ros_start_time, now_fn, sleep_time_while_paused);
clock.pause();
EXPECT_FALSE(clock.is_sleeping());
auto steady_start = std::chrono::steady_clock::now();
std::atomic_bool thread_sleep_result{true};
auto sleep_long_thread = std::thread(
[&clock, &thread_sleep_result]() {
bool sleep_result = clock.sleep_until(clock.now() + RCUTILS_S_TO_NS(10));
thread_sleep_result.store(sleep_result);
});
// Wait for the thread to be sleeping for the purposes of this test
while (!clock.is_sleeping()) {
std::this_thread::sleep_for(std::chrono::milliseconds(1));
}
clock.wakeup();
sleep_long_thread.join();
EXPECT_FALSE(clock.is_sleeping());
// Check that the clock slept less than the sleep_time_while_paused value
EXPECT_LT(std::chrono::steady_clock::now() - steady_start, sleep_time_while_paused);
EXPECT_FALSE(thread_sleep_result);
}

TEST_F(TimeControllerClockTest, resumes_at_correct_ros_time)
{
rosbag2_cpp::TimeControllerClock clock(ros_start_time, now_fn);
Expand Down
15 changes: 6 additions & 9 deletions rosbag2_transport/src/rosbag2_transport/player.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -641,8 +641,8 @@ void PlayerImpl::stop()
}

if (clock_->is_paused()) {
clock_->resume(); // Temporary resume clock to force wakeup in clock_->sleep_until(time)
clock_->pause(); // Return in pause mode to preserve original state of the player
// Wake up the clock in case it's in a sleep_until(time) call
clock_->wakeup();
}
// Note: Don't clean up message queue here. It will be cleaned up automatically in
// playback thread after finishing play_messages_from_queue();
Expand Down Expand Up @@ -739,10 +739,8 @@ bool PlayerImpl::play_next()

// Wait for play next to be done, and then return the result
std::unique_lock<std::mutex> lk(finished_play_next_mutex_);
// Temporarily resume clock to force wakeup in clock_->sleep_until(time),
// then return in pause mode to preserve original state of the player
clock_->resume();
clock_->pause();
// Wake up the clock in case it's in a sleep_until(time) call
clock_->wakeup();
finished_play_next_ = false;
finished_play_next_cv_.wait(lk, [this] {return finished_play_next_.load();});
play_next_ = false;
Expand Down Expand Up @@ -1044,9 +1042,8 @@ void PlayerImpl::play_messages_from_queue()
while (!stop_playback_ && is_paused() && !play_next_.load() && rclcpp::ok()) {
clock_->sleep_until(clock_->now());
}
// If we ran out of messages and are not in pause state, it means we're done playing,
// unless play_next() is resuming and pausing the clock in order to wake us up
if (!is_paused() && !play_next_.load()) {
// If we ran out of messages and are not in pause state, it means we're done playing
if (!is_paused()) {
break;
}

Expand Down

0 comments on commit c8feaea

Please sign in to comment.