Skip to content

Commit

Permalink
Merge pull request #267 from wazuh/enhancement/253-normalise-events-t…
Browse files Browse the repository at this point in the history
…o-a-common-scheme

Update queue and statless events models
  • Loading branch information
TomasTurina authored Nov 12, 2024
2 parents 4fb8806 + 38e9670 commit d1cf856
Show file tree
Hide file tree
Showing 20 changed files with 432 additions and 291 deletions.
1 change: 1 addition & 0 deletions src/agent/communicator/src/http_client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,7 @@ namespace http_client
if (!params.Body.empty())
{
req.set(boost::beast::http::field::content_type, "application/json");
req.set(boost::beast::http::field::transfer_encoding, "chunked");
req.body() = params.Body;
req.prepare_payload();
}
Expand Down
22 changes: 15 additions & 7 deletions src/agent/multitype_queue/include/imultitype_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,31 +51,39 @@ class IMultiTypeQueue
* @brief Retrieves the next message from the queue.
*
* @param type The type of the queue to use as the source.
* @param module The name of the module requesting the message.
* @param moduleName The name of the module requesting the message.
* @param moduleType The type of the module requesting the messages.
* @return Message The next message from the queue.
*/
virtual Message getNext(MessageType type, const std::string module = "") = 0;
virtual Message getNext(MessageType type, const std::string moduleName = "", const std::string moduleType = "") = 0;

