diff --git a/core/unittest/pipeline/HttpSinkMock.h b/core/unittest/pipeline/HttpSinkMock.h index 99d97ba714..777493b962 100644 --- a/core/unittest/pipeline/HttpSinkMock.h +++ b/core/unittest/pipeline/HttpSinkMock.h @@ -57,7 +57,6 @@ class HttpSinkMock : public HttpSink { void Run() { LOG_INFO(sLogger, ("http sink mock", "started")); while (true) { - LOG_DEBUG(sLogger, ("http sink mock", "running")("mIsFlush", mIsFlush.load())("mQueueSize", mQueue.Size())); std::unique_ptr request; if (mQueue.WaitAndPop(request, 500)) { { @@ -75,7 +74,6 @@ class HttpSinkMock : public HttpSink { static_cast(request->mItem->mFlusher)->OnSendDone(request->mResponse, request->mItem); FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); request.reset(); - LOG_DEBUG(sLogger, ("http sink mock", "pop one request")); } else if (mIsFlush && mQueue.Empty()) { break; } else { diff --git a/core/unittest/pipeline/PipelineUpdateUnittest.cpp b/core/unittest/pipeline/PipelineUpdateUnittest.cpp index 65e7985003..b9a363ea40 100644 --- a/core/unittest/pipeline/PipelineUpdateUnittest.cpp +++ b/core/unittest/pipeline/PipelineUpdateUnittest.cpp @@ -138,6 +138,7 @@ class PipelineUpdateUnittest : public testing::Test { ProcessQueueManager::GetInstance()->mBoundedQueueParam.mCapacity = 100; FLAGS_sls_client_send_compress = false; AppConfig::GetInstance()->mSendRequestConcurrency = 100; + AppConfig::GetInstance()->mSendRequestGlobalConcurrency = 200; } static void TearDownTestCase() { PluginRegistry::GetInstance()->UnloadPlugins(); } @@ -247,11 +248,6 @@ class PipelineUpdateUnittest : public testing::Test { auto requests = HttpSinkMock::GetInstance()->GetRequests(); i = from; j = 0; - for (auto request : requests) { - LOG_DEBUG( - sLogger, - ("request", request.mData)("logstore", static_cast(requests[j].mFlusher)->mLogstore)); - } while ((i < to + 1) && j < requests.size()) { auto content = requests[j].mData; auto actualLogstore = static_cast(requests[j].mFlusher)->mLogstore; @@ -269,7 +265,7 @@ class PipelineUpdateUnittest : public testing::Test { APSARA_TEST_EQUAL_FATAL(to + 1, i); return; } - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); } } @@ -441,7 +437,7 @@ void PipelineUpdateUnittest::TestPipelineParamUpdateCase1() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); processor->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -568,7 +564,7 @@ void PipelineUpdateUnittest::TestPipelineParamUpdateCase4() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); LogtailPluginMock::GetInstance()->UnblockProcess(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -625,7 +621,7 @@ void PipelineUpdateUnittest::TestPipelineTypeUpdateCase1() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); processor->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -752,7 +748,7 @@ void PipelineUpdateUnittest::TestPipelineTypeUpdateCase4() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); LogtailPluginMock::GetInstance()->UnblockProcess(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -809,7 +805,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase1() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); processor->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -861,7 +857,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase2() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); processor->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -920,7 +916,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase3() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); processor->Unblock(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -1211,7 +1207,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase10() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); LogtailPluginMock::GetInstance()->UnblockProcess(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -1269,7 +1265,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase11() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); LogtailPluginMock::GetInstance()->UnblockProcess(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -1320,7 +1316,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase12() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); LogtailPluginMock::GetInstance()->UnblockProcess(); }); pipelineManager->UpdatePipelines(diffUpdate); @@ -1385,11 +1381,11 @@ void PipelineUpdateUnittest::TestPipelineInputBlock() const { pipelineManager->UpdatePipelines(diffUpdate); BlockProcessor(configName); }); - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); APSARA_TEST_NOT_EQUAL_FATAL(future_status::ready, result1.wait_for(chrono::milliseconds(0))); input->Unblock(); auto result2 = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); processor->Unblock(); }); result1.get(); @@ -1438,7 +1434,7 @@ void PipelineUpdateUnittest::TestPipelineGoInputBlockCase1() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); }); - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); APSARA_TEST_NOT_EQUAL_FATAL(future_status::ready, result.wait_for(chrono::milliseconds(0))); LogtailPluginMock::GetInstance()->UnblockStop(); result.get(); @@ -1489,7 +1485,7 @@ void PipelineUpdateUnittest::TestPipelineGoInputBlockCase2() const { pipelineConfigObjUpdate.Parse(); diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate)); auto result = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); }); - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); APSARA_TEST_NOT_EQUAL_FATAL(future_status::ready, result.wait_for(chrono::milliseconds(0))); LogtailPluginMock::GetInstance()->UnblockStop(); result.get(); @@ -1681,7 +1677,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase1() const { pipelineConfigObjUpdate3.Parse(); diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); @@ -1759,7 +1755,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase2() const { pipelineConfigObjUpdate3.Parse(); diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); @@ -1837,7 +1833,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase3() const { pipelineConfigObjUpdate3.Parse(); diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); @@ -1912,7 +1908,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase4() const { pipelineConfigObjUpdate3.Parse(); diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); @@ -1988,7 +1984,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase5() const { pipelineConfigObjUpdate3.Parse(); diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); @@ -2060,7 +2056,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase6() const { pipelineConfigObjUpdate3.Parse(); diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); @@ -2132,7 +2128,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase7() const { pipelineConfigObjUpdate3.Parse(); diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); @@ -2201,7 +2197,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase8() const { pipelineConfigObjUpdate3.Parse(); diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3)); auto result = async(launch::async, [&]() { - this_thread::sleep_for(chrono::milliseconds(2000)); + this_thread::sleep_for(chrono::milliseconds(1000)); auto processor1 = static_cast(const_cast(pipeline1->mProcessorLine[0].get()->mPlugin.get())); processor1->Unblock(); diff --git a/core/unittest/plugin/PluginMock.h b/core/unittest/plugin/PluginMock.h index 0661a30d80..154d1930be 100644 --- a/core/unittest/plugin/PluginMock.h +++ b/core/unittest/plugin/PluginMock.h @@ -95,8 +95,6 @@ class ProcessorMock : public Processor { void Process(PipelineEventGroup& logGroup) override { while (mBlockFlag) { std::this_thread::sleep_for(std::chrono::milliseconds(100)); - LOG_DEBUG(sLogger, - ("processor mock", "block")("cnt", mCnt)("data", logGroup.GetEvents()[0]->ToJsonString())); } ++mCnt; };