Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Dec 26, 2024
1 parent 29de9d0 commit ada2037
Show file tree
Hide file tree
Showing 9 changed files with 824 additions and 92 deletions.
1 change: 1 addition & 0 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -518,6 +518,7 @@ class AppConfig {
friend class InputPrometheusUnittest;
friend class InputContainerStdioUnittest;
friend class BatcherUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
4 changes: 4 additions & 0 deletions core/go_pipeline/LogtailPlugin.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -514,6 +514,7 @@ void LogtailPlugin::ProcessLog(const std::string& configName,
void LogtailPlugin::ProcessLogGroup(const std::string& configName,
const std::string& logGroup,
const std::string& packId) {
#ifndef APSARA_UNIT_TEST_MAIN
if (logGroup.empty() || !(mPluginValid && mProcessLogsFun != NULL)) {
return;
}
Expand All @@ -532,6 +533,9 @@ void LogtailPlugin::ProcessLogGroup(const std::string& configName,
if (rst != (GoInt)0) {
LOG_WARNING(sLogger, ("process loggroup error", configName)("result", rst));
}
#else
LogtailPluginMock::GetInstance()->ProcessLogGroup(configName, logGroup, packId);
#endif
}

void LogtailPlugin::GetGoMetrics(std::vector<std::map<std::string, std::string>>& metircsList,
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/queue/BoundedProcessQueue.h
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ class BoundedProcessQueue : public BoundedQueueInterface<std::unique_ptr<Process
friend class ProcessQueueManagerUnittest;
friend class ExactlyOnceQueueManagerUnittest;
friend class PipelineUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
10 changes: 3 additions & 7 deletions core/pipeline/queue/ProcessQueueItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -35,13 +35,9 @@ struct ProcessQueueItem {
ProcessQueueItem(PipelineEventGroup&& group, size_t index) : mEventGroup(std::move(group)), mInputIndex(index) {}

void AddPipelineInProcessCnt(const std::string& configName) {
if (mPipeline) {
mPipeline->AddInProcessCnt();
} else {
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
p->AddInProcessCnt();
}
const auto& p = PipelineManager::GetInstance()->FindConfigByName(configName);
if (p) {
p->AddInProcessCnt();
}
}
};
Expand Down
2 changes: 0 additions & 2 deletions core/runner/ProcessorRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,15 +143,13 @@ void ProcessorRunner::Run(uint32_t threadNo) {
// if the pipeline is updated, the pointer will be released, so we need to update it to the new pipeline
shared_ptr<Pipeline> oldPipeline;
if (hasOldPipeline) {
pipeline->SubInProcessCnt(); // old pipeline
pipeline = PipelineManager::GetInstance()->FindConfigByName(configName); // update to new pipeline
if (!pipeline) {
LOG_INFO(sLogger,
("pipeline not found during processing, perhaps due to config deletion",
"discard data")("config", configName));
continue;
}
pipeline->AddInProcessCnt();
}

if (pipeline->IsFlushingThroughGoPipeline()) {
Expand Down
15 changes: 11 additions & 4 deletions core/unittest/pipeline/HttpSinkMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
#include "logger/Logger.h"
#include "pipeline/plugin/interface/HttpFlusher.h"
#include "pipeline/queue/SLSSenderQueueItem.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "runner/FlusherRunner.h"
#include "runner/sink/http/HttpSink.h"
#include "sdk/Common.h"
Expand All @@ -42,7 +43,13 @@ class HttpSinkMock : public HttpSink {
}
{
std::lock_guard<std::mutex> lock(mMutex);
mRequests.push_back(request->mBody);
std::string logstore = "default";
if (static_cast<HttpFlusher*>(request->mItem->mFlusher)->Name().find("sls") != std::string::npos) {
auto flusher = static_cast<FlusherSLS*>(request->mItem->mFlusher);
logstore = flusher->mLogstore;
}
LOG_DEBUG(sLogger, ("http sink mock", "add request")("logstore", logstore)("body", request->mBody));
mRequests[logstore].push_back(request->mBody);
}
request->mResponse.SetStatusCode(200);
request->mResponse.mHeader[sdk::X_LOG_REQUEST_ID] = "request_id";
Expand All @@ -53,9 +60,9 @@ class HttpSinkMock : public HttpSink {
return true;
}

std::vector<std::string> GetRequests() {
std::vector<std::string> GetRequests(std::string logstore) {
std::lock_guard<std::mutex> lock(mMutex);
return mRequests;
return mRequests[logstore];
}

void ClearRequests() {
Expand All @@ -73,7 +80,7 @@ class HttpSinkMock : public HttpSink {

std::atomic_bool mIsFlush = false;
mutable std::mutex mMutex;
std::vector<std::string> mRequests;
std::unordered_map<std::string, std::vector<std::string>> mRequests;
};

} // namespace logtail
7 changes: 0 additions & 7 deletions core/unittest/pipeline/LogtailPluginMock.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,9 +17,6 @@
#pragma once

#include "go_pipeline/LogtailPlugin.h"
#ifdef APSARA_UNIT_TEST_MAIN
#include "unittest/pipeline/LogtailPluginMock.h"
#endif

namespace logtail {
class LogtailPluginMock : public LogtailPlugin {
Expand Down Expand Up @@ -56,7 +53,6 @@ class LogtailPluginMock : public LogtailPlugin {


void ProcessLogGroup(const std::string& configName, const std::string& logGroup, const std::string& packId) {
#ifndef APSARA_UNIT_TEST_MAIN
while (processBlockFlag) {
LOG_DEBUG(sLogger, ("LogtailPluginMock process log group", "block")("config", configName));
std::this_thread::sleep_for(std::chrono::milliseconds(100));
Expand All @@ -73,9 +69,6 @@ class LogtailPluginMock : public LogtailPlugin {
LOG_INFO(sLogger,
("LogtailPluginMock process log group", "success")("config", configName)("logGroup",
logGroup)("packId", packId));
#else
LogtailPluginMock::GetInstance()->ProcessLogGroup(configName, logGroup, packId);
#endif
}

bool IsStarted() const { return startFlag; }
Expand Down
Loading

0 comments on commit ada2037

Please sign in to comment.