Skip to content

Commit

Permalink
test: pipeline update unittest (#1991)
Browse files Browse the repository at this point in the history
* test: pipeline update unittest

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix
  • Loading branch information
Abingcbc authored Dec 30, 2024
1 parent e8b9318 commit 9800d9e
Show file tree
Hide file tree
Showing 35 changed files with 2,785 additions and 99 deletions.
1 change: 1 addition & 0 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ class AppConfig {
friend class InputPrometheusUnittest;
friend class InputContainerStdioUnittest;
friend class BatcherUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
6 changes: 5 additions & 1 deletion core/common/http/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class HttpResponse {
HttpResponse()
: mHeader(compareHeader),
mBody(new std::string(), [](void* p) { delete static_cast<std::string*>(p); }),
mWriteCallback(DefaultWriteCallback) {};
mWriteCallback(DefaultWriteCallback){};
HttpResponse(void* body,
const std::function<void(void*)>& bodyDeleter,
size_t (*callback)(char*, size_t, size_t, void*))
Expand Down Expand Up @@ -155,6 +155,10 @@ class HttpResponse {
std::map<std::string, std::string, decltype(compareHeader)*> mHeader;
std::unique_ptr<void, std::function<void(void*)>> mBody;
size_t (*mWriteCallback)(char*, size_t, size_t, void*) = nullptr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class HttpSinkMock;
#endif
};

} // namespace logtail
11 changes: 10 additions & 1 deletion core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,17 @@ bool PipelineConfig::Parse() {
}
}
mInputs.push_back(&plugin);
#ifndef APSARA_UNIT_TEST_MAIN
// TODO: remove these special restrictions
if (pluginType == "input_file" || pluginType == "input_container_stdio") {
hasFileInput = true;
}
#else
// TODO: remove these special restrictions after all C++ inputs support Go processors
if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos) {
hasFileInput = true;
}
#endif
}
// TODO: remove these special restrictions
if (hasFileInput && (*mDetail)["inputs"].size() > 1) {
Expand Down Expand Up @@ -530,7 +537,9 @@ bool PipelineConfig::Parse() {
}
mRouter.emplace_back(i, itr);
} else {
mRouter.emplace_back(i, nullptr);
if (!IsFlushingThroughGoPipelineExisted()) {
mRouter.emplace_back(i, nullptr);
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {

void LogInput::UpdateCriticalMetric(int32_t curTime) {
mLastRunTime->Set(mLastReadEventTime.load());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(
GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
mRegisterdHandlersTotal->Set(EventDispatcher::GetInstance()->GetHandlerCount());
mActiveReadersTotal->Set(CheckPointManager::Instance()->GetReaderCount());
mEventProcessCount = 0;
Expand Down Expand Up @@ -529,6 +530,7 @@ Event* LogInput::PopEventQueue() {
#ifdef APSARA_UNIT_TEST_MAIN
void LogInput::CleanEnviroments() {
mIdleFlag = true;
mInteruptFlag = true;
usleep(100 * 1000);
while (true) {
Event* ev = PopEventQueue();
Expand Down
28 changes: 28 additions & 0 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include "pipeline/PipelineManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "provider/Provider.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/LogtailPluginMock.h"
#endif

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
Expand Down Expand Up @@ -86,6 +89,7 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
const std::string& logstore,
const std::string& region,
logtail::QueueKey logstoreKey) {
#ifndef APSARA_UNIT_TEST_MAIN
if (!mPluginValid) {
LoadPluginBase();
}
Expand All @@ -110,9 +114,14 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
}

return false;
#else
return LogtailPluginMock::GetInstance()->LoadPipeline(
pipelineName, pipeline, project, logstore, region, logstoreKey);
#endif
}

bool LogtailPlugin::UnloadPipeline(const std::string& pipelineName) {
#ifndef APSARA_UNIT_TEST_MAIN
if (!mPluginValid) {
LOG_ERROR(sLogger, ("UnloadPipeline", "plugin not valid"));
return false;
Expand All @@ -128,9 +137,13 @@ bool LogtailPlugin::UnloadPipeline(const std::string& pipelineName) {
}

return false;
#else
return LogtailPluginMock::GetInstance()->UnloadPipeline(pipelineName);
#endif
}

void LogtailPlugin::StopAllPipelines(bool withInputFlag) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStopAllPipelinesFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop all", "starts"));
auto stopAllStart = GetCurrentTimeInMilliSeconds();
Expand All @@ -142,9 +155,13 @@ void LogtailPlugin::StopAllPipelines(bool withInputFlag) {
"Stopping all Go pipelines took " + ToString(stopAllCost) + "ms");
}
}
#else
LogtailPluginMock::GetInstance()->StopAllPipelines(withInputFlag);
#endif
}

void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStopFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop", "starts")("config", configName));
auto stopStart = GetCurrentTimeInMilliSeconds();
Expand All @@ -159,6 +176,9 @@ void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
HOLD_ON_TOO_SLOW_ALARM, "Stopping Go pipeline " + configName + " took " + ToString(stopCost) + "ms");
}
}
#else
LogtailPluginMock::GetInstance()->Stop(configName, removedFlag);
#endif
}

