Skip to content

Commit

Permalink
Add update_metadata(BagMetadata) for storage plugins interface
Browse files Browse the repository at this point in the history
- We need update_metadata(BagMetadata) API to be able to save
BagMetadata inside each bag file, directly in SQLite DB or MCAP file.
- update_metadata(BagMetadata) calling ones after creating storage
object and second time just before destructing it.

Signed-off-by: Michael Orlov <[email protected]>
  • Loading branch information
morlov-apexai committed Nov 6, 2022
1 parent 8ebf406 commit 77662e5
Show file tree
Hide file tree
Showing 8 changed files with 52 additions and 15 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
MOCK_METHOD2(
open,
void(const rosbag2_storage::StorageOptions &, rosbag2_storage::storage_interfaces::IOFlag));
MOCK_METHOD1(update_metadata, void(const rosbag2_storage::BagMetadata &));
MOCK_METHOD1(create_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(remove_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(set_read_order, void(const rosbag2_storage::ReadOrder &));
Expand Down
4 changes: 4 additions & 0 deletions rosbag2_cpp/src/rosbag2_cpp/writers/sequential_writer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ void SequentialWriter::open(
}

init_metadata();
storage_->update_metadata(metadata_);
}

void SequentialWriter::close()
Expand All @@ -154,6 +155,7 @@ void SequentialWriter::close()

if (!base_folder_.empty()) {
finalize_metadata();
storage_->update_metadata(metadata_);
metadata_io_->write_metadata(base_folder_, metadata_);
}

Expand Down Expand Up @@ -242,10 +244,12 @@ void SequentialWriter::switch_to_next_storage()
message_cache_->log_dropped();
}

storage_->update_metadata(metadata_);
storage_options_.uri = format_storage_uri(
base_folder_,
metadata_.relative_file_paths.size());
storage_ = storage_factory_->open_read_write(storage_options_);
storage_->update_metadata(metadata_);

