Skip to content

Commit

Permalink
Merge pull request #465 from wazuh/feat/206-implement-logcollector-ev…
Browse files Browse the repository at this point in the history
…entchannel-reader-for-windows

Implement Logcollector EventChannel Reader for Windows
  • Loading branch information
TomasTurina authored Jan 17, 2025
2 parents dc1d124 + 0709df9 commit db6679b
Show file tree
Hide file tree
Showing 20 changed files with 667 additions and 45 deletions.
2 changes: 2 additions & 0 deletions src/cmake/config.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,8 @@ set(DEFAULT_FILE_WAIT 500 CACHE STRING "Default Logcollector file reading interv

set(DEFAULT_RELOAD_INTERVAL 60000 CACHE STRING "Default Logcollector reload interval (1m)")

set(DEFAULT_CHANNEL_REFRESH_INTERVAL 5000 CACHE STRING "Default Logcollector Windows eventchannel reconnect time (5000ms)")

set(DEFAULT_INVENTORY_ENABLED true CACHE BOOL "Default inventory enabled")

set(DEFAULT_INTERVAL 3600000 CACHE STRING "Default inventory interval (1h)")
Expand Down
1 change: 1 addition & 0 deletions src/common/config/include/config.h.in
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ namespace config
constexpr auto DEFAULT_FILE_WAIT = @DEFAULT_FILE_WAIT@;
constexpr auto DEFAULT_RELOAD_INTERVAL = @DEFAULT_RELOAD_INTERVAL@;
constexpr auto DEFAULT_LOCALFILES = "/var/log/auth.log";
constexpr auto DEFAULT_CHANNEL_REFRESH_INTERVAL = @DEFAULT_CHANNEL_REFRESH_INTERVAL@;
}

namespace inventory
Expand Down
9 changes: 7 additions & 2 deletions src/modules/logcollector/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,16 @@ find_package(nlohmann_json CONFIG REQUIRED)
find_package(OpenSSL REQUIRED)
find_package(Boost REQUIRED COMPONENTS asio system)

