Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: fix PreservedDirDepth not working with polling and wildcard path #1866

Merged
merged 9 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from 7 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ _deps
core/build/
core/protobuf/config_server/*/*.pb.*
core/protobuf/*/*.pb.*
core/log_pb/*.pb.*
core/common/Version.cpp
!/Makefile
# Enterprise
Expand Down
5 changes: 3 additions & 2 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ cmake_dependent_option(ENABLE_STATIC_LINK_CRT "Build Logtail by linking CRT stat
option(WITHOUTGDB "Build Logtail without gdb")
option(WITHSPL "Build Logtail and UT with SPL" ON)
option(BUILD_LOGTAIL_UT "Build unit test for Logtail")
cmake_dependent_option(ENABLE_ADDRESS_SANITIZER "Enable address sanitizer" ON "CMAKE_BUILD_TYPE STREQUAL Debug;NOT ANDROID" OFF)
set(PROVIDER_PATH "provider" CACHE PATH "Path to the provider module") # external provider path can be set with -DPROVIDER_PATH
set(UNITTEST_PATH "unittest" CACHE PATH "Path to the unittest module") # external unittest path can be set with -DUNITTEST_PATH

Expand Down Expand Up @@ -61,8 +62,8 @@ if (UNIX)
set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -ggdb")
set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -ggdb")
endif ()
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O1 -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O1 -fno-omit-frame-pointer")
set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0 -fno-omit-frame-pointer")
set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -fno-omit-frame-pointer")
set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} -O2")
set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2")
string(REPLACE "-O3" "" CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE}")
Expand Down
19 changes: 13 additions & 6 deletions core/checkpoint/CheckPointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,10 +71,18 @@ bool CheckPointManager::GetCheckPoint(DevInode devInode, const std::string& conf
return false;
}

void CheckPointManager::DeleteDirCheckPoint(const std::string& filename) {
std::unordered_map<std::string, DirCheckPointPtr>::iterator it = mDirNameMap.find(filename);
if (it != mDirNameMap.end())
void CheckPointManager::DeleteDirCheckPoint(const std::string& dirname) {
std::unordered_map<std::string, DirCheckPointPtr>::iterator it = mDirNameMap.find(dirname);
if (it != mDirNameMap.end()) {
mDirNameMap.erase(it);
}
auto parentpos = dirname.find_last_of(PATH_SEPARATOR);
if (parentpos != std::string::npos) {
auto parentDirCheckpoint = mDirNameMap.find(dirname.substr(0, parentpos));
if (parentDirCheckpoint != mDirNameMap.end()) {
parentDirCheckpoint->second->mSubDir.erase(dirname);
}
}
}

bool CheckPointManager::GetDirCheckPoint(const std::string& dirname, DirCheckPointPtr& dirCheckPointPtr) {
Expand Down Expand Up @@ -123,8 +131,7 @@ void CheckPointManager::LoadCheckPoint() {
Json::Value root;
ParseConfResult cptRes = ParseConfig(AppConfig::GetInstance()->GetCheckPointFilePath(), root);
// if new checkpoint file not exist, check old checkpoint file.
if (cptRes == CONFIG_NOT_EXIST
&& AppConfig::GetInstance()->GetCheckPointFilePath() != GetCheckPointFileName()) {
if (cptRes == CONFIG_NOT_EXIST && AppConfig::GetInstance()->GetCheckPointFilePath() != GetCheckPointFileName()) {
cptRes = ParseConfig(GetCheckPointFileName(), root);
}
if (cptRes != CONFIG_OK) {
Expand Down Expand Up @@ -408,7 +415,7 @@ bool CheckPointManager::DumpCheckPointToLocal() {
result["dir_check_point"] = dirJson;
result["version"] = Json::Value(Json::UInt(INT32_FLAG(check_point_version)));
fout << result.toStyledString();
if (!fout.good()) {
if (!fout) {
LOG_ERROR(sLogger, ("dump check point to file failed", checkPointFile));
AlarmManager::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "dump check point to file failed");
fout.close();
Expand Down
2 changes: 1 addition & 1 deletion core/checkpoint/CheckPointManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class CheckPointManager {
void AddCheckPoint(CheckPoint* checkPointPtr);
void AddDirCheckPoint(const std::string& dirname);
void DeleteCheckPoint(DevInode devInode, const std::string& configName);
void DeleteDirCheckPoint(const std::string& filename);
void DeleteDirCheckPoint(const std::string& dirname);
void LoadCheckPoint();
void LoadDirCheckPoint(const Json::Value& root);
void LoadFileCheckPoint(const Json::Value& root);
Expand Down
2 changes: 1 addition & 1 deletion core/common/links.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ macro(common_link target_name)
link_zlib(${target_name})
link_zstd(${target_name})
link_unwind(${target_name})
if (NOT ANDROID)
if (ENABLE_ADDRESS_SANITIZER)
link_asan(${target_name})
endif()
if (UNIX)
Expand Down
3 changes: 3 additions & 0 deletions core/config/common_provider/CommonConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,9 @@ void CommonConfigProvider::Stop() {
mIsThreadRunning = false;
}
mStopCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, (sName, "stopped successfully"));
Expand Down
3 changes: 3 additions & 0 deletions core/config/common_provider/LegacyCommonConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ void LegacyCommonConfigProvider::Stop() {
mIsThreadRunning = false;
}
mStopCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("legacy common config provider", "stopped successfully"));
Expand Down
4 changes: 2 additions & 2 deletions core/config/watcher/ConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,7 @@ PipelineConfigDiff ConfigWatcher::CheckConfigDiff() {
filesystem::file_time_type mTime = filesystem::last_write_time(path, ec);
if (iter == mFileInfoMap.end()) {
mFileInfoMap[filepath] = make_pair(size, mTime);
unique_ptr<Json::Value> detail = make_unique<Json::Value>(new Json::Value());
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!LoadConfigDetailFromFile(path, *detail)) {
continue;
}
Expand All @@ -106,7 +106,7 @@ PipelineConfigDiff ConfigWatcher::CheckConfigDiff() {
} else if (iter->second.first != size || iter->second.second != mTime) {
// for config currently running, we leave it untouched if new config is invalid
mFileInfoMap[filepath] = make_pair(size, mTime);
unique_ptr<Json::Value> detail = make_unique<Json::Value>(new Json::Value());
unique_ptr<Json::Value> detail = make_unique<Json::Value>();
if (!LoadConfigDetailFromFile(path, *detail)) {
continue;
}
Expand Down
5 changes: 4 additions & 1 deletion core/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -392,9 +392,12 @@ endmacro()

# asan for debug
macro(link_asan target_name)
if(CMAKE_BUILD_TYPE MATCHES Debug)
if (UNIX)
target_compile_options(${target_name} PUBLIC -fsanitize=address)
target_link_options(${target_name} PUBLIC -fsanitize=address -static-libasan)
elseif(MSVC)
target_compile_options(${target_name} PUBLIC /fsanitize=address)
target_link_options(${target_name} PUBLIC /fsanitize=address)
endif()
endmacro()

Expand Down
128 changes: 86 additions & 42 deletions core/file_server/ConfigManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,6 @@
#include "app_config/AppConfig.h"
#include "checkpoint/CheckPointManager.h"
#include "common/CompressTools.h"
#include "constants/Constants.h"
#include "common/ErrorUtil.h"
#include "common/ExceptionBase.h"
#include "common/FileSystemUtil.h"
Expand All @@ -46,9 +45,10 @@
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "common/version.h"
#include "constants/Constants.h"
#include "file_server/EventDispatcher.h"
#include "file_server/event_handler/EventHandler.h"
#include "file_server/FileServer.h"
#include "file_server/event_handler/EventHandler.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/AlarmManager.h"
#include "pipeline/Pipeline.h"
Expand Down Expand Up @@ -101,7 +101,7 @@ DEFINE_FLAG_INT32(docker_config_update_interval, "interval between docker config

namespace logtail {

//
//
ParseConfResult ParseConfig(const std::string& configName, Json::Value& jsonRoot) {
// Get full path, if it is a relative path, prepend process execution dir.
std::string fullPath = configName;
Expand All @@ -111,10 +111,15 @@ ParseConfResult ParseConfig(const std::string& configName, Json::Value& jsonRoot

ifstream is;
is.open(fullPath.c_str());
if (!is.good()) {
if (!is) { // https://horstmann.com/cpp/pitfalls.html
return CONFIG_NOT_EXIST;
}
std::string buffer;
try {
buffer.assign(std::istreambuf_iterator<char>(is), std::istreambuf_iterator<char>());
} catch (const std::ios_base::failure& e) {
return CONFIG_NOT_EXIST;
}
std::string buffer((std::istreambuf_iterator<char>(is)), (std::istreambuf_iterator<char>()));
if (!IsValidJson(buffer.c_str(), buffer.length())) {
return CONFIG_INVALID_FORMAT;
}
Expand Down Expand Up @@ -145,7 +150,7 @@ bool ConfigManager::RegisterHandlersRecursively(const std::string& path,
return result;

if (!config.first->IsDirectoryInBlacklist(path))
result = EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler);
result = EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler);

if (!result)
return result;
Expand Down Expand Up @@ -306,9 +311,16 @@ void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, cons
if (registerStatus == GET_REGISTER_STATUS_ERROR) {
return;
}
if (EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler)) {
if (config.first->mPreservedDirDepth < 0)
RegisterDescendants(
item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
else {
// preserve_depth register
RegisterHandlersWithinDepth(item,
config,
config.first->mPreservedDirDepth,
config.first->mMaxDirSearchDepth < 0 ? 100
: config.first->mMaxDirSearchDepth);
}
} else {
RegisterWildcardPath(config, item, depth + 1);
Expand Down Expand Up @@ -382,9 +394,16 @@ void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, cons
if (registerStatus == GET_REGISTER_STATUS_ERROR) {
return;
}
if (EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler)) {
if (config.first->mPreservedDirDepth < 0)
RegisterDescendants(
item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
else {
// preserve_depth register
RegisterHandlersWithinDepth(
item,
config,
config.first->mPreservedDirDepth,
config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
}
} else {
RegisterWildcardPath(config, item, depth + 1);
Expand Down Expand Up @@ -421,52 +440,57 @@ bool ConfigManager::RegisterHandlers(const string& basePath, const FileDiscovery
DirRegisterStatus registerStatus = EventDispatcher::GetInstance()->IsDirRegistered(basePath);
if (registerStatus == GET_REGISTER_STATUS_ERROR)
return result;
// dir in config is valid by default, do not call pathValidator
result = EventDispatcher::GetInstance()->RegisterEventHandler(basePath.c_str(), config, mSharedHandler);
// if we come into a failure, do not try to register others, there must be something wrong!
if (!result)
return result;

if (config.first->mPreservedDirDepth < 0)
result = RegisterDescendants(
basePath, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
else {
// preserve_depth register
int depth = config.first->mPreservedDirDepth;
result = RegisterHandlersWithinDepth(basePath, config, depth);
result = RegisterHandlersWithinDepth(basePath,
config,
config.first->mPreservedDirDepth,
config.first->mMaxDirSearchDepth < 0 ? 100
: config.first->mMaxDirSearchDepth);
}
return result;
}

bool ConfigManager::RegisterDirectory(const std::string& source, const std::string& object) {
// TODO��A potential bug: FindBestMatch will test @object with filePattern, which has very
// TODO: A potential bug: FindBestMatch will test @object with filePattern, which has very
// low possibility to match a sub directory name, so here will return false in most cases.
// e.g.: source: /path/to/monitor, file pattern: *.log, object: subdir.
// Match(subdir, *.log) = false.
FileDiscoveryConfig config = FindBestMatch(source, object);
if (config.first && !config.first->IsDirectoryInBlacklist(source))
return EventDispatcher::GetInstance()->RegisterEventHandler(source.c_str(), config, mSharedHandler);
if (config.first && !config.first->IsDirectoryInBlacklist(source)) {
return EventDispatcher::GetInstance()->RegisterEventHandler(source, config, mSharedHandler);
}
return false;
}

bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int depth) {
bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path,
const FileDiscoveryConfig& config,
int preservedDirDepth,
int maxDepth) {
if (maxDepth < 0) {
return true;
}
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) {
LOG_INFO(sLogger, ("ignore path matching host path blacklist", path));
return false;
}
if (depth <= 0) {
DirCheckPointPtr dirCheckPoint;
if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint) == false)
if (preservedDirDepth < 0) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

=0和<0需要区分,=0时总是注册baseDir,<0时必须符合不超时的条件

fsutil::PathStat statBuf;
if (!fsutil::PathStat::stat(path, statBuf)) {
return true;
}
int64_t sec = 0;
int64_t nsec = 0;
statBuf.GetLastWriteTime(sec, nsec);
auto curTime = time(nullptr);
if (curTime - sec > INT32_FLAG(timeout_interval)) {
return true;
// path had dircheckpoint means it was watched before, so it is valid
const set<string>& subdir = dirCheckPoint.get()->mSubDir;
for (set<string>::iterator it = subdir.begin(); it != subdir.end(); it++) {
if (EventDispatcher::GetInstance()->RegisterEventHandler((*it).c_str(), config, mSharedHandler))
RegisterHandlersWithinDepth(*it, config, depth - 1);
}
return true;
}
bool result = true;

fsutil::Dir dir(path);
if (!dir.Open()) {
Expand All @@ -480,30 +504,44 @@ bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const F
LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err));
return false;
}
if (!(EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler))) {
// break;// fail early, do not try to register others
return false;
}
if (maxDepth == 0) {
return true;
}

if (preservedDirDepth == 0) {
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

=0时就必须进入checkpoint恢复的注册方式,在此方式下保持PreservedDirDepth=0确保注册成功

DirCheckPointPtr dirCheckPoint;
if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint)) {
// path had dircheckpoint means it was watched before, so it is valid
const set<string>& subdir = dirCheckPoint.get()->mSubDir;
for (const auto& it : subdir) {
RegisterHandlersWithinDepth(it, config, 0, maxDepth - 1);
}
return true;
}
}
fsutil::Entry ent;
while ((ent = dir.ReadNext())) {
string item = PathJoin(path, ent.Name());
if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) {
if (!(EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler))) {
// break;// fail early, do not try to register others
result = false;
} else // sub dir will not be registered if parent dir fails
RegisterHandlersWithinDepth(item, config, depth - 1);
RegisterHandlersWithinDepth(item, config, preservedDirDepth - 1, maxDepth - 1);
}
}

return result;
return true;
}

// path not terminated by '/', path already registered
bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryConfig& config, int withinDepth) {
if (withinDepth < 0) {
return true;
}
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) {
LOG_INFO(sLogger, ("ignore path matching host path blacklist", path));
return false;
}
if (withinDepth <= 0) {
return true;
}

fsutil::Dir dir(path);
if (!dir.Open()) {
Expand All @@ -516,14 +554,20 @@ bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryC
LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err));
return false;
}
if (!EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler)) {
// break;// fail early, do not try to register others
return false;
}
if (withinDepth == 0) {
return true;
}

fsutil::Entry ent;
bool result = true;
while ((ent = dir.ReadNext())) {
string item = PathJoin(path, ent.Name());
if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) {
result = EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler);
if (result)
RegisterDescendants(item, config, withinDepth - 1);
RegisterDescendants(item, config, withinDepth - 1);
}
}
return result;
Expand Down
5 changes: 4 additions & 1 deletion core/file_server/ConfigManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,10 @@ class ConfigManager {
* @param path is the current dir that being registered
* @depth is the num of sub dir layers that should be registered
*/
bool RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int depth);
bool RegisterHandlersWithinDepth(const std::string& path,
const FileDiscoveryConfig& config,
int preservedDirDepth,
int maxDepth);
bool RegisterDescendants(const std::string& path, const FileDiscoveryConfig& config, int withinDepth);
// bool CheckLogType(const std::string& logTypeStr, LogType& logType);
// 废弃
Expand Down
Loading
Loading