diff --git a/core/config/ConfigDiff.h b/core/config/ConfigDiff.h index 38fa7ba00a..5bd9c8d205 100644 --- a/core/config/ConfigDiff.h +++ b/core/config/ConfigDiff.h @@ -38,4 +38,6 @@ using PipelineConfigDiff = ConfigDiff; using TaskConfigDiff = ConfigDiff; using InstanceConfigDiff = ConfigDiff; +enum ConfigDiffEnum { Added, Modified, Removed, Unchanged }; + } // namespace logtail diff --git a/core/config/PipelineConfig.cpp b/core/config/PipelineConfig.cpp index e0642b6ee1..4aeea027ca 100644 --- a/core/config/PipelineConfig.cpp +++ b/core/config/PipelineConfig.cpp @@ -191,6 +191,20 @@ bool PipelineConfig::Parse() { mRegion); } const string pluginType = it->asString(); + // when input is singleton, there should only one input to simpify config load transaction + if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(pluginType)) { + mSingletonInput = pluginType; + if (itr->size() > 1) { + PARAM_ERROR_RETURN(sLogger, + alarm, + "more than 1 input plugin is given when global singleton input plugin is used", + noModule, + mName, + mProject, + mLogstore, + mRegion); + } + } if (i == 0) { if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { mHasGoInput = true; @@ -241,7 +255,7 @@ bool PipelineConfig::Parse() { if (hasFileInput && (*mDetail)["inputs"].size() > 1) { PARAM_ERROR_RETURN(sLogger, alarm, - "more than 1 input_file or input_container_stdio plugin is given", + "more than 1 input_file or input_container_stdio is given", noModule, mName, mProject, diff --git a/core/config/PipelineConfig.h b/core/config/PipelineConfig.h index 7d845126e9..6a9219cdcd 100644 --- a/core/config/PipelineConfig.h +++ b/core/config/PipelineConfig.h @@ -32,6 +32,7 @@ struct PipelineConfig { uint32_t mCreateTime = 0; const Json::Value* mGlobal = nullptr; std::vector mInputs; + std::optional mSingletonInput; std::vector mProcessors; std::vector mAggregators; std::vector mFlushers; @@ -49,7 +50,8 @@ struct PipelineConfig { std::string mLogstore; std::string mRegion; - PipelineConfig(const std::string& name, std::unique_ptr&& detail) : mName(name), mDetail(std::move(detail)) {} + PipelineConfig(const std::string& name, std::unique_ptr&& detail) + : mName(name), mDetail(std::move(detail)) {} bool Parse(); @@ -67,7 +69,8 @@ struct PipelineConfig { // bool IsProcessRunnerInvolved() const { // // 长期过渡使用,待C++部分的时序聚合能力与Go持平后恢复下面的正式版 // return !(mHasGoInput && !mHasNativeProcessor); - // // return !(mHasGoInput && !mHasNativeProcessor && (mHasGoProcessor || (mHasGoFlusher && !mHasNativeFlusher))); + // // return !(mHasGoInput && !mHasNativeProcessor && (mHasGoProcessor || (mHasGoFlusher && + // !mHasNativeFlusher))); // } bool HasGoPlugin() const { return mHasGoFlusher || mHasGoProcessor || mHasGoInput; } diff --git a/core/config/watcher/PipelineConfigWatcher.cpp b/core/config/watcher/PipelineConfigWatcher.cpp index d68d7cedf3..fb914c4d55 100644 --- a/core/config/watcher/PipelineConfigWatcher.cpp +++ b/core/config/watcher/PipelineConfigWatcher.cpp @@ -41,11 +41,13 @@ pair PipelineConfigWatcher::CheckConfigDiff( PipelineConfigDiff pDiff; TaskConfigDiff tDiff; unordered_set configSet; + SingletonConfigCache singletonCache; // builtin pipeline configs - InsertBuiltInPipelines(pDiff, tDiff, configSet); - // file pipeline configs - InsertPipelines(pDiff, tDiff, configSet); + InsertBuiltInPipelines(pDiff, tDiff, configSet, singletonCache); + // file pipeline configs + InsertPipelines(pDiff, tDiff, configSet, singletonCache); + CheckSingletonInput(pDiff, singletonCache); for (const auto& name : mPipelineManager->GetAllConfigNames()) { if (configSet.find(name) == configSet.end()) { pDiff.mRemoved.push_back(name); @@ -88,8 +90,9 @@ pair PipelineConfigWatcher::CheckConfigDiff( } void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff, - TaskConfigDiff& tDiff, - unordered_set& configSet) { + TaskConfigDiff& tDiff, + unordered_set& configSet, + SingletonConfigCache& singletonCache) { #ifdef __ENTERPRISE__ const std::map& builtInPipelines = EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs(); @@ -120,7 +123,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff, LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", pipelineName)); continue; } - if (!CheckAddedConfig(pipelineName, std::move(detail), pDiff, tDiff)) { + if (!CheckAddedConfig(pipelineName, std::move(detail), pDiff, tDiff, singletonCache)) { continue; } } else if (pipleineDetail != iter->second) { @@ -161,7 +164,7 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff, } continue; } - if (!CheckModifiedConfig(pipelineName, std::move(detail), pDiff, tDiff)) { + if (!CheckModifiedConfig(pipelineName, std::move(detail), pDiff, tDiff, singletonCache)) { continue; } } else { @@ -175,7 +178,8 @@ void PipelineConfigWatcher::InsertBuiltInPipelines(PipelineConfigDiff& pDiff, void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, - std::unordered_set& configSet) { + std::unordered_set& configSet, + SingletonConfigCache& singletonCache) { for (const auto& dir : mSourceDir) { error_code ec; filesystem::file_status s = filesystem::status(dir, ec); @@ -231,7 +235,7 @@ void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff, LOG_INFO(sLogger, ("new config found and disabled", "skip current object")("config", configName)); continue; } - if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff)) { + if (!CheckAddedConfig(configName, std::move(detail), pDiff, tDiff, singletonCache)) { continue; } } else if (iter->second.first != size || iter->second.second != mTime) { @@ -270,11 +274,12 @@ void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff, } continue; } - if (!CheckModifiedConfig(configName, std::move(detail), pDiff, tDiff)) { + if (!CheckModifiedConfig(configName, std::move(detail), pDiff, tDiff, singletonCache)) { continue; } } else { LOG_DEBUG(sLogger, ("existing config file unchanged", "skip current object")); + CheckUnchangedConfig(configName, path, pDiff, tDiff, singletonCache); } } } @@ -283,7 +288,8 @@ void PipelineConfigWatcher::InsertPipelines(PipelineConfigDiff& pDiff, bool PipelineConfigWatcher::CheckAddedConfig(const string& configName, unique_ptr&& configDetail, PipelineConfigDiff& pDiff, - TaskConfigDiff& tDiff) { + TaskConfigDiff& tDiff, + SingletonConfigCache& singletonCache) { switch (GetConfigType(*configDetail)) { case ConfigType::Pipeline: { PipelineConfig config(configName, std::move(configDetail)); @@ -297,7 +303,7 @@ bool PipelineConfigWatcher::CheckAddedConfig(const string& configName, config.mRegion); return false; } - pDiff.mAdded.push_back(std::move(config)); + PushPipelineConfig(std::move(config), ConfigDiffEnum::Added, pDiff, singletonCache); LOG_INFO(sLogger, ("new config found and passed topology check", "prepare to build pipeline")("config", configName)); break; @@ -322,7 +328,8 @@ bool PipelineConfigWatcher::CheckAddedConfig(const string& configName, bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName, unique_ptr&& configDetail, PipelineConfigDiff& pDiff, - TaskConfigDiff& tDiff) { + TaskConfigDiff& tDiff, + SingletonConfigCache& singletonCache) { switch (GetConfigType(*configDetail)) { case ConfigType::Pipeline: { shared_ptr p = mPipelineManager->FindConfigByName(configName); @@ -341,10 +348,10 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName, config.mRegion); return false; } - pDiff.mAdded.push_back(std::move(config)); LOG_INFO(sLogger, ("existing invalid config modified and passed topology check", "prepare to build pipeline")("config", configName)); + PushPipelineConfig(std::move(config), ConfigDiffEnum::Added, pDiff, singletonCache); } else if (*configDetail != p->GetConfig()) { PipelineConfig config(configName, std::move(configDetail)); if (!config.Parse()) { @@ -360,10 +367,10 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName, config.mRegion); return false; } - pDiff.mModified.push_back(std::move(config)); LOG_INFO(sLogger, ("existing valid config modified and passed topology check", "prepare to rebuild pipeline")("config", configName)); + PushPipelineConfig(std::move(config), ConfigDiffEnum::Modified, pDiff, singletonCache); } else { LOG_DEBUG(sLogger, ("existing valid config file modified, but no change found", "skip current object")); } @@ -412,4 +419,132 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName, return true; } +bool PipelineConfigWatcher::CheckUnchangedConfig(const std::string& configName, + const filesystem::path& path, + PipelineConfigDiff& pDiff, + TaskConfigDiff& tDiff, + SingletonConfigCache& singletonCache) { + auto pipeline = mPipelineManager->FindConfigByName(configName); + auto task = mTaskPipelineManager->FindPipelineByName(configName).get(); + if (task) { + return true; + } else if (pipeline) { // running pipeline in last config update + std::unique_ptr configDetail = make_unique(); + PipelineConfig config(configName, std::move(configDetail)); + config.mCreateTime = pipeline->GetContext().GetCreateTime(); + config.mSingletonInput = pipeline->GetSingletonInput(); + PushPipelineConfig(std::move(config), ConfigDiffEnum::Unchanged, pDiff, singletonCache); + } else { // low priority singleton input in last config update, sort it again + unique_ptr detail = make_unique(); + if (!LoadConfigDetailFromFile(path, *detail)) { + return false; + } + if (!IsConfigEnabled(configName, *detail)) { + LOG_INFO(sLogger, ("unchanged config found and disabled", "skip current object")("config", configName)); + return false; + } + PipelineConfig config(configName, std::move(detail)); + if (!config.Parse()) { + LOG_ERROR(sLogger, ("new config found but invalid", "skip current object")("config", configName)); + AlarmManager::GetInstance()->SendAlarm(CATEGORY_CONFIG_ALARM, + "new config found but invalid: skip current object, config: " + + configName, + config.mProject, + config.mLogstore, + config.mRegion); + return false; + } + if (config.mSingletonInput) { + singletonCache[config.mSingletonInput.value()].push_back( + make_shared(std::move(config), ConfigDiffEnum::Added)); + } + } + return true; +} + +void PipelineConfigWatcher::PushPipelineConfig(PipelineConfig&& config, + ConfigDiffEnum diffEnum, + PipelineConfigDiff& pDiff, + SingletonConfigCache& singletonCache) { + // singleton input + if (config.mSingletonInput) { + if (diffEnum == ConfigDiffEnum::Added || diffEnum == ConfigDiffEnum::Modified + || diffEnum == ConfigDiffEnum::Unchanged) { + singletonCache[config.mSingletonInput.value()].push_back( + make_shared(std::move(config), diffEnum)); + } else { + LOG_ERROR(sLogger, ("should not reach here", "invalid diff enum")("diff", diffEnum)); + } + return; + } + // no singleton input + switch (diffEnum) { + case ConfigDiffEnum::Added: + pDiff.mAdded.push_back(std::move(config)); + break; + case ConfigDiffEnum::Modified: + pDiff.mModified.push_back(std::move(config)); + break; + default: + break; + } +} + +void PipelineConfigWatcher::CheckSingletonInput(PipelineConfigDiff& pDiff, SingletonConfigCache& singletonCache) { + for (auto& [name, configs] : singletonCache) { + std::sort(configs.begin(), + configs.end(), + [](const std::shared_ptr& a, + const std::shared_ptr& b) -> bool { + if (a->config.mCreateTime == b->config.mCreateTime) { + return a->config.mName < b->config.mName; + } + return a->config.mCreateTime < b->config.mCreateTime; + }); + for (size_t i = 0; i < configs.size(); ++i) { + const auto& diffEnum = configs[i]->diffEnum; + const auto& configName = configs[i]->config.mName; + if (i == 0) { + switch (diffEnum) { + // greatest priority config + case ConfigDiffEnum::Added: + LOG_INFO(sLogger, + ("new config with singleton input found and passed topology check", + "prepare to build pipeline")("config", configName)); + pDiff.mAdded.push_back(std::move(configs[0]->config)); + break; + case ConfigDiffEnum::Modified: + LOG_INFO(sLogger, + ("existing config with singleton input modified and passed topology check", + "prepare to build pipeline")("config", configName)); + pDiff.mModified.push_back(std::move(configs[0]->config)); + break; + default: + break; + } + } else { + // other low priority configs + switch (diffEnum) { + case ConfigDiffEnum::Modified: + LOG_WARNING(sLogger, + ("global singleton plugin found, but another older config or smaller name config " + "already exists", + "skip current object")("config", configName)); + pDiff.mRemoved.push_back(configName); + break; + case ConfigDiffEnum::Unchanged: + LOG_WARNING(sLogger, + ("existing valid config with global singleton plugin, but another older config or " + "smaller name config found", + "prepare to stop current running pipeline")("config", configName)); + pDiff.mRemoved.push_back(configName); + break; + default: + break; + } + } + } + } +} + } // namespace logtail diff --git a/core/config/watcher/PipelineConfigWatcher.h b/core/config/watcher/PipelineConfigWatcher.h index 98c2677264..766b55c512 100644 --- a/core/config/watcher/PipelineConfigWatcher.h +++ b/core/config/watcher/PipelineConfigWatcher.h @@ -16,6 +16,9 @@ #pragma once +#include +#include +#include #include #include "config/ConfigDiff.h" @@ -26,6 +29,14 @@ namespace logtail { class PipelineManager; class TaskPipelineManager; +struct PipelineConfigWithDiffInfo { + PipelineConfig config; + ConfigDiffEnum diffEnum; + PipelineConfigWithDiffInfo(PipelineConfig&& config, ConfigDiffEnum diffEnum) + : config(std::move(config)), diffEnum(diffEnum) {} +}; +using SingletonConfigCache = std::unordered_map>>; + class PipelineConfigWatcher : public ConfigWatcher { public: PipelineConfigWatcher(const PipelineConfigWatcher&) = delete; @@ -46,19 +57,41 @@ class PipelineConfigWatcher : public ConfigWatcher { PipelineConfigWatcher(); ~PipelineConfigWatcher() = default; - void InsertBuiltInPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set& configSet); - void InsertPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set& configSet); + void InsertBuiltInPipelines(PipelineConfigDiff& pDiff, + TaskConfigDiff& tDiff, + std::unordered_set& configSet, + SingletonConfigCache& singletonCache); + void InsertPipelines(PipelineConfigDiff& pDiff, + TaskConfigDiff& tDiff, + std::unordered_set& configSet, + SingletonConfigCache& singletonCache); bool CheckAddedConfig(const std::string& configName, std::unique_ptr&& configDetail, PipelineConfigDiff& pDiff, - TaskConfigDiff& tDiff); + TaskConfigDiff& tDiff, + SingletonConfigCache& singletonCache); bool CheckModifiedConfig(const std::string& configName, std::unique_ptr&& configDetail, PipelineConfigDiff& pDiff, - TaskConfigDiff& tDiff); + TaskConfigDiff& tDiff, + SingletonConfigCache& singletonCache); + bool CheckUnchangedConfig(const std::string& configName, + const std::filesystem::path& path, + PipelineConfigDiff& pDiff, + TaskConfigDiff& tDiff, + SingletonConfigCache& singletonCache); + void PushPipelineConfig(PipelineConfig&& config, + ConfigDiffEnum diffEnum, + PipelineConfigDiff& pDiff, + SingletonConfigCache& singletonCache); + void CheckSingletonInput(PipelineConfigDiff& pDiff, SingletonConfigCache& singletonCache); const PipelineManager* mPipelineManager = nullptr; const TaskPipelineManager* mTaskPipelineManager = nullptr; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class PipelineConfigWatcherUnittest; +#endif }; } // namespace logtail diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 1f8e4d5868..3ed21f7d2b 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -70,6 +70,7 @@ void AddExtendedGlobalParamToGoPipeline(const Json::Value& extendedParams, Json: bool Pipeline::Init(PipelineConfig&& config) { mName = config.mName; mConfig = std::move(config.mDetail); + mSingletonInput = config.mSingletonInput; mContext.SetConfigName(mName); mContext.SetCreateTime(config.mCreateTime); mContext.SetPipeline(*this); diff --git a/core/pipeline/Pipeline.h b/core/pipeline/Pipeline.h index d6a55911c7..29666c68c1 100644 --- a/core/pipeline/Pipeline.h +++ b/core/pipeline/Pipeline.h @@ -61,6 +61,7 @@ class Pipeline { const std::string& Name() const { return mName; } PipelineContext& GetContext() const { return mContext; } const Json::Value& GetConfig() const { return *mConfig; } + const std::optional& GetSingletonInput() const { return mSingletonInput; } const std::vector>& GetFlushers() const { return mFlushers; } bool IsFlushingThroughGoPipeline() const { return !mGoPipelineWithoutInput.isNull(); } const std::unordered_map>& GetPluginStatistics() const { @@ -100,6 +101,7 @@ class Pipeline { mutable PipelineContext mContext; std::unordered_map> mPluginCntMap; std::unique_ptr mConfig; + std::optional mSingletonInput; std::atomic_uint16_t mPluginID; std::atomic_int16_t mInProcessCnt; diff --git a/core/pipeline/plugin/PluginRegistry.cpp b/core/pipeline/plugin/PluginRegistry.cpp index bf0d7acbe6..d62648bf27 100644 --- a/core/pipeline/plugin/PluginRegistry.cpp +++ b/core/pipeline/plugin/PluginRegistry.cpp @@ -127,13 +127,13 @@ bool PluginRegistry::IsValidNativeFlusherPlugin(const string& name) const { void PluginRegistry::LoadStaticPlugins() { RegisterInputCreator(new StaticInputCreator()); RegisterInputCreator(new StaticInputCreator()); - RegisterInputCreator(new StaticInputCreator()); + RegisterInputCreator(new StaticInputCreator(), true); #if defined(__linux__) && !defined(__ANDROID__) RegisterInputCreator(new StaticInputCreator()); - RegisterInputCreator(new StaticInputCreator()); - RegisterInputCreator(new StaticInputCreator()); - RegisterInputCreator(new StaticInputCreator()); - RegisterInputCreator(new StaticInputCreator()); + RegisterInputCreator(new StaticInputCreator(), true); + RegisterInputCreator(new StaticInputCreator(), true); + RegisterInputCreator(new StaticInputCreator(), true); + RegisterInputCreator(new StaticInputCreator(), true); #endif RegisterProcessorCreator(new StaticProcessorCreator()); @@ -183,16 +183,16 @@ void PluginRegistry::LoadDynamicPlugins(const set& plugins) { } } -void PluginRegistry::RegisterInputCreator(PluginCreator* creator) { - RegisterCreator(INPUT_PLUGIN, creator); +void PluginRegistry::RegisterInputCreator(PluginCreator* creator, bool isSingleton) { + RegisterCreator(INPUT_PLUGIN, creator, isSingleton); } void PluginRegistry::RegisterProcessorCreator(PluginCreator* creator) { - RegisterCreator(PROCESSOR_PLUGIN, creator); + RegisterCreator(PROCESSOR_PLUGIN, creator, false); } -void PluginRegistry::RegisterFlusherCreator(PluginCreator* creator) { - RegisterCreator(FLUSHER_PLUGIN, creator); +void PluginRegistry::RegisterFlusherCreator(PluginCreator* creator, bool isSingleton) { + RegisterCreator(FLUSHER_PLUGIN, creator, isSingleton); } PluginCreator* PluginRegistry::LoadProcessorPlugin(DynamicLibLoader& loader, const string pluginType) { @@ -217,11 +217,12 @@ PluginCreator* PluginRegistry::LoadProcessorPlugin(DynamicLibLoader& loader, con return new DynamicCProcessorCreator(plugin, loader.Release()); } -void PluginRegistry::RegisterCreator(PluginCat cat, PluginCreator* creator) { +void PluginRegistry::RegisterCreator(PluginCat cat, PluginCreator* creator, bool isSingleton) { if (!creator) { return; } - mPluginDict.emplace(PluginKey(cat, creator->Name()), shared_ptr(creator)); + mPluginDict.emplace(PluginKey(cat, creator->Name()), + PluginCreatorWithInfo(shared_ptr(creator), isSingleton)); } unique_ptr @@ -229,9 +230,21 @@ PluginRegistry::Create(PluginCat cat, const string& name, const PluginInstance:: unique_ptr ins; auto creatorEntry = mPluginDict.find(PluginKey(cat, name)); if (creatorEntry != mPluginDict.end()) { - ins = creatorEntry->second->Create(pluginMeta); + ins = creatorEntry->second.first->Create(pluginMeta); } return ins; } -} // namespace logtail \ No newline at end of file +bool PluginRegistry::IsGlobalSingletonInputPlugin(const string& name) const { + return IsGlobalSingleton(INPUT_PLUGIN, name); +} + +bool PluginRegistry::IsGlobalSingleton(PluginCat cat, const string& name) const { + auto creatorEntry = mPluginDict.find(PluginKey(cat, name)); + if (creatorEntry != mPluginDict.end()) { + return creatorEntry->second.second; + } + return false; +} + +} // namespace logtail diff --git a/core/pipeline/plugin/PluginRegistry.h b/core/pipeline/plugin/PluginRegistry.h index 22213d6c39..1c552da6dc 100644 --- a/core/pipeline/plugin/PluginRegistry.h +++ b/core/pipeline/plugin/PluginRegistry.h @@ -46,12 +46,15 @@ class PluginRegistry { void LoadPlugins(); void UnloadPlugins(); std::unique_ptr CreateInput(const std::string& name, const PluginInstance::PluginMeta& pluginMeta); - std::unique_ptr CreateProcessor(const std::string& name, const PluginInstance::PluginMeta& pluginMeta); - std::unique_ptr CreateFlusher(const std::string& name, const PluginInstance::PluginMeta& pluginMeta); + std::unique_ptr CreateProcessor(const std::string& name, + const PluginInstance::PluginMeta& pluginMeta); + std::unique_ptr CreateFlusher(const std::string& name, + const PluginInstance::PluginMeta& pluginMeta); bool IsValidGoPlugin(const std::string& name) const; bool IsValidNativeInputPlugin(const std::string& name) const; bool IsValidNativeProcessorPlugin(const std::string& name) const; bool IsValidNativeFlusherPlugin(const std::string& name) const; + bool IsGlobalSingletonInputPlugin(const std::string& name) const; private: enum PluginCat { INPUT_PLUGIN, PROCESSOR_PLUGIN, FLUSHER_PLUGIN }; @@ -69,19 +72,23 @@ class PluginRegistry { } }; + using PluginCreatorWithInfo = std::pair, bool>; + PluginRegistry() {} ~PluginRegistry() = default; void LoadStaticPlugins(); void LoadDynamicPlugins(const std::set& plugins); - void RegisterInputCreator(PluginCreator* creator); + void RegisterInputCreator(PluginCreator* creator, bool isSingleton = false); void RegisterProcessorCreator(PluginCreator* creator); - void RegisterFlusherCreator(PluginCreator* creator); + void RegisterFlusherCreator(PluginCreator* creator, bool isSingleton = false); PluginCreator* LoadProcessorPlugin(DynamicLibLoader& loader, const std::string pluginType); - void RegisterCreator(PluginCat cat, PluginCreator* creator); - std::unique_ptr Create(PluginCat cat, const std::string& name, const PluginInstance::PluginMeta& pluginMeta); + void RegisterCreator(PluginCat cat, PluginCreator* creator, bool isSingleton); + std::unique_ptr + Create(PluginCat cat, const std::string& name, const PluginInstance::PluginMeta& pluginMeta); + bool IsGlobalSingleton(PluginCat cat, const std::string& name) const; - std::unordered_map, PluginKeyHash> mPluginDict; + std::unordered_map mPluginDict; #ifdef APSARA_UNIT_TEST_MAIN friend class PluginRegistryUnittest; diff --git a/core/unittest/config/CMakeLists.txt b/core/unittest/config/CMakeLists.txt index 560d47d393..7efb993180 100644 --- a/core/unittest/config/CMakeLists.txt +++ b/core/unittest/config/CMakeLists.txt @@ -44,6 +44,9 @@ target_link_libraries(config_feedbackable_unittest ${UT_BASE_TARGET}) add_executable(common_config_provider_unittest CommonConfigProviderUnittest.cpp) target_link_libraries(common_config_provider_unittest ${UT_BASE_TARGET}) +add_executable(pipeline_config_watcher_unittest PipelineConfigWatcherUnittest.cpp) +target_link_libraries(pipeline_config_watcher_unittest ${UT_BASE_TARGET}) + include(GoogleTest) gtest_discover_tests(pipeline_config_unittest) gtest_discover_tests(task_config_unittest) @@ -54,3 +57,4 @@ if (ENABLE_ENTERPRISE) endif () gtest_discover_tests(config_feedbackable_unittest) gtest_discover_tests(common_config_provider_unittest) +gtest_discover_tests(pipeline_config_watcher_unittest) diff --git a/core/unittest/config/ConfigUpdateUnittest.cpp b/core/unittest/config/ConfigUpdateUnittest.cpp index 341f48f00d..6b71e40eb8 100644 --- a/core/unittest/config/ConfigUpdateUnittest.cpp +++ b/core/unittest/config/ConfigUpdateUnittest.cpp @@ -26,48 +26,13 @@ #include "pipeline/plugin/PluginRegistry.h" #include "task_pipeline/TaskPipelineManager.h" #include "unittest/Unittest.h" +#include "unittest/config/PipelineManagerMock.h" #include "unittest/plugin/PluginMock.h" using namespace std; namespace logtail { -class PipelineMock : public Pipeline { -public: - bool Init(PipelineConfig&& config) { - mConfig = std::move(config.mDetail); - WriteMetrics::GetInstance()->PrepareMetricsRecordRef( - mMetricsRecordRef, - MetricCategory::METRIC_CATEGORY_PIPELINE, - {{METRIC_LABEL_KEY_PROJECT, mContext.GetProjectName()}, {METRIC_LABEL_KEY_PIPELINE_NAME, mName}}); - mStartTime = mMetricsRecordRef.CreateIntGauge(METRIC_PIPELINE_START_TIME); - return (*mConfig)["valid"].asBool(); - } -}; - -class PipelineManagerMock : public PipelineManager { -public: - static PipelineManagerMock* GetInstance() { - static PipelineManagerMock instance; - return &instance; - } - - void ClearEnvironment() { - mPipelineNameEntityMap.clear(); - mPluginCntMap.clear(); - } - -private: - shared_ptr BuildPipeline(PipelineConfig&& config) override { - // this should be synchronized with PipelineManager::BuildPipeline, except for the pointer type. - shared_ptr p = make_shared(); - if (!p->Init(std::move(config))) { - return nullptr; - } - return p; - } -}; - class ConfigUpdateUnittest : public testing::Test { public: void OnStartUp() const; diff --git a/core/unittest/config/PipelineConfigWatcherUnittest.cpp b/core/unittest/config/PipelineConfigWatcherUnittest.cpp new file mode 100644 index 0000000000..56c8c3fdd4 --- /dev/null +++ b/core/unittest/config/PipelineConfigWatcherUnittest.cpp @@ -0,0 +1,1454 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include + +#include "common/JsonUtil.h" +#include "config/watcher/PipelineConfigWatcher.h" +#include "plugin/PluginRegistry.h" +#include "unittest/Unittest.h" +#include "unittest/config/PipelineManagerMock.h" + +using namespace std; + +namespace logtail { + +class PipelineConfigWatcherUnittest : public testing::Test { +public: + void TestLoadAddedSingletonConfig(); + void TestLoadModifiedSingletonConfig(); + void TestLoadRemovedSingletonConfig(); + void TestLoadUnchangedSingletonConfig(); + +protected: + static void SetUpTestCase() { + PluginRegistry::GetInstance()->LoadPlugins(); + PipelineConfigWatcher::GetInstance()->SetPipelineManager(PipelineManagerMock::GetInstance()); + } + static void TearDownTestCase() { PluginRegistry::GetInstance()->UnloadPlugins(); } + +private: + void PrepareConfig() { + filesystem::create_directories(configDir1); + PipelineConfigWatcher::GetInstance()->AddSource(configDir1.string()); + filesystem::create_directories(configDir2); + PipelineConfigWatcher::GetInstance()->AddSource(configDir2.string()); + } + + void ClearConfig() { + PipelineManagerMock::GetInstance()->ClearEnvironment(); + PipelineConfigWatcher::GetInstance()->ClearEnvironment(); + filesystem::remove_all(configDir1); + filesystem::remove_all(configDir2); + } + + filesystem::path configDir1 = "./continuous_pipeline_config1"; + filesystem::path configDir2 = "./continuous_pipeline_config2"; + + const std::string greaterPriorityConfig = R"( + { + "createTime": 1, + "valid": true, + "inputs": [ + { + "Type": "input_network_observer" + } + ], + "flushers": [ + { + "Type": "flusher_sls" + } + ] + } + )"; + + const std::string lessPriorityConfig = R"( + { + "createTime": 2, + "valid": true, + "inputs": [ + { + "Type": "input_network_observer" + } + ], + "flushers": [ + { + "Type": "flusher_sls" + } + ] + } + )"; + + const std::string modifiedGreaterPriorityConfig = R"( + { + "createTime": 1, + "valid": true, + "inputs": [ + { + "Type": "input_network_observer" + } + ], + "processors": [], + "flushers": [ + { + "Type": "flusher_sls" + } + ] + } + )"; + + const std::string modifiedLessPriorityConfig = R"( + { + "createTime": 2, + "valid": true, + "inputs": [ + { + "Type": "input_network_observer" + } + ], + "processors": [], + "flushers": [ + { + "Type": "flusher_sls" + } + ] + } + )"; + + const std::string otherConfig = R"( + { + "createTime": 3, + "valid": true, + "inputs": [ + { + "Type": "input_process_security" + } + ], + "flushers": [ + { + "Type": "flusher_sls" + } + ] + } + )"; + + const std::string modifiedOtherConfig = R"( + { + "createTime": 3, + "valid": true, + "inputs": [ + { + "Type": "input_process_security" + } + ], + "processors": [], + "flushers": [ + { + "Type": "flusher_sls" + } + ] + } + )"; +}; + +// there are 4 kinds of a config: added, modified, removed, unchanged +// there are 4 kinds of priority relationship: first > second, first < second, +// first > second -> first < second, first < second -> first > second +// total case: 4 (first kind) * 4(second kind) * 4(priority) = 64 +void PipelineConfigWatcherUnittest::TestLoadAddedSingletonConfig() { + { // case: added -> added, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[1]); + ClearConfig(); + } + { // case: added -> added, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: added -> added, first > second -> first < second + // should not happen + } + { // case: added -> added, first < second -> first > second + // should not happen + } + { // case: added -> modified, first > second + PrepareConfig(); + ofstream fout(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << modifiedLessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << modifiedOtherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[1]); + ClearConfig(); + } + { // case: added -> modified, first < second + PrepareConfig(); + ofstream fout(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << modifiedGreaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << modifiedOtherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: added -> modified, first > second -> first < second + // should not happen + } + { // case: added -> modified, first < second -> first > second + // should not happen + } + { // case: added -> removed, first > second + PrepareConfig(); + ofstream fout(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + filesystem::remove(configDir2 / "test2.json"); + filesystem::remove(configDir2 / "test-other.json"); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(1U, allConfigNames.size()); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[0]); + ClearConfig(); + } + { // case: added -> removed, first < second + PrepareConfig(); + ofstream fout(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + filesystem::remove(configDir2 / "test2.json"); + filesystem::remove(configDir2 / "test-other.json"); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(1U, allConfigNames.size()); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[0]); + ClearConfig(); + } + { // case: added -> removed, first > second -> first < second + // should not happen + } + { // case: added -> removed, first < second -> first > second + // should not happen + } + { // case: added -> unchanged, first > second + PrepareConfig(); + ofstream fout(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[1]); + ClearConfig(); + } + { // case: added -> unchanged, first < second + PrepareConfig(); + ofstream fout(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: added -> unchanged, first > second -> first < second + // should not happen + } + { // case: added -> unchanged, first < second -> first > second + // should not happen + } +} + +void PipelineConfigWatcherUnittest::TestLoadModifiedSingletonConfig() { + { // case: modified -> added, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(1U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << modifiedGreaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[1]); + ClearConfig(); + } + { // case: modified -> added, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(1U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << modifiedLessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: modified -> added, first > second -> first < second + // should not happen + } + { // case: modified -> added, first < second -> first > second + // should not happen + } + { // case: modified -> modified, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << modifiedGreaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << modifiedLessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << modifiedOtherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[1]); + ClearConfig(); + } + { // case: modified -> modified, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << modifiedLessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << modifiedGreaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << modifiedOtherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: modified -> modified, first > second -> first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << modifiedLessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << modifiedGreaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << modifiedOtherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: modified -> modified, first < second -> first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << modifiedGreaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << modifiedLessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << modifiedOtherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[1]); + ClearConfig(); + } + { // case: modified -> removed, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << modifiedGreaterPriorityConfig; + fout.close(); + filesystem::remove(configDir2 / "test2.json"); + filesystem::remove(configDir2 / "test-other.json"); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(1U, allConfigNames.size()); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[0]); + ClearConfig(); + } + { // case: modified -> removed, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << modifiedLessPriorityConfig; + fout.close(); + filesystem::remove(configDir2 / "test2.json"); + filesystem::remove(configDir2 / "test-other.json"); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(1U, allConfigNames.size()); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[0]); + ClearConfig(); + } + { // case: modified -> removed, first > second -> first < second + // should not happen + } + { // case: modified -> removed, first < second -> first > second + // should not happen + } + { // case: modified -> unchanged, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << modifiedGreaterPriorityConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[1]); + ClearConfig(); + } + { // case: modified -> unchanged, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << modifiedLessPriorityConfig; + fout.close(); + filesystem::remove(configDir2 / "test2.json"); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[1]); + ClearConfig(); + } + { // case: modified -> unchanged, first > second -> first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: modified -> unchanged, first < second -> first > second + PrepareConfig(); + ofstream fout(configDir1 / "test3.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + std::this_thread::sleep_for(std::chrono::milliseconds(1)); + + fout.open(configDir1 / "test3.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test3", allConfigNames[1]); + ClearConfig(); + } +} + +void PipelineConfigWatcherUnittest::TestLoadRemovedSingletonConfig() { + { // case: removed -> added, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(1U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + filesystem::remove(configDir1 / "test1.json"); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: removed -> added, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(1U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + filesystem::remove(configDir1 / "test1.json"); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: removed -> added, first > second -> first < second + // should not happen + } + { // case: removed -> added, first < second -> first > second + // should not happen + } + { // case: removed -> modified, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + this_thread::sleep_for(chrono::milliseconds(1)); + + filesystem::remove(configDir1 / "test1.json"); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << modifiedLessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << modifiedOtherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: removed -> modified, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + this_thread::sleep_for(chrono::milliseconds(1)); + + filesystem::remove(configDir1 / "test1.json"); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << modifiedGreaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << modifiedOtherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: removed -> modified, first > second -> first < second + // should not happen + } + { // case: removed -> modified, first < second -> first > second + // should not happen + } + { // case: removed -> removed, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + filesystem::remove(configDir1 / "test1.json"); + filesystem::remove(configDir2 / "test2.json"); + filesystem::remove(configDir2 / "test-other.json"); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(0U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + ClearConfig(); + } + { // case: removed -> removed, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + filesystem::remove(configDir1 / "test1.json"); + filesystem::remove(configDir2 / "test2.json"); + filesystem::remove(configDir2 / "test-other.json"); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(0U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + ClearConfig(); + } + { // case: removed -> removed, first > second -> first < second + // should not happen + } + { // case: removed -> removed, first < second -> first > second + // should not happen + } + { // case: removed -> unchanged, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + filesystem::remove(configDir1 / "test1.json"); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: removed -> unchanged, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + filesystem::remove(configDir1 / "test1.json"); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: removed -> unchanged, first > second -> first < second + // should not happen + } + { // case: removed -> unchanged, first < second -> first > second + // should not happen + } +} + +void PipelineConfigWatcherUnittest::TestLoadUnchangedSingletonConfig() { + { // case: unchanged -> added, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(1U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[1]); + ClearConfig(); + } + { // case: unchanged -> added, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(1U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: unchanged -> added, first > second -> first < second + // should not happen + } + { // case: unchanged -> added, first < second -> first > second + // should not happen + } + { // case: unchanged -> modified, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + fout.open(configDir2 / "test2.json", ios::trunc); + fout << modifiedLessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << modifiedOtherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[1]); + ClearConfig(); + } + { // case: unchanged -> modified, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + fout.open(configDir2 / "test2.json", ios::trunc); + fout << modifiedGreaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << modifiedOtherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: unchanged -> modified, first > second -> first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << modifiedOtherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: unchanged -> modified, first < second -> first > second + PrepareConfig(); + ofstream fout(configDir1 / "test3.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + fout.open(configDir2 / "test2.json", ios::trunc); + fout << modifiedLessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << modifiedOtherConfig; + fout.close(); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test3", allConfigNames[1]); + ClearConfig(); + } + { // case: unchanged -> removed, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + filesystem::remove(configDir2 / "test2.json"); + filesystem::remove(configDir2 / "test-other.json"); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(1U, allConfigNames.size()); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[0]); + ClearConfig(); + } + { // case: unchanged -> removed, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + filesystem::remove(configDir2 / "test2.json"); + filesystem::remove(configDir2 / "test-other.json"); + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(1, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(2, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(1U, allConfigNames.size()); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[0]); + ClearConfig(); + } + { // case: unchanged -> removed, first > second -> first < second + // should not happen + } + { // case: unchanged -> removed, first < second -> first > second + // should not happen + } + { // case: unchanged -> unchanged, first > second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test1", allConfigNames[1]); + ClearConfig(); + } + { // case: unchanged -> unchanged, first < second + PrepareConfig(); + ofstream fout(configDir1 / "test1.json", ios::trunc); + fout << lessPriorityConfig; + fout.close(); + fout.open(configDir2 / "test2.json", ios::trunc); + fout << greaterPriorityConfig; + fout.close(); + fout.open(configDir2 / "test-other.json", ios::trunc); + fout << otherConfig; + fout.close(); + auto diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + APSARA_TEST_EQUAL_FATAL(2U, PipelineManagerMock::GetInstance()->GetAllConfigNames().size()); + + diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mAdded.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mModified.size()); + APSARA_TEST_EQUAL_FATAL(0, diff.first.mRemoved.size()); + + PipelineManagerMock::GetInstance()->UpdatePipelines(diff.first); + auto allConfigNames = PipelineManagerMock::GetInstance()->GetAllConfigNames(); + APSARA_TEST_EQUAL_FATAL(2U, allConfigNames.size()); + sort(allConfigNames.begin(), allConfigNames.end()); + APSARA_TEST_EQUAL_FATAL("test-other", allConfigNames[0]); + APSARA_TEST_EQUAL_FATAL("test2", allConfigNames[1]); + ClearConfig(); + } + { // case: unchanged -> unchanged, first > second -> first < second + // should not happen + } + { // case: unchanged -> unchanged, first < second -> first > second + // should not happen + } +} + +UNIT_TEST_CASE(PipelineConfigWatcherUnittest, TestLoadAddedSingletonConfig) +UNIT_TEST_CASE(PipelineConfigWatcherUnittest, TestLoadModifiedSingletonConfig) +UNIT_TEST_CASE(PipelineConfigWatcherUnittest, TestLoadRemovedSingletonConfig) +UNIT_TEST_CASE(PipelineConfigWatcherUnittest, TestLoadUnchangedSingletonConfig) + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/config/PipelineManagerMock.h b/core/unittest/config/PipelineManagerMock.h new file mode 100644 index 0000000000..b25e4f05b9 --- /dev/null +++ b/core/unittest/config/PipelineManagerMock.h @@ -0,0 +1,58 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "pipeline/PipelineManager.h" + +using namespace std; + +namespace logtail { + +class PipelineMock : public Pipeline { +public: + bool Init(PipelineConfig&& config) { + mConfig = std::move(config.mDetail); + WriteMetrics::GetInstance()->PrepareMetricsRecordRef( + mMetricsRecordRef, + MetricCategory::METRIC_CATEGORY_PIPELINE, + {{METRIC_LABEL_KEY_PROJECT, mContext.GetProjectName()}, {METRIC_LABEL_KEY_PIPELINE_NAME, mName}}); + mStartTime = mMetricsRecordRef.CreateIntGauge(METRIC_PIPELINE_START_TIME); + mSingletonInput = config.mSingletonInput; + mContext.SetCreateTime(config.mCreateTime); + return (*mConfig)["valid"].asBool(); + } +}; + +class PipelineManagerMock : public PipelineManager { +public: + static PipelineManagerMock* GetInstance() { + static PipelineManagerMock instance; + return &instance; + } + + void ClearEnvironment() { + mPipelineNameEntityMap.clear(); + mPluginCntMap.clear(); + } + +private: + shared_ptr BuildPipeline(PipelineConfig&& config) override { + // this should be synchronized with PipelineManager::BuildPipeline, except for the pointer type. + shared_ptr p = make_shared(); + if (!p->Init(std::move(config))) { + return nullptr; + } + return p; + } +}; +} // namespace logtail \ No newline at end of file