Skip to content

Commit

Permalink
Identify topics in SQL by the topic_name, type_name pair
Browse files Browse the repository at this point in the history
Signed-off-by: tempate <[email protected]>
  • Loading branch information
Tempate committed Jun 27, 2024
1 parent d6dc634 commit f8e0431
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 85 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include <map>
#include <set>
#include <string>
#include <utility>

#include <sqlite3.h>

Expand Down Expand Up @@ -113,36 +114,11 @@ class SqlReaderParticipant : public BaseReaderParticipant
const std::vector<std::string>& bind_values,
const std::function<void(sqlite3_stmt*)>& process_row);

/**
* @brief Get the \c DdsTopic for \c topic_name.
*
* If the topic is in the cache, return it.
* If not, find the topic's type in the database, create a topic, and store it in the cache.
*
* @param topic_name: Name of the topic to be found.
* @return \c DdsTopic instance for \c topic_name.
*/
ddspipe::core::types::DdsTopic find_topic_(
const std::string& topic_name);

/**
* @brief Find the information related to \c topic_name.
*
* @param topic_name: Name of the topic.
* @param type_name: Type of the topic.
* @param is_ros2_type: Whether the topic is a ROS 2 type.
*/
void find_topic_info_(
const std::string& topic_name,
std::string& type_name,
std::string& topic_qos,
bool& is_ros2_type);

// Database
sqlite3* database_;

// Topics cache
std::map<std::string, ddspipe::core::types::DdsTopic> topics_;
// Link a topic name and a type name to a DdsTopic instance
std::map<std::pair<std::string, std::string>, ddspipe::core::types::DdsTopic> topics_;
};

} /* namespace participants */
Expand Down
17 changes: 10 additions & 7 deletions ddsrecorder_participants/src/cpp/recorder/sql/SqlWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -108,10 +108,11 @@ void SqlWriter::open_new_file_nts_(
const std::string create_topics_table{
R"(
CREATE TABLE IF NOT EXISTS Topics (
name TEXT PRIMARY KEY NOT NULL,
name TEXT NOT NULL,
type TEXT NOT NULL,
qos TEXT NOT NULL,
is_ros2_topic TEXT NOT NULL,
PRIMARY KEY(name, type),
FOREIGN KEY(type) REFERENCES Types(name)
);
)"};
Expand All @@ -126,10 +127,11 @@ void SqlWriter::open_new_file_nts_(
data BLOB NOT NULL,
data_size INTEGER NOT NULL,
topic TEXT NOT NULL,
type TEXT NOT NULL,
key TEXT NOT NULL,
log_time DATETIME NOT NULL,
publish_time DATETIME NOT NULL,
FOREIGN KEY(topic) REFERENCES Topics(name)
FOREIGN KEY(topic, type) REFERENCES Topics(name, type)
);
)"};

Expand Down Expand Up @@ -208,8 +210,8 @@ void SqlWriter::write_nts_(

// Define the SQL statement
const char* insert_statement = R"(
INSERT INTO Messages (data, data_size, topic, key, log_time, publish_time)
VALUES (?, ?, ?, ?, ?, ?);
INSERT INTO Messages (data, data_size, topic, type, key, log_time, publish_time)
VALUES (?, ?, ?, ?, ?, ?, ?);
)";

