Skip to content

Commit

Permalink
test: pipeline update unittest
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Dec 24, 2024
1 parent cacbf20 commit ddbd911
Show file tree
Hide file tree
Showing 26 changed files with 1,829 additions and 71 deletions.
6 changes: 5 additions & 1 deletion core/common/http/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class HttpResponse {
HttpResponse()
: mHeader(compareHeader),
mBody(new std::string(), [](void* p) { delete static_cast<std::string*>(p); }),
mWriteCallback(DefaultWriteCallback) {};
mWriteCallback(DefaultWriteCallback){};
HttpResponse(void* body,
const std::function<void(void*)>& bodyDeleter,
size_t (*callback)(char*, size_t, size_t, void*))
Expand Down Expand Up @@ -155,6 +155,10 @@ class HttpResponse {
std::map<std::string, std::string, decltype(compareHeader)*> mHeader;
std::unique_ptr<void, std::function<void(void*)>> mBody;
size_t (*mWriteCallback)(char*, size_t, size_t, void*) = nullptr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class HttpSinkMock;
#endif
};

} // namespace logtail
9 changes: 8 additions & 1 deletion core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -250,6 +250,11 @@ bool PipelineConfig::Parse() {
if (pluginType == "input_file" || pluginType == "input_container_stdio") {
hasFileInput = true;
}
#ifdef APSARA_UNIT_TEST_MAIN
if (pluginType.find("mock") != string::npos) {
hasFileInput = true;
}
#endif
}
// TODO: remove these special restrictions
if (hasFileInput && (*mDetail)["inputs"].size() > 1) {
Expand Down Expand Up @@ -530,7 +535,9 @@ bool PipelineConfig::Parse() {
}
mRouter.emplace_back(i, itr);
} else {
mRouter.emplace_back(i, nullptr);
if (!IsFlushingThroughGoPipelineExisted()) {
mRouter.emplace_back(i, nullptr);
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {

void LogInput::UpdateCriticalMetric(int32_t curTime) {
mLastRunTime->Set(mLastReadEventTime.load());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(
GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
mRegisterdHandlersTotal->Set(EventDispatcher::GetInstance()->GetHandlerCount());
mActiveReadersTotal->Set(CheckPointManager::Instance()->GetReaderCount());
mEventProcessCount = 0;
Expand Down Expand Up @@ -529,6 +530,7 @@ Event* LogInput::PopEventQueue() {
#ifdef APSARA_UNIT_TEST_MAIN
void LogInput::CleanEnviroments() {
mIdleFlag = true;
mInteruptFlag = true;
usleep(100 * 1000);
while (true) {
Event* ev = PopEventQueue();
Expand Down
29 changes: 27 additions & 2 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,9 @@
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "plugin/processor/ProcessorParseApsaraNative.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/LogtailPluginMock.h"
#endif

DECLARE_FLAG_INT32(default_plugin_log_queue_size);

Expand Down Expand Up @@ -338,18 +341,29 @@ bool Pipeline::Init(PipelineConfig&& config) {
void Pipeline::Start() {
// #ifndef APSARA_UNIT_TEST_MAIN
// TODO: 应该保证指定时间内返回,如果无法返回,将配置放入startDisabled里
LOG_WARNING(sLogger, ("debug", "8"));
for (const auto& flusher : mFlushers) {
flusher->Start();
}

LOG_WARNING(sLogger, ("debug", "9"));
if (!mGoPipelineWithoutInput.isNull()) {
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->Start(GetConfigNameOfGoPipelineWithoutInput());
#else
LogtailPluginMock::GetInstance()->Start(GetConfigNameOfGoPipelineWithoutInput());
#endif
}

LOG_WARNING(sLogger, ("debug", "10"));
ProcessQueueManager::GetInstance()->EnablePop(mName);

if (!mGoPipelineWithInput.isNull()) {
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->Start(GetConfigNameOfGoPipelineWithInput());
#else
LogtailPluginMock::GetInstance()->Start(GetConfigNameOfGoPipelineWithInput());
#endif
}

for (const auto& input : mInputs) {
Expand Down Expand Up @@ -395,8 +409,9 @@ bool Pipeline::Send(vector<PipelineEventGroup>&& groupList) {
auto res = mRouter.Route(group);
for (auto& item : res) {
if (item.first >= mFlushers.size()) {
LOG_ERROR(sLogger,
("unexpected error", "invalid flusher index")("flusher index", item.first)("config", mName));
LOG_WARNING(sLogger,
("pipeline send", "discard data")("config", mName)(
"reason", "invalid flusher index or config update flusher from C++ to Go"));
allSucceeded = false;
continue;
}
Expand Down Expand Up @@ -424,7 +439,11 @@ void Pipeline::Stop(bool isRemoving) {

if (!mGoPipelineWithInput.isNull()) {
// Go pipeline `Stop` will stop and delete
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->Stop(GetConfigNameOfGoPipelineWithInput(), isRemoving);
#else
LogtailPluginMock::GetInstance()->Stop(GetConfigNameOfGoPipelineWithInput(), isRemoving);
#endif
}

ProcessQueueManager::GetInstance()->DisablePop(mName, isRemoving);
Expand All @@ -434,7 +453,11 @@ void Pipeline::Stop(bool isRemoving) {

if (!mGoPipelineWithoutInput.isNull()) {
// Go pipeline `Stop` will stop and delete
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->Stop(GetConfigNameOfGoPipelineWithoutInput(), isRemoving);
#else
LogtailPluginMock::GetInstance()->Stop(GetConfigNameOfGoPipelineWithoutInput(), isRemoving);
#endif
}

for (const auto& flusher : mFlushers) {
Expand Down Expand Up @@ -488,6 +511,7 @@ void Pipeline::CopyNativeGlobalParamToGoPipeline(Json::Value& pipeline) {
}

bool Pipeline::LoadGoPipelines() const {
#ifndef APSARA_UNIT_TEST_MAIN
if (!mGoPipelineWithoutInput.isNull()) {
string content = mGoPipelineWithoutInput.toStyledString();
if (!LogtailPlugin::GetInstance()->LoadPipeline(GetConfigNameOfGoPipelineWithoutInput(),
Expand Down Expand Up @@ -529,6 +553,7 @@ bool Pipeline::LoadGoPipelines() const {
return false;
}
}
#endif
return true;
}

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/Pipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Pipeline {
PipelineContext& GetContext() const { return mContext; }
const Json::Value& GetConfig() const { return *mConfig; }
const std::optional<std::string>& GetSingletonInput() const { return mSingletonInput; }
const std::vector<std::unique_ptr<ProcessorInstance>>& GetProcessors() const { return mProcessorLine; }
const std::vector<std::unique_ptr<FlusherInstance>>& GetFlushers() const { return mFlushers; }
bool IsFlushingThroughGoPipeline() const { return !mGoPipelineWithoutInput.isNull(); }
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& GetPluginStatistics() const {
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/instance/ProcessorInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ class ProcessorInstance : public PluginInstance {
ProcessorInstance(Processor* plugin, const PluginMeta& pluginMeta) : PluginInstance(pluginMeta), mPlugin(plugin) {}

const std::string& Name() const override { return mPlugin->Name(); };
const Processor* GetPlugin() const { return mPlugin.get(); }

bool Init(const Json::Value& config, PipelineContext& context);
void Process(std::vector<PipelineEventGroup>& logGroupList);
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/interface/Flusher.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ class Flusher : public Plugin {
friend class FlusherInstanceUnittest;
friend class FlusherRunnerUnittest;
friend class FlusherUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/ProcessQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,7 @@ class ProcessQueueManager : public FeedbackInterface {
void Clear();
friend class ProcessQueueManagerUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/SenderQueueManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ class SenderQueueManager : public FeedbackInterface {
#ifdef APSARA_UNIT_TEST_MAIN
friend class SenderQueueManagerUnittest;
friend class FlusherRunnerUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
6 changes: 6 additions & 0 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,9 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline

// CompressType
if (BOOL_FLAG(sls_client_send_compress)) {
#ifndef APSARA_UNIT_TEST_MAIN
mCompressor = CompressorFactory::GetInstance()->Create(config, *mContext, sName, mPluginID, CompressType::LZ4);
#endif
}

mGroupSerializer = make_unique<SLSEventGroupSerializer>(this);
Expand Down Expand Up @@ -672,7 +674,11 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)

auto data = static_cast<SLSSenderQueueItem*>(item);
string configName = HasContext() ? GetContext().GetConfigName() : "";
#ifndef APSARA_UNIT_TEST_MAIN
bool isProfileData = GetProfileSender()->IsProfileData(mRegion, mProject, data->mLogstore);
#else
bool isProfileData = false;
#endif
int32_t curTime = time(NULL);
auto curSystemTime = chrono::system_clock::now();
bool hasAuthError = false;
Expand Down
9 changes: 9 additions & 0 deletions core/runner/FlusherRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "runner/sink/http/HttpSink.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/HttpSinkMock.h"
#endif
// TODO: temporarily used here
#include "plugin/flusher/sls/PackIdManager.h"
#include "plugin/flusher/sls/SLSClientManager.h"
Expand Down Expand Up @@ -59,6 +62,7 @@ bool FlusherRunner::Init() {

mThreadRes = async(launch::async, &FlusherRunner::Run, this);
mLastCheckSendClientTime = time(nullptr);
mIsFlush = false;

return true;
}
Expand Down Expand Up @@ -139,12 +143,17 @@ void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) {
}

req->mEnqueTime = item->mLastSendTime = chrono::system_clock::now();
#ifndef APSARA_UNIT_TEST_MAIN
HttpSink::GetInstance()->AddRequest(std::move(req));
++mHttpSendingCnt;
LOG_DEBUG(sLogger,
("send item to http sink, item address", item)("config-flusher-dst",
QueueKeyManager::GetInstance()->GetName(item->mQueueKey))(
"sending cnt", ToString(mHttpSendingCnt.load())));
#else
HttpSinkMock::GetInstance()->AddRequest(std::move(req)); // release item here
++mHttpSendingCnt;
#endif
}

void FlusherRunner::Run() {
Expand Down
16 changes: 14 additions & 2 deletions core/runner/ProcessorRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,9 @@
#include "monitor/AlarmManager.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "pipeline/PipelineManager.h"
#include "queue/ExactlyOnceQueueManager.h"
#include "queue/ProcessQueueManager.h"
#include "queue/QueueKeyManager.h"
#include "unittest/pipeline/LogtailPluginMock.h"

DEFINE_FLAG_INT32(default_flush_merged_buffer_interval, "default flush merged buffer, seconds", 1);
DEFINE_FLAG_INT32(processor_runner_exit_timeout_secs, "", 60);
Expand All @@ -49,6 +49,7 @@ void ProcessorRunner::Init() {
for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) {
mThreadRes[threadNo] = async(launch::async, &ProcessorRunner::Run, this, threadNo);
}
mIsFlush = false;
}

void ProcessorRunner::Stop() {
Expand Down Expand Up @@ -141,8 +142,11 @@ void ProcessorRunner::Run(uint32_t threadNo) {
eventGroupList.emplace_back(std::move(item->mEventGroup));
pipeline->Process(eventGroupList, item->mInputIndex);
// if the pipeline is updated, the pointer will be released, so we need to update it to the new pipeline
shared_ptr<Pipeline> oldPipeline;
if (hasOldPipeline) {
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName);
pipeline->SubInProcessCnt(); // old pipeline
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); // update to new pipeline
pipeline->AddInProcessCnt();
if (!pipeline) {
LOG_INFO(sLogger,
("pipeline not found during processing, perhaps due to config deletion",
Expand Down Expand Up @@ -175,11 +179,19 @@ void ProcessorRunner::Run(uint32_t threadNo) {
pipeline->GetContext().GetRegion());
continue;
}
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->ProcessLogGroup(
pipeline->GetContext().GetConfigName(),
res,
group.GetMetadata(EventGroupMetaKey::SOURCE_ID).to_string());
}
#else
LogtailPluginMock::GetInstance()->ProcessLogGroup(
pipeline->GetContext().GetConfigName(),
res,
group.GetMetadata(EventGroupMetaKey::SOURCE_ID).to_string());
}
#endif
}
} else {
pipeline->Send(std::move(eventGroupList));
Expand Down
3 changes: 2 additions & 1 deletion core/runner/sink/http/HttpSink.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@
#include <future>
#include <mutex>

#include "monitor/MetricManager.h"
#include "runner/sink/Sink.h"
#include "runner/sink/http/HttpSinkRequest.h"
#include "monitor/MetricManager.h"

namespace logtail {

Expand Down Expand Up @@ -68,6 +68,7 @@ class HttpSink : public Sink<HttpSinkRequest> {

#ifdef APSARA_UNIT_TEST_MAIN
friend class FlusherRunnerUnittest;
friend class HttpSinkMock;
#endif
};

Expand Down
6 changes: 3 additions & 3 deletions core/unittest/config/PipelineManagerMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,6 @@ class PipelineMock : public Pipeline {
mContext.SetCreateTime(config.mCreateTime);
return (*mConfig)["valid"].asBool();
}

bool Start() { return true; }
void Stop(bool isRemoving) {}
};

class PipelineManagerMock : public PipelineManager {
Expand All @@ -44,6 +41,9 @@ class PipelineManagerMock : public PipelineManager {
}

void ClearEnvironment() {
for (auto& it : mPipelineNameEntityMap) {
it.second->Stop(true);
}
mPipelineNameEntityMap.clear();
mPluginCntMap.clear();
}
Expand Down
Loading

0 comments on commit ddbd911

Please sign in to comment.