Skip to content

Commit

Permalink
new storage for snapshot
Browse files Browse the repository at this point in the history
  • Loading branch information
Dmytro.Korenkov authored and Dmytro.Korenkov committed Mar 13, 2024
1 parent 7f0dd71 commit 04d7f17
Show file tree
Hide file tree
Showing 2 changed files with 240 additions and 0 deletions.
7 changes: 7 additions & 0 deletions rosbag2_storage_mcap/plugin_description.xml
Original file line number Diff line number Diff line change
Expand Up @@ -6,4 +6,11 @@
>
<description>rosbag2 storage plugin using the MCAP file format</description>
</class>
<class
name="mcap_snapshot"
type="rosbag2_storage_plugins::MCAPStorageImmediately"
base_class_type="rosbag2_storage::storage_interfaces::ReadWriteInterface"
>
<description>rosbag2 snapshot storage plugin using the MCAP file format</description>
</class>
</library>
233 changes: 233 additions & 0 deletions rosbag2_storage_mcap/src/mcap_storage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -841,12 +841,245 @@ void MCAPStorage::ensure_rosdistro_metadata_added()
has_added_ros_distro_metadata_ = true;
}

/**
* A storage implementation for the MCAP file format of snapshots.
*/
class ROSBAG2_STORAGE_MCAP_PUBLIC MCAPStorageSnapshot
: public rosbag2_storage::storage_interfaces::ReadWriteInterface
{
public:
MCAPStorageSnapshot();
~MCAPStorageSnapshot() override;
/** BaseIOInterface **/
#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS
void open(const rosbag2_storage::StorageOptions & storage_options,
rosbag2_storage::storage_interfaces::IOFlag io_flag =
rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE) override;
void open(const std::string & uri, rosbag2_storage::storage_interfaces::IOFlag io_flag =
rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE);
#else
void open(const std::string & uri,
rosbag2_storage::storage_interfaces::IOFlag io_flag =
rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE) override;
#endif


/** BaseInfoInterface **/
rosbag2_storage::BagMetadata get_metadata() override;
std::string get_relative_file_path() const override;
uint64_t get_bagfile_size() const override;
std::string get_storage_identifier() const override;

/** BaseReadInterface **/
#ifdef ROSBAG2_STORAGE_MCAP_HAS_SET_READ_ORDER
bool set_read_order(const rosbag2_storage::ReadOrder &) override;
#endif
bool has_next() override;
std::shared_ptr<rosbag2_storage::SerializedBagMessage> read_next() override;
std::vector<rosbag2_storage::TopicMetadata> get_all_topics_and_types() override;
void get_all_message_definitions(
std::vector<rosbag2_storage::MessageDefinition> & definitions) override;

/** ReadOnlyInterface **/
void set_filter(const rosbag2_storage::StorageFilter & storage_filter) override;
void reset_filter() override;
#ifdef ROSBAG2_STORAGE_MCAP_OVERRIDE_SEEK_METHOD
void seek(const rcutils_time_point_value_t & time_stamp) override;
#else
void seek(const rcutils_time_point_value_t & time_stamp);
#endif

/** ReadWriteInterface **/
uint64_t get_minimum_split_file_size() const override;

/** BaseWriteInterface **/
void write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg) override;
void write(
const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & msg) override;
void create_topic(const rosbag2_storage::TopicMetadata & topic,
const rosbag2_storage::MessageDefinition & message_definition) override;
void remove_topic(const rosbag2_storage::TopicMetadata & topic) override;
#ifdef ROSBAG2_STORAGE_MCAP_HAS_UPDATE_METADATA
void update_metadata(const rosbag2_storage::BagMetadata &) override;
#endif

private:
std::shared_ptr<MCAPStorage> storage_;
std::string uri_;
rosbag2_storage::storage_interfaces::IOFlag io_flag_ = rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE;
rosbag2_storage::BagMetadata metadata_;
std::vector<std::pair<rosbag2_storage::TopicMetadata,rosbag2_storage::MessageDefinition>> topics_;
rosbag2_storage::StorageOptions storage_options_;
uint64_t current_storage_size_;
};

MCAPStorageSnapshot::MCAPStorageSnapshot():storage_(nullptr),current_storage_size_(0)
{
}

MCAPStorageSnapshot::~MCAPStorageSnapshot()
{
}

/** BaseIOInterface **/
#ifdef ROSBAG2_STORAGE_MCAP_HAS_STORAGE_OPTIONS
void MCAPStorageSnapshot::open(const rosbag2_storage::StorageOptions & storage_options,
rosbag2_storage::storage_interfaces::IOFlag io_flag)
{
(void)io_flag;
storage_options_ = storage_options;
uri_ = storage_options.uri;
}
#endif

