diff --git a/core/config/PipelineConfig.cpp b/core/config/PipelineConfig.cpp index a9c43c3872..7ac1ec6e95 100644 --- a/core/config/PipelineConfig.cpp +++ b/core/config/PipelineConfig.cpp @@ -251,7 +251,7 @@ bool PipelineConfig::Parse() { hasFileInput = true; } #ifdef APSARA_UNIT_TEST_MAIN - if (pluginType.find("mock") != string::npos) { + if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos) { hasFileInput = true; } #endif diff --git a/core/go_pipeline/LogtailPlugin.cpp b/core/go_pipeline/LogtailPlugin.cpp index 6e41963c82..e8447d8caa 100644 --- a/core/go_pipeline/LogtailPlugin.cpp +++ b/core/go_pipeline/LogtailPlugin.cpp @@ -31,6 +31,9 @@ #include "pipeline/PipelineManager.h" #include "pipeline/queue/SenderQueueManager.h" #include "provider/Provider.h" +#ifdef APSARA_UNIT_TEST_MAIN +#include "unittest/pipeline/LogtailPluginMock.h" +#endif DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false); DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect, @@ -145,6 +148,7 @@ void LogtailPlugin::StopAllPipelines(bool withInputFlag) { } void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) { +#ifndef APSARA_UNIT_TEST_MAIN if (mPluginValid && mStopFun != NULL) { LOG_INFO(sLogger, ("Go pipelines stop", "starts")("config", configName)); auto stopStart = GetCurrentTimeInMilliSeconds(); @@ -159,6 +163,9 @@ void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) { HOLD_ON_TOO_SLOW_ALARM, "Stopping Go pipeline " + configName + " took " + ToString(stopCost) + "ms"); } } +#else + LogtailPluginMock::GetInstance()->Stop(configName, removedFlag); +#endif } void LogtailPlugin::StopBuiltInModules() { @@ -170,6 +177,7 @@ void LogtailPlugin::StopBuiltInModules() { } void LogtailPlugin::Start(const std::string& configName) { +#ifndef APSARA_UNIT_TEST_MAIN if (mPluginValid && mStartFun != NULL) { LOG_INFO(sLogger, ("Go pipelines start", "starts")("config name", configName)); GoString goConfigName; @@ -178,6 +186,9 @@ void LogtailPlugin::Start(const std::string& configName) { mStartFun(goConfigName); LOG_INFO(sLogger, ("Go pipelines start", "succeeded")("config name", configName)); } +#else + LogtailPluginMock::GetInstance()->Start(configName); +#endif } int LogtailPlugin::IsValidToSend(long long logstoreKey) { diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 9d8ff87751..17c1649b54 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -32,9 +32,6 @@ #include "plugin/flusher/sls/FlusherSLS.h" #include "plugin/input/InputFeedbackInterfaceRegistry.h" #include "plugin/processor/ProcessorParseApsaraNative.h" -#ifdef APSARA_UNIT_TEST_MAIN -#include "unittest/pipeline/LogtailPluginMock.h" -#endif DECLARE_FLAG_INT32(default_plugin_log_queue_size); @@ -341,29 +338,18 @@ bool Pipeline::Init(PipelineConfig&& config) { void Pipeline::Start() { // #ifndef APSARA_UNIT_TEST_MAIN // TODO: 应该保证指定时间内返回,如果无法返回,将配置放入startDisabled里 - LOG_WARNING(sLogger, ("debug", "8")); for (const auto& flusher : mFlushers) { flusher->Start(); } - LOG_WARNING(sLogger, ("debug", "9")); if (!mGoPipelineWithoutInput.isNull()) { -#ifndef APSARA_UNIT_TEST_MAIN LogtailPlugin::GetInstance()->Start(GetConfigNameOfGoPipelineWithoutInput()); -#else - LogtailPluginMock::GetInstance()->Start(GetConfigNameOfGoPipelineWithoutInput()); -#endif } - LOG_WARNING(sLogger, ("debug", "10")); ProcessQueueManager::GetInstance()->EnablePop(mName); if (!mGoPipelineWithInput.isNull()) { -#ifndef APSARA_UNIT_TEST_MAIN LogtailPlugin::GetInstance()->Start(GetConfigNameOfGoPipelineWithInput()); -#else - LogtailPluginMock::GetInstance()->Start(GetConfigNameOfGoPipelineWithInput()); -#endif } for (const auto& input : mInputs) { @@ -439,11 +425,7 @@ void Pipeline::Stop(bool isRemoving) { if (!mGoPipelineWithInput.isNull()) { // Go pipeline `Stop` will stop and delete -#ifndef APSARA_UNIT_TEST_MAIN LogtailPlugin::GetInstance()->Stop(GetConfigNameOfGoPipelineWithInput(), isRemoving); -#else - LogtailPluginMock::GetInstance()->Stop(GetConfigNameOfGoPipelineWithInput(), isRemoving); -#endif } ProcessQueueManager::GetInstance()->DisablePop(mName, isRemoving); @@ -453,11 +435,7 @@ void Pipeline::Stop(bool isRemoving) { if (!mGoPipelineWithoutInput.isNull()) { // Go pipeline `Stop` will stop and delete -#ifndef APSARA_UNIT_TEST_MAIN LogtailPlugin::GetInstance()->Stop(GetConfigNameOfGoPipelineWithoutInput(), isRemoving); -#else - LogtailPluginMock::GetInstance()->Stop(GetConfigNameOfGoPipelineWithoutInput(), isRemoving); -#endif } for (const auto& flusher : mFlushers) { diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 440c1dcd88..619d455a9d 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -486,9 +486,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline // CompressType if (BOOL_FLAG(sls_client_send_compress)) { -#ifndef APSARA_UNIT_TEST_MAIN mCompressor = CompressorFactory::GetInstance()->Create(config, *mContext, sName, mPluginID, CompressType::LZ4); -#endif } mGroupSerializer = make_unique(this); @@ -816,8 +814,8 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) // the possibility of hash key conflict is very low, so data is // dropped here. cpt->Commit(); - failDetail << ", drop exactly once log group and commit checkpoint" - << " checkpointKey:" << cpt->key << " checkpoint:" << cpt->data.DebugString(); + failDetail << ", drop exactly once log group and commit checkpoint" << " checkpointKey:" << cpt->key + << " checkpoint:" << cpt->data.DebugString(); suggestion << "no suggestion"; AlarmManager::GetInstance()->SendAlarm( EXACTLY_ONCE_ALARM, diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index 3289df110b..23ca1f1067 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -26,13 +26,10 @@ #include "pipeline/queue/SenderQueueItem.h" #include "pipeline/queue/SenderQueueManager.h" #include "plugin/flusher/sls/DiskBufferWriter.h" -#include "runner/sink/http/HttpSink.h" -#ifdef APSARA_UNIT_TEST_MAIN -#include "unittest/pipeline/HttpSinkMock.h" -#endif // TODO: temporarily used here #include "plugin/flusher/sls/PackIdManager.h" #include "plugin/flusher/sls/SLSClientManager.h" +#include "unittest/pipeline/HttpSinkMock.h" DEFINE_FLAG_INT32(flusher_runner_exit_timeout_secs, "", 60); DEFINE_FLAG_INT32(check_send_client_timeout_interval, "", 600); @@ -143,17 +140,16 @@ void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) { } req->mEnqueTime = item->mLastSendTime = chrono::system_clock::now(); -#ifndef APSARA_UNIT_TEST_MAIN - HttpSink::GetInstance()->AddRequest(std::move(req)); - ++mHttpSendingCnt; LOG_DEBUG(sLogger, ("send item to http sink, item address", item)("config-flusher-dst", QueueKeyManager::GetInstance()->GetName(item->mQueueKey))( - "sending cnt", ToString(mHttpSendingCnt.load()))); + "sending cnt", ToString(mHttpSendingCnt.load() + 1))); +#ifndef APSARA_UNIT_TEST_MAIN + HttpSink::GetInstance()->AddRequest(std::move(req)); #else - HttpSinkMock::GetInstance()->AddRequest(std::move(req)); // release item here - ++mHttpSendingCnt; + HttpSinkMock::GetInstance()->AddRequest(std::move(req)); #endif + ++mHttpSendingCnt; } void FlusherRunner::Run() { @@ -204,7 +200,9 @@ void FlusherRunner::Run() { PackIdManager::GetInstance()->CleanTimeoutEntry(); mLastCheckSendClientTime = time(NULL); } - + LOG_WARNING(sLogger, + ("flusher runner", "exit")("is_flush", mIsFlush)( + "all queue empty", SenderQueueManager::GetInstance()->IsAllQueueEmpty())); if (mIsFlush && SenderQueueManager::GetInstance()->IsAllQueueEmpty()) { break; } diff --git a/core/runner/FlusherRunner.h b/core/runner/FlusherRunner.h index e23856aed9..3390a021b6 100644 --- a/core/runner/FlusherRunner.h +++ b/core/runner/FlusherRunner.h @@ -24,6 +24,7 @@ #include "pipeline/plugin/interface/Flusher.h" #include "pipeline/queue/SenderQueueItem.h" #include "runner/sink/SinkType.h" +#include "runner/sink/http/HttpSink.h" namespace logtail { @@ -83,6 +84,7 @@ class FlusherRunner { friend class PluginRegistryUnittest; friend class FlusherRunnerUnittest; friend class InstanceConfigManagerUnittest; + friend class PipelineUpdateUnittest; #endif }; diff --git a/core/runner/ProcessorRunner.cpp b/core/runner/ProcessorRunner.cpp index 1c7ecdd65f..8b32e42b2d 100644 --- a/core/runner/ProcessorRunner.cpp +++ b/core/runner/ProcessorRunner.cpp @@ -24,7 +24,6 @@ #include "pipeline/PipelineManager.h" #include "queue/ProcessQueueManager.h" #include "queue/QueueKeyManager.h" -#include "unittest/pipeline/LogtailPluginMock.h" DEFINE_FLAG_INT32(default_flush_merged_buffer_interval, "default flush merged buffer, seconds", 1); DEFINE_FLAG_INT32(processor_runner_exit_timeout_secs, "", 60); @@ -179,19 +178,11 @@ void ProcessorRunner::Run(uint32_t threadNo) { pipeline->GetContext().GetRegion()); continue; } -#ifndef APSARA_UNIT_TEST_MAIN LogtailPlugin::GetInstance()->ProcessLogGroup( pipeline->GetContext().GetConfigName(), res, group.GetMetadata(EventGroupMetaKey::SOURCE_ID).to_string()); } -#else - LogtailPluginMock::GetInstance()->ProcessLogGroup( - pipeline->GetContext().GetConfigName(), - res, - group.GetMetadata(EventGroupMetaKey::SOURCE_ID).to_string()); - } -#endif } } else { pipeline->Send(std::move(eventGroupList)); diff --git a/core/runner/sink/Sink.h b/core/runner/sink/Sink.h index b8df22a28d..59992dc6c4 100644 --- a/core/runner/sink/Sink.h +++ b/core/runner/sink/Sink.h @@ -27,7 +27,7 @@ class Sink { public: virtual bool Init() = 0; virtual void Stop() = 0; - + bool AddRequest(std::unique_ptr&& request) { mQueue.Push(std::move(request)); return true; diff --git a/core/unittest/pipeline/HttpSinkMock.h b/core/unittest/pipeline/HttpSinkMock.h index cfc5bf92fd..0bbd0b967a 100644 --- a/core/unittest/pipeline/HttpSinkMock.h +++ b/core/unittest/pipeline/HttpSinkMock.h @@ -34,7 +34,12 @@ class HttpSinkMock : public HttpSink { return &instance; } + bool Init() override { return true; } + bool AddRequest(std::unique_ptr&& request) { + if (useRealHttpSink) { + return HttpSink::GetInstance()->AddRequest(std::move(request)); + } { std::lock_guard lock(mMutex); mRequests.push_back(request->mBody); @@ -58,10 +63,14 @@ class HttpSinkMock : public HttpSink { mRequests.clear(); } + void SetUseRealHttpSink(bool useReal) { useRealHttpSink = useReal; } + private: HttpSinkMock() = default; ~HttpSinkMock() = default; + bool useRealHttpSink = false; + std::atomic_bool mIsFlush = false; mutable std::mutex mMutex; std::vector mRequests; diff --git a/core/unittest/pipeline/LogtailPluginMock.h b/core/unittest/pipeline/LogtailPluginMock.h index 35c8130070..ebff377f8c 100644 --- a/core/unittest/pipeline/LogtailPluginMock.h +++ b/core/unittest/pipeline/LogtailPluginMock.h @@ -17,6 +17,9 @@ #pragma once #include "go_pipeline/LogtailPlugin.h" +#ifdef APSARA_UNIT_TEST_MAIN +#include "unittest/pipeline/LogtailPluginMock.h" +#endif namespace logtail { class LogtailPluginMock : public LogtailPlugin { @@ -53,6 +56,7 @@ class LogtailPluginMock : public LogtailPlugin { void ProcessLogGroup(const std::string& configName, const std::string& logGroup, const std::string& packId) { +#ifndef APSARA_UNIT_TEST_MAIN while (processBlockFlag) { LOG_DEBUG(sLogger, ("LogtailPluginMock process log group", "block")("config", configName)); std::this_thread::sleep_for(std::chrono::milliseconds(100)); @@ -69,6 +73,9 @@ class LogtailPluginMock : public LogtailPlugin { LOG_INFO(sLogger, ("LogtailPluginMock process log group", "success")("config", configName)("logGroup", logGroup)("packId", packId)); +#else + LogtailPluginMock::GetInstance()->ProcessLogGroup(configName, logGroup, packId); +#endif } bool IsStarted() const { return startFlag; } diff --git a/core/unittest/pipeline/PipelineUpdateUnittest.cpp b/core/unittest/pipeline/PipelineUpdateUnittest.cpp index 831d50ef83..68ca8b68ad 100644 --- a/core/unittest/pipeline/PipelineUpdateUnittest.cpp +++ b/core/unittest/pipeline/PipelineUpdateUnittest.cpp @@ -77,6 +77,7 @@ class PipelineUpdateUnittest : public testing::Test { builtinPipelineCnt = EnterpriseConfigProvider::GetInstance()->GetAllBuiltInPipelineConfigs().size(); #endif SenderQueueManager::GetInstance()->mDefaultQueueParam.mCapacity = 1; // test extra buffer + FLAGS_sls_client_send_compress = false; } static void TearDownTestCase() { PluginRegistry::GetInstance()->UnloadPlugins(); } @@ -168,7 +169,6 @@ class PipelineUpdateUnittest : public testing::Test { void VerifyData(size_t expectedDataCount) const { for (size_t retry = 0; retry < 8; ++retry) { - this_thread::sleep_for(chrono::milliseconds(1000)); auto requests = HttpSinkMock::GetInstance()->GetRequests(); size_t i = 1; size_t j = 0; @@ -185,6 +185,7 @@ class PipelineUpdateUnittest : public testing::Test { APSARA_TEST_EQUAL_FATAL(expectedDataCount + 1, i); return; } + this_thread::sleep_for(chrono::milliseconds(1000)); } } @@ -197,21 +198,21 @@ class PipelineUpdateUnittest : public testing::Test { })"; string nativeInputConfig = R"( { - "Type": "input_mock", + "Type": "input_file_mock", "FilePaths": [ "/tmp/not_found.log" ] })"; string nativeInputConfig2 = R"( { - "Type": "input_mock", + "Type": "input_file_mock", "FilePaths": [ "/tmp/*.log" ] })"; string nativeInputConfig3 = R"( { - "Type": "input_mock2", + "Type": "input_file_mock2", "FilePaths": [ "/tmp/not_found.log" ] @@ -356,7 +357,7 @@ void PipelineUpdateUnittest::TestPipelineParamUpdateCase1() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); processor->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -471,7 +472,7 @@ void PipelineUpdateUnittest::TestPipelineParamUpdateCase4() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); LogtailPluginMock::GetInstance()->UnblockProcess(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -523,7 +524,7 @@ void PipelineUpdateUnittest::TestPipelineTypeUpdateCase1() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); processor->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -638,7 +639,7 @@ void PipelineUpdateUnittest::TestPipelineTypeUpdateCase4() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); LogtailPluginMock::GetInstance()->UnblockProcess(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -690,7 +691,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase1() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); processor->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -742,7 +743,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase2() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); processor->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -794,7 +795,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase3() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); processor->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -1040,7 +1041,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase10() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); LogtailPluginMock::GetInstance()->UnblockProcess(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -1091,7 +1092,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase11() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); LogtailPluginMock::GetInstance()->UnblockProcess(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -1142,7 +1143,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase12() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); LogtailPluginMock::GetInstance()->UnblockProcess(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -1196,11 +1197,11 @@ void PipelineUpdateUnittest::TestPipelineInputBlock() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result1 = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); processor->Unblock(); }); auto result2 = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); }); - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); APSARA_TEST_NOT_EQUAL_FATAL(future_status::ready, result2.wait_for(chrono::milliseconds(0))); input->Unblock(); result1.get(); @@ -1243,7 +1244,7 @@ void PipelineUpdateUnittest::TestPipelineGoInputBlockCase1() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); }); - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); APSARA_TEST_NOT_EQUAL_FATAL(future_status::ready, result.wait_for(chrono::milliseconds(0))); LogtailPluginMock::GetInstance()->UnblockStop(); result.get(); @@ -1287,7 +1288,7 @@ void PipelineUpdateUnittest::TestPipelineGoInputBlockCase2() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); }); - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); APSARA_TEST_NOT_EQUAL_FATAL(future_status::ready, result.wait_for(chrono::milliseconds(0))); LogtailPluginMock::GetInstance()->UnblockStop(); result.get(); diff --git a/core/unittest/plugin/PluginMock.h b/core/unittest/plugin/PluginMock.h index d5443dad58..4439be217c 100644 --- a/core/unittest/plugin/PluginMock.h +++ b/core/unittest/plugin/PluginMock.h @@ -87,12 +87,19 @@ class InputMock : public Input { const std::string InputMock::sName = "input_mock"; -class InputMock2 : public InputMock { +class InputFileMock : public InputMock { public: static const std::string sName; }; -const std::string InputMock2::sName = "input_mock2"; +const std::string InputFileMock::sName = "input_file_mock"; + +class InputFileMock2 : public InputMock { +public: + static const std::string sName; +}; + +const std::string InputFileMock2::sName = "input_file_mock2"; class ProcessorMock : public Processor { public: @@ -198,7 +205,7 @@ class FlusherSLSMock : public FlusherSLS { std::map header; req = std::make_unique( "POST", false, "test-host", 80, "/test-operation", "", header, data->mData, item); - LOG_WARNING(sLogger, ("build mock request", data->mData)); + LOG_DEBUG(sLogger, ("build mock request", data->mData)); return true; } }; @@ -238,7 +245,8 @@ void LoadPluginMock() { PluginRegistry::GetInstance()->RegisterFlusherCreator(new StaticFlusherCreator()); PluginRegistry::GetInstance()->RegisterFlusherCreator(new StaticFlusherCreator()); PluginRegistry::GetInstance()->RegisterFlusherCreator(new StaticFlusherCreator()); - PluginRegistry::GetInstance()->RegisterInputCreator(new StaticInputCreator()); + PluginRegistry::GetInstance()->RegisterInputCreator(new StaticInputCreator()); + PluginRegistry::GetInstance()->RegisterInputCreator(new StaticInputCreator()); PluginRegistry::GetInstance()->RegisterProcessorCreator(new StaticProcessorCreator()); PluginRegistry::GetInstance()->RegisterFlusherCreator(new StaticFlusherCreator()); } diff --git a/core/unittest/sender/FlusherRunnerUnittest.cpp b/core/unittest/sender/FlusherRunnerUnittest.cpp index 9bb1b6e1fa..e0ba13f0d2 100644 --- a/core/unittest/sender/FlusherRunnerUnittest.cpp +++ b/core/unittest/sender/FlusherRunnerUnittest.cpp @@ -17,6 +17,7 @@ #include "runner/FlusherRunner.h" #include "runner/sink/http/HttpSink.h" #include "unittest/Unittest.h" +#include "unittest/pipeline/HttpSinkMock.h" #include "unittest/plugin/PluginMock.h" DECLARE_FLAG_INT32(discard_send_fail_interval); @@ -30,6 +31,8 @@ class FlusherRunnerUnittest : public ::testing::Test { void TestDispatch(); void TestPushToHttpSink(); + static void SetUpTestCase() { HttpSinkMock::GetInstance()->SetUseRealHttpSink(true); } + protected: void TearDown() override { SenderQueueManager::GetInstance()->Clear(); diff --git a/pluginmanager/plugin_manager_test.go b/pluginmanager/plugin_manager_test.go index c71ce35e0f..e6a45f23a0 100644 --- a/pluginmanager/plugin_manager_test.go +++ b/pluginmanager/plugin_manager_test.go @@ -117,7 +117,6 @@ func LoadAndStartMockConfig(args ...string) error { { "type": "service_mock", "detail": { - "Block": true, "LogsPerSecond": 100, "Fields": { "content": "Active connections: 1\nserver accepts handled requests\n 6079 6079 11596\n Reading: 0 Writing: 1 Waiting: 0"