// Prepare the SQL statement
Expand All @@ -230,9 +232,10 @@ void SqlWriter::write_nts_(
sqlite3_bind_blob(statement, 1, msg.get_data(), msg.get_data_size(), SQLITE_TRANSIENT);
sqlite3_bind_int64(statement, 2, msg.get_data_size());
sqlite3_bind_text(statement, 3, msg.topic.topic_name().c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 4, msg.key.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 5, to_sql_timestamp(msg.log_time).c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 6, to_sql_timestamp(msg.publish_time).c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 4, msg.topic.type_name.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 5, msg.key.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 6, to_sql_timestamp(msg.log_time).c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 7, to_sql_timestamp(msg.publish_time).c_str(), -1, SQLITE_TRANSIENT);

// Execute the SQL statement
const auto step_ret = sqlite3_step(statement);
Expand Down
79 changes: 28 additions & 51 deletions ddsrecorder_participants/src/cpp/replayer/SqlReaderParticipant.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include <cstring>
#include <map>
#include <string>
#include <utility>

#include <sqlite3.h>

Expand Down Expand Up @@ -73,7 +74,16 @@ void SqlReaderParticipant::process_summary(
topic->topic_qos.set_qos(topic_qos, utils::FuzzyLevelValues::fuzzy_level_fuzzy);

// Store the topic in the cache
topics_[topic_name] = *topic;
const auto topic_id = std::make_pair(topic_name, type_name);

if (topics_.find(topic_id) != topics_.end())
{
logWarning(DDSREPLAYER_MCAP_READER_PARTICIPANT,
"Topic " << topic_name << " with type " << type_name << " already exists. Skipping...");
return;
}

topics_[topic_id] = *topic;

// Store the topic in the set
topics.insert(topic);
Expand Down Expand Up @@ -119,7 +129,7 @@ void SqlReaderParticipant::process_messages()
utils::the_end_of_time());

exec_sql_statement_(
"SELECT log_time, topic, data, data_size FROM Messages "
"SELECT log_time, topic, type, data, data_size FROM Messages "
"WHERE log_time >= ? AND log_time <= ? "
"ORDER BY log_time;",
{begin_time, end_time},
Expand All @@ -132,7 +142,20 @@ void SqlReaderParticipant::process_messages()

// Create a DdsTopic to publish the message
const std::string topic_name = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 1));
const auto topic = find_topic_(topic_name);
const std::string type_name = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 2));

const auto topic_id = std::make_pair(topic_name, type_name);

// Find the topic
if (topics_.find(topic_id) == topics_.end())
{
logError(DDSREPLAYER_MCAP_READER_PARTICIPANT,
"Failed to find topic " << topic_name << " with type " << type_name << ". "
"Did you process the summary before the messages? Skipping...");
return;
}

const auto topic = topics_[topic_id];

// Find the reader for the topic
if (readers_.find(topic) == readers_.end())
Expand All @@ -152,8 +175,8 @@ void SqlReaderParticipant::process_messages()
std::chrono::time_point_cast<utils::Timestamp::duration>(initial_timestamp + delay_ns);

// Create a RtpsPayloadData from the raw data
const auto raw_data = sqlite3_column_blob(stmt, 2);
const auto raw_data_size = sqlite3_column_int(stmt, 3);
const auto raw_data = sqlite3_column_blob(stmt, 3);
const auto raw_data_size = sqlite3_column_int(stmt, 4);
auto data = create_payload_(raw_data, raw_data_size);

// Set source timestamp
Expand Down Expand Up @@ -193,52 +216,6 @@ void SqlReaderParticipant::close_file_()
sqlite3_close(database_);
}

ddspipe::core::types::DdsTopic SqlReaderParticipant::find_topic_(
const std::string& topic_name)
{
// Check if the topic is in the cache
if (topics_.find(topic_name) != topics_.end())
{
return topics_[topic_name];
}

std::string type_name;
std::string topic_qos;
bool is_ros2_topic;

find_topic_info_(topic_name, type_name, topic_qos, is_ros2_topic);

// Create the DdsTopic
const auto topic = create_topic_(topic_name, type_name, is_ros2_topic);

// Store the topic in the cache
topics_[topic_name] = topic;

return topic;
}

void SqlReaderParticipant::find_topic_info_(
const std::string& topic_name,
std::string& type_name,
std::string& topic_qos,
bool& is_ros2_topic)
{
exec_sql_statement_("SELECT type, topic_qos, is_ros2_topic FROM Topics WHERE name = ?;", {topic_name}, [&](sqlite3_stmt* stmt)
{
if (!type_name.empty())
{
const std::string error_msg = utils::Formatter() << "Multiple types found for topic " << topic_name;

logError(DDSREPLAYER_SQL_READER_PARTICIPANT, "FAIL_SQL_READ | " << error_msg);
throw std::runtime_error(error_msg);
}

type_name = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 0));
topic_qos = reinterpret_cast<const char*>(sqlite3_column_text(stmt, 1));
is_ros2_topic = sqlite3_column_int(stmt, 2) == 0;
});
}

void SqlReaderParticipant::exec_sql_statement_(
const std::string& statement,
const std::vector<std::string>& bind_values,
Expand Down

0 comments on commit f8e0431

Please sign in to comment.