From 88d8dae5963e354e720a21a0a554c5e76062dba0 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Fri, 27 Dec 2024 13:33:45 +0800 Subject: [PATCH] fix --- core/config/PipelineConfig.cpp | 4 +- core/monitor/profile_sender/ProfileSender.cpp | 1 + core/pipeline/Pipeline.cpp | 5 +- core/pipeline/Pipeline.h | 2 +- .../plugin/instance/ProcessorInstance.h | 2 +- .../pipeline/PipelineUpdateUnittest.cpp | 173 +++++++++++++++--- 6 files changed, 158 insertions(+), 29 deletions(-) diff --git a/core/config/PipelineConfig.cpp b/core/config/PipelineConfig.cpp index 7ac1ec6e95..6cafcb6ed4 100644 --- a/core/config/PipelineConfig.cpp +++ b/core/config/PipelineConfig.cpp @@ -246,11 +246,13 @@ bool PipelineConfig::Parse() { } } mInputs.push_back(&plugin); +#ifndef APSARA_UNIT_TEST_MAIN // TODO: remove these special restrictions if (pluginType == "input_file" || pluginType == "input_container_stdio") { hasFileInput = true; } -#ifdef APSARA_UNIT_TEST_MAIN +#else + // TODO: remove these special restrictions after all C++ inputs support Go processors if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos) { hasFileInput = true; } diff --git a/core/monitor/profile_sender/ProfileSender.cpp b/core/monitor/profile_sender/ProfileSender.cpp index 2acff991fd..2c6b942c67 100644 --- a/core/monitor/profile_sender/ProfileSender.cpp +++ b/core/monitor/profile_sender/ProfileSender.cpp @@ -119,6 +119,7 @@ FlusherSLS* ProfileSender::GetFlusher(const string& region) { } bool ProfileSender::IsProfileData(const string& region, const string& project, const string& logstore) { +// TODO: temporarily used, profile should work in unit test #ifndef APSARA_UNIT_TEST_MAIN if ((logstore == "shennong_log_profile" || logstore == "logtail_alarm" || logstore == "logtail_status_profile" || logstore == "logtail_suicide_profile") diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 3a9e5b2ed2..3ed21f7d2b 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -395,9 +395,8 @@ bool Pipeline::Send(vector&& groupList) { auto res = mRouter.Route(group); for (auto& item : res) { if (item.first >= mFlushers.size()) { - LOG_WARNING(sLogger, - ("pipeline send", "discard data")("config", mName)( - "reason", "invalid flusher index or config update flusher from C++ to Go")); + LOG_ERROR(sLogger, + ("unexpected error", "invalid flusher index")("flusher index", item.first)("config", mName)); allSucceeded = false; continue; } diff --git a/core/pipeline/Pipeline.h b/core/pipeline/Pipeline.h index a5a4799bba..5009b3c445 100644 --- a/core/pipeline/Pipeline.h +++ b/core/pipeline/Pipeline.h @@ -62,7 +62,6 @@ class Pipeline { PipelineContext& GetContext() const { return mContext; } const Json::Value& GetConfig() const { return *mConfig; } const std::optional& GetSingletonInput() const { return mSingletonInput; } - const std::vector>& GetProcessors() const { return mProcessorLine; } const std::vector>& GetFlushers() const { return mFlushers; } bool IsFlushingThroughGoPipeline() const { return !mGoPipelineWithoutInput.isNull(); } const std::unordered_map>& GetPluginStatistics() const { @@ -129,6 +128,7 @@ class Pipeline { friend class InputProcessSecurityUnittest; friend class InputNetworkSecurityUnittest; friend class InputNetworkObserverUnittest; + friend class PipelineUpdateUnittest; #endif }; diff --git a/core/pipeline/plugin/instance/ProcessorInstance.h b/core/pipeline/plugin/instance/ProcessorInstance.h index 154c0b9ceb..24373685c6 100644 --- a/core/pipeline/plugin/instance/ProcessorInstance.h +++ b/core/pipeline/plugin/instance/ProcessorInstance.h @@ -33,7 +33,6 @@ class ProcessorInstance : public PluginInstance { ProcessorInstance(Processor* plugin, const PluginMeta& pluginMeta) : PluginInstance(pluginMeta), mPlugin(plugin) {} const std::string& Name() const override { return mPlugin->Name(); }; - const Processor* GetPlugin() const { return mPlugin.get(); } bool Init(const Json::Value& config, PipelineContext& context); void Process(std::vector& logGroupList); @@ -60,6 +59,7 @@ class ProcessorInstance : public PluginInstance { friend class InputFileUnittest; friend class InputPrometheusUnittest; friend class PipelineUnittest; + friend class PipelineUpdateUnittest; #endif }; diff --git a/core/unittest/pipeline/PipelineUpdateUnittest.cpp b/core/unittest/pipeline/PipelineUpdateUnittest.cpp index 4247f9cc7c..85ee73aa2e 100644 --- a/core/unittest/pipeline/PipelineUpdateUnittest.cpp +++ b/core/unittest/pipeline/PipelineUpdateUnittest.cpp @@ -75,6 +75,8 @@ class PipelineUpdateUnittest : public testing::Test { void TestPipelineUpdateManyCase6() const; void TestPipelineUpdateManyCase7() const; void TestPipelineUpdateManyCase8() const; + void TestPipelineUpdateManyCase9() const; + void TestPipelineUpdateManyCase10() const; protected: static void SetUpTestCase() { @@ -179,7 +181,7 @@ class PipelineUpdateUnittest : public testing::Test { void UnBlockProcessor(std::string configName) const { auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName).get(); auto processor - = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline->mProcessorLine[0].get()->mPlugin.get())); processor->Unblock(); } @@ -356,7 +358,7 @@ void PipelineUpdateUnittest::TestPipelineParamUpdateCase1() const { auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName).get(); auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); auto processor - = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline->mProcessorLine[0].get()->mPlugin.get())); AddDataToSenderQueue(configName, "test-data-1", flusher); AddDataToSenderQueue(configName, "test-data-2", flusher); AddDataToSenderQueue(configName, "test-data-3", flusher); @@ -540,7 +542,7 @@ void PipelineUpdateUnittest::TestPipelineTypeUpdateCase1() const { auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName).get(); auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); auto processor - = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline->mProcessorLine[0].get()->mPlugin.get())); AddDataToSenderQueue(configName, "test-data-1", flusher); AddDataToSenderQueue(configName, "test-data-2", flusher); AddDataToSenderQueue(configName, "test-data-3", flusher); @@ -724,7 +726,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase1() const { auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName).get(); auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); auto processor - = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline->mProcessorLine[0].get()->mPlugin.get())); AddDataToSenderQueue(configName, "test-data-1", flusher); AddDataToSenderQueue(configName, "test-data-2", flusher); AddDataToSenderQueue(configName, "test-data-3", flusher); @@ -775,7 +777,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase2() const { auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName).get(); auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); auto processor - = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline->mProcessorLine[0].get()->mPlugin.get())); AddDataToSenderQueue(configName, "test-data-1", flusher); AddDataToSenderQueue(configName, "test-data-2", flusher); AddDataToSenderQueue(configName, "test-data-3", flusher); @@ -833,7 +835,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase3() const { auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName).get(); auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); auto processor - = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline->mProcessorLine[0].get()->mPlugin.get())); AddDataToSenderQueue(configName, "test-data-1", flusher); AddDataToSenderQueue(configName, "test-data-2", flusher); AddDataToSenderQueue(configName, "test-data-3", flusher); @@ -1290,7 +1292,7 @@ void PipelineUpdateUnittest::TestPipelineInputBlock() const { auto input = static_cast(const_cast(pipeline->GetInputs()[0].get()->GetPlugin())); auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); auto processor - = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline->mProcessorLine[0].get()->mPlugin.get())); input->Block(); AddDataToSenderQueue(configName, "test-data-1", flusher); AddDataToSenderQueue(configName, "test-data-2", flusher); @@ -1608,7 +1610,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase1() const { auto result = async(launch::async, [&]() { this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 - = static_cast(const_cast(pipeline1->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate3); @@ -1622,7 +1624,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase1() const { HttpSinkMock::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 - = static_cast(const_cast(pipeline2->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); processor2->Unblock(); UnBlockProcessor(configName); VerifyData("test_logstore_1", 1, 3); @@ -1684,7 +1686,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase2() const { auto result = async(launch::async, [&]() { this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 - = static_cast(const_cast(pipeline1->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate3); @@ -1698,7 +1700,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase2() const { HttpSinkMock::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 - = static_cast(const_cast(pipeline2->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); processor2->Unblock(); UnBlockProcessor(configName); VerifyData("test_logstore_1", 1, 3); @@ -1760,7 +1762,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase3() const { auto result = async(launch::async, [&]() { this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 - = static_cast(const_cast(pipeline1->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate3); @@ -1774,7 +1776,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase3() const { HttpSinkMock::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 - = static_cast(const_cast(pipeline2->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); processor2->Unblock(); UnBlockProcessor(configName); VerifyData("test_logstore_1", 1, 3); @@ -1833,7 +1835,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase4() const { auto result = async(launch::async, [&]() { this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 - = static_cast(const_cast(pipeline1->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate3); @@ -1847,7 +1849,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase4() const { HttpSinkMock::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 - = static_cast(const_cast(pipeline2->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); processor2->Unblock(); UnBlockProcessor(configName); VerifyData("test_logstore_1", 1, 3); @@ -1907,7 +1909,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase5() const { auto result = async(launch::async, [&]() { this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 - = static_cast(const_cast(pipeline1->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate3); @@ -1921,7 +1923,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase5() const { HttpSinkMock::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 - = static_cast(const_cast(pipeline2->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); processor2->Unblock(); UnBlockProcessor(configName); VerifyData("test_logstore_2", 1, 1); @@ -1977,7 +1979,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase6() const { auto result = async(launch::async, [&]() { this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 - = static_cast(const_cast(pipeline1->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate3); @@ -1991,7 +1993,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase6() const { HttpSinkMock::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 - = static_cast(const_cast(pipeline2->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); processor2->Unblock(); UnBlockProcessor(configName); VerifyData("test_logstore_2", 1, 1); @@ -2047,7 +2049,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase7() const { auto result = async(launch::async, [&]() { this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 - = static_cast(const_cast(pipeline1->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate3); @@ -2061,7 +2063,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase7() const { HttpSinkMock::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 - = static_cast(const_cast(pipeline2->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); processor2->Unblock(); UnBlockProcessor(configName); VerifyData("test_logstore_2", 1, 1); @@ -2114,7 +2116,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase8() const { auto result = async(launch::async, [&]() { this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 - = static_cast(const_cast(pipeline1->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate3); @@ -2128,13 +2130,136 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase8() const { HttpSinkMock::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 - = static_cast(const_cast(pipeline2->GetProcessors()[0].get()->GetPlugin())); + = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); processor2->Unblock(); UnBlockProcessor(configName); VerifyData("test_logstore_2", 1, 1); VerifyData("test_logstore_3", 2, 4); } +void PipelineUpdateUnittest::TestPipelineUpdateManyCase9() const { + // update 3 times + // 1. process queue empty, send queue not empty + // 2. add data to send queue + const std::string configName = "test1"; + ProcessorRunner::GetInstance()->Stop(); + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + // Add data without trigger + auto pipeline1 = PipelineManager::GetInstance()->GetAllPipelines().at(configName).get(); + auto flusher1 = const_cast(pipeline1->GetFlushers()[0].get()->GetPlugin()); + AddDataToSenderQueue(configName, "test-data-1", flusher1); + AddDataToSenderQueue(configName, "test-data-2", flusher1); + AddDataToSenderQueue(configName, "test-data-3", flusher1); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate2 + = GeneratePipelineConfigJson(nativeInputConfig2, nativeProcessorConfig2, nativeFlusherConfig2); + PipelineConfigDiff diffUpdate2; + PipelineConfig pipelineConfigObjUpdate2 + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate2)); + pipelineConfigObjUpdate2.Parse(); + diffUpdate2.mModified.push_back(std::move(pipelineConfigObjUpdate2)); + pipelineManager->UpdatePipelines(diffUpdate2); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + auto pipeline2 = PipelineManager::GetInstance()->GetAllPipelines().at(configName).get(); + auto flusher2 = const_cast(pipeline2->GetFlushers()[0].get()->GetPlugin()); + AddDataToSenderQueue(configName, "test-data-4", flusher2); + AddDataToSenderQueue(configName, "test-data-5", flusher2); + AddDataToSenderQueue(configName, "test-data-6", flusher2); + + ProcessorRunner::GetInstance()->Init(); + // load new pipeline + Json::Value pipelineConfigJsonUpdate3 + = GeneratePipelineConfigJson(nativeInputConfig3, nativeProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate3; + PipelineConfig pipelineConfigObjUpdate3 + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate3)); + pipelineConfigObjUpdate3.Parse(); + diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3)); + pipelineManager->UpdatePipelines(diffUpdate3); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + AddDataToProcessQueue(configName, "test-data-7"); + AddDataToProcessQueue(configName, "test-data-8"); + AddDataToProcessQueue(configName, "test-data-9"); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + UnBlockProcessor(configName); + VerifyData("test_logstore_1", 1, 3); + VerifyData("test_logstore_2", 4, 6); + VerifyData("test_logstore_3", 7, 9); +} + +void PipelineUpdateUnittest::TestPipelineUpdateManyCase10() const { + // update 3 times + // 1. process queue empty, send queue not empty + // 2. not add data to send queue + const std::string configName = "test1"; + ProcessorRunner::GetInstance()->Stop(); + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + // Add data without trigger + auto pipeline1 = PipelineManager::GetInstance()->GetAllPipelines().at(configName).get(); + auto flusher1 = const_cast(pipeline1->GetFlushers()[0].get()->GetPlugin()); + AddDataToSenderQueue(configName, "test-data-1", flusher1); + AddDataToSenderQueue(configName, "test-data-2", flusher1); + AddDataToSenderQueue(configName, "test-data-3", flusher1); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate2 + = GeneratePipelineConfigJson(nativeInputConfig2, nativeProcessorConfig2, nativeFlusherConfig2); + PipelineConfigDiff diffUpdate2; + PipelineConfig pipelineConfigObjUpdate2 + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate2)); + pipelineConfigObjUpdate2.Parse(); + diffUpdate2.mModified.push_back(std::move(pipelineConfigObjUpdate2)); + pipelineManager->UpdatePipelines(diffUpdate2); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + ProcessorRunner::GetInstance()->Init(); + // load new pipeline + Json::Value pipelineConfigJsonUpdate3 + = GeneratePipelineConfigJson(nativeInputConfig3, nativeProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate3; + PipelineConfig pipelineConfigObjUpdate3 + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate3)); + pipelineConfigObjUpdate3.Parse(); + diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3)); + pipelineManager->UpdatePipelines(diffUpdate3); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + AddDataToProcessQueue(configName, "test-data-4"); + AddDataToProcessQueue(configName, "test-data-5"); + AddDataToProcessQueue(configName, "test-data-6"); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + UnBlockProcessor(configName); + VerifyData("test_logstore_1", 1, 3); + VerifyData("test_logstore_3", 4, 6); +} + UNIT_TEST_CASE(PipelineUpdateUnittest, TestFileServerStart) UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineParamUpdateCase1) UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineParamUpdateCase2) @@ -2169,6 +2294,8 @@ UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineUpdateManyCase5) UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineUpdateManyCase6) UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineUpdateManyCase7) UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineUpdateManyCase8) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineUpdateManyCase9) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineUpdateManyCase10) } // namespace logtail