Skip to content

Commit

Permalink
Add update_metadata(BagMetadata) API for storage plugin interface (#…
Browse files Browse the repository at this point in the history
…1149)

* Add `update_metadata(BagMetadata)` for storage plugins interface

- We need update_metadata(BagMetadata) API to be able to save
BagMetadata inside each bag file, directly in SQLite DB or MCAP file.
- update_metadata(BagMetadata) calling ones after creating storage
object and second time just before destructing it.

Signed-off-by: Michael Orlov <[email protected]>

* Fix for failing unit tests

- Check for `storage_` is not nullptr since it could be called from
destructor
- Add `storage_->update_metadata(metadata_)` in
`SequentialCompressionWriter::close()`

Signed-off-by: Michael Orlov <[email protected]>

* Add unit tests for `update_metadata` for `sequential_compression_writer`

- Test that storage->update_metadata() called on writer open and during
writer destruction.
- Test that storage->update_metadata() calling with appropriate
parameters when calling `bag_split()`

Signed-off-by: Michael Orlov <[email protected]>

* Add unit tests for `update_metadata` for `sequential_writer`

- Test that storage->update_metadata() called on writer open and during
writer destruction.
- Test that storage->update_metadata() calling with appropriate
parameters when calling `bag_split()`

Signed-off-by: Michael Orlov <[email protected]>

Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
morlov-apexai authored Nov 23, 2022
1 parent 9498426 commit 86efa58
Show file tree
Hide file tree
Showing 13 changed files with 230 additions and 25 deletions.
1 change: 1 addition & 0 deletions rosbag2_compression/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
<name>rosbag2_compression</name>
<version>0.18.0</version>
<description>Compression implementations for rosbag2 bags and messages.</description>
<maintainer email="[email protected]">Michael Orlov</maintainer>
<maintainer email="[email protected]">Geoffrey Biggs</maintainer>
<maintainer email="[email protected]">Michel Hidalgo</maintainer>
<maintainer email="[email protected]">Emerson Knapp</maintainer>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,9 @@ void SequentialCompressionWriter::close()
stop_compressor_threads();

finalize_metadata();
if (storage_) {
storage_->update_metadata(metadata_);
}
metadata_io_->write_metadata(base_folder_, metadata_);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
MOCK_METHOD2(
open,
void(const rosbag2_storage::StorageOptions &, rosbag2_storage::storage_interfaces::IOFlag));
MOCK_METHOD1(update_metadata, void(const rosbag2_storage::BagMetadata &));
MOCK_METHOD1(create_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(remove_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(set_read_order, void(const rosbag2_storage::ReadOrder &));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,15 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
// intercept the metadata write so we can analyze it.
ON_CALL(*metadata_io_, write_metadata).WillByDefault(
[this](const std::string &, const rosbag2_storage::BagMetadata & metadata) {
intercepted_metadata_ = metadata;
intercepted_write_metadata_ = metadata;
});
ON_CALL(*storage_, update_metadata).WillByDefault(
[this](const rosbag2_storage::BagMetadata & metadata) {
v_intercepted_update_metadata_.emplace_back(metadata);
});
}

~SequentialCompressionWriterTest()
~SequentialCompressionWriterTest() override
{
rcpputils::fs::remove_all(tmp_dir_);
}
Expand Down Expand Up @@ -128,11 +132,12 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>

rcpputils::fs::path tmp_dir_;
rosbag2_storage::StorageOptions tmp_dir_storage_options_;
rosbag2_storage::BagMetadata intercepted_metadata_;
rosbag2_storage::BagMetadata intercepted_write_metadata_;
std::vector<rosbag2_storage::BagMetadata> v_intercepted_update_metadata_;
std::unique_ptr<rosbag2_cpp::Writer> writer_;

std::string serialization_format_;
uint64_t fake_storage_size_;
uint64_t fake_storage_size_{};
std::string fake_storage_uri_;

const uint64_t kDefaultCompressionQueueSize = 1;
Expand Down Expand Up @@ -249,19 +254,103 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative
writer_->write(message);
writer_.reset();

EXPECT_EQ(
intercepted_metadata_.relative_file_paths.size(), 3u);
EXPECT_EQ(intercepted_write_metadata_.relative_file_paths.size(), 3u);

const auto base_path = tmp_dir_storage_options_.uri;
int counter = 0;
for (const auto & path : intercepted_metadata_.relative_file_paths) {
for (const auto & path : intercepted_write_metadata_.relative_file_paths) {
std::stringstream ss;
ss << bag_name_ << "_" << counter << "." << DefaultTestCompressor;
counter++;
EXPECT_EQ(ss.str(), path);
}
}

TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_open_and_destruction)
{
const std::string test_topic_name = "test_topic";
const std::string test_topic_type = "test_msgs/BasicTypes";

// queue size should be 0 or at least the number of remaining messages to prevent message loss
rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
kDefaultCompressionQueueThreads
};

