Skip to content

Commit

Permalink
Add unit tests for update_metadata for sequential_compression_writer
Browse files Browse the repository at this point in the history
- Test that storage->update_metadata() called on writer open and during
writer destruction.
- Test that storage->update_metadata() calling with appropriate
parameters when calling `bag_split()`

Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
morlov-apexai committed Nov 7, 2022
1 parent 294db35 commit a67be0d
Showing 1 changed file with 97 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -60,11 +60,16 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>
// intercept the metadata write so we can analyze it.
ON_CALL(*metadata_io_, write_metadata).WillByDefault(
[this](const std::string &, const rosbag2_storage::BagMetadata & metadata) {
intercepted_metadata_ = metadata;
intercepted_write_metadata_ = metadata;
});
ON_CALL(*storage_, update_metadata).WillByDefault(
[this](const rosbag2_storage::BagMetadata & metadata) {
v_intercepted_update_metadata_.emplace_back(metadata);
});
// intercepted_update_metadata_1
}

~SequentialCompressionWriterTest()
~SequentialCompressionWriterTest() override
{
rcpputils::fs::remove_all(tmp_dir_);
}
Expand Down Expand Up @@ -128,11 +133,12 @@ class SequentialCompressionWriterTest : public TestWithParam<uint64_t>

rcpputils::fs::path tmp_dir_;
rosbag2_storage::StorageOptions tmp_dir_storage_options_;
rosbag2_storage::BagMetadata intercepted_metadata_;
rosbag2_storage::BagMetadata intercepted_write_metadata_;
std::vector<rosbag2_storage::BagMetadata> v_intercepted_update_metadata_;
std::unique_ptr<rosbag2_cpp::Writer> writer_;

std::string serialization_format_;
uint64_t fake_storage_size_;
uint64_t fake_storage_size_{};
std::string fake_storage_uri_;

const uint64_t kDefaultCompressionQueueSize = 1;
Expand Down Expand Up @@ -250,18 +256,103 @@ TEST_F(SequentialCompressionWriterTest, writer_creates_correct_metadata_relative
writer_.reset();

EXPECT_EQ(
intercepted_metadata_.relative_file_paths.size(), 3u);
intercepted_write_metadata_.relative_file_paths.size(), 3u);

const auto base_path = tmp_dir_storage_options_.uri;
int counter = 0;
for (const auto & path : intercepted_metadata_.relative_file_paths) {
for (const auto & path : intercepted_write_metadata_.relative_file_paths) {
std::stringstream ss;
ss << bag_name_ << "_" << counter << "." << DefaultTestCompressor;
counter++;
EXPECT_EQ(ss.str(), path);
}
}

TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_open_and_destruction)
{
const std::string test_topic_name = "test_topic";
const std::string test_topic_type = "test_msgs/BasicTypes";

// queue size should be 0 or at least the number of remaining messages to prevent message loss
rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
kDefaultCompressionQueueThreads
};

initializeFakeFileStorage();
initializeWriter(compression_options);

EXPECT_CALL(*storage_, update_metadata(_)).Times(2);
writer_->open(tmp_dir_storage_options_);
writer_->create_topic({test_topic_name, test_topic_type, "", ""});

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = test_topic_name;

const size_t kNumMessagesToWrite = 5;
for (size_t i = 0; i < kNumMessagesToWrite; i++) {
writer_->write(message);
}
writer_.reset(); // reset will call writer destructor

EXPECT_EQ(v_intercepted_update_metadata_.size(), 2u);
using rosbag2_compression::compression_mode_from_string;
auto compression_mode =
compression_mode_from_string(v_intercepted_update_metadata_[0].compression_mode);
EXPECT_EQ(compression_mode, rosbag2_compression::CompressionMode::MESSAGE);
EXPECT_EQ(v_intercepted_update_metadata_[0].message_count, 0u);
EXPECT_EQ(v_intercepted_update_metadata_[1].message_count, kNumMessagesToWrite);
}

TEST_F(SequentialCompressionWriterTest, writer_call_metadata_update_on_bag_split)
{
const std::string test_topic_name = "test_topic";
const std::string test_topic_type = "test_msgs/BasicTypes";

// queue size should be 0 or at least the number of remaining messages to prevent message loss
rosbag2_compression::CompressionOptions compression_options {
DefaultTestCompressor,
rosbag2_compression::CompressionMode::MESSAGE,
0,
kDefaultCompressionQueueThreads
};

initializeFakeFileStorage();
initializeWriter(compression_options);

EXPECT_CALL(*storage_, update_metadata(_)).Times(4);
writer_->open(tmp_dir_storage_options_);
writer_->create_topic({test_topic_name, test_topic_type, "", ""});

auto message = std::make_shared<rosbag2_storage::SerializedBagMessage>();
message->topic_name = test_topic_name;

const size_t kNumMessagesToWrite = 5;
for (size_t i = 0; i < kNumMessagesToWrite; i++) {
writer_->write(message);
}

writer_->split_bagfile();

for (size_t i = 0; i < kNumMessagesToWrite; i++) {
writer_->write(message);
}
writer_.reset(); // reset will call writer destructor

ASSERT_EQ(v_intercepted_update_metadata_.size(), 4u);
using rosbag2_compression::compression_mode_from_string;
auto compression_mode =
compression_mode_from_string(v_intercepted_update_metadata_[0].compression_mode);
EXPECT_EQ(compression_mode, rosbag2_compression::CompressionMode::MESSAGE);
EXPECT_EQ(v_intercepted_update_metadata_[0].message_count, 0u); // On opening first bag file
EXPECT_EQ(v_intercepted_update_metadata_[1].files.size(), 1u); // On closing first bag file
EXPECT_EQ(v_intercepted_update_metadata_[2].files.size(), 1u); // On opening second bag file
EXPECT_EQ(v_intercepted_update_metadata_[3].files.size(), 2u); // On writer destruction
EXPECT_EQ(v_intercepted_update_metadata_[3].message_count, 2 * kNumMessagesToWrite);
}

TEST_P(SequentialCompressionWriterTest, writer_writes_with_compression_queue_sizes)
{
const std::string test_topic_name = "test_topic";
Expand Down

0 comments on commit a67be0d

Please sign in to comment.