From 48f1354b540a5d7bd75763a555c5648c62ecd8e2 Mon Sep 17 00:00:00 2001 From: Tao Yu Date: Fri, 8 Nov 2024 07:30:53 +0000 Subject: [PATCH 1/7] fix PreservedDirDepth not working with polling and wildcard path --- core/file_server/ConfigManager.cpp | 99 ++++++++++++++------- core/file_server/ConfigManager.h | 5 +- core/file_server/EventDispatcher.cpp | 7 -- core/file_server/event_handler/LogInput.cpp | 3 +- core/file_server/polling/PollingCache.h | 5 ++ core/file_server/polling/PollingDirFile.cpp | 76 +++++++++++++--- core/file_server/polling/PollingDirFile.h | 9 +- 7 files changed, 150 insertions(+), 54 deletions(-) diff --git a/core/file_server/ConfigManager.cpp b/core/file_server/ConfigManager.cpp index d339e9ca5f..7c29277096 100644 --- a/core/file_server/ConfigManager.cpp +++ b/core/file_server/ConfigManager.cpp @@ -306,9 +306,16 @@ void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, cons if (registerStatus == GET_REGISTER_STATUS_ERROR) { return; } - if (EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler)) { + if (config.first->mPreservedDirDepth < 0) RegisterDescendants( item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); + else { + // preserve_depth register + RegisterHandlersWithinDepth(item, + config, + config.first->mPreservedDirDepth, + config.first->mMaxDirSearchDepth < 0 ? 100 + : config.first->mMaxDirSearchDepth); } } else { RegisterWildcardPath(config, item, depth + 1); @@ -382,9 +389,16 @@ void ConfigManager::RegisterWildcardPath(const FileDiscoveryConfig& config, cons if (registerStatus == GET_REGISTER_STATUS_ERROR) { return; } - if (EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler)) { + if (config.first->mPreservedDirDepth < 0) RegisterDescendants( item, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); + else { + // preserve_depth register + RegisterHandlersWithinDepth( + item, + config, + config.first->mPreservedDirDepth, + config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); } } else { RegisterWildcardPath(config, item, depth + 1); @@ -421,25 +435,23 @@ bool ConfigManager::RegisterHandlers(const string& basePath, const FileDiscovery DirRegisterStatus registerStatus = EventDispatcher::GetInstance()->IsDirRegistered(basePath); if (registerStatus == GET_REGISTER_STATUS_ERROR) return result; - // dir in config is valid by default, do not call pathValidator - result = EventDispatcher::GetInstance()->RegisterEventHandler(basePath.c_str(), config, mSharedHandler); - // if we come into a failure, do not try to register others, there must be something wrong! - if (!result) - return result; if (config.first->mPreservedDirDepth < 0) result = RegisterDescendants( basePath, config, config.first->mMaxDirSearchDepth < 0 ? 100 : config.first->mMaxDirSearchDepth); else { // preserve_depth register - int depth = config.first->mPreservedDirDepth; - result = RegisterHandlersWithinDepth(basePath, config, depth); + result = RegisterHandlersWithinDepth(basePath, + config, + config.first->mPreservedDirDepth, + config.first->mMaxDirSearchDepth < 0 ? 100 + : config.first->mMaxDirSearchDepth); } return result; } bool ConfigManager::RegisterDirectory(const std::string& source, const std::string& object) { - // TODO��A potential bug: FindBestMatch will test @object with filePattern, which has very + // TODO: A potential bug: FindBestMatch will test @object with filePattern, which has very // low possibility to match a sub directory name, so here will return false in most cases. // e.g.: source: /path/to/monitor, file pattern: *.log, object: subdir. // Match(subdir, *.log) = false. @@ -449,24 +461,39 @@ bool ConfigManager::RegisterDirectory(const std::string& source, const std::stri return false; } -bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int depth) { +bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, + const FileDiscoveryConfig& config, + int preservedDirDepth, + int maxDepth) { + if (maxDepth < 0) { + return true; + } if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) { LOG_INFO(sLogger, ("ignore path matching host path blacklist", path)); return false; } - if (depth <= 0) { + if (preservedDirDepth <= 0) { DirCheckPointPtr dirCheckPoint; - if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint) == false) + if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint)) { + // path had dircheckpoint means it was watched before, so it is valid + const set& subdir = dirCheckPoint.get()->mSubDir; + for (const auto& it : subdir) { + RegisterHandlersWithinDepth(it, config, preservedDirDepth - 1, maxDepth - 1); + } + return true; + } + fsutil::PathStat statBuf; + if (!fsutil::PathStat::stat(path, statBuf)) { + return true; + } + int64_t sec = 0; + int64_t nsec = 0; + statBuf.GetLastWriteTime(sec, nsec); + auto curTime = time(nullptr); + if (curTime - sec > INT32_FLAG(timeout_interval)) { return true; - // path had dircheckpoint means it was watched before, so it is valid - const set& subdir = dirCheckPoint.get()->mSubDir; - for (set::iterator it = subdir.begin(); it != subdir.end(); it++) { - if (EventDispatcher::GetInstance()->RegisterEventHandler((*it).c_str(), config, mSharedHandler)) - RegisterHandlersWithinDepth(*it, config, depth - 1); } - return true; } - bool result = true; fsutil::Dir dir(path); if (!dir.Open()) { @@ -480,15 +507,19 @@ bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const F LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err)); return false; } + if (!(EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler))) { + // break;// fail early, do not try to register others + return false; + } + if (maxDepth == 0) { + return true; + } + bool result = true; fsutil::Entry ent; while ((ent = dir.ReadNext())) { string item = PathJoin(path, ent.Name()); if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) { - if (!(EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler))) { - // break;// fail early, do not try to register others - result = false; - } else // sub dir will not be registered if parent dir fails - RegisterHandlersWithinDepth(item, config, depth - 1); + RegisterHandlersWithinDepth(item, config, preservedDirDepth - 1, maxDepth - 1); } } @@ -497,13 +528,13 @@ bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, const F // path not terminated by '/', path already registered bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryConfig& config, int withinDepth) { + if (withinDepth < 0) { + return true; + } if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) { LOG_INFO(sLogger, ("ignore path matching host path blacklist", path)); return false; } - if (withinDepth <= 0) { - return true; - } fsutil::Dir dir(path); if (!dir.Open()) { @@ -516,14 +547,20 @@ bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryC LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err)); return false; } + if (!EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler)) { + // break;// fail early, do not try to register others + return false; + } + if (withinDepth == 0) { + return true; + } + fsutil::Entry ent; bool result = true; while ((ent = dir.ReadNext())) { string item = PathJoin(path, ent.Name()); if (ent.IsDir() && !config.first->IsDirectoryInBlacklist(item)) { - result = EventDispatcher::GetInstance()->RegisterEventHandler(item.c_str(), config, mSharedHandler); - if (result) - RegisterDescendants(item, config, withinDepth - 1); + RegisterDescendants(item, config, withinDepth - 1); } } return result; diff --git a/core/file_server/ConfigManager.h b/core/file_server/ConfigManager.h index 71020a467f..c9f718e6c0 100644 --- a/core/file_server/ConfigManager.h +++ b/core/file_server/ConfigManager.h @@ -510,7 +510,10 @@ class ConfigManager { * @param path is the current dir that being registered * @depth is the num of sub dir layers that should be registered */ - bool RegisterHandlersWithinDepth(const std::string& path, const FileDiscoveryConfig& config, int depth); + bool RegisterHandlersWithinDepth(const std::string& path, + const FileDiscoveryConfig& config, + int preservedDirDepth, + int maxDepth); bool RegisterDescendants(const std::string& path, const FileDiscoveryConfig& config, int withinDepth); // bool CheckLogType(const std::string& logTypeStr, LogType& logType); // 废弃 diff --git a/core/file_server/EventDispatcher.cpp b/core/file_server/EventDispatcher.cpp index dbbd80cd11..1895347a34 100644 --- a/core/file_server/EventDispatcher.cpp +++ b/core/file_server/EventDispatcher.cpp @@ -279,13 +279,6 @@ bool EventDispatcher::RegisterEventHandler(const char* path, bool dirTimeOutFlag = config.first->IsTimeout(path); if (!mEventListener->IsValidID(wd)) { - if (dirTimeOutFlag) { - LOG_DEBUG( - sLogger, - ("Drop timeout path, source", path)("config, basepath", config.first->GetBasePath())( - "preseveDepth", config.first->mPreservedDirDepth)("maxDepth", config.first->mMaxDirSearchDepth)); - return false; - } wd = mNonInotifyWd; if (mNonInotifyWd == INT_MIN) mNonInotifyWd = -1; diff --git a/core/file_server/event_handler/LogInput.cpp b/core/file_server/event_handler/LogInput.cpp index 38b6e4ab36..634e5cf09d 100644 --- a/core/file_server/event_handler/LogInput.cpp +++ b/core/file_server/event_handler/LogInput.cpp @@ -48,6 +48,7 @@ using namespace std; DEFINE_FLAG_INT32(check_symbolic_link_interval, "seconds", 120); DEFINE_FLAG_INT32(check_base_dir_interval, "seconds", 60); +DEFINE_FLAG_INT32(check_timeout_interval, "seconds", 600); DEFINE_FLAG_INT32(log_input_thread_wait_interval, "microseconds", 20 * 1000); DEFINE_FLAG_INT64(read_fs_events_interval, "microseconds", 20 * 1000); DEFINE_FLAG_INT32(check_handler_timeout_interval, "seconds", 180); @@ -428,7 +429,7 @@ void* LogInput::ProcessLoop() { lastCheckSymbolicLink = 0; } - if (curTime - prevTime >= INT32_FLAG(timeout_interval)) { + if (curTime - prevTime >= INT32_FLAG(check_timeout_interval)) { dispatcher->HandleTimeout(); prevTime = curTime; } diff --git a/core/file_server/polling/PollingCache.h b/core/file_server/polling/PollingCache.h index 575eba7303..ea3ed8c38a 100644 --- a/core/file_server/polling/PollingCache.h +++ b/core/file_server/polling/PollingCache.h @@ -32,6 +32,10 @@ struct DirFileCache { void SetConfigMatched(bool configMatched) { mConfigMatched = configMatched; } bool HasMatchedConfig() const { return mConfigMatched; } + + void SetExceedPreservedDirDepth(bool exceed) { mExceedPreservedDirDepth = exceed; } + bool GetExceedPreservedDirDepth() const { return mExceedPreservedDirDepth; } + void SetCheckRound(uint64_t curRound) { mLastCheckRound = curRound; } uint64_t GetLastCheckRound() const { return mLastCheckRound; } @@ -51,6 +55,7 @@ struct DirFileCache { int32_t mLastEventTime = 0; bool mConfigMatched = false; + bool mExceedPreservedDirDepth = false; uint64_t mLastCheckRound = 0; // Last modified time on filesystem in nanoseconds. int64_t mLastModifyTime = 0; diff --git a/core/file_server/polling/PollingDirFile.cpp b/core/file_server/polling/PollingDirFile.cpp index 22c0866c1e..ceee841079 100644 --- a/core/file_server/polling/PollingDirFile.cpp +++ b/core/file_server/polling/PollingDirFile.cpp @@ -192,7 +192,7 @@ void PollingDirFile::Polling() { for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) { const string& basePath = (*config->GetContainerInfo())[i].mRealBaseDir; fsutil::PathStat baseDirStat; - if (!fsutil::PathStat::stat(basePath.c_str(), baseDirStat)) { + if (!fsutil::PathStat::stat(basePath, baseDirStat)) { LOG_DEBUG(sLogger, ("get docker base dir info error: ", basePath)(ctx->GetProjectName(), ctx->GetLogstoreName())); @@ -265,6 +265,7 @@ void PollingDirFile::Polling() { // NOTE: So, we can not find changes in subdirectories of the directory according to LMD. bool PollingDirFile::CheckAndUpdateDirMatchCache(const string& dirPath, const fsutil::PathStat& statBuf, + bool exceedPreservedDirDepth, bool& newFlag) { int64_t sec, nsec; statBuf.GetLastWriteTime(sec, nsec); @@ -277,6 +278,7 @@ bool PollingDirFile::CheckAndUpdateDirMatchCache(const string& dirPath, if (iter == mDirCacheMap.end()) { DirFileCache& dirCache = mDirCacheMap[dirPath]; dirCache.SetConfigMatched(true); + dirCache.SetExceedPreservedDirDepth(exceedPreservedDirDepth); dirCache.SetCheckRound(mCurrentRound); dirCache.SetLastModifyTime(modifyTime); // Directories found at round 1 or too old are considered as old data. @@ -301,7 +303,8 @@ bool PollingDirFile::CheckAndUpdateDirMatchCache(const string& dirPath, bool PollingDirFile::CheckAndUpdateFileMatchCache(const string& fileDir, const string& fileName, const fsutil::PathStat& statBuf, - bool needFindBestMatch) { + bool needFindBestMatch, + bool exceedPreservedDirDepth) { int64_t sec, nsec; statBuf.GetLastWriteTime(sec, nsec); int64_t modifyTime = NANO_CONVERTING * sec + nsec; @@ -317,6 +320,7 @@ bool PollingDirFile::CheckAndUpdateFileMatchCache(const string& fileDir, DirFileCache& fileCache = mFileCacheMap[filePath]; fileCache.SetConfigMatched(matchFlag); + fileCache.SetExceedPreservedDirDepth(exceedPreservedDirDepth); fileCache.SetCheckRound(mCurrentRound); fileCache.SetLastModifyTime(modifyTime); @@ -357,10 +361,23 @@ bool PollingDirFile::PollingNormalConfigPath(const FileDiscoveryConfig& pConfig, const string& obj, const fsutil::PathStat& statBuf, int depth) { - if (pConfig.first->mMaxDirSearchDepth >= 0 && depth > pConfig.first->mMaxDirSearchDepth) - return false; - if (pConfig.first->mPreservedDirDepth >= 0 && depth > pConfig.first->mPreservedDirDepth) + if (pConfig.first->mMaxDirSearchDepth >= 0 && depth > pConfig.first->mMaxDirSearchDepth) { return false; + } + bool exceedPreservedDirDepth = false; + if (pConfig.first->mPreservedDirDepth >= 0 && depth > pConfig.first->mPreservedDirDepth) { + exceedPreservedDirDepth = true; + int64_t sec = 0; + int64_t nsec = 0; + statBuf.GetLastWriteTime(sec, nsec); + auto curTime = time(nullptr); + LOG_DEBUG(sLogger, + ("srcPath", srcPath)("obj", obj)("exceedPreservedDirDepth", exceedPreservedDirDepth)( + "curTime", curTime)("sec", sec)("INT32_FLAG(timeout_interval)", INT32_FLAG(timeout_interval))); + if (curTime - sec > INT32_FLAG(timeout_interval)) { + return false; + } + } string dirPath = obj.empty() ? srcPath : PathJoin(srcPath, obj); if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(dirPath)) { @@ -368,7 +385,7 @@ bool PollingDirFile::PollingNormalConfigPath(const FileDiscoveryConfig& pConfig, return false; } bool isNewDirectory = false; - if (!CheckAndUpdateDirMatchCache(dirPath, statBuf, isNewDirectory)) + if (!CheckAndUpdateDirMatchCache(dirPath, statBuf, exceedPreservedDirDepth, isNewDirectory)) return true; if (isNewDirectory) { PollingEventQueue::GetInstance()->PushEvent(new Event(srcPath, obj, EVENT_CREATE | EVENT_ISDIR, -1, 0)); @@ -471,7 +488,7 @@ bool PollingDirFile::PollingNormalConfigPath(const FileDiscoveryConfig& pConfig, if (buf.IsDir() && (!needCheckDirMatch || !pConfig.first->IsDirectoryInBlacklist(item))) { PollingNormalConfigPath(pConfig, dirPath, entName, buf, depth + 1); } else if (buf.IsRegFile()) { - if (CheckAndUpdateFileMatchCache(dirPath, entName, buf, needFindBestMatch)) { + if (CheckAndUpdateFileMatchCache(dirPath, entName, buf, needFindBestMatch, exceedPreservedDirDepth)) { LOG_DEBUG(sLogger, ("add to modify event", entName)("round", mCurrentRound)); mNewFileVec.push_back(SplitedFilePath(dirPath, entName)); } @@ -639,12 +656,50 @@ void PollingDirFile::ClearTimeoutFileAndDir() { } // Collect deleted files, so that it can notify PollingModify later. + s_lastClearTime = curTime; std::vector deleteFileVec; { ScopedSpinLock lock(mCacheLock); + bool clearExceedPreservedDirDepth = false; + for (auto iter = mDirCacheMap.begin(); iter != mDirCacheMap.end();) { + LOG_DEBUG(sLogger, + ("Dir", iter->first)("GetExceedPreservedDirDepth", iter->second.GetExceedPreservedDirDepth())( + "iter->second.GetLastModifyTime()", iter->second.GetLastModifyTime())); + if (iter->second.GetExceedPreservedDirDepth() + && (NANO_CONVERTING * curTime - iter->second.GetLastModifyTime()) + > NANO_CONVERTING * INT32_FLAG(timeout_interval)) { + iter = mDirCacheMap.erase(iter); + clearExceedPreservedDirDepth = true; + } else + ++iter; + } + if (clearExceedPreservedDirDepth) { + LOG_INFO(sLogger, ("After clear DirCache size", mDirCacheMap.size())); + } + clearExceedPreservedDirDepth = false; + for (auto iter = mFileCacheMap.begin(); iter != mFileCacheMap.end();) { + LOG_DEBUG(sLogger, + ("File", iter->first)("GetExceedPreservedDirDepth", iter->second.GetExceedPreservedDirDepth())( + "iter->second.GetLastModifyTime()", iter->second.GetLastModifyTime())); + if (iter->second.GetExceedPreservedDirDepth() + && (NANO_CONVERTING * curTime - iter->second.GetLastModifyTime()) + > NANO_CONVERTING * INT32_FLAG(timeout_interval)) { + // If the file has been added to PollingModify, remove it here. + if (iter->second.HasMatchedConfig() && iter->second.HasEventFlag()) { + deleteFileVec.push_back(SplitedFilePath(iter->first)); + LOG_INFO(sLogger, ("delete file cache", iter->first)("vec size", deleteFileVec.size())); + } + iter = mFileCacheMap.erase(iter); + clearExceedPreservedDirDepth = true; + } else + ++iter; + } + if (clearExceedPreservedDirDepth) { + LOG_INFO(sLogger, ("After clear FileCache size", mFileCacheMap.size())); + } + if (mDirCacheMap.size() > (size_t)INT32_FLAG(polling_dir_upperlimit)) { LOG_INFO(sLogger, ("start clear dir cache", mDirCacheMap.size())); - s_lastClearTime = curTime; for (auto iter = mDirCacheMap.begin(); iter != mDirCacheMap.end();) { if ((NANO_CONVERTING * curTime - iter->second.GetLastModifyTime()) > NANO_CONVERTING * INT32_FLAG(polling_dir_timeout)) { @@ -652,12 +707,11 @@ void PollingDirFile::ClearTimeoutFileAndDir() { } else ++iter; } - LOG_INFO(sLogger, ("After clear", mDirCacheMap.size())("Cost time", time(NULL) - s_lastClearTime)); + LOG_INFO(sLogger, ("After clear DirCache size", mDirCacheMap.size())); } if (mFileCacheMap.size() > (size_t)INT32_FLAG(polling_file_upperlimit)) { LOG_INFO(sLogger, ("start clear file cache", mFileCacheMap.size())); - s_lastClearTime = curTime; for (auto iter = mFileCacheMap.begin(); iter != mFileCacheMap.end();) { if ((NANO_CONVERTING * curTime - iter->second.GetLastModifyTime()) > NANO_CONVERTING * INT32_FLAG(polling_file_timeout)) { @@ -670,7 +724,7 @@ void PollingDirFile::ClearTimeoutFileAndDir() { } else ++iter; } - LOG_INFO(sLogger, ("After clear", mFileCacheMap.size())("Cost time", time(NULL) - s_lastClearTime)); + LOG_INFO(sLogger, ("After clear FileCache size", mFileCacheMap.size())); } } diff --git a/core/file_server/polling/PollingDirFile.h b/core/file_server/polling/PollingDirFile.h index 4bf7fb8f52..244b3ea28d 100644 --- a/core/file_server/polling/PollingDirFile.h +++ b/core/file_server/polling/PollingDirFile.h @@ -85,8 +85,10 @@ class PollingDirFile : public LogRunnable { // @newFlag: a boolean to indicate caller that it is a new directory, generate event for it. // @return a boolean to indicate should the directory be continued to poll. // It will returns true always now (might change in future). - bool CheckAndUpdateDirMatchCache(const std::string& dirPath, const fsutil::PathStat& statBuf, bool& newFlag); - + bool CheckAndUpdateDirMatchCache(const std::string& dirPath, + const fsutil::PathStat& statBuf, + bool exceedPreservedDirDepth, + bool& newFlag); // CheckAndUpdateFileMatchCache updates file cache (add if not existing). // @fileDir+@fileName: absolute path of the file. // @needFindBestMatch: false indicates that the file has already found the @@ -96,7 +98,8 @@ class PollingDirFile : public LogRunnable { bool CheckAndUpdateFileMatchCache(const std::string& fileDir, const std::string& fileName, const fsutil::PathStat& statBuf, - bool needFindBestMatch); + bool needFindBestMatch, + bool exceedPreservedDirDepth); // ClearUnavailableFileAndDir checks cache, remove unavailable items. // By default, it will be called every 20 rounds (flag check_not_exist_file_dir_round). From 3be637769e24c444787e19e9423a6393ae0d4c0d Mon Sep 17 00:00:00 2001 From: Tao Yu Date: Thu, 14 Nov 2024 08:48:27 +0000 Subject: [PATCH 2/7] Add unittest and fix mem leaks --- core/CMakeLists.txt | 4 +- core/checkpoint/CheckPointManager.cpp | 5 +- .../common_provider/CommonConfigProvider.cpp | 3 + .../LegacyCommonConfigProvider.cpp | 3 + core/config/watcher/ConfigWatcher.cpp | 4 +- core/file_server/ConfigManager.cpp | 18 +- core/file_server/EventDispatcher.cpp | 4 +- core/file_server/FileDiscoveryOptions.cpp | 2 +- core/file_server/FileServer.cpp | 11 +- .../event_handler/EventHandler.cpp | 6 +- core/file_server/event_handler/LogInput.cpp | 41 +-- core/file_server/event_handler/LogInput.h | 4 +- core/file_server/polling/PollingDirFile.cpp | 241 ++++++++-------- core/file_server/polling/PollingDirFile.h | 1 + core/file_server/polling/PollingEventQueue.h | 1 + core/file_server/polling/PollingModify.cpp | 103 ++++--- core/file_server/polling/PollingModify.h | 1 + core/logger/Logger.cpp | 10 +- core/monitor/LogtailAlarm.cpp | 17 +- core/monitor/LogtailAlarm.h | 3 +- core/monitor/Monitor.cpp | 13 +- core/pipeline/Pipeline.cpp | 8 +- core/plugin/flusher/sls/DiskBufferWriter.cpp | 4 +- core/plugin/flusher/sls/SLSClientManager.cpp | 4 +- core/runner/FlusherRunner.cpp | 3 + core/runner/ProcessorRunner.cpp | 3 + core/runner/sink/http/HttpSink.cpp | 3 + core/unittest/CMakeLists.txt | 3 + core/unittest/common/FileSystemUtilUnittest.h | 39 ++- core/unittest/config/ConfigUpdateUnittest.cpp | 6 + .../event/BlockedEventManagerUnittest.cpp | 3 + .../unittest/metadata/K8sMetadataUnittest.cpp | 2 +- .../models/PipelineEventPtrUnittest.cpp | 3 +- core/unittest/plugin/FlusherUnittest.cpp | 4 +- core/unittest/polling/CMakeLists.txt | 8 +- .../PollingPreservedDirDepthUnittest.cpp | 267 ++++++++++++++++++ core/unittest/polling/PollingUnittest.cpp | 6 +- core/unittest/reader/ForceReadUnittest.cpp | 12 +- 38 files changed, 601 insertions(+), 272 deletions(-) create mode 100644 core/unittest/polling/PollingPreservedDirDepthUnittest.cpp diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 9c9d7cffab..7ed1d6636e 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -61,8 +61,8 @@ if (UNIX) set(CMAKE_C_FLAGS "${CMAKE_C_FLAGS} -g -ggdb") set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -g -ggdb") endif () - set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O1 -fno-omit-frame-pointer") - set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O1 -fno-omit-frame-pointer") + set(CMAKE_C_FLAGS_DEBUG "${CMAKE_C_FLAGS_DEBUG} -O0 -fno-omit-frame-pointer") + set(CMAKE_CXX_FLAGS_DEBUG "${CMAKE_CXX_FLAGS_DEBUG} -O0 -fno-omit-frame-pointer") set(CMAKE_C_FLAGS_RELEASE "${CMAKE_C_FLAGS_RELEASE} -O2") set(CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE} -O2") string(REPLACE "-O3" "" CMAKE_CXX_FLAGS_RELEASE "${CMAKE_CXX_FLAGS_RELEASE}") diff --git a/core/checkpoint/CheckPointManager.cpp b/core/checkpoint/CheckPointManager.cpp index 6d69990782..7bc16efb75 100644 --- a/core/checkpoint/CheckPointManager.cpp +++ b/core/checkpoint/CheckPointManager.cpp @@ -123,8 +123,7 @@ void CheckPointManager::LoadCheckPoint() { Json::Value root; ParseConfResult cptRes = ParseConfig(AppConfig::GetInstance()->GetCheckPointFilePath(), root); // if new checkpoint file not exist, check old checkpoint file. - if (cptRes == CONFIG_NOT_EXIST - && AppConfig::GetInstance()->GetCheckPointFilePath() != GetCheckPointFileName()) { + if (cptRes == CONFIG_NOT_EXIST && AppConfig::GetInstance()->GetCheckPointFilePath() != GetCheckPointFileName()) { cptRes = ParseConfig(GetCheckPointFileName(), root); } if (cptRes != CONFIG_OK) { @@ -408,7 +407,7 @@ bool CheckPointManager::DumpCheckPointToLocal() { result["dir_check_point"] = dirJson; result["version"] = Json::Value(Json::UInt(INT32_FLAG(check_point_version))); fout << result.toStyledString(); - if (!fout.good()) { + if (!fout) { LOG_ERROR(sLogger, ("dump check point to file failed", checkPointFile)); LogtailAlarm::GetInstance()->SendAlarm(CHECKPOINT_ALARM, "dump check point to file failed"); fout.close(); diff --git a/core/config/common_provider/CommonConfigProvider.cpp b/core/config/common_provider/CommonConfigProvider.cpp index 4551e2a127..ae6da25f54 100644 --- a/core/config/common_provider/CommonConfigProvider.cpp +++ b/core/config/common_provider/CommonConfigProvider.cpp @@ -120,6 +120,9 @@ void CommonConfigProvider::Stop() { mIsThreadRunning = false; } mStopCV.notify_one(); + if (!mThreadRes.valid()) { + return; + } future_status s = mThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { LOG_INFO(sLogger, (sName, "stopped successfully")); diff --git a/core/config/common_provider/LegacyCommonConfigProvider.cpp b/core/config/common_provider/LegacyCommonConfigProvider.cpp index f5d000711f..b075bbc6a6 100644 --- a/core/config/common_provider/LegacyCommonConfigProvider.cpp +++ b/core/config/common_provider/LegacyCommonConfigProvider.cpp @@ -87,6 +87,9 @@ void LegacyCommonConfigProvider::Stop() { mIsThreadRunning = false; } mStopCV.notify_one(); + if (!mThreadRes.valid()) { + return; + } future_status s = mThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { LOG_INFO(sLogger, ("legacy common config provider", "stopped successfully")); diff --git a/core/config/watcher/ConfigWatcher.cpp b/core/config/watcher/ConfigWatcher.cpp index baa3d28a9f..66c9f30566 100644 --- a/core/config/watcher/ConfigWatcher.cpp +++ b/core/config/watcher/ConfigWatcher.cpp @@ -80,7 +80,7 @@ PipelineConfigDiff ConfigWatcher::CheckConfigDiff() { filesystem::file_time_type mTime = filesystem::last_write_time(path, ec); if (iter == mFileInfoMap.end()) { mFileInfoMap[filepath] = make_pair(size, mTime); - unique_ptr detail = make_unique(new Json::Value()); + unique_ptr detail = make_unique(); if (!LoadConfigDetailFromFile(path, *detail)) { continue; } @@ -106,7 +106,7 @@ PipelineConfigDiff ConfigWatcher::CheckConfigDiff() { } else if (iter->second.first != size || iter->second.second != mTime) { // for config currently running, we leave it untouched if new config is invalid mFileInfoMap[filepath] = make_pair(size, mTime); - unique_ptr detail = make_unique(new Json::Value()); + unique_ptr detail = make_unique(); if (!LoadConfigDetailFromFile(path, *detail)) { continue; } diff --git a/core/file_server/ConfigManager.cpp b/core/file_server/ConfigManager.cpp index 7c29277096..a7db095cdb 100644 --- a/core/file_server/ConfigManager.cpp +++ b/core/file_server/ConfigManager.cpp @@ -36,7 +36,6 @@ #include "app_config/AppConfig.h" #include "checkpoint/CheckPointManager.h" #include "common/CompressTools.h" -#include "constants/Constants.h" #include "common/ErrorUtil.h" #include "common/ExceptionBase.h" #include "common/FileSystemUtil.h" @@ -46,9 +45,10 @@ #include "common/StringTools.h" #include "common/TimeUtil.h" #include "common/version.h" +#include "constants/Constants.h" #include "file_server/EventDispatcher.h" -#include "file_server/event_handler/EventHandler.h" #include "file_server/FileServer.h" +#include "file_server/event_handler/EventHandler.h" #include "monitor/LogFileProfiler.h" #include "monitor/LogtailAlarm.h" #include "pipeline/Pipeline.h" @@ -101,7 +101,7 @@ DEFINE_FLAG_INT32(docker_config_update_interval, "interval between docker config namespace logtail { -// +// ParseConfResult ParseConfig(const std::string& configName, Json::Value& jsonRoot) { // Get full path, if it is a relative path, prepend process execution dir. std::string fullPath = configName; @@ -111,10 +111,15 @@ ParseConfResult ParseConfig(const std::string& configName, Json::Value& jsonRoot ifstream is; is.open(fullPath.c_str()); - if (!is.good()) { + if (!is) { // https://horstmann.com/cpp/pitfalls.html + return CONFIG_NOT_EXIST; + } + std::string buffer; + try { + buffer.assign(std::istreambuf_iterator(is), std::istreambuf_iterator()); + } catch (const std::ios_base::failure& e) { return CONFIG_NOT_EXIST; } - std::string buffer((std::istreambuf_iterator(is)), (std::istreambuf_iterator())); if (!IsValidJson(buffer.c_str(), buffer.length())) { return CONFIG_INVALID_FORMAT; } @@ -456,8 +461,9 @@ bool ConfigManager::RegisterDirectory(const std::string& source, const std::stri // e.g.: source: /path/to/monitor, file pattern: *.log, object: subdir. // Match(subdir, *.log) = false. FileDiscoveryConfig config = FindBestMatch(source, object); - if (config.first && !config.first->IsDirectoryInBlacklist(source)) + if (config.first && !config.first->IsDirectoryInBlacklist(source)) { return EventDispatcher::GetInstance()->RegisterEventHandler(source.c_str(), config, mSharedHandler); + } return false; } diff --git a/core/file_server/EventDispatcher.cpp b/core/file_server/EventDispatcher.cpp index 1895347a34..40f250ef06 100644 --- a/core/file_server/EventDispatcher.cpp +++ b/core/file_server/EventDispatcher.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include "EventDispatcher.h" + #include "Flags.h" #if defined(__linux__) #include @@ -827,6 +828,7 @@ void EventDispatcher::UnregisterEventHandler(const char* path) { mBrokenLinkSet.insert(path); } } + LOG_INFO(sLogger, ("remove a new watcher for dir", path)("wd", wd)); RemoveOneToOneMapEntry(wd); mWdUpdateTimeMap.erase(wd); if (mEventListener->IsValidID(wd) && mEventListener->IsInit()) { @@ -888,7 +890,7 @@ void EventDispatcher::HandleTimeout() { time_t curTime = time(NULL); MapType::Type::iterator itr = mWdUpdateTimeMap.begin(); for (; itr != mWdUpdateTimeMap.end(); ++itr) { - if (curTime - (itr->second) >= INT32_FLAG(timeout_interval)) { + if (curTime - (itr->second) > INT32_FLAG(timeout_interval)) { // add to vector then batch process to avoid possible iterator change problem // mHandler may remove what itr points to, thus change the layout of the map container // what follows may not work diff --git a/core/file_server/FileDiscoveryOptions.cpp b/core/file_server/FileDiscoveryOptions.cpp index 7aeda2786d..ad47349f7f 100644 --- a/core/file_server/FileDiscoveryOptions.cpp +++ b/core/file_server/FileDiscoveryOptions.cpp @@ -575,7 +575,7 @@ bool FileDiscoveryOptions::IsWildcardPathMatch(const string& path, const string& // XXX: assume path is a subdir under mBasePath bool FileDiscoveryOptions::IsTimeout(const string& path) const { - if (mPreservedDirDepth < 0 || mWildcardPaths.size() > 0) + if (mPreservedDirDepth < 0) return false; // we do not check if (path.find(mBasePath) == 0) diff --git a/core/file_server/FileServer.cpp b/core/file_server/FileServer.cpp index 5a072c6c0f..53de1d0b5e 100644 --- a/core/file_server/FileServer.cpp +++ b/core/file_server/FileServer.cpp @@ -42,8 +42,17 @@ FileServer::FileServer() { void FileServer::Start() { ConfigManager::GetInstance()->LoadDockerConfig(); CheckPointManager::Instance()->LoadCheckPoint(); + LOG_INFO(sLogger, ("watch dirs", "start")); + auto start = GetCurrentTimeInMilliSeconds(); ConfigManager::GetInstance()->RegisterHandlers(); - LOG_INFO(sLogger, ("watch dirs", "succeeded")); + auto costMs = GetCurrentTimeInMilliSeconds() - start; + if (costMs >= 60 * 1000) { + LogtailAlarm::GetInstance()->SendAlarm(REGISTER_HANDLERS_TOO_SLOW_ALARM, + "Registering handlers took " + ToString(costMs) + " ms"); + LOG_WARNING(sLogger, ("watch dirs", "succeeded")("costMs", costMs)); + } else { + LOG_INFO(sLogger, ("watch dirs", "succeeded")("costMs", costMs)); + } EventDispatcher::GetInstance()->AddExistedCheckPointFileEvents(); // the dump time must be reset after dir registration, since it may take long on NFS. CheckPointManager::Instance()->ResetLastDumpTime(); diff --git a/core/file_server/event_handler/EventHandler.cpp b/core/file_server/event_handler/EventHandler.cpp index 4c9774b373..9148b1c03e 100644 --- a/core/file_server/event_handler/EventHandler.cpp +++ b/core/file_server/event_handler/EventHandler.cpp @@ -774,7 +774,7 @@ void ModifyHandler::Handle(const Event& event) { reader->GetQueueKey(), mConfigName, event, reader->GetDevInode(), curTime); return; } - unique_ptr logBuffer(new LogBuffer); + auto logBuffer = make_unique(); hasMoreData = reader->ReadLog(*logBuffer, &event); int32_t pushRetry = PushLogToProcessor(reader, logBuffer.get()); if (!hasMoreData) { @@ -1066,10 +1066,10 @@ void ModifyHandler::DeleteRollbackReader() { } void ModifyHandler::ForceReadLogAndPush(LogFileReaderPtr reader) { - LogBuffer* logBuffer = new LogBuffer; + auto logBuffer = make_unique(); auto pEvent = reader->CreateFlushTimeoutEvent(); reader->ReadLog(*logBuffer, pEvent.get()); - PushLogToProcessor(reader, logBuffer); + PushLogToProcessor(reader, logBuffer.get()); } int32_t ModifyHandler::PushLogToProcessor(LogFileReaderPtr reader, LogBuffer* logBuffer) { diff --git a/core/file_server/event_handler/LogInput.cpp b/core/file_server/event_handler/LogInput.cpp index 634e5cf09d..4e5cbe9d9b 100644 --- a/core/file_server/event_handler/LogInput.cpp +++ b/core/file_server/event_handler/LogInput.cpp @@ -89,11 +89,14 @@ void LogInput::Start() { mInteruptFlag = false; mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); - mRegisterdHandlersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL); - mActiveReadersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL); - mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG); - - new Thread([this]() { ProcessLoop(); }); + mRegisterdHandlersTotal + = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL); + mActiveReadersTotal + = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL); + mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge( + METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG); + + mThreadRes = async(launch::async, &LogInput::ProcessLoop, this); } void LogInput::Resume() { @@ -104,15 +107,20 @@ void LogInput::Resume() { } void LogInput::HoldOn() { - LOG_INFO(sLogger, ("event handle daemon pause", "starts")); - if (BOOL_FLAG(enable_full_drain_mode) && Application::GetInstance()->IsExiting()) { + if (Application::GetInstance()->IsExiting()) { + LOG_INFO(sLogger, ("input event handle daemon", "stop starts")); unique_lock lock(mThreadRunningMux); - mStopCV.wait(lock, [this]() { return mInteruptFlag; }); + if (!mThreadRes.valid()) { + return; + } + mThreadRes.wait(); // should we set a timeout here? what it network outrage for an hour? + LOG_INFO(sLogger, ("input event handle daemon", "stopped successfully")); } else { + LOG_INFO(sLogger, ("input event handle daemon pause", "starts")); mInteruptFlag = true; mAccessMainThreadRWL.lock(); + LOG_INFO(sLogger, ("input event handle daemon pause", "succeeded")); } - LOG_INFO(sLogger, ("event handle daemon pause", "succeeded")); } void LogInput::TryReadEvents(bool forceRead) { @@ -213,8 +221,7 @@ bool LogInput::ReadLocalEvents() { } // set discard old data flag, so that history data will not be dropped. BOOL_FLAG(ilogtail_discard_old_data) = false; - LOG_INFO(sLogger, - ("load local events", GetLocalEventDataFileName())("event count", localEventJson.size())); + LOG_INFO(sLogger, ("load local events", GetLocalEventDataFileName())("event count", localEventJson.size())); for (Json::ValueIterator iter = localEventJson.begin(); iter != localEventJson.end(); ++iter) { const Json::Value& eventItem = *iter; if (!eventItem.isObject()) { @@ -365,7 +372,7 @@ void LogInput::UpdateCriticalMetric(int32_t curTime) { mEventProcessCount = 0; } -void* LogInput::ProcessLoop() { +void LogInput::ProcessLoop() { LOG_INFO(sLogger, ("event handle daemon", "started")); EventDispatcher* dispatcher = EventDispatcher::GetInstance(); dispatcher->StartTimeCount(); @@ -463,19 +470,13 @@ void* LogInput::ProcessLoop() { lastClearConfigCache = curTime; } - if (BOOL_FLAG(enable_full_drain_mode) && Application::GetInstance()->IsExiting() - && EventDispatcher::GetInstance()->IsAllFileRead()) { + if (Application::GetInstance()->IsExiting() + && (!BOOL_FLAG(enable_full_drain_mode) || EventDispatcher::GetInstance()->IsAllFileRead())) { break; } } mInteruptFlag = true; - mStopCV.notify_one(); - - if (!BOOL_FLAG(enable_full_drain_mode)) { - LOG_WARNING(sLogger, ("LogInputThread", "Exit")); - } - return NULL; } void LogInput::PushEventQueue(std::vector& eventVec) { diff --git a/core/file_server/event_handler/LogInput.h b/core/file_server/event_handler/LogInput.h index 05a6ebbc6f..5a36573b31 100644 --- a/core/file_server/event_handler/LogInput.h +++ b/core/file_server/event_handler/LogInput.h @@ -63,7 +63,7 @@ class LogInput : public LogRunnable { private: LogInput(); ~LogInput(); - void* ProcessLoop(); + void ProcessLoop(); void ProcessEvent(EventDispatcher* dispatcher, Event* ev); Event* PopEventQueue(); void UpdateCriticalMetric(int32_t curTime); @@ -86,8 +86,8 @@ class LogInput : public LogRunnable { IntGaugePtr mEnableFileIncludedByMultiConfigs; std::atomic_int mLastReadEventTime{0}; + std::future mThreadRes; mutable std::mutex mThreadRunningMux; - mutable std::condition_variable mStopCV; #ifdef APSARA_UNIT_TEST_MAIN friend class LogInputUnittest; diff --git a/core/file_server/polling/PollingDirFile.cpp b/core/file_server/polling/PollingDirFile.cpp index ceee841079..486fe918ee 100644 --- a/core/file_server/polling/PollingDirFile.cpp +++ b/core/file_server/polling/PollingDirFile.cpp @@ -130,133 +130,135 @@ void PollingDirFile::Polling() { LOG_INFO(sLogger, ("polling discovery", "started")); mHoldOnFlag = false; while (mRuningFlag) { - LOG_DEBUG(sLogger, ("start dir file polling, mCurrentRound", mCurrentRound)); - { - PTScopedLock thradLock(mPollingThreadLock); - mStatCount = 0; - mNewFileVec.clear(); - ++mCurrentRound; - - // Get a copy of config list from ConfigManager. - // PollingDirFile has to be held on at first because raw pointers are used here. - vector sortedConfigs; - vector wildcardConfigs; - auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); - for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) { - if (itr->second.first->GetWildcardPaths().empty()) - sortedConfigs.push_back(itr->second); - else - wildcardConfigs.push_back(itr->second); - } - sort(sortedConfigs.begin(), sortedConfigs.end(), FileDiscoveryOptions::CompareByPathLength); - - size_t configTotal = nameConfigMap.size(); - LogtailMonitor::GetInstance()->UpdateMetric("config_count", configTotal); - LoongCollectorMonitor::GetInstance()->SetAgentConfigTotal(configTotal); - { - ScopedSpinLock lock(mCacheLock); - size_t pollingDirCacheSize = mDirCacheMap.size(); - LogtailMonitor::GetInstance()->UpdateMetric("polling_dir_cache", pollingDirCacheSize); - mPollingDirCacheSize->Set(pollingDirCacheSize); - size_t pollingFileCacheSize = mFileCacheMap.size(); - LogtailMonitor::GetInstance()->UpdateMetric("polling_file_cache", pollingFileCacheSize); - mPollingFileCacheSize->Set(pollingFileCacheSize); - } + PollingIteration(); - // Iterate all normal configs, make sure stat count will not exceed limit. - for (auto itr = sortedConfigs.begin(); - itr != sortedConfigs.end() && mStatCount <= INT32_FLAG(polling_max_stat_count); - ++itr) { - if (!mRuningFlag || mHoldOnFlag) - break; - - const FileDiscoveryOptions* config = itr->first; - const PipelineContext* ctx = itr->second; - if (!config->IsContainerDiscoveryEnabled()) { - fsutil::PathStat baseDirStat; - if (!fsutil::PathStat::stat(config->GetBasePath(), baseDirStat)) { - LOG_DEBUG(sLogger, - ("get base dir info error: ", config->GetBasePath())(ctx->GetProjectName(), - ctx->GetLogstoreName())); - continue; - } + // Sleep for a while, by default, 5s on Linux, 1s on Windows. + for (int i = 0; i < 10 && mRuningFlag; ++i) { + usleep(INT32_FLAG(dirfile_check_interval_ms) * 100); + } + } + LOG_DEBUG(sLogger, ("dir file polling thread done", "")); +} - int32_t lastConfigStatCount = mStatCount; - if (!PollingNormalConfigPath(*itr, config->GetBasePath(), string(), baseDirStat, 0)) { - LOG_DEBUG(sLogger, - ("logPath in config not exist", config->GetBasePath())(ctx->GetProjectName(), - ctx->GetLogstoreName())); - } - CheckConfigPollingStatCount(lastConfigStatCount, *itr, false); - } else { - for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) { - const string& basePath = (*config->GetContainerInfo())[i].mRealBaseDir; - fsutil::PathStat baseDirStat; - if (!fsutil::PathStat::stat(basePath, baseDirStat)) { - LOG_DEBUG(sLogger, - ("get docker base dir info error: ", basePath)(ctx->GetProjectName(), - ctx->GetLogstoreName())); - continue; - } - int32_t lastConfigStatCount = mStatCount; - if (!PollingNormalConfigPath(*itr, basePath, string(), baseDirStat, 0)) { - LOG_DEBUG(sLogger, - ("docker logPath in config not exist", basePath)(ctx->GetProjectName(), - ctx->GetLogstoreName())); - } - CheckConfigPollingStatCount(lastConfigStatCount, *itr, true); - } - } +void PollingDirFile::PollingIteration() { + LOG_DEBUG(sLogger, ("start dir file polling, mCurrentRound", mCurrentRound)); + PTScopedLock thradLock(mPollingThreadLock); + mStatCount = 0; + mNewFileVec.clear(); + ++mCurrentRound; + + // Get a copy of config list from ConfigManager. + // PollingDirFile has to be held on at first because raw pointers are used here. + vector sortedConfigs; + vector wildcardConfigs; + auto nameConfigMap = FileServer::GetInstance()->GetAllFileDiscoveryConfigs(); + for (auto itr = nameConfigMap.begin(); itr != nameConfigMap.end(); ++itr) { + if (itr->second.first->GetWildcardPaths().empty()) + sortedConfigs.push_back(itr->second); + else + wildcardConfigs.push_back(itr->second); + } + sort(sortedConfigs.begin(), sortedConfigs.end(), FileDiscoveryOptions::CompareByPathLength); + + size_t configTotal = nameConfigMap.size(); + LogtailMonitor::GetInstance()->UpdateMetric("config_count", configTotal); + LoongCollectorMonitor::GetInstance()->SetAgentConfigTotal(configTotal); + { + ScopedSpinLock lock(mCacheLock); + size_t pollingDirCacheSize = mDirCacheMap.size(); + LogtailMonitor::GetInstance()->UpdateMetric("polling_dir_cache", pollingDirCacheSize); + mPollingDirCacheSize->Set(pollingDirCacheSize); + size_t pollingFileCacheSize = mFileCacheMap.size(); + LogtailMonitor::GetInstance()->UpdateMetric("polling_file_cache", pollingFileCacheSize); + mPollingFileCacheSize->Set(pollingFileCacheSize); + } + + // Iterate all normal configs, make sure stat count will not exceed limit. + for (auto itr = sortedConfigs.begin(); + itr != sortedConfigs.end() && mStatCount <= INT32_FLAG(polling_max_stat_count); + ++itr) { + if (!mRuningFlag || mHoldOnFlag) + break; + + const FileDiscoveryOptions* config = itr->first; + const PipelineContext* ctx = itr->second; + if (!config->IsContainerDiscoveryEnabled()) { + fsutil::PathStat baseDirStat; + if (!fsutil::PathStat::stat(config->GetBasePath(), baseDirStat)) { + LOG_DEBUG(sLogger, + ("get base dir info error: ", config->GetBasePath())(ctx->GetProjectName(), + ctx->GetLogstoreName())); + continue; } - // Iterate all wildcard configs, make sure stat count will not exceed limit. - for (auto itr = wildcardConfigs.begin(); - itr != wildcardConfigs.end() && mStatCount <= INT32_FLAG(polling_max_stat_count); - ++itr) { - if (!mRuningFlag || mHoldOnFlag) - break; - - const FileDiscoveryOptions* config = itr->first; - const PipelineContext* ctx = itr->second; - if (!config->IsContainerDiscoveryEnabled()) { - int32_t lastConfigStatCount = mStatCount; - if (!PollingWildcardConfigPath(*itr, config->GetWildcardPaths()[0], 0)) { - LOG_DEBUG(sLogger, - ("can not find matched path in config, Wildcard begin logPath", - config->GetBasePath())(ctx->GetProjectName(), ctx->GetLogstoreName())); - } - CheckConfigPollingStatCount(lastConfigStatCount, *itr, false); - } else { - for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) { - const string& baseWildcardPath = (*config->GetContainerInfo())[i].mRealBaseDir; - int32_t lastConfigStatCount = mStatCount; - if (!PollingWildcardConfigPath(*itr, baseWildcardPath, 0)) { - LOG_DEBUG(sLogger, - ("can not find matched path in config, " - "Wildcard begin logPath ", - baseWildcardPath)(ctx->GetProjectName(), ctx->GetLogstoreName())); - } - CheckConfigPollingStatCount(lastConfigStatCount, *itr, true); - } + int32_t lastConfigStatCount = mStatCount; + if (!PollingNormalConfigPath(*itr, config->GetBasePath(), string(), baseDirStat, 0)) { + LOG_DEBUG(sLogger, + ("logPath in config not exist", config->GetBasePath())(ctx->GetProjectName(), + ctx->GetLogstoreName())); + } + CheckConfigPollingStatCount(lastConfigStatCount, *itr, false); + } else { + for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) { + const string& basePath = (*config->GetContainerInfo())[i].mRealBaseDir; + fsutil::PathStat baseDirStat; + if (!fsutil::PathStat::stat(basePath, baseDirStat)) { + LOG_DEBUG( + sLogger, + ("get docker base dir info error: ", basePath)(ctx->GetProjectName(), ctx->GetLogstoreName())); + continue; } + int32_t lastConfigStatCount = mStatCount; + if (!PollingNormalConfigPath(*itr, basePath, string(), baseDirStat, 0)) { + LOG_DEBUG(sLogger, + ("docker logPath in config not exist", basePath)(ctx->GetProjectName(), + ctx->GetLogstoreName())); + } + CheckConfigPollingStatCount(lastConfigStatCount, *itr, true); } + } + } - // Add collected new files to PollingModify. - PollingModify::GetInstance()->AddNewFile(mNewFileVec); + // Iterate all wildcard configs, make sure stat count will not exceed limit. + for (auto itr = wildcardConfigs.begin(); + itr != wildcardConfigs.end() && mStatCount <= INT32_FLAG(polling_max_stat_count); + ++itr) { + if (!mRuningFlag || mHoldOnFlag) + break; - // Check cache, clear unavailable and overtime items. - if (mCurrentRound % INT32_FLAG(check_not_exist_file_dir_round) == 0) { - ClearUnavailableFileAndDir(); + const FileDiscoveryOptions* config = itr->first; + const PipelineContext* ctx = itr->second; + if (!config->IsContainerDiscoveryEnabled()) { + int32_t lastConfigStatCount = mStatCount; + if (!PollingWildcardConfigPath(*itr, config->GetWildcardPaths()[0], 0)) { + LOG_DEBUG(sLogger, + ("can not find matched path in config, Wildcard begin logPath", + config->GetBasePath())(ctx->GetProjectName(), ctx->GetLogstoreName())); + } + CheckConfigPollingStatCount(lastConfigStatCount, *itr, false); + } else { + for (size_t i = 0; i < config->GetContainerInfo()->size(); ++i) { + const string& baseWildcardPath = (*config->GetContainerInfo())[i].mRealBaseDir; + int32_t lastConfigStatCount = mStatCount; + if (!PollingWildcardConfigPath(*itr, baseWildcardPath, 0)) { + LOG_DEBUG(sLogger, + ("can not find matched path in config, " + "Wildcard begin logPath ", + baseWildcardPath)(ctx->GetProjectName(), ctx->GetLogstoreName())); + } + CheckConfigPollingStatCount(lastConfigStatCount, *itr, true); } - ClearTimeoutFileAndDir(); } + } - // Sleep for a while, by default, 5s on Linux, 1s on Windows. - for (int i = 0; i < 10 && mRuningFlag; ++i) { - usleep(INT32_FLAG(dirfile_check_interval_ms) * 100); - } + // Add collected new files to PollingModify. + PollingModify::GetInstance()->AddNewFile(mNewFileVec); + + // Check cache, clear unavailable and overtime items. + if (mCurrentRound % INT32_FLAG(check_not_exist_file_dir_round) == 0) { + ClearUnavailableFileAndDir(); } - LOG_DEBUG(sLogger, ("dir file polling thread done", "")); + ClearTimeoutFileAndDir(); } // Last Modified Time (LMD) of directory changes when a file or a subdirectory is added, @@ -371,9 +373,6 @@ bool PollingDirFile::PollingNormalConfigPath(const FileDiscoveryConfig& pConfig, int64_t nsec = 0; statBuf.GetLastWriteTime(sec, nsec); auto curTime = time(nullptr); - LOG_DEBUG(sLogger, - ("srcPath", srcPath)("obj", obj)("exceedPreservedDirDepth", exceedPreservedDirDepth)( - "curTime", curTime)("sec", sec)("INT32_FLAG(timeout_interval)", INT32_FLAG(timeout_interval))); if (curTime - sec > INT32_FLAG(timeout_interval)) { return false; } @@ -662,9 +661,6 @@ void PollingDirFile::ClearTimeoutFileAndDir() { ScopedSpinLock lock(mCacheLock); bool clearExceedPreservedDirDepth = false; for (auto iter = mDirCacheMap.begin(); iter != mDirCacheMap.end();) { - LOG_DEBUG(sLogger, - ("Dir", iter->first)("GetExceedPreservedDirDepth", iter->second.GetExceedPreservedDirDepth())( - "iter->second.GetLastModifyTime()", iter->second.GetLastModifyTime())); if (iter->second.GetExceedPreservedDirDepth() && (NANO_CONVERTING * curTime - iter->second.GetLastModifyTime()) > NANO_CONVERTING * INT32_FLAG(timeout_interval)) { @@ -678,9 +674,6 @@ void PollingDirFile::ClearTimeoutFileAndDir() { } clearExceedPreservedDirDepth = false; for (auto iter = mFileCacheMap.begin(); iter != mFileCacheMap.end();) { - LOG_DEBUG(sLogger, - ("File", iter->first)("GetExceedPreservedDirDepth", iter->second.GetExceedPreservedDirDepth())( - "iter->second.GetLastModifyTime()", iter->second.GetLastModifyTime())); if (iter->second.GetExceedPreservedDirDepth() && (NANO_CONVERTING * curTime - iter->second.GetLastModifyTime()) > NANO_CONVERTING * INT32_FLAG(timeout_interval)) { diff --git a/core/file_server/polling/PollingDirFile.h b/core/file_server/polling/PollingDirFile.h index 244b3ea28d..8406bafdb7 100644 --- a/core/file_server/polling/PollingDirFile.h +++ b/core/file_server/polling/PollingDirFile.h @@ -58,6 +58,7 @@ class PollingDirFile : public LogRunnable { ~PollingDirFile(); void Polling(); + void PollingIteration(); // PollingNormalConfigPath polls config with normal base path recursively. // @config: config to poll. diff --git a/core/file_server/polling/PollingEventQueue.h b/core/file_server/polling/PollingEventQueue.h index dbad5c611e..513c13cad9 100644 --- a/core/file_server/polling/PollingEventQueue.h +++ b/core/file_server/polling/PollingEventQueue.h @@ -46,6 +46,7 @@ class PollingEventQueue { friend class EventDispatcher; friend class EventDispatcherBase; friend class PollingUnittest; + friend class PollingPreservedDirDepthUnittest; void Clear(); Event* FindEvent(const std::string& src, const std::string& obj, int32_t eventType = -1); diff --git a/core/file_server/polling/PollingModify.cpp b/core/file_server/polling/PollingModify.cpp index 0a940cd4a3..e925793e58 100644 --- a/core/file_server/polling/PollingModify.cpp +++ b/core/file_server/polling/PollingModify.cpp @@ -245,59 +245,7 @@ void PollingModify::Polling() { LOG_INFO(sLogger, ("polling modify", "started")); mHoldOnFlag = false; while (mRuningFlag) { - { - PTScopedLock threadLock(mPollingThreadLock); - LoadFileNameInQueues(); - - vector deletedFileVec; - vector pollingEventVec; - int32_t statCount = 0; - size_t pollingModifySizeTotal = mModifyCacheMap.size(); - LogtailMonitor::GetInstance()->UpdateMetric("polling_modify_size", pollingModifySizeTotal); - mPollingModifySize->Set(pollingModifySizeTotal); - for (auto iter = mModifyCacheMap.begin(); iter != mModifyCacheMap.end(); ++iter) { - if (!mRuningFlag || mHoldOnFlag) - break; - - const SplitedFilePath& filePath = iter->first; - ModifyCheckCache& modifyCache = iter->second; - fsutil::PathStat logFileStat; - if (!fsutil::PathStat::stat(PathJoin(filePath.mFileDir, filePath.mFileName), logFileStat)) { - if (errno == ENOENT) { - LOG_DEBUG(sLogger, ("file deleted", PathJoin(filePath.mFileDir, filePath.mFileName))); - if (UpdateDeletedFile(filePath, modifyCache, pollingEventVec)) { - deletedFileVec.push_back(filePath); - } - } else { - LOG_DEBUG(sLogger, ("get file info error", PathJoin(filePath.mFileDir, filePath.mFileName))); - } - } else { - int64_t sec, nsec; - logFileStat.GetLastWriteTime(sec, nsec); - timespec mtim{sec, nsec}; - auto devInode = logFileStat.GetDevInode(); - UpdateFile(filePath, - modifyCache, - devInode.dev, - devInode.inode, - logFileStat.GetFileSize(), - mtim, - pollingEventVec); - } - - ++statCount; - if (statCount % INT32_FLAG(modify_stat_count) == 0) { - usleep(1000 * INT32_FLAG(modify_stat_sleepMs)); - } - } - - if (pollingEventVec.size() > 0) { - PollingEventQueue::GetInstance()->PushEvent(pollingEventVec); - } - for (size_t i = 0; i < deletedFileVec.size(); ++i) { - mModifyCacheMap.erase(deletedFileVec[i]); - } - } + PollingIteration(); // Sleep for a while, by default, 1s. for (int i = 0; i < 10 && mRuningFlag; ++i) { @@ -307,6 +255,55 @@ void PollingModify::Polling() { LOG_INFO(sLogger, ("PollingModify::Polling", "stop")); } +void PollingModify::PollingIteration() { + PTScopedLock threadLock(mPollingThreadLock); + LoadFileNameInQueues(); + + vector deletedFileVec; + vector pollingEventVec; + int32_t statCount = 0; + size_t pollingModifySizeTotal = mModifyCacheMap.size(); + LogtailMonitor::GetInstance()->UpdateMetric("polling_modify_size", pollingModifySizeTotal); + mPollingModifySize->Set(pollingModifySizeTotal); + for (auto iter = mModifyCacheMap.begin(); iter != mModifyCacheMap.end(); ++iter) { + if (!mRuningFlag || mHoldOnFlag) + break; + + const SplitedFilePath& filePath = iter->first; + ModifyCheckCache& modifyCache = iter->second; + fsutil::PathStat logFileStat; + if (!fsutil::PathStat::stat(PathJoin(filePath.mFileDir, filePath.mFileName), logFileStat)) { + if (errno == ENOENT) { + LOG_DEBUG(sLogger, ("file deleted", PathJoin(filePath.mFileDir, filePath.mFileName))); + if (UpdateDeletedFile(filePath, modifyCache, pollingEventVec)) { + deletedFileVec.push_back(filePath); + } + } else { + LOG_DEBUG(sLogger, ("get file info error", PathJoin(filePath.mFileDir, filePath.mFileName))); + } + } else { + int64_t sec, nsec; + logFileStat.GetLastWriteTime(sec, nsec); + timespec mtim{sec, nsec}; + auto devInode = logFileStat.GetDevInode(); + UpdateFile( + filePath, modifyCache, devInode.dev, devInode.inode, logFileStat.GetFileSize(), mtim, pollingEventVec); + } + + ++statCount; + if (statCount % INT32_FLAG(modify_stat_count) == 0) { + usleep(1000 * INT32_FLAG(modify_stat_sleepMs)); + } + } + + if (pollingEventVec.size() > 0) { + PollingEventQueue::GetInstance()->PushEvent(pollingEventVec); + } + for (size_t i = 0; i < deletedFileVec.size(); ++i) { + mModifyCacheMap.erase(deletedFileVec[i]); + } +} + #ifdef APSARA_UNIT_TEST_MAIN bool PollingModify::FindNewFile(const std::string& dir, const std::string& fileName) { PTScopedLock lock(mFileLock); diff --git a/core/file_server/polling/PollingModify.h b/core/file_server/polling/PollingModify.h index 5e3738a92a..938a0596de 100644 --- a/core/file_server/polling/PollingModify.h +++ b/core/file_server/polling/PollingModify.h @@ -62,6 +62,7 @@ class PollingModify : public LogRunnable { ~PollingModify(); void Polling(); + void PollingIteration(); // MakeSpaceForNewFile tries to release some space from modify cache // for LoadFileNameInQueues to add new files. diff --git a/core/logger/Logger.cpp b/core/logger/Logger.cpp index c754d5b37b..b1e8ed8b19 100644 --- a/core/logger/Logger.cpp +++ b/core/logger/Logger.cpp @@ -195,7 +195,7 @@ void Logger::LoadConfig(const std::string& filePath) { // Load config file, check if it is valid or not. do { std::ifstream in(filePath); - if (!in.good()) + if (!in) break; in.seekg(0, std::ios::end); @@ -405,8 +405,8 @@ void Logger::LoadDefaultConfig(std::map& loggerCfgs, loggerCfgs.insert({DEFAULT_LOGGER_NAME, LoggerConfig{"AsyncFileSink", level::warn}}); if (sinkCfgs.find("AsyncFileSink") != sinkCfgs.end()) return; - sinkCfgs.insert({"AsyncFileSink", - SinkConfig{"AsyncFile", 10, 20000000, 300, GetAgentLogDir() + GetAgentLogName(), "Gzip"}}); + sinkCfgs.insert( + {"AsyncFileSink", SinkConfig{"AsyncFile", 10, 20000000, 300, GetAgentLogDir() + GetAgentLogName(), "Gzip"}}); } void Logger::LoadAllDefaultConfigs(std::map& loggerCfgs, @@ -421,8 +421,8 @@ void Logger::LoadAllDefaultConfigs(std::map& loggerCf if (!Mkdir(dirPath)) { LogMsg(std::string("Create snapshot dir error ") + dirPath + ", error" + ErrnoToString(GetErrno())); } - sinkCfgs.insert( - {"AsyncFileSinkProfile", SinkConfig{"AsyncFile", 61, 1, 1, dirPath + PATH_SEPARATOR + GetAgentProfileLogName()}}); + sinkCfgs.insert({"AsyncFileSinkProfile", + SinkConfig{"AsyncFile", 61, 1, 1, dirPath + PATH_SEPARATOR + GetAgentProfileLogName()}}); sinkCfgs.insert( {"AsyncFileSinkStatus", SinkConfig{"AsyncFile", 61, 1, 1, dirPath + PATH_SEPARATOR + GetAgentStatusLogName()}}); } diff --git a/core/monitor/LogtailAlarm.cpp b/core/monitor/LogtailAlarm.cpp index 0abf2a3ebd..6f7ce0fca4 100644 --- a/core/monitor/LogtailAlarm.cpp +++ b/core/monitor/LogtailAlarm.cpp @@ -117,6 +117,9 @@ void LogtailAlarm::Stop() { mIsThreadRunning = false; } mStopCV.notify_one(); + if (!mThreadRes.valid()) { + return; + } future_status s = mThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { LOG_INFO(sLogger, ("alarm gathering", "stopped successfully")); @@ -141,7 +144,6 @@ bool LogtailAlarm::SendAlarmLoop() { } void LogtailAlarm::SendAllRegionAlarm() { - LogtailAlarmMessage* messagePtr = nullptr; int32_t currentTime = time(nullptr); size_t sendRegionIndex = 0; size_t sendAlarmTypeIndex = 0; @@ -186,7 +188,7 @@ void LogtailAlarm::SendAllRegionAlarm() { // LOG_DEBUG(sLogger, ("3Send Alarm", region)("region", sendRegionIndex)("alarm index", // mMessageType[sendAlarmTypeIndex])); - map& alarmMap = alarmBufferVec[sendAlarmTypeIndex]; + map>& alarmMap = alarmBufferVec[sendAlarmTypeIndex]; if (alarmMap.size() == 0 || currentTime - lastUpdateTimeVec[sendAlarmTypeIndex] < INT32_FLAG(logtail_alarm_interval)) { // go next alarm type @@ -220,9 +222,10 @@ void LogtailAlarm::SendAllRegionAlarm() { logGroup.set_source(LogFileProfiler::mIpAddr); logGroup.set_category(ALARM_SLS_LOGSTORE_NAME); auto now = GetCurrentLogtailTime(); - for (map::iterator mapIter = alarmMap.begin(); mapIter != alarmMap.end(); + for (map>::iterator mapIter = alarmMap.begin(); + mapIter != alarmMap.end(); ++mapIter) { - messagePtr = mapIter->second; + auto& messagePtr = mapIter->second; // LOG_DEBUG(sLogger, ("5Send Alarm", region)("region", sendRegionIndex)("alarm index", // sendAlarmTypeIndex)("msg", messagePtr->mMessage)); @@ -266,7 +269,6 @@ void LogtailAlarm::SendAllRegionAlarm() { contentPtr->set_key("category"); contentPtr->set_value(messagePtr->mCategory); } - delete messagePtr; } lastUpdateTimeVec[sendAlarmTypeIndex] = currentTime; alarmMap.clear(); @@ -319,9 +321,8 @@ void LogtailAlarm::SendAlarm(const LogtailAlarmType alarmType, string key = projectName + "_" + category; LogtailAlarmVector& alarmBufferVec = *MakesureLogtailAlarmMapVecUnlocked(region); if (alarmBufferVec[alarmType].find(key) == alarmBufferVec[alarmType].end()) { - LogtailAlarmMessage* messagePtr - = new LogtailAlarmMessage(mMessageType[alarmType], projectName, category, message, 1); - alarmBufferVec[alarmType].insert(pair(key, messagePtr)); + auto* messagePtr = new LogtailAlarmMessage(mMessageType[alarmType], projectName, category, message, 1); + alarmBufferVec[alarmType].emplace(key, messagePtr); } else alarmBufferVec[alarmType][key]->IncCount(); } diff --git a/core/monitor/LogtailAlarm.h b/core/monitor/LogtailAlarm.h index bd375c8f1c..deb3b65429 100644 --- a/core/monitor/LogtailAlarm.h +++ b/core/monitor/LogtailAlarm.h @@ -95,6 +95,7 @@ enum LogtailAlarmType { COMPRESS_FAIL_ALARM = 65, SERIALIZE_FAIL_ALARM = 66, RELABEL_METRIC_FAIL_ALARM = 67, + REGISTER_HANDLERS_TOO_SLOW_ALARM = 68, ALL_LOGTAIL_ALARM_NUM = 68 }; @@ -134,7 +135,7 @@ class LogtailAlarm { bool IsLowLevelAlarmValid(); private: - typedef std::vector > LogtailAlarmVector; + typedef std::vector>> LogtailAlarmVector; LogtailAlarm(); ~LogtailAlarm() = default; diff --git a/core/monitor/Monitor.cpp b/core/monitor/Monitor.cpp index 7eca928e46..bbe09534b9 100644 --- a/core/monitor/Monitor.cpp +++ b/core/monitor/Monitor.cpp @@ -23,7 +23,7 @@ #include #include "app_config/AppConfig.h" -#include "constants/Constants.h" +#include "application/Application.h" #include "common/DevInode.h" #include "common/ExceptionBase.h" #include "common/LogtailCommonFlags.h" @@ -32,6 +32,7 @@ #include "common/StringTools.h" #include "common/TimeUtil.h" #include "common/version.h" +#include "constants/Constants.h" #include "file_server/event_handler/LogInput.h" #include "go_pipeline/LogtailPlugin.h" #include "logger/Logger.h" @@ -41,7 +42,6 @@ #include "plugin/flusher/sls/FlusherSLS.h" #include "protobuf/sls/sls_logs.pb.h" #include "runner/FlusherRunner.h" -#include "application/Application.h" #include "sdk/Common.h" #ifdef __ENTERPRISE__ #include "config/provider/EnterpriseConfigProvider.h" @@ -121,6 +121,9 @@ void LogtailMonitor::Stop() { mIsThreadRunning = false; } mStopCV.notify_one(); + if (!mThreadRes.valid()) { + return; + } future_status s = mThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { LOG_INFO(sLogger, ("profiling", "stopped successfully")); @@ -348,7 +351,7 @@ bool LogtailMonitor::GetMemStat() { std::ifstream fin; fin.open(SELF_STATM_PATH); - if (!fin.good()) { + if (!fin) { LOG_ERROR(sLogger, ("open stat error", "")); return false; } @@ -380,7 +383,7 @@ bool LogtailMonitor::GetCpuStat(CpuStat& cur) { std::ifstream fin; fin.open(SELF_STAT_PATH); uint64_t start = GetCurrentTimeInMilliSeconds(); - if (!fin.good()) { + if (!fin) { LOG_ERROR(sLogger, ("open stat error", "")); return false; } @@ -528,7 +531,7 @@ std::string LogtailMonitor::GetLoadAvg() { std::ifstream fin; std::string loadStr; fin.open(PROC_LOAD_PATH); - if (!fin.good()) { + if (!fin) { LOG_ERROR(sLogger, ("open load error", "")); return loadStr; } diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 40dec54bd6..55b649b86b 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -336,8 +336,8 @@ bool Pipeline::Init(PipelineConfig&& config) { } void Pipeline::Start() { -#ifndef APSARA_UNIT_TEST_MAIN - // TODO: 应该保证指定时间内返回,如果无法返回,将配置放入startDisabled里 + // #ifndef APSARA_UNIT_TEST_MAIN + // TODO: 应该保证指定时间内返回,如果无法返回,将配置放入startDisabled里 for (const auto& flusher : mFlushers) { flusher->Start(); } @@ -357,7 +357,7 @@ void Pipeline::Start() { } mStartTime->Set(chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count()); -#endif + // #endif LOG_INFO(sLogger, ("pipeline start", "succeeded")("config", mName)); } @@ -417,7 +417,6 @@ bool Pipeline::FlushBatch() { } void Pipeline::Stop(bool isRemoving) { -#ifndef APSARA_UNIT_TEST_MAIN // TODO: 应该保证指定时间内返回,如果无法返回,将配置放入stopDisabled里 for (const auto& input : mInputs) { input->Stop(isRemoving); @@ -441,7 +440,6 @@ void Pipeline::Stop(bool isRemoving) { for (const auto& flusher : mFlushers) { flusher->Stop(isRemoving); } -#endif LOG_INFO(sLogger, ("pipeline stop", "succeeded")("config", mName)); } diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index 6be224f86a..0c0713abd0 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -67,7 +67,7 @@ void DiskBufferWriter::Stop() { mIsSendBufferThreadRunning = false; } mStopCV.notify_one(); - { + if (mBufferWriterThreadRes.valid()) { future_status s = mBufferWriterThreadRes.wait_for(chrono::seconds(5)); if (s == future_status::ready) { LOG_INFO(sLogger, ("disk buffer writer", "stopped successfully")); @@ -75,7 +75,7 @@ void DiskBufferWriter::Stop() { LOG_WARNING(sLogger, ("disk buffer writer", "forced to stopped")); } } - { + if (mBufferSenderThreadRes.valid()) { // timeout should be larger than network timeout, which is 15 for now future_status s = mBufferSenderThreadRes.wait_for(chrono::seconds(20)); if (s == future_status::ready) { diff --git a/core/plugin/flusher/sls/SLSClientManager.cpp b/core/plugin/flusher/sls/SLSClientManager.cpp index 33a0381bef..5d8a57b4d7 100644 --- a/core/plugin/flusher/sls/SLSClientManager.cpp +++ b/core/plugin/flusher/sls/SLSClientManager.cpp @@ -149,7 +149,7 @@ void SLSClientManager::Stop() { mIsUpdateRealIpThreadRunning = false; } mStopCV.notify_all(); - if (mDataServerSwitchPolicy == EndpointSwitchPolicy::DESIGNATED_FIRST) { + if (mDataServerSwitchPolicy == EndpointSwitchPolicy::DESIGNATED_FIRST && mProbeNetworkThreadRes.valid()) { future_status s = mProbeNetworkThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { LOG_INFO(sLogger, ("sls endpoint probe", "stopped successfully")); @@ -157,7 +157,7 @@ void SLSClientManager::Stop() { LOG_WARNING(sLogger, ("sls endpoint probe", "forced to stopped")); } } - if (BOOL_FLAG(send_prefer_real_ip)) { + if (BOOL_FLAG(send_prefer_real_ip) && mUpdateRealIpThreadRes.valid()) { future_status s = mUpdateRealIpThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { LOG_INFO(sLogger, ("sls real ip update", "stopped successfully")); diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index 095241c6ef..ecff88b3e8 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -94,6 +94,9 @@ void FlusherRunner::UpdateSendFlowControl() { void FlusherRunner::Stop() { mIsFlush = true; SenderQueueManager::GetInstance()->Trigger(); + if (!mThreadRes.valid()) { + return; + } future_status s = mThreadRes.wait_for(chrono::seconds(INT32_FLAG(flusher_runner_exit_timeout_secs))); if (s == future_status::ready) { LOG_INFO(sLogger, ("flusher runner", "stopped successfully")); diff --git a/core/runner/ProcessorRunner.cpp b/core/runner/ProcessorRunner.cpp index 98b604747a..791ca3fb62 100644 --- a/core/runner/ProcessorRunner.cpp +++ b/core/runner/ProcessorRunner.cpp @@ -56,6 +56,9 @@ void ProcessorRunner::Stop() { mIsFlush = true; ProcessQueueManager::GetInstance()->Trigger(); for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) { + if (!mThreadRes[threadNo].valid()) { + continue; + } future_status s = mThreadRes[threadNo].wait_for(chrono::seconds(INT32_FLAG(processor_runner_exit_timeout_secs))); if (s == future_status::ready) { diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index 2969ee5bf9..cd86570968 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -62,6 +62,9 @@ bool HttpSink::Init() { void HttpSink::Stop() { mIsFlush = true; + if (!mThreadRes.valid()) { + return; + } future_status s = mThreadRes.wait_for(chrono::seconds(INT32_FLAG(http_sink_exit_timeout_secs))); if (s == future_status::ready) { LOG_INFO(sLogger, ("http sink", "stopped successfully")); diff --git a/core/unittest/CMakeLists.txt b/core/unittest/CMakeLists.txt index 6b41223ab5..6d0e9fd913 100644 --- a/core/unittest/CMakeLists.txt +++ b/core/unittest/CMakeLists.txt @@ -15,6 +15,9 @@ cmake_minimum_required(VERSION 3.22) project(unittest_base) +# Unittest should be able to visit private members +set(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} -fno-access-control") + add_definitions(-DAPSARA_UNIT_TEST_MAIN) set(NO_TCMALLOC TRUE) add_definitions(-DLOGTAIL_NO_TC_MALLOC) diff --git a/core/unittest/common/FileSystemUtilUnittest.h b/core/unittest/common/FileSystemUtilUnittest.h index b1427a4409..c507d8a647 100644 --- a/core/unittest/common/FileSystemUtilUnittest.h +++ b/core/unittest/common/FileSystemUtilUnittest.h @@ -14,13 +14,14 @@ * limitations under the License. */ -#include "unittest/Unittest.h" -#include -#include #include +#include +#include + #include "common/FileSystemUtil.h" -#include "common/RuntimeUtil.h" #include "common/LogtailCommonFlags.h" +#include "common/RuntimeUtil.h" +#include "unittest/Unittest.h" namespace logtail { @@ -129,7 +130,9 @@ TEST_F(FileSystemUtilUnittest, TestDirNormal) { #ifndef _MSC_VER TEST_F(FileSystemUtilUnittest, TestDirSymbolic) { - { std::ofstream((mTestRoot / "f1").string()); } + { + std::ofstream((mTestRoot / "f1").string()); + } bfs::create_directory(mTestRoot / "d1"); std::map symbolics = {{"s1", "f1"}, {"s2", "d1"}}; for (auto& s : symbolics) { @@ -181,7 +184,9 @@ TEST_F(FileSystemUtilUnittest, TestPathStat_stat) { { auto filePath = ((mTestRoot / "file").string()); - { std::ofstream(filePath).write("xxx", 3); } + { + std::ofstream(filePath).write("xxx", 3); + } fsutil::PathStat stat; EXPECT_TRUE(fsutil::PathStat::stat(filePath, stat)); DevInode devInode = stat.GetDevInode(); @@ -221,7 +226,9 @@ TEST_F(FileSystemUtilUnittest, TestPathStat_stat) { TEST_F(FileSystemUtilUnittest, TestPathStat_fstat) { auto currentTime = time(NULL); auto filePath = ((mTestRoot / "file").string()); - { std::ofstream(filePath).write("xxx", 3); } + { + std::ofstream(filePath).write("xxx", 3); + } FILE* file = fopen(filePath.c_str(), "r"); EXPECT_TRUE(file != NULL); @@ -262,7 +269,9 @@ TEST_F(FileSystemUtilUnittest, TestPathStat_fstat) { TEST_F(FileSystemUtilUnittest, TestPathStat_GetLastWriteTime) { auto filePath = ((mTestRoot / "file").string()); - { std::ofstream(filePath).write("xxx", 3); } + { + std::ofstream(filePath).write("xxx", 3); + } { int64_t sec = -1, nsec = -1; @@ -286,7 +295,9 @@ TEST_F(FileSystemUtilUnittest, TestPathStat_GetLastWriteTime) { TEST_F(FileSystemUtilUnittest, TestFileReadOnlyOpen) { auto filePath = ((mTestRoot / "file").string()); const std::string fileContent{"xxx"}; - { std::ofstream(filePath) << fileContent; } + { + std::ofstream(filePath) << fileContent; + } // Open the file and delete it before closing. // File can still be read after deleting. @@ -319,7 +330,7 @@ TEST_F(FileSystemUtilUnittest, TestFileWriteOnlyOpen) { fflush(file); // Open with C++ fstream. std::ifstream in(filePath); - EXPECT_TRUE(in.good()); + EXPECT_TRUE(in); { EXPECT_TRUE(in.read(buffer.data(), fileContentLen)); EXPECT_EQ(std::string(buffer.data(), buffer.size()), fileContent); @@ -353,7 +364,9 @@ TEST_F(FileSystemUtilUnittest, TestFileWriteOnlyOpen) { // Case #2: File is existing, open will truncate it. { - { std::ofstream(filePath) << fileContent; } + { + std::ofstream(filePath) << fileContent; + } auto file = FileWriteOnlyOpen(filePath.c_str(), "w"); ASSERT_TRUE(file != NULL); @@ -405,7 +418,9 @@ TEST_F(FileSystemUtilUnittest, TestFileAppendOpen) { // Case #3: Open existing file, check its cursor position. { - { std::ofstream(filePath) << fileContent; } + { + std::ofstream(filePath) << fileContent; + } auto file = FileAppendOpen(filePath.c_str(), "a"); EXPECT_EQ(ftell(file), fileContentLen); diff --git a/core/unittest/config/ConfigUpdateUnittest.cpp b/core/unittest/config/ConfigUpdateUnittest.cpp index 45b6fc4c7f..f041e04a9f 100644 --- a/core/unittest/config/ConfigUpdateUnittest.cpp +++ b/core/unittest/config/ConfigUpdateUnittest.cpp @@ -33,6 +33,12 @@ class PipelineMock : public Pipeline { public: bool Init(PipelineConfig&& config) { mConfig = std::move(config.mDetail); + WriteMetrics::GetInstance()->PrepareMetricsRecordRef( + mMetricsRecordRef, + {{METRIC_LABEL_KEY_PROJECT, mContext.GetProjectName()}, + {METRIC_LABEL_KEY_PIPELINE_NAME, mName}, + {METRIC_LABEL_KEY_METRIC_CATEGORY, METRIC_LABEL_KEY_METRIC_CATEGORY_PIPELINE}}); + mStartTime = mMetricsRecordRef.CreateIntGauge(METRIC_PIPELINE_START_TIME); return (*mConfig)["valid"].asBool(); } }; diff --git a/core/unittest/event/BlockedEventManagerUnittest.cpp b/core/unittest/event/BlockedEventManagerUnittest.cpp index ec5489e2d7..f30646f905 100644 --- a/core/unittest/event/BlockedEventManagerUnittest.cpp +++ b/core/unittest/event/BlockedEventManagerUnittest.cpp @@ -40,6 +40,9 @@ void BlockedEventManagerUnittest::OnFeedback() const { APSARA_TEST_EQUAL("dir", res[0]->GetSource()); APSARA_TEST_EQUAL("file", res[0]->GetObject()); APSARA_TEST_EQUAL(1U, BlockedEventManager::GetInstance()->mEventMap.size()); + for (auto* e : res) { + delete e; + } } UNIT_TEST_CASE(BlockedEventManagerUnittest, OnFeedback) diff --git a/core/unittest/metadata/K8sMetadataUnittest.cpp b/core/unittest/metadata/K8sMetadataUnittest.cpp index ba0fe76607..46b077e186 100644 --- a/core/unittest/metadata/K8sMetadataUnittest.cpp +++ b/core/unittest/metadata/K8sMetadataUnittest.cpp @@ -170,7 +170,7 @@ class k8sMetadataUnittest : public ::testing::Test { })"; eventGroup.FromJsonString(eventStr); eventGroup.AddMetricEvent(); - LabelingK8sMetadata& processor = *(new LabelingK8sMetadata); + LabelingK8sMetadata processor; processor.AddLabelToLogGroup(eventGroup); EventsContainer& eventsEnd = eventGroup.MutableEvents(); auto& metricEvent = eventsEnd[0].Cast(); diff --git a/core/unittest/models/PipelineEventPtrUnittest.cpp b/core/unittest/models/PipelineEventPtrUnittest.cpp index 9748e78191..2844759e11 100644 --- a/core/unittest/models/PipelineEventPtrUnittest.cpp +++ b/core/unittest/models/PipelineEventPtrUnittest.cpp @@ -69,9 +69,10 @@ void PipelineEventPtrUnittest::TestCast() { void PipelineEventPtrUnittest::TestRelease() { auto logUPtr = mEventGroup->CreateLogEvent(); - auto addr = logUPtr.get(); + auto* addr = logUPtr.get(); PipelineEventPtr logEventPtr(std::move(logUPtr), false, nullptr); APSARA_TEST_EQUAL_FATAL(addr, logEventPtr.Release()); + delete addr; } void PipelineEventPtrUnittest::TestCopy() { diff --git a/core/unittest/plugin/FlusherUnittest.cpp b/core/unittest/plugin/FlusherUnittest.cpp index 199ebfe306..a3deca71a1 100644 --- a/core/unittest/plugin/FlusherUnittest.cpp +++ b/core/unittest/plugin/FlusherUnittest.cpp @@ -39,12 +39,12 @@ void FlusherUnittest::TestStop() const { auto ctx = PipelineContext(); ctx.SetConfigName("test_config"); - FlusherMock* mock = new FlusherMock(); + auto mock = make_unique(); mock->SetContext(ctx); Json::Value tmp; mock->Init(Json::Value(), tmp); - auto q = SenderQueueManager::GetInstance()->GetQueue(mock->GetQueueKey()); + auto* q = SenderQueueManager::GetInstance()->GetQueue(mock->GetQueueKey()); // push items to queue for (size_t i = 0; i < q->mCapacity; ++i) { auto item = make_unique("content", 0, nullptr, mock->GetQueueKey()); diff --git a/core/unittest/polling/CMakeLists.txt b/core/unittest/polling/CMakeLists.txt index 82b1b54a17..23843d1891 100644 --- a/core/unittest/polling/CMakeLists.txt +++ b/core/unittest/polling/CMakeLists.txt @@ -16,4 +16,10 @@ cmake_minimum_required(VERSION 3.22) project(polling_unittest) # add_executable(polling_unittest PollingUnittest.cpp) -# target_link_libraries(polling_unittest ${UT_BASE_TARGET}) \ No newline at end of file +# target_link_libraries(polling_unittest ${UT_BASE_TARGET}) + +add_executable(polling_preserved_dir_depth_unittest PollingPreservedDirDepthUnittest.cpp) +target_link_libraries(polling_preserved_dir_depth_unittest ${UT_BASE_TARGET}) + +include(GoogleTest) +gtest_discover_tests(polling_preserved_dir_depth_unittest) \ No newline at end of file diff --git a/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp b/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp new file mode 100644 index 0000000000..6c66ff874e --- /dev/null +++ b/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp @@ -0,0 +1,267 @@ + +#include + +#include // Include the header for sleep_for +#include // Include the header for this_thread + +#include "application/Application.h" +#include "common/Flags.h" +#include "common/JsonUtil.h" +#include "file_server/EventDispatcher.h" +#include "file_server/event_handler/LogInput.h" +#include "file_server/polling/PollingDirFile.h" +#include "file_server/polling/PollingEventQueue.h" +#include "file_server/polling/PollingModify.h" +#include "pipeline/PipelineManager.h" +#include "pipeline/plugin/PluginRegistry.h" +#include "runner/FlusherRunner.h" +#include "runner/ProcessorRunner.h" +#include "unittest/Unittest.h" + +using namespace std; + +DECLARE_FLAG_INT32(default_max_inotify_watch_num); +DECLARE_FLAG_BOOL(enable_polling_discovery); +DECLARE_FLAG_INT32(check_timeout_interval); +DECLARE_FLAG_INT32(log_input_thread_wait_interval); +DECLARE_FLAG_INT32(check_not_exist_file_dir_round); +DECLARE_FLAG_INT32(polling_check_timeout_interval); + +namespace logtail { + +struct TestVector { + string mConfigInputDir; + string mTestDir0; + string mTestDir1; + int mPreservedDirDepth; + bool mLetTimeoutBefore2ndWriteTestFile0; + // expected results + bool mCollectTestFile2ndWrite; + bool mCollectTestFile2; +}; + +// clang-format off +/* +| No. | PreservedDirDepth | Path | 第一次文件和目录变化 | 预期采集结果 | 第二次变化时间 | 第二次文件和目录变化 | 第三次变化时间 | 第三次文件和目录变化 | 预期采集结果 /var/log/0/0.log | 预期采集结果 /var/log/1/0.log | +| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | +| 0 | 0 | /var/\*\/log | /var/log/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | \>timeout | /var/log/1/0.log | 不采集 | 采集 | +| 1 | 0 | /var/log | /var/log/app/0/0.log | 采集 | timeout | /var/log/app/1/0.log | 采集 | 不采集 | +| 2 | 0 | /var/\*\/log | /var/app/log/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | \>timeout | /var/app/log/1/0.log | 不采集 | 采集 | +| 3 | 1 | /var/\*\/log | /var/log/app/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | \>timeout | /var/log/app/1/0.log | 不采集 | 采集 | +| 4 | 0 | /var/log | /var/log/0/0.log | 采集 | timeout | /var/log/1/0.log | 采集 | 采集 | +| 5 | 0 | /var/\*\/log | /var/log/app/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | \>timeout | /var/log/app/1/0.log | 不采集 | 不采集 | +| 6 | 0 | /var/log | /var/app/log/0/0.log | 采集 | timeout | /var/app/log/1/0.log | 采集 | 采集 | +| 7 | 1 | /var/\*\/log | /var/log/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | \>timeout | /var/log/1/0.log | 采集 | 采集 | +| 8 | 1 | /var/\*\/log | /var/app/log/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | \>timeout | /var/app/log/1/0.log | 采集 | 采集 | +*/ +// clang-format on + +class PollingPreservedDirDepthUnittest : public ::testing::Test { + static std::string gRootDir; + static vector gTestMatrix; + +public: + static void SetUpTestCase() { + gRootDir = GetProcessExecutionDir() + "var" + PATH_SEPARATOR; + gTestMatrix = {{"*/log", "log/0", "log/1", 0, true, false, true}, + {"log", "log/app/0", "log/app/1", 0, false, true, false}, + {"*/log", "app/log/0", "app/log/1", 0, true, false, true}, + {"*/log", "log/app/0", "log/app/1", 1, true, false, true}, + {"log", "log/0", "log/1", 0, false, true, true}, + {"*/log", "log/app/0", "log/app/1", 0, true, false, false}, + {"log", "app/log/0", "app/log/1", 0, false, true, true}, + {"*/log", "log/0", "log/1", 1, true, true, true}, + {"*/log", "app/log/0", "app/log/1", 1, true, true, true}}; + + sLogger->set_level(spdlog::level::trace); + srand(time(nullptr)); + INT32_FLAG(default_max_inotify_watch_num) = 0; + BOOL_FLAG(enable_polling_discovery) = false; // we will call poll manually + INT32_FLAG(timeout_interval) = 1; + INT32_FLAG(check_timeout_interval) = 0; + INT32_FLAG(check_not_exist_file_dir_round) = 1; + INT32_FLAG(polling_check_timeout_interval) = 0; + LogFileProfiler::GetInstance(); + LoongCollectorMonitor::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); // reference: Application::Start + PluginRegistry::GetInstance()->LoadPlugins(); + ProcessorRunner::GetInstance()->Init(); + PipelineManager::GetInstance(); + FileServer::GetInstance()->Start(); + PollingDirFile::GetInstance()->Start(); + PollingModify::GetInstance()->Start(); + PollingModify::GetInstance()->Stop(); + PollingDirFile::GetInstance()->Stop(); + PollingDirFile::GetInstance()->mRuningFlag = true; + PollingModify::GetInstance()->mRuningFlag = true; + } + + static void TearDownTestCase() { + PollingDirFile::GetInstance()->mRuningFlag = false; + PollingModify::GetInstance()->mRuningFlag = false; + Application::GetInstance()->Exit(); + } + + void SetUp() override { + if (bfs::exists(gRootDir)) { + bfs::remove_all(gRootDir); + } + bfs::create_directories(gRootDir); + } + + void TearDown() override { + FileServer::GetInstance()->Pause(); + for (auto& p : PipelineManager::GetInstance()->mPipelineNameEntityMap) { + p.second->Stop(true); + } + PipelineManager::GetInstance()->mPipelineNameEntityMap.clear(); + // EventDispatcher::GetInstance()->CleanEnviroments(); + // ConfigManager::GetInstance()->CleanEnviroments(); + PollingDirFile::GetInstance()->ClearCache(); + PollingModify::GetInstance()->ClearCache(); + // PollingEventQueue::GetInstance()->Clear(); + bfs::remove_all(gRootDir); + FileServer::GetInstance()->Resume(); + } + + unique_ptr createPipelineConfig(const string& filePath, int preservedDirDepth) { + const char* confCstr = R"({ + "inputs": [ + { + "Type": "input_file", + "FilePaths": ["/var/log/**/0.log"], + "MaxDirSearchDepth": 2, + "PreservedDirDepth": -1 + } + ], + "flushers": [ + { + "Type": "flusher_blackhole" + } + ] + })"; + unique_ptr conf(new Json::Value(Json::objectValue)); + string errorMsg; + ParseJsonTable(confCstr, *conf, errorMsg); + auto& input = (*conf)["inputs"][0]; + input["FilePaths"][0] = filePath; + input["PreservedDirDepth"] = preservedDirDepth; + return conf; + } + + void generateLog(const string& testFile) { + auto pos = testFile.rfind(PATH_SEPARATOR); + auto dir = testFile.substr(0, pos); + bfs::create_directories(dir); + ofstream ofs(testFile, std::ios::app); + ofs << "0\n"; + } + + bool isFileDirRegistered(const string& testFile) { + auto pos = testFile.rfind(PATH_SEPARATOR); + auto dir = testFile.substr(0, pos); + auto registerStatus = EventDispatcher::GetInstance()->IsDirRegistered(dir); + return registerStatus == DirRegisterStatus::PATH_INODE_REGISTERED; + } + + void testPollingDirFile(const TestVector& testVector) { + auto configInputFilePath + = gRootDir + gTestMatrix[0].mConfigInputDir + PATH_SEPARATOR + "**" + PATH_SEPARATOR + "0.log"; + auto testFile1 = gRootDir + gTestMatrix[0].mTestDir0 + PATH_SEPARATOR + "0.log"; + auto testFile2 = gRootDir + gTestMatrix[0].mTestDir1 + PATH_SEPARATOR + "0.log"; + FileServer::GetInstance()->Pause(); + auto configJson = createPipelineConfig(configInputFilePath, testVector.mPreservedDirDepth); + PipelineConfig pipelineConfig("polling", std::move(configJson)); + APSARA_TEST_TRUE_FATAL(pipelineConfig.Parse()); + auto p = PipelineManager::GetInstance()->BuildPipeline( + std::move(pipelineConfig)); // reference: PipelineManager::UpdatePipelines + APSARA_TEST_FALSE_FATAL(p.get() == nullptr); + PipelineManager::GetInstance()->mPipelineNameEntityMap[pipelineConfig.mName] = p; + p->Start(); + FileServer::GetInstance()->Resume(); + + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + std::this_thread::sleep_for(std::chrono::microseconds( + 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event + + // generate log for testFile1 for the 1st time + generateLog(testFile1); + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + std::this_thread::sleep_for(std::chrono::microseconds( + 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile1)); + + if (testVector.mLetTimeoutBefore2ndWriteTestFile0) { + std::this_thread::sleep_for(std::chrono::seconds( + 2 + * INT32_FLAG( + timeout_interval))); // let timeout happen, must *2 since timeout happen only if time interval > 1s + } + + // generate log for testFile1 for the 2nd time + generateLog(testFile1); + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + std::this_thread::sleep_for(std::chrono::microseconds( + 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event + if (testVector.mCollectTestFile2ndWrite) { + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile1)); + } else { + APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile1)); + } + + std::this_thread::sleep_for(std::chrono::seconds( + 2 + * INT32_FLAG( + timeout_interval))); // let timeout happen, must *2 since timeout happen only if time interval > 1s + + generateLog(testFile1); + generateLog(testFile2); + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + std::this_thread::sleep_for(std::chrono::microseconds( + 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event + if (testVector.mCollectTestFile2ndWrite) { + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile1)); + } else { + APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile1)); + } + if (testVector.mCollectTestFile2) { + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile2)); + } else { + APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile2)); + } + } + + void TestPollingDirFile0() { testPollingDirFile(gTestMatrix[0]); } + void TestPollingDirFile1() { testPollingDirFile(gTestMatrix[1]); } + void TestPollingDirFile2() { testPollingDirFile(gTestMatrix[2]); } + void TestPollingDirFile3() { testPollingDirFile(gTestMatrix[3]); } + void TestPollingDirFile4() { testPollingDirFile(gTestMatrix[4]); } + void TestPollingDirFile5() { testPollingDirFile(gTestMatrix[5]); } + void TestPollingDirFile6() { testPollingDirFile(gTestMatrix[6]); } + void TestPollingDirFile7() { testPollingDirFile(gTestMatrix[7]); } + void TestPollingDirFile8() { testPollingDirFile(gTestMatrix[8]); } +}; + +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile0); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile1); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile2); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile3); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile4); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile5); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile6); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile7); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile8); + +std::string PollingPreservedDirDepthUnittest::gRootDir; +vector PollingPreservedDirDepthUnittest::gTestMatrix; +} // namespace logtail + +int main(int argc, char** argv) { + logtail::Logger::Instance().InitGlobalLoggers(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} \ No newline at end of file diff --git a/core/unittest/polling/PollingUnittest.cpp b/core/unittest/polling/PollingUnittest.cpp index 3efdc1700b..99f9fa0460 100644 --- a/core/unittest/polling/PollingUnittest.cpp +++ b/core/unittest/polling/PollingUnittest.cpp @@ -4,9 +4,9 @@ // you may not use this file except in compliance with the License. // You may obtain a copy of the License at // -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software +// ht +// Unless required by applicable law or agr.h" +#include "unittest/Unittesteed to in writing, software // distributed under the License is distributed on an "AS IS" BASIS, // WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. // See the License for the specific language governing permissions and diff --git a/core/unittest/reader/ForceReadUnittest.cpp b/core/unittest/reader/ForceReadUnittest.cpp index 411d85a33f..f08db3aaff 100644 --- a/core/unittest/reader/ForceReadUnittest.cpp +++ b/core/unittest/reader/ForceReadUnittest.cpp @@ -150,7 +150,7 @@ void ForceReadUnittest::TestTimeoutForceRead() { reader.InitReader(true, LogFileReader::BACKWARD_TO_BEGINNING); reader.CheckFileSignatureAndOffset(true); - ModifyHandler* pHanlder = new ModifyHandler(mConfigName, mConfig); + auto pHanlder = make_unique(mConfigName, mConfig); pHanlder->mReadFileTimeSlice = 0; // force one read for one event Event e1 = Event(reader.mHostLogPathDir, @@ -191,7 +191,7 @@ void ForceReadUnittest::TestTimeoutForceRead() { std::string expectedPart1(expectedContent.get()); expectedPart1.resize(expectedPart1.find("\n")); LogFileReader::BUFFER_SIZE = expectedPart1.size() + 1; - ModifyHandler* pHanlder = new ModifyHandler(mConfigName, mConfig); + auto pHanlder = make_unique(mConfigName, mConfig); pHanlder->mReadFileTimeSlice = 0; // force one read for one event Event e1 = Event(reader.mHostLogPathDir, @@ -233,7 +233,7 @@ void ForceReadUnittest::TestTimeoutForceRead() { std::string expectedPart1(expectedContent.get()); expectedPart1.resize(expectedPart1.find("\n")); LogFileReader::BUFFER_SIZE = expectedPart1.size() + 1; - ModifyHandler* pHanlder = new ModifyHandler(mConfigName, mConfig); + auto pHanlder = make_unique(mConfigName, mConfig); pHanlder->mReadFileTimeSlice = 0; // force one read for one event Event e1 = Event(reader.mHostLogPathDir, @@ -305,7 +305,7 @@ void ForceReadUnittest::TestFileCloseForceRead() { reader.CheckFileSignatureAndOffset(true); LogFileReader::BUFFER_SIZE = 1024 * 512; - ModifyHandler* pHanlder = new ModifyHandler(mConfigName, mConfig); + auto pHanlder = make_unique(mConfigName, mConfig); pHanlder->mReadFileTimeSlice = 0; // force one read for one event Event e1 = Event(reader.mHostLogPathDir, @@ -351,7 +351,7 @@ void ForceReadUnittest::TestAddTimeoutEvent() { BlockedEventManager::GetInstance()->mEventMap.clear(); APSARA_TEST_EQUAL_FATAL(BlockedEventManager::GetInstance()->mEventMap.size(), 0U); - ModifyHandler* pHanlder = new ModifyHandler(mConfigName, mConfig); + auto pHanlder = make_unique(mConfigName, mConfig); pHanlder->mReadFileTimeSlice = 0; // force one read for one event Event e1 = Event(reader.mHostLogPathDir, @@ -376,7 +376,7 @@ void ForceReadUnittest::TestAddTimeoutEvent() { BlockedEventManager::GetInstance()->mEventMap.clear(); APSARA_TEST_EQUAL_FATAL(BlockedEventManager::GetInstance()->mEventMap.size(), 0U); - ModifyHandler* pHanlder = new ModifyHandler(mConfigName, mConfig); + auto pHanlder = make_unique(mConfigName, mConfig); pHanlder->mReadFileTimeSlice = 0; // force one read for one event Event e1 = Event(reader.mHostLogPathDir, From bc4ba21383ce01d60b9399c33c5f1da142aa4aa6 Mon Sep 17 00:00:00 2001 From: Tao Yu Date: Thu, 14 Nov 2024 10:36:11 +0000 Subject: [PATCH 3/7] fix compile --- core/file_server/FileServer.cpp | 2 +- core/unittest/config/ConfigUpdateUnittest.cpp | 5 ++--- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/core/file_server/FileServer.cpp b/core/file_server/FileServer.cpp index 03b6edcefd..fc04b83a38 100644 --- a/core/file_server/FileServer.cpp +++ b/core/file_server/FileServer.cpp @@ -47,7 +47,7 @@ void FileServer::Start() { ConfigManager::GetInstance()->RegisterHandlers(); auto costMs = GetCurrentTimeInMilliSeconds() - start; if (costMs >= 60 * 1000) { - LogtailAlarm::GetInstance()->SendAlarm(REGISTER_HANDLERS_TOO_SLOW_ALARM, + AlarmManager::GetInstance()->SendAlarm(REGISTER_HANDLERS_TOO_SLOW_ALARM, "Registering handlers took " + ToString(costMs) + " ms"); LOG_WARNING(sLogger, ("watch dirs", "succeeded")("costMs", costMs)); } else { diff --git a/core/unittest/config/ConfigUpdateUnittest.cpp b/core/unittest/config/ConfigUpdateUnittest.cpp index f041e04a9f..796ae72212 100644 --- a/core/unittest/config/ConfigUpdateUnittest.cpp +++ b/core/unittest/config/ConfigUpdateUnittest.cpp @@ -35,9 +35,8 @@ class PipelineMock : public Pipeline { mConfig = std::move(config.mDetail); WriteMetrics::GetInstance()->PrepareMetricsRecordRef( mMetricsRecordRef, - {{METRIC_LABEL_KEY_PROJECT, mContext.GetProjectName()}, - {METRIC_LABEL_KEY_PIPELINE_NAME, mName}, - {METRIC_LABEL_KEY_METRIC_CATEGORY, METRIC_LABEL_KEY_METRIC_CATEGORY_PIPELINE}}); + MetricCategory::METRIC_CATEGORY_PIPELINE, + {{METRIC_LABEL_KEY_PROJECT, mContext.GetProjectName()}, {METRIC_LABEL_KEY_PIPELINE_NAME, mName}}); mStartTime = mMetricsRecordRef.CreateIntGauge(METRIC_PIPELINE_START_TIME); return (*mConfig)["valid"].asBool(); } From fcdfc679cdf920f9b0abe13e4da79e6179840806 Mon Sep 17 00:00:00 2001 From: Tao Yu Date: Sat, 16 Nov 2024 08:02:09 +0000 Subject: [PATCH 4/7] fix register dir --- .gitignore | 1 + core/CMakeLists.txt | 1 + core/checkpoint/CheckPointManager.cpp | 14 +- core/checkpoint/CheckPointManager.h | 2 +- core/common/links.cmake | 2 +- core/dependencies.cmake | 5 +- core/file_server/ConfigManager.cpp | 35 ++--- core/file_server/EventDispatcher.cpp | 91 ++++++------ core/file_server/EventDispatcher.h | 13 +- .../event_handler/EventHandler.cpp | 11 +- core/file_server/event_handler/LogInput.cpp | 2 +- .../event_listener/EventListener_Linux.cpp | 1 - core/file_server/polling/PollingDirFile.cpp | 3 +- .../PollingPreservedDirDepthUnittest.cpp | 139 +++++++++++++----- 14 files changed, 204 insertions(+), 116 deletions(-) diff --git a/.gitignore b/.gitignore index 06bddcc1e9..4120c0008a 100644 --- a/.gitignore +++ b/.gitignore @@ -57,6 +57,7 @@ _deps core/build/ core/protobuf/config_server/*/*.pb.* core/protobuf/*/*.pb.* +core/log_pb/*.pb.* core/common/Version.cpp !/Makefile # Enterprise diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index eb048c4d1f..04800250c2 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -33,6 +33,7 @@ cmake_dependent_option(ENABLE_STATIC_LINK_CRT "Build Logtail by linking CRT stat option(WITHOUTGDB "Build Logtail without gdb") option(WITHSPL "Build Logtail and UT with SPL" ON) option(BUILD_LOGTAIL_UT "Build unit test for Logtail") +cmake_dependent_option(ENABLE_ADDRESS_SANITIZER "Enable address sanitizer" OFF "CMAKE_BUILD_TYPE MATCHES Debug;NOT ANDROID" ON) set(PROVIDER_PATH "provider" CACHE PATH "Path to the provider module") # external provider path can be set with -DPROVIDER_PATH set(UNITTEST_PATH "unittest" CACHE PATH "Path to the unittest module") # external unittest path can be set with -DUNITTEST_PATH diff --git a/core/checkpoint/CheckPointManager.cpp b/core/checkpoint/CheckPointManager.cpp index c0a8ef9d67..8f25b23b75 100644 --- a/core/checkpoint/CheckPointManager.cpp +++ b/core/checkpoint/CheckPointManager.cpp @@ -71,10 +71,18 @@ bool CheckPointManager::GetCheckPoint(DevInode devInode, const std::string& conf return false; } -void CheckPointManager::DeleteDirCheckPoint(const std::string& filename) { - std::unordered_map::iterator it = mDirNameMap.find(filename); - if (it != mDirNameMap.end()) +void CheckPointManager::DeleteDirCheckPoint(const std::string& dirname) { + std::unordered_map::iterator it = mDirNameMap.find(dirname); + if (it != mDirNameMap.end()) { mDirNameMap.erase(it); + } + auto parentpos = dirname.find_last_of(PATH_SEPARATOR); + if (parentpos != std::string::npos) { + auto parentDirCheckpoint = mDirNameMap.find(dirname.substr(0, parentpos)); + if (parentDirCheckpoint != mDirNameMap.end()) { + parentDirCheckpoint->second->mSubDir.erase(dirname); + } + } } bool CheckPointManager::GetDirCheckPoint(const std::string& dirname, DirCheckPointPtr& dirCheckPointPtr) { diff --git a/core/checkpoint/CheckPointManager.h b/core/checkpoint/CheckPointManager.h index 0d652b90d9..3e5a63c3cb 100644 --- a/core/checkpoint/CheckPointManager.h +++ b/core/checkpoint/CheckPointManager.h @@ -126,7 +126,7 @@ class CheckPointManager { void AddCheckPoint(CheckPoint* checkPointPtr); void AddDirCheckPoint(const std::string& dirname); void DeleteCheckPoint(DevInode devInode, const std::string& configName); - void DeleteDirCheckPoint(const std::string& filename); + void DeleteDirCheckPoint(const std::string& dirname); void LoadCheckPoint(); void LoadDirCheckPoint(const Json::Value& root); void LoadFileCheckPoint(const Json::Value& root); diff --git a/core/common/links.cmake b/core/common/links.cmake index d8ca1feb6b..2790181f58 100644 --- a/core/common/links.cmake +++ b/core/common/links.cmake @@ -23,7 +23,7 @@ macro(common_link target_name) link_zlib(${target_name}) link_zstd(${target_name}) link_unwind(${target_name}) - if (NOT ANDROID) + if (ENABLE_ADDRESS_SANITIZER) link_asan(${target_name}) endif() if (UNIX) diff --git a/core/dependencies.cmake b/core/dependencies.cmake index cc27bea370..5b69e49d82 100644 --- a/core/dependencies.cmake +++ b/core/dependencies.cmake @@ -392,9 +392,12 @@ endmacro() # asan for debug macro(link_asan target_name) - if(CMAKE_BUILD_TYPE MATCHES Debug) + if (UNIX) target_compile_options(${target_name} PUBLIC -fsanitize=address) target_link_options(${target_name} PUBLIC -fsanitize=address -static-libasan) + elseif(MSVC) + target_compile_options(${target_name} PUBLIC /fsanitize=address) + target_link_options(${target_name} PUBLIC /fsanitize=address) endif() endmacro() diff --git a/core/file_server/ConfigManager.cpp b/core/file_server/ConfigManager.cpp index 599efd5961..3e3d67c404 100644 --- a/core/file_server/ConfigManager.cpp +++ b/core/file_server/ConfigManager.cpp @@ -150,7 +150,7 @@ bool ConfigManager::RegisterHandlersRecursively(const std::string& path, return result; if (!config.first->IsDirectoryInBlacklist(path)) - result = EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler); + result = EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler); if (!result) return result; @@ -462,7 +462,7 @@ bool ConfigManager::RegisterDirectory(const std::string& source, const std::stri // Match(subdir, *.log) = false. FileDiscoveryConfig config = FindBestMatch(source, object); if (config.first && !config.first->IsDirectoryInBlacklist(source)) { - return EventDispatcher::GetInstance()->RegisterEventHandler(source.c_str(), config, mSharedHandler); + return EventDispatcher::GetInstance()->RegisterEventHandler(source, config, mSharedHandler); } return false; } @@ -478,16 +478,7 @@ bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, LOG_INFO(sLogger, ("ignore path matching host path blacklist", path)); return false; } - if (preservedDirDepth <= 0) { - DirCheckPointPtr dirCheckPoint; - if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint)) { - // path had dircheckpoint means it was watched before, so it is valid - const set& subdir = dirCheckPoint.get()->mSubDir; - for (const auto& it : subdir) { - RegisterHandlersWithinDepth(it, config, preservedDirDepth - 1, maxDepth - 1); - } - return true; - } + if (preservedDirDepth < 0) { fsutil::PathStat statBuf; if (!fsutil::PathStat::stat(path, statBuf)) { return true; @@ -513,14 +504,25 @@ bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err)); return false; } - if (!(EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler))) { + if (!(EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler))) { // break;// fail early, do not try to register others return false; } if (maxDepth == 0) { return true; } - bool result = true; + + if (preservedDirDepth == 0) { + DirCheckPointPtr dirCheckPoint; + if (CheckPointManager::Instance()->GetDirCheckPoint(path, dirCheckPoint)) { + // path had dircheckpoint means it was watched before, so it is valid + const set& subdir = dirCheckPoint.get()->mSubDir; + for (const auto& it : subdir) { + RegisterHandlersWithinDepth(it, config, 0, maxDepth - 1); + } + return true; + } + } fsutil::Entry ent; while ((ent = dir.ReadNext())) { string item = PathJoin(path, ent.Name()); @@ -528,8 +530,7 @@ bool ConfigManager::RegisterHandlersWithinDepth(const std::string& path, RegisterHandlersWithinDepth(item, config, preservedDirDepth - 1, maxDepth - 1); } } - - return result; + return true; } // path not terminated by '/', path already registered @@ -553,7 +554,7 @@ bool ConfigManager::RegisterDescendants(const string& path, const FileDiscoveryC LOG_ERROR(sLogger, ("Open dir error: ", path.c_str())("errno", err)); return false; } - if (!EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, mSharedHandler)) { + if (!EventDispatcher::GetInstance()->RegisterEventHandler(path, config, mSharedHandler)) { // break;// fail early, do not try to register others return false; } diff --git a/core/file_server/EventDispatcher.cpp b/core/file_server/EventDispatcher.cpp index 70ef041fe4..1255381ceb 100644 --- a/core/file_server/EventDispatcher.cpp +++ b/core/file_server/EventDispatcher.cpp @@ -89,7 +89,7 @@ DEFINE_FLAG_INT32(default_max_inotify_watch_num, "the max allowed inotify watch namespace logtail { -EventDispatcher::EventDispatcher() : mWatchNum(0), mInotifyWatchNum(0) { +EventDispatcher::EventDispatcher() : mWatchNum(0), mInotifyWatchNum(0), mEventListener(EventListener::GetInstance()) { /* * May add multiple inotify fd instances in the future, * so use epoll here though a little more sophisticated than select @@ -100,7 +100,6 @@ EventDispatcher::EventDispatcher() : mWatchNum(0), mInotifyWatchNum(0) { // mListenFd = -1; // mStreamLogTcpFd = -1; // #endif - mEventListener = EventListener::GetInstance(); if (!AppConfig::GetInstance()->NoInotify()) { if (!mEventListener->Init()) { AlarmManager::GetInstance()->SendAlarm(EPOLL_ERROR_ALARM, @@ -142,7 +141,7 @@ EventDispatcher::~EventDispatcher() { delete mTimeoutHandler; } -bool EventDispatcher::RegisterEventHandler(const char* path, +bool EventDispatcher::RegisterEventHandler(const string& path, const FileDiscoveryConfig& config, EventHandler*& handler) { if (AppConfig::GetInstance()->IsHostPathMatchBlacklist(path)) { @@ -177,7 +176,7 @@ bool EventDispatcher::RegisterEventHandler(const char* path, return false; } uint64_t inode = statBuf.GetDevInode().inode; - int wd; + int wd = -1; MapType::Type::iterator pathIter = mPathWdMap.find(path); if (pathIter != mPathWdMap.end()) { wd = pathIter->second; @@ -236,8 +235,8 @@ bool EventDispatcher::RegisterEventHandler(const char* path, } else { // need check mEventListener valid if (mEventListener->IsInit() && !AppConfig::GetInstance()->IsInInotifyBlackList(path)) { - wd = mEventListener->AddWatch(path); - if (!mEventListener->IsValidID(wd)) { + wd = mEventListener->AddWatch(path.c_str()); + if (!EventListener::IsValidID(wd)) { string str = ErrnoToString(GetErrno()); LOG_WARNING(sLogger, ("failed to register dir", path)("reason", str)); #if defined(__linux__) @@ -279,7 +278,7 @@ bool EventDispatcher::RegisterEventHandler(const char* path, bool dirTimeOutFlag = config.first->IsTimeout(path); - if (!mEventListener->IsValidID(wd)) { + if (!EventListener::IsValidID(wd)) { wd = mNonInotifyWd; if (mNonInotifyWd == INT_MIN) mNonInotifyWd = -1; @@ -311,7 +310,7 @@ bool EventDispatcher::RegisterEventHandler(const char* path, } // read files when add dir inotify watcher at first time -void EventDispatcher::AddExistedFileEvents(const char* path, int wd) { +void EventDispatcher::AddExistedFileEvents(const string& path, int wd) { fsutil::Dir dir(path); if (!dir.Open()) { auto err = GetErrno(); @@ -618,7 +617,7 @@ void EventDispatcher::AddExistedCheckPointFileEvents() { // Because they are not in v1 checkpoint manager, no need to delete them. auto exactlyOnceConfigs = FileServer::GetInstance()->GetExactlyOnceConfigs(); if (!exactlyOnceConfigs.empty()) { - static auto sCptMV2 = CheckpointManagerV2::GetInstance(); + static auto* sCptMV2 = CheckpointManagerV2::GetInstance(); auto exactlyOnceCpts = sCptMV2->ScanCheckpoints(exactlyOnceConfigs); LOG_INFO(sLogger, ("start add exactly once checkpoint events", @@ -687,14 +686,13 @@ void EventDispatcher::AddExistedCheckPointFileEvents() { } } -bool EventDispatcher::AddTimeoutWatch(const char* path) { +bool EventDispatcher::AddTimeoutWatch(const string& path) { MapType::Type::iterator itr = mPathWdMap.find(path); if (itr != mPathWdMap.end()) { mWdUpdateTimeMap[itr->second] = time(NULL); return true; - } else { - return false; } + return false; } void EventDispatcher::AddOneToOneMapEntry(DirInfo* dirInfo, int wd) { @@ -811,11 +809,11 @@ void EventDispatcher::UnregisterAllDir(const string& baseDir) { LOG_DEBUG(sLogger, ("Remove all sub dir", baseDir)); auto subDirAndHandlers = FindAllSubDirAndHandler(baseDir); for (auto& subDirAndHandler : subDirAndHandlers) { - mTimeoutHandler->Handle(Event(subDirAndHandler.first.c_str(), "", 0, 0)); + mTimeoutHandler->Handle(Event(subDirAndHandler.first, "", 0, 0)); } } -void EventDispatcher::UnregisterEventHandler(const char* path) { +void EventDispatcher::UnregisterEventHandler(const string& path) { MapType::Type::iterator pos = mPathWdMap.find(path); if (pos == mPathWdMap.end()) return; @@ -828,10 +826,9 @@ void EventDispatcher::UnregisterEventHandler(const char* path) { mBrokenLinkSet.insert(path); } } - LOG_INFO(sLogger, ("remove a new watcher for dir", path)("wd", wd)); RemoveOneToOneMapEntry(wd); mWdUpdateTimeMap.erase(wd); - if (mEventListener->IsValidID(wd) && mEventListener->IsInit()) { + if (EventListener::IsValidID(wd) && mEventListener->IsInit()) { mEventListener->RemoveWatch(wd); mInotifyWatchNum--; } @@ -843,7 +840,7 @@ void EventDispatcher::StopAllDir(const string& baseDir) { LOG_DEBUG(sLogger, ("Stop all sub dir", baseDir)); auto subDirAndHandlers = FindAllSubDirAndHandler(baseDir); for (auto& subDirAndHandler : subDirAndHandlers) { - Event e(subDirAndHandler.first.c_str(), "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, -1, 0); + Event e(subDirAndHandler.first, "", EVENT_ISDIR | EVENT_CONTAINER_STOPPED, -1, 0); subDirAndHandler.second->Handle(e); } } @@ -864,7 +861,7 @@ DirRegisterStatus EventDispatcher::IsDirRegistered(const string& path) { return PATH_INODE_NOT_REGISTERED; } -bool EventDispatcher::IsRegistered(const char* path) { +bool EventDispatcher::IsRegistered(const std::string& path) { MapType::Type::iterator itr = mPathWdMap.find(path); if (itr == mPathWdMap.end()) return false; @@ -890,6 +887,8 @@ void EventDispatcher::HandleTimeout() { time_t curTime = time(NULL); MapType::Type::iterator itr = mWdUpdateTimeMap.begin(); for (; itr != mWdUpdateTimeMap.end(); ++itr) { + LOG_ERROR(sLogger, + ("path", mWdDirInfoMap[itr->first]->mPath)("curTime", curTime)("lastupdatetime", itr->second)); if (curTime - (itr->second) > INT32_FLAG(timeout_interval)) { // add to vector then batch process to avoid possible iterator change problem // mHandler may remove what itr points to, thus change the layout of the map container @@ -911,33 +910,30 @@ void EventDispatcher::HandleTimeout() { } } -void EventDispatcher::PropagateTimeout(const char* path) { - char* tmp = strdup(path); - MapType::Type::iterator pathpos = mPathWdMap.find(tmp); +void EventDispatcher::PropagateTimeout(const std::string& path) { + auto pathpos = mPathWdMap.find(path); if (pathpos == mPathWdMap.end()) { // walkarond of bug#5760293, should find the scenarios - AlarmManager::GetInstance()->SendAlarm( - INVALID_MEMORY_ACCESS_ALARM, "PropagateTimeout access invalid key of mPathWdMap, path : " + string(tmp)); - LOG_ERROR(sLogger, ("PropagateTimeout access invalid key of mPathWdMap, path", string(tmp))); - free(tmp); + AlarmManager::GetInstance()->SendAlarm(INVALID_MEMORY_ACCESS_ALARM, + "PropagateTimeout access invalid key of mPathWdMap, path : " + path); + LOG_ERROR(sLogger, ("PropagateTimeout access invalid key of mPathWdMap, path", path)); return; } - MapType::Type::iterator pos = mWdUpdateTimeMap.find(pathpos->second); - char* slashpos; - time_t curTime = time(NULL); + string tmp(path); + auto pos = mWdUpdateTimeMap.find(pathpos->second); + time_t curTime = time(nullptr); while (pos != mWdUpdateTimeMap.end()) { pos->second = curTime; - slashpos = strrchr(tmp, '/'); - if (slashpos == NULL) + auto slashpos = tmp.rfind('/'); + if (slashpos == string::npos) break; - *slashpos = '\0'; + tmp.resize(slashpos); pathpos = mPathWdMap.find(tmp); if (pathpos != mPathWdMap.end()) pos = mWdUpdateTimeMap.find(pathpos->second); else break; } - free(tmp); } void EventDispatcher::StartTimeCount() { @@ -963,7 +959,7 @@ void EventDispatcher::DumpAllHandlersMeta(bool remove) { int wd = timeout[i]; string path = mWdDirInfoMap[wd]->mPath; if (remove) { - UnregisterEventHandler(path.c_str()); + UnregisterEventHandler(path); ConfigManager::GetInstance()->RemoveHandler(path, false); if (ConfigManager::GetInstance()->FindBestMatch(path).first == NULL) { continue; @@ -979,26 +975,29 @@ void EventDispatcher::ProcessHandlerTimeOut() { for (; mapIter != mWdDirInfoMap.end(); ++mapIter) { mapIter->second->mHandler->HandleTimeOut(); } - return; } void EventDispatcher::DumpCheckPointPeriod(int32_t curTime) { if (CheckPointManager::Instance()->NeedDump(curTime)) { - LOG_INFO(sLogger, ("checkpoint dump", "starts")); - FileServer::GetInstance()->Pause(false); - DumpAllHandlersMeta(false); - - if (!(CheckPointManager::Instance()->DumpCheckPointToLocal())) - LOG_WARNING(sLogger, ("dump checkpoint to local", "failed")); - else - LOG_DEBUG(sLogger, ("dump checkpoint to local", "succeeded")); - // after save checkpoint, we should clear all checkpoint - CheckPointManager::Instance()->RemoveAllCheckPoint(); - FileServer::GetInstance()->Resume(false); - LOG_INFO(sLogger, ("checkpoint dump", "succeeded")); + DumpCheckPoint(); } } +void EventDispatcher::DumpCheckPoint() { + LOG_INFO(sLogger, ("checkpoint dump", "starts")); + FileServer::GetInstance()->Pause(false); + DumpAllHandlersMeta(false); + + if (!(CheckPointManager::Instance()->DumpCheckPointToLocal())) + LOG_WARNING(sLogger, ("dump checkpoint to local", "failed")); + else + LOG_DEBUG(sLogger, ("dump checkpoint to local", "succeeded")); + // after save checkpoint, we should clear all checkpoint + CheckPointManager::Instance()->RemoveAllCheckPoint(); + FileServer::GetInstance()->Resume(false); + LOG_INFO(sLogger, ("checkpoint dump", "succeeded")); +} + bool EventDispatcher::IsAllFileRead() { for (auto it = mWdDirInfoMap.begin(); it != mWdDirInfoMap.end(); ++it) { if (!((it->second)->mHandler)->IsAllFileRead()) { diff --git a/core/file_server/EventDispatcher.h b/core/file_server/EventDispatcher.h index d3162720a4..98f78ac472 100644 --- a/core/file_server/EventDispatcher.h +++ b/core/file_server/EventDispatcher.h @@ -99,7 +99,7 @@ class EventDispatcher { * * @return true on success; on error false is returned */ - bool RegisterEventHandler(const char* path, const FileDiscoveryConfig&, EventHandler*& handler); + bool RegisterEventHandler(const std::string& path, const FileDiscoveryConfig&, EventHandler*& handler); /** Unregister handler for path; If no handler registered for path, do nothing but return. * After this call, no event watched on this path any more. @@ -109,7 +109,7 @@ class EventDispatcher { * @param path for whom event handler will be removed. */ // TODO see whether report errors - void UnregisterEventHandler(const char* path); + void UnregisterEventHandler(const std::string& path); /** Close handlers for path; If no handler registered for path, do nothing but return. * @@ -134,7 +134,7 @@ class EventDispatcher { * * @return true if registered, false if not */ - bool IsRegistered(const char* path); + bool IsRegistered(const std::string& path); /** Test whether a directory is registered. * @@ -203,9 +203,10 @@ class EventDispatcher { void CheckSymbolicLink(); void DumpCheckPointPeriod(int32_t curTime); + void DumpCheckPoint(); void StartTimeCount(); - void PropagateTimeout(const char* path); + void PropagateTimeout(const std::string& path); void HandleTimeout(); void ReadInotifyEvents(std::vector& eventVec); @@ -230,8 +231,8 @@ class EventDispatcher { /** * @return true on success; false if path isn't registered by RegisterEventHandler. */ - bool AddTimeoutWatch(const char* path); - void AddExistedFileEvents(const char* path, int wd); + bool AddTimeoutWatch(const std::string& path); + void AddExistedFileEvents(const std::string& path, int wd); enum class ValidateCheckpointResult { kNormal, diff --git a/core/file_server/event_handler/EventHandler.cpp b/core/file_server/event_handler/EventHandler.cpp index b4914abc82..49968bc083 100644 --- a/core/file_server/event_handler/EventHandler.cpp +++ b/core/file_server/event_handler/EventHandler.cpp @@ -119,7 +119,7 @@ void NormalEventHandler::Handle(const Event& event) { "max depth", config.first->mMaxDirSearchDepth)); EventHandler* newHandler = new CreateModifyHandler(mCreateHandlerPtr); EventHandler* handler = newHandler; - if (EventDispatcher::GetInstance()->RegisterEventHandler(path.c_str(), config, handler)) { + if (EventDispatcher::GetInstance()->RegisterEventHandler(path, config, handler)) { if (handler != newHandler) delete newHandler; else @@ -157,11 +157,13 @@ void CreateHandler::Handle(const Event& event) { if (!config.first) return; else if (event.IsDir()) - ConfigManager::GetInstance()->RegisterHandlersRecursively(path, config, false); + ConfigManager::GetInstance()->RegisterHandlers(path, config); else { // symbolic link - if (EventDispatcher::GetInstance()->IsDirRegistered(path) == PATH_INODE_NOT_REGISTERED) + if (EventDispatcher::GetInstance()->IsDirRegistered(path) == PATH_INODE_NOT_REGISTERED) { + // TODO: why not use RegisterHandlers ConfigManager::GetInstance()->RegisterHandlersRecursively(path, config, true); + } } } @@ -173,8 +175,9 @@ void CreateHandler::HandleTimeOut() { // TimeoutHandler implementation void TimeoutHandler::Handle(const Event& ev) { const string& dir = ev.GetSource(); - EventDispatcher::GetInstance()->UnregisterEventHandler(dir.c_str()); + EventDispatcher::GetInstance()->UnregisterEventHandler(dir); ConfigManager::GetInstance()->RemoveHandler(dir); + CheckPointManager::Instance()->DeleteDirCheckPoint(dir); } diff --git a/core/file_server/event_handler/LogInput.cpp b/core/file_server/event_handler/LogInput.cpp index ee5f7ad4f7..6765717922 100644 --- a/core/file_server/event_handler/LogInput.cpp +++ b/core/file_server/event_handler/LogInput.cpp @@ -374,7 +374,7 @@ void LogInput::ProcessLoop() { int32_t prevTime = time(NULL); mLastReadEventTime = prevTime; int32_t curTime = prevTime; - srand(prevTime); + srand(0); // avoid random failures in unit tests int32_t lastCheckDir = prevTime - rand() % 60; int32_t lastCheckSymbolicLink = prevTime - rand() % 60; time_t lastCheckHandlerTimeOut = prevTime - rand() % 60; diff --git a/core/file_server/event_listener/EventListener_Linux.cpp b/core/file_server/event_listener/EventListener_Linux.cpp index 634fc27a40..ae07e7223e 100644 --- a/core/file_server/event_listener/EventListener_Linux.cpp +++ b/core/file_server/event_listener/EventListener_Linux.cpp @@ -41,7 +41,6 @@ bool logtail::EventListener::Init() { int logtail::EventListener::AddWatch(const char* dir) { return inotify_add_watch(mInotifyFd, dir, mWatchEventMask); - ; } bool logtail::EventListener::RemoveWatch(int wd) { diff --git a/core/file_server/polling/PollingDirFile.cpp b/core/file_server/polling/PollingDirFile.cpp index 60fbfd315f..330e1e85f3 100644 --- a/core/file_server/polling/PollingDirFile.cpp +++ b/core/file_server/polling/PollingDirFile.cpp @@ -366,13 +366,14 @@ bool PollingDirFile::PollingNormalConfigPath(const FileDiscoveryConfig& pConfig, if (pConfig.first->mMaxDirSearchDepth >= 0 && depth > pConfig.first->mMaxDirSearchDepth) { return false; } - bool exceedPreservedDirDepth = false; + int exceedPreservedDirDepth = 0; if (pConfig.first->mPreservedDirDepth >= 0 && depth > pConfig.first->mPreservedDirDepth) { exceedPreservedDirDepth = true; int64_t sec = 0; int64_t nsec = 0; statBuf.GetLastWriteTime(sec, nsec); auto curTime = time(nullptr); + LOG_DEBUG(sLogger, ("PollingNormalConfigPath", srcPath + "/" + obj)("curTime", curTime)("writeTime", sec)); if (curTime - sec > INT32_FLAG(timeout_interval)) { return false; } diff --git a/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp b/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp index 6c66ff874e..43ba61b196 100644 --- a/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp +++ b/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp @@ -36,42 +36,41 @@ struct TestVector { int mPreservedDirDepth; bool mLetTimeoutBefore2ndWriteTestFile0; // expected results + bool mCollectTestFile1stWrite; bool mCollectTestFile2ndWrite; + bool mCollectTestFile3rdWrite; bool mCollectTestFile2; }; // clang-format off -/* -| No. | PreservedDirDepth | Path | 第一次文件和目录变化 | 预期采集结果 | 第二次变化时间 | 第二次文件和目录变化 | 第三次变化时间 | 第三次文件和目录变化 | 预期采集结果 /var/log/0/0.log | 预期采集结果 /var/log/1/0.log | -| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | -| 0 | 0 | /var/\*\/log | /var/log/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | \>timeout | /var/log/1/0.log | 不采集 | 采集 | -| 1 | 0 | /var/log | /var/log/app/0/0.log | 采集 | timeout | /var/log/app/1/0.log | 采集 | 不采集 | -| 2 | 0 | /var/\*\/log | /var/app/log/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | \>timeout | /var/app/log/1/0.log | 不采集 | 采集 | -| 3 | 1 | /var/\*\/log | /var/log/app/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | \>timeout | /var/log/app/1/0.log | 不采集 | 采集 | -| 4 | 0 | /var/log | /var/log/0/0.log | 采集 | timeout | /var/log/1/0.log | 采集 | 采集 | -| 5 | 0 | /var/\*\/log | /var/log/app/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | \>timeout | /var/log/app/1/0.log | 不采集 | 不采集 | -| 6 | 0 | /var/log | /var/app/log/0/0.log | 采集 | timeout | /var/app/log/1/0.log | 采集 | 采集 | -| 7 | 1 | /var/\*\/log | /var/log/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | \>timeout | /var/log/1/0.log | 采集 | 采集 | -| 8 | 1 | /var/\*\/log | /var/app/log/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | \>timeout | /var/app/log/1/0.log | 采集 | 采集 | -*/ +// | 用例 | PreservedDirDepth | Path | 第一次文件和目录变化 | 预期采集结果 | 第二次变化时间 | 第二次文件和目录变化 | 预期采集结果 | 第三次变化时间 | 第三次文件和目录变化 | 预期采集结果 /var/log/0/0.log | 预期采集结果 /var/log/1/0.log | +// | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | --- | +// | 0 | 0 | /var/log | /var/log/app/0/0.log | 采集 | timeout | /var/log/app/1/0.log | 不采集 | 采集 | +// | 1 | 0 | /var/\*/log | /var/app/log/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | 不采集 | \>timeout | /var/app/log/1/0.log | 不采集 | 采集 | +// | 2 | 1 | /var/log | /var/app/log/0/0.log | 不采集 | timeout | /var/app/log/1/0.log | 不采集 | 不采集 | +// | 3 | 0 | /var/log | /var/log/0/0.log | 采集 | timeout | /var/log/1/0.log | 不采集 | 采集 | +// | 4 | 1 | /var/\*/log | /var/log/0/0.log | 不采集 | \>timeout | 在原有文件上追加数据 | 不采集 | \>timeout | /var/log/1/0.log | 不采集 | 不采集 | +// | 5 | 1 | /var/\*/log | /var/app/log/0/0.log | 采集 | \>timeout | 在原有文件上追加数据 | 采集 | \>timeout | /var/app/log/1/0.log | 采集 | 采集 | +// | 6 | 0 | /var/log | /var/log/app/0/0.log | 采集 | timeout | /var/log/app/0/1/0.log | 不采集 | 不采集 | // clang-format on class PollingPreservedDirDepthUnittest : public ::testing::Test { static std::string gRootDir; + static std::string gCheckpoint; static vector gTestMatrix; public: static void SetUpTestCase() { gRootDir = GetProcessExecutionDir() + "var" + PATH_SEPARATOR; - gTestMatrix = {{"*/log", "log/0", "log/1", 0, true, false, true}, - {"log", "log/app/0", "log/app/1", 0, false, true, false}, - {"*/log", "app/log/0", "app/log/1", 0, true, false, true}, - {"*/log", "log/app/0", "log/app/1", 1, true, false, true}, - {"log", "log/0", "log/1", 0, false, true, true}, - {"*/log", "log/app/0", "log/app/1", 0, true, false, false}, - {"log", "app/log/0", "app/log/1", 0, false, true, true}, - {"*/log", "log/0", "log/1", 1, true, true, true}, - {"*/log", "app/log/0", "app/log/1", 1, true, true, true}}; + gTestMatrix = { + {"log", "log/app/0", "log/app/1", 0, false, true, true, false, true}, + {"*/log", "app/log/0", "app/log/1", 0, true, true, false, false, true}, + {"log", "app/log/0", "app/log/1", 1, false, false, false, false, false}, + {"log", "log/0", "log/1", 0, false, true, true, false, true}, + {"*/log", "log/0", "log/1", 1, true, false, false, false, false}, + {"*/log", "app/log/0", "app/log/1", 1, true, true, true, true, true}, + {"log", "log/app/0", "log/app/0/1", 0, false, true, true, false, false}, + }; sLogger->set_level(spdlog::level::trace); srand(time(nullptr)); @@ -81,6 +80,10 @@ class PollingPreservedDirDepthUnittest : public ::testing::Test { INT32_FLAG(check_timeout_interval) = 0; INT32_FLAG(check_not_exist_file_dir_round) = 1; INT32_FLAG(polling_check_timeout_interval) = 0; + AppConfig::GetInstance()->mCheckPointFilePath = GetProcessExecutionDir() + gCheckpoint; + if (bfs::exists(AppConfig::GetInstance()->mCheckPointFilePath)) { + bfs::remove_all(AppConfig::GetInstance()->mCheckPointFilePath); + } LogFileProfiler::GetInstance(); LoongCollectorMonitor::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); // reference: Application::Start @@ -103,6 +106,9 @@ class PollingPreservedDirDepthUnittest : public ::testing::Test { } void SetUp() override { + if (bfs::exists(AppConfig::GetInstance()->mCheckPointFilePath)) { + bfs::remove_all(AppConfig::GetInstance()->mCheckPointFilePath); + } if (bfs::exists(gRootDir)) { bfs::remove_all(gRootDir); } @@ -119,11 +125,16 @@ class PollingPreservedDirDepthUnittest : public ::testing::Test { // ConfigManager::GetInstance()->CleanEnviroments(); PollingDirFile::GetInstance()->ClearCache(); PollingModify::GetInstance()->ClearCache(); + CheckPointManager::Instance()->RemoveAllCheckPoint(); // PollingEventQueue::GetInstance()->Clear(); bfs::remove_all(gRootDir); + if (bfs::exists(AppConfig::GetInstance()->mCheckPointFilePath)) { + bfs::remove_all(AppConfig::GetInstance()->mCheckPointFilePath); + } FileServer::GetInstance()->Resume(); } +private: unique_ptr createPipelineConfig(const string& filePath, int preservedDirDepth) { const char* confCstr = R"({ "inputs": [ @@ -150,6 +161,7 @@ class PollingPreservedDirDepthUnittest : public ::testing::Test { } void generateLog(const string& testFile) { + LOG_DEBUG(sLogger, ("Generate log", testFile)); auto pos = testFile.rfind(PATH_SEPARATOR); auto dir = testFile.substr(0, pos); bfs::create_directories(dir); @@ -166,9 +178,9 @@ class PollingPreservedDirDepthUnittest : public ::testing::Test { void testPollingDirFile(const TestVector& testVector) { auto configInputFilePath - = gRootDir + gTestMatrix[0].mConfigInputDir + PATH_SEPARATOR + "**" + PATH_SEPARATOR + "0.log"; - auto testFile1 = gRootDir + gTestMatrix[0].mTestDir0 + PATH_SEPARATOR + "0.log"; - auto testFile2 = gRootDir + gTestMatrix[0].mTestDir1 + PATH_SEPARATOR + "0.log"; + = gRootDir + testVector.mConfigInputDir + PATH_SEPARATOR + "**" + PATH_SEPARATOR + "0.log"; + auto testFile1 = gRootDir + testVector.mTestDir0 + PATH_SEPARATOR + "0.log"; + auto testFile2 = gRootDir + testVector.mTestDir1 + PATH_SEPARATOR + "0.log"; FileServer::GetInstance()->Pause(); auto configJson = createPipelineConfig(configInputFilePath, testVector.mPreservedDirDepth); PipelineConfig pipelineConfig("polling", std::move(configJson)); @@ -185,13 +197,17 @@ class PollingPreservedDirDepthUnittest : public ::testing::Test { std::this_thread::sleep_for(std::chrono::microseconds( 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event - // generate log for testFile1 for the 1st time + // write testFile1 for the 1st time generateLog(testFile1); PollingDirFile::GetInstance()->PollingIteration(); PollingModify::GetInstance()->PollingIteration(); std::this_thread::sleep_for(std::chrono::microseconds( 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event - APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile1)); + if (testVector.mCollectTestFile1stWrite) { + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile1)); + } else { + APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile1)); + } if (testVector.mLetTimeoutBefore2ndWriteTestFile0) { std::this_thread::sleep_for(std::chrono::seconds( @@ -200,7 +216,10 @@ class PollingPreservedDirDepthUnittest : public ::testing::Test { timeout_interval))); // let timeout happen, must *2 since timeout happen only if time interval > 1s } - // generate log for testFile1 for the 2nd time + // trigger clean timeout polling cache + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + // write testFile1 for the 2nd time generateLog(testFile1); PollingDirFile::GetInstance()->PollingIteration(); PollingModify::GetInstance()->PollingIteration(); @@ -217,13 +236,16 @@ class PollingPreservedDirDepthUnittest : public ::testing::Test { * INT32_FLAG( timeout_interval))); // let timeout happen, must *2 since timeout happen only if time interval > 1s + // trigger clean timeout polling cache + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); generateLog(testFile1); generateLog(testFile2); PollingDirFile::GetInstance()->PollingIteration(); PollingModify::GetInstance()->PollingIteration(); std::this_thread::sleep_for(std::chrono::microseconds( 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event - if (testVector.mCollectTestFile2ndWrite) { + if (testVector.mCollectTestFile3rdWrite) { APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile1)); } else { APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile1)); @@ -235,15 +257,65 @@ class PollingPreservedDirDepthUnittest : public ::testing::Test { } } +public: void TestPollingDirFile0() { testPollingDirFile(gTestMatrix[0]); } void TestPollingDirFile1() { testPollingDirFile(gTestMatrix[1]); } void TestPollingDirFile2() { testPollingDirFile(gTestMatrix[2]); } void TestPollingDirFile3() { testPollingDirFile(gTestMatrix[3]); } void TestPollingDirFile4() { testPollingDirFile(gTestMatrix[4]); } void TestPollingDirFile5() { testPollingDirFile(gTestMatrix[5]); } + void TestPollingDirFile6() { testPollingDirFile(gTestMatrix[6]); } - void TestPollingDirFile7() { testPollingDirFile(gTestMatrix[7]); } - void TestPollingDirFile8() { testPollingDirFile(gTestMatrix[8]); } + + void TestCheckpoint() { + auto configInputFilePath = gRootDir + "log/**/0.log"; + auto testFile = gRootDir + "log/0/0.log"; + FileServer::GetInstance()->Pause(); + auto configJson = createPipelineConfig(configInputFilePath, 0); + PipelineConfig pipelineConfig("polling", std::move(configJson)); + APSARA_TEST_TRUE_FATAL(pipelineConfig.Parse()); + auto p = PipelineManager::GetInstance()->BuildPipeline( + std::move(pipelineConfig)); // reference: PipelineManager::UpdatePipelines + APSARA_TEST_FALSE_FATAL(p.get() == nullptr); + PipelineManager::GetInstance()->mPipelineNameEntityMap[pipelineConfig.mName] = p; + p->Start(); + FileServer::GetInstance()->Resume(); + + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + std::this_thread::sleep_for(std::chrono::microseconds( + 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event + + // generate log for testFile1 for the 1st time + generateLog(testFile); + PollingDirFile::GetInstance()->PollingIteration(); + PollingModify::GetInstance()->PollingIteration(); + std::this_thread::sleep_for(std::chrono::microseconds( + 10 * INT32_FLAG(log_input_thread_wait_interval))); // give enough time to consume event + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile)); + + // Dump and load checkpoint + FileServer::GetInstance()->Pause(true); + std::this_thread::sleep_for(std::chrono::seconds( + 2 + * INT32_FLAG( + timeout_interval))); // let timeout happen, must *2 since timeout happen only if time interval > 1s + FileServer::GetInstance()->Resume(true); + // Should remain registered after checkpoint + APSARA_TEST_TRUE_FATAL(isFileDirRegistered(testFile)); + + std::this_thread::sleep_for(std::chrono::seconds( + 2 + * INT32_FLAG( + timeout_interval))); // let timeout happen, must *2 since timeout happen only if time interval > 1s + + APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile)); + // Dump and load checkpoint + FileServer::GetInstance()->Pause(true); + FileServer::GetInstance()->Resume(true); + // Should remain unregistered after checkpoint + APSARA_TEST_FALSE_FATAL(isFileDirRegistered(testFile)); + } }; UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile0); @@ -252,11 +324,10 @@ UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile2); UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile3); UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile4); UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile5); -UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile6); -UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile7); -UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestPollingDirFile8); +UNIT_TEST_CASE(PollingPreservedDirDepthUnittest, TestCheckpoint); std::string PollingPreservedDirDepthUnittest::gRootDir; +std::string PollingPreservedDirDepthUnittest::gCheckpoint = "checkpoint"; vector PollingPreservedDirDepthUnittest::gTestMatrix; } // namespace logtail From 92642c28272734f6e89f332aefa7db250e80a875 Mon Sep 17 00:00:00 2001 From: Tao Yu Date: Sat, 16 Nov 2024 10:24:47 +0000 Subject: [PATCH 5/7] fix CMakeLists.txt --- core/CMakeLists.txt | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 04800250c2..34f0733996 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -33,7 +33,7 @@ cmake_dependent_option(ENABLE_STATIC_LINK_CRT "Build Logtail by linking CRT stat option(WITHOUTGDB "Build Logtail without gdb") option(WITHSPL "Build Logtail and UT with SPL" ON) option(BUILD_LOGTAIL_UT "Build unit test for Logtail") -cmake_dependent_option(ENABLE_ADDRESS_SANITIZER "Enable address sanitizer" OFF "CMAKE_BUILD_TYPE MATCHES Debug;NOT ANDROID" ON) +cmake_dependent_option(ENABLE_ADDRESS_SANITIZER "Enable address sanitizer" ON "CMAKE_BUILD_TYPE STREQUAL Debug;NOT ANDROID" OFF) set(PROVIDER_PATH "provider" CACHE PATH "Path to the provider module") # external provider path can be set with -DPROVIDER_PATH set(UNITTEST_PATH "unittest" CACHE PATH "Path to the unittest module") # external unittest path can be set with -DUNITTEST_PATH From 250d6d322a6a726d8c11f2ed655734c46b43c467 Mon Sep 17 00:00:00 2001 From: Tao Yu Date: Sat, 16 Nov 2024 14:58:06 +0000 Subject: [PATCH 6/7] remove unwanted logs --- core/file_server/EventDispatcher.cpp | 2 -- core/file_server/polling/PollingDirFile.cpp | 2 +- 2 files changed, 1 insertion(+), 3 deletions(-) diff --git a/core/file_server/EventDispatcher.cpp b/core/file_server/EventDispatcher.cpp index 1255381ceb..ab1f7cab38 100644 --- a/core/file_server/EventDispatcher.cpp +++ b/core/file_server/EventDispatcher.cpp @@ -887,8 +887,6 @@ void EventDispatcher::HandleTimeout() { time_t curTime = time(NULL); MapType::Type::iterator itr = mWdUpdateTimeMap.begin(); for (; itr != mWdUpdateTimeMap.end(); ++itr) { - LOG_ERROR(sLogger, - ("path", mWdDirInfoMap[itr->first]->mPath)("curTime", curTime)("lastupdatetime", itr->second)); if (curTime - (itr->second) > INT32_FLAG(timeout_interval)) { // add to vector then batch process to avoid possible iterator change problem // mHandler may remove what itr points to, thus change the layout of the map container diff --git a/core/file_server/polling/PollingDirFile.cpp b/core/file_server/polling/PollingDirFile.cpp index 330e1e85f3..0b106605ff 100644 --- a/core/file_server/polling/PollingDirFile.cpp +++ b/core/file_server/polling/PollingDirFile.cpp @@ -366,7 +366,7 @@ bool PollingDirFile::PollingNormalConfigPath(const FileDiscoveryConfig& pConfig, if (pConfig.first->mMaxDirSearchDepth >= 0 && depth > pConfig.first->mMaxDirSearchDepth) { return false; } - int exceedPreservedDirDepth = 0; + bool exceedPreservedDirDepth = false; if (pConfig.first->mPreservedDirDepth >= 0 && depth > pConfig.first->mPreservedDirDepth) { exceedPreservedDirDepth = true; int64_t sec = 0; From 642f50cb107610a14039f19b4f8039d5ba10be39 Mon Sep 17 00:00:00 2001 From: Tao Yu Date: Mon, 25 Nov 2024 06:01:52 +0000 Subject: [PATCH 7/7] fix UT --- core/unittest/polling/PollingPreservedDirDepthUnittest.cpp | 1 - 1 file changed, 1 deletion(-) diff --git a/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp b/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp index 43ba61b196..1e860271bd 100644 --- a/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp +++ b/core/unittest/polling/PollingPreservedDirDepthUnittest.cpp @@ -84,7 +84,6 @@ class PollingPreservedDirDepthUnittest : public ::testing::Test { if (bfs::exists(AppConfig::GetInstance()->mCheckPointFilePath)) { bfs::remove_all(AppConfig::GetInstance()->mCheckPointFilePath); } - LogFileProfiler::GetInstance(); LoongCollectorMonitor::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); // reference: Application::Start PluginRegistry::GetInstance()->LoadPlugins();