initializeFakeFileStorage();
initializeWriter(compression_options);

EXPECT_CALL(*storage_, update_metadata(_)).Times(2);
writer_->open(tmp_dir_storage_options_);
writer_->create_topic({test_topic_name, test_topic_type, "", ""});

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = test_topic_name;

const size_t kNumMessagesToWrite = 5;
for (size_t i = 0; i < kNumMessagesToWrite; i++) {
writer_->write(message);
}
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(v_intercepted_update_metadata_.size(), 2u);
using rosbag2_compression::compression_mode_from_string;
auto compression_mode =
compression_mode_from_string(v_intercepted_update_metadata_[0].compression_mode);
EXPECT_EQ(compression_mode, rosbag2_compression::CompressionMode::MESSAGE);
EXPECT_EQ(v_intercepted_update_metadata_[0].message_count, 0u);
EXPECT_EQ(v_intercepted_update_metadata_[1].message_count, kNumMessagesToWrite);
}

TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_bag_split)
{
const std::string test_topic_name = "test_topic";
const std::string test_topic_type = "test_msgs/BasicTypes";

// queue size should be 0 or at least the number of remaining messages to prevent message loss
rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
kDefaultCompressionQueueThreads
};

initializeFakeFileStorage();
initializeWriter(compression_options);

EXPECT_CALL(*storage_, update_metadata(_)).Times(4);
writer_->open(tmp_dir_storage_options_);
writer_->create_topic({test_topic_name, test_topic_type, "", ""});

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = test_topic_name;

const size_t kNumMessagesToWrite = 5;
for (size_t i = 0; i < kNumMessagesToWrite; i++) {
writer_->write(message);
}

writer_->split_bagfile();

for (size_t i = 0; i < kNumMessagesToWrite; i++) {
writer_->write(message);
}
writer_.reset(); // reset will call writer destructor

ASSERT_EQ(v_intercepted_update_metadata_.size(), 4u);
using rosbag2_compression::compression_mode_from_string;
auto compression_mode =
compression_mode_from_string(v_intercepted_update_metadata_[0].compression_mode);
EXPECT_EQ(compression_mode, rosbag2_compression::CompressionMode::MESSAGE);
EXPECT_EQ(v_intercepted_update_metadata_[0].message_count, 0u); // On opening first bag file
EXPECT_EQ(v_intercepted_update_metadata_[1].files.size(), 1u); // On closing first bag file
EXPECT_EQ(v_intercepted_update_metadata_[2].files.size(), 1u); // On opening second bag file
EXPECT_EQ(v_intercepted_update_metadata_[3].files.size(), 2u); // On writer destruction
EXPECT_EQ(v_intercepted_update_metadata_[3].message_count, 2 * kNumMessagesToWrite);
}

TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_sizes)
{
const std::string test_topic_name = "test_topic";
Expand Down
1 change: 1 addition & 0 deletions rosbag2_cpp/package.xml
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<name>rosbag2_cpp</name>
<version>0.18.0</version>
<description>C++ ROSBag2 client library</description>
<maintainer email="[email protected]">Michael Orlov</maintainer>
<maintainer email="[email protected]">Geoffrey Biggs</maintainer>
<maintainer email="[email protected]">Michel Hidalgo</maintainer>
<maintainer email="[email protected]">Emerson Knapp</maintainer>
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@ void SequentialWriter::open(
}

init_metadata();
storage_->update_metadata(metadata_);
}

void SequentialWriter::close()
Expand All @@ -158,6 +159,9 @@ void SequentialWriter::close()

