Skip to content

Commit

Permalink
update: PrometheusInputRunner inherits the InputRunner basic class (#…
Browse files Browse the repository at this point in the history
…1699)

* add input runner StopIfNotInUse ut

* chore: unlock in else case manually

* chore: add started mutex

* chore: init InputRunner when start Input plugin

* chore: remove unused data in ut

* update input runner
  • Loading branch information
catdogpandas authored Aug 30, 2024
1 parent 4307c2a commit e183d4a
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 99 deletions.
1 change: 1 addition & 0 deletions core/input/InputPrometheus.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ bool InputPrometheus::Init(const Json::Value& config, Json::Value&) {
/// @brief register scrape job by PrometheusInputRunner
bool InputPrometheus::Start() {
LOG_INFO(sLogger, ("input config start", mJobName));
PrometheusInputRunner::GetInstance()->Init();

mTargetSubscirber->mQueueKey = mContext->GetProcessQueueKey();

Expand Down
19 changes: 5 additions & 14 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,30 +46,27 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
static bool isInputStreamStarted = false;
#endif
bool isInputObserverChanged = false, isInputFileChanged = false, isInputStreamChanged = false,
isInputContainerStdioChanged = false, isInputPrometheusChanged = false;
isInputContainerStdioChanged = false;
for (const auto& name : diff.mRemoved) {
CheckIfInputUpdated(mPipelineNameEntityMap[name]->GetConfig()["inputs"][0],
isInputObserverChanged,
isInputFileChanged,
isInputStreamChanged,
isInputContainerStdioChanged,
isInputPrometheusChanged);
isInputContainerStdioChanged);
}
for (const auto& config : diff.mModified) {
CheckIfInputUpdated(*config.mInputs[0],
isInputObserverChanged,
isInputFileChanged,
isInputStreamChanged,
isInputContainerStdioChanged,
isInputPrometheusChanged);
isInputContainerStdioChanged);
}
for (const auto& config : diff.mAdded) {
CheckIfInputUpdated(*config.mInputs[0],
isInputObserverChanged,
isInputFileChanged,
isInputStreamChanged,
isInputContainerStdioChanged,
isInputPrometheusChanged);
isInputContainerStdioChanged);
}

#if defined(__ENTERPRISE__) && defined(__linux__) && !defined(__ANDROID__)
Expand All @@ -90,9 +87,6 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
}
LogProcess::GetInstance()->HoldOn();
LogtailPlugin::GetInstance()->HoldOn(false);
if (isInputPrometheusChanged) {
PrometheusInputRunner::GetInstance()->Start();
}
#endif

