diff --git a/core/common/http/HttpResponse.h b/core/common/http/HttpResponse.h index 228f491264..c4282df428 100644 --- a/core/common/http/HttpResponse.h +++ b/core/common/http/HttpResponse.h @@ -75,7 +75,7 @@ class HttpResponse { HttpResponse() : mHeader(compareHeader), mBody(new std::string(), [](void* p) { delete static_cast(p); }), - mWriteCallback(DefaultWriteCallback) {}; + mWriteCallback(DefaultWriteCallback){}; HttpResponse(void* body, const std::function& bodyDeleter, size_t (*callback)(char*, size_t, size_t, void*)) @@ -155,6 +155,10 @@ class HttpResponse { std::map mHeader; std::unique_ptr> mBody; size_t (*mWriteCallback)(char*, size_t, size_t, void*) = nullptr; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class HttpSinkMock; +#endif }; } // namespace logtail diff --git a/core/config/PipelineConfig.cpp b/core/config/PipelineConfig.cpp index 4aeea027ca..a9c43c3872 100644 --- a/core/config/PipelineConfig.cpp +++ b/core/config/PipelineConfig.cpp @@ -250,6 +250,11 @@ bool PipelineConfig::Parse() { if (pluginType == "input_file" || pluginType == "input_container_stdio") { hasFileInput = true; } +#ifdef APSARA_UNIT_TEST_MAIN + if (pluginType.find("mock") != string::npos) { + hasFileInput = true; + } +#endif } // TODO: remove these special restrictions if (hasFileInput && (*mDetail)["inputs"].size() > 1) { @@ -530,7 +535,9 @@ bool PipelineConfig::Parse() { } mRouter.emplace_back(i, itr); } else { - mRouter.emplace_back(i, nullptr); + if (!IsFlushingThroughGoPipelineExisted()) { + mRouter.emplace_back(i, nullptr); + } } } diff --git a/core/file_server/event_handler/LogInput.cpp b/core/file_server/event_handler/LogInput.cpp index 340a6b6763..012d83967c 100644 --- a/core/file_server/event_handler/LogInput.cpp +++ b/core/file_server/event_handler/LogInput.cpp @@ -350,7 +350,8 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) { void LogInput::UpdateCriticalMetric(int32_t curTime) { mLastRunTime->Set(mLastReadEventTime.load()); - LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize()); + LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal( + GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize()); mRegisterdHandlersTotal->Set(EventDispatcher::GetInstance()->GetHandlerCount()); mActiveReadersTotal->Set(CheckPointManager::Instance()->GetReaderCount()); mEventProcessCount = 0; @@ -529,6 +530,7 @@ Event* LogInput::PopEventQueue() { #ifdef APSARA_UNIT_TEST_MAIN void LogInput::CleanEnviroments() { mIdleFlag = true; + mInteruptFlag = true; usleep(100 * 1000); while (true) { Event* ev = PopEventQueue(); diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 3ed21f7d2b..9d8ff87751 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -32,6 +32,9 @@ #include "plugin/flusher/sls/FlusherSLS.h" #include "plugin/input/InputFeedbackInterfaceRegistry.h" #include "plugin/processor/ProcessorParseApsaraNative.h" +#ifdef APSARA_UNIT_TEST_MAIN +#include "unittest/pipeline/LogtailPluginMock.h" +#endif DECLARE_FLAG_INT32(default_plugin_log_queue_size); @@ -338,18 +341,29 @@ bool Pipeline::Init(PipelineConfig&& config) { void Pipeline::Start() { // #ifndef APSARA_UNIT_TEST_MAIN // TODO: 应该保证指定时间内返回,如果无法返回,将配置放入startDisabled里 + LOG_WARNING(sLogger, ("debug", "8")); for (const auto& flusher : mFlushers) { flusher->Start(); } + LOG_WARNING(sLogger, ("debug", "9")); if (!mGoPipelineWithoutInput.isNull()) { +#ifndef APSARA_UNIT_TEST_MAIN LogtailPlugin::GetInstance()->Start(GetConfigNameOfGoPipelineWithoutInput()); +#else + LogtailPluginMock::GetInstance()->Start(GetConfigNameOfGoPipelineWithoutInput()); +#endif } + LOG_WARNING(sLogger, ("debug", "10")); ProcessQueueManager::GetInstance()->EnablePop(mName); if (!mGoPipelineWithInput.isNull()) { +#ifndef APSARA_UNIT_TEST_MAIN LogtailPlugin::GetInstance()->Start(GetConfigNameOfGoPipelineWithInput()); +#else + LogtailPluginMock::GetInstance()->Start(GetConfigNameOfGoPipelineWithInput()); +#endif } for (const auto& input : mInputs) { @@ -395,8 +409,9 @@ bool Pipeline::Send(vector&& groupList) { auto res = mRouter.Route(group); for (auto& item : res) { if (item.first >= mFlushers.size()) { - LOG_ERROR(sLogger, - ("unexpected error", "invalid flusher index")("flusher index", item.first)("config", mName)); + LOG_WARNING(sLogger, + ("pipeline send", "discard data")("config", mName)( + "reason", "invalid flusher index or config update flusher from C++ to Go")); allSucceeded = false; continue; } @@ -424,7 +439,11 @@ void Pipeline::Stop(bool isRemoving) { if (!mGoPipelineWithInput.isNull()) { // Go pipeline `Stop` will stop and delete +#ifndef APSARA_UNIT_TEST_MAIN LogtailPlugin::GetInstance()->Stop(GetConfigNameOfGoPipelineWithInput(), isRemoving); +#else + LogtailPluginMock::GetInstance()->Stop(GetConfigNameOfGoPipelineWithInput(), isRemoving); +#endif } ProcessQueueManager::GetInstance()->DisablePop(mName, isRemoving); @@ -434,7 +453,11 @@ void Pipeline::Stop(bool isRemoving) { if (!mGoPipelineWithoutInput.isNull()) { // Go pipeline `Stop` will stop and delete +#ifndef APSARA_UNIT_TEST_MAIN LogtailPlugin::GetInstance()->Stop(GetConfigNameOfGoPipelineWithoutInput(), isRemoving); +#else + LogtailPluginMock::GetInstance()->Stop(GetConfigNameOfGoPipelineWithoutInput(), isRemoving); +#endif } for (const auto& flusher : mFlushers) { @@ -488,6 +511,7 @@ void Pipeline::CopyNativeGlobalParamToGoPipeline(Json::Value& pipeline) { } bool Pipeline::LoadGoPipelines() const { +#ifndef APSARA_UNIT_TEST_MAIN if (!mGoPipelineWithoutInput.isNull()) { string content = mGoPipelineWithoutInput.toStyledString(); if (!LogtailPlugin::GetInstance()->LoadPipeline(GetConfigNameOfGoPipelineWithoutInput(), @@ -529,6 +553,7 @@ bool Pipeline::LoadGoPipelines() const { return false; } } +#endif return true; } diff --git a/core/pipeline/Pipeline.h b/core/pipeline/Pipeline.h index 29666c68c1..a5a4799bba 100644 --- a/core/pipeline/Pipeline.h +++ b/core/pipeline/Pipeline.h @@ -62,6 +62,7 @@ class Pipeline { PipelineContext& GetContext() const { return mContext; } const Json::Value& GetConfig() const { return *mConfig; } const std::optional& GetSingletonInput() const { return mSingletonInput; } + const std::vector>& GetProcessors() const { return mProcessorLine; } const std::vector>& GetFlushers() const { return mFlushers; } bool IsFlushingThroughGoPipeline() const { return !mGoPipelineWithoutInput.isNull(); } const std::unordered_map>& GetPluginStatistics() const { diff --git a/core/pipeline/plugin/instance/ProcessorInstance.h b/core/pipeline/plugin/instance/ProcessorInstance.h index 298c99a78a..154c0b9ceb 100644 --- a/core/pipeline/plugin/instance/ProcessorInstance.h +++ b/core/pipeline/plugin/instance/ProcessorInstance.h @@ -33,6 +33,7 @@ class ProcessorInstance : public PluginInstance { ProcessorInstance(Processor* plugin, const PluginMeta& pluginMeta) : PluginInstance(pluginMeta), mPlugin(plugin) {} const std::string& Name() const override { return mPlugin->Name(); }; + const Processor* GetPlugin() const { return mPlugin.get(); } bool Init(const Json::Value& config, PipelineContext& context); void Process(std::vector& logGroupList); diff --git a/core/pipeline/plugin/interface/Flusher.h b/core/pipeline/plugin/interface/Flusher.h index 232020df34..a59e73eb6d 100644 --- a/core/pipeline/plugin/interface/Flusher.h +++ b/core/pipeline/plugin/interface/Flusher.h @@ -62,6 +62,7 @@ class Flusher : public Plugin { friend class FlusherInstanceUnittest; friend class FlusherRunnerUnittest; friend class FlusherUnittest; + friend class PipelineUpdateUnittest; #endif }; diff --git a/core/pipeline/queue/ProcessQueueManager.h b/core/pipeline/queue/ProcessQueueManager.h index dbe47efeaa..289bafba5b 100644 --- a/core/pipeline/queue/ProcessQueueManager.h +++ b/core/pipeline/queue/ProcessQueueManager.h @@ -93,6 +93,7 @@ class ProcessQueueManager : public FeedbackInterface { void Clear(); friend class ProcessQueueManagerUnittest; friend class PipelineUnittest; + friend class PipelineUpdateUnittest; #endif }; diff --git a/core/pipeline/queue/SenderQueueManager.h b/core/pipeline/queue/SenderQueueManager.h index 6a8c6a1ad7..cc159daa86 100644 --- a/core/pipeline/queue/SenderQueueManager.h +++ b/core/pipeline/queue/SenderQueueManager.h @@ -98,6 +98,7 @@ class SenderQueueManager : public FeedbackInterface { #ifdef APSARA_UNIT_TEST_MAIN friend class SenderQueueManagerUnittest; friend class FlusherRunnerUnittest; + friend class PipelineUpdateUnittest; #endif }; diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index e9c0e39861..440c1dcd88 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -486,7 +486,9 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline // CompressType if (BOOL_FLAG(sls_client_send_compress)) { +#ifndef APSARA_UNIT_TEST_MAIN mCompressor = CompressorFactory::GetInstance()->Create(config, *mContext, sName, mPluginID, CompressType::LZ4); +#endif } mGroupSerializer = make_unique(this); @@ -672,7 +674,11 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) auto data = static_cast(item); string configName = HasContext() ? GetContext().GetConfigName() : ""; +#ifndef APSARA_UNIT_TEST_MAIN bool isProfileData = GetProfileSender()->IsProfileData(mRegion, mProject, data->mLogstore); +#else + bool isProfileData = false; +#endif int32_t curTime = time(NULL); auto curSystemTime = chrono::system_clock::now(); bool hasAuthError = false; diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index 38b09d78de..3289df110b 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -27,6 +27,9 @@ #include "pipeline/queue/SenderQueueManager.h" #include "plugin/flusher/sls/DiskBufferWriter.h" #include "runner/sink/http/HttpSink.h" +#ifdef APSARA_UNIT_TEST_MAIN +#include "unittest/pipeline/HttpSinkMock.h" +#endif // TODO: temporarily used here #include "plugin/flusher/sls/PackIdManager.h" #include "plugin/flusher/sls/SLSClientManager.h" @@ -59,6 +62,7 @@ bool FlusherRunner::Init() { mThreadRes = async(launch::async, &FlusherRunner::Run, this); mLastCheckSendClientTime = time(nullptr); + mIsFlush = false; return true; } @@ -139,12 +143,17 @@ void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) { } req->mEnqueTime = item->mLastSendTime = chrono::system_clock::now(); +#ifndef APSARA_UNIT_TEST_MAIN HttpSink::GetInstance()->AddRequest(std::move(req)); ++mHttpSendingCnt; LOG_DEBUG(sLogger, ("send item to http sink, item address", item)("config-flusher-dst", QueueKeyManager::GetInstance()->GetName(item->mQueueKey))( "sending cnt", ToString(mHttpSendingCnt.load()))); +#else + HttpSinkMock::GetInstance()->AddRequest(std::move(req)); // release item here + ++mHttpSendingCnt; +#endif } void FlusherRunner::Run() { diff --git a/core/runner/ProcessorRunner.cpp b/core/runner/ProcessorRunner.cpp index 76c34afe28..1c7ecdd65f 100644 --- a/core/runner/ProcessorRunner.cpp +++ b/core/runner/ProcessorRunner.cpp @@ -22,9 +22,9 @@ #include "monitor/AlarmManager.h" #include "monitor/metric_constants/MetricConstants.h" #include "pipeline/PipelineManager.h" -#include "queue/ExactlyOnceQueueManager.h" #include "queue/ProcessQueueManager.h" #include "queue/QueueKeyManager.h" +#include "unittest/pipeline/LogtailPluginMock.h" DEFINE_FLAG_INT32(default_flush_merged_buffer_interval, "default flush merged buffer, seconds", 1); DEFINE_FLAG_INT32(processor_runner_exit_timeout_secs, "", 60); @@ -49,6 +49,7 @@ void ProcessorRunner::Init() { for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) { mThreadRes[threadNo] = async(launch::async, &ProcessorRunner::Run, this, threadNo); } + mIsFlush = false; } void ProcessorRunner::Stop() { @@ -141,8 +142,11 @@ void ProcessorRunner::Run(uint32_t threadNo) { eventGroupList.emplace_back(std::move(item->mEventGroup)); pipeline->Process(eventGroupList, item->mInputIndex); // if the pipeline is updated, the pointer will be released, so we need to update it to the new pipeline + shared_ptr oldPipeline; if (hasOldPipeline) { - pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); + pipeline->SubInProcessCnt(); // old pipeline + pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); // update to new pipeline + pipeline->AddInProcessCnt(); if (!pipeline) { LOG_INFO(sLogger, ("pipeline not found during processing, perhaps due to config deletion", @@ -175,11 +179,19 @@ void ProcessorRunner::Run(uint32_t threadNo) { pipeline->GetContext().GetRegion()); continue; } +#ifndef APSARA_UNIT_TEST_MAIN LogtailPlugin::GetInstance()->ProcessLogGroup( pipeline->GetContext().GetConfigName(), res, group.GetMetadata(EventGroupMetaKey::SOURCE_ID).to_string()); } +#else + LogtailPluginMock::GetInstance()->ProcessLogGroup( + pipeline->GetContext().GetConfigName(), + res, + group.GetMetadata(EventGroupMetaKey::SOURCE_ID).to_string()); + } +#endif } } else { pipeline->Send(std::move(eventGroupList)); diff --git a/core/runner/sink/http/HttpSink.h b/core/runner/sink/http/HttpSink.h index ad788adee8..3ab9201623 100644 --- a/core/runner/sink/http/HttpSink.h +++ b/core/runner/sink/http/HttpSink.h @@ -23,9 +23,9 @@ #include #include +#include "monitor/MetricManager.h" #include "runner/sink/Sink.h" #include "runner/sink/http/HttpSinkRequest.h" -#include "monitor/MetricManager.h" namespace logtail { @@ -68,6 +68,7 @@ class HttpSink : public Sink { #ifdef APSARA_UNIT_TEST_MAIN friend class FlusherRunnerUnittest; + friend class HttpSinkMock; #endif }; diff --git a/core/unittest/config/PipelineManagerMock.h b/core/unittest/config/PipelineManagerMock.h index 4a27a50802..8bcd8cb6ce 100644 --- a/core/unittest/config/PipelineManagerMock.h +++ b/core/unittest/config/PipelineManagerMock.h @@ -31,9 +31,6 @@ class PipelineMock : public Pipeline { mContext.SetCreateTime(config.mCreateTime); return (*mConfig)["valid"].asBool(); } - - bool Start() { return true; } - void Stop(bool isRemoving) {} }; class PipelineManagerMock : public PipelineManager { @@ -44,6 +41,9 @@ class PipelineManagerMock : public PipelineManager { } void ClearEnvironment() { + for (auto& it : mPipelineNameEntityMap) { + it.second->Stop(true); + } mPipelineNameEntityMap.clear(); mPluginCntMap.clear(); } diff --git a/core/unittest/pipeline/HttpSinkMock.h b/core/unittest/pipeline/HttpSinkMock.h new file mode 100644 index 0000000000..cfc5bf92fd --- /dev/null +++ b/core/unittest/pipeline/HttpSinkMock.h @@ -0,0 +1,70 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "logger/Logger.h" +#include "pipeline/plugin/interface/HttpFlusher.h" +#include "pipeline/queue/SLSSenderQueueItem.h" +#include "runner/FlusherRunner.h" +#include "runner/sink/http/HttpSink.h" +#include "sdk/Common.h" + +namespace logtail { +class HttpSinkMock : public HttpSink { +public: + HttpSinkMock(const HttpSinkMock&) = delete; + HttpSinkMock& operator=(const HttpSinkMock&) = delete; + + static HttpSinkMock* GetInstance() { + static HttpSinkMock instance; + return &instance; + } + + bool AddRequest(std::unique_ptr&& request) { + { + std::lock_guard lock(mMutex); + mRequests.push_back(request->mBody); + } + request->mResponse.SetStatusCode(200); + request->mResponse.mHeader[sdk::X_LOG_REQUEST_ID] = "request_id"; + static_cast(request->mItem)->mExactlyOnceCheckpoint = nullptr; + static_cast(request->mItem->mFlusher)->OnSendDone(request->mResponse, request->mItem); + FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); + request.reset(); + return true; + } + + std::vector GetRequests() { + std::lock_guard lock(mMutex); + return mRequests; + } + + void ClearRequests() { + std::lock_guard lock(mMutex); + mRequests.clear(); + } + +private: + HttpSinkMock() = default; + ~HttpSinkMock() = default; + + std::atomic_bool mIsFlush = false; + mutable std::mutex mMutex; + std::vector mRequests; +}; + +} // namespace logtail \ No newline at end of file diff --git a/core/unittest/pipeline/LogtailPluginMock.h b/core/unittest/pipeline/LogtailPluginMock.h new file mode 100644 index 0000000000..35c8130070 --- /dev/null +++ b/core/unittest/pipeline/LogtailPluginMock.h @@ -0,0 +1,83 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "go_pipeline/LogtailPlugin.h" + +namespace logtail { +class LogtailPluginMock : public LogtailPlugin { +public: + static LogtailPluginMock* GetInstance() { + static LogtailPluginMock instance; + return &instance; + } + + void BlockStart() { startBlockFlag = true; } + void UnblockStart() { startBlockFlag = false; } + void BlockProcess() { processBlockFlag = true; } + void UnblockProcess() { processBlockFlag = false; } + void BlockStop() { stopBlockFlag = true; } + void UnblockStop() { stopBlockFlag = false; } + + void Start(const std::string& configName) { + while (startBlockFlag) { + LOG_DEBUG(sLogger, ("LogtailPluginMock start", "block")("config", configName)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + startFlag = true; + LOG_INFO(sLogger, ("LogtailPluginMock start", "success")("config", configName)); + } + + void Stop(const std::string& configName, bool removingFlag) { + while (stopBlockFlag) { + LOG_DEBUG(sLogger, ("LogtailPluginMock stop", "block")("config", configName)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + startFlag = false; + LOG_INFO(sLogger, ("LogtailPluginMock stop", "success")("config", configName)); + } + + + void ProcessLogGroup(const std::string& configName, const std::string& logGroup, const std::string& packId) { + while (processBlockFlag) { + LOG_DEBUG(sLogger, ("LogtailPluginMock process log group", "block")("config", configName)); + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + } + LogtailPlugin::SendPbV2(configName.c_str(), + configName.size(), + "", + 0, + const_cast(logGroup.c_str()), + logGroup.size(), + 0, + "", + 0); + LOG_INFO(sLogger, + ("LogtailPluginMock process log group", "success")("config", configName)("logGroup", + logGroup)("packId", packId)); + } + + bool IsStarted() const { return startFlag; } + +private: + std::atomic_bool startBlockFlag = false; + std::atomic_bool processBlockFlag = false; + std::atomic_bool stopBlockFlag = false; + std::atomic_bool startFlag = false; +}; + +} // namespace logtail \ No newline at end of file diff --git a/core/unittest/pipeline/PipelineUpdateUnittest.cpp b/core/unittest/pipeline/PipelineUpdateUnittest.cpp index 26c2ef1702..831d50ef83 100644 --- a/core/unittest/pipeline/PipelineUpdateUnittest.cpp +++ b/core/unittest/pipeline/PipelineUpdateUnittest.cpp @@ -12,39 +12,97 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include -#include #include #include #include #include "common/JsonUtil.h" #include "config/PipelineConfig.h" -#include "file_server/FileServer.h" +#include "file_server/EventDispatcher.h" #include "file_server/event_handler/LogInput.h" #include "pipeline/plugin/PluginRegistry.h" +#include "pipeline/queue/ProcessQueueManager.h" +#include "pipeline/queue/QueueKeyManager.h" +#include "pipeline/queue/SLSSenderQueueItem.h" +#include "pipeline/queue/SenderQueueManager.h" +#include "runner/FlusherRunner.h" +#include "runner/ProcessorRunner.h" #include "unittest/Unittest.h" #include "unittest/config/PipelineManagerMock.h" +#include "unittest/pipeline/HttpSinkMock.h" +#include "unittest/pipeline/LogtailPluginMock.h" +#include "unittest/plugin/PluginMock.h" using namespace std; namespace logtail { + class PipelineUpdateUnittest : public testing::Test { public: - void TestFileServerStart() const; + void TestFileServerStart(); + void TestPipelineParamUpdateCase1() const; + void TestPipelineParamUpdateCase2() const; + void TestPipelineParamUpdateCase3() const; + void TestPipelineParamUpdateCase4() const; + void TestPipelineTypeUpdateCase1() const; + void TestPipelineTypeUpdateCase2() const; + void TestPipelineTypeUpdateCase3() const; + void TestPipelineTypeUpdateCase4() const; + void TestPipelineTopoUpdateCase1() const; + void TestPipelineTopoUpdateCase2() const; + void TestPipelineTopoUpdateCase3() const; + void TestPipelineTopoUpdateCase4() const; + void TestPipelineTopoUpdateCase5() const; + void TestPipelineTopoUpdateCase6() const; + void TestPipelineTopoUpdateCase7() const; + void TestPipelineTopoUpdateCase8() const; + void TestPipelineTopoUpdateCase9() const; + void TestPipelineTopoUpdateCase10() const; + void TestPipelineTopoUpdateCase11() const; + void TestPipelineTopoUpdateCase12() const; + void TestPipelineInputBlock() const; + void TestPipelineGoInputBlockCase1() const; + void TestPipelineGoInputBlockCase2() const; + void TestPipelineIsolationCase1() const; + void TestPipelineIsolationCase2() const; protected: - static void SetUpTestCase() { PluginRegistry::GetInstance()->LoadPlugins(); } + static void SetUpTestCase() { + PluginRegistry::GetInstance()->LoadPlugins(); + LoadPluginMock(); - static void TearDownTestCase() { - PluginRegistry::GetInstance()->UnloadPlugins(); - FileServer::GetInstance()->Stop(); + FlusherRunner::GetInstance()->mEnableRateLimiter = false; +#ifdef __ENTERPRISE__ + builtinPipelineCnt = EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); +#endif + SenderQueueManager::GetInstance()->mDefaultQueueParam.mCapacity = 1; // test extra buffer } - void SetUp() override {} + static void TearDownTestCase() { PluginRegistry::GetInstance()->UnloadPlugins(); } + + void SetUp() override { + LogInput::GetInstance()->CleanEnviroments(); + ProcessorRunner::GetInstance()->Init(); + isFileServerStart = false; // file server stop is not reentrant, so we stop it only when start it + } - void TearDown() override {} + void TearDown() override { + LogInput::GetInstance()->CleanEnviroments(); + EventDispatcher::GetInstance()->CleanEnviroments(); + for (auto& pipeline : PipelineManager::GetInstance()->GetAllPipelines()) { + pipeline.second->Stop(true); + } + PipelineManager::GetInstance()->mPipelineNameEntityMap.clear(); + if (isFileServerStart) { + FileServer::GetInstance()->Stop(); + } + ProcessorRunner::GetInstance()->Stop(); + FlusherRunner::GetInstance()->Stop(); + HttpSinkMock::GetInstance()->ClearRequests(); + HttpSinkMock::GetInstance()->Stop(); + LOG_WARNING(sLogger, ("tear down", "all")); + } private: Json::Value GeneratePipelineConfigJson(const string& inputConfig, @@ -66,43 +124,191 @@ class PipelineUpdateUnittest : public testing::Test { errorMsg); return json; } + + void AddDataToProcessQueue(const string& configName, const string& data) const { + auto key = QueueKeyManager::GetInstance()->GetKey(configName); + PipelineEventGroup g(std::make_shared()); + auto event = g.AddLogEvent(); + event->SetContent("content", data); + std::unique_ptr item = std::make_unique(std::move(g), 0); + { + auto manager = ProcessQueueManager::GetInstance(); + manager->CreateOrUpdateBoundedQueue(key, 0, PipelineContext{}); + lock_guard lock(manager->mQueueMux); + auto iter = manager->mQueues.find(key); + APSARA_TEST_NOT_EQUAL(iter, manager->mQueues.end()); + APSARA_TEST_TRUE_FATAL((*iter->second.first)->Push(std::move(item))); + } + }; + + void AddDataToProcessor(const string& configName, const string& data) const { + auto key = QueueKeyManager::GetInstance()->GetKey(configName); + PipelineEventGroup g(std::make_shared()); + auto event = g.AddLogEvent(); + event->SetContent("content", data); + ProcessorRunner::GetInstance()->PushQueue(key, 0, std::move(g)); + } + + void AddDataToSenderQueue(const string& configName, string&& data, Flusher* flusher) const { + auto key = flusher->mQueueKey; + auto cpt = make_shared(); + LOG_WARNING(sLogger, ("sender queue key", key)); + std::unique_ptr item = std::make_unique( + std::move(data), data.size(), flusher, key, "", RawDataType::EVENT_GROUP, "", std::move(cpt), false); + { + auto manager = SenderQueueManager::GetInstance(); + manager->CreateQueue(key, "", PipelineContext{}); + lock_guard lock(manager->mQueueMux); + auto iter = manager->mQueues.find(key); + APSARA_TEST_NOT_EQUAL(iter, manager->mQueues.end()); + LOG_WARNING(sLogger, ("add data to sender queue", item->mData)); + APSARA_TEST_TRUE_FATAL(iter->second.Push(std::move(item))); + } + } + + void VerifyData(size_t expectedDataCount) const { + for (size_t retry = 0; retry < 8; ++retry) { + this_thread::sleep_for(chrono::milliseconds(1000)); + auto requests = HttpSinkMock::GetInstance()->GetRequests(); + size_t i = 1; + size_t j = 0; + while ((i < expectedDataCount + 1) && j < requests.size()) { + LOG_WARNING(sLogger, ("requests", requests[j])); + if (requests[j].find("test-data-" + to_string(i)) != string::npos) { + ++i; + continue; + } + ++j; + } + if (i == expectedDataCount + 1) { + APSARA_TEST_EQUAL_FATAL(requests.size() - 1, j); + APSARA_TEST_EQUAL_FATAL(expectedDataCount + 1, i); + return; + } + } + } + + string nativeInputFileConfig = R"( + { + "Type": "input_file", + "FilePaths": [ + "/tmp/not_found.log" + ] + })"; string nativeInputConfig = R"( { - "Type": "input_file" + "Type": "input_mock", + "FilePaths": [ + "/tmp/not_found.log" + ] + })"; + string nativeInputConfig2 = R"( + { + "Type": "input_mock", + "FilePaths": [ + "/tmp/*.log" + ] + })"; + string nativeInputConfig3 = R"( + { + "Type": "input_mock2", + "FilePaths": [ + "/tmp/not_found.log" + ] })"; string nativeProcessorConfig = R"( { - "Type": "processor_parse_regex_native" + "Type": "processor_mock" + })"; + string nativeProcessorConfig2 = R"( + { + "Type": "processor_mock", + "Regex": ".*" + })"; + string nativeProcessorConfig3 = R"( + { + "Type": "processor_mock2" })"; string nativeFlusherConfig = R"( { - "Type": "flusher_sls" + "Type": "flusher_sls_mock", + "Project": "test_project", + "Logstore": "test_logstore_1", + "Region": "test_region", + "Endpoint": "test_endpoint" + })"; + string nativeFlusherConfig2 = R"( + { + "Type": "flusher_sls_mock", + "Project": "test_project", + "Logstore": "test_logstore_2", + "Region": "test_region", + "Endpoint": "test_endpoint" + })"; + string nativeFlusherConfig3 = R"( + { + "Type": "flusher_sls_mock2", + "Project": "test_project", + "Logstore": "test_logstore_1", + "Region": "test_region", + "Endpoint": "test_endpoint" })"; string goInputConfig = R"( { - "Type": "input_docker_stdout" + "Type": "service_docker_stdout_v2" + })"; + string goInputConfig2 = R"( + { + "Type": "service_docker_stdout_v2", + "Stdout": true + })"; + string goInputConfig3 = R"( + { + "Type": "service_docker_stdout_v3" })"; string goProcessorConfig = R"( { "Type": "processor_regex" })"; + string goProcessorConfig2 = R"( + { + "Type": "processor_regex", + "Regex": ".*" + })"; + string goProcessorConfig3 = R"( + { + "Type": "processor_regex2" + })"; string goFlusherConfig = R"( { "Type": "flusher_stdout" })"; + string goFlusherConfig2 = R"( + { + "Type": "flusher_stdout", + "Stdout": true + })"; + string goFlusherConfig3 = R"( + { + "Type": "flusher_stdout2" + })"; + + size_t builtinPipelineCnt = 0; + bool isFileServerStart = false; }; -void PipelineUpdateUnittest::TestFileServerStart() const { +void PipelineUpdateUnittest::TestFileServerStart() { + isFileServerStart = true; Json::Value nativePipelineConfigJson - = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + = GeneratePipelineConfigJson(nativeInputFileConfig, nativeProcessorConfig, nativeFlusherConfig); Json::Value goPipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig); auto pipelineManager = PipelineManagerMock::GetInstance(); PipelineConfigDiff diff; PipelineConfig nativePipelineConfigObj - = PipelineConfig("test1", make_unique(nativePipelineConfigJson)); + = PipelineConfig("test-file-1", make_unique(nativePipelineConfigJson)); nativePipelineConfigObj.Parse(); diff.mAdded.push_back(std::move(nativePipelineConfigObj)); - PipelineConfig goPipelineConfigObj = PipelineConfig("test2", make_unique(goPipelineConfigJson)); + PipelineConfig goPipelineConfigObj = PipelineConfig("test-file-2", make_unique(goPipelineConfigJson)); goPipelineConfigObj.Parse(); diff.mAdded.push_back(std::move(goPipelineConfigObj)); @@ -111,7 +317,1137 @@ void PipelineUpdateUnittest::TestFileServerStart() const { APSARA_TEST_EQUAL_FATAL(false, LogInput::GetInstance()->mInteruptFlag); } +void PipelineUpdateUnittest::TestPipelineParamUpdateCase1() const { + // C++ -> C++ -> C++ + const std::string configName = "test1"; + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + auto processor + = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + processor->Block(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + AddDataToProcessor(configName, "test-data-4"); + + AddDataToProcessQueue(configName, "test-data-5"); + AddDataToProcessQueue(configName, "test-data-6"); + AddDataToProcessQueue(configName, "test-data-7"); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(nativeInputConfig2, nativeProcessorConfig2, nativeFlusherConfig2); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result = async(launch::async, [&]() { + this_thread::sleep_for(chrono::milliseconds(2000)); + processor->Unblock(); + }); + pipelineManager->UpdatePipelines(diffUpdate); + result.get(); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(7); +} + +void PipelineUpdateUnittest::TestPipelineParamUpdateCase2() const { + // Go -> Go -> Go + const std::string configName = "test2"; + // load old pipeline + Json::Value pipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(goInputConfig2, goProcessorConfig2, goFlusherConfig2); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); +} + +void PipelineUpdateUnittest::TestPipelineParamUpdateCase3() const { + // Go -> Go -> C++ + const std::string configName = "test3"; + // load old pipeline + Json::Value pipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(goInputConfig2, goProcessorConfig2, nativeFlusherConfig2); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(3); +} + +void PipelineUpdateUnittest::TestPipelineParamUpdateCase4() const { + // C++ -> Go -> C++ + const std::string configName = "test4"; + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, goProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + LogtailPluginMock::GetInstance()->BlockProcess(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + AddDataToProcessor(configName, "test-data-4"); + + AddDataToProcessQueue(configName, "test-data-5"); + AddDataToProcessQueue(configName, "test-data-6"); + AddDataToProcessQueue(configName, "test-data-7"); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(nativeInputConfig2, goProcessorConfig2, nativeFlusherConfig2); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result = async(launch::async, [&]() { + this_thread::sleep_for(chrono::milliseconds(2000)); + LogtailPluginMock::GetInstance()->UnblockProcess(); + }); + pipelineManager->UpdatePipelines(diffUpdate); + result.get(); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(7); +} + +void PipelineUpdateUnittest::TestPipelineTypeUpdateCase1() const { + // C++ -> C++ -> C++ + const std::string configName = "test1"; + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + auto processor + = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + processor->Block(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + AddDataToProcessor(configName, "test-data-4"); + + AddDataToProcessQueue(configName, "test-data-5"); + AddDataToProcessQueue(configName, "test-data-6"); + AddDataToProcessQueue(configName, "test-data-7"); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(nativeInputConfig3, nativeProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result = async(launch::async, [&]() { + this_thread::sleep_for(chrono::milliseconds(2000)); + processor->Unblock(); + }); + pipelineManager->UpdatePipelines(diffUpdate); + result.get(); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(7); +} + +void PipelineUpdateUnittest::TestPipelineTypeUpdateCase2() const { + // Go -> Go -> Go + const std::string configName = "test2"; + // load old pipeline + Json::Value pipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(goInputConfig3, goProcessorConfig3, goFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); +} + +void PipelineUpdateUnittest::TestPipelineTypeUpdateCase3() const { + // Go -> Go -> C++ + const std::string configName = "test3"; + // load old pipeline + Json::Value pipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(goInputConfig3, goProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(3); +} + +void PipelineUpdateUnittest::TestPipelineTypeUpdateCase4() const { + // C++ -> Go -> C++ + const std::string configName = "test4"; + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, goProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + LogtailPluginMock::GetInstance()->BlockProcess(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + AddDataToProcessor(configName, "test-data-4"); + + AddDataToProcessQueue(configName, "test-data-5"); + AddDataToProcessQueue(configName, "test-data-6"); + AddDataToProcessQueue(configName, "test-data-7"); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(nativeInputConfig3, goProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result = async(launch::async, [&]() { + this_thread::sleep_for(chrono::milliseconds(2000)); + LogtailPluginMock::GetInstance()->UnblockProcess(); + }); + pipelineManager->UpdatePipelines(diffUpdate); + result.get(); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(7); +} + +void PipelineUpdateUnittest::TestPipelineTopoUpdateCase1() const { + // C++ -> C++ -> C++ => Go -> Go -> Go + const std::string configName = "test1"; + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + auto processor + = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + processor->Block(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + AddDataToProcessor(configName, "test-data-4"); + + AddDataToProcessQueue(configName, "test-data-5"); + AddDataToProcessQueue(configName, "test-data-6"); + AddDataToProcessQueue(configName, "test-data-7"); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result = async(launch::async, [&]() { + this_thread::sleep_for(chrono::milliseconds(2000)); + processor->Unblock(); + }); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + result.get(); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(4); +} + +void PipelineUpdateUnittest::TestPipelineTopoUpdateCase2() const { + // C++ -> C++ -> C++ => Go -> Go -> C++ + const std::string configName = "test2"; + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + auto processor + = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + processor->Block(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + AddDataToProcessor(configName, "test-data-4"); + + AddDataToProcessQueue(configName, "test-data-5"); + AddDataToProcessQueue(configName, "test-data-6"); + AddDataToProcessQueue(configName, "test-data-7"); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result = async(launch::async, [&]() { + this_thread::sleep_for(chrono::milliseconds(2000)); + processor->Unblock(); + }); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + result.get(); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(7); +} + +void PipelineUpdateUnittest::TestPipelineTopoUpdateCase3() const { + // C++ -> C++ -> C++ => C++ -> Go -> C++ + const std::string configName = "test3"; + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + auto processor + = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + processor->Block(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + AddDataToProcessor(configName, "test-data-4"); + + AddDataToProcessQueue(configName, "test-data-5"); + AddDataToProcessQueue(configName, "test-data-6"); + AddDataToProcessQueue(configName, "test-data-7"); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(nativeInputConfig3, goProcessorConfig, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result = async(launch::async, [&]() { + this_thread::sleep_for(chrono::milliseconds(2000)); + processor->Unblock(); + }); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + result.get(); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(7); +} + +void PipelineUpdateUnittest::TestPipelineTopoUpdateCase4() const { + // Go -> Go -> Go => C++ -> C++ -> C++ + const std::string configName = "test4"; + // load old pipeline + Json::Value pipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(nativeInputConfig3, nativeProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(false, LogtailPluginMock::GetInstance()->IsStarted()); +} + +void PipelineUpdateUnittest::TestPipelineTopoUpdateCase5() const { + // Go -> Go -> Go => Go -> Go -> C++ + const std::string configName = "test5"; + // load old pipeline + Json::Value pipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(goInputConfig3, goProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); +} + +void PipelineUpdateUnittest::TestPipelineTopoUpdateCase6() const { + // Go -> Go -> Go => C++ -> Go -> C++ + const std::string configName = "test6"; + // load old pipeline + Json::Value pipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(nativeInputConfig3, goProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); +} + +void PipelineUpdateUnittest::TestPipelineTopoUpdateCase7() const { + // Go -> Go -> C++ => C++ -> C++ -> C++ + const std::string configName = "test7"; + // load old pipeline + Json::Value pipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(nativeInputConfig3, nativeProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(false, LogtailPluginMock::GetInstance()->IsStarted()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(3); +} + +void PipelineUpdateUnittest::TestPipelineTopoUpdateCase8() const { + // Go -> Go -> C++ => Go -> Go -> Go + const std::string configName = "test8"; + // load old pipeline + Json::Value pipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(goInputConfig3, goProcessorConfig3, goFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(3); +} + +void PipelineUpdateUnittest::TestPipelineTopoUpdateCase9() const { + // Go -> Go -> C++ => C++ -> Go -> C++ + const std::string configName = "test9"; + // load old pipeline + Json::Value pipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(nativeInputConfig3, goProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + pipelineManager->UpdatePipelines(diffUpdate); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(3); +} + +void PipelineUpdateUnittest::TestPipelineTopoUpdateCase10() const { + // C++ -> Go -> C++ => C++ -> C++ -> C++ + const std::string configName = "test10"; + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, goProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + LogtailPluginMock::GetInstance()->BlockProcess(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + AddDataToProcessor(configName, "test-data-4"); + + AddDataToProcessQueue(configName, "test-data-5"); + AddDataToProcessQueue(configName, "test-data-6"); + AddDataToProcessQueue(configName, "test-data-7"); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(nativeInputConfig3, nativeProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result = async(launch::async, [&]() { + this_thread::sleep_for(chrono::milliseconds(2000)); + LogtailPluginMock::GetInstance()->UnblockProcess(); + }); + pipelineManager->UpdatePipelines(diffUpdate); + result.get(); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(false, LogtailPluginMock::GetInstance()->IsStarted()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(7); +} + +void PipelineUpdateUnittest::TestPipelineTopoUpdateCase11() const { + // C++ -> Go -> C++ => Go -> Go -> Go + const std::string configName = "test11"; + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, goProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + LogtailPluginMock::GetInstance()->BlockProcess(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + AddDataToProcessor(configName, "test-data-4"); + + AddDataToProcessQueue(configName, "test-data-5"); + AddDataToProcessQueue(configName, "test-data-6"); + AddDataToProcessQueue(configName, "test-data-7"); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(goInputConfig3, goProcessorConfig3, goFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result = async(launch::async, [&]() { + this_thread::sleep_for(chrono::milliseconds(2000)); + LogtailPluginMock::GetInstance()->UnblockProcess(); + }); + pipelineManager->UpdatePipelines(diffUpdate); + result.get(); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(4); +} + +void PipelineUpdateUnittest::TestPipelineTopoUpdateCase12() const { + // C++ -> Go -> C++ => Go -> Go -> C++ + const std::string configName = "test12"; + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, goProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + LogtailPluginMock::GetInstance()->BlockProcess(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + AddDataToProcessor(configName, "test-data-4"); + + AddDataToProcessQueue(configName, "test-data-5"); + AddDataToProcessQueue(configName, "test-data-6"); + AddDataToProcessQueue(configName, "test-data-7"); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(goInputConfig3, goProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result = async(launch::async, [&]() { + this_thread::sleep_for(chrono::milliseconds(2000)); + LogtailPluginMock::GetInstance()->UnblockProcess(); + }); + pipelineManager->UpdatePipelines(diffUpdate); + result.get(); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(7); +} + +void PipelineUpdateUnittest::TestPipelineInputBlock() const { + // C++ -> C++ -> C++ + const std::string configName = "test1"; + // load old pipeline + Json::Value pipelineConfigJson + = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto input = static_cast(const_cast(pipeline->GetInputs()[0].get()->GetPlugin())); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + auto processor + = static_cast(const_cast(pipeline->GetProcessors()[0].get()->GetPlugin())); + input->Block(); + processor->Block(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + AddDataToProcessor(configName, "test-data-4"); + + AddDataToProcessQueue(configName, "test-data-5"); + AddDataToProcessQueue(configName, "test-data-6"); + AddDataToProcessQueue(configName, "test-data-7"); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(nativeInputConfig2, nativeProcessorConfig2, nativeFlusherConfig2); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result1 = async(launch::async, [&]() { + this_thread::sleep_for(chrono::milliseconds(2000)); + processor->Unblock(); + }); + auto result2 = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); }); + this_thread::sleep_for(chrono::milliseconds(2000)); + APSARA_TEST_NOT_EQUAL_FATAL(future_status::ready, result2.wait_for(chrono::milliseconds(0))); + input->Unblock(); + result1.get(); + result2.get(); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(7); +} + +void PipelineUpdateUnittest::TestPipelineGoInputBlockCase1() const { + // Go -> Go -> C++ => Go -> Go -> C++ + const std::string configName = "test1"; + // load old pipeline + Json::Value pipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + LogtailPluginMock::GetInstance()->BlockStop(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(goInputConfig3, goProcessorConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); }); + this_thread::sleep_for(chrono::milliseconds(2000)); + APSARA_TEST_NOT_EQUAL_FATAL(future_status::ready, result.wait_for(chrono::milliseconds(0))); + LogtailPluginMock::GetInstance()->UnblockStop(); + result.get(); + + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(3); +} + +void PipelineUpdateUnittest::TestPipelineGoInputBlockCase2() const { + // Go -> Go -> C++ => C++ -> Go -> C++ + const std::string configName = "test1"; + // load old pipeline + Json::Value pipelineConfigJson = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, nativeFlusherConfig); + auto pipelineManager = PipelineManager::GetInstance(); + PipelineConfigDiff diff; + PipelineConfig pipelineConfigObj = PipelineConfig(configName, make_unique(pipelineConfigJson)); + pipelineConfigObj.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj)); + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); + + // Add data without trigger + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + LogtailPluginMock::GetInstance()->BlockStop(); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + + // load new pipeline + Json::Value pipelineConfigJsonUpdate + = GeneratePipelineConfigJson(nativeInputConfig3, nativeFlusherConfig3, nativeFlusherConfig3); + PipelineConfigDiff diffUpdate; + PipelineConfig pipelineConfigObjUpdate + = PipelineConfig(configName, make_unique(pipelineConfigJsonUpdate)); + pipelineConfigObjUpdate.Parse(); + diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); + auto result = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); }); + this_thread::sleep_for(chrono::milliseconds(2000)); + APSARA_TEST_NOT_EQUAL_FATAL(future_status::ready, result.wait_for(chrono::milliseconds(0))); + LogtailPluginMock::GetInstance()->UnblockStop(); + result.get(); + + APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + APSARA_TEST_EQUAL_FATAL(false, LogtailPluginMock::GetInstance()->IsStarted()); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + VerifyData(3); +} + +void PipelineUpdateUnittest::TestPipelineIsolationCase1() const { + PipelineConfigDiff diff; + auto pipelineManager = PipelineManager::GetInstance(); + // C++ -> C++ -> C++ + Json::Value pipelineConfigJson1 + = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + PipelineConfig pipelineConfigObj1 = PipelineConfig("test1", make_unique(pipelineConfigJson1)); + pipelineConfigObj1.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj1)); + // Go -> Go -> Go + Json::Value pipelineConfigJson2 = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig); + PipelineConfig pipelineConfigObj2 = PipelineConfig("test2", make_unique(pipelineConfigJson2)); + pipelineConfigObj2.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj2)); + // Go -> Go -> C++ + Json::Value pipelineConfigJson3 = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, nativeFlusherConfig); + PipelineConfig pipelineConfigObj3 = PipelineConfig("test3", make_unique(pipelineConfigJson3)); + pipelineConfigObj3.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj3)); + // C++ -> Go -> C++ + Json::Value pipelineConfigJson4 + = GeneratePipelineConfigJson(nativeInputConfig, goProcessorConfig, nativeFlusherConfig); + PipelineConfig pipelineConfigObj4 = PipelineConfig("test4", make_unique(pipelineConfigJson4)); + pipelineConfigObj4.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj4)); + + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(4U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + PipelineConfigDiff diffUpdate; + diffUpdate.mRemoved.push_back("test1"); + auto pipeline = pipelineManager->GetAllPipelines().at("test1"); + auto input = static_cast(const_cast(pipeline->GetInputs()[0].get()->GetPlugin())); + input->Block(); + + HttpSinkMock::GetInstance()->Init(); + FlusherRunner::GetInstance()->Init(); + auto result = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); }); + { // add data to Go -> Go -> C++ + std::string configName = "test3"; + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + VerifyData(3); + } + HttpSinkMock::GetInstance()->ClearRequests(); + { // add data to C++ -> Go -> C++ + std::string configName = "test4"; + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + AddDataToProcessQueue(configName, "test-data-1"); + AddDataToProcessQueue(configName, "test-data-2"); + AddDataToProcessQueue(configName, "test-data-3"); + VerifyData(3); + } + + input->Unblock(); + result.get(); + APSARA_TEST_EQUAL_FATAL(3U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); +} + +void PipelineUpdateUnittest::TestPipelineIsolationCase2() const { + PipelineConfigDiff diff; + auto pipelineManager = PipelineManager::GetInstance(); + // C++ -> C++ -> C++ + Json::Value pipelineConfigJson1 + = GeneratePipelineConfigJson(nativeInputConfig, nativeProcessorConfig, nativeFlusherConfig); + PipelineConfig pipelineConfigObj1 = PipelineConfig("test1", make_unique(pipelineConfigJson1)); + pipelineConfigObj1.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj1)); + // Go -> Go -> Go + Json::Value pipelineConfigJson2 = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, goFlusherConfig); + PipelineConfig pipelineConfigObj2 = PipelineConfig("test2", make_unique(pipelineConfigJson2)); + pipelineConfigObj2.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj2)); + // Go -> Go -> C++ + Json::Value pipelineConfigJson3 = GeneratePipelineConfigJson(goInputConfig, goProcessorConfig, nativeFlusherConfig); + PipelineConfig pipelineConfigObj3 = PipelineConfig("test3", make_unique(pipelineConfigJson3)); + pipelineConfigObj3.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj3)); + // C++ -> Go -> C++ + Json::Value pipelineConfigJson4 + = GeneratePipelineConfigJson(nativeInputConfig, goProcessorConfig, nativeFlusherConfig); + PipelineConfig pipelineConfigObj4 = PipelineConfig("test4", make_unique(pipelineConfigJson4)); + pipelineConfigObj4.Parse(); + diff.mAdded.push_back(std::move(pipelineConfigObj4)); + + pipelineManager->UpdatePipelines(diff); + APSARA_TEST_EQUAL_FATAL(4U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); + + PipelineConfigDiff diffUpdate; + diffUpdate.mRemoved.push_back("test4"); + auto pipeline = pipelineManager->GetAllPipelines().at("test4"); + auto input = static_cast(const_cast(pipeline->GetInputs()[0].get()->GetPlugin())); + input->Block(); + + auto result = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); }); + { // add data to C++ -> Go -> C++ + std::string configName = "test1"; + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + AddDataToProcessQueue(configName, "test-data-1"); + AddDataToProcessQueue(configName, "test-data-2"); + AddDataToProcessQueue(configName, "test-data-3"); + VerifyData(3); + } + HttpSinkMock::GetInstance()->ClearRequests(); + { // add data to Go -> Go -> C++ + std::string configName = "test3"; + auto pipeline = PipelineManager::GetInstance()->GetAllPipelines().at(configName); + auto flusher = const_cast(pipeline->GetFlushers()[0].get()->GetPlugin()); + AddDataToSenderQueue(configName, "test-data-1", flusher); + AddDataToSenderQueue(configName, "test-data-2", flusher); + AddDataToSenderQueue(configName, "test-data-3", flusher); + VerifyData(3); + } + + input->Unblock(); + result.get(); + APSARA_TEST_EQUAL_FATAL(3U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); +} + UNIT_TEST_CASE(PipelineUpdateUnittest, TestFileServerStart) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineParamUpdateCase1) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineParamUpdateCase2) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineParamUpdateCase3) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineParamUpdateCase4) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTypeUpdateCase1) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTypeUpdateCase2) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTypeUpdateCase3) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTypeUpdateCase4) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTopoUpdateCase1) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTopoUpdateCase2) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTopoUpdateCase3) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTopoUpdateCase4) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTopoUpdateCase5) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTopoUpdateCase6) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTopoUpdateCase7) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTopoUpdateCase8) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTopoUpdateCase9) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTopoUpdateCase10) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTopoUpdateCase11) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineTopoUpdateCase12) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineInputBlock) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineGoInputBlockCase1) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineGoInputBlockCase2) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineIsolationCase1) +UNIT_TEST_CASE(PipelineUpdateUnittest, TestPipelineIsolationCase2) } // namespace logtail diff --git a/core/unittest/plugin/PluginMock.h b/core/unittest/plugin/PluginMock.h index 14c95c8ce3..d5443dad58 100644 --- a/core/unittest/plugin/PluginMock.h +++ b/core/unittest/plugin/PluginMock.h @@ -27,7 +27,9 @@ #include "pipeline/plugin/interface/HttpFlusher.h" #include "pipeline/plugin/interface/Input.h" #include "pipeline/plugin/interface/Processor.h" +#include "pipeline/queue/SLSSenderQueueItem.h" #include "pipeline/queue/SenderQueueManager.h" +#include "plugin/flusher/sls/FlusherSLS.h" #include "task_pipeline/Task.h" #include "task_pipeline/TaskRegistry.h" @@ -65,30 +67,68 @@ class InputMock : public Input { return true; } bool Start() override { return true; } - bool Stop(bool isPipelineRemoving) override { return true; } + bool Stop(bool isPipelineRemoving) override { + while (mBlockFlag) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + LOG_DEBUG(sLogger, ("input mock", "block")); + } + return true; + } bool SupportAck() const override { return mSupportAck; } + void Block() { mBlockFlag = true; } + void Unblock() { mBlockFlag = false; } + bool mSupportAck = true; + +private: + std::atomic_bool mBlockFlag = false; }; const std::string InputMock::sName = "input_mock"; +class InputMock2 : public InputMock { +public: + static const std::string sName; +}; + +const std::string InputMock2::sName = "input_mock2"; + class ProcessorMock : public Processor { public: static const std::string sName; const std::string& Name() const override { return sName; } bool Init(const Json::Value& config) override { return true; } - void Process(PipelineEventGroup& logGroup) override { ++mCnt; }; + void Process(PipelineEventGroup& logGroup) override { + while (mBlockFlag) { + std::this_thread::sleep_for(std::chrono::milliseconds(100)); + LOG_INFO(sLogger, ("processor mock", "block")("cnt", mCnt)); + } + ++mCnt; + LOG_DEBUG(sLogger, ("processor mock", "process")("cnt", mCnt)); + }; + + void Block() { mBlockFlag.store(true); } + void Unblock() { mBlockFlag.store(false); } uint32_t mCnt = 0; protected: bool IsSupportedEvent(const PipelineEventPtr& e) const override { return true; }; + + std::atomic_bool mBlockFlag = false; }; const std::string ProcessorMock::sName = "processor_mock"; +class ProcessorMock2 : public ProcessorMock { +public: + static const std::string sName; +}; + +const std::string ProcessorMock2::sName = "processor_mock2"; + class FlusherMock : public Flusher { public: static const std::string sName; @@ -149,6 +189,29 @@ class FlusherHttpMock : public HttpFlusher { const std::string FlusherHttpMock::sName = "flusher_http_mock"; +class FlusherSLSMock : public FlusherSLS { +public: + static const std::string sName; + + bool BuildRequest(SenderQueueItem* item, std::unique_ptr& req, bool* keepItem) const override { + auto data = static_cast(item); + std::map header; + req = std::make_unique( + "POST", false, "test-host", 80, "/test-operation", "", header, data->mData, item); + LOG_WARNING(sLogger, ("build mock request", data->mData)); + return true; + } +}; + +const std::string FlusherSLSMock::sName = "flusher_sls_mock"; + +class FlusherSLSMock2 : public FlusherSLSMock { +public: + static const std::string sName; +}; + +const std::string FlusherSLSMock2::sName = "flusher_sls_mock2"; + class TaskMock : public Task { public: static const std::string sName; @@ -174,6 +237,10 @@ void LoadPluginMock() { PluginRegistry::GetInstance()->RegisterProcessorCreator(new StaticProcessorCreator()); PluginRegistry::GetInstance()->RegisterFlusherCreator(new StaticFlusherCreator()); PluginRegistry::GetInstance()->RegisterFlusherCreator(new StaticFlusherCreator()); + PluginRegistry::GetInstance()->RegisterFlusherCreator(new StaticFlusherCreator()); + PluginRegistry::GetInstance()->RegisterInputCreator(new StaticInputCreator()); + PluginRegistry::GetInstance()->RegisterProcessorCreator(new StaticProcessorCreator()); + PluginRegistry::GetInstance()->RegisterFlusherCreator(new StaticFlusherCreator()); } void LoadTaskMock() { diff --git a/pluginmanager/config_update_test.go b/pluginmanager/config_update_test.go index 66a3d6aef7..a31fbd0cbe 100644 --- a/pluginmanager/config_update_test.go +++ b/pluginmanager/config_update_test.go @@ -25,12 +25,14 @@ import ( "github.com/alibaba/ilogtail/pkg/logger" _ "github.com/alibaba/ilogtail/plugins/aggregator/baseagg" "github.com/alibaba/ilogtail/plugins/flusher/checker" + "github.com/alibaba/ilogtail/plugins/input/mockd" "github.com/stretchr/testify/suite" ) var updateConfigName = "update_mock_block" var noblockUpdateConfigName = "update_mock_noblock" +var noblockUpdateNoInputConfigName = "update_mock_noblock_no_input" func TestConfigUpdate(t *testing.T) { suite.Run(t, new(configUpdateTestSuite)) @@ -57,6 +59,7 @@ func (s *configUpdateTestSuite) AfterTest(suiteName, testName string) { } func (s *configUpdateTestSuite) TestConfigUpdate() { + // block config -> block config, unblock config, no input config // block config LogtailConfigLock.RLock() config := LogtailConfig[updateConfigName] @@ -64,6 +67,8 @@ func (s *configUpdateTestSuite) TestConfigUpdate() { s.NotNil(config, "%s logstrore config should exist", updateConfigName) checkFlusher, ok := GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker) s.True(ok) + mockInput, ok := GetConfigInputs(config.PluginRunner)[0].(*mockd.ServiceMock) + s.True(ok) s.Equal(0, checkFlusher.GetLogCount(), "the block flusher checker doesn't have any logs") // update same hang config @@ -72,15 +77,55 @@ func (s *configUpdateTestSuite) TestConfigUpdate() { _ = LoadAndStartMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName)) // Since independently load config, reload block config will be allowed s.NoError(LoadAndStartMockConfig(noblockUpdateConfigName, noblockUpdateConfigName, noblockUpdateConfigName, GetTestConfig(noblockUpdateConfigName))) + s.NoError(LoadAndStartMockConfig(noblockUpdateNoInputConfigName, noblockUpdateNoInputConfigName, noblockUpdateNoInputConfigName, GetTestConfig(noblockUpdateNoInputConfigName))) LogtailConfigLock.RLock() s.NotNil(LogtailConfig[updateConfigName]) s.NotNil(LogtailConfig[noblockUpdateConfigName]) + s.NotNil(LogtailConfig[noblockUpdateNoInputConfigName]) + LogtailConfigLock.RUnlock() + + time.Sleep(time.Second * time.Duration(10)) + LogtailConfigLock.RLock() + s.Equal(20000, GetConfigFlushers(LogtailConfig[noblockUpdateConfigName].PluginRunner)[0].(*checker.FlusherChecker).GetLogCount()) LogtailConfigLock.RUnlock() // unblock old config - checkFlusher.Block = false - time.Sleep(time.Second * time.Duration(5)) + mockInput.Block = false + s.Equal(0, checkFlusher.GetLogCount()) +} + +func (s *configUpdateTestSuite) TestConfigUpdateTimeout() { + // block config -> block config, unblock config, no input config + // block config + LogtailConfigLock.RLock() + config := LogtailConfig[updateConfigName] + LogtailConfigLock.RUnlock() + s.NotNil(config, "%s logstrore config should exist", updateConfigName) + checkFlusher, ok := GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker) + s.True(ok) + mockInput, ok := GetConfigInputs(config.PluginRunner)[0].(*mockd.ServiceMock) + s.True(ok) + s.Equal(0, checkFlusher.GetLogCount(), "the block flusher checker doesn't have any logs") + + // update same hang config + s.NoError(Stop(updateConfigName, false)) + s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs") + + // unblock old config first to mock timeout instead of block + mockInput.Block = false s.Equal(0, checkFlusher.GetLogCount()) + + _ = LoadAndStartMockConfig(updateConfigName, updateConfigName, updateConfigName, GetTestConfig(updateConfigName)) + // Since independently load config, reload block config will be allowed + s.NoError(LoadAndStartMockConfig(noblockUpdateConfigName, noblockUpdateConfigName, noblockUpdateConfigName, GetTestConfig(noblockUpdateConfigName))) + s.NoError(LoadAndStartMockConfig(noblockUpdateNoInputConfigName, noblockUpdateNoInputConfigName, noblockUpdateNoInputConfigName, GetTestConfig(noblockUpdateNoInputConfigName))) + LogtailConfigLock.RLock() + s.NotNil(LogtailConfig[updateConfigName]) + s.NotNil(LogtailConfig[noblockUpdateConfigName]) + s.NotNil(LogtailConfig[noblockUpdateNoInputConfigName]) + LogtailConfigLock.RUnlock() + + time.Sleep(time.Second * time.Duration(10)) LogtailConfigLock.RLock() s.Equal(20000, GetConfigFlushers(LogtailConfig[noblockUpdateConfigName].PluginRunner)[0].(*checker.FlusherChecker).GetLogCount()) LogtailConfigLock.RUnlock() @@ -93,6 +138,8 @@ func (s *configUpdateTestSuite) TestConfigUpdateMany() { s.NotNil(config, "%s logstrore config should exist", updateConfigName) checkFlusher, ok := GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker) s.True(ok) + mockInput, ok := GetConfigInputs(config.PluginRunner)[0].(*mockd.ServiceMock) + s.True(ok) s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs") // load block config @@ -101,7 +148,7 @@ func (s *configUpdateTestSuite) TestConfigUpdateMany() { s.Nil(err) s.NotNil(LogtailConfig[updateConfigName]) s.Equal(0, checkFlusher.GetLogCount(), "the hold on block flusher checker doesn't have any logs") - checkFlusher.Block = false + mockInput.Block = false time.Sleep(time.Second * time.Duration(5)) s.Equal(checkFlusher.GetLogCount(), 0) @@ -130,12 +177,14 @@ func (s *configUpdateTestSuite) TestConfigUpdateName() { LogtailConfigLock.RUnlock() s.NotNil(config) checkFlusher, ok := GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker) + s.True(ok) + mockInput, ok := GetConfigInputs(config.PluginRunner)[0].(*mockd.ServiceMock) + s.True(ok) defer func() { - checkFlusher.Block = false + mockInput.Block = false time.Sleep(time.Second * 5) s.Equal(checkFlusher.GetLogCount(), 20000) }() - s.True(ok) s.Equal(0, checkFlusher.GetLogCount(), "the hold on blocking flusher checker doesn't have any logs") s.NoError(LoadAndStartMockConfig(updateConfigName+"_", updateConfigName+"_", updateConfigName+"_", GetTestConfig(updateConfigName))) @@ -147,7 +196,7 @@ func (s *configUpdateTestSuite) TestConfigUpdateName() { LogtailConfigLock.RUnlock() s.True(ok) s.Equal(checkFlusher.GetLogCount(), 0) - checkFlusher.Block = false + mockInput.Block = false time.Sleep(time.Second * 5) s.Equal(checkFlusher.GetLogCount(), 20000) } @@ -160,7 +209,9 @@ func (s *configUpdateTestSuite) TestStopAllExit() { s.NotNil(config) checkFlusher, ok := GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker) s.True(ok) - checkFlusher.Block = false + mockInput, ok := GetConfigInputs(config.PluginRunner)[0].(*mockd.ServiceMock) + s.True(ok) + mockInput.Block = false time.Sleep(time.Second * time.Duration(5)) s.NoError(StopAllPipelines(true)) s.NoError(StopAllPipelines(false)) @@ -175,12 +226,14 @@ func (s *configUpdateTestSuite) TestStopAllExitTimeout() { s.NotNil(config) checkFlusher, ok := GetConfigFlushers(config.PluginRunner)[0].(*checker.FlusherChecker) s.True(ok) + mockInput, ok := GetConfigInputs(config.PluginRunner)[0].(*mockd.ServiceMock) + s.True(ok) s.Equal(0, checkFlusher.GetLogCount()) s.NoError(StopAllPipelines(true)) s.NoError(StopAllPipelines(false)) time.Sleep(time.Second) s.Equal(0, checkFlusher.GetLogCount()) - checkFlusher.Block = false + mockInput.Block = false time.Sleep(time.Second * time.Duration(5)) s.Equal(0, checkFlusher.GetLogCount()) } diff --git a/pluginmanager/plugin_manager_test.go b/pluginmanager/plugin_manager_test.go index e6a45f23a0..c71ce35e0f 100644 --- a/pluginmanager/plugin_manager_test.go +++ b/pluginmanager/plugin_manager_test.go @@ -117,6 +117,7 @@ func LoadAndStartMockConfig(args ...string) error { { "type": "service_mock", "detail": { + "Block": true, "LogsPerSecond": 100, "Fields": { "content": "Active connections: 1\nserver accepts handled requests\n 6079 6079 11596\n Reading: 0 Writing: 1 Waiting: 0" diff --git a/pluginmanager/plugin_runner_helper.go b/pluginmanager/plugin_runner_helper.go index b111134c9f..d92c27c0ff 100644 --- a/pluginmanager/plugin_runner_helper.go +++ b/pluginmanager/plugin_runner_helper.go @@ -126,6 +126,20 @@ func GetFlushCancelToken(runner PluginRunner) <-chan struct{} { return make(<-chan struct{}) } +func GetConfigInputs(runner PluginRunner) []pipeline.ServiceInput { + inputs := make([]pipeline.ServiceInput, 0) + if r, ok := runner.(*pluginv1Runner); ok { + for _, i := range r.ServicePlugins { + inputs = append(inputs, i.Input) + } + } else if r, ok := runner.(*pluginv2Runner); ok { + for _, i := range r.ServicePlugins { + inputs = append(inputs, i.Input) + } + } + return inputs +} + func GetConfigFlushers(runner PluginRunner) []pipeline.Flusher { flushers := make([]pipeline.Flusher, 0) if r, ok := runner.(*pluginv1Runner); ok { diff --git a/pluginmanager/test_config/update_mock_block.json b/pluginmanager/test_config/update_mock_block.json index c3c699d24d..ae4ddbbfa6 100644 --- a/pluginmanager/test_config/update_mock_block.json +++ b/pluginmanager/test_config/update_mock_block.json @@ -6,39 +6,41 @@ "DefaultLogQueueSize": 2, "DefaultLogGroupQueueSize": 1 }, - "inputs" : [ + "inputs": [ { - "type" : "service_mock", - "detail" : { + "type": "service_mock", + "detail": { + "Block": true, "LogsPerSecond": 10000, "MaxLogCount": 20000, - "Fields" : { - "content" : "time:2017.09.12 20:55:36\tjson:{\"array\" : [1, 2, 3, 4], \"key1\" : \"xx\", \"key2\": false, \"key3\":123.456, \"key4\" : { \"inner1\" : 1, \"inner2\" : {\"xxxx\" : \"yyyy\", \"zzzz\" : \"中文\"}}}\n" + "Fields": { + "content": "time:2017.09.12 20:55:36\tjson:{\"array\" : [1, 2, 3, 4], \"key1\" : \"xx\", \"key2\": false, \"key3\":123.456, \"key4\" : { \"inner1\" : 1, \"inner2\" : {\"xxxx\" : \"yyyy\", \"zzzz\" : \"中文\"}}}\n" } } } ], - "processors" : [ + "processors": [ { - "type" : "processor_anchor", - "detail" : {"SourceKey" : "content", - "NoAnchorError" : true, - "Anchors" : [ + "type": "processor_anchor", + "detail": { + "SourceKey": "content", + "NoAnchorError": true, + "Anchors": [ { - "Start" : "time", - "Stop" : "\t", - "FieldName" : "time", - "FieldType" : "string", - "ExpondJson" : false + "Start": "time", + "Stop": "\t", + "FieldName": "time", + "FieldType": "string", + "ExpondJson": false }, { - "Start" : "json:", - "Stop" : "\n", - "FieldName" : "val", - "FieldType" : "json", - "ExpondJson" : true, - "MaxExpondDepth" : 2, - "ExpondConnecter" : "#" + "Start": "json:", + "Stop": "\n", + "FieldName": "val", + "FieldType": "json", + "ExpondJson": true, + "MaxExpondDepth": 2, + "ExpondConnecter": "#" } ] } @@ -53,12 +55,10 @@ } } ], - "flushers" : [ + "flushers": [ { - "type" : "flusher_checker", - "detail" : { - "Block" : true - } + "type": "flusher_checker", + "detail": {} } ] } \ No newline at end of file diff --git a/pluginmanager/test_config/update_mock_noblock.json b/pluginmanager/test_config/update_mock_noblock.json index c56b300e2c..907b64f282 100644 --- a/pluginmanager/test_config/update_mock_noblock.json +++ b/pluginmanager/test_config/update_mock_noblock.json @@ -10,6 +10,7 @@ { "type": "service_mock", "detail": { + "Block": false, "LogsPerSecond": 10000, "MaxLogCount": 20000, "Fields": { @@ -57,9 +58,7 @@ "flushers": [ { "type": "flusher_checker", - "detail": { - "Block": false - } + "detail": {} } ] } \ No newline at end of file diff --git a/pluginmanager/test_config/update_mock_noblock_no_input.json b/pluginmanager/test_config/update_mock_noblock_no_input.json new file mode 100644 index 0000000000..bc99b3a412 --- /dev/null +++ b/pluginmanager/test_config/update_mock_noblock_no_input.json @@ -0,0 +1,52 @@ +{ + "global": { + "InputIntervalMs": 10000, + "AggregatIntervalMs": 300, + "FlushIntervalMs": 300, + "DefaultLogQueueSize": 2, + "DefaultLogGroupQueueSize": 3 + }, + "inputs": [], + "processors": [ + { + "type": "processor_anchor", + "detail": { + "SourceKey": "content", + "NoAnchorError": true, + "Anchors": [ + { + "Start": "time", + "Stop": "\t", + "FieldName": "time", + "FieldType": "string", + "ExpondJson": false + }, + { + "Start": "json:", + "Stop": "\n", + "FieldName": "val", + "FieldType": "json", + "ExpondJson": true, + "MaxExpondDepth": 2, + "ExpondConnecter": "#" + } + ] + } + } + ], + "aggregators": [ + { + "type": "aggregator_base", + "detail": { + "MaxLogGroupCount": 1, + "MaxLogCount": 100 + } + } + ], + "flushers": [ + { + "type": "flusher_checker", + "detail": {} + } + ] +} \ No newline at end of file diff --git a/plugins/flusher/checker/flusher_checker.go b/plugins/flusher/checker/flusher_checker.go index 3b17486db2..b73512750a 100644 --- a/plugins/flusher/checker/flusher_checker.go +++ b/plugins/flusher/checker/flusher_checker.go @@ -29,7 +29,6 @@ type FlusherChecker struct { context pipeline.Context LogGroup protocol.LogGroup Lock sync.RWMutex - Block bool } func (p *FlusherChecker) Init(context pipeline.Context) error { @@ -140,7 +139,7 @@ func (p *FlusherChecker) Flush(projectName string, logstoreName string, configNa // IsReady is ready to flush func (p *FlusherChecker) IsReady(projectName string, logstoreName string, logstoreKey int64) bool { - return !p.Block + return true } // Stop ... diff --git a/plugins/input/mockd/input_mockd.go b/plugins/input/mockd/input_mockd.go index d59a5024e6..31cd82d2f2 100644 --- a/plugins/input/mockd/input_mockd.go +++ b/plugins/input/mockd/input_mockd.go @@ -33,6 +33,7 @@ type ServiceMock struct { Index int64 LogsPerSecond int MaxLogCount int + Block bool nowLogCount int context pipeline.Context } @@ -66,6 +67,18 @@ func (p *ServiceMock) Start(c pipeline.Collector) error { p.waitGroup.Add(1) defer p.waitGroup.Done() for { + for { + if p.Block { + time.Sleep(time.Millisecond * 100) + continue + } + select { + case <-p.shutdown: + return nil + default: + } + break + } beginTime := time.Now() for i := 0; i < p.LogsPerSecond; i++ { p.MockOneLog(c)