From 9271cfe940909a4556762961de2c399c1101aa3d Mon Sep 17 00:00:00 2001 From: tempate Date: Fri, 14 Jun 2024 11:13:40 +0200 Subject: [PATCH] Identify messages by their writer GUID and sequence number Signed-off-by: tempate --- .../recorder/message/SqlMessage.hpp | 8 ++++++ .../src/cpp/recorder/message/SqlMessage.cpp | 2 ++ .../src/cpp/recorder/sql/SqlWriter.cpp | 27 ++++++++++++------- 3 files changed, 27 insertions(+), 10 deletions(-) diff --git a/ddsrecorder_participants/include/ddsrecorder_participants/recorder/message/SqlMessage.hpp b/ddsrecorder_participants/include/ddsrecorder_participants/recorder/message/SqlMessage.hpp index 9d2ae82fd..b89c23f00 100644 --- a/ddsrecorder_participants/include/ddsrecorder_participants/recorder/message/SqlMessage.hpp +++ b/ddsrecorder_participants/include/ddsrecorder_participants/recorder/message/SqlMessage.hpp @@ -21,10 +21,12 @@ #include #include +#include #include #include #include +#include #include #include @@ -66,6 +68,12 @@ struct SqlMessage : public BaseMessage void set_key( fastrtps::types::DynamicType_ptr dynamic_type); + // Writer GUID + ddspipe::core::types::Guid writer_guid; + + // Sequence number + fastrtps::rtps::SequenceNumber_t sequence_number; + // Hashed value identifying the instance ddspipe::core::types::InstanceHandle instance_handle; diff --git a/ddsrecorder_participants/src/cpp/recorder/message/SqlMessage.cpp b/ddsrecorder_participants/src/cpp/recorder/message/SqlMessage.cpp index 9fff3bda9..9c4a019f6 100644 --- a/ddsrecorder_participants/src/cpp/recorder/message/SqlMessage.cpp +++ b/ddsrecorder_participants/src/cpp/recorder/message/SqlMessage.cpp @@ -42,6 +42,8 @@ SqlMessage::SqlMessage( const bool log_publish_time, const std::string& key /* = "" */) : BaseMessage(data, payload_pool, topic, log_publish_time) + , writer_guid(data.sample_identity.writer_guid()) + , sequence_number(data.sample_identity.sequence_number()) , instance_handle(data.instanceHandle) , key(key) { diff --git a/ddsrecorder_participants/src/cpp/recorder/sql/SqlWriter.cpp b/ddsrecorder_participants/src/cpp/recorder/sql/SqlWriter.cpp index 8836b5fc5..41ca4c5a1 100644 --- a/ddsrecorder_participants/src/cpp/recorder/sql/SqlWriter.cpp +++ b/ddsrecorder_participants/src/cpp/recorder/sql/SqlWriter.cpp @@ -123,7 +123,8 @@ void SqlWriter::open_new_file_nts_( const std::string create_messages_table{ R"( CREATE TABLE IF NOT EXISTS Messages ( - id INTEGER PRIMARY KEY AUTOINCREMENT, + writer_guid TEXT NOT NULL, + sequence_number INTEGER NOT NULL, data BLOB NOT NULL, data_size INTEGER NOT NULL, topic TEXT NOT NULL, @@ -131,6 +132,7 @@ void SqlWriter::open_new_file_nts_( key TEXT NOT NULL, log_time DATETIME NOT NULL, publish_time DATETIME NOT NULL, + PRIMARY KEY(writer_guid, sequence_number), FOREIGN KEY(topic, type) REFERENCES Topics(name, type) ); )"}; @@ -210,8 +212,8 @@ void SqlWriter::write_nts_( // Define the SQL statement const char* insert_statement = R"( - INSERT INTO Messages (data, data_size, topic, type, key, log_time, publish_time) - VALUES (?, ?, ?, ?, ?, ?, ?); + INSERT INTO Messages (writer_guid, sequence_number, data, data_size, topic, type, key, log_time, publish_time) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?); )"; // Prepare the SQL statement @@ -229,13 +231,18 @@ void SqlWriter::write_nts_( } // Bind the SqlMessage to the SQL statement - sqlite3_bind_blob(statement, 1, msg.get_data(), msg.get_data_size(), SQLITE_TRANSIENT); - sqlite3_bind_int64(statement, 2, msg.get_data_size()); - sqlite3_bind_text(statement, 3, msg.topic.topic_name().c_str(), -1, SQLITE_TRANSIENT); - sqlite3_bind_text(statement, 4, msg.topic.type_name.c_str(), -1, SQLITE_TRANSIENT); - sqlite3_bind_text(statement, 5, msg.key.c_str(), -1, SQLITE_TRANSIENT); - sqlite3_bind_text(statement, 6, to_sql_timestamp(msg.log_time).c_str(), -1, SQLITE_TRANSIENT); - sqlite3_bind_text(statement, 7, to_sql_timestamp(msg.publish_time).c_str(), -1, SQLITE_TRANSIENT); + std::ostringstream writer_guid_ss; + writer_guid_ss << msg.writer_guid; + + sqlite3_bind_text(statement, 1, writer_guid_ss.str().c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_int64(statement, 2, msg.sequence_number.to64long()); + sqlite3_bind_blob(statement, 3, msg.get_data(), msg.get_data_size(), SQLITE_TRANSIENT); + sqlite3_bind_int64(statement, 4, msg.get_data_size()); + sqlite3_bind_text(statement, 5, msg.topic.topic_name().c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text(statement, 6, msg.topic.type_name.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text(statement, 7, msg.key.c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text(statement, 8, to_sql_timestamp(msg.log_time).c_str(), -1, SQLITE_TRANSIENT); + sqlite3_bind_text(statement, 9, to_sql_timestamp(msg.publish_time).c_str(), -1, SQLITE_TRANSIENT); // Execute the SQL statement const auto step_ret = sqlite3_step(statement);