Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

test: pipeline update unittest #1991

Merged
merged 16 commits into from
Dec 30, 2024
Merged
1 change: 1 addition & 0 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -533,6 +533,7 @@ class AppConfig {
friend class InputPrometheusUnittest;
friend class InputContainerStdioUnittest;
friend class BatcherUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
6 changes: 5 additions & 1 deletion core/common/http/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class HttpResponse {
HttpResponse()
: mHeader(compareHeader),
mBody(new std::string(), [](void* p) { delete static_cast<std::string*>(p); }),
mWriteCallback(DefaultWriteCallback) {};
mWriteCallback(DefaultWriteCallback){};
HttpResponse(void* body,
const std::function<void(void*)>& bodyDeleter,
size_t (*callback)(char*, size_t, size_t, void*))
Expand Down Expand Up @@ -155,6 +155,10 @@ class HttpResponse {
std::map<std::string, std::string, decltype(compareHeader)*> mHeader;
std::unique_ptr<void, std::function<void(void*)>> mBody;
size_t (*mWriteCallback)(char*, size_t, size_t, void*) = nullptr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class HttpSinkMock;
#endif
};

} // namespace logtail
11 changes: 10 additions & 1 deletion core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,17 @@ bool PipelineConfig::Parse() {
}
}
mInputs.push_back(&plugin);
#ifndef APSARA_UNIT_TEST_MAIN
// TODO: remove these special restrictions
if (pluginType == "input_file" || pluginType == "input_container_stdio") {
hasFileInput = true;
}
#else
// TODO: remove these special restrictions after all C++ inputs support Go processors
if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos) {
hasFileInput = true;
}
#endif
}
// TODO: remove these special restrictions
if (hasFileInput && (*mDetail)["inputs"].size() > 1) {
Expand Down Expand Up @@ -530,7 +537,9 @@ bool PipelineConfig::Parse() {
}
mRouter.emplace_back(i, itr);
} else {
mRouter.emplace_back(i, nullptr);
if (!IsFlushingThroughGoPipelineExisted()) {
mRouter.emplace_back(i, nullptr);
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {

void LogInput::UpdateCriticalMetric(int32_t curTime) {
mLastRunTime->Set(mLastReadEventTime.load());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(
GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
mRegisterdHandlersTotal->Set(EventDispatcher::GetInstance()->GetHandlerCount());
mActiveReadersTotal->Set(CheckPointManager::Instance()->GetReaderCount());
mEventProcessCount = 0;
Expand Down Expand Up @@ -529,6 +530,7 @@ Event* LogInput::PopEventQueue() {
#ifdef APSARA_UNIT_TEST_MAIN
void LogInput::CleanEnviroments() {
mIdleFlag = true;
mInteruptFlag = true;
usleep(100 * 1000);
while (true) {
Event* ev = PopEventQueue();
Expand Down
28 changes: 28 additions & 0 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include "pipeline/PipelineManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "provider/Provider.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/LogtailPluginMock.h"
#endif

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
Expand Down Expand Up @@ -86,6 +89,7 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
const std::string& logstore,
const std::string& region,
logtail::QueueKey logstoreKey) {
#ifndef APSARA_UNIT_TEST_MAIN
if (!mPluginValid) {
LoadPluginBase();
}
Expand All @@ -110,9 +114,14 @@ bool LogtailPlugin::LoadPipeline(const std::string& pipelineName,
}

return false;
#else
return LogtailPluginMock::GetInstance()->LoadPipeline(
pipelineName, pipeline, project, logstore, region, logstoreKey);
#endif
}

bool LogtailPlugin::UnloadPipeline(const std::string& pipelineName) {
#ifndef APSARA_UNIT_TEST_MAIN
if (!mPluginValid) {
LOG_ERROR(sLogger, ("UnloadPipeline", "plugin not valid"));
return false;
Expand All @@ -128,9 +137,13 @@ bool LogtailPlugin::UnloadPipeline(const std::string& pipelineName) {
}

return false;
#else
return LogtailPluginMock::GetInstance()->UnloadPipeline(pipelineName);
#endif
}

void LogtailPlugin::StopAllPipelines(bool withInputFlag) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStopAllPipelinesFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop all", "starts"));
auto stopAllStart = GetCurrentTimeInMilliSeconds();
Expand All @@ -142,9 +155,13 @@ void LogtailPlugin::StopAllPipelines(bool withInputFlag) {
"Stopping all Go pipelines took " + ToString(stopAllCost) + "ms");
}
}
#else
LogtailPluginMock::GetInstance()->StopAllPipelines(withInputFlag);
#endif
}

void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStopFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop", "starts")("config", configName));
auto stopStart = GetCurrentTimeInMilliSeconds();
Expand All @@ -159,6 +176,9 @@ void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
HOLD_ON_TOO_SLOW_ALARM, "Stopping Go pipeline " + configName + " took " + ToString(stopCost) + "ms");
}
}
#else
LogtailPluginMock::GetInstance()->Stop(configName, removedFlag);
#endif
}

void LogtailPlugin::StopBuiltInModules() {
Expand All @@ -170,6 +190,7 @@ void LogtailPlugin::StopBuiltInModules() {
}

void LogtailPlugin::Start(const std::string& configName) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStartFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines start", "starts")("config name", configName));
GoString goConfigName;
Expand All @@ -178,6 +199,9 @@ void LogtailPlugin::Start(const std::string& configName) {
mStartFun(goConfigName);
LOG_INFO(sLogger, ("Go pipelines start", "succeeded")("config name", configName));
}
#else
LogtailPluginMock::GetInstance()->Start(configName);
#endif
}

int LogtailPlugin::IsValidToSend(long long logstoreKey) {
Expand Down Expand Up @@ -503,6 +527,7 @@ void LogtailPlugin::ProcessLog(const std::string& configName,
void LogtailPlugin::ProcessLogGroup(const std::string& configName,
const std::string& logGroup,
const std::string& packId) {
#ifndef APSARA_UNIT_TEST_MAIN
if (logGroup.empty() || !(mPluginValid && mProcessLogsFun != NULL)) {
return;
}
Expand All @@ -521,6 +546,9 @@ void LogtailPlugin::ProcessLogGroup(const std::string& configName,
if (rst != (GoInt)0) {
LOG_WARNING(sLogger, ("process loggroup error", configName)("result", rst));
}
#else
LogtailPluginMock::GetInstance()->ProcessLogGroup(configName, logGroup, packId);
#endif
}

void LogtailPlugin::GetGoMetrics(std::vector<std::map<std::string, std::string>>& metircsList,
Expand Down
9 changes: 7 additions & 2 deletions core/monitor/profile_sender/ProfileSender.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,9 @@
#ifdef __ENTERPRISE__
#include "EnterpriseProfileSender.h"
#endif
#include "sdk/Exception.h"
#include "plugin/flusher/sls/SLSClientManager.h"
#include "app_config/AppConfig.h"
#include "plugin/flusher/sls/SLSClientManager.h"
#include "sdk/Exception.h"
// TODO: temporarily used
#include "common/compression/CompressorFactory.h"

Expand Down Expand Up @@ -119,12 +119,17 @@ FlusherSLS* ProfileSender::GetFlusher(const string& region) {
}

bool ProfileSender::IsProfileData(const string& region, const string& project, const string& logstore) {
// TODO: temporarily used, profile should work in unit test
#ifndef APSARA_UNIT_TEST_MAIN
if ((logstore == "shennong_log_profile" || logstore == "logtail_alarm" || logstore == "logtail_status_profile"
|| logstore == "logtail_suicide_profile")
&& (project == GetProfileProjectName(region) || region == ""))
return true;
else
return false;
#else
return false;
#endif
}

void ProfileSender::SendToProfileProject(const string& region, sls_logs::LogGroup& logGroup) {
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class Pipeline {
friend class InputProcessSecurityUnittest;
friend class InputNetworkSecurityUnittest;
friend class InputNetworkObserverUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class PipelineManager {
friend class CircularProcessQueueUnittest;
friend class CommonConfigProviderUnittest;
friend class FlusherUnittest;
friend class PipelineUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/instance/ProcessorInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ class ProcessorInstance : public PluginInstance {
friend class InputFileUnittest;
friend class InputPrometheusUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/interface/Flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Flusher : public Plugin {
friend class FlusherInstanceUnittest;
friend class FlusherRunnerUnittest;
friend class FlusherUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/BoundedProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class BoundedProcessQueue : public BoundedQueueInterface<std::unique_ptr<Process
friend class ProcessQueueManagerUnittest;
friend class ExactlyOnceQueueManagerUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
10 changes: 3 additions & 7 deletions core/pipeline/queue/ProcessQueueItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,9 @@ struct ProcessQueueItem {
ProcessQueueItem(PipelineEventGroup&& group, size_t index) : mEventGroup(std::move(group)), mInputIndex(index) {}

void AddPipelineInProcessCnt(const std::string& configName) {
if (mPipeline) {
mPipeline->AddInProcessCnt();
} else {
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
p->AddInProcessCnt();
}
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
p->AddInProcessCnt();
}
}
};
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/ProcessQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ProcessQueueManager : public FeedbackInterface {
void Clear();
friend class ProcessQueueManagerUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/SenderQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class SenderQueueManager : public FeedbackInterface {
#ifdef APSARA_UNIT_TEST_MAIN
friend class SenderQueueManagerUnittest;
friend class FlusherRunnerUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
4 changes: 2 additions & 2 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -810,8 +810,8 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)
// the possibility of hash key conflict is very low, so data is
// dropped here.
cpt->Commit();
failDetail << ", drop exactly once log group and commit checkpoint"
<< " checkpointKey:" << cpt->key << " checkpoint:" << cpt->data.DebugString();
failDetail << ", drop exactly once log group and commit checkpoint" << " checkpointKey:" << cpt->key
<< " checkpoint:" << cpt->data.DebugString();
suggestion << "no suggestion";
AlarmManager::GetInstance()->SendAlarm(
EXACTLY_ONCE_ALARM,
Expand Down
9 changes: 4 additions & 5 deletions core/runner/FlusherRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@
#include "pipeline/queue/SenderQueueItem.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "runner/sink/http/HttpSink.h"
// TODO: temporarily used here
#include "plugin/flusher/sls/PackIdManager.h"
#include "plugin/flusher/sls/SLSClientManager.h"
Expand Down Expand Up @@ -59,6 +58,7 @@ bool FlusherRunner::Init() {

mThreadRes = async(launch::async, &FlusherRunner::Run, this);
mLastCheckSendClientTime = time(nullptr);
mIsFlush = false;

return true;
}
Expand Down Expand Up @@ -139,12 +139,12 @@ void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) {
}

req->mEnqueTime = item->mLastSendTime = chrono::system_clock::now();
HttpSink::GetInstance()->AddRequest(std::move(req));
++mHttpSendingCnt;
LOG_DEBUG(sLogger,
("send item to http sink, item address", item)("config-flusher-dst",
QueueKeyManager::GetInstance()->GetName(item->mQueueKey))(
"sending cnt", ToString(mHttpSendingCnt.load())));
"sending cnt", ToString(mHttpSendingCnt.load() + 1)));
HttpSink::GetInstance()->AddRequest(std::move(req));
++mHttpSendingCnt;
}

void FlusherRunner::Run() {
Expand Down Expand Up @@ -195,7 +195,6 @@ void FlusherRunner::Run() {
PackIdManager::GetInstance()->CleanTimeoutEntry();
mLastCheckSendClientTime = time(NULL);
}

if (mIsFlush && SenderQueueManager::GetInstance()->IsAllQueueEmpty()) {
break;
}
Expand Down
2 changes: 2 additions & 0 deletions core/runner/FlusherRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "pipeline/plugin/interface/Flusher.h"
#include "pipeline/queue/SenderQueueItem.h"
#include "runner/sink/SinkType.h"
#include "runner/sink/http/HttpSink.h"

namespace logtail {

Expand Down Expand Up @@ -83,6 +84,7 @@ class FlusherRunner {
friend class PluginRegistryUnittest;
friend class FlusherRunnerUnittest;
friend class InstanceConfigManagerUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
4 changes: 2 additions & 2 deletions core/runner/ProcessorRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
#include "monitor/AlarmManager.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "pipeline/PipelineManager.h"
#include "queue/ExactlyOnceQueueManager.h"
#include "queue/ProcessQueueManager.h"
#include "queue/QueueKeyManager.h"

Expand All @@ -49,6 +48,7 @@ void ProcessorRunner::Init() {
for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) {
mThreadRes[threadNo] = async(launch::async, &ProcessorRunner::Run, this, threadNo);
}
mIsFlush = false;
}

void ProcessorRunner::Stop() {
Expand Down Expand Up @@ -142,7 +142,7 @@ void ProcessorRunner::Run(uint32_t threadNo) {
pipeline->Process(eventGroupList, item->mInputIndex);
// if the pipeline is updated, the pointer will be released, so we need to update it to the new pipeline
if (hasOldPipeline) {
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName);
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); // update to new pipeline
if (!pipeline) {
LOG_INFO(sLogger,
("pipeline not found during processing, perhaps due to config deletion",
Expand Down
2 changes: 1 addition & 1 deletion core/runner/sink/Sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Sink {
public:
virtual bool Init() = 0;
virtual void Stop() = 0;

bool AddRequest(std::unique_ptr<T>&& request) {
mQueue.Push(std::move(request));
return true;
Expand Down
Loading
Loading