if (!base_folder_.empty()) {
finalize_metadata();
if (storage_) {
storage_->update_metadata(metadata_);
}
metadata_io_->write_metadata(base_folder_, metadata_);
}

Expand Down Expand Up @@ -246,10 +250,12 @@ void SequentialWriter::switch_to_next_storage()
message_cache_->log_dropped();
}

storage_->update_metadata(metadata_);
storage_options_.uri = format_storage_uri(
base_folder_,
metadata_.relative_file_paths.size());
storage_ = storage_factory_->open_read_write(storage_options_);
storage_->update_metadata(metadata_);

if (!storage_) {
std::stringstream errmsg;
Expand Down
1 change: 1 addition & 0 deletions rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
MOCK_METHOD2(
open,
void(const rosbag2_storage::StorageOptions &, rosbag2_storage::storage_interfaces::IOFlag));
MOCK_METHOD1(update_metadata, void(const rosbag2_storage::BagMetadata &));
MOCK_METHOD1(create_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(remove_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(set_read_order, void(const rosbag2_storage::ReadOrder &));
Expand Down
79 changes: 76 additions & 3 deletions rosbag2_cpp/test/rosbag2_cpp/test_sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -64,11 +64,16 @@ class SequentialWriterTest : public Test
fake_storage_uri_ = storage_options.uri;
}),
Return(storage_)));
EXPECT_CALL(
*storage_factory_, open_read_write(_)).Times(AtLeast(0));
EXPECT_CALL(*storage_factory_, open_read_write(_)).Times(AtLeast(0));

// intercept the metadata write so we can analyze it.
ON_CALL(*storage_, update_metadata).WillByDefault(
[this](const rosbag2_storage::BagMetadata & metadata) {
v_intercepted_update_metadata_.emplace_back(metadata);
});
}

~SequentialWriterTest()
~SequentialWriterTest() override
{
rcpputils::fs::path dir(storage_options_.uri);
rcpputils::fs::remove_all(dir);
Expand All @@ -83,6 +88,7 @@ class SequentialWriterTest : public Test
std::atomic<uint32_t> fake_storage_size_{0}; // Need to be atomic for cache update since it
// uses in callback from cache_consumer thread
rosbag2_storage::BagMetadata fake_metadata_;
std::vector<rosbag2_storage::BagMetadata> v_intercepted_update_metadata_;
std::unique_ptr<rosbag2_cpp::Writer> writer_;
std::string fake_storage_uri_;
};
Expand Down Expand Up @@ -155,6 +161,73 @@ TEST_F(SequentialWriterTest, metadata_io_writes_metadata_file_in_destructor) {
writer_.reset();
}

TEST_F(SequentialWriterTest, sequantial_writer_call_metadata_update_on_open_and_destruction)
{
const std::string test_topic_name = "test_topic";
const std::string test_topic_type = "test_msgs/BasicTypes";
EXPECT_CALL(*storage_, update_metadata(_)).Times(2);

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

std::string rmw_format = "rmw_format";
writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({test_topic_name, test_topic_type, "", ""});

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = test_topic_name;

const size_t kNumMessagesToWrite = 5;
for (size_t i = 0; i < kNumMessagesToWrite; i++) {
writer_->write(message);
}
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(v_intercepted_update_metadata_.size(), 2u);
EXPECT_TRUE(v_intercepted_update_metadata_[0].compression_mode.empty());
EXPECT_EQ(v_intercepted_update_metadata_[0].message_count, 0u);
EXPECT_EQ(v_intercepted_update_metadata_[1].message_count, kNumMessagesToWrite);
}