void MCAPStorageSnapshot::open(const std::string & uri, rosbag2_storage::storage_interfaces::IOFlag io_flag)
{
uri_ = uri;
//this class is used only for READ_WRITE
(void)io_flag;
}

/** BaseInfoInterface **/
rosbag2_storage::BagMetadata MCAPStorageSnapshot::get_metadata()
{
return metadata_;
}

std::string MCAPStorageSnapshot::get_relative_file_path() const
{
return uri_;
}

uint64_t MCAPStorageSnapshot::get_bagfile_size() const
{
return current_storage_size_;
}

std::string MCAPStorageSnapshot::get_storage_identifier() const
{
return "mcap_snapshot";
}

/** BaseReadInterface **/
#ifdef ROSBAG2_STORAGE_MCAP_HAS_SET_READ_ORDER
bool MCAPStorageSnapshot::set_read_order(const rosbag2_storage::ReadOrder & read_order)
{
(void)read_order;
return false;
}
#endif

bool MCAPStorageSnapshot::has_next()
{
return false;
}

std::shared_ptr<rosbag2_storage::SerializedBagMessage> MCAPStorageSnapshot::read_next()
{
return nullptr;
}

std::vector<rosbag2_storage::TopicMetadata> MCAPStorageSnapshot::get_all_topics_and_types()
{
return std::vector<rosbag2_storage::TopicMetadata>();
}

void MCAPStorageSnapshot::get_all_message_definitions(
std::vector<rosbag2_storage::MessageDefinition> & definitions)
{
(void)definitions;
}

/** ReadOnlyInterface **/
void MCAPStorageSnapshot::set_filter(const rosbag2_storage::StorageFilter & storage_filter)
{
(void)storage_filter;
}

void MCAPStorageSnapshot::reset_filter()
{
}

#ifdef ROSBAG2_STORAGE_MCAP_OVERRIDE_SEEK_METHOD
void MCAPStorageSnapshot::seek(const rcutils_time_point_value_t & time_stamp)
{
(void)time_stamp;
}

#else
void MCAPStorageSnapshot::seek(const rcutils_time_point_value_t & time_stamp)
{
(void)time_stamp;
}
#endif

/** ReadWriteInterface **/
uint64_t MCAPStorageSnapshot::get_minimum_split_file_size() const
{
return 1024;
}

/** BaseWriteInterface **/
void MCAPStorageSnapshot::write(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> msg)
{
(void)msg;
}

void MCAPStorageSnapshot::write(const std::vector<std::shared_ptr<const rosbag2_storage::SerializedBagMessage>> & msgs)
{
storage_ = std::make_shared<MCAPStorage>();
if (!storage_) {
RCUTILS_LOG_WARN_NAMED(LOG_NAME, "Was not able to allocate memory for MCAPStorage");
return;
}
storage_->open(storage_options_, io_flag_);
storage_->update_metadata(metadata_);
for(auto cur_pair : topics_)
{
storage_->create_topic(cur_pair.first, cur_pair.second);
}
for(auto cur_msg : msgs)
{
storage_->write(cur_msg);
}
storage_.reset();
current_storage_size_ = storage_options_.max_bagfile_size + 1;
}

void MCAPStorageSnapshot::create_topic(const rosbag2_storage::TopicMetadata & topic,
const rosbag2_storage::MessageDefinition & message_definition)
{
topics_.push_back(std::make_pair(topic,message_definition));
}

void MCAPStorageSnapshot::remove_topic(const rosbag2_storage::TopicMetadata & topic)
{
auto iter = std::remove_if(
topics_.begin(),
topics_.end(),
[topic](std::pair<rosbag2_storage::TopicMetadata, rosbag2_storage::MessageDefinition> const& data) {
if(data.first == topic) return true;
else return false;
});
topics_.erase(iter, topics_.end());
}

#ifdef ROSBAG2_STORAGE_MCAP_HAS_UPDATE_METADATA
void MCAPStorageSnapshot::update_metadata(const rosbag2_storage::BagMetadata & metadata)
{
metadata_ = metadata;
}
#endif

} // namespace rosbag2_storage_plugins

#include "pluginlib/class_list_macros.hpp" // NOLINT
PLUGINLIB_EXPORT_CLASS(rosbag2_storage_plugins::MCAPStorage,
rosbag2_storage::storage_interfaces::ReadWriteInterface)

PLUGINLIB_EXPORT_CLASS(rosbag2_storage_plugins::MCAPStorageSnapshot,
rosbag2_storage::storage_interfaces::ReadWriteInterface)

#ifdef _WIN32
#pragma warning(pop)
#endif

0 comments on commit 04d7f17

Please sign in to comment.