void LogtailPlugin::StopBuiltInModules() {
Expand All @@ -170,6 +190,7 @@ void LogtailPlugin::StopBuiltInModules() {
}

void LogtailPlugin::Start(const std::string& configName) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStartFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines start", "starts")("config name", configName));
GoString goConfigName;
Expand All @@ -178,6 +199,9 @@ void LogtailPlugin::Start(const std::string& configName) {
mStartFun(goConfigName);
LOG_INFO(sLogger, ("Go pipelines start", "succeeded")("config name", configName));
}
#else
LogtailPluginMock::GetInstance()->Start(configName);
#endif
}

int LogtailPlugin::IsValidToSend(long long logstoreKey) {
Expand Down Expand Up @@ -503,6 +527,7 @@ void LogtailPlugin::ProcessLog(const std::string& configName,
void LogtailPlugin::ProcessLogGroup(const std::string& configName,
const std::string& logGroup,
const std::string& packId) {
#ifndef APSARA_UNIT_TEST_MAIN
if (logGroup.empty() || !(mPluginValid && mProcessLogsFun != NULL)) {
return;
}
Expand All @@ -521,6 +546,9 @@ void LogtailPlugin::ProcessLogGroup(const std::string& configName,
if (rst != (GoInt)0) {
LOG_WARNING(sLogger, ("process loggroup error", configName)("result", rst));
}
#else
LogtailPluginMock::GetInstance()->ProcessLogGroup(configName, logGroup, packId);
#endif
}

void LogtailPlugin::GetGoMetrics(std::vector<std::map<std::string, std::string>>& metircsList,
Expand Down
9 changes: 7 additions & 2 deletions core/monitor/profile_sender/ProfileSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
#ifdef __ENTERPRISE__
#include "EnterpriseProfileSender.h"
#endif
#include "sdk/Exception.h"
#include "plugin/flusher/sls/SLSClientManager.h"
#include "app_config/AppConfig.h"
#include "plugin/flusher/sls/SLSClientManager.h"
#include "sdk/Exception.h"
// TODO: temporarily used
#include "common/compression/CompressorFactory.h"

Expand Down Expand Up @@ -119,12 +119,17 @@ FlusherSLS* ProfileSender::GetFlusher(const string& region) {
}

bool ProfileSender::IsProfileData(const string& region, const string& project, const string& logstore) {
// TODO: temporarily used, profile should work in unit test
#ifndef APSARA_UNIT_TEST_MAIN
if ((logstore == "shennong_log_profile" || logstore == "logtail_alarm" || logstore == "logtail_status_profile"
|| logstore == "logtail_suicide_profile")
&& (project == GetProfileProjectName(region) || region == ""))
return true;
else
return false;
#else
return false;
#endif
}

void ProfileSender::SendToProfileProject(const string& region, sls_logs::LogGroup& logGroup) {
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class Pipeline {
friend class InputProcessSecurityUnittest;
friend class InputNetworkSecurityUnittest;
friend class InputNetworkObserverUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class PipelineManager {
friend class CircularProcessQueueUnittest;
friend class CommonConfigProviderUnittest;
friend class FlusherUnittest;
friend class PipelineUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/instance/ProcessorInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ProcessorInstance : public PluginInstance {
friend class InputFileUnittest;
friend class InputPrometheusUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/interface/Flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Flusher : public Plugin {
friend class FlusherInstanceUnittest;
friend class FlusherRunnerUnittest;
friend class FlusherUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/BoundedProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class BoundedProcessQueue : public BoundedQueueInterface<std::unique_ptr<Process
friend class ProcessQueueManagerUnittest;
friend class ExactlyOnceQueueManagerUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
10 changes: 3 additions & 7 deletions core/pipeline/queue/ProcessQueueItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,9 @@ struct ProcessQueueItem {
ProcessQueueItem(PipelineEventGroup&& group, size_t index) : mEventGroup(std::move(group)), mInputIndex(index) {}

void AddPipelineInProcessCnt(const std::string& configName) {
if (mPipeline) {
mPipeline->AddInProcessCnt();
} else {
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
p->AddInProcessCnt();
}
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
p->AddInProcessCnt();
}
}
};
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/ProcessQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ProcessQueueManager : public FeedbackInterface {
void Clear();
friend class ProcessQueueManagerUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/SenderQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class SenderQueueManager : public FeedbackInterface {
#ifdef APSARA_UNIT_TEST_MAIN
friend class SenderQueueManagerUnittest;
friend class FlusherRunnerUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
4 changes: 2 additions & 2 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,8 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)
// the possibility of hash key conflict is very low, so data is
// dropped here.
cpt->Commit();
failDetail << ", drop exactly once log group and commit checkpoint"
<< " checkpointKey:" << cpt->key << " checkpoint:" << cpt->data.DebugString();
failDetail << ", drop exactly once log group and commit checkpoint" << " checkpointKey:" << cpt->key
<< " checkpoint:" << cpt->data.DebugString();
suggestion << "no suggestion";
AlarmManager::GetInstance()->SendAlarm(
EXACTLY_ONCE_ALARM,
Expand Down
9 changes: 4 additions & 5 deletions core/runner/FlusherRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "pipeline/queue/SenderQueueItem.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "runner/sink/http/HttpSink.h"
// TODO: temporarily used here
#include "plugin/flusher/sls/PackIdManager.h"
#include "plugin/flusher/sls/SLSClientManager.h"
Expand Down Expand Up @@ -59,6 +58,7 @@ bool FlusherRunner::Init() {

mThreadRes = async(launch::async, &FlusherRunner::Run, this);
mLastCheckSendClientTime = time(nullptr);
mIsFlush = false;

return true;
}
Expand Down Expand Up @@ -139,12 +139,12 @@ void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) {
}

req->mEnqueTime = item->mLastSendTime = chrono::system_clock::now();
HttpSink::GetInstance()->AddRequest(std::move(req));
++mHttpSendingCnt;
LOG_DEBUG(sLogger,
("send item to http sink, item address", item)("config-flusher-dst",
QueueKeyManager::GetInstance()->GetName(item->mQueueKey))(
"sending cnt", ToString(mHttpSendingCnt.load())));
"sending cnt", ToString(mHttpSendingCnt.load() + 1)));
HttpSink::GetInstance()->AddRequest(std::move(req));
++mHttpSendingCnt;
}

void FlusherRunner::Run() {
Expand Down Expand Up @@ -195,7 +195,6 @@ void FlusherRunner::Run() {
PackIdManager::GetInstance()->CleanTimeoutEntry();
mLastCheckSendClientTime = time(NULL);
}

if (mIsFlush && SenderQueueManager::GetInstance()->IsAllQueueEmpty()) {
break;
}
Expand Down
2 changes: 2 additions & 0 deletions core/runner/FlusherRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "pipeline/plugin/interface/Flusher.h"
#include "pipeline/queue/SenderQueueItem.h"
#include "runner/sink/SinkType.h"
#include "runner/sink/http/HttpSink.h"

namespace logtail {

Expand Down Expand Up @@ -83,6 +84,7 @@ class FlusherRunner {
friend class PluginRegistryUnittest;
friend class FlusherRunnerUnittest;
friend class InstanceConfigManagerUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
4 changes: 2 additions & 2 deletions core/runner/ProcessorRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "monitor/AlarmManager.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "pipeline/PipelineManager.h"
#include "queue/ExactlyOnceQueueManager.h"
#include "queue/ProcessQueueManager.h"
#include "queue/QueueKeyManager.h"

Expand All @@ -49,6 +48,7 @@ void ProcessorRunner::Init() {
for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) {
mThreadRes[threadNo] = async(launch::async, &ProcessorRunner::Run, this, threadNo);
}
mIsFlush = false;
}

void ProcessorRunner::Stop() {
Expand Down Expand Up @@ -142,7 +142,7 @@ void ProcessorRunner::Run(uint32_t threadNo) {
pipeline->Process(eventGroupList, item->mInputIndex);
// if the pipeline is updated, the pointer will be released, so we need to update it to the new pipeline
if (hasOldPipeline) {
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName);
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); // update to new pipeline
if (!pipeline) {
LOG_INFO(sLogger,
("pipeline not found during processing, perhaps due to config deletion",
Expand Down
2 changes: 1 addition & 1 deletion core/runner/sink/Sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Sink {
public:
virtual bool Init() = 0;
virtual void Stop() = 0;

bool AddRequest(std::unique_ptr<T>&& request) {
mQueue.Push(std::move(request));
return true;
Expand Down
Loading

0 comments on commit 9800d9e

Please sign in to comment.