Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Dec 25, 2024
1 parent ddbd911 commit f1a24f5
Show file tree
Hide file tree
Showing 14 changed files with 76 additions and 71 deletions.
2 changes: 1 addition & 1 deletion core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -251,7 +251,7 @@ bool PipelineConfig::Parse() {
hasFileInput = true;
}
#ifdef APSARA_UNIT_TEST_MAIN
if (pluginType.find("mock") != string::npos) {
if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos) {
hasFileInput = true;
}
#endif
Expand Down
11 changes: 11 additions & 0 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,9 @@
#include "pipeline/PipelineManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "provider/Provider.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/LogtailPluginMock.h"
#endif

DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false);
DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect,
Expand Down Expand Up @@ -145,6 +148,7 @@ void LogtailPlugin::StopAllPipelines(bool withInputFlag) {
}

void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStopFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines stop", "starts")("config", configName));
auto stopStart = GetCurrentTimeInMilliSeconds();
Expand All @@ -159,6 +163,9 @@ void LogtailPlugin::Stop(const std::string& configName, bool removedFlag) {
HOLD_ON_TOO_SLOW_ALARM, "Stopping Go pipeline " + configName + " took " + ToString(stopCost) + "ms");
}
}
#else
LogtailPluginMock::GetInstance()->Stop(configName, removedFlag);
#endif
}