/**
* @brief Retrieves the next message from the queue asynchronously.
*
* @param type The type of the queue to use as the source.
* @param moduleName The name of the module requesting the message.
* @param messageQuantity The quantity of messages to return.
* @param moduleName The name of the module requesting the message.
* @param moduleType The type of the module requesting the messages.
* @return boost::asio::awaitable<Message> Awaitable object representing the next message.
*/
virtual boost::asio::awaitable<Message>
getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName = "") = 0;
virtual boost::asio::awaitable<Message> getNextNAwaitable(MessageType type,
int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;

/**
* @brief Retrieves the next N messages from the queue.
*
* @param type The type of the queue to use as the source.
* @param moduleName The name of the module requesting the messages.
* @param messageQuantity The quantity of messages to return.
* @param moduleName The name of the module requesting the messages.
* @param moduleType The type of the module requesting the messages.
* @return std::vector<Message> A vector of messages fetched from the queue.
*/
virtual std::vector<Message> getNextN(MessageType type, int messageQuantity, const std::string moduleName = "") = 0;
virtual std::vector<Message> getNextN(MessageType type,
int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") = 0;

/**
* @brief Deletes a message from the queue.
Expand Down
12 changes: 9 additions & 3 deletions src/agent/multitype_queue/include/message.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@ enum class MessageType
};

/**
* @brief Wrapper for Message, contains the message type, the json data and the module name.
* @brief Wrapper for Message, contains the message type, the json data, the
* module name, the module type and the metadata.
*
*/
class Message
Expand All @@ -25,17 +26,22 @@ class Message
MessageType type;
nlohmann::json data;
std::string moduleName;
std::string moduleType;
std::string metaData;

Message(MessageType t, nlohmann::json d, std::string mN = "")
Message(MessageType t, nlohmann::json d, std::string mN = "", std::string mT = "", std::string mD = "")
: type(t)
, data(d)
, moduleName(mN)
, moduleType(mT)
, metaData(mD)
{
}

// Define equality operator
bool operator==(const Message& other) const
{
return type == other.type && data == other.data;
return type == other.type && data == other.data && moduleName == other.moduleName &&
moduleType == other.moduleType && metaData == other.metaData;
}
};
13 changes: 9 additions & 4 deletions src/agent/multitype_queue/include/multitype_queue.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -92,18 +92,23 @@ class MultiTypeQueue : public IMultiTypeQueue
/**
* @copydoc IMultiTypeQueue::getNext(MessageType, const std::string)
*/
Message getNext(MessageType type, const std::string module = "") override;
Message getNext(MessageType type, const std::string moduleName = "", const std::string moduleType = "") override;

/**
* @copydoc IMultiTypeQueue::getNextNAwaitable(MessageType, int, const std::string)
*/
boost::asio::awaitable<Message>
getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName = "") override;
boost::asio::awaitable<Message> getNextNAwaitable(MessageType type,
int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") override;

/**
* @copydoc IMultiTypeQueue::getNextN(MessageType, int, const std::string)
*/
std::vector<Message> getNextN(MessageType type, int messageQuantity, const std::string moduleName = "") override;
std::vector<Message> getNextN(MessageType type,
int messageQuantity,
const std::string moduleName = "",
const std::string moduleType = "") override;

/**
* @copydoc IMultiTypeQueue::pop(MessageType, const std::string)
Expand Down
23 changes: 17 additions & 6 deletions src/agent/multitype_queue/include/persistence.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,31 +24,42 @@ class Persistence
* @param message The JSON message to be stored.
* @param queueName The name of the queue.
* @param moduleName The name of the module.
* @param moduleType The type of the module.
* @param metadata The metadata message to store.
* @return int The number of messages stored.
*/
virtual int
Store(const nlohmann::json& message, const std::string& queueName, const std::string& moduleName = "") = 0;

virtual int Store(const nlohmann::json& message,
const std::string& tableName,
const std::string& moduleName = "",
const std::string& moduleType = "",
const std::string& metadata = "") = 0;
/**
* @brief Retrieve a JSON message from the specified queue.
*
* @param id rowid of the message to be retrieved.
* @param queueName The name of the queue.
* @param moduleName The name of the module.
* @param moduleType The type of the module.
* @return nlohmann::json The retrieved JSON message.
*/
virtual nlohmann::json Retrieve(int id, const std::string& queueName, const std::string& moduleName = "") = 0;
virtual nlohmann::json Retrieve(int id,
const std::string& queueName,
const std::string& moduleName = "",
const std::string& moduleType = "") = 0;

/**
* @brief Retrieve multiple JSON messages from the specified queue.
*
* @param n number of messages to be retrieved.
* @param queueName The name of the queue.
* @param moduleName The name of the module.
* @param moduleType The type of the module.
* @return nlohmann::json The retrieved JSON messages.
*/
virtual nlohmann::json
RetrieveMultiple(int n, const std::string& queueName, const std::string& moduleName = "") = 0;
virtual nlohmann::json RetrieveMultiple(int n,
const std::string& queueName,
const std::string& moduleName = "",
const std::string& moduleType = "") = 0;

/**
* @brief Remove a JSON message from the specified queue.
Expand Down
70 changes: 39 additions & 31 deletions src/agent/multitype_queue/src/multitype_queue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -53,15 +53,19 @@ int MultiTypeQueue::push(Message message, bool shouldWait)
{
for (const auto& singleMessageData : messageData)
{
result += m_persistenceDest->Store(singleMessageData, sMessageType, message.moduleName);
result += m_persistenceDest->Store(
singleMessageData, sMessageType, message.moduleName, message.moduleType, message.metaData);
m_cv.notify_all();
}
}
}
else
{
result =
m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type), message.moduleName);
result = m_persistenceDest->Store(message.data,
m_mapMessageTypeName.at(message.type),
message.moduleName,
message.moduleType,
message.metaData);
m_cv.notify_all();
}
}
Expand Down Expand Up @@ -99,15 +103,19 @@ boost::asio::awaitable<int> MultiTypeQueue::pushAwaitable(Message message)
{
for (const auto& singleMessageData : messageData)
{
result += m_persistenceDest->Store(singleMessageData, sMessageType, message.moduleName);
result += m_persistenceDest->Store(
singleMessageData, sMessageType, message.moduleName, message.moduleType, message.metaData);
m_cv.notify_all();
}
}
}
else
{
result =
m_persistenceDest->Store(message.data, m_mapMessageTypeName.at(message.type), message.moduleName);
result = m_persistenceDest->Store(message.data,
m_mapMessageTypeName.at(message.type),
message.moduleName,
message.moduleType,
message.metaData);
m_cv.notify_all();
}
}
Expand All @@ -129,19 +137,18 @@ int MultiTypeQueue::push(std::vector<Message> messages)
return result;
}

Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName)
Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName, const std::string moduleType)
{
Message result(type, "{}"_json, moduleName);
Message result(type, "{}"_json, moduleName, moduleType, "");
if (m_mapMessageTypeName.contains(type))
{
auto resultData = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type), moduleName);
auto resultData = m_persistenceDest->RetrieveMultiple(1, m_mapMessageTypeName.at(type), moduleName, moduleType);
if (!resultData.empty())
{
result.data = resultData;
if (moduleName.empty())
{
result.moduleName = result.data.at(0).at("module");
}
result.data = resultData[0]["data"];
result.metaData = resultData[0]["metadata"];
result.moduleName = resultData[0]["moduleName"];
result.moduleType = resultData[0]["moduleType"];
}
}
else
Expand All @@ -152,12 +159,14 @@ Message MultiTypeQueue::getNext(MessageType type, const std::string moduleName)
return result;
}

boost::asio::awaitable<Message>
MultiTypeQueue::getNextNAwaitable(MessageType type, int messageQuantity, const std::string moduleName)
boost::asio::awaitable<Message> MultiTypeQueue::getNextNAwaitable(MessageType type,
int messageQuantity,
const std::string moduleName,
const std::string moduleType)
{
boost::asio::steady_timer timer(co_await boost::asio::this_coro::executor);

Message result(type, "{}"_json, moduleName);
Message result(type, "{}"_json, moduleName, moduleType, "");
if (m_mapMessageTypeName.contains(type))
{
while (isEmpty(type))
Expand All @@ -167,14 +176,14 @@ MultiTypeQueue::getNextNAwaitable(MessageType type, int messageQuantity, const s
}

auto resultData =
m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName);
m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName, moduleType);

if (!resultData.empty())
{
result.data = resultData;
if (moduleName.empty())
{
result.moduleName = result.data.at(0).at("module");
}
result.data = resultData[0]["data"];
result.metaData = resultData[0]["metadata"];
result.moduleName = resultData[0]["moduleName"];
result.moduleType = resultData[0]["moduleType"];
}
}
else
Expand All @@ -185,21 +194,20 @@ MultiTypeQueue::getNextNAwaitable(MessageType type, int messageQuantity, const s
co_return result;
}

std::vector<Message> MultiTypeQueue::getNextN(MessageType type, int messageQuantity, const std::string moduleName)
std::vector<Message> MultiTypeQueue::getNextN(MessageType type,
int messageQuantity,
const std::string moduleName,
const std::string moduleType)
{
std::vector<Message> result;
if (m_mapMessageTypeName.contains(type))
{
auto arrayData =
m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName);
m_persistenceDest->RetrieveMultiple(messageQuantity, m_mapMessageTypeName.at(type), moduleName, moduleType);
for (auto singleJson : arrayData)
{
auto finalModuleName = moduleName;
if (moduleName.empty())
{
finalModuleName = singleJson["module"];
}
result.emplace_back(type, singleJson, finalModuleName);
result.emplace_back(
type, singleJson["data"], singleJson["moduleName"], singleJson["moduleType"], singleJson["metadata"]);
}
}
else
Expand Down
Loading

0 comments on commit d1cf856

Please sign in to comment.