Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Update queue and statless events models #267

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions src/agent/communicator/src/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ namespace http_client
if (!params.Body.empty())
{
req.set(boost::beast::http::field::content_type, "application/json");
req.set(boost::beast::http::field::transfer_encoding, "chunked");
req.body() = params.Body;
req.prepare_payload();
}
Expand Down
22 changes: 15 additions & 7 deletions src/agent/multitype_queue/include/imultitype_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,31 +51,39 @@ class IMultiTypeQueue
* @brief Retrieves the next message from the queue.
*
* @param type The type of the queue to use as the source.
* @param module The name of the module requesting the message.
* @param moduleName The name of the module requesting the message.
* @param moduleType The type of the module requesting the messages.
* @return Message The next message from the queue.
*/
virtual Message getNext(MessageType type, const std::string module = "") = 0;
virtual Message getNext(MessageType type, const std::string moduleName = "", const std::string moduleType = "") = 0;

/**
* @brief Retrieves the next message from the queue asynchronously.
*
* @param type The type of the queue to use as the source.
* @param moduleName The name of the module requesting the message.
* @param messageQuantity The quantity of messages to return.
* @param moduleName The name of the module requesting the message.
* @param moduleType The type of the module requesting the messages.
* @return boost::asio::awaitable<Message> Awaitable object representing the next message.
*/
virtual boost::asio::awaitable<Message>
getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName = "") = 0;
virtual boost::asio::awaitable<Message> getNextNAwaitable(MessageType type,
int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;

/**
* @brief Retrieves the next N messages from the queue.
*
* @param type The type of the queue to use as the source.
* @param moduleName The name of the module requesting the messages.
* @param messageQuantity The quantity of messages to return.
* @param moduleName The name of the module requesting the messages.
* @param moduleType The type of the module requesting the messages.
* @return std::vector<Message> A vector of messages fetched from the queue.
*/
virtual std::vector<Message> getNextN(MessageType type, int messageQuantity, const std::string moduleName = "") = 0;
virtual std::vector<Message> getNextN(MessageType type,
int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;

/**
* @brief Deletes a message from the queue.
Expand Down
12 changes: 9 additions & 3 deletions src/agent/multitype_queue/include/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ enum class MessageType
};

/**
* @brief Wrapper for Message, contains the message type, the json data and the module name.
* @brief Wrapper for Message, contains the message type, the json data, the
* module name, the module type and the metadata.
*
*/
class Message
Expand All @@ -25,17 +26,22 @@ class Message
MessageType type;
nlohmann::json data;
std::string moduleName;
std::string moduleType;
std::string metaData;

Message(MessageType t, nlohmann::json d, std::string mN = "")
Message(MessageType t, nlohmann::json d, std::string mN = "", std::string mT = "", std::string mD = "")
: type(t)
, data(d)
, moduleName(mN)
, moduleType(mT)
, metaData(mD)
{
}

// Define equality operator
bool operator==(const Message& other) const
{
return type == other.type && data == other.data;
return type == other.type && data == other.data && moduleName == other.moduleName &&
moduleType == other.moduleType && metaData == other.metaData;
}
};
13 changes: 9 additions & 4 deletions src/agent/multitype_queue/include/multitype_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,23 @@ class MultiTypeQueue : public IMultiTypeQueue
/**
* @copydoc IMultiTypeQueue::getNext(MessageType, const std::string)
*/
Message getNext(MessageType type, const std::string module = "") override;
Message getNext(MessageType type, const std::string moduleName = "", const std::string moduleType = "") override;

/**
* @copydoc IMultiTypeQueue::getNextNAwaitable(MessageType, int, const std::string)
*/
boost::asio::awaitable<Message>
getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName = "") override;
boost::asio::awaitable<Message> getNextNAwaitable(MessageType type,
int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") override;

/**
* @copydoc IMultiTypeQueue::getNextN(MessageType, int, const std::string)
*/
std::vector<Message> getNextN(MessageType type, int messageQuantity, const std::string moduleName = "") override;
std::vector<Message> getNextN(MessageType type,
int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") override;

/**
* @copydoc IMultiTypeQueue::pop(MessageType, const std::string)
Expand Down
23 changes: 17 additions & 6 deletions src/agent/multitype_queue/include/persistence.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,42 @@ class Persistence
* @param message The JSON message to be stored.
* @param queueName The name of the queue.
* @param moduleName The name of the module.
* @param moduleType The type of the module.
* @param metadata The metadata message to store.
* @return int The number of messages stored.
*/
virtual int
Store(const nlohmann::json& message, const std::string& queueName, const std::string& moduleName = "") = 0;

virtual int Store(const nlohmann::json& message,
const std::string& tableName,
const std::string& moduleName = "",
const std::string& moduleType = "",
const std::string& metadata = "") = 0;
/**
* @brief Retrieve a JSON message from the specified queue.
*
* @param id rowid of the message to be retrieved.
* @param queueName The name of the queue.
* @param moduleName The name of the module.
* @param moduleType The type of the module.
* @return nlohmann::json The retrieved JSON message.
*/
virtual nlohmann::json Retrieve(int id, const std::string& queueName, const std::string& moduleName = "") = 0;
virtual nlohmann::json Retrieve(int id,
const std::string& queueName,
const std::string& moduleName = "",
const std::string& moduleType = "") = 0;

/**
* @brief Retrieve multiple JSON messages from the specified queue.
*
* @param n number of messages to be retrieved.
* @param queueName The name of the queue.
* @param moduleName The name of the module.
* @param moduleType The type of the module.
* @return nlohmann::json The retrieved JSON messages.
*/
virtual nlohmann::json
RetrieveMultiple(int n, const std::string& queueName, const std::string& moduleName = "") = 0;
virtual nlohmann::json RetrieveMultiple(int n,
const std::string& queueName,
const std::string& moduleName = "",
const std::string& moduleType = "") = 0;

/**
* @brief Remove a JSON message from the specified queue.
Expand Down
70 changes: 39 additions & 31 deletions src/agent/multitype_queue/src/multitype_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,19 @@ int MultiTypeQueue::push(Message message, bool shouldWait)
{
for (const auto& singleMessageData : messageData)
{
result += m_persistenceDest->Store(singleMessageData, sMessageType, message.moduleName);
result += m_persistenceDest->Store(
singleMessageData, sMessageType, message.moduleName, message.moduleType, message.metaData);
m_cv.notify_all();
}
}
}
else
{
result =
m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type), message.moduleName);
result = m_persistenceDest->Store(message.data,
m_mapMessageTypeName.at(message.type),
message.moduleName,
message.moduleType,
message.metaData);
m_cv.notify_all();
}
}
Expand Down Expand Up @@ -99,15 +103,19 @@ boost::asio::awaitable<int> MultiTypeQueue::pushAwaitable(Message message)
{
for (const auto& singleMessageData : messageData)
{
result += m_persistenceDest->Store(singleMessageData, sMessageType, message.moduleName);
result += m_persistenceDest->Store(
singleMessageData, sMessageType, message.moduleName, message.moduleType, message.metaData);
m_cv.notify_all();
}
}
}
else
{
result =
m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type), message.moduleName);
result = m_persistenceDest->Store(message.data,
m_mapMessageTypeName.at(message.type),
message.moduleName,
message.moduleType,
message.metaData);
m_cv.notify_all();
}
}
Expand All @@ -129,19 +137,18 @@ int MultiTypeQueue::push(std::vector<Message> messages)
return result;
}

Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName)
Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName, const std::string moduleType)
{
Message result(type, "{}"_json, moduleName);
Message result(type, "{}"_json, moduleName, moduleType, "");
if (m_mapMessageTypeName.contains(type))
{
auto resultData = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type), moduleName);
auto resultData = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type), moduleName, moduleType);
if (!resultData.empty())
{
result.data = resultData;
if (moduleName.empty())
{
result.moduleName = result.data.at(0).at("module");
}
result.data = resultData[0]["data"];
result.metaData = resultData[0]["metadata"];
result.moduleName = resultData[0]["moduleName"];
result.moduleType = resultData[0]["moduleType"];
}
}
else
Expand All @@ -152,12 +159,14 @@ Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName)
return result;
}

boost::asio::awaitable<Message>
MultiTypeQueue::getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName)
boost::asio::awaitable<Message> MultiTypeQueue::getNextNAwaitable(MessageType type,
int messageQuantity,
const std::string moduleName,
const std::string moduleType)
{
boost::asio::steady_timer timer(co_await boost::asio::this_coro::executor);

Message result(type, "{}"_json, moduleName);
Message result(type, "{}"_json, moduleName, moduleType, "");
if (m_mapMessageTypeName.contains(type))
{
while (isEmpty(type))
Expand All @@ -167,14 +176,14 @@ MultiTypeQueue::getNextNAwaitable(MessageType type, int messageQuantity, const s
}

auto resultData =
m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName);
m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName, moduleType);

if (!resultData.empty())
{
result.data = resultData;
if (moduleName.empty())
{
result.moduleName = result.data.at(0).at("module");
}
result.data = resultData[0]["data"];
result.metaData = resultData[0]["metadata"];
result.moduleName = resultData[0]["moduleName"];
result.moduleType = resultData[0]["moduleType"];
}
}
else
Expand All @@ -185,21 +194,20 @@ MultiTypeQueue::getNextNAwaitable(MessageType type, int messageQuantity, const s
co_return result;
}

std::vector<Message> MultiTypeQueue::getNextN(MessageType type, int messageQuantity, const std::string moduleName)
std::vector<Message> MultiTypeQueue::getNextN(MessageType type,
int messageQuantity,
const std::string moduleName,
const std::string moduleType)
{
std::vector<Message> result;
if (m_mapMessageTypeName.contains(type))
{
auto arrayData =
m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName);
m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName, moduleType);
for (auto singleJson : arrayData)
{
auto finalModuleName = moduleName;
if (moduleName.empty())
{
finalModuleName = singleJson["module"];
}
result.emplace_back(type, singleJson, finalModuleName);
result.emplace_back(
type, singleJson["data"], singleJson["moduleName"], singleJson["moduleType"], singleJson["metadata"]);
}
}
else
Expand Down
Loading
Loading