for (const auto& name : diff.mRemoved) {
Expand Down Expand Up @@ -303,8 +297,7 @@ void PipelineManager::CheckIfInputUpdated(const Json::Value& config,
bool& isInputObserverChanged,
bool& isInputFileChanged,
bool& isInputStreamChanged,
bool& isInputContainerStdioChanged,
bool& isInputPrometheusChanged) {
bool& isInputContainerStdioChanged) {
string inputType = config["Type"].asString();
if (inputType == "input_observer_network") {
isInputObserverChanged = true;
Expand All @@ -314,8 +307,6 @@ void PipelineManager::CheckIfInputUpdated(const Json::Value& config,
isInputStreamChanged = true;
} else if (inputType == "input_container_stdio") {
isInputContainerStdioChanged = true;
} else if (inputType == "input_prometheus") {
isInputPrometheusChanged = true;
}
}

Expand Down
3 changes: 1 addition & 2 deletions core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -62,8 +62,7 @@ class PipelineManager {
bool& isInputObserverChanged,
bool& isInputFileChanged,
bool& isInputStreamChanged,
bool& isInputContainerStdioChanged,
bool& isInputPrometheusChanged);
bool& isInputContainerStdioChanged);

std::unordered_map<std::string, std::shared_ptr<Pipeline>> mPipelineNameEntityMap;
mutable SpinLock mPluginCntMapLock;
Expand Down
52 changes: 34 additions & 18 deletions core/prometheus/PrometheusInputRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,6 @@ DECLARE_FLAG_STRING(_pod_name_);
namespace logtail {

PrometheusInputRunner::PrometheusInputRunner() : mUnRegisterMs(0) {
mIsStarted.store(false);
mClient = std::make_unique<sdk::CurlClient>();

mServiceHost = STRING_FLAG(loong_collector_operator_service);
Expand All @@ -55,7 +54,7 @@ void PrometheusInputRunner::UpdateScrapeInput(std::shared_ptr<TargetSubscriberSc
targetSubscriber->mServicePort = mServicePort;
targetSubscriber->mPodName = mPodName;

targetSubscriber->mUnRegisterMs = mUnRegisterMs;
targetSubscriber->mUnRegisterMs = mUnRegisterMs.load();
targetSubscriber->SetTimer(mTimer);
targetSubscriber->SetFirstExecTime(std::chrono::steady_clock::now());
// 1. add subscriber to mTargetSubscriberSchedulerMap
Expand All @@ -76,18 +75,22 @@ void PrometheusInputRunner::RemoveScrapeInput(const std::string& jobName) {
}

/// @brief targets discovery and start scrape work
void PrometheusInputRunner::Start() {
LOG_INFO(sLogger, ("PrometheusInputRunner", "Start"));
if (mIsStarted.load()) {
void PrometheusInputRunner::Init() {
std::lock_guard<mutex> lock(mStartMutex);
if (mIsStarted) {
return;
}
mIsStarted.store(true);
LOG_INFO(sLogger, ("PrometheusInputRunner", "Start"));
mIsStarted = true;
mTimer->Init();
AsynCurlRunner::GetInstance()->Init();

mThreadRes = std::async(launch::async, [this]() {
// only register when operator exist
if (!mServiceHost.empty()) {
LOG_INFO(sLogger, ("PrometheusInputRunner", "register"));
// only register when operator exist
if (!mServiceHost.empty()) {
mIsThreadRunning.store(true);
auto res = std::async(launch::async, [this]() {
std::lock_guard<mutex> lock(mRegisterMutex);
int retry = 0;
while (mIsThreadRunning.load()) {
++retry;
Expand All @@ -109,28 +112,33 @@ void PrometheusInputRunner::Start() {
}
if (responseJson.isMember(prometheus::UNREGISTER_MS)
&& responseJson[prometheus::UNREGISTER_MS].isUInt64()) {
mUnRegisterMs = responseJson[prometheus::UNREGISTER_MS].asUInt64();
mUnRegisterMs.store(responseJson[prometheus::UNREGISTER_MS].asUInt64());
}
}
LOG_INFO(sLogger, ("Register Success", mPodName));
break;
}
std::this_thread::sleep_for(std::chrono::seconds(1));
}
}
});
});
}
}

/// @brief stop scrape work and clear all scrape jobs
void PrometheusInputRunner::Stop() {
LOG_INFO(sLogger, ("PrometheusInputRunner", "Stop"));
std::lock_guard<mutex> lock(mStartMutex);
if (!mIsStarted) {
return;
}

mIsStarted.store(false);
mIsStarted = false;
mIsThreadRunning.store(false);
mTimer->Stop();

LOG_INFO(sLogger, ("PrometheusInputRunner", "stop asyn curl runner"));
AsynCurlRunner::GetInstance()->Stop();

LOG_INFO(sLogger, ("PrometheusInputRunner", "cancel all target subscribers"));
CancelAllTargetSubscriber();
{
WriteLock lock(mSubscriberMapRWLock);
Expand All @@ -139,7 +147,9 @@ void PrometheusInputRunner::Stop() {

// only unregister when operator exist
if (!mServiceHost.empty()) {
LOG_INFO(sLogger, ("PrometheusInputRunner", "unregister"));
auto res = std::async(launch::async, [this]() {
std::lock_guard<mutex> lock(mRegisterMutex);
for (int retry = 0; retry < 3; ++retry) {
sdk::HttpMessage httpResponse = SendRegisterMessage(prometheus::UNREGISTER_COLLECTOR_PATH);
if (httpResponse.statusCode != 200) {
Expand All @@ -152,13 +162,23 @@ void PrometheusInputRunner::Stop() {
}
});
}
LOG_INFO(sLogger, ("PrometheusInputRunner", "Stop"));
}

bool PrometheusInputRunner::HasRegisteredPlugins() const {
ReadLock lock(mSubscriberMapRWLock);
return !mTargetSubscriberSchedulerMap.empty();
}

sdk::HttpMessage PrometheusInputRunner::SendRegisterMessage(const string& url) const {
map<string, string> httpHeader;
httpHeader[sdk::X_LOG_REQUEST_ID] = prometheus::PROMETHEUS_PREFIX + mPodName;
sdk::HttpMessage httpResponse;
httpResponse.header[sdk::X_LOG_REQUEST_ID] = prometheus::PROMETHEUS_PREFIX + mPodName;
#ifdef APSARA_UNIT_TEST_MAIN
httpResponse.statusCode = 200;
return httpResponse;
#endif
try {
mClient->Send(sdk::HTTP_GET,
mServiceHost,
Expand All @@ -177,10 +197,6 @@ sdk::HttpMessage PrometheusInputRunner::SendRegisterMessage(const string& url) c
return httpResponse;
}

bool PrometheusInputRunner::HasRegisteredPlugin() {
ReadLock lock(mSubscriberMapRWLock);
return !mTargetSubscriberSchedulerMap.empty();
}

void PrometheusInputRunner::CancelAllTargetSubscriber() {
ReadLock lock(mSubscriberMapRWLock);
Expand Down
21 changes: 11 additions & 10 deletions core/prometheus/PrometheusInputRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -23,17 +23,19 @@
#include "common/Lock.h"
#include "common/timer/Timer.h"
#include "prometheus/schedulers/TargetSubscriberScheduler.h"
#include "runner/InputRunner.h"
#include "sdk/Common.h"
#include "sdk/CurlImp.h"

namespace logtail {

class PrometheusInputRunner {
class PrometheusInputRunner : public InputRunner {
public:
PrometheusInputRunner(const PrometheusInputRunner&) = delete;
PrometheusInputRunner(PrometheusInputRunner&&) = delete;
PrometheusInputRunner& operator=(const PrometheusInputRunner&) = delete;
PrometheusInputRunner& operator=(PrometheusInputRunner&&) = delete;
~PrometheusInputRunner() override = default;
static PrometheusInputRunner* GetInstance() {
static PrometheusInputRunner sInstance;
return &sInstance;
Expand All @@ -44,21 +46,20 @@ class PrometheusInputRunner {
void RemoveScrapeInput(const std::string& jobName);

// target discover and scrape
void Start();
void Stop();
bool HasRegisteredPlugin();
void Init() override;
void Stop() override;
bool HasRegisteredPlugins() const override;

private:
PrometheusInputRunner();
~PrometheusInputRunner() = default;

sdk::HttpMessage SendRegisterMessage(const std::string& url) const;

void CancelAllTargetSubscriber();

std::atomic<bool> mIsStarted;
bool mIsStarted = false;
std::mutex mStartMutex;

std::future<void> mThreadRes;
std::mutex mRegisterMutex;
std::atomic<bool> mIsThreadRunning = true;

std::unique_ptr<sdk::CurlClient> mClient;
Expand All @@ -69,10 +70,10 @@ class PrometheusInputRunner {

std::shared_ptr<Timer> mTimer;

ReadWriteLock mSubscriberMapRWLock;
mutable ReadWriteLock mSubscriberMapRWLock;
std::map<std::string, std::shared_ptr<TargetSubscriberScheduler>> mTargetSubscriberSchedulerMap;

uint64_t mUnRegisterMs;
std::atomic<uint64_t> mUnRegisterMs;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PrometheusInputRunnerUnittest;
Expand Down
4 changes: 4 additions & 0 deletions core/unittest/input/InputPrometheusUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -134,6 +134,7 @@ void InputPrometheusUnittest::OnSuccessfulInit() {
APSARA_TEST_EQUAL(10 * 1024 * 1024, input->mTargetSubscirber->mScrapeConfigPtr->mMaxScrapeSizeBytes);
APSARA_TEST_EQUAL(1000000, input->mTargetSubscirber->mScrapeConfigPtr->mSampleLimit);
APSARA_TEST_EQUAL(1000000, input->mTargetSubscirber->mScrapeConfigPtr->mSeriesLimit);
PrometheusInputRunner::GetInstance()->Stop();
}

void InputPrometheusUnittest::OnFailedInit() {
Expand Down Expand Up @@ -177,6 +178,7 @@ void InputPrometheusUnittest::OnFailedInit() {
input->SetContext(ctx);
input->SetMetricsRecordRef(InputPrometheus::sName, "1", "1", "1");
APSARA_TEST_FALSE(input->Init(configJson, optionalGoPipeline));
PrometheusInputRunner::GetInstance()->Stop();
}

void InputPrometheusUnittest::OnPipelineUpdate() {
Expand Down Expand Up @@ -216,6 +218,7 @@ void InputPrometheusUnittest::OnPipelineUpdate() {
APSARA_TEST_TRUE(PrometheusInputRunner::GetInstance()->mTargetSubscriberSchedulerMap.find("_arms-prom/node-exporter/0")
== PrometheusInputRunner::GetInstance()->mTargetSubscriberSchedulerMap.end());

PrometheusInputRunner::GetInstance()->Stop();
}

void InputPrometheusUnittest::TestCreateInnerProcessor() {
Expand Down Expand Up @@ -377,6 +380,7 @@ void InputPrometheusUnittest::TestCreateInnerProcessor() {
->mRelabelConfigs[2]
.mAction);
}
PrometheusInputRunner::GetInstance()->Stop();
}

UNIT_TEST_CASE(InputPrometheusUnittest, OnSuccessfulInit)
Expand Down
Loading

0 comments on commit e183d4a

Please sign in to comment.