TEST_F(SequentialWriterTest, sequantial_writer_call_metadata_update_on_bag_split)
{
const std::string test_topic_name = "test_topic";
const std::string test_topic_type = "test_msgs/BasicTypes";
EXPECT_CALL(*storage_, update_metadata(_)).Times(4);

auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
writer_ = std::make_unique<rosbag2_cpp::Writer>(std::move(sequential_writer));

std::string rmw_format = "rmw_format";
writer_->open(storage_options_, {rmw_format, rmw_format});
writer_->create_topic({test_topic_name, test_topic_type, "", ""});

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = test_topic_name;

const size_t kNumMessagesToWrite = 5;
for (size_t i = 0; i < kNumMessagesToWrite; i++) {
writer_->write(message);
}

writer_->split_bagfile();

for (size_t i = 0; i < kNumMessagesToWrite; i++) {
writer_->write(message);
}
writer_.reset(); // reset will call writer destructor

ASSERT_EQ(v_intercepted_update_metadata_.size(), 4u);
EXPECT_TRUE(v_intercepted_update_metadata_[0].compression_mode.empty());
EXPECT_EQ(v_intercepted_update_metadata_[0].message_count, 0u); // On opening first bag file
EXPECT_EQ(v_intercepted_update_metadata_[1].files.size(), 1u); // On closing first bag file
EXPECT_EQ(v_intercepted_update_metadata_[2].files.size(), 1u); // On opening second bag file
EXPECT_EQ(v_intercepted_update_metadata_[3].files.size(), 2u); // On writer destruction
EXPECT_EQ(v_intercepted_update_metadata_[3].message_count, 2 * kNumMessagesToWrite);
}

TEST_F(SequentialWriterTest, open_throws_error_if_converter_plugin_does_not_exist) {
auto sequential_writer = std::make_unique<rosbag2_cpp::writers::SequentialWriter>(
std::move(storage_factory_), converter_factory_, std::move(metadata_io_));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class ROSBAG2_STORAGE_PUBLIC BaseWriteInterface

virtual void write(const std::vector<std::shared_ptr<const SerializedBagMessage>> & msg) = 0;

virtual void update_metadata(const BagMetadata & bag_metadata) = 0;

virtual void create_topic(const TopicMetadata & topic) = 0;

virtual void remove_topic(const TopicMetadata & topic) = 0;
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_storage/test/rosbag2_storage/test_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ void TestPlugin::open(
std::cout << "config file uri: " << storage_options.storage_config_uri << ".\n";
}

void TestPlugin::update_metadata(const rosbag2_storage::BagMetadata & metadata)
{
std::cout << "Set metadata" << std::endl;
(void)metadata;
}

void TestPlugin::set_read_order(const rosbag2_storage::ReadOrder & order)
{
std::cout << "Set read order " << order.sort_by << " " << order.reverse << std::endl;
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_storage/test/rosbag2_storage/test_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class TestPlugin : public rosbag2_storage::storage_interfaces::ReadWriteInterfac
const rosbag2_storage::StorageOptions & storage_options,
rosbag2_storage::storage_interfaces::IOFlag flag) override;

void update_metadata(const rosbag2_storage::BagMetadata & metadata) override;

void create_topic(const rosbag2_storage::TopicMetadata & topic) override;

void remove_topic(const rosbag2_storage::TopicMetadata & topic) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/storage_filter.hpp"
#include "rosbag2_storage/topic_metadata.hpp"
#include "rosbag2_storage/bag_metadata.hpp"
#include "rosbag2_storage_sqlite3/sqlite_wrapper.hpp"
#include "rosbag2_storage_sqlite3/visibility_control.hpp"

Expand Down Expand Up @@ -54,6 +55,8 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
rosbag2_storage::storage_interfaces::IOFlag io_flag =
rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE) override;

void update_metadata(const rosbag2_storage::BagMetadata & metadata) override;

void remove_topic(const rosbag2_storage::TopicMetadata & topic) override;

void create_topic(const rosbag2_storage::TopicMetadata & topic) override;
Expand Down Expand Up @@ -108,6 +111,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage

private:
void initialize();
void read_metadata();
void prepare_for_writing();
void prepare_for_reading();
void fill_topics_and_types();
Expand Down Expand Up @@ -136,6 +140,8 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
int seek_row_id_ = 0;
rosbag2_storage::ReadOrder read_order_{};
rosbag2_storage::StorageFilter storage_filter_ {};
rosbag2_storage::storage_interfaces::IOFlag storage_mode_{
rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE};

// This mutex is necessary to protect:
// a) database access (this could also be done with FULLMUTEX), but see b)
Expand All @@ -144,6 +150,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage

const int kDBSchemaVersion_ = 3;
int db_schema_version_ = -1; // Valid version number starting from 1
rosbag2_storage::BagMetadata metadata_{};
};


Expand Down
Loading

0 comments on commit 86efa58

Please sign in to comment.