From 8559f64d1464a4272000bbf06ac5f204887a6967 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Wed, 11 Dec 2024 00:59:05 +0800 Subject: [PATCH] fix --- core/config/PipelineConfig.cpp | 1 + core/config/PipelineConfig.h | 2 +- core/config/watcher/PipelineConfigWatcher.cpp | 8 ++++---- core/pipeline/Pipeline.h | 4 ++-- core/pipeline/plugin/PluginRegistry.cpp | 4 ++-- core/pipeline/plugin/PluginRegistry.h | 2 +- 6 files changed, 11 insertions(+), 10 deletions(-) diff --git a/core/config/PipelineConfig.cpp b/core/config/PipelineConfig.cpp index 4a5dfb2e50..4aeea027ca 100644 --- a/core/config/PipelineConfig.cpp +++ b/core/config/PipelineConfig.cpp @@ -191,6 +191,7 @@ bool PipelineConfig::Parse() { mRegion); } const string pluginType = it->asString(); + // when input is singleton, there should only one input to simpify config load transaction if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(pluginType)) { mSingletonInput = pluginType; if (itr->size() > 1) { diff --git a/core/config/PipelineConfig.h b/core/config/PipelineConfig.h index d4e896daf6..6a9219cdcd 100644 --- a/core/config/PipelineConfig.h +++ b/core/config/PipelineConfig.h @@ -32,7 +32,7 @@ struct PipelineConfig { uint32_t mCreateTime = 0; const Json::Value* mGlobal = nullptr; std::vector mInputs; - std::string mSingletonInput; + std::optional mSingletonInput; std::vector mProcessors; std::vector mAggregators; std::vector mFlushers; diff --git a/core/config/watcher/PipelineConfigWatcher.cpp b/core/config/watcher/PipelineConfigWatcher.cpp index 8a284ce815..fb914c4d55 100644 --- a/core/config/watcher/PipelineConfigWatcher.cpp +++ b/core/config/watcher/PipelineConfigWatcher.cpp @@ -454,8 +454,8 @@ bool PipelineConfigWatcher::CheckUnchangedConfig(const std::string& configName, config.mRegion); return false; } - if (!config.mSingletonInput.empty()) { - singletonCache[config.mSingletonInput].push_back( + if (config.mSingletonInput) { + singletonCache[config.mSingletonInput.value()].push_back( make_shared(std::move(config), ConfigDiffEnum::Added)); } } @@ -467,10 +467,10 @@ void PipelineConfigWatcher::PushPipelineConfig(PipelineConfig&& config, PipelineConfigDiff& pDiff, SingletonConfigCache& singletonCache) { // singleton input - if (!config.mSingletonInput.empty()) { + if (config.mSingletonInput) { if (diffEnum == ConfigDiffEnum::Added || diffEnum == ConfigDiffEnum::Modified || diffEnum == ConfigDiffEnum::Unchanged) { - singletonCache[config.mSingletonInput].push_back( + singletonCache[config.mSingletonInput.value()].push_back( make_shared(std::move(config), diffEnum)); } else { LOG_ERROR(sLogger, ("should not reach here", "invalid diff enum")("diff", diffEnum)); diff --git a/core/pipeline/Pipeline.h b/core/pipeline/Pipeline.h index bd1794cee7..29666c68c1 100644 --- a/core/pipeline/Pipeline.h +++ b/core/pipeline/Pipeline.h @@ -61,7 +61,7 @@ class Pipeline { const std::string& Name() const { return mName; } PipelineContext& GetContext() const { return mContext; } const Json::Value& GetConfig() const { return *mConfig; } - const std::string& GetSingletonInput() const { return mSingletonInput; } + const std::optional& GetSingletonInput() const { return mSingletonInput; } const std::vector>& GetFlushers() const { return mFlushers; } bool IsFlushingThroughGoPipeline() const { return !mGoPipelineWithoutInput.isNull(); } const std::unordered_map>& GetPluginStatistics() const { @@ -101,7 +101,7 @@ class Pipeline { mutable PipelineContext mContext; std::unordered_map> mPluginCntMap; std::unique_ptr mConfig; - std::string mSingletonInput; + std::optional mSingletonInput; std::atomic_uint16_t mPluginID; std::atomic_int16_t mInProcessCnt; diff --git a/core/pipeline/plugin/PluginRegistry.cpp b/core/pipeline/plugin/PluginRegistry.cpp index acb0d82b50..d62648bf27 100644 --- a/core/pipeline/plugin/PluginRegistry.cpp +++ b/core/pipeline/plugin/PluginRegistry.cpp @@ -187,8 +187,8 @@ void PluginRegistry::RegisterInputCreator(PluginCreator* creator, bool isSinglet RegisterCreator(INPUT_PLUGIN, creator, isSingleton); } -void PluginRegistry::RegisterProcessorCreator(PluginCreator* creator, bool isSingleton) { - RegisterCreator(PROCESSOR_PLUGIN, creator, isSingleton); +void PluginRegistry::RegisterProcessorCreator(PluginCreator* creator) { + RegisterCreator(PROCESSOR_PLUGIN, creator, false); } void PluginRegistry::RegisterFlusherCreator(PluginCreator* creator, bool isSingleton) { diff --git a/core/pipeline/plugin/PluginRegistry.h b/core/pipeline/plugin/PluginRegistry.h index ddfa3a7bd8..1c552da6dc 100644 --- a/core/pipeline/plugin/PluginRegistry.h +++ b/core/pipeline/plugin/PluginRegistry.h @@ -80,7 +80,7 @@ class PluginRegistry { void LoadStaticPlugins(); void LoadDynamicPlugins(const std::set& plugins); void RegisterInputCreator(PluginCreator* creator, bool isSingleton = false); - void RegisterProcessorCreator(PluginCreator* creator, bool isSingleton = false); + void RegisterProcessorCreator(PluginCreator* creator); void RegisterFlusherCreator(PluginCreator* creator, bool isSingleton = false); PluginCreator* LoadProcessorPlugin(DynamicLibLoader& loader, const std::string pluginType); void RegisterCreator(PluginCat cat, PluginCreator* creator, bool isSingleton);