From 0693bb10a29eb9223838463e41b0f82b38851281 Mon Sep 17 00:00:00 2001 From: abingcbc Date: Fri, 27 Dec 2024 22:42:55 +0800 Subject: [PATCH] fix --- core/common/timer/Timer.cpp | 7 ++++ core/common/timer/Timer.h | 3 +- core/host_monitor/HostMonitorInputRunner.cpp | 6 +++ core/plugin/input/InputHostMeta.cpp | 2 +- core/prometheus/PrometheusInputRunner.cpp | 2 - core/prometheus/PrometheusInputRunner.h | 1 - core/prometheus/schedulers/BaseScheduler.cpp | 1 - core/prometheus/schedulers/BaseScheduler.h | 4 +- .../prometheus/schedulers/ScrapeScheduler.cpp | 6 +-- .../schedulers/TargetSubscriberScheduler.cpp | 40 +++++++++---------- .../HostMonitorInputRunnerUnittest.cpp | 8 +++- .../ProcessEntityCollectorUnittest.cpp | 2 +- .../prometheus/ScrapeSchedulerUnittest.cpp | 2 + 13 files changed, 47 insertions(+), 37 deletions(-) diff --git a/core/common/timer/Timer.cpp b/core/common/timer/Timer.cpp index 7533b1da24..e33aa0aacc 100644 --- a/core/common/timer/Timer.cpp +++ b/core/common/timer/Timer.cpp @@ -51,6 +51,13 @@ void Timer::Stop() { } } +void Timer::Clear() { + lock_guard lock(mQueueMux); + while (!mQueue.empty()) { + mQueue.pop(); + } +} + void Timer::PushEvent(unique_ptr&& e) { lock_guard lock(mQueueMux); if (mQueue.empty() || e->GetExecTime() < mQueue.top()->GetExecTime()) { diff --git a/core/common/timer/Timer.h b/core/common/timer/Timer.h index ad1ca15be4..efb1ec4f5e 100644 --- a/core/common/timer/Timer.h +++ b/core/common/timer/Timer.h @@ -46,6 +46,7 @@ class Timer { void Init(); void Stop(); void PushEvent(std::unique_ptr&& e); + void Clear(); private: Timer() = default; @@ -57,7 +58,7 @@ class Timer { std::future mThreadRes; mutable std::mutex mThreadRunningMux; - bool mIsThreadRunning = true; + bool mIsThreadRunning = false; mutable std::condition_variable mCV; #ifdef APSARA_UNIT_TEST_MAIN diff --git a/core/host_monitor/HostMonitorInputRunner.cpp b/core/host_monitor/HostMonitorInputRunner.cpp index 3ac973df04..a852c3d67f 100755 --- a/core/host_monitor/HostMonitorInputRunner.cpp +++ b/core/host_monitor/HostMonitorInputRunner.cpp @@ -45,12 +45,17 @@ void HostMonitorInputRunner::UpdateCollector(const std::vector& new for (auto& collector : newCollectors) { auto oldCollector = mRegisteredCollectorMap.find(collector); if (oldCollector == mRegisteredCollectorMap.end()) { + if (mCollectorInstanceMap.find(collector) == mCollectorInstanceMap.end()) { + LOG_ERROR(sLogger, ("host monitor", "collector not found")("collector", collector)); + continue; + } mRegisteredCollectorMap[collector] = true; HostMonitorTimerEvent::CollectConfig collectConfig( collector, processQueueKey, inputIndex, std::chrono::seconds(DEFAULT_SCHEDULE_INTERVAL)); auto now = std::chrono::steady_clock::now(); auto event = std::make_unique(now + collectConfig.mInterval, collectConfig); Timer::GetInstance()->PushEvent(std::move(event)); + LOG_INFO(sLogger, ("host monitor", "add new collector")("collector", collector)); } else { // config removed and added again, timer event is still in the queue if (!oldCollector->second) { @@ -122,6 +127,7 @@ void HostMonitorInputRunner::ScheduleOnce(HostMonitorTimerEvent::CollectConfig& return; } + LOG_DEBUG(sLogger, ("host monitor collect", "collector")(config.mCollectorName, group.GetEvents().size())); if (group.GetEvents().size() > 0) { bool result = ProcessorRunner::GetInstance()->PushQueue( config.mProcessQueueKey, config.mInputIndex, std::move(group)); diff --git a/core/plugin/input/InputHostMeta.cpp b/core/plugin/input/InputHostMeta.cpp index 88a4443d6d..0a5c06117a 100644 --- a/core/plugin/input/InputHostMeta.cpp +++ b/core/plugin/input/InputHostMeta.cpp @@ -30,7 +30,7 @@ bool InputHostMeta::Init(const Json::Value& config, Json::Value& optionalGoPipel bool InputHostMeta::Start() { LOG_INFO(sLogger, ("input host meta start", mContext->GetConfigName())); HostMonitorInputRunner::GetInstance()->Init(); - HostMonitorInputRunner::GetInstance()->UpdateCollector({"process"}, mContext->GetProcessQueueKey(), mIndex); + HostMonitorInputRunner::GetInstance()->UpdateCollector({"process_entity"}, mContext->GetProcessQueueKey(), mIndex); return true; } diff --git a/core/prometheus/PrometheusInputRunner.cpp b/core/prometheus/PrometheusInputRunner.cpp index eeb4260413..931bedf374 100644 --- a/core/prometheus/PrometheusInputRunner.cpp +++ b/core/prometheus/PrometheusInputRunner.cpp @@ -137,7 +137,6 @@ void PrometheusInputRunner::Init() { mIsStarted = true; #ifndef APSARA_UNIT_TEST_MAIN - mTimer->Init(); AsynCurlRunner::GetInstance()->Init(); #endif @@ -203,7 +202,6 @@ void PrometheusInputRunner::Stop() { } #ifndef APSARA_UNIT_TEST_MAIN - mTimer->Stop(); LOG_INFO(sLogger, ("PrometheusInputRunner", "stop asyn curl runner")); AsynCurlRunner::GetInstance()->Stop(); #endif diff --git a/core/prometheus/PrometheusInputRunner.h b/core/prometheus/PrometheusInputRunner.h index 996caf163e..cca25857f8 100644 --- a/core/prometheus/PrometheusInputRunner.h +++ b/core/prometheus/PrometheusInputRunner.h @@ -75,7 +75,6 @@ class PrometheusInputRunner : public InputRunner { std::string mPodName; std::unique_ptr mClient; - std::shared_ptr mTimer; EventPool mEventPool; mutable ReadWriteLock mSubscriberMapRWLock; diff --git a/core/prometheus/schedulers/BaseScheduler.cpp b/core/prometheus/schedulers/BaseScheduler.cpp index 72532851a9..58d286b36b 100644 --- a/core/prometheus/schedulers/BaseScheduler.cpp +++ b/core/prometheus/schedulers/BaseScheduler.cpp @@ -40,7 +40,6 @@ bool BaseScheduler::IsCancelled() { } void BaseScheduler::SetComponent(EventPool* eventPool) { - mTimer = Timer::GetInstance(); mEventPool = eventPool; } } // namespace logtail \ No newline at end of file diff --git a/core/prometheus/schedulers/BaseScheduler.h b/core/prometheus/schedulers/BaseScheduler.h index e246b63609..985d2bbe55 100644 --- a/core/prometheus/schedulers/BaseScheduler.h +++ b/core/prometheus/schedulers/BaseScheduler.h @@ -20,7 +20,8 @@ class BaseScheduler { std::chrono::steady_clock::time_point GetNextExecTime(); - void SetFirstExecTime(std::chrono::steady_clock::time_point firstExecTime,std::chrono::system_clock::time_point firstScrapeTime); + void SetFirstExecTime(std::chrono::steady_clock::time_point firstExecTime, + std::chrono::system_clock::time_point firstScrapeTime); void DelayExecTime(uint64_t delaySeconds); virtual void Cancel(); @@ -44,7 +45,6 @@ class BaseScheduler { std::shared_ptr> mFuture; std::shared_ptr> mIsContextValidFuture; - Timer* mTimer; EventPool* mEventPool = nullptr; }; } // namespace logtail \ No newline at end of file diff --git a/core/prometheus/schedulers/ScrapeScheduler.cpp b/core/prometheus/schedulers/ScrapeScheduler.cpp index 0adfae2209..1e97e6a78e 100644 --- a/core/prometheus/schedulers/ScrapeScheduler.cpp +++ b/core/prometheus/schedulers/ScrapeScheduler.cpp @@ -190,7 +190,7 @@ void ScrapeScheduler::ScheduleNext() { } auto event = BuildScrapeTimerEvent(GetNextExecTime()); - mTimer->PushEvent(std::move(event)); + Timer::GetInstance()->PushEvent(std::move(event)); } void ScrapeScheduler::ScrapeOnce(std::chrono::steady_clock::time_point execTime) { @@ -201,9 +201,7 @@ void ScrapeScheduler::ScrapeOnce(std::chrono::steady_clock::time_point execTime) }); mFuture = future; auto event = BuildScrapeTimerEvent(execTime); - if (mTimer) { - mTimer->PushEvent(std::move(event)); - } + Timer::GetInstance()->PushEvent(std::move(event)); } std::unique_ptr ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::steady_clock::time_point execTime) { diff --git a/core/prometheus/schedulers/TargetSubscriberScheduler.cpp b/core/prometheus/schedulers/TargetSubscriberScheduler.cpp index ffad53f844..451cd4d9c1 100644 --- a/core/prometheus/schedulers/TargetSubscriberScheduler.cpp +++ b/core/prometheus/schedulers/TargetSubscriberScheduler.cpp @@ -106,25 +106,23 @@ void TargetSubscriberScheduler::UpdateScrapeScheduler( for (const auto& [k, v] : newScrapeSchedulerMap) { if (mScrapeSchedulerMap.find(k) == mScrapeSchedulerMap.end()) { mScrapeSchedulerMap[k] = v; - if (mTimer) { - auto tmpCurrentMilliSeconds = GetCurrentTimeInMilliSeconds(); - auto tmpRandSleepMilliSec = GetRandSleepMilliSec( - v->GetId(), mScrapeConfigPtr->mScrapeIntervalSeconds, tmpCurrentMilliSeconds); - - // zero-cost upgrade - if (mUnRegisterMs > 0 - && (tmpCurrentMilliSeconds + tmpRandSleepMilliSec - - (uint64_t)mScrapeConfigPtr->mScrapeIntervalSeconds * 1000 - > mUnRegisterMs) - && (tmpCurrentMilliSeconds + tmpRandSleepMilliSec - - (uint64_t)mScrapeConfigPtr->mScrapeIntervalSeconds * 1000 * 2 - < mUnRegisterMs)) { - // scrape once just now - LOG_INFO(sLogger, ("scrape zero cost", ToString(tmpCurrentMilliSeconds))); - v->ScrapeOnce(std::chrono::steady_clock::now()); - } - v->ScheduleNext(); + auto tmpCurrentMilliSeconds = GetCurrentTimeInMilliSeconds(); + auto tmpRandSleepMilliSec = GetRandSleepMilliSec( + v->GetId(), mScrapeConfigPtr->mScrapeIntervalSeconds, tmpCurrentMilliSeconds); + + // zero-cost upgrade + if (mUnRegisterMs > 0 + && (tmpCurrentMilliSeconds + tmpRandSleepMilliSec + - (uint64_t)mScrapeConfigPtr->mScrapeIntervalSeconds * 1000 + > mUnRegisterMs) + && (tmpCurrentMilliSeconds + tmpRandSleepMilliSec + - (uint64_t)mScrapeConfigPtr->mScrapeIntervalSeconds * 1000 * 2 + < mUnRegisterMs)) { + // scrape once just now + LOG_INFO(sLogger, ("scrape zero cost", ToString(tmpCurrentMilliSeconds))); + v->ScrapeOnce(std::chrono::steady_clock::now()); } + v->ScheduleNext(); } } } @@ -261,7 +259,7 @@ void TargetSubscriberScheduler::ScheduleNext() { } auto event = BuildSubscriberTimerEvent(GetNextExecTime()); - mTimer->PushEvent(std::move(event)); + Timer::GetInstance()->PushEvent(std::move(event)); } void TargetSubscriberScheduler::Cancel() { @@ -281,9 +279,7 @@ void TargetSubscriberScheduler::SubscribeOnce(std::chrono::steady_clock::time_po }); mFuture = future; auto event = BuildSubscriberTimerEvent(execTime); - if (mTimer) { - mTimer->PushEvent(std::move(event)); - } + Timer::GetInstance()->PushEvent(std::move(event)); } std::unique_ptr diff --git a/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp b/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp index 6087696990..f017279b60 100644 --- a/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp +++ b/core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp @@ -33,6 +33,12 @@ class HostMonitorInputRunnerUnittest : public testing::Test { public: void TestUpdateAndRemoveCollector() const; void TestScheduleOnce() const; + +private: + void TearDown() override { + HostMonitorInputRunner::GetInstance()->Stop(); + Timer::GetInstance()->Clear(); + } }; void HostMonitorInputRunnerUnittest::TestUpdateAndRemoveCollector() const { @@ -67,8 +73,6 @@ void HostMonitorInputRunnerUnittest::TestScheduleOnce() const { APSARA_TEST_TRUE_FATAL(ProcessQueueManager::GetInstance()->PopItem(0, item, configName)); APSARA_TEST_EQUAL_FATAL("test", configName); - // verify schdule next - APSARA_TEST_EQUAL_FATAL(Timer::GetInstance()->mQueue.size(), 1); runner->mThreadPool.Stop(); runner->Stop(); } diff --git a/core/unittest/host_monitor/ProcessEntityCollectorUnittest.cpp b/core/unittest/host_monitor/ProcessEntityCollectorUnittest.cpp index 3bcadcb618..d9234228b7 100644 --- a/core/unittest/host_monitor/ProcessEntityCollectorUnittest.cpp +++ b/core/unittest/host_monitor/ProcessEntityCollectorUnittest.cpp @@ -30,7 +30,7 @@ class ProcessEntityCollectorUnittest : public testing::Test { void ProcessEntityCollectorUnittest::TestGetNewProcessStat() const { PROCESS_DIR = "."; auto collector = ProcessEntityCollector(); - auto ptr = collector.GetNewProcessStat(1); + auto ptr = collector.ReadNewProcessStat(1); APSARA_TEST_NOT_EQUAL(nullptr, ptr); APSARA_TEST_EQUAL(1, ptr->pid); APSARA_TEST_EQUAL("cat", ptr->name); diff --git a/core/unittest/prometheus/ScrapeSchedulerUnittest.cpp b/core/unittest/prometheus/ScrapeSchedulerUnittest.cpp index 9b2f4f30e6..2be9643ba7 100644 --- a/core/unittest/prometheus/ScrapeSchedulerUnittest.cpp +++ b/core/unittest/prometheus/ScrapeSchedulerUnittest.cpp @@ -54,6 +54,8 @@ class ScrapeSchedulerUnittest : public testing::Test { mScrapeConfig->mRequestHeaders = {{"Authorization", "Bearer xxxxx"}}; } + void TearDown() override { Timer::GetInstance()->Clear(); } + private: std::shared_ptr mScrapeConfig; };