From b11fd8f130988798f1ef24ab6dbe26f389b3e49f Mon Sep 17 00:00:00 2001 From: Takuka0311 <1914426213@qq.com> Date: Mon, 25 Nov 2024 11:37:45 +0800 Subject: [PATCH] init (#1905) --- core/pipeline/Pipeline.cpp | 2 +- core/pipeline/batch/Batcher.h | 16 +++++++++++----- .../pipeline/plugin/instance/FlusherInstance.cpp | 3 ++- core/pipeline/plugin/instance/FlusherInstance.h | 5 +++-- core/pipeline/plugin/interface/Flusher.h | 3 +++ core/unittest/pipeline/PipelineUnittest.cpp | 12 ++++++------ core/unittest/plugin/FlusherInstanceUnittest.cpp | 2 +- 7 files changed, 27 insertions(+), 16 deletions(-) diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 359385c6c5..e4161db2ee 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -167,7 +167,7 @@ bool Pipeline::Init(PipelineConfig&& config) { = PluginRegistry::GetInstance()->CreateFlusher(pluginType, GenNextPluginMeta(false)); if (flusher) { Json::Value optionalGoPipeline; - if (!flusher->Init(detail, mContext, optionalGoPipeline)) { + if (!flusher->Init(detail, mContext, i, optionalGoPipeline)) { return false; } mFlushers.emplace_back(std::move(flusher)); diff --git a/core/pipeline/batch/Batcher.h b/core/pipeline/batch/Batcher.h index 0d47087ffe..a263d82228 100644 --- a/core/pipeline/batch/Batcher.h +++ b/core/pipeline/batch/Batcher.h @@ -176,7 +176,7 @@ class Batcher { } if (mGroupQueue->IsEmpty()) { TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(), - 0, + mFlusher->GetFlusherIndex(), 0, mGroupFlushStrategy->GetTimeoutSecs(), mFlusher); @@ -193,8 +193,11 @@ class Batcher { g.GetSourceBuffer(), g.GetExactlyOnceCheckpoint(), g.GetMetadata(EventGroupMetaKey::SOURCE_ID)); - TimeoutFlushManager::GetInstance()->UpdateRecord( - mFlusher->GetContext().GetConfigName(), 0, key, mEventFlushStrategy.GetTimeoutSecs(), mFlusher); + TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(), + mFlusher->GetFlusherIndex(), + key, + mEventFlushStrategy.GetTimeoutSecs(), + mFlusher); mBufferedGroupsTotal->Add(1); mBufferedDataSizeByte->Add(item.DataSize()); } else if (i == 0) { @@ -243,8 +246,11 @@ class Batcher { mGroupQueue->Flush(res); } if (mGroupQueue->IsEmpty()) { - TimeoutFlushManager::GetInstance()->UpdateRecord( - mFlusher->GetContext().GetConfigName(), 0, 0, mGroupFlushStrategy->GetTimeoutSecs(), mFlusher); + TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(), + mFlusher->GetFlusherIndex(), + 0, + mGroupFlushStrategy->GetTimeoutSecs(), + mFlusher); } iter->second.Flush(mGroupQueue.value()); mEventQueueMap.erase(iter); diff --git a/core/pipeline/plugin/instance/FlusherInstance.cpp b/core/pipeline/plugin/instance/FlusherInstance.cpp index 38cb9dd3b8..181e5c21e8 100644 --- a/core/pipeline/plugin/instance/FlusherInstance.cpp +++ b/core/pipeline/plugin/instance/FlusherInstance.cpp @@ -20,9 +20,10 @@ using namespace std; namespace logtail { -bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, Json::Value& optionalGoPipeline) { +bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, size_t flusherIdx, Json::Value& optionalGoPipeline) { mPlugin->SetContext(context); mPlugin->SetPluginID(PluginID()); + mPlugin->SetFlusherIndex(flusherIdx); mPlugin->SetMetricsRecordRef(Name(), PluginID()); if (!mPlugin->Init(config, optionalGoPipeline)) { return false; diff --git a/core/pipeline/plugin/instance/FlusherInstance.h b/core/pipeline/plugin/instance/FlusherInstance.h index 69bbba3db2..68089f60b1 100644 --- a/core/pipeline/plugin/instance/FlusherInstance.h +++ b/core/pipeline/plugin/instance/FlusherInstance.h @@ -31,12 +31,13 @@ namespace logtail { class FlusherInstance : public PluginInstance { public: - FlusherInstance(Flusher* plugin, const PluginInstance::PluginMeta& pluginMeta) : PluginInstance(pluginMeta), mPlugin(plugin) {} + FlusherInstance(Flusher* plugin, const PluginInstance::PluginMeta& pluginMeta) + : PluginInstance(pluginMeta), mPlugin(plugin) {} const std::string& Name() const override { return mPlugin->Name(); }; const Flusher* GetPlugin() const { return mPlugin.get(); } - bool Init(const Json::Value& config, PipelineContext& context, Json::Value& optionalGoPipeline); + bool Init(const Json::Value& config, PipelineContext& context, size_t flusherIdx, Json::Value& optionalGoPipeline); bool Start() { return mPlugin->Start(); } bool Stop(bool isPipelineRemoving) { return mPlugin->Stop(isPipelineRemoving); } bool Send(PipelineEventGroup&& g); diff --git a/core/pipeline/plugin/interface/Flusher.h b/core/pipeline/plugin/interface/Flusher.h index 6bf3301477..232020df34 100644 --- a/core/pipeline/plugin/interface/Flusher.h +++ b/core/pipeline/plugin/interface/Flusher.h @@ -44,6 +44,8 @@ class Flusher : public Plugin { QueueKey GetQueueKey() const { return mQueueKey; } void SetPluginID(const std::string& pluginID) { mPluginID = pluginID; } + size_t GetFlusherIndex() { return mIndex; } + void SetFlusherIndex(size_t idx) { mIndex = idx; } const std::string& GetPluginID() const { return mPluginID; } protected: @@ -54,6 +56,7 @@ class Flusher : public Plugin { QueueKey mQueueKey; std::string mPluginID; + size_t mIndex = 0; #ifdef APSARA_UNIT_TEST_MAIN friend class FlusherInstanceUnittest; diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index 8134336fc5..0f9b273a37 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ b/core/unittest/pipeline/PipelineUnittest.cpp @@ -2736,13 +2736,13 @@ void PipelineUnittest::TestSend() const { { auto flusher = PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false)); - flusher->Init(Json::Value(), ctx, tmp); + flusher->Init(Json::Value(), ctx, 0, tmp); pipeline.mFlushers.emplace_back(std::move(flusher)); } { auto flusher = PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false)); - flusher->Init(Json::Value(), ctx, tmp); + flusher->Init(Json::Value(), ctx, 0, tmp); pipeline.mFlushers.emplace_back(std::move(flusher)); } vector> configs; @@ -2788,13 +2788,13 @@ void PipelineUnittest::TestSend() const { { auto flusher = PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false)); - flusher->Init(Json::Value(), ctx, tmp); + flusher->Init(Json::Value(), ctx, 0, tmp); pipeline.mFlushers.emplace_back(std::move(flusher)); } { auto flusher = PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false)); - flusher->Init(Json::Value(), ctx, tmp); + flusher->Init(Json::Value(), ctx, 0, tmp); pipeline.mFlushers.emplace_back(std::move(flusher)); } @@ -2855,13 +2855,13 @@ void PipelineUnittest::TestFlushBatch() const { { auto flusher = PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false)); - flusher->Init(Json::Value(), ctx, tmp); + flusher->Init(Json::Value(), ctx, 0, tmp); pipeline.mFlushers.emplace_back(std::move(flusher)); } { auto flusher = PluginRegistry::GetInstance()->CreateFlusher(FlusherMock::sName, pipeline.GenNextPluginMeta(false)); - flusher->Init(Json::Value(), ctx, tmp); + flusher->Init(Json::Value(), ctx, 0, tmp); pipeline.mFlushers.emplace_back(std::move(flusher)); } { diff --git a/core/unittest/plugin/FlusherInstanceUnittest.cpp b/core/unittest/plugin/FlusherInstanceUnittest.cpp index b6fcc33633..800529df70 100644 --- a/core/unittest/plugin/FlusherInstanceUnittest.cpp +++ b/core/unittest/plugin/FlusherInstanceUnittest.cpp @@ -46,7 +46,7 @@ void FlusherInstanceUnittest::TestInit() const { = make_unique(new FlusherMock(), PluginInstance::PluginMeta("0")); Json::Value config, opt; PipelineContext context; - APSARA_TEST_TRUE(flusher->Init(config, context, opt)); + APSARA_TEST_TRUE(flusher->Init(config, context, 0, opt)); APSARA_TEST_EQUAL(&context, &flusher->GetPlugin()->GetContext()); }