From f2c3ade343eda5cab39487b10ecccc56cbf0b5a9 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 22 Mar 2024 10:29:58 +0800 Subject: [PATCH] feat: add full_drain_mode option to ensure complete data flush over network before shutdown (#1395) * support flushing out data rather than stopping immediately on app exit --- core/aggregator/Aggregator.cpp | 8 ++-- core/application/Application.h | 3 +- core/common/LogtailCommonFlags.cpp | 1 + core/common/LogtailCommonFlags.h | 1 + core/controller/EventDispatcher.cpp | 9 +++++ core/controller/EventDispatcher.h | 2 + core/event_handler/EventHandler.cpp | 44 ++++++++++++++++----- core/event_handler/EventHandler.h | 9 ++++- core/event_handler/LogInput.cpp | 60 +++++++++++++++++++---------- core/event_handler/LogInput.h | 8 +++- core/pipeline/PipelineManager.cpp | 2 - core/sender/Sender.cpp | 22 ++++++----- 12 files changed, 118 insertions(+), 51 deletions(-) diff --git a/core/aggregator/Aggregator.cpp b/core/aggregator/Aggregator.cpp index a42dd6bfb3..28eefa2b82 100644 --- a/core/aggregator/Aggregator.cpp +++ b/core/aggregator/Aggregator.cpp @@ -59,7 +59,7 @@ bool Aggregator::FlushReadyBuffer() { PTScopedLock lock(mMergeLock); unordered_map::iterator itr = mMergeMap.begin(); for (; itr != mMergeMap.end();) { - if (sender->IsFlush() + if (Application::GetInstance()->IsExiting() || (itr->second->IsReady() && sender->GetSenderFeedBackInterface()->IsValidToPush(itr->second->mLogstoreKey))) { if (itr->second->mMergeType == FlusherSLS::Batch::MergeType::TOPIC) @@ -85,7 +85,7 @@ bool Aggregator::FlushReadyBuffer() { PTScopedLock lock(mMergeLock); unordered_map::iterator pIter = mPackageListMergeMap.begin(); for (; pIter != mPackageListMergeMap.end();) { - if (sender->IsFlush() + if (Application::GetInstance()->IsExiting() || (pIter->second->IsReady(curTime) && pIter->second->mMergeItems.size() > 0 && sender->GetSenderFeedBackInterface()->IsValidToPush( pIter->second->GetFirstItem()->mLogstoreKey))) { @@ -345,7 +345,7 @@ bool Aggregator::Add(const std::string& projectName, if (mergeType == FlusherSLS::Batch::MergeType::LOGSTORE) { pIter->second->AddMergeItem(value); - if (pIter->second->IsReady(curTime) || sender->IsFlush()) { + if (pIter->second->IsReady(curTime) || Application::GetInstance()->IsExiting()) { #ifdef LOGTAIL_DEBUG_FLAG LOG_DEBUG(sLogger, ("Send logstore merged packet, size", pIter->second->mMergeItems.size())( @@ -358,7 +358,7 @@ bool Aggregator::Add(const std::string& projectName, mPackageListMergeMap.erase(pIter); } } else { - if (value != NULL && (value->IsReady() || sender->IsFlush() || context.mExactlyOnceCheckpoint)) { + if (value != NULL && (value->IsReady() || Application::GetInstance()->IsExiting() || context.mExactlyOnceCheckpoint)) { sendDataVec.push_back(value); if (itr != mMergeMap.end()) { mMergeMap.erase(itr); diff --git a/core/application/Application.h b/core/application/Application.h index 3c4f678622..fcb990b05e 100644 --- a/core/application/Application.h +++ b/core/application/Application.h @@ -20,8 +20,8 @@ #include #include -#include "common/Thread.h" #include "common/Lock.h" +#include "common/Thread.h" namespace logtail { @@ -38,6 +38,7 @@ class Application { void Init(); void Start(); void SetSigTermSignalFlag(bool flag) { mSigTermSignalFlag = flag; } + bool IsExiting() { return mSigTermSignalFlag; } std::string GetInstanceId() { return mInstanceId; } bool TryGetUUID(); diff --git a/core/common/LogtailCommonFlags.cpp b/core/common/LogtailCommonFlags.cpp index 040aa2f32f..9cdf7f0c5f 100644 --- a/core/common/LogtailCommonFlags.cpp +++ b/core/common/LogtailCommonFlags.cpp @@ -83,6 +83,7 @@ DEFINE_FLAG_STRING(logtail_integrity_snapshot, "integrity file on local disk", " DEFINE_FLAG_STRING(ilogtail_config, "set dataserver & configserver address; (optional)set cpu,mem,bufflerfile,buffermap and etc.", "ilogtail_config.json"); +DEFINE_FLAG_BOOL(enable_full_drain_mode, "", false); DEFINE_FLAG_INT32(cpu_limit_num, "cpu violate limit num before shutdown", 10); DEFINE_FLAG_INT32(mem_limit_num, "memory violate limit num before shutdown", 10); DEFINE_FLAG_DOUBLE(cpu_usage_up_limit, "cpu usage upper limit, cores", 2.0); diff --git a/core/common/LogtailCommonFlags.h b/core/common/LogtailCommonFlags.h index 0df7ffccab..85b9ec858b 100644 --- a/core/common/LogtailCommonFlags.h +++ b/core/common/LogtailCommonFlags.h @@ -24,6 +24,7 @@ DECLARE_FLAG_STRING(logtail_integrity_snapshot); // app config DECLARE_FLAG_STRING(ilogtail_config); +DECLARE_FLAG_BOOL(enable_full_drain_mode); DECLARE_FLAG_INT32(cpu_limit_num); DECLARE_FLAG_INT32(mem_limit_num); DECLARE_FLAG_DOUBLE(cpu_usage_up_limit); diff --git a/core/controller/EventDispatcher.cpp b/core/controller/EventDispatcher.cpp index b626defa5b..a64e80b677 100644 --- a/core/controller/EventDispatcher.cpp +++ b/core/controller/EventDispatcher.cpp @@ -1002,6 +1002,15 @@ void EventDispatcher::DumpCheckPointPeriod(int32_t curTime) { } } +bool EventDispatcher::IsAllFileRead() { + for (auto it = mWdDirInfoMap.begin(); it != mWdDirInfoMap.end(); ++it) { + if (!((it->second)->mHandler)->IsAllFileRead()) { + return false; + } + } + return true; +} + #ifdef APSARA_UNIT_TEST_MAIN void EventDispatcher::CleanEnviroments() { // mMainThreadRunning = false; diff --git a/core/controller/EventDispatcher.h b/core/controller/EventDispatcher.h index 485cfa767b..6511cc9cfe 100644 --- a/core/controller/EventDispatcher.h +++ b/core/controller/EventDispatcher.h @@ -217,6 +217,8 @@ class EventDispatcher { void ClearBrokenLinkSet() { mBrokenLinkSet.clear(); } + bool IsAllFileRead(); + protected: EventDispatcher(); ~EventDispatcher(); diff --git a/core/event_handler/EventHandler.cpp b/core/event_handler/EventHandler.cpp index 7ce15f8377..dca69e01fe 100644 --- a/core/event_handler/EventHandler.cpp +++ b/core/event_handler/EventHandler.cpp @@ -13,24 +13,26 @@ // limitations under the License. #include "EventHandler.h" + #include #include #include -#include "common/TimeUtil.h" -#include "common/RuntimeUtil.h" + +#include "LogInput.h" +#include "app_config/AppConfig.h" #include "common/FileSystemUtil.h" +#include "common/LogFileCollectOffsetIndicator.h" +#include "common/RuntimeUtil.h" #include "common/StringTools.h" -#include "app_config/AppConfig.h" -#include "event/BlockEventManager.h" -#include "controller/EventDispatcher.h" +#include "common/TimeUtil.h" #include "config_manager/ConfigManager.h" +#include "controller/EventDispatcher.h" +#include "event/BlockEventManager.h" +#include "file_server/FileServer.h" +#include "fuse/FuseFileBlacklist.h" +#include "logger/Logger.h" #include "monitor/LogtailAlarm.h" #include "processor/daemon/LogProcess.h" -#include "logger/Logger.h" -#include "fuse/FuseFileBlacklist.h" -#include "common/LogFileCollectOffsetIndicator.h" -#include "LogInput.h" -#include "file_server/FileServer.h" using namespace std; using namespace sls_logs; @@ -188,6 +190,15 @@ bool CreateModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigF return true; } +bool CreateModifyHandler::IsAllFileRead() { + for (ModifyHandlerMap::iterator iter = mModifyHandlerPtrMap.begin(); iter != mModifyHandlerPtrMap.end(); ++iter) { + if (!iter->second->IsAllFileRead()) { + return false; + } + } + return true; +} + ModifyHandler* CreateModifyHandler::GetOrCreateModifyHandler(const std::string& configName, const FileDiscoveryConfig& pConfig) { ModifyHandlerMap::iterator iter = mModifyHandlerPtrMap.find(configName); @@ -941,6 +952,19 @@ bool ModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) { return true; } +bool ModifyHandler::IsAllFileRead() { + for (auto it = mNameReaderMap.begin(); it != mNameReaderMap.end(); ++it) { + if (it->second.size() > 1 || (!it->second.empty() && !it->second[0]->IsReadToEnd())) { + return false; + } + if (!it->second.empty()) { + // force flushing the last line immediately instead of waiting for timeout + ForceReadLogAndPush(it->second[0]); + } + } + return true; +} + void ModifyHandler::DeleteTimeoutReader() { if ((int32_t)mDevInodeReaderMap.size() > INT32_FLAG(logreader_count_maxlimit)) DeleteTimeoutReader(86400); diff --git a/core/event_handler/EventHandler.h b/core/event_handler/EventHandler.h index a729c5d9f6..f81966babf 100644 --- a/core/event_handler/EventHandler.h +++ b/core/event_handler/EventHandler.h @@ -15,12 +15,14 @@ */ #pragma once -#include "reader/LogFileReader.h" #include -#include + #include +#include #include +#include "reader/LogFileReader.h" + namespace logtail { class Event; @@ -37,6 +39,7 @@ class EventHandler { virtual void Handle(const Event& event) = 0; virtual void HandleTimeOut() = 0; virtual bool DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) = 0; + virtual bool IsAllFileRead() { return true; } virtual ~EventHandler() {} }; @@ -94,6 +97,7 @@ class ModifyHandler : public EventHandler { virtual void Handle(const Event& event); virtual void HandleTimeOut(); virtual bool DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag); + bool IsAllFileRead() override; #ifdef APSARA_UNIT_TEST_MAIN friend class ConfigUpdatorUnittest; @@ -147,6 +151,7 @@ class CreateModifyHandler : public EventHandler { virtual void Handle(const Event& event); virtual void HandleTimeOut(); virtual bool DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag); + bool IsAllFileRead() override; ModifyHandler* GetOrCreateModifyHandler(const std::string& configName, const FileDiscoveryConfig& pConfig); diff --git a/core/event_handler/LogInput.cpp b/core/event_handler/LogInput.cpp index 90c81835c6..d47142bfc0 100644 --- a/core/event_handler/LogInput.cpp +++ b/core/event_handler/LogInput.cpp @@ -13,32 +13,34 @@ // limitations under the License. #include "LogInput.h" + #include + +#include "EventHandler.h" +#include "HistoryFileImporter.h" +#include "app_config/AppConfig.h" +#include "application/Application.h" +#include "checkpoint/CheckPointManager.h" +#include "common/FileSystemUtil.h" +#include "common/HashUtil.h" #include "common/LogtailCommonFlags.h" #include "common/RuntimeUtil.h" #include "common/StringTools.h" -#include "common/HashUtil.h" #include "common/TimeUtil.h" -#include "common/FileSystemUtil.h" -#include "polling/PollingCache.h" -#include "monitor/Monitor.h" -#include "processor/daemon/LogProcess.h" +#include "config_manager/ConfigManager.h" #include "controller/EventDispatcher.h" +#include "event/BlockEventManager.h" +#include "logger/Logger.h" #include "monitor/LogtailAlarm.h" -#include "checkpoint/CheckPointManager.h" +#include "monitor/Monitor.h" +#include "polling/PollingCache.h" #include "polling/PollingDirFile.h" +#include "polling/PollingEventQueue.h" #include "polling/PollingModify.h" -#include "reader/LogFileReader.h" +#include "processor/daemon/LogProcess.h" #include "reader/GloablFileDescriptorManager.h" -#include "app_config/AppConfig.h" +#include "reader/LogFileReader.h" #include "sender/Sender.h" -#include "polling/PollingEventQueue.h" -#include "event/BlockEventManager.h" -#include "config_manager/ConfigManager.h" -#include "logger/Logger.h" -#include "EventHandler.h" -#include "HistoryFileImporter.h" -#include "application/Application.h" #ifdef __ENTERPRISE__ #include "config/provider/EnterpriseConfigProvider.h" #endif @@ -99,8 +101,13 @@ void LogInput::Resume() { void LogInput::HoldOn() { LOG_INFO(sLogger, ("event handle daemon pause", "starts")); - mInteruptFlag = true; - mAccessMainThreadRWL.lock(); + if (BOOL_FLAG(enable_full_drain_mode)) { + unique_lock lock(mThreadRunningMux); + mStopCV.wait(lock, [this]() { return mInteruptFlag; }); + } else { + mInteruptFlag = true; + mAccessMainThreadRWL.lock(); + } LOG_INFO(sLogger, ("event handle daemon pause", "succeeded")); } @@ -331,11 +338,12 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) { void LogInput::UpdateCriticalMetric(int32_t curTime) { LogtailMonitor::GetInstance()->UpdateMetric("last_read_event_time", - GetTimeStamp(mLastReadEventTime, "%Y-%m-%d %H:%M:%S")); + GetTimeStamp(mLastReadEventTime, "%Y-%m-%d %H:%M:%S")); - LogtailMonitor::GetInstance()->UpdateMetric("event_tps", 1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime)); + LogtailMonitor::GetInstance()->UpdateMetric("event_tps", + 1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime)); LogtailMonitor::GetInstance()->UpdateMetric("open_fd", - GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize()); + GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize()); LogtailMonitor::GetInstance()->UpdateMetric("register_handler", EventDispatcher::GetInstance()->GetHandlerCount()); LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount()); LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig()); @@ -440,9 +448,19 @@ void* LogInput::ProcessLoop() { ConfigManager::GetInstance()->ClearConfigMatchCache(); lastClearConfigCache = curTime; } + + if (BOOL_FLAG(enable_full_drain_mode) && Application::GetInstance()->IsExiting() + && EventDispatcher::GetInstance()->IsAllFileRead()) { + break; + } } - LOG_WARNING(sLogger, ("LogInputThread", "Exit")); + mInteruptFlag = true; + mStopCV.notify_one(); + + if (!BOOL_FLAG(enable_full_drain_mode)) { + LOG_WARNING(sLogger, ("LogInputThread", "Exit")); + } return NULL; } diff --git a/core/event_handler/LogInput.h b/core/event_handler/LogInput.h index 56db45546d..a405aeb1ce 100644 --- a/core/event_handler/LogInput.h +++ b/core/event_handler/LogInput.h @@ -17,10 +17,12 @@ #ifndef __LOG_ILOGTAIL_LOG_INPUT_H__ #define __LOG_ILOGTAIL_LOG_INPUT_H__ -#include +#include #include -#include +#include #include +#include + #include "common/Lock.h" #include "common/LogRunnable.h" @@ -78,6 +80,8 @@ class LogInput : public LogRunnable { int32_t mLastUpdateMetricTime; std::atomic_int mLastReadEventTime{0}; + mutable std::mutex mThreadRunningMux; + mutable std::condition_variable mStopCV; #ifdef APSARA_UNIT_TEST_MAIN friend class LogInputUnittest; diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index 09043b9461..23fa291329 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -212,8 +212,6 @@ void PipelineManager::StopAllPipelines() { Sender::Instance()->SetQueueUrgent(); bool logProcessFlushFlag = false; for (int i = 0; !logProcessFlushFlag && i < 500; ++i) { - // deamon send thread may reset flush, so we should set flush every time - Sender::Instance()->SetFlush(); logProcessFlushFlag = LogProcess::GetInstance()->FlushOut(10); } if (!logProcessFlushFlag) { diff --git a/core/sender/Sender.cpp b/core/sender/Sender.cpp index 152cb2ebba..501ca9bd76 100644 --- a/core/sender/Sender.cpp +++ b/core/sender/Sender.cpp @@ -1397,7 +1397,7 @@ void Sender::DaemonSender() { uint32_t bufferPackageCount = 0; bool singleBatchMapFull = false; int32_t curTime = time(NULL); - if (IsFlush()) { + if (Application::GetInstance()->IsExiting()) { mSenderQueue.PopAllItem(logGroupToSend, curTime, singleBatchMapFull); } else { std::unordered_map regionConcurrencyLimits; @@ -1468,7 +1468,7 @@ void Sender::DaemonSender() { /////////////////////////////////////// // smoothing send tps, walk around webserver load burst - if (!IsFlush() && AppConfig::GetInstance()->IsSendRandomSleep()) { + if (!Application::GetInstance()->IsExiting() && AppConfig::GetInstance()->IsSendRandomSleep()) { int64_t sleepMicroseconds = 0; if (bufferPackageCount < 10) sleepMicroseconds = (rand() % 40) * 10000; // 0ms ~ 400ms @@ -1510,12 +1510,13 @@ void Sender::DaemonSender() { DescSendingCount(); } else { #endif - if (!IsFlush() && AppConfig::GetInstance()->IsSendFlowControl()) { + if (!Application::GetInstance()->IsExiting() && AppConfig::GetInstance()->IsSendFlowControl()) { FlowControl(data->mRawSize, REALTIME_SEND_THREAD); } int32_t beforeSleepTime = time(NULL); - while (!IsFlush() && GetSendingBufferCount() >= AppConfig::GetInstance()->GetSendRequestConcurrency()) { + while (!Application::GetInstance()->IsExiting() + && GetSendingBufferCount() >= AppConfig::GetInstance()->GetSendRequestConcurrency()) { usleep(10 * 1000); } int32_t afterSleepTime = time(NULL); @@ -1568,7 +1569,8 @@ void Sender::PutIntoSecondaryBuffer(LoggroupTimeValue* dataPtr, int32_t retryTim ++retry; { PTScopedLock lock(mSecondaryMutexLock); - if (IsFlush() || (mSecondaryBuffer.size() < (uint32_t)INT32_FLAG(secondary_buffer_count_limit))) { + if (Application::GetInstance()->IsExiting() + || (mSecondaryBuffer.size() < (uint32_t)INT32_FLAG(secondary_buffer_count_limit))) { mSecondaryBuffer.push_back(dataPtr); writeDone = true; break; @@ -2073,7 +2075,8 @@ SendResult Sender::SendToNetSync(sdk::Client* sendClient, void Sender::SendToNetAsync(LoggroupTimeValue* dataPtr) { auto& exactlyOnceCpt = dataPtr->mLogGroupContext.mExactlyOnceCheckpoint; - if (IsFlush()) // write local file avoid binary update fail + if (!BOOL_FLAG(enable_full_drain_mode) + && Application::GetInstance()->IsExiting()) // write local file avoid binary update fail { SubSendingBufferCount(); if (!exactlyOnceCpt) { @@ -2539,16 +2542,18 @@ void Sender::ResetRegionConcurrency(const std::string& region) { bool Sender::FlushOut(int32_t time_interval_in_mili_seconds) { static Aggregator* aggregator = Aggregator::GetInstance(); - SetFlush(); aggregator->FlushReadyBuffer(); + SetFlush(); for (int i = 0; i < time_interval_in_mili_seconds / 100; ++i) { mSenderQueue.Signal(); { WaitObject::Lock lock(mWriteSecondaryWait); mWriteSecondaryWait.signal(); } - if (IsFlush() == false) { + if (!IsFlush()) { // double check, fix bug #13758589 + // TODO: this is not necessary, the task of checking whether all data has been flushed should be done in + // this func, not in Sender thread aggregator->FlushReadyBuffer(); if (aggregator->IsMergeMapEmpty() && IsBatchMapEmpty() && GetSendingCount() == 0 && IsSecondaryBufferEmpty()) { @@ -2558,7 +2563,6 @@ bool Sender::FlushOut(int32_t time_interval_in_mili_seconds) { continue; } } - aggregator->FlushReadyBuffer(); usleep(100 * 1000); } ResetFlush();