Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Store db schema version and ROS_DISTRO name in db3 files #1156

Merged
merged 4 commits into from
Nov 22, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -96,6 +96,9 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
*/
SqliteWrapper & get_sqlite_database_wrapper();

int get_db_schema_version() const;
std::string get_recorded_ros_distro() const;

enum class PresetProfile
{
Resilient,
Expand All @@ -113,6 +116,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
void write_locked(std::shared_ptr<const rosbag2_storage::SerializedBagMessage> message)
RCPPUTILS_TSA_REQUIRES(database_write_mutex_);
int get_last_rowid();
int read_db_schema_version();

using ReadQueryResult = SqliteStatementWrapper::QueryResult<
std::shared_ptr<rcutils_uint8_array_t>, rcutils_time_point_value_t, std::string, int>;
Expand All @@ -137,6 +141,9 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteStorage
// a) database access (this could also be done with FULLMUTEX), but see b)
// b) topics_ collection - since we could be writing and reading it at the same time
std::mutex database_write_mutex_;

const int kDBSchemaVersion_ = 3;
int db_schema_version_ = -1; // Valid version number starting from 1
};


Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,7 @@ class ROSBAG2_STORAGE_DEFAULT_PLUGINS_PUBLIC SqliteWrapper
SqliteWrapper();
~SqliteWrapper();

bool table_exists(const std::string & table_name);
bool field_exists(const std::string & table_name, const std::string & field_name);
SqliteStatement prepare_statement(const std::string & query);
std::string query_pragma_value(const std::string & key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <utility>
#include <vector>

#include "rcpputils/env.hpp"
#include "rcpputils/filesystem_helper.hpp"

#include "rosbag2_storage/metadata_io.hpp"
Expand Down Expand Up @@ -212,7 +213,10 @@ void SqliteStorage::open(

// initialize only for READ_WRITE since the DB is already initialized if in APPEND.
if (is_read_write(io_flag)) {
db_schema_version_ = kDBSchemaVersion_;
initialize();
} else {
db_schema_version_ = read_db_schema_version();
}

// Reset the read and write statements in case the database changed.
Expand Down Expand Up @@ -368,21 +372,41 @@ uint64_t SqliteStorage::get_bagfile_size() const

void SqliteStorage::initialize()
{
std::string create_stmt = "CREATE TABLE topics(" \
std::string create_stmt = "CREATE TABLE schema(" \
"schema_version INTEGER PRIMARY KEY," \
"ros_distro TEXT NOT NULL);";
database_->prepare_statement(create_stmt)->execute_and_reset();

create_stmt = "CREATE TABLE metadata(" \
"id INTEGER PRIMARY KEY," \
"metadata_version INTEGER NOT NULL," \
"metadata TEXT NOT NULL);";
database_->prepare_statement(create_stmt)->execute_and_reset();

create_stmt = "CREATE TABLE topics(" \
"id INTEGER PRIMARY KEY," \
"name TEXT NOT NULL," \
"type TEXT NOT NULL," \
"serialization_format TEXT NOT NULL," \
"offered_qos_profiles TEXT NOT NULL);";
database_->prepare_statement(create_stmt)->execute_and_reset();

create_stmt = "CREATE TABLE messages(" \
"id INTEGER PRIMARY KEY," \
"topic_id INTEGER NOT NULL," \
"timestamp INTEGER NOT NULL, " \
"data BLOB NOT NULL);";
database_->prepare_statement(create_stmt)->execute_and_reset();

create_stmt = "CREATE INDEX timestamp_idx ON messages (timestamp ASC);";
database_->prepare_statement(create_stmt)->execute_and_reset();

std::string ros_distro = rcpputils::get_env_var("ROS_DISTRO");
auto insert_db_schema =
database_->prepare_statement(
"INSERT INTO schema (schema_version, ros_distro) VALUES (?, ?)");
insert_db_schema->bind(kDBSchemaVersion_, ros_distro);
insert_db_schema->execute_and_reset();
}

void SqliteStorage::create_topic(const rosbag2_storage::TopicMetadata & topic)
Expand Down Expand Up @@ -619,13 +643,49 @@ SqliteWrapper & SqliteStorage::get_sqlite_database_wrapper()
return *database_;
}

int SqliteStorage::get_db_schema_version() const
{
return db_schema_version_;
}

std::string SqliteStorage::get_recorded_ros_distro() const
{
std::string ros_distro;
if (db_schema_version_ >= 3 && database_->table_exists("schema")) {
// Read schema version
auto statement = database_->prepare_statement("SELECT ros_distro from schema;");
auto query_results = statement->execute_query<std::string>();
ros_distro = std::get<0>(*query_results.begin());
}
return ros_distro;
}

int SqliteStorage::get_last_rowid()
{
auto statement = database_->prepare_statement("SELECT max(rowid) from messages;");
auto query_results = statement->execute_query<int>();
return std::get<0>(*query_results.begin());
}

int SqliteStorage::read_db_schema_version()
{
int schema_version = -1;
if (database_->table_exists("schema")) {
// Read schema version
auto statement = database_->prepare_statement("SELECT schema_version from schema;");
auto query_results = statement->execute_query<int>();
schema_version = std::get<0>(*query_results.begin());
} else {
if (database_->field_exists("topics", "offered_qos_profiles")) {
schema_version = 2;
} else {
schema_version = 1;
}
}

return schema_version;
}

} // namespace rosbag2_storage_plugins

#include "pluginlib/class_list_macros.hpp" // NOLINT
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,14 @@ std::string SqliteWrapper::query_pragma_value(const std::string & key)
return std::get<0>(pragma_value);
}