FILE(GLOB LOGCOLLECTOR_SOURCES src/*.cpp)

if(WIN32)
FILE(GLOB WINEVT_READER_SOURCES src/winevt_reader/src/*.cpp)
list(APPEND LOGCOLLECTOR_SOURCES ${WINEVT_READER_SOURCES})
FILE(GLOB EXCLUDED_SOURCES src/*_unix.cpp)
else()
FILE(GLOB EXCLUDED_SOURCES src/*_win.cpp)
endif()

FILE(GLOB LOGCOLLECTOR_SOURCES src/*.cpp)
list(REMOVE_ITEM LOGCOLLECTOR_SOURCES ${EXCLUDED_SOURCES})

add_library(Logcollector ${LOGCOLLECTOR_SOURCES})
Expand All @@ -34,7 +37,9 @@ target_include_directories(Logcollector PUBLIC
${CMAKE_CURRENT_SOURCE_DIR}/../include
${COMMON_FOLDER}
${COMMON_FOLDER}/stringHelper/include
${COMMON_FOLDER}/timeHelper/include)
${COMMON_FOLDER}/timeHelper/include
PRIVATE
$<$<PLATFORM_ID:Windows>:${CMAKE_CURRENT_SOURCE_DIR}/src/winevt_reader/src/>)

target_link_libraries(Logcollector
PUBLIC
Expand Down
5 changes: 0 additions & 5 deletions src/modules/logcollector/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,14 +15,9 @@ logcollector:
enabled: true
file:
- location: /var/log/*.log
age: 1d
delim-regex: "\n"
use-bookmark: true
windows:
- channel: Application
query: Event[System/EventID = 4624]
use-bookmark: true
reconnect-time: 5s
journald:
- filter:
- field: "_SYSTEMD_UNIT"
Expand Down
4 changes: 4 additions & 0 deletions src/modules/logcollector/include/logcollector.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,10 @@ class Logcollector {
return s_instance;
}

/// @brief Add platform specific implementation of IReader to logcollector.
/// @param ConfigurationParser where to get parameters.
void AddPlatformSpecificReader(std::shared_ptr<const configuration::ConfigurationParser> configurationParser);

protected:
/// @brief Constructor
Logcollector() { }
Expand Down
60 changes: 41 additions & 19 deletions src/modules/logcollector/src/logcollector.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,18 +9,22 @@

#include <chrono>
#include <iomanip>
#include <map>
#include <sstream>

#include "file_reader.hpp"

using namespace logcollector;

namespace logcollector {
namespace logcollector
{
constexpr int ACTIVE_READERS_WAIT_MS = 10;
}

void Logcollector::Start() {
if (!m_enabled) {
void Logcollector::Start()
{
if (!m_enabled)
{
LogInfo("Logcollector module is disabled.");
return;
}
Expand All @@ -29,7 +33,8 @@ void Logcollector::Start() {
m_ioContext.run();
}

void Logcollector::EnqueueTask(boost::asio::awaitable<void> task) {
void Logcollector::EnqueueTask(boost::asio::awaitable<void> task)
{
// NOLINTBEGIN(cppcoreguidelines-avoid-capturing-lambda-coroutines)
boost::asio::co_spawn(
m_ioContext,
Expand All @@ -50,8 +55,10 @@ void Logcollector::EnqueueTask(boost::asio::awaitable<void> task) {
// NOLINTEND(cppcoreguidelines-avoid-capturing-lambda-coroutines)
}

void Logcollector::Setup(std::shared_ptr<const configuration::ConfigurationParser> configurationParser) {
if (!configurationParser) {
void Logcollector::Setup(std::shared_ptr<const configuration::ConfigurationParser> configurationParser)
{
if (!configurationParser)
{
LogError("Invalid Configuration Parser passed to setup, module set to disabled.");
m_enabled = false;
return;
Expand All @@ -65,39 +72,47 @@ void Logcollector::Setup(std::shared_ptr<const configuration::ConfigurationParse
}

SetupFileReader(configurationParser);

AddPlatformSpecificReader(configurationParser);
}

void Logcollector::SetupFileReader(const std::shared_ptr<const configuration::ConfigurationParser> configurationParser) {
void Logcollector::SetupFileReader(const std::shared_ptr<const configuration::ConfigurationParser> configurationParser)
{
auto fileWait = configurationParser->GetConfig<std::time_t>("logcollector", "file_wait").value_or(config::logcollector::DEFAULT_FILE_WAIT);

auto reloadInterval = configurationParser->GetConfig<std::time_t>("logcollector", "reload_interval").value_or(config::logcollector::DEFAULT_RELOAD_INTERVAL);

auto localfiles = configurationParser->GetConfig<std::vector<std::string>>("logcollector", "localfiles").value_or(std::vector<std::string>({config::logcollector::DEFAULT_LOCALFILES}));

for (auto& lf : localfiles) {
for (auto& lf : localfiles)
{
AddReader(std::make_shared<FileReader>(*this, lf, fileWait, reloadInterval));
}
}

void Logcollector::Stop() {
void Logcollector::Stop()
{
CleanAllReaders();
m_ioContext.stop();
LogInfo("Logcollector module stopped.");
}

// NOLINTBEGIN(performance-unnecessary-value-param)
Co_CommandExecutionResult Logcollector::ExecuteCommand(const std::string command,
[[maybe_unused]] const nlohmann::json parameters) {
[[maybe_unused]] const nlohmann::json parameters)
{
LogInfo("Logcollector command: ", command);
co_return module_command::CommandExecutionResult{module_command::Status::SUCCESS, "OK"};
}
// NOLINTEND(performance-unnecessary-value-param)

void Logcollector::SetPushMessageFunction(const std::function<int(Message)>& pushMessage) {
void Logcollector::SetPushMessageFunction(const std::function<int(Message)>& pushMessage)
{
m_pushMessage = pushMessage;
}

void Logcollector::SendMessage(const std::string& location, const std::string& log, const std::string& collectorType) {
void Logcollector::SendMessage(const std::string& location, const std::string& log, const std::string& collectorType)
{
auto metadata = nlohmann::json::object();
auto data = nlohmann::json::object();

Expand All @@ -117,31 +132,38 @@ void Logcollector::SendMessage(const std::string& location, const std::string& l
LogTrace("Message pushed: '{}':'{}'", location, log);
}

void Logcollector::AddReader(std::shared_ptr<IReader> reader) {
void Logcollector::AddReader(std::shared_ptr<IReader> reader)
{
m_readers.push_back(reader);
EnqueueTask(reader->Run());
}

void Logcollector::CleanAllReaders() {
for (const auto &reader : m_readers) {
void Logcollector::CleanAllReaders()
{
for (const auto &reader : m_readers)
{
reader->Stop();
}

{
std::lock_guard<std::mutex> lock(m_timersMutex);
for (const auto &timer : m_timers) {
for (const auto &timer : m_timers)
{
timer->cancel();
}
}

while (m_activeReaders) {
while (m_activeReaders)
{
std::this_thread::sleep_for(std::chrono::milliseconds(ACTIVE_READERS_WAIT_MS));
}
m_readers.clear();
}

Awaitable Logcollector::Wait(std::chrono::milliseconds ms) {
if (!m_ioContext.stopped()) {
Awaitable Logcollector::Wait(std::chrono::milliseconds ms)
{
if (!m_ioContext.stopped())
{
auto timer = boost::asio::steady_timer(m_ioContext, ms);
{
std::lock_guard<std::mutex> lock(m_timersMutex);
Expand Down
13 changes: 13 additions & 0 deletions src/modules/logcollector/src/logcollector_unix.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
#include <config.h>

#include <logcollector.hpp>

namespace logcollector
{

void Logcollector::AddPlatformSpecificReader([[maybe_unused]] std::shared_ptr<const configuration::ConfigurationParser> configurationParser)
{

}

}
29 changes: 29 additions & 0 deletions src/modules/logcollector/src/logcollector_win.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
#include "event_reader_win.hpp"

#include <config.h>
#include <timeHelper.h>

#include <chrono>
#include <map>

#include "logcollector.hpp"

namespace logcollector
{

void Logcollector::AddPlatformSpecificReader(std::shared_ptr<const configuration::ConfigurationParser> configurationParser)
{
const auto refreshInterval = configurationParser->GetConfig<time_t>("logcollector", "channel_refresh").value_or(config::logcollector::DEFAULT_CHANNEL_REFRESH_INTERVAL);

auto windowsConfig = configurationParser->GetConfig<std::vector<std::map<std::string, std::string>>>("logcollector", "windows").value_or(
std::vector<std::map<std::string, std::string>> {});

for (auto& entry : windowsConfig)
{
const auto channel = entry["channel"];
const auto query = entry["query"];
AddReader(std::make_shared<winevt::WindowsEventTracerReader>(*this, channel, query, refreshInterval));
}
}

}
1 change: 1 addition & 0 deletions src/modules/logcollector/src/reader.hpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once

#include <boost/asio/awaitable.hpp>
#include <logcollector.hpp>

namespace logcollector {

Expand Down
Loading

0 comments on commit db6679b

Please sign in to comment.