Skip to content

Commit

Permalink
Identify messages by their writer GUID and sequence number
Browse files Browse the repository at this point in the history
Signed-off-by: tempate <[email protected]>
  • Loading branch information
Tempate committed Jun 27, 2024
1 parent f965d89 commit 9271cfe
Show file tree
Hide file tree
Showing 3 changed files with 27 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
#include <memory>
#include <string>

#include <fastdds/rtps/common/SequenceNumber.h>
#include <fastrtps/types/DynamicType.h>

#include <ddspipe_core/efficiency/payload/PayloadPool.hpp>
#include <ddspipe_core/types/data/RtpsPayloadData.hpp>
#include <ddspipe_core/types/dds/Guid.hpp>
#include <ddspipe_core/types/dds/Payload.hpp>
#include <ddspipe_core/types/topic/dds/DdsTopic.hpp>

Expand Down Expand Up @@ -66,6 +68,12 @@ struct SqlMessage : public BaseMessage
void set_key(
fastrtps::types::DynamicType_ptr dynamic_type);

// Writer GUID
ddspipe::core::types::Guid writer_guid;

// Sequence number
fastrtps::rtps::SequenceNumber_t sequence_number;

// Hashed value identifying the instance
ddspipe::core::types::InstanceHandle instance_handle;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ SqlMessage::SqlMessage(
const bool log_publish_time,
const std::string& key /* = "" */)
: BaseMessage(data, payload_pool, topic, log_publish_time)
, writer_guid(data.sample_identity.writer_guid())
, sequence_number(data.sample_identity.sequence_number())
, instance_handle(data.instanceHandle)
, key(key)
{
Expand Down
27 changes: 17 additions & 10 deletions ddsrecorder_participants/src/cpp/recorder/sql/SqlWriter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,14 +123,16 @@ void SqlWriter::open_new_file_nts_(
const std::string create_messages_table{
R"(
CREATE TABLE IF NOT EXISTS Messages (
id INTEGER PRIMARY KEY AUTOINCREMENT,
writer_guid TEXT NOT NULL,
sequence_number INTEGER NOT NULL,
data BLOB NOT NULL,
data_size INTEGER NOT NULL,
topic TEXT NOT NULL,
type TEXT NOT NULL,
key TEXT NOT NULL,
log_time DATETIME NOT NULL,
publish_time DATETIME NOT NULL,
PRIMARY KEY(writer_guid, sequence_number),
FOREIGN KEY(topic, type) REFERENCES Topics(name, type)
);
)"};
Expand Down Expand Up @@ -210,8 +212,8 @@ void SqlWriter::write_nts_(

// Define the SQL statement
const char* insert_statement = R"(
INSERT INTO Messages (data, data_size, topic, type, key, log_time, publish_time)
VALUES (?, ?, ?, ?, ?, ?, ?);
INSERT INTO Messages (writer_guid, sequence_number, data, data_size, topic, type, key, log_time, publish_time)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?);
)";

// Prepare the SQL statement
Expand All @@ -229,13 +231,18 @@ void SqlWriter::write_nts_(
}

// Bind the SqlMessage to the SQL statement
sqlite3_bind_blob(statement, 1, msg.get_data(), msg.get_data_size(), SQLITE_TRANSIENT);
sqlite3_bind_int64(statement, 2, msg.get_data_size());
sqlite3_bind_text(statement, 3, msg.topic.topic_name().c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 4, msg.topic.type_name.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 5, msg.key.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 6, to_sql_timestamp(msg.log_time).c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 7, to_sql_timestamp(msg.publish_time).c_str(), -1, SQLITE_TRANSIENT);
std::ostringstream writer_guid_ss;
writer_guid_ss << msg.writer_guid;

sqlite3_bind_text(statement, 1, writer_guid_ss.str().c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_int64(statement, 2, msg.sequence_number.to64long());
sqlite3_bind_blob(statement, 3, msg.get_data(), msg.get_data_size(), SQLITE_TRANSIENT);
sqlite3_bind_int64(statement, 4, msg.get_data_size());
sqlite3_bind_text(statement, 5, msg.topic.topic_name().c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 6, msg.topic.type_name.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 7, msg.key.c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 8, to_sql_timestamp(msg.log_time).c_str(), -1, SQLITE_TRANSIENT);
sqlite3_bind_text(statement, 9, to_sql_timestamp(msg.publish_time).c_str(), -1, SQLITE_TRANSIENT);

// Execute the SQL statement
const auto step_ret = sqlite3_step(statement);
Expand Down

0 comments on commit 9271cfe

Please sign in to comment.