diff --git a/core/config/watcher/PipelineConfigWatcher.cpp b/core/config/watcher/PipelineConfigWatcher.cpp index 7c91068b72..dc0f698897 100644 --- a/core/config/watcher/PipelineConfigWatcher.cpp +++ b/core/config/watcher/PipelineConfigWatcher.cpp @@ -80,6 +80,7 @@ pair PipelineConfigWatcher::CheckConfigDiff( LOG_DEBUG(sLogger, ("config files scan done", "no task update")); } + SortPipelineConfigDiff(pDiff); return make_pair(std::move(pDiff), std::move(tDiff)); } @@ -405,4 +406,23 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName, return true; } +void PipelineConfigWatcher::SortPipelineConfigDiff(PipelineConfigDiff& pDiff) { + // sort rule + // 1. sort by create time first, if create time is 0 (include local config), put it back + // 2. if create time is the same, sort by name + auto cmp = [](const PipelineConfig& a, const PipelineConfig& b) { + if (a.mCreateTime == b.mCreateTime) { + return a.mName < b.mName; + } else if (a.mCreateTime == 0 && b.mCreateTime != 0) { + return false; + } else if (a.mCreateTime != 0 && b.mCreateTime == 0) { + return true; + } else { + return a.mCreateTime < b.mCreateTime; + } + }; + sort(pDiff.mAdded.begin(), pDiff.mAdded.end(), cmp); + sort(pDiff.mModified.begin(), pDiff.mModified.end(), cmp); +} + } // namespace logtail diff --git a/core/config/watcher/PipelineConfigWatcher.h b/core/config/watcher/PipelineConfigWatcher.h index 28a7f00f97..138483db36 100644 --- a/core/config/watcher/PipelineConfigWatcher.h +++ b/core/config/watcher/PipelineConfigWatcher.h @@ -46,7 +46,8 @@ class PipelineConfigWatcher : public ConfigWatcher { PipelineConfigWatcher(); ~PipelineConfigWatcher() = default; - void InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set& configSet); + void + InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set& configSet); void InsertPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set& configSet); bool CheckAddedConfig(const std::string& configName, std::unique_ptr&& configDetail, @@ -56,9 +57,14 @@ class PipelineConfigWatcher : public ConfigWatcher { std::unique_ptr&& configDetail, PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff); + void SortPipelineConfigDiff(PipelineConfigDiff& pDiff); const PipelineManager* mPipelineManager = nullptr; const TaskPipelineManager* mTaskPipelineManager = nullptr; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class PipelineConfigWatcherUnittest; +#endif }; } // namespace logtail diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index 57abcde874..fc6c442bff 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -29,8 +29,7 @@ #include "shennong/ShennongManager.h" #endif #include "config/feedbacker/ConfigFeedbackReceiver.h" -#include "pipeline/queue/ProcessQueueManager.h" -#include "pipeline/queue/QueueKeyManager.h" +#include "plugin/PluginRegistry.h" using namespace std; @@ -40,7 +39,7 @@ PipelineManager::PipelineManager() : mInputRunners({ PrometheusInputRunner::GetInstance(), #if defined(__linux__) && !defined(__ANDROID__) - ebpf::eBPFServer::GetInstance(), + ebpf::eBPFServer::GetInstance(), #endif }) { } @@ -82,6 +81,10 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { ConfigFeedbackStatus::DELETED); } for (auto& config : diff.mModified) { + if (!PreCheckPipelineConfig(config)) { + continue; + } + auto p = BuildPipeline(std::move(config)); // auto reuse old pipeline's process queue and sender queue if (!p) { LOG_WARNING(sLogger, @@ -97,6 +100,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { ConfigFeedbackStatus::FAILED); continue; } + LOG_INFO(sLogger, ("pipeline building for existing config succeeded", "stop the old pipeline and start the new one")("config", config.mName)); @@ -111,6 +115,10 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { ConfigFeedbackStatus::APPLIED); } for (auto& config : diff.mAdded) { + if (!PreCheckPipelineConfig(config)) { + continue; + } + auto p = BuildPipeline(std::move(config)); if (!p) { LOG_WARNING(sLogger, @@ -238,6 +246,35 @@ void PipelineManager::DecreasePluginUsageCnt(const unordered_mapSendAlarm( + CATEGORY_CONFIG_ALARM, + "global singleton input plugin is already loaded: skip current object, config: " + config.mName, + config.mProject, + config.mLogstore, + config.mRegion); + ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName, + ConfigFeedbackStatus::FAILED); + return false; + } + return true; +} + +bool PipelineManager::CheckIfGlobalSingletonInputLoaded(std::vector& inputConfig) { + for (const auto& input : inputConfig) { + auto inputType = (*input)["Type"].asString(); + if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(inputType) + && mPluginCntMap["inputs"][inputType] > 0) { + return true; + } + } + return false; +} + bool PipelineManager::CheckIfFileServerUpdated(const Json::Value& config) { string inputType = config["Type"].asString(); return inputType == "input_file" || inputType == "input_container_stdio"; diff --git a/core/pipeline/PipelineManager.h b/core/pipeline/PipelineManager.h index 5dc1535f77..b4880b59ca 100644 --- a/core/pipeline/PipelineManager.h +++ b/core/pipeline/PipelineManager.h @@ -58,6 +58,8 @@ class PipelineManager { void DecreasePluginUsageCnt( const std::unordered_map>& statistics); void FlushAllBatch(); + bool PreCheckPipelineConfig(PipelineConfig& config); + bool CheckIfGlobalSingletonInputLoaded(std::vector& inputConfig); // TODO: 长期过渡使用 bool CheckIfFileServerUpdated(const Json::Value& config); diff --git a/core/pipeline/plugin/PluginRegistry.cpp b/core/pipeline/plugin/PluginRegistry.cpp index bf0d7acbe6..bbb81753f6 100644 --- a/core/pipeline/plugin/PluginRegistry.cpp +++ b/core/pipeline/plugin/PluginRegistry.cpp @@ -127,13 +127,13 @@ bool PluginRegistry::IsValidNativeFlusherPlugin(const string& name) const { void PluginRegistry::LoadStaticPlugins() { RegisterInputCreator(new StaticInputCreator()); RegisterInputCreator(new StaticInputCreator()); - RegisterInputCreator(new StaticInputCreator()); + RegisterInputCreator(new StaticInputCreator(), true); #if defined(__linux__) && !defined(__ANDROID__) RegisterInputCreator(new StaticInputCreator()); - RegisterInputCreator(new StaticInputCreator()); - RegisterInputCreator(new StaticInputCreator()); - RegisterInputCreator(new StaticInputCreator()); - RegisterInputCreator(new StaticInputCreator()); + RegisterInputCreator(new StaticInputCreator(), true); + RegisterInputCreator(new StaticInputCreator(), true); + RegisterInputCreator(new StaticInputCreator(), true); + RegisterInputCreator(new StaticInputCreator(), true); #endif RegisterProcessorCreator(new StaticProcessorCreator()); @@ -183,16 +183,16 @@ void PluginRegistry::LoadDynamicPlugins(const set& plugins) { } } -void PluginRegistry::RegisterInputCreator(PluginCreator* creator) { - RegisterCreator(INPUT_PLUGIN, creator); +void PluginRegistry::RegisterInputCreator(PluginCreator* creator, bool isSingleton) { + RegisterCreator(INPUT_PLUGIN, creator, isSingleton); } -void PluginRegistry::RegisterProcessorCreator(PluginCreator* creator) { - RegisterCreator(PROCESSOR_PLUGIN, creator); +void PluginRegistry::RegisterProcessorCreator(PluginCreator* creator, bool isSingleton) { + RegisterCreator(PROCESSOR_PLUGIN, creator, isSingleton); } -void PluginRegistry::RegisterFlusherCreator(PluginCreator* creator) { - RegisterCreator(FLUSHER_PLUGIN, creator); +void PluginRegistry::RegisterFlusherCreator(PluginCreator* creator, bool isSingleton) { + RegisterCreator(FLUSHER_PLUGIN, creator, isSingleton); } PluginCreator* PluginRegistry::LoadProcessorPlugin(DynamicLibLoader& loader, const string pluginType) { @@ -217,11 +217,12 @@ PluginCreator* PluginRegistry::LoadProcessorPlugin(DynamicLibLoader& loader, con return new DynamicCProcessorCreator(plugin, loader.Release()); } -void PluginRegistry::RegisterCreator(PluginCat cat, PluginCreator* creator) { +void PluginRegistry::RegisterCreator(PluginCat cat, PluginCreator* creator, bool isSingleton) { if (!creator) { return; } - mPluginDict.emplace(PluginKey(cat, creator->Name()), shared_ptr(creator)); + mPluginDict.emplace(PluginKey(cat, creator->Name()), + PluginCreatorWithInfo(shared_ptr(creator), isSingleton)); } unique_ptr @@ -229,9 +230,29 @@ PluginRegistry::Create(PluginCat cat, const string& name, const PluginInstance:: unique_ptr ins; auto creatorEntry = mPluginDict.find(PluginKey(cat, name)); if (creatorEntry != mPluginDict.end()) { - ins = creatorEntry->second->Create(pluginMeta); + ins = creatorEntry->second.first->Create(pluginMeta); } return ins; } +bool PluginRegistry::IsGlobalSingletonInputPlugin(const string& name) const { + return IsGlobalSingleton(INPUT_PLUGIN, name); +} + +bool PluginRegistry::IsGlobalSingletonProcessorPlugin(const string& name) const { + return IsGlobalSingleton(PROCESSOR_PLUGIN, name); +} + +bool PluginRegistry::IsGlobalSingletonFlusherPlugin(const string& name) const { + return IsGlobalSingleton(FLUSHER_PLUGIN, name); +} + +bool PluginRegistry::IsGlobalSingleton(PluginCat cat, const string& name) const { + auto creatorEntry = mPluginDict.find(PluginKey(cat, name)); + if (creatorEntry != mPluginDict.end()) { + return creatorEntry->second.second; + } + return false; +} + } // namespace logtail \ No newline at end of file diff --git a/core/pipeline/plugin/PluginRegistry.h b/core/pipeline/plugin/PluginRegistry.h index 22213d6c39..9a094e16fa 100644 --- a/core/pipeline/plugin/PluginRegistry.h +++ b/core/pipeline/plugin/PluginRegistry.h @@ -46,12 +46,17 @@ class PluginRegistry { void LoadPlugins(); void UnloadPlugins(); std::unique_ptr CreateInput(const std::string& name, const PluginInstance::PluginMeta& pluginMeta); - std::unique_ptr CreateProcessor(const std::string& name, const PluginInstance::PluginMeta& pluginMeta); - std::unique_ptr CreateFlusher(const std::string& name, const PluginInstance::PluginMeta& pluginMeta); + std::unique_ptr CreateProcessor(const std::string& name, + const PluginInstance::PluginMeta& pluginMeta); + std::unique_ptr CreateFlusher(const std::string& name, + const PluginInstance::PluginMeta& pluginMeta); bool IsValidGoPlugin(const std::string& name) const; bool IsValidNativeInputPlugin(const std::string& name) const; bool IsValidNativeProcessorPlugin(const std::string& name) const; bool IsValidNativeFlusherPlugin(const std::string& name) const; + bool IsGlobalSingletonInputPlugin(const std::string& name) const; + bool IsGlobalSingletonProcessorPlugin(const std::string& name) const; + bool IsGlobalSingletonFlusherPlugin(const std::string& name) const; private: enum PluginCat { INPUT_PLUGIN, PROCESSOR_PLUGIN, FLUSHER_PLUGIN }; @@ -69,19 +74,23 @@ class PluginRegistry { } }; + using PluginCreatorWithInfo = std::pair, bool>; + PluginRegistry() {} ~PluginRegistry() = default; void LoadStaticPlugins(); void LoadDynamicPlugins(const std::set& plugins); - void RegisterInputCreator(PluginCreator* creator); - void RegisterProcessorCreator(PluginCreator* creator); - void RegisterFlusherCreator(PluginCreator* creator); + void RegisterInputCreator(PluginCreator* creator, bool isSingleton = false); + void RegisterProcessorCreator(PluginCreator* creator, bool isSingleton = false); + void RegisterFlusherCreator(PluginCreator* creator, bool isSingleton = false); PluginCreator* LoadProcessorPlugin(DynamicLibLoader& loader, const std::string pluginType); - void RegisterCreator(PluginCat cat, PluginCreator* creator); - std::unique_ptr Create(PluginCat cat, const std::string& name, const PluginInstance::PluginMeta& pluginMeta); + void RegisterCreator(PluginCat cat, PluginCreator* creator, bool isSingleton); + std::unique_ptr + Create(PluginCat cat, const std::string& name, const PluginInstance::PluginMeta& pluginMeta); + bool IsGlobalSingleton(PluginCat cat, const std::string& name) const; - std::unordered_map, PluginKeyHash> mPluginDict; + std::unordered_map mPluginDict; #ifdef APSARA_UNIT_TEST_MAIN friend class PluginRegistryUnittest; diff --git a/core/pipeline/plugin/creator/PluginCreator.h b/core/pipeline/plugin/creator/PluginCreator.h index 6888927186..a61d492cdb 100644 --- a/core/pipeline/plugin/creator/PluginCreator.h +++ b/core/pipeline/plugin/creator/PluginCreator.h @@ -16,8 +16,8 @@ #pragma once -#include #include +#include #include "pipeline/plugin/instance/PluginInstance.h" diff --git a/core/pipeline/plugin/instance/InputInstance.h b/core/pipeline/plugin/instance/InputInstance.h index 139f5b554b..8505ecb130 100644 --- a/core/pipeline/plugin/instance/InputInstance.h +++ b/core/pipeline/plugin/instance/InputInstance.h @@ -28,14 +28,12 @@ namespace logtail { class InputInstance : public PluginInstance { public: - InputInstance(Input* plugin, const PluginInstance::PluginMeta& pluginMeta) : PluginInstance(pluginMeta), mPlugin(plugin) {} + InputInstance(Input* plugin, const PluginInstance::PluginMeta& pluginMeta) + : PluginInstance(pluginMeta), mPlugin(plugin) {} const std::string& Name() const override { return mPlugin->Name(); } - bool Init(const Json::Value& config, - PipelineContext& context, - size_t inputIdx, - Json::Value& optionalGoPipeline); + bool Init(const Json::Value& config, PipelineContext& context, size_t inputIdx, Json::Value& optionalGoPipeline); bool Start() { return mPlugin->Start(); } bool Stop(bool isPipelineRemoving) { return mPlugin->Stop(isPipelineRemoving); } bool SupportAck() const { return mPlugin->SupportAck(); } diff --git a/core/plugin/input/InputContainerStdio.h b/core/plugin/input/InputContainerStdio.h index a9d1e51aed..67ad10e550 100644 --- a/core/plugin/input/InputContainerStdio.h +++ b/core/plugin/input/InputContainerStdio.h @@ -21,9 +21,9 @@ #include "container_manager/ContainerDiscoveryOptions.h" #include "file_server/FileDiscoveryOptions.h" #include "file_server/MultilineOptions.h" +#include "file_server/reader/FileReaderOptions.h" #include "monitor/PluginMetricManager.h" #include "pipeline/plugin/interface/Input.h" -#include "file_server/reader/FileReaderOptions.h" namespace logtail { diff --git a/core/plugin/input/InputFile.h b/core/plugin/input/InputFile.h index ee8275ef7c..641f1067e5 100644 --- a/core/plugin/input/InputFile.h +++ b/core/plugin/input/InputFile.h @@ -21,9 +21,9 @@ #include "container_manager/ContainerDiscoveryOptions.h" #include "file_server/FileDiscoveryOptions.h" #include "file_server/MultilineOptions.h" +#include "file_server/reader/FileReaderOptions.h" #include "monitor/PluginMetricManager.h" #include "pipeline/plugin/interface/Input.h" -#include "file_server/reader/FileReaderOptions.h" namespace logtail { diff --git a/core/plugin/input/InputFileSecurity.h b/core/plugin/input/InputFileSecurity.h index fea0b459fc..3eac3a7a9f 100644 --- a/core/plugin/input/InputFileSecurity.h +++ b/core/plugin/input/InputFileSecurity.h @@ -19,9 +19,9 @@ #include #include "ebpf/config.h" -#include "pipeline/plugin/interface/Input.h" #include "ebpf/eBPFServer.h" #include "monitor/PluginMetricManager.h" +#include "pipeline/plugin/interface/Input.h" namespace logtail { @@ -34,7 +34,7 @@ class InputFileSecurity : public Input { bool Start() override; bool Stop(bool isPipelineRemoving) override; bool SupportAck() const override { return false; } - + ebpf::SecurityOptions mSecurityOptions; PluginMetricManagerPtr mPluginMgr; }; diff --git a/core/plugin/input/InputInternalMetrics.h b/core/plugin/input/InputInternalMetrics.h index 694edf85af..c59cf89608 100644 --- a/core/plugin/input/InputInternalMetrics.h +++ b/core/plugin/input/InputInternalMetrics.h @@ -30,6 +30,7 @@ class InputInternalMetrics : public Input { bool Start() override; bool Stop(bool isPipelineRemoving) override; bool SupportAck() const override { return true; } + private: SelfMonitorMetricRules mSelfMonitorMetricRules; }; diff --git a/core/plugin/input/InputNetworkObserver.h b/core/plugin/input/InputNetworkObserver.h index 7f204a2c90..72db90501d 100644 --- a/core/plugin/input/InputNetworkObserver.h +++ b/core/plugin/input/InputNetworkObserver.h @@ -19,9 +19,9 @@ #include #include "ebpf/config.h" -#include "pipeline/plugin/interface/Input.h" #include "ebpf/include/export.h" #include "monitor/PluginMetricManager.h" +#include "pipeline/plugin/interface/Input.h" namespace logtail { diff --git a/core/plugin/input/InputNetworkSecurity.h b/core/plugin/input/InputNetworkSecurity.h index cda3a7c170..84a3294a03 100644 --- a/core/plugin/input/InputNetworkSecurity.h +++ b/core/plugin/input/InputNetworkSecurity.h @@ -19,8 +19,8 @@ #include #include "ebpf/config.h" -#include "pipeline/plugin/interface/Input.h" #include "monitor/PluginMetricManager.h" +#include "pipeline/plugin/interface/Input.h" namespace logtail { diff --git a/core/plugin/input/InputProcessSecurity.h b/core/plugin/input/InputProcessSecurity.h index d26d7a95e3..46cd00fcbe 100644 --- a/core/plugin/input/InputProcessSecurity.h +++ b/core/plugin/input/InputProcessSecurity.h @@ -19,8 +19,8 @@ #include #include "ebpf/config.h" -#include "pipeline/plugin/interface/Input.h" #include "monitor/PluginMetricManager.h" +#include "pipeline/plugin/interface/Input.h" namespace logtail { diff --git a/core/unittest/config/CMakeLists.txt b/core/unittest/config/CMakeLists.txt index 560d47d393..7efb993180 100644 --- a/core/unittest/config/CMakeLists.txt +++ b/core/unittest/config/CMakeLists.txt @@ -44,6 +44,9 @@ target_link_libraries(config_feedbackable_unittest ${UT_BASE_TARGET}) add_executable(common_config_provider_unittest CommonConfigProviderUnittest.cpp) target_link_libraries(common_config_provider_unittest ${UT_BASE_TARGET}) +add_executable(pipeline_config_watcher_unittest PipelineConfigWatcherUnittest.cpp) +target_link_libraries(pipeline_config_watcher_unittest ${UT_BASE_TARGET}) + include(GoogleTest) gtest_discover_tests(pipeline_config_unittest) gtest_discover_tests(task_config_unittest) @@ -54,3 +57,4 @@ if (ENABLE_ENTERPRISE) endif () gtest_discover_tests(config_feedbackable_unittest) gtest_discover_tests(common_config_provider_unittest) +gtest_discover_tests(pipeline_config_watcher_unittest) diff --git a/core/unittest/config/PipelineConfigWatcherUnittest.cpp b/core/unittest/config/PipelineConfigWatcherUnittest.cpp new file mode 100644 index 0000000000..0d301f34f7 --- /dev/null +++ b/core/unittest/config/PipelineConfigWatcherUnittest.cpp @@ -0,0 +1,136 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include + +#include +#include + +#include "config/ConfigDiff.h" +#include "config/PipelineConfig.h" +#include "config/watcher/PipelineConfigWatcher.h" +#include "unittest/Unittest.h" + +using namespace std; + +namespace logtail { + +class PipelineConfigWatcherUnittest : public testing::Test { +public: + void TestSortPipelineConfigDiff() const; + +private: + const string configName = "test"; +}; + +void PipelineConfigWatcherUnittest::TestSortPipelineConfigDiff() const { + PipelineConfigDiff diff; + auto configName1 = "test1"; + auto configJson1 = std::make_unique(); + (*configJson1)["name"] = configName1; + auto config1 = PipelineConfig(configName1, std::move(configJson1)); + config1.Parse(); + + auto configName2 = "test2"; + auto configJson2 = std::make_unique(); + (*configJson2)["name"] = configName2; + (*configJson2)["createTime"] = 1; + auto config2 = PipelineConfig(configName2, std::move(configJson2)); + config2.Parse(); + + auto configName3 = "test3"; + auto configJson3 = std::make_unique(); + (*configJson3)["name"] = configName3; + (*configJson3)["createTime"] = 2; + auto config3 = PipelineConfig(configName3, std::move(configJson3)); + config3.Parse(); + + auto configName4 = "test4"; + auto configJson4 = std::make_unique(); + (*configJson4)["name"] = configName4; + auto config4 = PipelineConfig(configName4, std::move(configJson4)); + config4.Parse(); + + auto configName5 = "test5"; + auto configJson5 = std::make_unique(); + (*configJson5)["name"] = configName5; + (*configJson5)["createTime"] = 1; + auto config5 = PipelineConfig(configName5, std::move(configJson5)); + config5.Parse(); + + diff.mAdded.push_back(std::move(config1)); + diff.mAdded.push_back(std::move(config2)); + diff.mAdded.push_back(std::move(config3)); + diff.mAdded.push_back(std::move(config4)); + diff.mAdded.push_back(std::move(config5)); + + auto configName6 = "test6"; + auto configJson6 = std::make_unique(); + (*configJson6)["name"] = configName6; + auto config6 = PipelineConfig(configName6, std::move(configJson6)); + config6.Parse(); + + auto configName7 = "test7"; + auto configJson7 = std::make_unique(); + (*configJson7)["name"] = configName7; + (*configJson7)["createTime"] = 1; + auto config7 = PipelineConfig(configName7, std::move(configJson7)); + config7.Parse(); + + auto configName8 = "test8"; + auto configJson8 = std::make_unique(); + (*configJson8)["name"] = configName8; + (*configJson8)["createTime"] = 2; + auto config8 = PipelineConfig(configName8, std::move(configJson8)); + config8.Parse(); + + auto configName9 = "test9"; + auto configJson9 = std::make_unique(); + (*configJson9)["name"] = configName9; + auto config9 = PipelineConfig(configName9, std::move(configJson9)); + config9.Parse(); + + auto configName10 = "test10"; + auto configJson10 = std::make_unique(); + (*configJson10)["name"] = configName10; + (*configJson10)["createTime"] = 1; + auto config10 = PipelineConfig(configName10, std::move(configJson10)); + config10.Parse(); + + diff.mModified.push_back(std::move(config6)); + diff.mModified.push_back(std::move(config7)); + diff.mModified.push_back(std::move(config8)); + diff.mModified.push_back(std::move(config9)); + diff.mModified.push_back(std::move(config10)); + + PipelineConfigWatcher::GetInstance()->SortPipelineConfigDiff(diff); + + APSARA_TEST_EQUAL(diff.mAdded[0].mName, configName2); + APSARA_TEST_EQUAL(diff.mAdded[1].mName, configName5); + APSARA_TEST_EQUAL(diff.mAdded[2].mName, configName3); + APSARA_TEST_EQUAL(diff.mAdded[3].mName, configName1); + APSARA_TEST_EQUAL(diff.mAdded[4].mName, configName4); + + APSARA_TEST_EQUAL(diff.mModified[0].mName, configName10); + APSARA_TEST_EQUAL(diff.mModified[1].mName, configName7); + APSARA_TEST_EQUAL(diff.mModified[2].mName, configName8); + APSARA_TEST_EQUAL(diff.mModified[3].mName, configName6); + APSARA_TEST_EQUAL(diff.mModified[4].mName, configName9); +} + +UNIT_TEST_CASE(PipelineConfigWatcherUnittest, TestSortPipelineConfigDiff) + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/pipeline/PipelineManagerUnittest.cpp b/core/unittest/pipeline/PipelineManagerUnittest.cpp index 29c56da393..31e258c72f 100644 --- a/core/unittest/pipeline/PipelineManagerUnittest.cpp +++ b/core/unittest/pipeline/PipelineManagerUnittest.cpp @@ -14,6 +14,8 @@ #include "pipeline/Pipeline.h" #include "pipeline/PipelineManager.h" +#include "pipeline/plugin/PluginRegistry.h" +#include "plugin/input/InputNetworkSecurity.h" #include "unittest/Unittest.h" using namespace std; @@ -23,6 +25,11 @@ namespace logtail { class PipelineManagerUnittest : public testing::Test { public: void TestPipelineManagement() const; + void TestCheckIfGlobalSingletonInputLoaded() const; + +protected: + static void SetUpTestCase() { PluginRegistry::GetInstance()->LoadPlugins(); } + static void TearDownTestCase() { PluginRegistry::GetInstance()->UnloadPlugins(); } }; void PipelineManagerUnittest::TestPipelineManagement() const { @@ -34,7 +41,35 @@ void PipelineManagerUnittest::TestPipelineManagement() const { APSARA_TEST_EQUAL(nullptr, PipelineManager::GetInstance()->FindConfigByName("test3")); } +void PipelineManagerUnittest::TestCheckIfGlobalSingletonInputLoaded() const { + { // test not singleton input + auto inputConfig = Json::Value(); + inputConfig["Type"] = InputFile::sName; + std::vector inputConfigs = {&inputConfig}; + + PipelineManager::GetInstance()->mPluginCntMap["inputs"][InputFile::sName] = 1; + APSARA_TEST_EQUAL(false, PipelineManager::GetInstance()->CheckIfGlobalSingletonInputLoaded(inputConfigs)); + } + { // test singleton input not loaded + auto inputConfig = Json::Value(); + inputConfig["Type"] = InputNetworkSecurity::sName; + std::vector inputConfigs = {&inputConfig}; + + PipelineManager::GetInstance()->mPluginCntMap["inputs"][InputNetworkSecurity::sName] = 0; + APSARA_TEST_EQUAL(false, PipelineManager::GetInstance()->CheckIfGlobalSingletonInputLoaded(inputConfigs)); + } + { // test singleton input loaded + auto inputConfig = Json::Value(); + inputConfig["Type"] = InputNetworkSecurity::sName; + std::vector inputConfigs = {&inputConfig}; + + PipelineManager::GetInstance()->mPluginCntMap["inputs"][InputNetworkSecurity::sName] = 1; + APSARA_TEST_EQUAL(true, PipelineManager::GetInstance()->CheckIfGlobalSingletonInputLoaded(inputConfigs)); + } +} + UNIT_TEST_CASE(PipelineManagerUnittest, TestPipelineManagement) +UNIT_TEST_CASE(PipelineManagerUnittest, TestCheckIfGlobalSingletonInputLoaded) } // namespace logtail