From 3ad7c63323576224ec68766ed3d3e5e7a0766a50 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=8E=84=E9=A3=8F?= Date: Tue, 26 Nov 2024 07:39:05 +0000 Subject: [PATCH] fix flusher bug --- core/plugin/flusher/local_file/FlusherLocalFile.cpp | 11 ++++++++++- core/plugin/flusher/local_file/FlusherLocalFile.h | 1 + 2 files changed, 11 insertions(+), 1 deletion(-) diff --git a/core/plugin/flusher/local_file/FlusherLocalFile.cpp b/core/plugin/flusher/local_file/FlusherLocalFile.cpp index 5b84cced81..ff1f26f932 100644 --- a/core/plugin/flusher/local_file/FlusherLocalFile.cpp +++ b/core/plugin/flusher/local_file/FlusherLocalFile.cpp @@ -27,6 +27,10 @@ namespace logtail { const string FlusherLocalFile::sName = "flusher_local_file"; bool FlusherLocalFile::Init(const Json::Value& config, Json::Value& optionalGoPipeline) { + static uint32_t cnt = 0; + GenerateQueueKey(to_string(++cnt)); + SenderQueueManager::GetInstance()->CreateQueue(mQueueKey, mPluginID, *mContext); + string errorMsg; // FileName if (!GetMandatoryStringParam(config, "FileName", mFileName, errorMsg)) { @@ -39,6 +43,8 @@ bool FlusherLocalFile::Init(const Json::Value& config, Json::Value& optionalGoPi mContext->GetLogstoreName(), mContext->GetRegion()); } + // Pattern + GetMandatoryStringParam(config, "Pattern", mPattern, errorMsg); // MaxFileSize GetMandatoryUIntParam(config, "MaxFileSize", mMaxFileSize, errorMsg); // MaxFiles @@ -48,8 +54,9 @@ bool FlusherLocalFile::Init(const Json::Value& config, Json::Value& optionalGoPi auto file_sink = std::make_shared(mFileName, mMaxFileSize, mMaxFiles, true); mFileWriter = std::make_shared( sName, file_sink, spdlog::thread_pool(), spdlog::async_overflow_policy::block); - mFileWriter->set_pattern("[%Y-%m-%d %H:%M:%S.%f] %v"); + mFileWriter->set_pattern(mPattern); + mBatcher.Init(Json::Value(), this, DefaultFlushStrategyOptions{}); mGroupSerializer = make_unique(this); mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL); return true; @@ -90,6 +97,7 @@ bool FlusherLocalFile::SerializeAndPush(PipelineEventGroup&& group) { } else { LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg)); } + mFileWriter->flush(); return true; } @@ -104,6 +112,7 @@ bool FlusherLocalFile::SerializeAndPush(BatchedEventsList&& groupList) { LOG_ERROR(sLogger, ("serialize pipeline event group error", errorMsg)); } } + mFileWriter->flush(); return true; } diff --git a/core/plugin/flusher/local_file/FlusherLocalFile.h b/core/plugin/flusher/local_file/FlusherLocalFile.h index 5cfd0d0283..9fab539871 100644 --- a/core/plugin/flusher/local_file/FlusherLocalFile.h +++ b/core/plugin/flusher/local_file/FlusherLocalFile.h @@ -43,6 +43,7 @@ class FlusherLocalFile : public Flusher { std::shared_ptr mFileWriter; std::string mFileName; + std::string mPattern = "[%Y-%m-%d %H:%M:%S.%f] %v"; uint32_t mMaxFileSize = 1024 * 1024 * 10; uint32_t mMaxFiles = 10; Batcher mBatcher;