bool SqliteWrapper::table_exists(const std::string & table_name)
{
auto query =
"SELECT count(*) FROM sqlite_master WHERE type='table' AND name='" + table_name + "';";
auto query_result = prepare_statement(query)->execute_query<int>();
return std::get<0>(*query_result.begin());
}

bool SqliteWrapper::field_exists(const std::string & table_name, const std::string & field_name)
{
auto query = "SELECT INSTR(sql, '" + field_name + "') FROM sqlite_master WHERE type='table' AND "
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,50 @@ class StorageTestFixture : public TemporaryDirectoryFixture
}
}

void create_new_db3_file_with_schema_version_2()
{
auto db_file = (rcpputils::fs::path(temporary_dir_path_) / "rosbag").string();
std::string relative_path = db_file + ".db3";

// READ_WRITE requires the DB to not exist.
if (rcpputils::fs::path(relative_path).exists()) {
throw std::runtime_error(
"Failed to create bag: File '" + relative_path + "' already exists!");
}
using rosbag2_storage_plugins::SqliteWrapper;
using rosbag2_storage_plugins::SqliteException;
std::unordered_map<std::string, std::string> pragmas;

std::shared_ptr<SqliteWrapper> database;
try {
database = std::make_unique<SqliteWrapper>(
relative_path,
rosbag2_storage::storage_interfaces::IOFlag::READ_WRITE,
std::move(pragmas));
} catch (const SqliteException & e) {
throw std::runtime_error("Failed to setup storage. Error: " + std::string(e.what()));
}

// Init database
std::string create_stmt = "CREATE TABLE topics(" \
"id INTEGER PRIMARY KEY," \
"name TEXT NOT NULL," \
"type TEXT NOT NULL," \
"serialization_format TEXT NOT NULL," \
"offered_qos_profiles TEXT NOT NULL);";
database->prepare_statement(create_stmt)->execute_and_reset();

create_stmt = "CREATE TABLE messages(" \
"id INTEGER PRIMARY KEY," \
"topic_id INTEGER NOT NULL," \
"timestamp INTEGER NOT NULL, " \
"data BLOB NOT NULL);";
database->prepare_statement(create_stmt)->execute_and_reset();

create_stmt = "CREATE INDEX timestamp_idx ON messages (timestamp ASC);";
database->prepare_statement(create_stmt)->execute_and_reset();
}

std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>
read_all_messages_from_sqlite()
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
#include <utility>
#include <vector>

#include "rcpputils/env.hpp"

#include "rcpputils/filesystem_helper.hpp"

#include "rcutils/snprintf.h"
Expand Down Expand Up @@ -293,6 +295,46 @@ TEST_F(StorageTestFixture, messages_readable_for_prefoxy_db_schema) {
}
}

TEST_F(StorageTestFixture, get_db_schema_version_returns_correct_value) {
auto writable_storage = std::make_shared<rosbag2_storage_plugins::SqliteStorage>();
EXPECT_EQ(writable_storage->get_db_schema_version(), -1);

auto db_file = (rcpputils::fs::path(temporary_dir_path_) / "rosbag").string();
writable_storage->open({db_file, plugin_id_});

EXPECT_GE(writable_storage->get_db_schema_version(), 3);
}

TEST_F(StorageTestFixture, get_recorded_ros_distro_returns_correct_value) {
auto writable_storage = std::make_shared<rosbag2_storage_plugins::SqliteStorage>();
EXPECT_TRUE(writable_storage->get_recorded_ros_distro().empty());

auto db_file = (rcpputils::fs::path(temporary_dir_path_) / "rosbag").string();
writable_storage->open({db_file, plugin_id_});

std::string current_ros_distro = rcpputils::get_env_var("ROS_DISTRO");
EXPECT_EQ(writable_storage->get_recorded_ros_distro(), current_ros_distro);
}

TEST_F(StorageTestFixture, check_backward_compatibility_with_schema_version_2) {
create_new_db3_file_with_schema_version_2();
std::unique_ptr<rosbag2_storage_plugins::SqliteStorage> readable_storage =
std::make_unique<rosbag2_storage_plugins::SqliteStorage>();

EXPECT_EQ(readable_storage->get_db_schema_version(), -1);
EXPECT_TRUE(readable_storage->get_recorded_ros_distro().empty());

auto db_file = (rcpputils::fs::path(temporary_dir_path_) / "rosbag.db3").string();

readable_storage->open(
{db_file, plugin_id_},
rosbag2_storage::storage_interfaces::IOFlag::READ_ONLY);
std::vector<std::shared_ptr<rosbag2_storage::SerializedBagMessage>> read_messages;

EXPECT_EQ(readable_storage->get_db_schema_version(), 2);
EXPECT_TRUE(readable_storage->get_recorded_ros_distro().empty());
}

TEST_F(StorageTestFixture, get_metadata_returns_correct_struct_if_no_messages) {
write_messages_to_sqlite({});

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,3 +163,10 @@ TEST_F(SqliteWrapperTestFixture, field_exists) {
EXPECT_THROW(
db_.field_exists("non_existent_table", "data"), rosbag2_storage_plugins::SqliteException);
}

TEST_F(SqliteWrapperTestFixture, table_exists) {
db_.prepare_statement("CREATE TABLE test_table (timestamp INTEGER, data BLOB);")
->execute_and_reset();
EXPECT_TRUE(db_.table_exists("test_table"));
EXPECT_FALSE(db_.table_exists("non_existent_table"));
}