void LogtailPlugin::StopBuiltInModules() {
Expand All @@ -170,6 +177,7 @@ void LogtailPlugin::StopBuiltInModules() {
}

void LogtailPlugin::Start(const std::string& configName) {
#ifndef APSARA_UNIT_TEST_MAIN
if (mPluginValid && mStartFun != NULL) {
LOG_INFO(sLogger, ("Go pipelines start", "starts")("config name", configName));
GoString goConfigName;
Expand All @@ -178,6 +186,9 @@ void LogtailPlugin::Start(const std::string& configName) {
mStartFun(goConfigName);
LOG_INFO(sLogger, ("Go pipelines start", "succeeded")("config name", configName));
}
#else
LogtailPluginMock::GetInstance()->Start(configName);
#endif
}

int LogtailPlugin::IsValidToSend(long long logstoreKey) {
Expand Down
22 changes: 0 additions & 22 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,9 +32,6 @@
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "plugin/processor/ProcessorParseApsaraNative.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/LogtailPluginMock.h"
#endif

DECLARE_FLAG_INT32(default_plugin_log_queue_size);

Expand Down Expand Up @@ -341,29 +338,18 @@ bool Pipeline::Init(PipelineConfig&& config) {
void Pipeline::Start() {
// #ifndef APSARA_UNIT_TEST_MAIN
// TODO: 应该保证指定时间内返回,如果无法返回,将配置放入startDisabled里
LOG_WARNING(sLogger, ("debug", "8"));
for (const auto& flusher : mFlushers) {
flusher->Start();
}

LOG_WARNING(sLogger, ("debug", "9"));
if (!mGoPipelineWithoutInput.isNull()) {
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->Start(GetConfigNameOfGoPipelineWithoutInput());
#else
LogtailPluginMock::GetInstance()->Start(GetConfigNameOfGoPipelineWithoutInput());
#endif
}

LOG_WARNING(sLogger, ("debug", "10"));
ProcessQueueManager::GetInstance()->EnablePop(mName);

if (!mGoPipelineWithInput.isNull()) {
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->Start(GetConfigNameOfGoPipelineWithInput());
#else
LogtailPluginMock::GetInstance()->Start(GetConfigNameOfGoPipelineWithInput());
#endif
}

for (const auto& input : mInputs) {
Expand Down Expand Up @@ -439,11 +425,7 @@ void Pipeline::Stop(bool isRemoving) {

if (!mGoPipelineWithInput.isNull()) {
// Go pipeline `Stop` will stop and delete
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->Stop(GetConfigNameOfGoPipelineWithInput(), isRemoving);
#else
LogtailPluginMock::GetInstance()->Stop(GetConfigNameOfGoPipelineWithInput(), isRemoving);
#endif
}

ProcessQueueManager::GetInstance()->DisablePop(mName, isRemoving);
Expand All @@ -453,11 +435,7 @@ void Pipeline::Stop(bool isRemoving) {

if (!mGoPipelineWithoutInput.isNull()) {
// Go pipeline `Stop` will stop and delete
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->Stop(GetConfigNameOfGoPipelineWithoutInput(), isRemoving);
#else
LogtailPluginMock::GetInstance()->Stop(GetConfigNameOfGoPipelineWithoutInput(), isRemoving);
#endif
}

for (const auto& flusher : mFlushers) {
Expand Down
6 changes: 2 additions & 4 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -486,9 +486,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline

// CompressType
if (BOOL_FLAG(sls_client_send_compress)) {
#ifndef APSARA_UNIT_TEST_MAIN
mCompressor = CompressorFactory::GetInstance()->Create(config, *mContext, sName, mPluginID, CompressType::LZ4);
#endif
}

mGroupSerializer = make_unique<SLSEventGroupSerializer>(this);
Expand Down Expand Up @@ -816,8 +814,8 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)
// the possibility of hash key conflict is very low, so data is
// dropped here.
cpt->Commit();
failDetail << ", drop exactly once log group and commit checkpoint"
<< " checkpointKey:" << cpt->key << " checkpoint:" << cpt->data.DebugString();
failDetail << ", drop exactly once log group and commit checkpoint" << " checkpointKey:" << cpt->key
<< " checkpoint:" << cpt->data.DebugString();
suggestion << "no suggestion";
AlarmManager::GetInstance()->SendAlarm(
EXACTLY_ONCE_ALARM,
Expand Down
20 changes: 9 additions & 11 deletions core/runner/FlusherRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,10 @@
#include "pipeline/queue/SenderQueueItem.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "runner/sink/http/HttpSink.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/HttpSinkMock.h"
#endif
// TODO: temporarily used here
#include "plugin/flusher/sls/PackIdManager.h"
#include "plugin/flusher/sls/SLSClientManager.h"
#include "unittest/pipeline/HttpSinkMock.h"

DEFINE_FLAG_INT32(flusher_runner_exit_timeout_secs, "", 60);
DEFINE_FLAG_INT32(check_send_client_timeout_interval, "", 600);
Expand Down Expand Up @@ -143,17 +140,16 @@ void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) {
}

req->mEnqueTime = item->mLastSendTime = chrono::system_clock::now();
#ifndef APSARA_UNIT_TEST_MAIN
HttpSink::GetInstance()->AddRequest(std::move(req));
++mHttpSendingCnt;
LOG_DEBUG(sLogger,
("send item to http sink, item address", item)("config-flusher-dst",
QueueKeyManager::GetInstance()->GetName(item->mQueueKey))(
"sending cnt", ToString(mHttpSendingCnt.load())));
"sending cnt", ToString(mHttpSendingCnt.load() + 1)));
#ifndef APSARA_UNIT_TEST_MAIN
HttpSink::GetInstance()->AddRequest(std::move(req));
#else
HttpSinkMock::GetInstance()->AddRequest(std::move(req)); // release item here
++mHttpSendingCnt;
HttpSinkMock::GetInstance()->AddRequest(std::move(req));
#endif
++mHttpSendingCnt;
}

void FlusherRunner::Run() {
Expand Down Expand Up @@ -204,7 +200,9 @@ void FlusherRunner::Run() {
PackIdManager::GetInstance()->CleanTimeoutEntry();
mLastCheckSendClientTime = time(NULL);
}

LOG_WARNING(sLogger,
("flusher runner", "exit")("is_flush", mIsFlush)(
"all queue empty", SenderQueueManager::GetInstance()->IsAllQueueEmpty()));
if (mIsFlush && SenderQueueManager::GetInstance()->IsAllQueueEmpty()) {
break;
}
Expand Down
2 changes: 2 additions & 0 deletions core/runner/FlusherRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
#include "pipeline/plugin/interface/Flusher.h"
#include "pipeline/queue/SenderQueueItem.h"
#include "runner/sink/SinkType.h"
#include "runner/sink/http/HttpSink.h"

namespace logtail {

Expand Down Expand Up @@ -83,6 +84,7 @@ class FlusherRunner {
friend class PluginRegistryUnittest;
friend class FlusherRunnerUnittest;
friend class InstanceConfigManagerUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
9 changes: 0 additions & 9 deletions core/runner/ProcessorRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "pipeline/PipelineManager.h"
#include "queue/ProcessQueueManager.h"
#include "queue/QueueKeyManager.h"
#include "unittest/pipeline/LogtailPluginMock.h"

DEFINE_FLAG_INT32(default_flush_merged_buffer_interval, "default flush merged buffer, seconds", 1);
DEFINE_FLAG_INT32(processor_runner_exit_timeout_secs, "", 60);
Expand Down Expand Up @@ -179,19 +178,11 @@ void ProcessorRunner::Run(uint32_t threadNo) {
pipeline->GetContext().GetRegion());
continue;
}
#ifndef APSARA_UNIT_TEST_MAIN
LogtailPlugin::GetInstance()->ProcessLogGroup(
pipeline->GetContext().GetConfigName(),
res,
group.GetMetadata(EventGroupMetaKey::SOURCE_ID).to_string());
}
#else
LogtailPluginMock::GetInstance()->ProcessLogGroup(
pipeline->GetContext().GetConfigName(),
res,
group.GetMetadata(EventGroupMetaKey::SOURCE_ID).to_string());
}
#endif
}
} else {
pipeline->Send(std::move(eventGroupList));
Expand Down
2 changes: 1 addition & 1 deletion core/runner/sink/Sink.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ class Sink {
public:
virtual bool Init() = 0;
virtual void Stop() = 0;

bool AddRequest(std::unique_ptr<T>&& request) {
mQueue.Push(std::move(request));
return true;
Expand Down
9 changes: 9 additions & 0 deletions core/unittest/pipeline/HttpSinkMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,12 @@ class HttpSinkMock : public HttpSink {
return &instance;
}

bool Init() override { return true; }

bool AddRequest(std::unique_ptr<HttpSinkRequest>&& request) {
if (useRealHttpSink) {
return HttpSink::GetInstance()->AddRequest(std::move(request));
}
{
std::lock_guard<std::mutex> lock(mMutex);
mRequests.push_back(request->mBody);
Expand All @@ -58,10 +63,14 @@ class HttpSinkMock : public HttpSink {
mRequests.clear();
}

void SetUseRealHttpSink(bool useReal) { useRealHttpSink = useReal; }

private:
HttpSinkMock() = default;
~HttpSinkMock() = default;

bool useRealHttpSink = false;

std::atomic_bool mIsFlush = false;
mutable std::mutex mMutex;
std::vector<std::string> mRequests;
Expand Down
7 changes: 7 additions & 0 deletions core/unittest/pipeline/LogtailPluginMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
#pragma once

#include "go_pipeline/LogtailPlugin.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/LogtailPluginMock.h"
#endif

namespace logtail {
class LogtailPluginMock : public LogtailPlugin {
Expand Down Expand Up @@ -53,6 +56,7 @@ class LogtailPluginMock : public LogtailPlugin {


void ProcessLogGroup(const std::string& configName, const std::string& logGroup, const std::string& packId) {
#ifndef APSARA_UNIT_TEST_MAIN
while (processBlockFlag) {
LOG_DEBUG(sLogger, ("LogtailPluginMock process log group", "block")("config", configName));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Expand All @@ -69,6 +73,9 @@ class LogtailPluginMock : public LogtailPlugin {
LOG_INFO(sLogger,
("LogtailPluginMock process log group", "success")("config", configName)("logGroup",
logGroup)("packId", packId));
#else
LogtailPluginMock::GetInstance()->ProcessLogGroup(configName, logGroup, packId);
#endif
}

bool IsStarted() const { return startFlag; }
Expand Down
Loading

0 comments on commit f1a24f5

Please sign in to comment.