From 891c75c2dd37005680a8f6d6a31f76e664654b1b Mon Sep 17 00:00:00 2001 From: abingcbc Date: Mon, 30 Dec 2024 11:36:32 +0800 Subject: [PATCH] fix --- core/runner/sink/http/HttpSink.cpp | 13 ++++ core/runner/sink/http/HttpSink.h | 7 +- core/unittest/pipeline/HttpSinkMock.h | 10 +-- .../pipeline/PipelineUpdateUnittest.cpp | 72 +++++++++---------- 4 files changed, 52 insertions(+), 50 deletions(-) diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index 81eda613f4..1097bf699b 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -34,6 +34,15 @@ using namespace std; namespace logtail { +HttpSink* HttpSink::GetInstance() { +#ifndef APSARA_UNIT_TEST_MAIN + static HttpSink instance; + return &instance; +#else + return HttpSinkMock::GetInstance(); +#endif +} + bool HttpSink::Init() { #ifndef APSARA_UNIT_TEST_MAIN mClient = curl_multi_init(); @@ -68,6 +77,7 @@ bool HttpSink::Init() { } void HttpSink::Stop() { +#ifdef APSARA_UNIT_TEST_MAIN mIsFlush = true; if (!mThreadRes.valid()) { return; @@ -78,6 +88,9 @@ void HttpSink::Stop() { } else { LOG_WARNING(sLogger, ("http sink", "forced to stopped")); } +#else + HttpSinkMock::GetInstance()->Stop(); +#endif } void HttpSink::Run() { diff --git a/core/runner/sink/http/HttpSink.h b/core/runner/sink/http/HttpSink.h index e752f9c25a..4285692ce7 100644 --- a/core/runner/sink/http/HttpSink.h +++ b/core/runner/sink/http/HttpSink.h @@ -26,7 +26,6 @@ #include "monitor/MetricManager.h" #include "runner/sink/Sink.h" #include "runner/sink/http/HttpSinkRequest.h" - namespace logtail { class HttpSink : public Sink { @@ -34,14 +33,12 @@ class HttpSink : public Sink { HttpSink(const HttpSink&) = delete; HttpSink& operator=(const HttpSink&) = delete; - static HttpSink* GetInstance() { - static HttpSink instance; - return &instance; - } + static HttpSink* GetInstance(); bool Init() override; void Stop() override; + // rewrite for unittest bool AddRequest(std::unique_ptr&& request); private: diff --git a/core/unittest/pipeline/HttpSinkMock.h b/core/unittest/pipeline/HttpSinkMock.h index 777493b962..991bbdb720 100644 --- a/core/unittest/pipeline/HttpSinkMock.h +++ b/core/unittest/pipeline/HttpSinkMock.h @@ -52,6 +52,7 @@ class HttpSinkMock : public HttpSink { } else { LOG_WARNING(sLogger, ("http sink mock", "forced to stopped")); } + ClearRequests(); } void Run() { @@ -61,16 +62,10 @@ class HttpSinkMock : public HttpSink { if (mQueue.WaitAndPop(request, 500)) { { std::lock_guard lock(mMutex); - std::string logstore = "default"; - if (static_cast(request->mItem->mFlusher)->Name().find("sls") != std::string::npos) { - auto flusher = static_cast(request->mItem->mFlusher); - logstore = flusher->mLogstore; - } mRequests.push_back(*(request->mItem)); } request->mResponse.SetStatusCode(200); request->mResponse.mHeader[sdk::X_LOG_REQUEST_ID] = "request_id"; - static_cast(request->mItem)->mExactlyOnceCheckpoint = nullptr; static_cast(request->mItem->mFlusher)->OnSendDone(request->mResponse, request->mItem); FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); request.reset(); @@ -83,8 +78,7 @@ class HttpSinkMock : public HttpSink { } bool AddRequest(std::unique_ptr&& request) { - mQueue.Push(std::move(request)); - return true; + return Sink::AddRequest(std::move(request)); } std::vector& GetRequests() { diff --git a/core/unittest/pipeline/PipelineUpdateUnittest.cpp b/core/unittest/pipeline/PipelineUpdateUnittest.cpp index b9a363ea40..6e5a4be888 100644 --- a/core/unittest/pipeline/PipelineUpdateUnittest.cpp +++ b/core/unittest/pipeline/PipelineUpdateUnittest.cpp @@ -161,8 +161,7 @@ class PipelineUpdateUnittest : public testing::Test { } ProcessorRunner::GetInstance()->Stop(); FlusherRunner::GetInstance()->Stop(); - HttpSinkMock::GetInstance()->ClearRequests(); - HttpSinkMock::GetInstance()->Stop(); + HttpSink::GetInstance()->Stop(); } private: @@ -213,9 +212,8 @@ class PipelineUpdateUnittest : public testing::Test { void AddDataToSenderQueue(const string& configName, string&& data, Flusher* flusher) const { auto key = flusher->mQueueKey; - auto cpt = make_shared(); std::unique_ptr item = std::make_unique( - std::move(data), data.size(), flusher, key, "", RawDataType::EVENT_GROUP, "", std::move(cpt), false); + std::move(data), data.size(), flusher, key, "", RawDataType::EVENT_GROUP); { auto manager = SenderQueueManager::GetInstance(); manager->CreateQueue(key, "", PipelineContext{}); @@ -448,7 +446,7 @@ void PipelineUpdateUnittest::TestPipelineParamUpdateCase1() const { AddDataToProcessor(configName, "test-data-9"); AddDataToProcessor(configName, "test-data-10"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 4); VerifyData("test_logstore_2", 5, 10); @@ -520,7 +518,7 @@ void PipelineUpdateUnittest::TestPipelineParamUpdateCase3() const { AddDataToSenderQueue(configName, "test-data-5", flusher); AddDataToSenderQueue(configName, "test-data-6", flusher); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 3); VerifyData("test_logstore_2", 4, 6); @@ -576,7 +574,7 @@ void PipelineUpdateUnittest::TestPipelineParamUpdateCase4() const { AddDataToProcessor(configName, "test-data-9"); AddDataToProcessor(configName, "test-data-10"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 4); VerifyData("test_logstore_2", 5, 10); @@ -632,7 +630,7 @@ void PipelineUpdateUnittest::TestPipelineTypeUpdateCase1() const { AddDataToProcessor(configName, "test-data-9"); AddDataToProcessor(configName, "test-data-10"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 4); VerifyData("test_logstore_3", 5, 10); @@ -704,7 +702,7 @@ void PipelineUpdateUnittest::TestPipelineTypeUpdateCase3() const { AddDataToSenderQueue(configName, "test-data-5", flusher); AddDataToSenderQueue(configName, "test-data-6", flusher); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 3); VerifyData("test_logstore_3", 4, 6); @@ -760,7 +758,7 @@ void PipelineUpdateUnittest::TestPipelineTypeUpdateCase4() const { AddDataToProcessor(configName, "test-data-9"); AddDataToProcessor(configName, "test-data-10"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 4); VerifyData("test_logstore_3", 5, 10); @@ -813,7 +811,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase1() const { result.get(); APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 4); } @@ -871,7 +869,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase2() const { AddDataToSenderQueue(configName, "test-data-9", flusher); AddDataToSenderQueue(configName, "test-data-10", flusher); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 4); VerifyData("test_logstore_3", 8, 10); @@ -928,7 +926,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase3() const { AddDataToProcessor(configName, "test-data-9"); AddDataToProcessor(configName, "test-data-10"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 4); VerifyData("test_logstore_3", 5, 10); @@ -966,7 +964,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase4() const { AddDataToProcessor(configName, "test-data-3"); UnBlockProcessor(configName); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_3", 1, 3); } @@ -1003,7 +1001,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase5() const { AddDataToSenderQueue(configName, "test-data-2", flusher); AddDataToSenderQueue(configName, "test-data-3", flusher); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_3", 1, 3); } @@ -1038,7 +1036,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase6() const { AddDataToProcessor(configName, "test-data-2"); AddDataToProcessor(configName, "test-data-3"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_3", 1, 3); } @@ -1082,7 +1080,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase7() const { AddDataToProcessor(configName, "test-data-6"); UnBlockProcessor(configName); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 3); VerifyData("test_logstore_3", 4, 6); @@ -1121,7 +1119,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase8() const { APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 3); } @@ -1163,7 +1161,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase9() const { AddDataToProcessor(configName, "test-data-5"); AddDataToProcessor(configName, "test-data-6"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 3); VerifyData("test_logstore_3", 4, 6); @@ -1221,7 +1219,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase10() const { AddDataToProcessor(configName, "test-data-10"); UnBlockProcessor(configName); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 4); VerifyData("test_logstore_3", 5, 10); @@ -1273,7 +1271,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase11() const { APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); APSARA_TEST_EQUAL_FATAL(true, LogtailPluginMock::GetInstance()->IsStarted()); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 4); } @@ -1331,7 +1329,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase12() const { AddDataToSenderQueue(configName, "test-data-9", flusher); AddDataToSenderQueue(configName, "test-data-10", flusher); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 4); VerifyData("test_logstore_3", 8, 10); @@ -1397,7 +1395,7 @@ void PipelineUpdateUnittest::TestPipelineInputBlock() const { AddDataToProcessor(configName, "test-data-10"); UnBlockProcessor(configName); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 4); VerifyData("test_logstore_2", 5, 10); @@ -1448,7 +1446,7 @@ void PipelineUpdateUnittest::TestPipelineGoInputBlockCase1() const { AddDataToSenderQueue(configName, "test-data-5", flusher); AddDataToSenderQueue(configName, "test-data-6", flusher); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 3); VerifyData("test_logstore_3", 4, 6); @@ -1493,7 +1491,7 @@ void PipelineUpdateUnittest::TestPipelineGoInputBlockCase2() const { APSARA_TEST_EQUAL_FATAL(1U + builtinPipelineCnt, pipelineManager->GetAllPipelines().size()); APSARA_TEST_EQUAL_FATAL(false, LogtailPluginMock::GetInstance()->IsStarted()); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); VerifyData("test_logstore_1", 1, 3); } @@ -1533,7 +1531,7 @@ void PipelineUpdateUnittest::TestPipelineIsolationCase1() const { auto input = static_cast(const_cast(pipeline->GetInputs()[0].get()->GetPlugin())); input->Block(); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto result = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); }); { // add data to Go -> Go -> C++ @@ -1594,7 +1592,7 @@ void PipelineUpdateUnittest::TestPipelineIsolationCase2() const { auto input = static_cast(const_cast(pipeline->GetInputs()[0].get()->GetPlugin())); input->Block(); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto result = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); }); { // add data to C++ -> C++ -> C++ @@ -1690,7 +1688,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase1() const { AddDataToProcessQueue(configName, "test-data-12"); AddDataToProcessQueue(configName, "test-data-13"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); @@ -1768,7 +1766,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase2() const { AddDataToProcessQueue(configName, "test-data-9"); AddDataToProcessQueue(configName, "test-data-10"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); @@ -1846,7 +1844,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase3() const { AddDataToProcessQueue(configName, "test-data-9"); AddDataToProcessQueue(configName, "test-data-10"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); @@ -1921,7 +1919,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase4() const { AddDataToProcessQueue(configName, "test-data-6"); AddDataToProcessQueue(configName, "test-data-7"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); @@ -1997,7 +1995,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase5() const { AddDataToProcessQueue(configName, "test-data-9"); AddDataToProcessQueue(configName, "test-data-10"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); @@ -2069,7 +2067,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase6() const { AddDataToProcessQueue(configName, "test-data-6"); AddDataToProcessQueue(configName, "test-data-7"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); @@ -2141,7 +2139,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase7() const { AddDataToProcessQueue(configName, "test-data-6"); AddDataToProcessQueue(configName, "test-data-7"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); @@ -2210,7 +2208,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase8() const { AddDataToProcessQueue(configName, "test-data-3"); AddDataToProcessQueue(configName, "test-data-4"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); auto processor2 = static_cast(const_cast(pipeline2->mProcessorLine[0].get()->mPlugin.get())); @@ -2279,7 +2277,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase9() const { AddDataToProcessQueue(configName, "test-data-8"); AddDataToProcessQueue(configName, "test-data-9"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); UnBlockProcessor(configName); VerifyData("test_logstore_1", 1, 3); @@ -2340,7 +2338,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase10() const { AddDataToProcessQueue(configName, "test-data-5"); AddDataToProcessQueue(configName, "test-data-6"); - HttpSinkMock::GetInstance()->Init(); + HttpSink::GetInstance()->Init(); FlusherRunner::GetInstance()->Init(); UnBlockProcessor(configName); VerifyData("test_logstore_1", 1, 3);