Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Don't reopen file for every seek if we don't have to. Search directionally for the correct file #1117

Merged
merged 3 commits into from
Nov 7, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 20 additions & 4 deletions rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,19 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
void seek(const rcutils_time_point_value_t & timestamp) override;

/**
* Ask whether there is another database file to read from the list of relative
* file paths.
* Ask whether there is another storage file to read from the list of relative file paths.
*
* \return true if there are still files to read in the list
* \return true if iteration is not on the last file
*/
virtual bool has_next_file() const;

/**
* Ask whether there is a previous file to read from the list of relative file paths.
*
* \return true if iteration is not on the first file
*/
virtual bool has_prev_file() const;

/**
* Return the relative file path pointed to by the current file iterator.
*/
Expand Down Expand Up @@ -127,15 +133,25 @@ class ROSBAG2_CPP_PUBLIC SequentialReader
*/
virtual void load_current_file();


/**
* Increment the current file iterator to point to the next file in the list of relative file
* paths, and opens the next file by calling open_current_file()
* paths, and opens that file by calling open_current_file()
*
* Expected usage:
* if (has_next_file()) load_next_file();
*/
virtual void load_next_file();

/**
* Increment the current file iterator to point to the previous file in the list of relative file
* paths, and opens that file by calling open_current_file()
*
* Expected usage:
* if (has_prev_file()) load_prev_file();
*/
virtual void load_prev_file();

/**
* Checks if all topics in the bagfile have the same RMW serialization format.
* Currently a bag file can only be played if all topics have the same serialization format.
Expand Down
73 changes: 48 additions & 25 deletions rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,10 +143,15 @@ bool SequentialReader::has_next()
// If there's no new message, check if there's at least another file to read and update storage
// to read from there. Otherwise, check if there's another message.
bool current_storage_has_next = storage_->has_next();
if (!current_storage_has_next && has_next_file()) {
load_next_file();
// recursively call has_next again after rollover
return has_next();
if (!current_storage_has_next) {
if (!read_order_.reverse && has_next_file()) {
load_next_file();
return has_next();
}
if (read_order_.reverse && has_prev_file()) {
load_prev_file();
return has_next();
}
}
return current_storage_has_next;
}
Expand Down Expand Up @@ -201,27 +206,38 @@ void SequentialReader::reset_filter()
void SequentialReader::seek(const rcutils_time_point_value_t & timestamp)
{
seek_time_ = timestamp;
if (storage_) {
// reset to the first file
if (read_order_.reverse) {
current_file_iterator_ = std::prev(file_paths_.end());
} else {
current_file_iterator_ = file_paths_.begin();
}
load_current_file();
return;
if (!storage_) {
throw std::runtime_error(
"Bag is not open. Call open() before seeking time.");
}
throw std::runtime_error(
"Bag is not open. Call open() before seeking time.");

auto metadata = storage_->get_metadata();
auto start_time = metadata.starting_time.time_since_epoch().count();
auto end_time = (metadata.starting_time + metadata.duration).time_since_epoch().count();

if (timestamp < start_time && has_prev_file()) {
// Check back a file if the timestamp is before the beginning of the current file
load_prev_file();
return seek(timestamp);
} else if (timestamp > end_time && has_next_file()) {
// Check forward a file if the timestamp is after the end of the current file
load_next_file();
return seek(timestamp);
} else {
// The timestamp lies in the range of this file, or there are no files left to go to
storage_->seek(timestamp);
}
return;
}

bool SequentialReader::has_next_file() const
{
if (read_order_.reverse) {
return current_file_iterator_ != file_paths_.begin();
} else {
return (current_file_iterator_ + 1) != file_paths_.end();
}
return (current_file_iterator_ + 1) != file_paths_.end();
}

bool SequentialReader::has_prev_file() const
{
return current_file_iterator_ != file_paths_.begin();
}

void SequentialReader::load_current_file()
Expand Down Expand Up @@ -249,11 +265,18 @@ void SequentialReader::load_next_file()
assert(current_file_iterator_ != file_paths_.end());
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = get_current_file();
if (read_order_.reverse) {
current_file_iterator_--;
} else {
current_file_iterator_++;
}
current_file_iterator_++;
info->opened_file = get_current_file();
load_current_file();
callback_manager_.execute_callbacks(bag_events::BagEvent::READ_SPLIT, info);
}

void SequentialReader::load_prev_file()
{
assert(current_file_iterator_ != file_paths_.begin());
auto info = std::make_shared<bag_events::BagSplitInfo>();
info->closed_file = get_current_file();
current_file_iterator_--;
info->opened_file = get_current_file();
load_current_file();
callback_manager_.execute_callbacks(bag_events::BagEvent::READ_SPLIT, info);
Expand Down
3 changes: 1 addition & 2 deletions rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -228,9 +228,8 @@ TEST_F(MultifileReaderTest, seek_bag)
{
init();
reader_->open(default_storage_options_, {"", storage_serialization_format_});
EXPECT_CALL(*storage_, has_next()).Times(3).WillRepeatedly(Return(false));
EXPECT_CALL(*storage_, has_next()).Times(1).WillRepeatedly(Return(false));
EXPECT_CALL(*storage_, seek(_)).Times(3);
EXPECT_CALL(*storage_, set_filter(_)).Times(3);
reader_->seek(9999999999999);
reader_->has_next();
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,8 +116,9 @@ get_interface_instance(
registered_classes.begin(),
registered_classes.end(), storage_options.storage_id);
if (class_exists == registered_classes.end()) {
ROSBAG2_STORAGE_LOG_WARN_STREAM(
"No storage plugin found with id '" << storage_options.storage_id << "'.");
// This should not print a warning, because it can be used by open_read_only twice,
// legitimately expecting to fail for READ_ONLY but succeed for READ_WRITE
// The extra output is misleading to end users.
return nullptr;
}

Expand Down