From 77b240bf2c1c069d8947347e60766e1bbdee3020 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Thu, 6 Oct 2022 16:37:28 -0700 Subject: [PATCH 1/3] Don't reopen file for every seek if we don't have to. Search directionally for the correct file Signed-off-by: Emerson Knapp --- .../rosbag2_cpp/readers/sequential_reader.hpp | 7 +-- .../rosbag2_cpp/readers/sequential_reader.cpp | 44 ++++++++++++------- .../rosbag2_cpp/test_multifile_reader.cpp | 3 +- .../impl/storage_factory_impl.hpp | 5 ++- 4 files changed, 35 insertions(+), 24 deletions(-) diff --git a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp index c0d8c217d7..cca7652634 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp @@ -91,12 +91,13 @@ class ROSBAG2_CPP_PUBLIC SequentialReader void seek(const rcutils_time_point_value_t & timestamp) override; /** - * Ask whether there is another database file to read from the list of relative + * Ask whether there is another storage file to read from the list of relative * file paths. * + * \param reverse Look for a previous file instead of a next file * \return true if there are still files to read in the list */ - virtual bool has_next_file() const; + virtual bool has_next_file(bool reverse) const; /** * Return the relative file path pointed to by the current file iterator. @@ -134,7 +135,7 @@ class ROSBAG2_CPP_PUBLIC SequentialReader * Expected usage: * if (has_next_file()) load_next_file(); */ - virtual void load_next_file(); + virtual void load_next_file(bool reverse); /** * Checks if all topics in the bagfile have the same RMW serialization format. diff --git a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp index 1bac318483..94babad3f0 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp @@ -143,8 +143,8 @@ bool SequentialReader::has_next() // If there's no new message, check if there's at least another file to read and update storage // to read from there. Otherwise, check if there's another message. bool current_storage_has_next = storage_->has_next(); - if (!current_storage_has_next && has_next_file()) { - load_next_file(); + if (!current_storage_has_next && has_next_file(read_order_.reverse)) { + load_next_file(read_order_.reverse); // recursively call has_next again after rollover return has_next(); } @@ -201,23 +201,33 @@ void SequentialReader::reset_filter() void SequentialReader::seek(const rcutils_time_point_value_t & timestamp) { seek_time_ = timestamp; - if (storage_) { - // reset to the first file - if (read_order_.reverse) { - current_file_iterator_ = std::prev(file_paths_.end()); - } else { - current_file_iterator_ = file_paths_.begin(); - } - load_current_file(); - return; + if (!storage_) { + throw std::runtime_error( + "Bag is not open. Call open() before seeking time."); } - throw std::runtime_error( - "Bag is not open. Call open() before seeking time."); + + auto metadata = storage_->get_metadata(); + auto start_time = metadata.starting_time.time_since_epoch().count(); + auto end_time = (metadata.starting_time + metadata.duration).time_since_epoch().count(); + + if (timestamp < start_time && has_next_file(true)) { + // Check back a file if the timestamp is before the beginning of the current file + load_next_file(true); + return seek(timestamp); + } else if (timestamp > end_time && has_next_file(false)) { + // Check forward a file if the timestamp is after the end of the current file + load_next_file(false); + return seek(timestamp); + } else { + // The timestamp lies in the range of this file, or there are no files left to go to + storage_->seek(timestamp); + } + return; } -bool SequentialReader::has_next_file() const +bool SequentialReader::has_next_file(bool reverse) const { - if (read_order_.reverse) { + if (reverse) { return current_file_iterator_ != file_paths_.begin(); } else { return (current_file_iterator_ + 1) != file_paths_.end(); @@ -244,12 +254,12 @@ void SequentialReader::load_current_file() set_filter(topics_filter_); } -void SequentialReader::load_next_file() +void SequentialReader::load_next_file(bool reverse) { assert(current_file_iterator_ != file_paths_.end()); auto info = std::make_shared(); info->closed_file = get_current_file(); - if (read_order_.reverse) { + if (reverse) { current_file_iterator_--; } else { current_file_iterator_++; diff --git a/rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp b/rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp index 96b82e02d6..cb9332b9cf 100644 --- a/rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp +++ b/rosbag2_cpp/test/rosbag2_cpp/test_multifile_reader.cpp @@ -228,9 +228,8 @@ TEST_F(MultifileReaderTest, seek_bag) { init(); reader_->open(default_storage_options_, {"", storage_serialization_format_}); - EXPECT_CALL(*storage_, has_next()).Times(3).WillRepeatedly(Return(false)); + EXPECT_CALL(*storage_, has_next()).Times(1).WillRepeatedly(Return(false)); EXPECT_CALL(*storage_, seek(_)).Times(3); - EXPECT_CALL(*storage_, set_filter(_)).Times(3); reader_->seek(9999999999999); reader_->has_next(); } diff --git a/rosbag2_storage/src/rosbag2_storage/impl/storage_factory_impl.hpp b/rosbag2_storage/src/rosbag2_storage/impl/storage_factory_impl.hpp index b9d1fe4813..85c4694689 100644 --- a/rosbag2_storage/src/rosbag2_storage/impl/storage_factory_impl.hpp +++ b/rosbag2_storage/src/rosbag2_storage/impl/storage_factory_impl.hpp @@ -116,8 +116,9 @@ get_interface_instance( registered_classes.begin(), registered_classes.end(), storage_options.storage_id); if (class_exists == registered_classes.end()) { - ROSBAG2_STORAGE_LOG_WARN_STREAM( - "No storage plugin found with id '" << storage_options.storage_id << "'."); + // This should not print a warning, because it can be used by open_read_only twice, + // legitimately expecting to fail for READ_ONLY but succeed for READ_WRITE + // The extra output is misleading to end users. return nullptr; } From 8807c4024e722bfb32f79bb6bc79eda0f4005c47 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Fri, 7 Oct 2022 10:35:19 -0700 Subject: [PATCH 2/3] Fix use of load_next_file in test Signed-off-by: Emerson Knapp --- .../test_sequential_compression_reader.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_reader.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_reader.cpp index 4f77e1dee2..a9202c8d0e 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_reader.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_reader.cpp @@ -237,7 +237,7 @@ TEST_F(SequentialCompressionReaderTest, compression_called_when_loading_split_ba std::move(metadata_io_)); compression_reader->open(storage_options_, converter_options_); - EXPECT_EQ(compression_reader->has_next_file(), true); + EXPECT_EQ(compression_reader->has_next_file(false), true); EXPECT_EQ(compression_reader->has_next(), true); // false then true compression_reader->read_next(); // calls has_next true } @@ -246,7 +246,7 @@ TEST_F(SequentialCompressionReaderTest, can_find_v4_names) { auto reader = create_reader(); reader->open(storage_options_, converter_options_); - EXPECT_TRUE(reader->has_next_file()); + EXPECT_TRUE(reader->has_next_file(false)); } TEST_F(SequentialCompressionReaderTest, throws_on_incorrect_filenames) @@ -268,7 +268,7 @@ TEST_F(SequentialCompressionReaderTest, can_find_prefixed_filenames) auto reader = create_reader(); EXPECT_NO_THROW(reader->open(storage_options_, converter_options_)); - EXPECT_TRUE(reader->has_next_file()); + EXPECT_TRUE(reader->has_next_file(false)); } TEST_F(SequentialCompressionReaderTest, can_find_prefixed_filenames_in_renamed_bag) @@ -282,7 +282,7 @@ TEST_F(SequentialCompressionReaderTest, can_find_prefixed_filenames_in_renamed_b auto reader = create_reader(); EXPECT_NO_THROW(reader->open(storage_options_, converter_options_)); - EXPECT_TRUE(reader->has_next_file()); + EXPECT_TRUE(reader->has_next_file(false)); } TEST_F(SequentialCompressionReaderTest, does_not_decompress_again_on_seek) From f597525d1e96ce9910b397422d38c5de4f4433b0 Mon Sep 17 00:00:00 2001 From: Emerson Knapp Date: Tue, 1 Nov 2022 16:36:21 -0700 Subject: [PATCH 3/3] has_prev_file and load_prev_file API instead of has_next(bool) Signed-off-by: Emerson Knapp --- .../test_sequential_compression_reader.cpp | 8 +-- .../rosbag2_cpp/readers/sequential_reader.hpp | 29 +++++++--- .../rosbag2_cpp/readers/sequential_reader.cpp | 53 ++++++++++++------- 3 files changed, 59 insertions(+), 31 deletions(-) diff --git a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_reader.cpp b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_reader.cpp index a9202c8d0e..4f77e1dee2 100644 --- a/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_reader.cpp +++ b/rosbag2_compression/test/rosbag2_compression/test_sequential_compression_reader.cpp @@ -237,7 +237,7 @@ TEST_F(SequentialCompressionReaderTest, compression_called_when_loading_split_ba std::move(metadata_io_)); compression_reader->open(storage_options_, converter_options_); - EXPECT_EQ(compression_reader->has_next_file(false), true); + EXPECT_EQ(compression_reader->has_next_file(), true); EXPECT_EQ(compression_reader->has_next(), true); // false then true compression_reader->read_next(); // calls has_next true } @@ -246,7 +246,7 @@ TEST_F(SequentialCompressionReaderTest, can_find_v4_names) { auto reader = create_reader(); reader->open(storage_options_, converter_options_); - EXPECT_TRUE(reader->has_next_file(false)); + EXPECT_TRUE(reader->has_next_file()); } TEST_F(SequentialCompressionReaderTest, throws_on_incorrect_filenames) @@ -268,7 +268,7 @@ TEST_F(SequentialCompressionReaderTest, can_find_prefixed_filenames) auto reader = create_reader(); EXPECT_NO_THROW(reader->open(storage_options_, converter_options_)); - EXPECT_TRUE(reader->has_next_file(false)); + EXPECT_TRUE(reader->has_next_file()); } TEST_F(SequentialCompressionReaderTest, can_find_prefixed_filenames_in_renamed_bag) @@ -282,7 +282,7 @@ TEST_F(SequentialCompressionReaderTest, can_find_prefixed_filenames_in_renamed_b auto reader = create_reader(); EXPECT_NO_THROW(reader->open(storage_options_, converter_options_)); - EXPECT_TRUE(reader->has_next_file(false)); + EXPECT_TRUE(reader->has_next_file()); } TEST_F(SequentialCompressionReaderTest, does_not_decompress_again_on_seek) diff --git a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp index cca7652634..a0970c710a 100644 --- a/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp +++ b/rosbag2_cpp/include/rosbag2_cpp/readers/sequential_reader.hpp @@ -91,13 +91,18 @@ class ROSBAG2_CPP_PUBLIC SequentialReader void seek(const rcutils_time_point_value_t & timestamp) override; /** - * Ask whether there is another storage file to read from the list of relative - * file paths. + * Ask whether there is another storage file to read from the list of relative file paths. * - * \param reverse Look for a previous file instead of a next file - * \return true if there are still files to read in the list + * \return true if iteration is not on the last file */ - virtual bool has_next_file(bool reverse) const; + virtual bool has_next_file() const; + + /** + * Ask whether there is a previous file to read from the list of relative file paths. + * + * \return true if iteration is not on the first file + */ + virtual bool has_prev_file() const; /** * Return the relative file path pointed to by the current file iterator. @@ -128,14 +133,24 @@ class ROSBAG2_CPP_PUBLIC SequentialReader */ virtual void load_current_file(); + /** * Increment the current file iterator to point to the next file in the list of relative file - * paths, and opens the next file by calling open_current_file() + * paths, and opens that file by calling open_current_file() * * Expected usage: * if (has_next_file()) load_next_file(); */ - virtual void load_next_file(bool reverse); + virtual void load_next_file(); + + /** + * Increment the current file iterator to point to the previous file in the list of relative file + * paths, and opens that file by calling open_current_file() + * + * Expected usage: + * if (has_prev_file()) load_prev_file(); + */ + virtual void load_prev_file(); /** * Checks if all topics in the bagfile have the same RMW serialization format. diff --git a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp index 94babad3f0..c57508562a 100644 --- a/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp +++ b/rosbag2_cpp/src/rosbag2_cpp/readers/sequential_reader.cpp @@ -143,10 +143,15 @@ bool SequentialReader::has_next() // If there's no new message, check if there's at least another file to read and update storage // to read from there. Otherwise, check if there's another message. bool current_storage_has_next = storage_->has_next(); - if (!current_storage_has_next && has_next_file(read_order_.reverse)) { - load_next_file(read_order_.reverse); - // recursively call has_next again after rollover - return has_next(); + if (!current_storage_has_next) { + if (!read_order_.reverse && has_next_file()) { + load_next_file(); + return has_next(); + } + if (read_order_.reverse && has_prev_file()) { + load_prev_file(); + return has_next(); + } } return current_storage_has_next; } @@ -210,13 +215,13 @@ void SequentialReader::seek(const rcutils_time_point_value_t & timestamp) auto start_time = metadata.starting_time.time_since_epoch().count(); auto end_time = (metadata.starting_time + metadata.duration).time_since_epoch().count(); - if (timestamp < start_time && has_next_file(true)) { + if (timestamp < start_time && has_prev_file()) { // Check back a file if the timestamp is before the beginning of the current file - load_next_file(true); + load_prev_file(); return seek(timestamp); - } else if (timestamp > end_time && has_next_file(false)) { + } else if (timestamp > end_time && has_next_file()) { // Check forward a file if the timestamp is after the end of the current file - load_next_file(false); + load_next_file(); return seek(timestamp); } else { // The timestamp lies in the range of this file, or there are no files left to go to @@ -225,13 +230,14 @@ void SequentialReader::seek(const rcutils_time_point_value_t & timestamp) return; } -bool SequentialReader::has_next_file(bool reverse) const +bool SequentialReader::has_next_file() const { - if (reverse) { - return current_file_iterator_ != file_paths_.begin(); - } else { - return (current_file_iterator_ + 1) != file_paths_.end(); - } + return (current_file_iterator_ + 1) != file_paths_.end(); +} + +bool SequentialReader::has_prev_file() const +{ + return current_file_iterator_ != file_paths_.begin(); } void SequentialReader::load_current_file() @@ -254,16 +260,23 @@ void SequentialReader::load_current_file() set_filter(topics_filter_); } -void SequentialReader::load_next_file(bool reverse) +void SequentialReader::load_next_file() { assert(current_file_iterator_ != file_paths_.end()); auto info = std::make_shared(); info->closed_file = get_current_file(); - if (reverse) { - current_file_iterator_--; - } else { - current_file_iterator_++; - } + current_file_iterator_++; + info->opened_file = get_current_file(); + load_current_file(); + callback_manager_.execute_callbacks(bag_events::BagEvent::READ_SPLIT, info); +} + +void SequentialReader::load_prev_file() +{ + assert(current_file_iterator_ != file_paths_.begin()); + auto info = std::make_shared(); + info->closed_file = get_current_file(); + current_file_iterator_--; info->opened_file = get_current_file(); load_current_file(); callback_manager_.execute_callbacks(bag_events::BagEvent::READ_SPLIT, info);