From a76787f22bcd47e67101409df0b05c3b56ec870d Mon Sep 17 00:00:00 2001 From: Tom Yu Date: Mon, 25 Nov 2024 13:42:37 +0800 Subject: [PATCH] [backport 2.1] fix PreservedDirDepth not working with polling and wildcard path (#1887) * fix PreservedDirDepth not working with polling and wildcard path * fix memleak * add UT and fix register --- core/checkpoint/CheckPointManager.cpp | 14 +- core/checkpoint/CheckPointManager.h | 2 +- core/config/provider/CommonConfigProvider.cpp | 3 + core/config_manager/ConfigManager.cpp | 114 ++++-- core/config_manager/ConfigManager.h | 5 +- core/controller/EventDispatcher.cpp | 9 +- core/dependencies.cmake | 9 +- core/event_handler/EventHandler.cpp | 7 +- core/event_handler/LogInput.cpp | 5 +- core/file_server/FileDiscoveryOptions.cpp | 2 +- core/file_server/FileServer.cpp | 10 + core/monitor/LogtailAlarm.cpp | 4 + core/monitor/LogtailAlarm.h | 3 +- core/monitor/Monitor.cpp | 3 + core/polling/PollingCache.h | 5 + core/polling/PollingDirFile.cpp | 297 +++++++++------- core/polling/PollingDirFile.h | 11 +- core/polling/PollingModify.cpp | 103 +++--- core/polling/PollingModify.h | 2 +- core/unittest/CMakeLists.txt | 2 + core/unittest/polling/CMakeLists.txt | 8 +- .../PollingPreservedDirDepthUnittest.cpp | 335 ++++++++++++++++++ 22 files changed, 713 insertions(+), 240 deletions(-) create mode 100644 core/unittest/polling/PollingPreservedDirDepthUnittest.cpp diff --git a/core/checkpoint/CheckPointManager.cpp b/core/checkpoint/CheckPointManager.cpp index e29d19b6be..0d114e8a79 100644 --- a/core/checkpoint/CheckPointManager.cpp +++ b/core/checkpoint/CheckPointManager.cpp @@ -74,10 +74,18 @@ bool CheckPointManager::GetCheckPoint(DevInode devInode, const std::string& conf return false; } -void CheckPointManager::DeleteDirCheckPoint(const std::string& filename) { - std::unordered_map::iterator it = mDirNameMap.find(filename); - if (it != mDirNameMap.end()) +void CheckPointManager::DeleteDirCheckPoint(const std::string& dirname) { + 
std::unordered_map::iterator it = mDirNameMap.find(dirname); + if (it != mDirNameMap.end()) { mDirNameMap.erase(it); + } + auto parentpos = dirname.find_last_of(PATH_SEPARATOR); + if (parentpos != std::string::npos) { + auto parentDirCheckpoint = mDirNameMap.find(dirname.substr(0, parentpos)); + if (parentDirCheckpoint != mDirNameMap.end()) { + parentDirCheckpoint->second->mSubDir.erase(dirname); + } + } } bool CheckPointManager::GetDirCheckPoint(const std::string& dirname, DirCheckPointPtr& dirCheckPointPtr) { diff --git a/core/checkpoint/CheckPointManager.h b/core/checkpoint/CheckPointManager.h index 21f98d174e..1365818106 100644 --- a/core/checkpoint/CheckPointManager.h +++ b/core/checkpoint/CheckPointManager.h @@ -126,7 +126,7 @@ class CheckPointManager { void AddCheckPoint(CheckPoint* checkPointPtr); void AddDirCheckPoint(const std::string& dirname); void DeleteCheckPoint(DevInode devInode, const std::string& configName); - void DeleteDirCheckPoint(const std::string& filename); + void DeleteDirCheckPoint(const std::string& dirname); void LoadCheckPoint(); void LoadDirCheckPoint(const Json::Value& root); void LoadFileCheckPoint(const Json::Value& root); diff --git a/core/config/provider/CommonConfigProvider.cpp b/core/config/provider/CommonConfigProvider.cpp index 18338ceafa..04cfccc579 100644 --- a/core/config/provider/CommonConfigProvider.cpp +++ b/core/config/provider/CommonConfigProvider.cpp @@ -87,6 +87,9 @@ void CommonConfigProvider::Stop() { mIsThreadRunning = false; } mStopCV.notify_one(); + if (!mThreadRes.valid()) { + return; + } future_status s = mThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { LOG_INFO(sLogger, ("common config provider", "stopped successfully")); diff --git a/core/config_manager/ConfigManager.cpp b/core/config_manager/ConfigManager.cpp index 0c9c62b940..b7b2886438 100644 --- a/core/config_manager/ConfigManager.cpp +++ b/core/config_manager/ConfigManager.cpp @@ -114,10 +114,15 @@ ParseConfResult 
ParseConfig(const std::string& configName, Json::Value& jsonRoot ifstream is; is.open(fullPath.c_str()); - if (!is.good()) { + if (!is) { // https://horstmann.com/cpp/pitfalls.html + return CONFIG_NOT_EXIST; + } + std::string buffer; + try { + buffer.assign(std::istreambuf_iterator(is), std::istreambuf_iterator()); + } catch (const std::ios_base::failure& e) { return CONFIG_NOT_EXIST; } - std::string buffer((std::istreambuf_iterator(is)), (std::istreambuf_iterator())); if (!IsValidJson(buffer.c_str(), buffer.length())) { return CONFIG_INVALID_FORMAT; } @@ -309,9 +314,16 @@ void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, cons if (registerStatus == GET_REGISTER_STATUS_ERROR) { return; } - if (EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler)) { + if (config.first->mPreservedDirDepth < 0) RegisterDescendants( item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); + else { + // preserve_depth register + RegisterHandlersWithinDepth(item, + config, + config.first->mPreservedDirDepth, + config.first->mMaxDirSearchDepth < 0 ? 100 + : config.first->mMaxDirSearchDepth); } } else { RegisterWildcardPath(config, item, depth + 1); @@ -378,9 +390,16 @@ void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, cons if (registerStatus == GET_REGISTER_STATUS_ERROR) { return; } - if (EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler)) { + if (config.first->mPreservedDirDepth < 0) RegisterDescendants( item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); + else { + // preserve_depth register + RegisterHandlersWithinDepth( + item, + config, + config.first->mPreservedDirDepth, + config.first->mMaxDirSearchDepth < 0 ? 
100 : config.first->mMaxDirSearchDepth); } } else { RegisterWildcardPath(config, item, depth + 1); @@ -417,25 +436,23 @@ bool ConfigManager::RegisterHandlers(const string& basePath, const FileDiscovery DirRegisterStatus registerStatus = EventDispatcher::GetInstance()->IsDirRegistered(basePath); if (registerStatus == GET_REGISTER_STATUS_ERROR) return result; - // dir in config is valid by default, do not call pathValidator - result = EventDispatcher::GetInstance()->RegisterEventHandler(basePath.c_str(), config, mSharedHandler); - // if we come into a failure, do not try to register others, there must be something wrong! - if (!result) - return result; if (config.first->mPreservedDirDepth < 0) result = RegisterDescendants( basePath, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); else { // preserve_depth register - int depth = config.first->mPreservedDirDepth; - result = RegisterHandlersWithinDepth(basePath, config, depth); + result = RegisterHandlersWithinDepth(basePath, + config, + config.first->mPreservedDirDepth, + config.first->mMaxDirSearchDepth < 0 ? 100 + : config.first->mMaxDirSearchDepth); } return result; } bool ConfigManager::RegisterDirectory(const std::string& source, const std::string& object) { - // TODO��A potential bug: FindBestMatch will test @object with filePattern, which has very + // TODO: A potential bug: FindBestMatch will test @object with filePattern, which has very // low possibility to match a sub directory name, so here will return false in most cases. // e.g.: source: /path/to/monitor, file pattern: *.log, object: subdir. // Match(subdir, *.log) = false. 
@@ -445,24 +462,30 @@ bool ConfigManager::RegisterDirectory(const std::string& source, const std::stri return false; } -bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int depth) { +bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, + const FileDiscoveryConfig& config, + int preservedDirDepth, + int maxDepth) { + if (maxDepth < 0) { + return true; + } if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) { LOG_INFO(sLogger, ("ignore path matching host path blacklist", path)); return false; } - if (depth <= 0) { - DirCheckPointPtr dirCheckPoint; - if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint) == false) + if (preservedDirDepth < 0) { + fsutil::PathStat statBuf; + if (!fsutil::PathStat::stat(path, statBuf)) { + return true; + } + int64_t sec = 0; + int64_t nsec = 0; + statBuf.GetLastWriteTime(sec, nsec); + auto curTime = time(nullptr); + if (curTime - sec > INT32_FLAG(timeout_interval)) { return true; - // path had dircheckpoint means it was watched before, so it is valid - const set& subdir = dirCheckPoint.get()->mSubDir; - for (set::iterator it = subdir.begin(); it != subdir.end(); it++) { - if (EventDispatcher::GetInstance()->RegisterEventHandler((*it).c_str(), config, mSharedHandler)) - RegisterHandlersWithinDepth(*it, config, depth - 1); } - return true; } - bool result = true; fsutil::Dir dir(path); if (!dir.Open()) { @@ -476,30 +499,45 @@ bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const F LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err)); return false; } + if (!(EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler))) { + // break;// fail early, do not try to register others + return false; + } + if (maxDepth == 0) { + return true; + } + + if (preservedDirDepth == 0) { + DirCheckPointPtr dirCheckPoint; + if (CheckPointManager::Instance()->GetDirCheckPoint(path, 
dirCheckPoint)) { + // path had dircheckpoint means it was watched before, so it is valid + const set& subdir = dirCheckPoint.get()->mSubDir; + for (const auto& it : subdir) { + RegisterHandlersWithinDepth(it, config, 0, maxDepth - 1); + } + return true; + } + } fsutil::Entry ent; while ((ent = dir.ReadNext())) { string item = PathJoin(path, ent.Name()); if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) { - if (!(EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler))) { - // break;// fail early, do not try to register others - result = false; - } else // sub dir will not be registered if parent dir fails - RegisterHandlersWithinDepth(item, config, depth - 1); + RegisterHandlersWithinDepth(item, config, preservedDirDepth - 1, maxDepth - 1); } } - return result; + return true; } // path not terminated by '/', path already registered bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryConfig& config, int withinDepth) { + if (withinDepth < 0) { + return true; + } if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) { LOG_INFO(sLogger, ("ignore path matching host path blacklist", path)); return false; } - if (withinDepth <= 0) { - return true; - } fsutil::Dir dir(path); if (!dir.Open()) { @@ -512,14 +550,20 @@ bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryC LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err)); return false; } + if (!EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler)) { + // break;// fail early, do not try to register others + return false; + } + if (withinDepth == 0) { + return true; + } + fsutil::Entry ent; bool result = true; while ((ent = dir.ReadNext())) { string item = PathJoin(path, ent.Name()); if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) { - result = EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler); - if (result) 
- RegisterDescendants(item, config, withinDepth - 1); + RegisterDescendants(item, config, withinDepth - 1); } } return result; diff --git a/core/config_manager/ConfigManager.h b/core/config_manager/ConfigManager.h index 3b3ec203c4..9dca1119e6 100644 --- a/core/config_manager/ConfigManager.h +++ b/core/config_manager/ConfigManager.h @@ -510,7 +510,10 @@ class ConfigManager { * @param path is the current dir that being registered * @depth is the num of sub dir layers that should be registered */ - bool RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int depth); + bool RegisterHandlersWithinDepth(const std::string& path, + const FileDiscoveryConfig& config, + int preservedDirDepth, + int maxDepth); bool RegisterDescendants(const std::string& path, const FileDiscoveryConfig& config, int withinDepth); // bool CheckLogType(const std::string& logTypeStr, LogType& logType); // 废弃 diff --git a/core/controller/EventDispatcher.cpp b/core/controller/EventDispatcher.cpp index 133a91dd07..6c985ec8c1 100644 --- a/core/controller/EventDispatcher.cpp +++ b/core/controller/EventDispatcher.cpp @@ -286,13 +286,6 @@ bool EventDispatcher::RegisterEventHandler(const char* path, bool dirTimeOutFlag = config.first->IsTimeout(path); if (!mEventListener->IsValidID(wd)) { - if (dirTimeOutFlag) { - LOG_DEBUG( - sLogger, - ("Drop timeout path, source", path)("config, basepath", config.first->GetBasePath())( - "preseveDepth", config.first->mPreservedDirDepth)("maxDepth", config.first->mMaxDirSearchDepth)); - return false; - } wd = mNonInotifyWd; if (mNonInotifyWd == INT_MIN) mNonInotifyWd = -1; @@ -904,7 +897,7 @@ void EventDispatcher::HandleTimeout() { time_t curTime = time(NULL); MapType::Type::iterator itr = mWdUpdateTimeMap.begin(); for (; itr != mWdUpdateTimeMap.end(); ++itr) { - if (curTime - (itr->second) >= INT32_FLAG(timeout_interval)) { + if (curTime - (itr->second) > INT32_FLAG(timeout_interval)) { // add to vector then batch process to avoid 
possible iterator change problem // mHandler may remove what itr points to, thus change the layout of the map container // what follows may not work diff --git a/core/dependencies.cmake b/core/dependencies.cmake index 310a2fa4cc..13f79b34a1 100644 --- a/core/dependencies.cmake +++ b/core/dependencies.cmake @@ -363,8 +363,13 @@ endmacro() # asan for debug macro(link_asan target_name) if(CMAKE_BUILD_TYPE MATCHES Debug) - target_compile_options(${target_name} PUBLIC -fsanitize=address) - target_link_options(${target_name} PUBLIC -fsanitize=address -static-libasan) + if (UNIX) + target_compile_options(${target_name} PUBLIC -fsanitize=address) + target_link_options(${target_name} PUBLIC -fsanitize=address -static-libasan) + elseif(MSVC) + target_compile_options(${target_name} PUBLIC /fsanitize=address) + target_link_options(${target_name} PUBLIC /fsanitize=address) + endif() endif() endmacro() diff --git a/core/event_handler/EventHandler.cpp b/core/event_handler/EventHandler.cpp index 8b2517a9a4..c94eb87d98 100644 --- a/core/event_handler/EventHandler.cpp +++ b/core/event_handler/EventHandler.cpp @@ -158,11 +158,13 @@ void CreateHandler::Handle(const Event& event) { if (!config.first) return; else if (event.IsDir()) - ConfigManager::GetInstance()->RegisterHandlersRecursively(path, config, false); + ConfigManager::GetInstance()->RegisterHandlers(path, config); else { // symbolic link - if (EventDispatcher::GetInstance()->IsDirRegistered(path) == PATH_INODE_NOT_REGISTERED) + if (EventDispatcher::GetInstance()->IsDirRegistered(path) == PATH_INODE_NOT_REGISTERED) { + // TODO: why not use RegisterHandlers ConfigManager::GetInstance()->RegisterHandlersRecursively(path, config, true); + } } } @@ -176,6 +178,7 @@ void TimeoutHandler::Handle(const Event& ev) { const string& dir = ev.GetSource(); EventDispatcher::GetInstance()->UnregisterEventHandler(dir.c_str()); ConfigManager::GetInstance()->RemoveHandler(dir); + CheckPointManager::Instance()->DeleteDirCheckPoint(dir); } diff 
--git a/core/event_handler/LogInput.cpp b/core/event_handler/LogInput.cpp index 2e1d48e482..6f53a76738 100644 --- a/core/event_handler/LogInput.cpp +++ b/core/event_handler/LogInput.cpp @@ -50,6 +50,7 @@ using namespace std; DEFINE_FLAG_INT32(check_symbolic_link_interval, "seconds", 120); DEFINE_FLAG_INT32(check_base_dir_interval, "seconds", 60); +DEFINE_FLAG_INT32(check_timeout_interval, "seconds", 600); DEFINE_FLAG_INT32(log_input_thread_wait_interval, "microseconds", 20 * 1000); DEFINE_FLAG_INT64(read_fs_events_interval, "microseconds", 20 * 1000); DEFINE_FLAG_INT32(check_handler_timeout_interval, "seconds", 180); @@ -364,7 +365,7 @@ void* LogInput::ProcessLoop() { int32_t prevTime = time(NULL); mLastReadEventTime = prevTime; int32_t curTime = prevTime; - srand(prevTime); + srand(0); // avoid random failures in unit tests int32_t lastCheckDir = prevTime - rand() % 60; int32_t lastCheckSymbolicLink = prevTime - rand() % 60; time_t lastCheckHandlerTimeOut = prevTime - rand() % 60; @@ -422,7 +423,7 @@ void* LogInput::ProcessLoop() { lastCheckSymbolicLink = 0; } - if (curTime - prevTime >= INT32_FLAG(timeout_interval)) { + if (curTime - prevTime >= INT32_FLAG(check_timeout_interval)) { dispatcher->HandleTimeout(); prevTime = curTime; } diff --git a/core/file_server/FileDiscoveryOptions.cpp b/core/file_server/FileDiscoveryOptions.cpp index 913828aedb..0c8f91b24f 100644 --- a/core/file_server/FileDiscoveryOptions.cpp +++ b/core/file_server/FileDiscoveryOptions.cpp @@ -575,7 +575,7 @@ bool FileDiscoveryOptions::IsWildcardPathMatch(const string& path, const string& // XXX: assume path is a subdir under mBasePath bool FileDiscoveryOptions::IsTimeout(const string& path) const { - if (mPreservedDirDepth < 0 || mWildcardPaths.size() > 0) + if (mPreservedDirDepth < 0) return false; // we do not check if (path.find(mBasePath) == 0) diff --git a/core/file_server/FileServer.cpp b/core/file_server/FileServer.cpp index ef9c17a350..b857124638 100644 --- 
a/core/file_server/FileServer.cpp +++ b/core/file_server/FileServer.cpp @@ -35,7 +35,17 @@ namespace logtail { void FileServer::Start() { ConfigManager::GetInstance()->LoadDockerConfig(); CheckPointManager::Instance()->LoadCheckPoint(); + LOG_INFO(sLogger, ("watch dirs", "start")); + auto start = GetCurrentTimeInMilliSeconds(); ConfigManager::GetInstance()->RegisterHandlers(); + auto costMs = GetCurrentTimeInMilliSeconds() - start; + if (costMs >= 60 * 1000) { + LogtailAlarm::GetInstance()->SendAlarm(REGISTER_HANDLERS_TOO_SLOW_ALARM, + "Registering handlers took " + ToString(costMs) + " ms"); + LOG_WARNING(sLogger, ("watch dirs", "succeeded")("costMs", costMs)); + } else { + LOG_INFO(sLogger, ("watch dirs", "succeeded")("costMs", costMs)); + } LOG_INFO(sLogger, ("watch dirs", "succeeded")); EventDispatcher::GetInstance()->AddExistedCheckPointFileEvents(); // the dump time must be reset after dir registration, since it may take long on NFS. diff --git a/core/monitor/LogtailAlarm.cpp b/core/monitor/LogtailAlarm.cpp index c7fb3c52d3..20f4028f35 100644 --- a/core/monitor/LogtailAlarm.cpp +++ b/core/monitor/LogtailAlarm.cpp @@ -102,6 +102,7 @@ LogtailAlarm::LogtailAlarm() { mMessageType[OBSERVER_RUNTIME_ALARM] = "OBSERVER_RUNTIME_ALARM"; mMessageType[OBSERVER_STOP_ALARM] = "OBSERVER_STOP_ALARM"; mMessageType[INVALID_CONTAINER_PATH_ALARM] = "INVALID_CONTAINER_PATH_ALARM"; + mMessageType[REGISTER_HANDLERS_TOO_SLOW_ALARM] = "REGISTER_HANDLERS_TOO_SLOW_ALARM"; } void LogtailAlarm::Init() { @@ -115,6 +116,9 @@ void LogtailAlarm::Stop() { mIsThreadRunning = false; } mStopCV.notify_one(); + if (!mThreadRes.valid()) { + return; + } future_status s = mThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { LOG_INFO(sLogger, ("alarm gathering", "stopped successfully")); diff --git a/core/monitor/LogtailAlarm.h b/core/monitor/LogtailAlarm.h index c2e653f229..f2bdb8b09c 100644 --- a/core/monitor/LogtailAlarm.h +++ b/core/monitor/LogtailAlarm.h @@ -97,7 +97,8 @@ 
enum LogtailAlarmType { OBSERVER_RUNTIME_ALARM = 62, OBSERVER_STOP_ALARM = 63, INVALID_CONTAINER_PATH_ALARM = 64, - ALL_LOGTAIL_ALARM_NUM = 65, + REGISTER_HANDLERS_TOO_SLOW_ALARM = 65, + ALL_LOGTAIL_ALARM_NUM = 66, }; struct LogtailAlarmMessage { diff --git a/core/monitor/Monitor.cpp b/core/monitor/Monitor.cpp index 25e86bfa5f..e5724e12bd 100644 --- a/core/monitor/Monitor.cpp +++ b/core/monitor/Monitor.cpp @@ -131,6 +131,9 @@ void LogtailMonitor::Stop() { mIsThreadRunning = false; } mStopCV.notify_one(); + if (!mThreadRes.valid()) { + return; + } future_status s = mThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { LOG_INFO(sLogger, ("profiling", "stopped successfully")); diff --git a/core/polling/PollingCache.h b/core/polling/PollingCache.h index 575eba7303..ea3ed8c38a 100644 --- a/core/polling/PollingCache.h +++ b/core/polling/PollingCache.h @@ -32,6 +32,10 @@ struct DirFileCache { void SetConfigMatched(bool configMatched) { mConfigMatched = configMatched; } bool HasMatchedConfig() const { return mConfigMatched; } + + void SetExceedPreservedDirDepth(bool exceed) { mExceedPreservedDirDepth = exceed; } + bool GetExceedPreservedDirDepth() const { return mExceedPreservedDirDepth; } + void SetCheckRound(uint64_t curRound) { mLastCheckRound = curRound; } uint64_t GetLastCheckRound() const { return mLastCheckRound; } @@ -51,6 +55,7 @@ struct DirFileCache { int32_t mLastEventTime = 0; bool mConfigMatched = false; + bool mExceedPreservedDirDepth = false; uint64_t mLastCheckRound = 0; // Last modified time on filesystem in nanoseconds. 
int64_t mLastModifyTime = 0; diff --git a/core/polling/PollingDirFile.cpp b/core/polling/PollingDirFile.cpp index 9b59b5cf73..4dff9c1ff5 100644 --- a/core/polling/PollingDirFile.cpp +++ b/core/polling/PollingDirFile.cpp @@ -130,133 +130,135 @@ void PollingDirFile::Polling() { LOG_INFO(sLogger, ("polling discovery", "started")); mHoldOnFlag = false; while (mRuningFlag) { - LOG_DEBUG(sLogger, ("start dir file polling, mCurrentRound", mCurrentRound)); - { - PTScopedLock thradLock(mPollingThreadLock); - mStatCount = 0; - mNewFileVec.clear(); - ++mCurrentRound; - - // Get a copy of config list from ConfigManager. - // PollingDirFile has to be held on at first because raw pointers are used here. - vector sortedConfigs; - vector wildcardConfigs; - auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); - for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) { - if (itr->second.first->GetWildcardPaths().empty()) - sortedConfigs.push_back(itr->second); - else - wildcardConfigs.push_back(itr->second); - } - sort(sortedConfigs.begin(), sortedConfigs.end(), FileDiscoveryOptions::CompareByPathLength); - - size_t configTotal = nameConfigMap.size(); - LogtailMonitor::GetInstance()->UpdateMetric("config_count", configTotal); - mGlobalConfigTotal->Set(configTotal); - { - ScopedSpinLock lock(mCacheLock); - size_t pollingDirCacheSize = mDirCacheMap.size(); - LogtailMonitor::GetInstance()->UpdateMetric("polling_dir_cache", pollingDirCacheSize); - mGlobalPollingDirCacheSizeTotal->Set(pollingDirCacheSize); - size_t pollingFileCacheSize = mFileCacheMap.size(); - LogtailMonitor::GetInstance()->UpdateMetric("polling_file_cache", pollingFileCacheSize); - mGlobalPollingFileCacheSizeTotal->Set(pollingFileCacheSize); - } + PollingIteration(); - // Iterate all normal configs, make sure stat count will not exceed limit. 
- for (auto itr = sortedConfigs.begin(); - itr != sortedConfigs.end() && mStatCount <= INT32_FLAG(polling_max_stat_count); - ++itr) { - if (!mRuningFlag || mHoldOnFlag) - break; - - const FileDiscoveryOptions* config = itr->first; - const PipelineContext* ctx = itr->second; - if (!config->IsContainerDiscoveryEnabled()) { - fsutil::PathStat baseDirStat; - if (!fsutil::PathStat::stat(config->GetBasePath(), baseDirStat)) { - LOG_DEBUG(sLogger, - ("get base dir info error: ", config->GetBasePath())(ctx->GetProjectName(), - ctx->GetLogstoreName())); - continue; - } + // Sleep for a while, by default, 5s on Linux, 1s on Windows. + for (int i = 0; i < 10 && mRuningFlag; ++i) { + usleep(INT32_FLAG(dirfile_check_interval_ms) * 100); + } + } + LOG_DEBUG(sLogger, ("dir file polling thread done", "")); +} - int32_t lastConfigStatCount = mStatCount; - if (!PollingNormalConfigPath(*itr, config->GetBasePath(), string(), baseDirStat, 0)) { - LOG_DEBUG(sLogger, - ("logPath in config not exist", config->GetBasePath())(ctx->GetProjectName(), - ctx->GetLogstoreName())); - } - CheckConfigPollingStatCount(lastConfigStatCount, *itr, false); - } else { - for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) { - const string& basePath = (*config->GetContainerInfo())[i].mRealBaseDir; - fsutil::PathStat baseDirStat; - if (!fsutil::PathStat::stat(basePath.c_str(), baseDirStat)) { - LOG_DEBUG(sLogger, - ("get docker base dir info error: ", basePath)(ctx->GetProjectName(), - ctx->GetLogstoreName())); - continue; - } - int32_t lastConfigStatCount = mStatCount; - if (!PollingNormalConfigPath(*itr, basePath, string(), baseDirStat, 0)) { - LOG_DEBUG(sLogger, - ("docker logPath in config not exist", basePath)(ctx->GetProjectName(), - ctx->GetLogstoreName())); - } - CheckConfigPollingStatCount(lastConfigStatCount, *itr, true); - } - } +void PollingDirFile::PollingIteration() { + LOG_DEBUG(sLogger, ("start dir file polling, mCurrentRound", mCurrentRound)); + PTScopedLock 
thradLock(mPollingThreadLock); + mStatCount = 0; + mNewFileVec.clear(); + ++mCurrentRound; + + // Get a copy of config list from ConfigManager. + // PollingDirFile has to be held on at first because raw pointers are used here. + vector sortedConfigs; + vector wildcardConfigs; + auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); + for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) { + if (itr->second.first->GetWildcardPaths().empty()) + sortedConfigs.push_back(itr->second); + else + wildcardConfigs.push_back(itr->second); + } + sort(sortedConfigs.begin(), sortedConfigs.end(), FileDiscoveryOptions::CompareByPathLength); + + size_t configTotal = nameConfigMap.size(); + LogtailMonitor::GetInstance()->UpdateMetric("config_count", configTotal); + mGlobalConfigTotal->Set(configTotal); + { + ScopedSpinLock lock(mCacheLock); + size_t pollingDirCacheSize = mDirCacheMap.size(); + LogtailMonitor::GetInstance()->UpdateMetric("polling_dir_cache", pollingDirCacheSize); + mGlobalPollingDirCacheSizeTotal->Set(pollingDirCacheSize); + size_t pollingFileCacheSize = mFileCacheMap.size(); + LogtailMonitor::GetInstance()->UpdateMetric("polling_file_cache", pollingFileCacheSize); + mGlobalPollingFileCacheSizeTotal->Set(pollingFileCacheSize); + } + + // Iterate all normal configs, make sure stat count will not exceed limit. 
+ for (auto itr = sortedConfigs.begin(); + itr != sortedConfigs.end() && mStatCount <= INT32_FLAG(polling_max_stat_count); + ++itr) { + if (!mRuningFlag || mHoldOnFlag) + break; + + const FileDiscoveryOptions* config = itr->first; + const PipelineContext* ctx = itr->second; + if (!config->IsContainerDiscoveryEnabled()) { + fsutil::PathStat baseDirStat; + if (!fsutil::PathStat::stat(config->GetBasePath(), baseDirStat)) { + LOG_DEBUG(sLogger, + ("get base dir info error: ", config->GetBasePath())(ctx->GetProjectName(), + ctx->GetLogstoreName())); + continue; } - // Iterate all wildcard configs, make sure stat count will not exceed limit. - for (auto itr = wildcardConfigs.begin(); - itr != wildcardConfigs.end() && mStatCount <= INT32_FLAG(polling_max_stat_count); - ++itr) { - if (!mRuningFlag || mHoldOnFlag) - break; - - const FileDiscoveryOptions* config = itr->first; - const PipelineContext* ctx = itr->second; - if (!config->IsContainerDiscoveryEnabled()) { - int32_t lastConfigStatCount = mStatCount; - if (!PollingWildcardConfigPath(*itr, config->GetWildcardPaths()[0], 0)) { - LOG_DEBUG(sLogger, - ("can not find matched path in config, Wildcard begin logPath", - config->GetBasePath())(ctx->GetProjectName(), ctx->GetLogstoreName())); - } - CheckConfigPollingStatCount(lastConfigStatCount, *itr, false); - } else { - for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) { - const string& baseWildcardPath = (*config->GetContainerInfo())[i].mRealBaseDir; - int32_t lastConfigStatCount = mStatCount; - if (!PollingWildcardConfigPath(*itr, baseWildcardPath, 0)) { - LOG_DEBUG(sLogger, - ("can not find matched path in config, " - "Wildcard begin logPath ", - baseWildcardPath)(ctx->GetProjectName(), ctx->GetLogstoreName())); - } - CheckConfigPollingStatCount(lastConfigStatCount, *itr, true); - } + int32_t lastConfigStatCount = mStatCount; + if (!PollingNormalConfigPath(*itr, config->GetBasePath(), string(), baseDirStat, 0)) { + LOG_DEBUG(sLogger, + ("logPath in config 
not exist", config->GetBasePath())(ctx->GetProjectName(), + ctx->GetLogstoreName())); + } + CheckConfigPollingStatCount(lastConfigStatCount, *itr, false); + } else { + for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) { + const string& basePath = (*config->GetContainerInfo())[i].mRealBaseDir; + fsutil::PathStat baseDirStat; + if (!fsutil::PathStat::stat(basePath, baseDirStat)) { + LOG_DEBUG( + sLogger, + ("get docker base dir info error: ", basePath)(ctx->GetProjectName(), ctx->GetLogstoreName())); + continue; } + int32_t lastConfigStatCount = mStatCount; + if (!PollingNormalConfigPath(*itr, basePath, string(), baseDirStat, 0)) { + LOG_DEBUG(sLogger, + ("docker logPath in config not exist", basePath)(ctx->GetProjectName(), + ctx->GetLogstoreName())); + } + CheckConfigPollingStatCount(lastConfigStatCount, *itr, true); } + } + } - // Add collected new files to PollingModify. - PollingModify::GetInstance()->AddNewFile(mNewFileVec); + // Iterate all wildcard configs, make sure stat count will not exceed limit. + for (auto itr = wildcardConfigs.begin(); + itr != wildcardConfigs.end() && mStatCount <= INT32_FLAG(polling_max_stat_count); + ++itr) { + if (!mRuningFlag || mHoldOnFlag) + break; - // Check cache, clear unavailable and overtime items. 
- if (mCurrentRound % INT32_FLAG(check_not_exist_file_dir_round) == 0) { - ClearUnavailableFileAndDir(); + const FileDiscoveryOptions* config = itr->first; + const PipelineContext* ctx = itr->second; + if (!config->IsContainerDiscoveryEnabled()) { + int32_t lastConfigStatCount = mStatCount; + if (!PollingWildcardConfigPath(*itr, config->GetWildcardPaths()[0], 0)) { + LOG_DEBUG(sLogger, + ("can not find matched path in config, Wildcard begin logPath", + config->GetBasePath())(ctx->GetProjectName(), ctx->GetLogstoreName())); + } + CheckConfigPollingStatCount(lastConfigStatCount, *itr, false); + } else { + for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) { + const string& baseWildcardPath = (*config->GetContainerInfo())[i].mRealBaseDir; + int32_t lastConfigStatCount = mStatCount; + if (!PollingWildcardConfigPath(*itr, baseWildcardPath, 0)) { + LOG_DEBUG(sLogger, + ("can not find matched path in config, " + "Wildcard begin logPath ", + baseWildcardPath)(ctx->GetProjectName(), ctx->GetLogstoreName())); + } + CheckConfigPollingStatCount(lastConfigStatCount, *itr, true); } - ClearTimeoutFileAndDir(); } + } - // Sleep for a while, by default, 5s on Linux, 1s on Windows. - for (int i = 0; i < 10 && mRuningFlag; ++i) { - usleep(INT32_FLAG(dirfile_check_interval_ms) * 100); - } + // Add collected new files to PollingModify. + PollingModify::GetInstance()->AddNewFile(mNewFileVec); + + // Check cache, clear unavailable and overtime items. + if (mCurrentRound % INT32_FLAG(check_not_exist_file_dir_round) == 0) { + ClearUnavailableFileAndDir(); } - LOG_DEBUG(sLogger, ("dir file polling thread done", "")); + ClearTimeoutFileAndDir(); } // Last Modified Time (LMD) of directory changes when a file or a subdirectory is added, @@ -265,6 +267,7 @@ void PollingDirFile::Polling() { // NOTE: So, we can not find changes in subdirectories of the directory according to LMD. 
bool PollingDirFile::CheckAndUpdateDirMatchCache(const string& dirPath, const fsutil::PathStat& statBuf, + bool exceedPreservedDirDepth, bool& newFlag) { int64_t sec, nsec; statBuf.GetLastWriteTime(sec, nsec); @@ -277,6 +280,7 @@ bool PollingDirFile::CheckAndUpdateDirMatchCache(const string& dirPath, if (iter == mDirCacheMap.end()) { DirFileCache& dirCache = mDirCacheMap[dirPath]; dirCache.SetConfigMatched(true); + dirCache.SetExceedPreservedDirDepth(exceedPreservedDirDepth); dirCache.SetCheckRound(mCurrentRound); dirCache.SetLastModifyTime(modifyTime); // Directories found at round 1 or too old are considered as old data. @@ -301,7 +305,8 @@ bool PollingDirFile::CheckAndUpdateDirMatchCache(const string& dirPath, bool PollingDirFile::CheckAndUpdateFileMatchCache(const string& fileDir, const string& fileName, const fsutil::PathStat& statBuf, - bool needFindBestMatch) { + bool needFindBestMatch, + bool exceedPreservedDirDepth) { int64_t sec, nsec; statBuf.GetLastWriteTime(sec, nsec); int64_t modifyTime = NANO_CONVERTING * sec + nsec; @@ -317,6 +322,7 @@ bool PollingDirFile::CheckAndUpdateFileMatchCache(const string& fileDir, DirFileCache& fileCache = mFileCacheMap[filePath]; fileCache.SetConfigMatched(matchFlag); + fileCache.SetExceedPreservedDirDepth(exceedPreservedDirDepth); fileCache.SetCheckRound(mCurrentRound); fileCache.SetLastModifyTime(modifyTime); @@ -357,10 +363,20 @@ bool PollingDirFile::PollingNormalConfigPath(const FileDiscoveryConfig& pConfig, const string& obj, const fsutil::PathStat& statBuf, int depth) { - if (pConfig.first->mMaxDirSearchDepth >= 0 && depth > pConfig.first->mMaxDirSearchDepth) - return false; - if (pConfig.first->mPreservedDirDepth >= 0 && depth > pConfig.first->mPreservedDirDepth) + if (pConfig.first->mMaxDirSearchDepth >= 0 && depth > pConfig.first->mMaxDirSearchDepth) { return false; + } + bool exceedPreservedDirDepth = false; + if (pConfig.first->mPreservedDirDepth >= 0 && depth > pConfig.first->mPreservedDirDepth) { + 
exceedPreservedDirDepth = true; + int64_t sec = 0; + int64_t nsec = 0; + statBuf.GetLastWriteTime(sec, nsec); + auto curTime = time(nullptr); + if (curTime - sec > INT32_FLAG(timeout_interval)) { + return false; + } + } string dirPath = obj.empty() ? srcPath : PathJoin(srcPath, obj); if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(dirPath)) { @@ -368,7 +384,7 @@ bool PollingDirFile::PollingNormalConfigPath(const FileDiscoveryConfig& pConfig, return false; } bool isNewDirectory = false; - if (!CheckAndUpdateDirMatchCache(dirPath, statBuf, isNewDirectory)) + if (!CheckAndUpdateDirMatchCache(dirPath, statBuf, exceedPreservedDirDepth, isNewDirectory)) return true; if (isNewDirectory) { PollingEventQueue::GetInstance()->PushEvent(new Event(srcPath, obj, EVENT_CREATE | EVENT_ISDIR, -1, 0)); @@ -471,7 +487,7 @@ bool PollingDirFile::PollingNormalConfigPath(const FileDiscoveryConfig& pConfig, if (buf.IsDir() && (!needCheckDirMatch || !pConfig.first->IsDirectoryInBlacklist(item))) { PollingNormalConfigPath(pConfig, dirPath, entName, buf, depth + 1); } else if (buf.IsRegFile()) { - if (CheckAndUpdateFileMatchCache(dirPath, entName, buf, needFindBestMatch)) { + if (CheckAndUpdateFileMatchCache(dirPath, entName, buf, needFindBestMatch, exceedPreservedDirDepth)) { LOG_DEBUG(sLogger, ("add to modify event", entName)("round", mCurrentRound)); mNewFileVec.push_back(SplitedFilePath(dirPath, entName)); } @@ -632,12 +648,44 @@ void PollingDirFile::ClearTimeoutFileAndDir() { } // Collect deleted files, so that it can notify PollingModify later. 
+ s_lastClearTime = curTime; std::vector deleteFileVec; { ScopedSpinLock lock(mCacheLock); + bool clearExceedPreservedDirDepth = false; + for (auto iter = mDirCacheMap.begin(); iter != mDirCacheMap.end();) { + if (iter->second.GetExceedPreservedDirDepth() + && (NANO_CONVERTING * curTime - iter->second.GetLastModifyTime()) + > NANO_CONVERTING * INT32_FLAG(timeout_interval)) { + iter = mDirCacheMap.erase(iter); + clearExceedPreservedDirDepth = true; + } else + ++iter; + } + if (clearExceedPreservedDirDepth) { + LOG_INFO(sLogger, ("After clear DirCache size", mDirCacheMap.size())); + } + clearExceedPreservedDirDepth = false; + for (auto iter = mFileCacheMap.begin(); iter != mFileCacheMap.end();) { + if (iter->second.GetExceedPreservedDirDepth() + && (NANO_CONVERTING * curTime - iter->second.GetLastModifyTime()) + > NANO_CONVERTING * INT32_FLAG(timeout_interval)) { + // If the file has been added to PollingModify, remove it here. + if (iter->second.HasMatchedConfig() && iter->second.HasEventFlag()) { + deleteFileVec.push_back(SplitedFilePath(iter->first)); + LOG_INFO(sLogger, ("delete file cache", iter->first)("vec size", deleteFileVec.size())); + } + iter = mFileCacheMap.erase(iter); + clearExceedPreservedDirDepth = true; + } else + ++iter; + } + if (clearExceedPreservedDirDepth) { + LOG_INFO(sLogger, ("After clear FileCache size", mFileCacheMap.size())); + } + if (mDirCacheMap.size() > (size_t)INT32_FLAG(polling_dir_upperlimit)) { LOG_INFO(sLogger, ("start clear dir cache", mDirCacheMap.size())); - s_lastClearTime = curTime; for (auto iter = mDirCacheMap.begin(); iter != mDirCacheMap.end();) { if ((NANO_CONVERTING * curTime - iter->second.GetLastModifyTime()) > NANO_CONVERTING * INT32_FLAG(polling_dir_timeout)) { @@ -645,12 +693,11 @@ void PollingDirFile::ClearTimeoutFileAndDir() { } else ++iter; } - LOG_INFO(sLogger, ("After clear", mDirCacheMap.size())("Cost time", time(NULL) - s_lastClearTime)); + LOG_INFO(sLogger, ("After clear DirCache size", 
mDirCacheMap.size())); } if (mFileCacheMap.size() > (size_t)INT32_FLAG(polling_file_upperlimit)) { LOG_INFO(sLogger, ("start clear file cache", mFileCacheMap.size())); - s_lastClearTime = curTime; for (auto iter = mFileCacheMap.begin(); iter != mFileCacheMap.end();) { if ((NANO_CONVERTING * curTime - iter->second.GetLastModifyTime()) > NANO_CONVERTING * INT32_FLAG(polling_file_timeout)) { @@ -663,7 +710,7 @@ void PollingDirFile::ClearTimeoutFileAndDir() { } else ++iter; } - LOG_INFO(sLogger, ("After clear", mFileCacheMap.size())("Cost time", time(NULL) - s_lastClearTime)); + LOG_INFO(sLogger, ("After clear FileCache size", mFileCacheMap.size())); } } diff --git a/core/polling/PollingDirFile.h b/core/polling/PollingDirFile.h index 4f83b5e0c4..d64d07fb3f 100644 --- a/core/polling/PollingDirFile.h +++ b/core/polling/PollingDirFile.h @@ -58,7 +58,7 @@ class PollingDirFile : public LogRunnable { ~PollingDirFile(); void Polling(); - + void PollingIteration(); // PollingNormalConfigPath polls config with normal base path recursively. // @config: config to poll. // @srcPath+@obj: directory path to poll, for base directory, @obj is empty. @@ -85,8 +85,10 @@ class PollingDirFile : public LogRunnable { // @newFlag: a boolean to indicate caller that it is a new directory, generate event for it. // @return a boolean to indicate should the directory be continued to poll. // It will returns true always now (might change in future). - bool CheckAndUpdateDirMatchCache(const std::string& dirPath, const fsutil::PathStat& statBuf, bool& newFlag); - + bool CheckAndUpdateDirMatchCache(const std::string& dirPath, + const fsutil::PathStat& statBuf, + bool exceedPreservedDirDepth, + bool& newFlag); // CheckAndUpdateFileMatchCache updates file cache (add if not existing). // @fileDir+@fileName: absolute path of the file. 
// @needFindBestMatch: false indicates that the file has already found the @@ -96,7 +98,8 @@ class PollingDirFile : public LogRunnable { bool CheckAndUpdateFileMatchCache(const std::string& fileDir, const std::string& fileName, const fsutil::PathStat& statBuf, - bool needFindBestMatch); + bool needFindBestMatch, + bool exceedPreservedDirDepth); // ClearUnavailableFileAndDir checks cache, remove unavailable items. // By default, it will be called every 20 rounds (flag check_not_exist_file_dir_round). diff --git a/core/polling/PollingModify.cpp b/core/polling/PollingModify.cpp index 755e774917..d4ba3584cf 100644 --- a/core/polling/PollingModify.cpp +++ b/core/polling/PollingModify.cpp @@ -241,59 +241,7 @@ void PollingModify::Polling() { LOG_INFO(sLogger, ("polling modify", "started")); mHoldOnFlag = false; while (mRuningFlag) { - { - PTScopedLock threadLock(mPollingThreadLock); - LoadFileNameInQueues(); - - vector deletedFileVec; - vector pollingEventVec; - int32_t statCount = 0; - size_t pollingModifySizeTotal = mModifyCacheMap.size(); - LogtailMonitor::GetInstance()->UpdateMetric("polling_modify_size", pollingModifySizeTotal); - mGlobalPollingModifySizeTotal->Set(pollingModifySizeTotal); - for (auto iter = mModifyCacheMap.begin(); iter != mModifyCacheMap.end(); ++iter) { - if (!mRuningFlag || mHoldOnFlag) - break; - - const SplitedFilePath& filePath = iter->first; - ModifyCheckCache& modifyCache = iter->second; - fsutil::PathStat logFileStat; - if (!fsutil::PathStat::stat(PathJoin(filePath.mFileDir, filePath.mFileName), logFileStat)) { - if (errno == ENOENT) { - LOG_DEBUG(sLogger, ("file deleted", PathJoin(filePath.mFileDir, filePath.mFileName))); - if (UpdateDeletedFile(filePath, modifyCache, pollingEventVec)) { - deletedFileVec.push_back(filePath); - } - } else { - LOG_DEBUG(sLogger, ("get file info error", PathJoin(filePath.mFileDir, filePath.mFileName))); - } - } else { - int64_t sec, nsec; - logFileStat.GetLastWriteTime(sec, nsec); - timespec mtim{sec, nsec}; 
- auto devInode = logFileStat.GetDevInode(); - UpdateFile(filePath, - modifyCache, - devInode.dev, - devInode.inode, - logFileStat.GetFileSize(), - mtim, - pollingEventVec); - } - - ++statCount; - if (statCount % INT32_FLAG(modify_stat_count) == 0) { - usleep(1000 * INT32_FLAG(modify_stat_sleepMs)); - } - } - - if (pollingEventVec.size() > 0) { - PollingEventQueue::GetInstance()->PushEvent(pollingEventVec); - } - for (size_t i = 0; i < deletedFileVec.size(); ++i) { - mModifyCacheMap.erase(deletedFileVec[i]); - } - } + PollingIteration(); // Sleep for a while, by default, 1s. for (int i = 0; i < 10 && mRuningFlag; ++i) { @@ -303,6 +251,55 @@ void PollingModify::Polling() { LOG_INFO(sLogger, ("PollingModify::Polling", "stop")); } +void PollingModify::PollingIteration() { + PTScopedLock threadLock(mPollingThreadLock); + LoadFileNameInQueues(); + + vector deletedFileVec; + vector pollingEventVec; + int32_t statCount = 0; + size_t pollingModifySizeTotal = mModifyCacheMap.size(); + LogtailMonitor::GetInstance()->UpdateMetric("polling_modify_size", pollingModifySizeTotal); + mGlobalPollingModifySizeTotal->Set(pollingModifySizeTotal); + for (auto iter = mModifyCacheMap.begin(); iter != mModifyCacheMap.end(); ++iter) { + if (!mRuningFlag || mHoldOnFlag) + break; + + const SplitedFilePath& filePath = iter->first; + ModifyCheckCache& modifyCache = iter->second; + fsutil::PathStat logFileStat; + if (!fsutil::PathStat::stat(PathJoin(filePath.mFileDir, filePath.mFileName), logFileStat)) { + if (errno == ENOENT) { + LOG_DEBUG(sLogger, ("file deleted", PathJoin(filePath.mFileDir, filePath.mFileName))); + if (UpdateDeletedFile(filePath, modifyCache, pollingEventVec)) { + deletedFileVec.push_back(filePath); + } + } else { + LOG_DEBUG(sLogger, ("get file info error", PathJoin(filePath.mFileDir, filePath.mFileName))); + } + } else { + int64_t sec, nsec; + logFileStat.GetLastWriteTime(sec, nsec); + timespec mtim{sec, nsec}; + auto devInode = logFileStat.GetDevInode(); + UpdateFile( + 
filePath, modifyCache, devInode.dev, devInode.inode, logFileStat.GetFileSize(), mtim, pollingEventVec); + } + + ++statCount; + if (statCount % INT32_FLAG(modify_stat_count) == 0) { + usleep(1000 * INT32_FLAG(modify_stat_sleepMs)); + } + } + + if (pollingEventVec.size() > 0) { + PollingEventQueue::GetInstance()->PushEvent(pollingEventVec); + } + for (size_t i = 0; i < deletedFileVec.size(); ++i) { + mModifyCacheMap.erase(deletedFileVec[i]); + } +} + #ifdef APSARA_UNIT_TEST_MAIN bool PollingModify::FindNewFile(const std::string& dir, const std::string& fileName) { PTScopedLock lock(mFileLock); diff --git a/core/polling/PollingModify.h b/core/polling/PollingModify.h index 959c19515e..ec5899227f 100644 --- a/core/polling/PollingModify.h +++ b/core/polling/PollingModify.h @@ -62,7 +62,7 @@ class PollingModify : public LogRunnable { ~PollingModify(); void Polling(); - + void PollingIteration(); // MakeSpaceForNewFile tries to release some space from modify cache // for LoadFileNameInQueues to add new files. 
void MakeSpaceForNewFile(); diff --git a/core/unittest/CMakeLists.txt b/core/unittest/CMakeLists.txt index a5fd8c40a5..6403005e1a 100644 --- a/core/unittest/CMakeLists.txt +++ b/core/unittest/CMakeLists.txt @@ -15,6 +15,8 @@ cmake_minimum_required(VERSION 3.22) project(unittest_base) +# Unittest should be able to visit private members +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-access-control") if (NOT WITHSPL) add_definitions(-D__EXCLUDE_SPL__) endif() diff --git a/core/unittest/polling/CMakeLists.txt b/core/unittest/polling/CMakeLists.txt index 20b8197bd9..133f6ac576 100644 --- a/core/unittest/polling/CMakeLists.txt +++ b/core/unittest/polling/CMakeLists.txt @@ -16,4 +16,10 @@ cmake_minimum_required(VERSION 3.22) project(polling_unittest) # add_executable(polling_unittest PollingUnittest.cpp) -# target_link_libraries(polling_unittest unittest_base) \ No newline at end of file +# target_link_libraries(polling_unittest unittest_base) + +add_executable(polling_preserved_dir_depth_unittest PollingPreservedDirDepthUnittest.cpp) +target_link_libraries(polling_preserved_dir_depth_unittest unittest_base) + +include(GoogleTest) +gtest_discover_tests(polling_preserved_dir_depth_unittest) \ No newline at end of file diff --git a/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp b/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp new file mode 100644 index 0000000000..57dcfaed34 --- /dev/null +++ b/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp @@ -0,0 +1,335 @@ +#include + +#include // Include the header for sleep_for +#include // Include the header for this_thread + +#include "application/Application.h" +#include "common/Flags.h" +#include "common/JsonUtil.h" +#include "controller/EventDispatcher.h" +#include "event_handler/LogInput.h" +#include "pipeline/PipelineManager.h" +#include "plugin/PluginRegistry.h" +#include "polling/PollingDirFile.h" +#include "polling/PollingEventQueue.h" +#include "polling/PollingModify.h" +#include 
"processor/daemon/LogProcess.h" +#include "unittest/Unittest.h" + +using namespace std; + +DECLARE_FLAG_INT32(default_max_inotify_watch_num); +DECLARE_FLAG_BOOL(enable_polling_discovery); +DECLARE_FLAG_INT32(check_timeout_interval); +DECLARE_FLAG_INT32(log_input_thread_wait_interval); +DECLARE_FLAG_INT32(check_not_exist_file_dir_round); +DECLARE_FLAG_INT32(polling_check_timeout_interval); + +namespace logtail { + +struct TestVector { + string mConfigInputDir; + string mTestDir0; + string mTestDir1; + int mPreservedDirDepth; + bool mLetTimeoutBefore2ndWriteTestFile0; + // expected results + bool mCollectTestFile1stWrite; + bool mCollectTestFile2ndWrite; + bool mCollectTestFile3rdWrite; + bool mCollectTestFile2; +}; + +// clang-format off +// | 用例 | PreservedDirDepth | Path | 第一次文件和目录变化 | 预期采集结果 | 第二次变化时间 | 第二次文件和目录变化 | 预期采集结果 | 第三次变化时间 | 第三次文件和目录变化 | 预期采集结果 /var/log/0/0.log | 预期采集结果 /var/log/1/0.log | +// | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | +// | 0 | 0 | /var/log | /var/log/app/0/0.log | 采集 | <timeout | 在原有文件上追加数据 | 采集 | \>timeout | /var/log/app/1/0.log | 不采集 | 采集 | +// | 1 | 0 | /var/\*/log | /var/app/log/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | 不采集 | \>timeout | /var/app/log/1/0.log | 不采集 | 采集 | +// | 2 | 1 | /var/log | /var/app/log/0/0.log | 不采集 | <timeout | 在原有文件上追加数据 | 不采集 | \>timeout | /var/app/log/1/0.log | 不采集 | 不采集 | +// | 3 | 0 | /var/log | /var/log/0/0.log | 采集 | <timeout | 在原有文件上追加数据 | 采集 | \>timeout | /var/log/1/0.log | 不采集 | 采集 | +// | 4 | 1 | /var/\*/log | /var/log/0/0.log | 不采集 | \>timeout | 在原有文件上追加数据 | 不采集 | \>timeout | /var/log/1/0.log | 不采集 | 不采集 | +// | 5 | 1 | /var/\*/log | /var/app/log/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | 采集 | \>timeout | /var/app/log/1/0.log | 采集 | 采集 | +// | 6 | 0 | /var/log | /var/log/app/0/0.log | 采集 | <timeout | 在原有文件上追加数据 | 采集 | \>timeout | /var/log/app/0/1/0.log | 不采集 | 不采集 | +// clang-format on + +class PollingPreservedDirDepthUnittest : public ::testing::Test { + static std::string gRootDir; + static std::string gCheckpoint; + static vector gTestMatrix; + +public: + static void SetUpTestCase() { +
gRootDir = GetProcessExecutionDir() + "var" + PATH_SEPARATOR; + gTestMatrix = { + {"log", "log/app/0", "log/app/1", 0, false, true, true, false, true}, + {"*/log", "app/log/0", "app/log/1", 0, true, true, false, false, true}, + {"log", "app/log/0", "app/log/1", 1, false, false, false, false, false}, + {"log", "log/0", "log/1", 0, false, true, true, false, true}, + {"*/log", "log/0", "log/1", 1, true, false, false, false, false}, + {"*/log", "app/log/0", "app/log/1", 1, true, true, true, true, true}, + {"log", "log/app/0", "log/app/0/1", 0, false, true, true, false, false}, + }; + + sLogger->set_level(spdlog::level::trace); + srand(time(nullptr)); + INT32_FLAG(default_max_inotify_watch_num) = 0; + BOOL_FLAG(enable_polling_discovery) = false; // we will call poll manually + INT32_FLAG(timeout_interval) = 1; + INT32_FLAG(check_timeout_interval) = 0; + INT32_FLAG(check_not_exist_file_dir_round) = 1; + INT32_FLAG(polling_check_timeout_interval) = 0; + AppConfig::GetInstance()->mCheckPointFilePath = GetProcessExecutionDir() + gCheckpoint; + if (bfs::exists(AppConfig::GetInstance()->mCheckPointFilePath)) { + bfs::remove_all(AppConfig::GetInstance()->mCheckPointFilePath); + } + LogFileProfiler::GetInstance(); + LoongCollectorMonitor::GetInstance()->Init(); + PluginRegistry::GetInstance()->LoadPlugins(); + LogProcess::GetInstance()->Start(); + PipelineManager::GetInstance(); + FileServer::GetInstance()->Start(); + PollingDirFile::GetInstance()->Start(); + PollingModify::GetInstance()->Start(); + PollingModify::GetInstance()->Stop(); + PollingDirFile::GetInstance()->Stop(); + PollingDirFile::GetInstance()->mRuningFlag = true; + PollingModify::GetInstance()->mRuningFlag = true; + } + + static void TearDownTestCase() { + PollingDirFile::GetInstance()->mRuningFlag = false; + PollingModify::GetInstance()->mRuningFlag = false; + Application::GetInstance()->Exit(); + } + + void SetUp() override { + if (bfs::exists(AppConfig::GetInstance()->mCheckPointFilePath)) { + 
bfs::remove_all(AppConfig::GetInstance()->mCheckPointFilePath); + } + if (bfs::exists(gRootDir)) { + bfs::remove_all(gRootDir); + } + bfs::create_directories(gRootDir); + } + + void TearDown() override { + FileServer::GetInstance()->Pause(); + for (auto& p : PipelineManager::GetInstance()->mPipelineNameEntityMap) { + p.second->Stop(true); + } + PipelineManager::GetInstance()->mPipelineNameEntityMap.clear(); + // EventDispatcher::GetInstance()->CleanEnviroments(); + // ConfigManager::GetInstance()->CleanEnviroments(); + PollingDirFile::GetInstance()->ClearCache(); + PollingModify::GetInstance()->ClearCache(); + CheckPointManager::Instance()->RemoveAllCheckPoint(); + // PollingEventQueue::GetInstance()->Clear(); + bfs::remove_all(gRootDir); + if (bfs::exists(AppConfig::GetInstance()->mCheckPointFilePath)) { + bfs::remove_all(AppConfig::GetInstance()->mCheckPointFilePath); + } + FileServer::GetInstance()->Resume(); + } + +private: + unique_ptr createPipelineConfig(const string& filePath, int preservedDirDepth) { + const char* confCstr = R"({ + "inputs": [ + { + "Type": "input_file", + "FilePaths": ["/var/log/**/0.log"], + "MaxDirSearchDepth": 2, + "PreservedDirDepth": -1 + } + ], + "flushers": [ + { + "Type": "flusher_stdout" + } + ] + })"; + unique_ptr conf(new Json::Value(Json::objectValue)); + string errorMsg; + ParseJsonTable(confCstr, *conf, errorMsg); + auto& input = (*conf)["inputs"][0]; + input["FilePaths"][0] = filePath; + input["PreservedDirDepth"] = preservedDirDepth; + return conf; + } + + void generateLog(const string& testFile) { + LOG_DEBUG(sLogger, ("Generate log", testFile)); + auto pos = testFile.rfind(PATH_SEPARATOR); + auto dir = testFile.substr(0, pos); + bfs::create_directories(dir); + ofstream ofs(testFile, std::ios::app); + ofs << "0\n"; + } + + bool isFileDirRegistered(const string& testFile) { + auto pos = testFile.rfind(PATH_SEPARATOR); + auto dir = testFile.substr(0, pos); + auto registerStatus = 
EventDispatcher::GetInstance()->IsDirRegistered(dir); + return registerStatus == DirRegisterStatus::PATH_INODE_REGISTERED; + } + + void testPollingDirFile(const TestVector& testVector) { + auto configInputFilePath + = gRootDir + testVector.mConfigInputDir + PATH_SEPARATOR + "**" + PATH_SEPARATOR + "0.log"; + auto testFile1 = gRootDir + testVector.mTestDir0 + PATH_SEPARATOR + "0.log"; + auto testFile2 = gRootDir + testVector.mTestDir1 + PATH_SEPARATOR + "0.log"; + FileServer::GetInstance()->Pause(); + auto configJson = createPipelineConfig(configInputFilePath, testVector.mPreservedDirDepth); + Config pipelineConfig("polling", std::move(configJson)); + APSARA_TEST_TRUE_FATAL(pipelineConfig.Parse()); + auto p = PipelineManager::GetInstance()->BuildPipeline( + std::move(pipelineConfig)); // reference: PipelineManager::UpdatePipelines + APSARA_TEST_FALSE_FATAL(p.get() == nullptr); + PipelineManager::GetInstance()->mPipelineNameEntityMap[pipelineConfig.mName] = p; + p->Start(); + FileServer::GetInstance()->Resume(); + + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + std::this_thread::sleep_for(std::chrono::microseconds( + 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event + + // write testFile1 for the 1st time + generateLog(testFile1); + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + std::this_thread::sleep_for(std::chrono::microseconds( + 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event + if (testVector.mCollectTestFile1stWrite) { + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile1)); + } else { + APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile1)); + } + + if (testVector.mLetTimeoutBefore2ndWriteTestFile0) { + std::this_thread::sleep_for(std::chrono::seconds( + 2 + * INT32_FLAG( + timeout_interval))); // let timeout happen, must *2 since timeout happen only if time 
interval > 1s + } + + // trigger clean timeout polling cache + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + // write testFile1 for the 2nd time + generateLog(testFile1); + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + std::this_thread::sleep_for(std::chrono::microseconds( + 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event + if (testVector.mCollectTestFile2ndWrite) { + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile1)); + } else { + APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile1)); + } + + std::this_thread::sleep_for(std::chrono::seconds( + 2 + * INT32_FLAG( + timeout_interval))); // let timeout happen, must *2 since timeout happen only if time interval > 1s + + // trigger clean timeout polling cache + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + generateLog(testFile1); + generateLog(testFile2); + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + std::this_thread::sleep_for(std::chrono::microseconds( + 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event + if (testVector.mCollectTestFile3rdWrite) { + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile1)); + } else { + APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile1)); + } + if (testVector.mCollectTestFile2) { + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile2)); + } else { + APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile2)); + } + } + +public: + void TestPollingDirFile0() { testPollingDirFile(gTestMatrix[0]); } + void TestPollingDirFile1() { testPollingDirFile(gTestMatrix[1]); } + void TestPollingDirFile2() { testPollingDirFile(gTestMatrix[2]); } + void TestPollingDirFile3() { testPollingDirFile(gTestMatrix[3]); } + void TestPollingDirFile4() { testPollingDirFile(gTestMatrix[4]); } + 
void TestPollingDirFile5() { testPollingDirFile(gTestMatrix[5]); } + + void TestPollingDirFile6() { testPollingDirFile(gTestMatrix[6]); } + + void TestCheckpoint() { + auto configInputFilePath = gRootDir + "log/**/0.log"; + auto testFile = gRootDir + "log/0/0.log"; + FileServer::GetInstance()->Pause(); + auto configJson = createPipelineConfig(configInputFilePath, 0); + Config pipelineConfig("polling", std::move(configJson)); + APSARA_TEST_TRUE_FATAL(pipelineConfig.Parse()); + auto p = PipelineManager::GetInstance()->BuildPipeline( + std::move(pipelineConfig)); // reference: PipelineManager::UpdatePipelines + APSARA_TEST_FALSE_FATAL(p.get() == nullptr); + PipelineManager::GetInstance()->mPipelineNameEntityMap[pipelineConfig.mName] = p; + p->Start(); + FileServer::GetInstance()->Resume(); + + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + std::this_thread::sleep_for(std::chrono::microseconds( + 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event + + // generate log for testFile1 for the 1st time + generateLog(testFile); + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + std::this_thread::sleep_for(std::chrono::microseconds( + 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile)); + + // Dump and load checkpoint + FileServer::GetInstance()->Pause(true); + std::this_thread::sleep_for(std::chrono::seconds( + 2 + * INT32_FLAG( + timeout_interval))); // let timeout happen, must *2 since timeout happen only if time interval > 1s + FileServer::GetInstance()->Resume(true); + // Should remain registered after checkpoint + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile)); + + std::this_thread::sleep_for(std::chrono::seconds( + 2 + * INT32_FLAG( + timeout_interval))); // let timeout happen, must *2 since timeout happen only if time 
interval > 1s + + APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile)); + // Dump and load checkpoint + FileServer::GetInstance()->Pause(true); + FileServer::GetInstance()->Resume(true); + // Should remain unregistered after checkpoint + APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile)); + } +}; + +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile0); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile1); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile2); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile3); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile4); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile5); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestCheckpoint); + +std::string PollingPreservedDirDepthUnittest::gRootDir; +std::string PollingPreservedDirDepthUnittest::gCheckpoint = "checkpoint"; +vector PollingPreservedDirDepthUnittest::gTestMatrix; +} // namespace logtail + +int main(int argc, char** argv) { + logtail::Logger::Instance().InitGlobalLoggers(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file