Skip to content

Commit

Permalink
feat: add full_drain_mode option to ensure complete data flush over n…
Browse files Browse the repository at this point in the history
…etwork before shutdown (alibaba#1395)

* support flushing out data rather than stopping immediately on app exit
  • Loading branch information
henryzhx8 authored Mar 22, 2024
1 parent df2e5b6 commit f2c3ade
Show file tree
Hide file tree
Showing 12 changed files with 118 additions and 51 deletions.
8 changes: 4 additions & 4 deletions core/aggregator/Aggregator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,7 @@ bool Aggregator::FlushReadyBuffer() {
PTScopedLock lock(mMergeLock);
unordered_map<int64_t, MergeItem*>::iterator itr = mMergeMap.begin();
for (; itr != mMergeMap.end();) {
if (sender->IsFlush()
if (Application::GetInstance()->IsExiting()
|| (itr->second->IsReady()
&& sender->GetSenderFeedBackInterface()->IsValidToPush(itr->second->mLogstoreKey))) {
if (itr->second->mMergeType == FlusherSLS::Batch::MergeType::TOPIC)
Expand All @@ -85,7 +85,7 @@ bool Aggregator::FlushReadyBuffer() {
PTScopedLock lock(mMergeLock);
unordered_map<int64_t, PackageListMergeBuffer*>::iterator pIter = mPackageListMergeMap.begin();
for (; pIter != mPackageListMergeMap.end();) {
if (sender->IsFlush()
if (Application::GetInstance()->IsExiting()
|| (pIter->second->IsReady(curTime) && pIter->second->mMergeItems.size() > 0
&& sender->GetSenderFeedBackInterface()->IsValidToPush(
pIter->second->GetFirstItem()->mLogstoreKey))) {
Expand Down Expand Up @@ -345,7 +345,7 @@ bool Aggregator::Add(const std::string& projectName,

if (mergeType == FlusherSLS::Batch::MergeType::LOGSTORE) {
pIter->second->AddMergeItem(value);
if (pIter->second->IsReady(curTime) || sender->IsFlush()) {
if (pIter->second->IsReady(curTime) || Application::GetInstance()->IsExiting()) {
#ifdef LOGTAIL_DEBUG_FLAG
LOG_DEBUG(sLogger,
("Send logstore merged packet, size", pIter->second->mMergeItems.size())(
Expand All @@ -358,7 +358,7 @@ bool Aggregator::Add(const std::string& projectName,
mPackageListMergeMap.erase(pIter);
}
} else {
if (value != NULL && (value->IsReady() || sender->IsFlush() || context.mExactlyOnceCheckpoint)) {
if (value != NULL && (value->IsReady() || Application::GetInstance()->IsExiting() || context.mExactlyOnceCheckpoint)) {
sendDataVec.push_back(value);
if (itr != mMergeMap.end()) {
mMergeMap.erase(itr);
Expand Down
3 changes: 2 additions & 1 deletion core/application/Application.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,8 @@
#include <cstdint>
#include <string>

#include "common/Thread.h"
#include "common/Lock.h"
#include "common/Thread.h"

namespace logtail {

Expand All @@ -38,6 +38,7 @@ class Application {
void Init();
void Start();
void SetSigTermSignalFlag(bool flag) { mSigTermSignalFlag = flag; }
bool IsExiting() { return mSigTermSignalFlag; }

std::string GetInstanceId() { return mInstanceId; }
bool TryGetUUID();
Expand Down
1 change: 1 addition & 0 deletions core/common/LogtailCommonFlags.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@ DEFINE_FLAG_STRING(logtail_integrity_snapshot, "integrity file on local disk", "
DEFINE_FLAG_STRING(ilogtail_config,
"set dataserver & configserver address; (optional)set cpu,mem,bufflerfile,buffermap and etc.",
"ilogtail_config.json");
DEFINE_FLAG_BOOL(enable_full_drain_mode, "", false);
DEFINE_FLAG_INT32(cpu_limit_num, "cpu violate limit num before shutdown", 10);
DEFINE_FLAG_INT32(mem_limit_num, "memory violate limit num before shutdown", 10);
DEFINE_FLAG_DOUBLE(cpu_usage_up_limit, "cpu usage upper limit, cores", 2.0);
Expand Down
1 change: 1 addition & 0 deletions core/common/LogtailCommonFlags.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ DECLARE_FLAG_STRING(logtail_integrity_snapshot);

// app config
DECLARE_FLAG_STRING(ilogtail_config);
DECLARE_FLAG_BOOL(enable_full_drain_mode);
DECLARE_FLAG_INT32(cpu_limit_num);
DECLARE_FLAG_INT32(mem_limit_num);
DECLARE_FLAG_DOUBLE(cpu_usage_up_limit);
Expand Down
9 changes: 9 additions & 0 deletions core/controller/EventDispatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1002,6 +1002,15 @@ void EventDispatcher::DumpCheckPointPeriod(int32_t curTime) {
}
}

bool EventDispatcher::IsAllFileRead() {
for (auto it = mWdDirInfoMap.begin(); it != mWdDirInfoMap.end(); ++it) {
if (!((it->second)->mHandler)->IsAllFileRead()) {
return false;
}
}
return true;
}

#ifdef APSARA_UNIT_TEST_MAIN
void EventDispatcher::CleanEnviroments() {
// mMainThreadRunning = false;
Expand Down
2 changes: 2 additions & 0 deletions core/controller/EventDispatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,8 @@ class EventDispatcher {

void ClearBrokenLinkSet() { mBrokenLinkSet.clear(); }

bool IsAllFileRead();

protected:
EventDispatcher();
~EventDispatcher();
Expand Down
44 changes: 34 additions & 10 deletions core/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,24 +13,26 @@
// limitations under the License.

#include "EventHandler.h"

#include <iostream>
#include <string>
#include <vector>
#include "common/TimeUtil.h"
#include "common/RuntimeUtil.h"

#include "LogInput.h"
#include "app_config/AppConfig.h"
#include "common/FileSystemUtil.h"
#include "common/LogFileCollectOffsetIndicator.h"
#include "common/RuntimeUtil.h"
#include "common/StringTools.h"
#include "app_config/AppConfig.h"
#include "event/BlockEventManager.h"
#include "controller/EventDispatcher.h"
#include "common/TimeUtil.h"
#include "config_manager/ConfigManager.h"
#include "controller/EventDispatcher.h"
#include "event/BlockEventManager.h"
#include "file_server/FileServer.h"
#include "fuse/FuseFileBlacklist.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "processor/daemon/LogProcess.h"
#include "logger/Logger.h"
#include "fuse/FuseFileBlacklist.h"
#include "common/LogFileCollectOffsetIndicator.h"
#include "LogInput.h"
#include "file_server/FileServer.h"

using namespace std;
using namespace sls_logs;
Expand Down Expand Up @@ -188,6 +190,15 @@ bool CreateModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigF
return true;
}

bool CreateModifyHandler::IsAllFileRead() {
for (ModifyHandlerMap::iterator iter = mModifyHandlerPtrMap.begin(); iter != mModifyHandlerPtrMap.end(); ++iter) {
if (!iter->second->IsAllFileRead()) {
return false;
}
}
return true;
}

ModifyHandler* CreateModifyHandler::GetOrCreateModifyHandler(const std::string& configName,
const FileDiscoveryConfig& pConfig) {
ModifyHandlerMap::iterator iter = mModifyHandlerPtrMap.find(configName);
Expand Down Expand Up @@ -941,6 +952,19 @@ bool ModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) {
return true;
}

bool ModifyHandler::IsAllFileRead() {
for (auto it = mNameReaderMap.begin(); it != mNameReaderMap.end(); ++it) {
if (it->second.size() > 1 || (!it->second.empty() && !it->second[0]->IsReadToEnd())) {
return false;
}
if (!it->second.empty()) {
// force flushing the last line immediately instead of waiting for timeout
ForceReadLogAndPush(it->second[0]);
}
}
return true;
}

void ModifyHandler::DeleteTimeoutReader() {
if ((int32_t)mDevInodeReaderMap.size() > INT32_FLAG(logreader_count_maxlimit))
DeleteTimeoutReader(86400);
Expand Down
9 changes: 7 additions & 2 deletions core/event_handler/EventHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,12 +15,14 @@
*/

#pragma once
#include "reader/LogFileReader.h"
#include <time.h>
#include <map>

#include <deque>
#include <map>
#include <unordered_map>

#include "reader/LogFileReader.h"

namespace logtail {

class Event;
Expand All @@ -37,6 +39,7 @@ class EventHandler {
virtual void Handle(const Event& event) = 0;
virtual void HandleTimeOut() = 0;
virtual bool DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) = 0;
virtual bool IsAllFileRead() { return true; }
virtual ~EventHandler() {}
};

Expand Down Expand Up @@ -94,6 +97,7 @@ class ModifyHandler : public EventHandler {
virtual void Handle(const Event& event);
virtual void HandleTimeOut();
virtual bool DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag);
bool IsAllFileRead() override;

#ifdef APSARA_UNIT_TEST_MAIN
friend class ConfigUpdatorUnittest;
Expand Down Expand Up @@ -147,6 +151,7 @@ class CreateModifyHandler : public EventHandler {
virtual void Handle(const Event& event);
virtual void HandleTimeOut();
virtual bool DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag);
bool IsAllFileRead() override;

ModifyHandler* GetOrCreateModifyHandler(const std::string& configName, const FileDiscoveryConfig& pConfig);

Expand Down
60 changes: 39 additions & 21 deletions core/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -13,32 +13,34 @@
// limitations under the License.

#include "LogInput.h"

#include <time.h>

#include "EventHandler.h"
#include "HistoryFileImporter.h"
#include "app_config/AppConfig.h"
#include "application/Application.h"
#include "checkpoint/CheckPointManager.h"
#include "common/FileSystemUtil.h"
#include "common/HashUtil.h"
#include "common/LogtailCommonFlags.h"
#include "common/RuntimeUtil.h"
#include "common/StringTools.h"
#include "common/HashUtil.h"
#include "common/TimeUtil.h"
#include "common/FileSystemUtil.h"
#include "polling/PollingCache.h"
#include "monitor/Monitor.h"
#include "processor/daemon/LogProcess.h"
#include "config_manager/ConfigManager.h"
#include "controller/EventDispatcher.h"
#include "event/BlockEventManager.h"
#include "logger/Logger.h"
#include "monitor/LogtailAlarm.h"
#include "checkpoint/CheckPointManager.h"
#include "monitor/Monitor.h"
#include "polling/PollingCache.h"
#include "polling/PollingDirFile.h"
#include "polling/PollingEventQueue.h"
#include "polling/PollingModify.h"
#include "reader/LogFileReader.h"
#include "processor/daemon/LogProcess.h"
#include "reader/GloablFileDescriptorManager.h"
#include "app_config/AppConfig.h"
#include "reader/LogFileReader.h"
#include "sender/Sender.h"
#include "polling/PollingEventQueue.h"
#include "event/BlockEventManager.h"
#include "config_manager/ConfigManager.h"
#include "logger/Logger.h"
#include "EventHandler.h"
#include "HistoryFileImporter.h"
#include "application/Application.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#endif
Expand Down Expand Up @@ -99,8 +101,13 @@ void LogInput::Resume() {

void LogInput::HoldOn() {
LOG_INFO(sLogger, ("event handle daemon pause", "starts"));
mInteruptFlag = true;
mAccessMainThreadRWL.lock();
if (BOOL_FLAG(enable_full_drain_mode)) {
unique_lock<mutex> lock(mThreadRunningMux);
mStopCV.wait(lock, [this]() { return mInteruptFlag; });
} else {
mInteruptFlag = true;
mAccessMainThreadRWL.lock();
}
LOG_INFO(sLogger, ("event handle daemon pause", "succeeded"));
}

Expand Down Expand Up @@ -331,11 +338,12 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {

void LogInput::UpdateCriticalMetric(int32_t curTime) {
LogtailMonitor::GetInstance()->UpdateMetric("last_read_event_time",
GetTimeStamp(mLastReadEventTime, "%Y-%m-%d %H:%M:%S"));
GetTimeStamp(mLastReadEventTime, "%Y-%m-%d %H:%M:%S"));

LogtailMonitor::GetInstance()->UpdateMetric("event_tps", 1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime));
LogtailMonitor::GetInstance()->UpdateMetric("event_tps",
1.0 * mEventProcessCount / (curTime - mLastUpdateMetricTime));
LogtailMonitor::GetInstance()->UpdateMetric("open_fd",
GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
LogtailMonitor::GetInstance()->UpdateMetric("register_handler", EventDispatcher::GetInstance()->GetHandlerCount());
LogtailMonitor::GetInstance()->UpdateMetric("reader_count", CheckPointManager::Instance()->GetReaderCount());
LogtailMonitor::GetInstance()->UpdateMetric("multi_config", AppConfig::GetInstance()->IsAcceptMultiConfig());
Expand Down Expand Up @@ -440,9 +448,19 @@ void* LogInput::ProcessLoop() {
ConfigManager::GetInstance()->ClearConfigMatchCache();
lastClearConfigCache = curTime;
}

if (BOOL_FLAG(enable_full_drain_mode) && Application::GetInstance()->IsExiting()
&& EventDispatcher::GetInstance()->IsAllFileRead()) {
break;
}
}

LOG_WARNING(sLogger, ("LogInputThread", "Exit"));
mInteruptFlag = true;
mStopCV.notify_one();

if (!BOOL_FLAG(enable_full_drain_mode)) {
LOG_WARNING(sLogger, ("LogInputThread", "Exit"));
}
return NULL;
}

Expand Down
8 changes: 6 additions & 2 deletions core/event_handler/LogInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,10 +17,12 @@
#ifndef __LOG_ILOGTAIL_LOG_INPUT_H__
#define __LOG_ILOGTAIL_LOG_INPUT_H__

#include <string>
#include <condition_variable>
#include <queue>
#include <vector>
#include <string>
#include <unordered_set>
#include <vector>

#include "common/Lock.h"
#include "common/LogRunnable.h"

Expand Down Expand Up @@ -78,6 +80,8 @@ class LogInput : public LogRunnable {
int32_t mLastUpdateMetricTime;

std::atomic_int mLastReadEventTime{0};
mutable std::mutex mThreadRunningMux;
mutable std::condition_variable mStopCV;

#ifdef APSARA_UNIT_TEST_MAIN
friend class LogInputUnittest;
Expand Down
2 changes: 0 additions & 2 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -212,8 +212,6 @@ void PipelineManager::StopAllPipelines() {
Sender::Instance()->SetQueueUrgent();
bool logProcessFlushFlag = false;
for (int i = 0; !logProcessFlushFlag && i < 500; ++i) {
// deamon send thread may reset flush, so we should set flush every time
Sender::Instance()->SetFlush();
logProcessFlushFlag = LogProcess::GetInstance()->FlushOut(10);
}
if (!logProcessFlushFlag) {
Expand Down
Loading

0 comments on commit f2c3ade

Please sign in to comment.