if (!storage_) {
std::stringstream errmsg;
Expand Down
1 change: 1 addition & 0 deletions rosbag2_cpp/test/rosbag2_cpp/mock_storage.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ class MockStorage : public rosbag2_storage::storage_interfaces::ReadWriteInterfa
MOCK_METHOD2(
open,
void(const rosbag2_storage::StorageOptions &, rosbag2_storage::storage_interfaces::IOFlag));
MOCK_METHOD1(update_metadata, void(const rosbag2_storage::BagMetadata &));
MOCK_METHOD1(create_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(remove_topic, void(const rosbag2_storage::TopicMetadata &));
MOCK_METHOD1(set_read_order, void(const rosbag2_storage::ReadOrder &));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,8 @@ class ROSBAG2_STORAGE_PUBLIC BaseWriteInterface

virtual void write(const std::vector<std::shared_ptr<const SerializedBagMessage>> & msg) = 0;

virtual void update_metadata(const BagMetadata & bag_metadata) = 0;

virtual void create_topic(const TopicMetadata & topic) = 0;

virtual void remove_topic(const TopicMetadata & topic) = 0;
Expand Down
6 changes: 6 additions & 0 deletions rosbag2_storage/test/rosbag2_storage/test_plugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,12 @@ void TestPlugin::open(
std::cout << "config file uri: " << storage_options.storage_config_uri << ".\n";
}

void TestPlugin::update_metadata(const rosbag2_storage::BagMetadata & metadata)
{
std::cout << "Set metadata" << std::endl;
(void)metadata;
}

void TestPlugin::set_read_order(const rosbag2_storage::ReadOrder & order)
{
std::cout << "Set read order " << order.sort_by << " " << order.reverse << std::endl;
Expand Down
2 changes: 2 additions & 0 deletions rosbag2_storage/test/rosbag2_storage/test_plugin.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ class TestPlugin : public rosbag2_storage::storage_interfaces::ReadWriteInterfac
const rosbag2_storage::StorageOptions & storage_options,
rosbag2_storage::storage_interfaces::IOFlag flag) override;

void update_metadata(const rosbag2_storage::BagMetadata & metadata) override;

void create_topic(const rosbag2_storage::TopicMetadata & topic) override;

void remove_topic(const rosbag2_storage::TopicMetadata & topic) override;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include "rosbag2_storage/serialized_bag_message.hpp"
#include "rosbag2_storage/storage_filter.hpp"
#include "rosbag2_storage/topic_metadata.hpp"
#include "rosbag2_storage/bag_metadata.hpp"
#include "rosbag2_storage_sqlite3/sqlite_wrapper.hpp"
#include "rosbag2_storage_sqlite3/visibility_control.hpp"

Expand Down Expand Up @@ -54,6 +55,8 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
rosbag2_storage::storage_interfaces::IOFlag io_flag =
rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE) override;

void update_metadata(const rosbag2_storage::BagMetadata & metadata) override;

void remove_topic(const rosbag2_storage::TopicMetadata & topic) override;

void create_topic(const rosbag2_storage::TopicMetadata & topic) override;
Expand Down Expand Up @@ -105,6 +108,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage

private:
void initialize();
void read_metadata();
void prepare_for_writing();
void prepare_for_reading();
void fill_topics_and_types();
Expand Down Expand Up @@ -132,11 +136,14 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
int seek_row_id_ = 0;
rosbag2_storage::ReadOrder read_order_{};
rosbag2_storage::StorageFilter storage_filter_ {};
rosbag2_storage::storage_interfaces::IOFlag storage_mode_{
rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE};

// This mutex is necessary to protect:
// a) database access (this could also be done with FULLMUTEX), but see b)
// b) topics_ collection - since we could be writing and reading it at the same time
std::mutex database_write_mutex_;
rosbag2_storage::BagMetadata metadata_{};
};


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,7 @@ void SqliteStorage::open(
const rosbag2_storage::StorageOptions & storage_options,
rosbag2_storage::storage_interfaces::IOFlag io_flag)
{
storage_mode_ = io_flag;
const auto preset = parse_preset_profile(storage_options.storage_preset_profile);
auto pragmas = parse_pragmas(storage_options.storage_config_uri, io_flag);
if (preset == PresetProfile::Resilient && is_read_write(io_flag)) {
Expand Down Expand Up @@ -213,6 +214,8 @@ void SqliteStorage::open(
// initialize only for READ_WRITE since the DB is already initialized if in APPEND.
if (is_read_write(io_flag)) {
initialize();
} else {
read_metadata();
}

// Reset the read and write statements in case the database changed.
Expand All @@ -224,6 +227,12 @@ void SqliteStorage::open(
"Opened database '" << relative_path_ << "' for " << to_string(io_flag) << ".");
}

void SqliteStorage::update_metadata(const rosbag2_storage::BagMetadata & metadata)
{
metadata_ = metadata;
// TODO(morlov:) update BagMetadata in DB
}

void SqliteStorage::activate_transaction()
{
if (active_transaction_) {
Expand Down Expand Up @@ -511,14 +520,13 @@ uint64_t SqliteStorage::get_minimum_split_file_size() const
return MIN_SPLIT_FILE_SIZE;
}

rosbag2_storage::BagMetadata SqliteStorage::get_metadata()
void SqliteStorage::read_metadata()
{
rosbag2_storage::BagMetadata metadata;
metadata.storage_identifier = get_storage_identifier();
metadata.relative_file_paths = {get_relative_file_path()};
metadata_.storage_identifier = get_storage_identifier();
metadata_.relative_file_paths = {get_relative_file_path()};

metadata.message_count = 0;
metadata.topics_with_message_count = {};
metadata_.message_count = 0;
metadata_.topics_with_message_count = {};

rcutils_time_point_value_t min_time = INT64_MAX;
rcutils_time_point_value_t max_time = 0;
Expand All @@ -536,13 +544,13 @@ rosbag2_storage::BagMetadata SqliteStorage::get_metadata()
rcutils_time_point_value_t, std::string>();

for (auto result : query_results) {
metadata.topics_with_message_count.push_back(
metadata_.topics_with_message_count.push_back(
{
{std::get<0>(result), std::get<1>(result), std::get<2>(result), std::get<6>(result)},
static_cast<size_t>(std::get<3>(result))
});

metadata.message_count += std::get<3>(result);
metadata_.message_count += std::get<3>(result);
min_time = std::get<4>(result) < min_time ? std::get<4>(result) : min_time;
max_time = std::get<5>(result) > max_time ? std::get<5>(result) : max_time;
}
Expand All @@ -558,29 +566,35 @@ rosbag2_storage::BagMetadata SqliteStorage::get_metadata()
rcutils_time_point_value_t>();

for (auto result : query_results) {
metadata.topics_with_message_count.push_back(
metadata_.topics_with_message_count.push_back(
{
{std::get<0>(result), std::get<1>(result), std::get<2>(result), ""},
static_cast<size_t>(std::get<3>(result))
});

metadata.message_count += std::get<3>(result);
metadata_.message_count += std::get<3>(result);
min_time = std::get<4>(result) < min_time ? std::get<4>(result) : min_time;
max_time = std::get<5>(result) > max_time ? std::get<5>(result) : max_time;
}
}

if (metadata.message_count == 0) {
if (metadata_.message_count == 0) {
min_time = 0;
max_time = 0;
}

metadata.starting_time =
metadata_.starting_time =
std::chrono::time_point<std::chrono::high_resolution_clock>(std::chrono::nanoseconds(min_time));
metadata.duration = std::chrono::nanoseconds(max_time) - std::chrono::nanoseconds(min_time);
metadata.bag_size = get_bagfile_size();
metadata_.duration = std::chrono::nanoseconds(max_time) - std::chrono::nanoseconds(min_time);
metadata_.bag_size = get_bagfile_size();
}

return metadata;
rosbag2_storage::BagMetadata SqliteStorage::get_metadata()
{
if (storage_mode_ != rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY) {
read_metadata();
}
return metadata_;
}

void SqliteStorage::set_filter(
Expand Down

0 comments on commit 77662e5

Please sign in to comment.