From 0c409e794f7b37e0cffb9d201dd33830f0567dfb Mon Sep 17 00:00:00 2001 From: abingcbc Date: Tue, 17 Dec 2024 09:51:51 +0800 Subject: [PATCH 1/3] fix: judge file server changed --- core/pipeline/PipelineManager.cpp | 36 +++++++++++++++++++------------ core/pipeline/PipelineManager.h | 2 +- 2 files changed, 23 insertions(+), 15 deletions(-) diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index 57abcde874..2d22f90b1f 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -40,7 +40,7 @@ PipelineManager::PipelineManager() : mInputRunners({ PrometheusInputRunner::GetInstance(), #if defined(__linux__) && !defined(__ANDROID__) - ebpf::eBPFServer::GetInstance(), + ebpf::eBPFServer::GetInstance(), #endif }) { } @@ -51,16 +51,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { #ifndef APSARA_UNIT_TEST_MAIN // 过渡使用 static bool isFileServerStarted = false; - bool isFileServerInputChanged = false; - for (const auto& name : diff.mRemoved) { - isFileServerInputChanged = CheckIfFileServerUpdated(mPipelineNameEntityMap[name]->GetConfig()["inputs"][0]); - } - for (const auto& config : diff.mModified) { - isFileServerInputChanged = CheckIfFileServerUpdated(*config.mInputs[0]); - } - for (const auto& config : diff.mAdded) { - isFileServerInputChanged = CheckIfFileServerUpdated(*config.mInputs[0]); - } + bool isFileServerInputChanged = CheckIfFileServerUpdated(diff); #if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__) if (AppConfig::GetInstance()->ShennongSocketEnabled()) { @@ -238,9 +229,26 @@ void PipelineManager::DecreasePluginUsageCnt(const unordered_mapGetConfig()["inputs"][0]["Type"].asString(); + if (inputType == "input_file" || inputType == "input_container_stdio") { + return true; + } + } + for (const auto& config : diff.mModified) { + string inputType = (*config.mInputs[0])["Type"].asString(); + if (inputType == "input_file" || inputType == "input_container_stdio") { + return true; + } + } + for (const auto& config : diff.mAdded) { + string inputType = (*config.mInputs[0])["Type"].asString(); + if (inputType == "input_file" || inputType == "input_container_stdio") { + return true; + } + } + return false; } } // namespace logtail diff --git a/core/pipeline/PipelineManager.h b/core/pipeline/PipelineManager.h index 5dc1535f77..6c3cf5cbe0 100644 --- a/core/pipeline/PipelineManager.h +++ b/core/pipeline/PipelineManager.h @@ -59,7 +59,7 @@ class PipelineManager { const std::unordered_map>& statistics); void FlushAllBatch(); // TODO: 长期过渡使用 - bool CheckIfFileServerUpdated(const Json::Value& config); + bool CheckIfFileServerUpdated(PipelineConfigDiff& diff); std::unordered_map> mPipelineNameEntityMap; mutable SpinLock mPluginCntMapLock; From 4204183427b04934d47aa9ed974e829083ce165e Mon Sep 17 00:00:00 2001 From: abingcbc Date: Wed, 18 Dec 2024 17:17:54 +0800 Subject: [PATCH 2/3] ut --- core/file_server/event_handler/LogInput.h | 1 + core/monitor/Monitor.h | 12 +- core/pipeline/PipelineManager.cpp | 6 +- .../config/CommonConfigProviderUnittest.cpp | 5 + core/unittest/config/ConfigUpdateUnittest.cpp | 2 + core/unittest/pipeline/CMakeLists.txt | 4 + .../pipeline/PipelineUpdateUnittest.cpp | 117 ++++++++++++++++++ 7 files changed, 142 insertions(+), 5 deletions(-) create mode 100644 core/unittest/pipeline/PipelineUpdateUnittest.cpp diff --git a/core/file_server/event_handler/LogInput.h b/core/file_server/event_handler/LogInput.h index ec294f00da..0692586d7c 100644 --- a/core/file_server/event_handler/LogInput.h +++ b/core/file_server/event_handler/LogInput.h @@ -103,6 +103,7 @@ class LogInput : public LogRunnable { friend class FuxiSceneUnittest; friend class ConfigMatchUnittest; friend class FuseFileUnittest; + friend class PipelineUpdateUnittest; void CleanEnviroments(); #endif diff --git a/core/monitor/Monitor.h b/core/monitor/Monitor.h index 355ff2e2c0..3582d6cf39 100644 --- a/core/monitor/Monitor.h +++ b/core/monitor/Monitor.h @@ -191,8 +191,16 @@ class LoongCollectorMonitor { void SetAgentMemory(uint64_t mem) { mAgentMemory->Set(mem); } void SetAgentGoMemory(uint64_t mem) { mAgentGoMemory->Set(mem); } void SetAgentGoRoutinesTotal(uint64_t total) { mAgentGoRoutinesTotal->Set(total); } - void SetAgentOpenFdTotal(uint64_t total) { mAgentOpenFdTotal->Set(total); } - void SetAgentConfigTotal(uint64_t total) { mAgentConfigTotal->Set(total); } + void SetAgentOpenFdTotal(uint64_t total) { +#ifndef APSARA_UNIT_TEST_MAIN + mAgentOpenFdTotal->Set(total); +#endif + } + void SetAgentConfigTotal(uint64_t total) { +#ifndef APSARA_UNIT_TEST_MAIN + mAgentConfigTotal->Set(total); +#endif + } static std::string mHostname; static std::string mIpAddr; diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index 2d22f90b1f..ba5379eb4e 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -48,20 +48,20 @@ PipelineManager::PipelineManager() static shared_ptr sEmptyPipeline; void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { -#ifndef APSARA_UNIT_TEST_MAIN // 过渡使用 static bool isFileServerStarted = false; bool isFileServerInputChanged = CheckIfFileServerUpdated(diff); +#ifndef APSARA_UNIT_TEST_MAIN #if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__) if (AppConfig::GetInstance()->ShennongSocketEnabled()) { ShennongManager::GetInstance()->Pause(); } +#endif #endif if (isFileServerStarted && isFileServerInputChanged) { FileServer::GetInstance()->Pause(); } -#endif for (const auto& name : diff.mRemoved) { auto iter = mPipelineNameEntityMap.find(name); @@ -125,7 +125,6 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { ConfigFeedbackStatus::APPLIED); } -#ifndef APSARA_UNIT_TEST_MAIN // 在Flusher改造完成前,先不执行如下步骤,不会造成太大影响 // Sender::CleanUnusedAk(); @@ -138,6 +137,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { } } +#ifndef APSARA_UNIT_TEST_MAIN #if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__) if (AppConfig::GetInstance()->ShennongSocketEnabled()) { ShennongManager::GetInstance()->Resume(); diff --git a/core/unittest/config/CommonConfigProviderUnittest.cpp b/core/unittest/config/CommonConfigProviderUnittest.cpp index cecc538727..4543eff7c6 100644 --- a/core/unittest/config/CommonConfigProviderUnittest.cpp +++ b/core/unittest/config/CommonConfigProviderUnittest.cpp @@ -26,6 +26,7 @@ #endif #include "config/watcher/InstanceConfigWatcher.h" #include "config/watcher/PipelineConfigWatcher.h" +#include "file_server/FileServer.h" #include "gmock/gmock.h" #include "monitor/Monitor.h" #include "pipeline/PipelineManager.h" @@ -72,6 +73,10 @@ class CommonConfigProviderUnittest : public ::testing::Test { return true; } + static void SetUpTestCase() {} + + static void TearDownTestCase() { FileServer::GetInstance()->Stop(); } + // 在每个测试用例开始前的设置 void SetUp() override { if (BOOL_FLAG(logtail_mode)) { diff --git a/core/unittest/config/ConfigUpdateUnittest.cpp b/core/unittest/config/ConfigUpdateUnittest.cpp index fddd53804a..7d9167673a 100644 --- a/core/unittest/config/ConfigUpdateUnittest.cpp +++ b/core/unittest/config/ConfigUpdateUnittest.cpp @@ -24,6 +24,7 @@ #include "config/provider/EnterpriseConfigProvider.h" #endif #include "config/watcher/PipelineConfigWatcher.h" +#include "file_server/FileServer.h" #include "pipeline/Pipeline.h" #include "pipeline/PipelineManager.h" #include "pipeline/plugin/PluginRegistry.h" @@ -57,6 +58,7 @@ class ConfigUpdateUnittest : public testing::Test { static void TearDownTestCase() { PluginRegistry::GetInstance()->UnloadPlugins(); TaskRegistry::GetInstance()->UnloadPlugins(); + FileServer::GetInstance()->Stop(); } void SetUp() override { diff --git a/core/unittest/pipeline/CMakeLists.txt b/core/unittest/pipeline/CMakeLists.txt index 000294a4e5..0695aad08a 100644 --- a/core/unittest/pipeline/CMakeLists.txt +++ b/core/unittest/pipeline/CMakeLists.txt @@ -27,9 +27,13 @@ target_link_libraries(pipeline_manager_unittest ${UT_BASE_TARGET}) add_executable(concurrency_limiter_unittest ConcurrencyLimiterUnittest.cpp) target_link_libraries(concurrency_limiter_unittest ${UT_BASE_TARGET}) +add_executable(pipeline_update_unittest PipelineUpdateUnittest.cpp) +target_link_libraries(pipeline_update_unittest ${UT_BASE_TARGET}) + include(GoogleTest) gtest_discover_tests(global_config_unittest) gtest_discover_tests(pipeline_unittest) gtest_discover_tests(pipeline_manager_unittest) gtest_discover_tests(concurrency_limiter_unittest) +gtest_discover_tests(pipeline_update_unittest) diff --git a/core/unittest/pipeline/PipelineUpdateUnittest.cpp b/core/unittest/pipeline/PipelineUpdateUnittest.cpp new file mode 100644 index 0000000000..e6b7eecca4 --- /dev/null +++ b/core/unittest/pipeline/PipelineUpdateUnittest.cpp @@ -0,0 +1,117 @@ +// Copyright 2023 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include + +#include "common/JsonUtil.h" +#include "config/PipelineConfig.h" +#include "file_server/event_handler/LogInput.h" +#include "pipeline/plugin/PluginRegistry.h" +#include "unittest/Unittest.h" +#include "unittest/config/PipelineManagerMock.h" + +using namespace std; + +namespace logtail { + +class PipelineUpdateUnittest : public testing::Test { +public: + void TestRunner() const; + +protected: + static void SetUpTestCase() { PluginRegistry::GetInstance()->LoadPlugins(); } + + static void TearDownTestCase() { PluginRegistry::GetInstance()->UnloadPlugins(); } + + void SetUp() override {} + + void TearDown() override {} + +private: + Json::Value GeneratePipelineConfigJson(const string& inputConfig, + const string& processorConfig, + const string& flusherConfig) const { + Json::Value json; + string errorMsg; + ParseJsonTable(R"( + { + "valid": true, + "inputs": [)" + + inputConfig + R"(], + "processors": [)" + + processorConfig + R"(], + "flushers": [)" + + flusherConfig + R"(] + })", + json, + errorMsg); + return json; + } + string nativeInputConfig = R"( + { + "Type": "input_file" + })"; + string nativeProcessorConfig = R"( + { + "Type": "processor_parse_regex_native" + })"; + string nativeFlusherConfig = R"( + { + "Type": "flusher_sls" + })"; + string goInputConfig = R"( + { + "Type": "input_docker_stdout" + })"; + string goProcessorConfig = R"( + { + "Type": "processor_regex" + })"; + string goFlusherConfig = R"( + { + "Type": "flusher_stdout" + })"; +}; + +void PipelineUpdateUnittest::TestRunner() const { + { // input_file + Json::Value nativePipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + Json::Value goPipelineConfigJson + = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig); + auto pipelineManager = PipelineManagerMock::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig nativePipelineConfigObj + = PipelineConfig("test1", make_unique(nativePipelineConfigJson)); + nativePipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(nativePipelineConfigObj)); + PipelineConfig goPipelineConfigObj = PipelineConfig("test2", make_unique(goPipelineConfigJson)); + goPipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(goPipelineConfigObj)); + + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(2U, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(false, LogInput::GetInstance()->mInteruptFlag); + } +} + +UNIT_TEST_CASE(PipelineUpdateUnittest, TestRunner) + +} // namespace logtail + +UNIT_TEST_MAIN From f168c5b86bebb16fd379da3a7b9903dfbe845580 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Wed, 18 Dec 2024 17:40:11 +0800 Subject: [PATCH 3/3] fix --- core/unittest/config/PipelineManagerMock.h | 3 ++ .../pipeline/PipelineUpdateUnittest.cpp | 45 ++++++++++--------- 2 files changed, 26 insertions(+), 22 deletions(-) diff --git a/core/unittest/config/PipelineManagerMock.h b/core/unittest/config/PipelineManagerMock.h index b25e4f05b9..4a27a50802 100644 --- a/core/unittest/config/PipelineManagerMock.h +++ b/core/unittest/config/PipelineManagerMock.h @@ -31,6 +31,9 @@ class PipelineMock : public Pipeline { mContext.SetCreateTime(config.mCreateTime); return (*mConfig)["valid"].asBool(); } + + bool Start() { return true; } + void Stop(bool isRemoving) {} }; class PipelineManagerMock : public PipelineManager { diff --git a/core/unittest/pipeline/PipelineUpdateUnittest.cpp b/core/unittest/pipeline/PipelineUpdateUnittest.cpp index e6b7eecca4..26c2ef1702 100644 --- a/core/unittest/pipeline/PipelineUpdateUnittest.cpp +++ b/core/unittest/pipeline/PipelineUpdateUnittest.cpp @@ -20,6 +20,7 @@ #include "common/JsonUtil.h" #include "config/PipelineConfig.h" +#include "file_server/FileServer.h" #include "file_server/event_handler/LogInput.h" #include "pipeline/plugin/PluginRegistry.h" #include "unittest/Unittest.h" @@ -31,12 +32,15 @@ namespace logtail { class PipelineUpdateUnittest : public testing::Test { public: - void TestRunner() const; + void TestFileServerStart() const; protected: static void SetUpTestCase() { PluginRegistry::GetInstance()->LoadPlugins(); } - static void TearDownTestCase() { PluginRegistry::GetInstance()->UnloadPlugins(); } + static void TearDownTestCase() { + PluginRegistry::GetInstance()->UnloadPlugins(); + FileServer::GetInstance()->Stop(); + } void SetUp() override {} @@ -88,29 +92,26 @@ class PipelineUpdateUnittest : public testing::Test { })"; }; -void PipelineUpdateUnittest::TestRunner() const { - { // input_file - Json::Value nativePipelineConfigJson - = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); - Json::Value goPipelineConfigJson - = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig); - auto pipelineManager = PipelineManagerMock::GetInstance(); - PipelineConfigDiff diff; - PipelineConfig nativePipelineConfigObj - = PipelineConfig("test1", make_unique(nativePipelineConfigJson)); - nativePipelineConfigObj.Parse(); - diff.mAdded.push_back(std::move(nativePipelineConfigObj)); - PipelineConfig goPipelineConfigObj = PipelineConfig("test2", make_unique(goPipelineConfigJson)); - goPipelineConfigObj.Parse(); - diff.mAdded.push_back(std::move(goPipelineConfigObj)); +void PipelineUpdateUnittest::TestFileServerStart() const { + Json::Value nativePipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + Json::Value goPipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig); + auto pipelineManager = PipelineManagerMock::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig nativePipelineConfigObj + = PipelineConfig("test1", make_unique(nativePipelineConfigJson)); + nativePipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(nativePipelineConfigObj)); + PipelineConfig goPipelineConfigObj = PipelineConfig("test2", make_unique(goPipelineConfigJson)); + goPipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(goPipelineConfigObj)); - pipelineManager->UpdatePipelines(diff); - APSARA_TEST_EQUAL_FATAL(2U, pipelineManager->GetAllPipelines().size()); - APSARA_TEST_EQUAL_FATAL(false, LogInput::GetInstance()->mInteruptFlag); - } + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(2U, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(false, LogInput::GetInstance()->mInteruptFlag); } -UNIT_TEST_CASE(PipelineUpdateUnittest, TestRunner) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestFileServerStart) } // namespace logtail