Skip to content

Commit

Permalink
[backport 2.1] fix PreservedDirDepth not working with polling and wil…
Browse files Browse the repository at this point in the history
…dcard path (#1887)

* fix PreservedDirDepth not working with polling and wildcard path

* fix memleak

* add UT and fix register
  • Loading branch information
yyuuttaaoo authored Nov 25, 2024
1 parent 48efbbb commit a76787f
Show file tree
Hide file tree
Showing 22 changed files with 713 additions and 240 deletions.
14 changes: 11 additions & 3 deletions core/checkpoint/CheckPointManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,18 @@ bool CheckPointManager::GetCheckPoint(DevInode devInode, const std::string& conf
return false;
}

void CheckPointManager::DeleteDirCheckPoint(const std::string& filename) {
std::unordered_map<std::string, DirCheckPointPtr>::iterator it = mDirNameMap.find(filename);
if (it != mDirNameMap.end())
// Remove the directory checkpoint stored under `dirname` and detach it from its parent's checkpoint.
// @param dirname key used to look the checkpoint up in mDirNameMap; a missing key is not an error.
void CheckPointManager::DeleteDirCheckPoint(const std::string& dirname) {
std::unordered_map<std::string, DirCheckPointPtr>::iterator it = mDirNameMap.find(dirname);
if (it != mDirNameMap.end()) {
mDirNameMap.erase(it);
}
// The parent key is everything before the last PATH_SEPARATOR; without a separator there is no parent to update.
// NOTE(review): when the only separator is at index 0 the parent key becomes the empty string - confirm that is intended.
auto parentpos = dirname.find_last_of(PATH_SEPARATOR);
if (parentpos != std::string::npos) {
auto parentDirCheckpoint = mDirNameMap.find(dirname.substr(0, parentpos));
if (parentDirCheckpoint != mDirNameMap.end()) {
// Also erase dirname from the parent's mSubDir set so the parent no longer lists the deleted directory.
parentDirCheckpoint->second->mSubDir.erase(dirname);
}
}
}

bool CheckPointManager::GetDirCheckPoint(const std::string& dirname, DirCheckPointPtr& dirCheckPointPtr) {
Expand Down
2 changes: 1 addition & 1 deletion core/checkpoint/CheckPointManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -126,7 +126,7 @@ class CheckPointManager {
void AddCheckPoint(CheckPoint* checkPointPtr);
void AddDirCheckPoint(const std::string& dirname);
void DeleteCheckPoint(DevInode devInode, const std::string& configName);
void DeleteDirCheckPoint(const std::string& filename);
void DeleteDirCheckPoint(const std::string& dirname);
void LoadCheckPoint();
void LoadDirCheckPoint(const Json::Value& root);
void LoadFileCheckPoint(const Json::Value& root);
Expand Down
3 changes: 3 additions & 0 deletions core/config/provider/CommonConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,9 @@ void CommonConfigProvider::Stop() {
mIsThreadRunning = false;
}
mStopCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("common config provider", "stopped successfully"));
Expand Down
114 changes: 79 additions & 35 deletions core/config_manager/ConfigManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -114,10 +114,15 @@ ParseConfResult ParseConfig(const std::string& configName, Json::Value& jsonRoot

ifstream is;
is.open(fullPath.c_str());
if (!is.good()) {
if (!is) { // https://horstmann.com/cpp/pitfalls.html
return CONFIG_NOT_EXIST;
}
std::string buffer;
try {
buffer.assign(std::istreambuf_iterator<char>(is), std::istreambuf_iterator<char>());
} catch (const std::ios_base::failure& e) {
return CONFIG_NOT_EXIST;
}
std::string buffer((std::istreambuf_iterator<char>(is)), (std::istreambuf_iterator<char>()));
if (!IsValidJson(buffer.c_str(), buffer.length())) {
return CONFIG_INVALID_FORMAT;
}
Expand Down Expand Up @@ -309,9 +314,16 @@ void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, cons
if (registerStatus == GET_REGISTER_STATUS_ERROR) {
return;
}
if (EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler)) {
if (config.first->mPreservedDirDepth < 0)
RegisterDescendants(
item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
else {
// preserve_depth register
RegisterHandlersWithinDepth(item,
config,
config.first->mPreservedDirDepth,
config.first->mMaxDirSearchDepth < 0 ? 100
: config.first->mMaxDirSearchDepth);
}
} else {
RegisterWildcardPath(config, item, depth + 1);
Expand Down Expand Up @@ -378,9 +390,16 @@ void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, cons
if (registerStatus == GET_REGISTER_STATUS_ERROR) {
return;
}
if (EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler)) {
if (config.first->mPreservedDirDepth < 0)
RegisterDescendants(
item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
else {
// preserve_depth register
RegisterHandlersWithinDepth(
item,
config,
config.first->mPreservedDirDepth,
config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
}
} else {
RegisterWildcardPath(config, item, depth + 1);
Expand Down Expand Up @@ -417,25 +436,23 @@ bool ConfigManager::RegisterHandlers(const string& basePath, const FileDiscovery
DirRegisterStatus registerStatus = EventDispatcher::GetInstance()->IsDirRegistered(basePath);
if (registerStatus == GET_REGISTER_STATUS_ERROR)
return result;
// dir in config is valid by default, do not call pathValidator
result = EventDispatcher::GetInstance()->RegisterEventHandler(basePath.c_str(), config, mSharedHandler);
// if we come into a failure, do not try to register others, there must be something wrong!
if (!result)
return result;

if (config.first->mPreservedDirDepth < 0)
result = RegisterDescendants(
basePath, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth);
else {
// preserve_depth register
int depth = config.first->mPreservedDirDepth;
result = RegisterHandlersWithinDepth(basePath, config, depth);
result = RegisterHandlersWithinDepth(basePath,
config,
config.first->mPreservedDirDepth,
config.first->mMaxDirSearchDepth < 0 ? 100
: config.first->mMaxDirSearchDepth);
}
return result;
}

bool ConfigManager::RegisterDirectory(const std::string& source, const std::string& object) {
// TODO：A potential bug: FindBestMatch will test @object with filePattern, which has very
// TODO: A potential bug: FindBestMatch will test @object with filePattern, which has very
// low possibility to match a sub directory name, so here will return false in most cases.
// e.g.: source: /path/to/monitor, file pattern: *.log, object: subdir.
// Match(subdir, *.log) = false.
Expand All @@ -445,24 +462,30 @@ bool ConfigManager::RegisterDirectory(const std::string& source, const std::stri
return false;
}

bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int depth) {
bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path,
const FileDiscoveryConfig& config,
int preservedDirDepth,
int maxDepth) {
if (maxDepth < 0) {
return true;
}
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) {
LOG_INFO(sLogger, ("ignore path matching host path blacklist", path));
return false;
}
if (depth <= 0) {
DirCheckPointPtr dirCheckPoint;
if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint) == false)
if (preservedDirDepth < 0) {
fsutil::PathStat statBuf;
if (!fsutil::PathStat::stat(path, statBuf)) {
return true;
}
int64_t sec = 0;
int64_t nsec = 0;
statBuf.GetLastWriteTime(sec, nsec);
auto curTime = time(nullptr);
if (curTime - sec > INT32_FLAG(timeout_interval)) {
return true;
// path had dircheckpoint means it was watched before, so it is valid
const set<string>& subdir = dirCheckPoint.get()->mSubDir;
for (set<string>::iterator it = subdir.begin(); it != subdir.end(); it++) {
if (EventDispatcher::GetInstance()->RegisterEventHandler((*it).c_str(), config, mSharedHandler))
RegisterHandlersWithinDepth(*it, config, depth - 1);
}
return true;
}
bool result = true;

fsutil::Dir dir(path);
if (!dir.Open()) {
Expand All @@ -476,30 +499,45 @@ bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const F
LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err));
return false;
}
if (!(EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler))) {
// break;// fail early, do not try to register others
return false;
}
if (maxDepth == 0) {
return true;
}

if (preservedDirDepth == 0) {
DirCheckPointPtr dirCheckPoint;
if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint)) {
// path had dircheckpoint means it was watched before, so it is valid
const set<string>& subdir = dirCheckPoint.get()->mSubDir;
for (const auto& it : subdir) {
RegisterHandlersWithinDepth(it, config, 0, maxDepth - 1);
}
return true;
}
}
fsutil::Entry ent;
while ((ent = dir.ReadNext())) {
string item = PathJoin(path, ent.Name());
if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) {
if (!(EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler))) {
// break;// fail early, do not try to register others
result = false;
} else // sub dir will not be registered if parent dir fails
RegisterHandlersWithinDepth(item, config, depth - 1);
RegisterHandlersWithinDepth(item, config, preservedDirDepth - 1, maxDepth - 1);
}
}

return result;
return true;
}

// path not terminated by '/', path already registered
bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryConfig& config, int withinDepth) {
if (withinDepth < 0) {
return true;
}
if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) {
LOG_INFO(sLogger, ("ignore path matching host path blacklist", path));
return false;
}
if (withinDepth <= 0) {
return true;
}

fsutil::Dir dir(path);
if (!dir.Open()) {
Expand All @@ -512,14 +550,20 @@ bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryC
LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err));
return false;
}
if (!EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler)) {
// break;// fail early, do not try to register others
return false;
}
if (withinDepth == 0) {
return true;
}

fsutil::Entry ent;
bool result = true;
while ((ent = dir.ReadNext())) {
string item = PathJoin(path, ent.Name());
if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) {
result = EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler);
if (result)
RegisterDescendants(item, config, withinDepth - 1);
RegisterDescendants(item, config, withinDepth - 1);
}
}
return result;
Expand Down
5 changes: 4 additions & 1 deletion core/config_manager/ConfigManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -510,7 +510,10 @@ class ConfigManager {
* @param path is the current dir that being registered
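* @param preservedDirDepth is the num of sub dir layers that are registered without checking their last write time
* @param maxDepth is the max num of sub dir layers that may be descended into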
*/
bool RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int depth);
bool RegisterHandlersWithinDepth(const std::string& path,
const FileDiscoveryConfig& config,
int preservedDirDepth,
int maxDepth);
bool RegisterDescendants(const std::string& path, const FileDiscoveryConfig& config, int withinDepth);
// bool CheckLogType(const std::string& logTypeStr, LogType& logType);
// 废弃
Expand Down
9 changes: 1 addition & 8 deletions core/controller/EventDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -286,13 +286,6 @@ bool EventDispatcher::RegisterEventHandler(const char* path,
bool dirTimeOutFlag = config.first->IsTimeout(path);

if (!mEventListener->IsValidID(wd)) {
if (dirTimeOutFlag) {
LOG_DEBUG(
sLogger,
("Drop timeout path, source", path)("config, basepath", config.first->GetBasePath())(
"preseveDepth", config.first->mPreservedDirDepth)("maxDepth", config.first->mMaxDirSearchDepth));
return false;
}
wd = mNonInotifyWd;
if (mNonInotifyWd == INT_MIN)
mNonInotifyWd = -1;
Expand Down Expand Up @@ -904,7 +897,7 @@ void EventDispatcher::HandleTimeout() {
time_t curTime = time(NULL);
MapType<int, time_t>::Type::iterator itr = mWdUpdateTimeMap.begin();
for (; itr != mWdUpdateTimeMap.end(); ++itr) {
if (curTime - (itr->second) >= INT32_FLAG(timeout_interval)) {
if (curTime - (itr->second) > INT32_FLAG(timeout_interval)) {
// add to vector then batch process to avoid possible iterator change problem
// mHandler may remove what itr points to, thus change the layout of the map container
// what follows may not work
Expand Down
9 changes: 7 additions & 2 deletions core/dependencies.cmake
Original file line number Diff line number Diff line change
Expand Up @@ -363,8 +363,13 @@ endmacro()
# asan for debug
macro(link_asan target_name)
if(CMAKE_BUILD_TYPE MATCHES Debug)
target_compile_options(${target_name} PUBLIC -fsanitize=address)
target_link_options(${target_name} PUBLIC -fsanitize=address -static-libasan)
if (UNIX)
target_compile_options(${target_name} PUBLIC -fsanitize=address)
target_link_options(${target_name} PUBLIC -fsanitize=address -static-libasan)
elseif(MSVC)
target_compile_options(${target_name} PUBLIC /fsanitize=address)
target_link_options(${target_name} PUBLIC /fsanitize=address)
endif()
endif()
endmacro()

Expand Down
7 changes: 5 additions & 2 deletions core/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -158,11 +158,13 @@ void CreateHandler::Handle(const Event& event) {
if (!config.first)
return;
else if (event.IsDir())
ConfigManager::GetInstance()->RegisterHandlersRecursively(path, config, false);
ConfigManager::GetInstance()->RegisterHandlers(path, config);
else {
// symbolic link
if (EventDispatcher::GetInstance()->IsDirRegistered(path) == PATH_INODE_NOT_REGISTERED)
if (EventDispatcher::GetInstance()->IsDirRegistered(path) == PATH_INODE_NOT_REGISTERED) {
// TODO: why not use RegisterHandlers
ConfigManager::GetInstance()->RegisterHandlersRecursively(path, config, true);
}
}
}

Expand All @@ -176,6 +178,7 @@ void TimeoutHandler::Handle(const Event& ev) {
const string& dir = ev.GetSource();
EventDispatcher::GetInstance()->UnregisterEventHandler(dir.c_str());
ConfigManager::GetInstance()->RemoveHandler(dir);
CheckPointManager::Instance()->DeleteDirCheckPoint(dir);
}


Expand Down
5 changes: 3 additions & 2 deletions core/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ using namespace std;

DEFINE_FLAG_INT32(check_symbolic_link_interval, "seconds", 120);
DEFINE_FLAG_INT32(check_base_dir_interval, "seconds", 60);
DEFINE_FLAG_INT32(check_timeout_interval, "seconds", 600);
DEFINE_FLAG_INT32(log_input_thread_wait_interval, "microseconds", 20 * 1000);
DEFINE_FLAG_INT64(read_fs_events_interval, "microseconds", 20 * 1000);
DEFINE_FLAG_INT32(check_handler_timeout_interval, "seconds", 180);
Expand Down Expand Up @@ -364,7 +365,7 @@ void* LogInput::ProcessLoop() {
int32_t prevTime = time(NULL);
mLastReadEventTime = prevTime;
int32_t curTime = prevTime;
srand(prevTime);
srand(0); // avoid random failures in unit tests
int32_t lastCheckDir = prevTime - rand() % 60;
int32_t lastCheckSymbolicLink = prevTime - rand() % 60;
time_t lastCheckHandlerTimeOut = prevTime - rand() % 60;
Expand Down Expand Up @@ -422,7 +423,7 @@ void* LogInput::ProcessLoop() {
lastCheckSymbolicLink = 0;
}

if (curTime - prevTime >= INT32_FLAG(timeout_interval)) {
if (curTime - prevTime >= INT32_FLAG(check_timeout_interval)) {
dispatcher->HandleTimeout();
prevTime = curTime;
}
Expand Down
2 changes: 1 addition & 1 deletion core/file_server/FileDiscoveryOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -575,7 +575,7 @@ bool FileDiscoveryOptions::IsWildcardPathMatch(const string& path, const string&

// XXX: assume path is a subdir under mBasePath
bool FileDiscoveryOptions::IsTimeout(const string& path) const {
if (mPreservedDirDepth < 0 || mWildcardPaths.size() > 0)
if (mPreservedDirDepth < 0)
return false;

// we do not check if (path.find(mBasePath) == 0)
Expand Down
10 changes: 10 additions & 0 deletions core/file_server/FileServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,17 @@ namespace logtail {
void FileServer::Start() {
ConfigManager::GetInstance()->LoadDockerConfig();
CheckPointManager::Instance()->LoadCheckPoint();
LOG_INFO(sLogger, ("watch dirs", "start"));
auto start = GetCurrentTimeInMilliSeconds();
ConfigManager::GetInstance()->RegisterHandlers();
auto costMs = GetCurrentTimeInMilliSeconds() - start;
if (costMs >= 60 * 1000) {
LogtailAlarm::GetInstance()->SendAlarm(REGISTER_HANDLERS_TOO_SLOW_ALARM,
"Registering handlers took " + ToString(costMs) + " ms");
LOG_WARNING(sLogger, ("watch dirs", "succeeded")("costMs", costMs));
} else {
LOG_INFO(sLogger, ("watch dirs", "succeeded")("costMs", costMs));
}
LOG_INFO(sLogger, ("watch dirs", "succeeded"));
EventDispatcher::GetInstance()->AddExistedCheckPointFileEvents();
// the dump time must be reset after dir registration, since it may take long on NFS.
Expand Down
4 changes: 4 additions & 0 deletions core/monitor/LogtailAlarm.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ LogtailAlarm::LogtailAlarm() {
mMessageType[OBSERVER_RUNTIME_ALARM] = "OBSERVER_RUNTIME_ALARM";
mMessageType[OBSERVER_STOP_ALARM] = "OBSERVER_STOP_ALARM";
mMessageType[INVALID_CONTAINER_PATH_ALARM] = "INVALID_CONTAINER_PATH_ALARM";
mMessageType[REGISTER_HANDLERS_TOO_SLOW_ALARM] = "REGISTER_HANDLERS_TOO_SLOW_ALARM";
}

void LogtailAlarm::Init() {
Expand All @@ -115,6 +116,9 @@ void LogtailAlarm::Stop() {
mIsThreadRunning = false;
}
mStopCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("alarm gathering", "stopped successfully"));
Expand Down
Loading

0 comments on commit a76787f

Please sign in to comment.