Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Dec 27, 2024
1 parent 5f28414 commit 11b93da
Show file tree
Hide file tree
Showing 3 changed files with 24 additions and 32 deletions.
2 changes: 0 additions & 2 deletions core/unittest/pipeline/HttpSinkMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ class HttpSinkMock : public HttpSink {
void Run() {
LOG_INFO(sLogger, ("http sink mock", "started"));
while (true) {
LOG_DEBUG(sLogger, ("http sink mock", "running")("mIsFlush", mIsFlush.load())("mQueueSize", mQueue.Size()));
std::unique_ptr<HttpSinkRequest> request;
if (mQueue.WaitAndPop(request, 500)) {
{
Expand All @@ -75,7 +74,6 @@ class HttpSinkMock : public HttpSink {
static_cast<HttpFlusher*>(request->mItem->mFlusher)->OnSendDone(request->mResponse, request->mItem);
FlusherRunner::GetInstance()->DecreaseHttpSendingCnt();
request.reset();
LOG_DEBUG(sLogger, ("http sink mock", "pop one request"));
} else if (mIsFlush && mQueue.Empty()) {
break;
} else {
Expand Down
52 changes: 24 additions & 28 deletions core/unittest/pipeline/PipelineUpdateUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ class PipelineUpdateUnittest : public testing::Test {
ProcessQueueManager::GetInstance()->mBoundedQueueParam.mCapacity = 100;
FLAGS_sls_client_send_compress = false;
AppConfig::GetInstance()->mSendRequestConcurrency = 100;
AppConfig::GetInstance()->mSendRequestGlobalConcurrency = 200;
}

static void TearDownTestCase() { PluginRegistry::GetInstance()->UnloadPlugins(); }
Expand Down Expand Up @@ -247,11 +248,6 @@ class PipelineUpdateUnittest : public testing::Test {
auto requests = HttpSinkMock::GetInstance()->GetRequests();
i = from;
j = 0;
for (auto request : requests) {
LOG_DEBUG(
sLogger,
("request", request.mData)("logstore", static_cast<FlusherSLS*>(requests[j].mFlusher)->mLogstore));
}
while ((i < to + 1) && j < requests.size()) {
auto content = requests[j].mData;
auto actualLogstore = static_cast<FlusherSLS*>(requests[j].mFlusher)->mLogstore;
Expand All @@ -269,7 +265,7 @@ class PipelineUpdateUnittest : public testing::Test {
APSARA_TEST_EQUAL_FATAL(to + 1, i);
return;
}
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
}
}

Expand Down Expand Up @@ -441,7 +437,7 @@ void PipelineUpdateUnittest::TestPipelineParamUpdateCase1() const {
pipelineConfigObjUpdate.Parse();
diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
processor->Unblock();
});
pipelineManager->UpdatePipelines(diffUpdate);
Expand Down Expand Up @@ -568,7 +564,7 @@ void PipelineUpdateUnittest::TestPipelineParamUpdateCase4() const {
pipelineConfigObjUpdate.Parse();
diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
LogtailPluginMock::GetInstance()->UnblockProcess();
});
pipelineManager->UpdatePipelines(diffUpdate);
Expand Down Expand Up @@ -625,7 +621,7 @@ void PipelineUpdateUnittest::TestPipelineTypeUpdateCase1() const {
pipelineConfigObjUpdate.Parse();
diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
processor->Unblock();
});
pipelineManager->UpdatePipelines(diffUpdate);
Expand Down Expand Up @@ -752,7 +748,7 @@ void PipelineUpdateUnittest::TestPipelineTypeUpdateCase4() const {
pipelineConfigObjUpdate.Parse();
diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
LogtailPluginMock::GetInstance()->UnblockProcess();
});
pipelineManager->UpdatePipelines(diffUpdate);
Expand Down Expand Up @@ -809,7 +805,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase1() const {
pipelineConfigObjUpdate.Parse();
diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
processor->Unblock();
});
pipelineManager->UpdatePipelines(diffUpdate);
Expand Down Expand Up @@ -861,7 +857,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase2() const {
pipelineConfigObjUpdate.Parse();
diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
processor->Unblock();
});
pipelineManager->UpdatePipelines(diffUpdate);
Expand Down Expand Up @@ -920,7 +916,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase3() const {
pipelineConfigObjUpdate.Parse();
diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
processor->Unblock();
});
pipelineManager->UpdatePipelines(diffUpdate);
Expand Down Expand Up @@ -1211,7 +1207,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase10() const {
pipelineConfigObjUpdate.Parse();
diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
LogtailPluginMock::GetInstance()->UnblockProcess();
});
pipelineManager->UpdatePipelines(diffUpdate);
Expand Down Expand Up @@ -1269,7 +1265,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase11() const {
pipelineConfigObjUpdate.Parse();
diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
LogtailPluginMock::GetInstance()->UnblockProcess();
});
pipelineManager->UpdatePipelines(diffUpdate);
Expand Down Expand Up @@ -1320,7 +1316,7 @@ void PipelineUpdateUnittest::TestPipelineTopoUpdateCase12() const {
pipelineConfigObjUpdate.Parse();
diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
LogtailPluginMock::GetInstance()->UnblockProcess();
});
pipelineManager->UpdatePipelines(diffUpdate);
Expand Down Expand Up @@ -1385,11 +1381,11 @@ void PipelineUpdateUnittest::TestPipelineInputBlock() const {
pipelineManager->UpdatePipelines(diffUpdate);
BlockProcessor(configName);
});
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
APSARA_TEST_NOT_EQUAL_FATAL(future_status::ready, result1.wait_for(chrono::milliseconds(0)));
input->Unblock();
auto result2 = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
processor->Unblock();
});
result1.get();
Expand Down Expand Up @@ -1438,7 +1434,7 @@ void PipelineUpdateUnittest::TestPipelineGoInputBlockCase1() const {
pipelineConfigObjUpdate.Parse();
diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate));
auto result = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); });
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
APSARA_TEST_NOT_EQUAL_FATAL(future_status::ready, result.wait_for(chrono::milliseconds(0)));
LogtailPluginMock::GetInstance()->UnblockStop();
result.get();
Expand Down Expand Up @@ -1489,7 +1485,7 @@ void PipelineUpdateUnittest::TestPipelineGoInputBlockCase2() const {
pipelineConfigObjUpdate.Parse();
diffUpdate.mModified.push_back(std::move(pipelineConfigObjUpdate));
auto result = async(launch::async, [&]() { pipelineManager->UpdatePipelines(diffUpdate); });
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
APSARA_TEST_NOT_EQUAL_FATAL(future_status::ready, result.wait_for(chrono::milliseconds(0)));
LogtailPluginMock::GetInstance()->UnblockStop();
result.get();
Expand Down Expand Up @@ -1681,7 +1677,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase1() const {
pipelineConfigObjUpdate3.Parse();
diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
auto processor1
= static_cast<ProcessorMock*>(const_cast<Processor*>(pipeline1->mProcessorLine[0].get()->mPlugin.get()));
processor1->Unblock();
Expand Down Expand Up @@ -1759,7 +1755,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase2() const {
pipelineConfigObjUpdate3.Parse();
diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
auto processor1
= static_cast<ProcessorMock*>(const_cast<Processor*>(pipeline1->mProcessorLine[0].get()->mPlugin.get()));
processor1->Unblock();
Expand Down Expand Up @@ -1837,7 +1833,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase3() const {
pipelineConfigObjUpdate3.Parse();
diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
auto processor1
= static_cast<ProcessorMock*>(const_cast<Processor*>(pipeline1->mProcessorLine[0].get()->mPlugin.get()));
processor1->Unblock();
Expand Down Expand Up @@ -1912,7 +1908,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase4() const {
pipelineConfigObjUpdate3.Parse();
diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
auto processor1
= static_cast<ProcessorMock*>(const_cast<Processor*>(pipeline1->mProcessorLine[0].get()->mPlugin.get()));
processor1->Unblock();
Expand Down Expand Up @@ -1988,7 +1984,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase5() const {
pipelineConfigObjUpdate3.Parse();
diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
auto processor1
= static_cast<ProcessorMock*>(const_cast<Processor*>(pipeline1->mProcessorLine[0].get()->mPlugin.get()));
processor1->Unblock();
Expand Down Expand Up @@ -2060,7 +2056,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase6() const {
pipelineConfigObjUpdate3.Parse();
diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
auto processor1
= static_cast<ProcessorMock*>(const_cast<Processor*>(pipeline1->mProcessorLine[0].get()->mPlugin.get()));
processor1->Unblock();
Expand Down Expand Up @@ -2132,7 +2128,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase7() const {
pipelineConfigObjUpdate3.Parse();
diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
auto processor1
= static_cast<ProcessorMock*>(const_cast<Processor*>(pipeline1->mProcessorLine[0].get()->mPlugin.get()));
processor1->Unblock();
Expand Down Expand Up @@ -2201,7 +2197,7 @@ void PipelineUpdateUnittest::TestPipelineUpdateManyCase8() const {
pipelineConfigObjUpdate3.Parse();
diffUpdate3.mModified.push_back(std::move(pipelineConfigObjUpdate3));
auto result = async(launch::async, [&]() {
this_thread::sleep_for(chrono::milliseconds(2000));
this_thread::sleep_for(chrono::milliseconds(1000));
auto processor1
= static_cast<ProcessorMock*>(const_cast<Processor*>(pipeline1->mProcessorLine[0].get()->mPlugin.get()));
processor1->Unblock();
Expand Down
2 changes: 0 additions & 2 deletions core/unittest/plugin/PluginMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,8 +95,6 @@ class ProcessorMock : public Processor {
void Process(PipelineEventGroup& logGroup) override {
while (mBlockFlag) {
std::this_thread::sleep_for(std::chrono::milliseconds(100));
LOG_DEBUG(sLogger,
("processor mock", "block")("cnt", mCnt)("data", logGroup.GetEvents()[0]->ToJsonString()));
}
++mCnt;
};
Expand Down

0 comments on commit 11b93da

Please sign in to comment.