Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Dec 27, 2024
1 parent cc1f455 commit 0693bb1
Show file tree
Hide file tree
Showing 13 changed files with 47 additions and 37 deletions.
7 changes: 7 additions & 0 deletions core/common/timer/Timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,13 @@ void Timer::Stop() {
}
}

void Timer::Clear() {
lock_guard<mutex> lock(mQueueMux);
while (!mQueue.empty()) {
mQueue.pop();
}
}

void Timer::PushEvent(unique_ptr<TimerEvent>&& e) {
lock_guard<mutex> lock(mQueueMux);
if (mQueue.empty() || e->GetExecTime() < mQueue.top()->GetExecTime()) {
Expand Down
3 changes: 2 additions & 1 deletion core/common/timer/Timer.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ class Timer {
void Init();
void Stop();
void PushEvent(std::unique_ptr<TimerEvent>&& e);
void Clear();

private:
Timer() = default;
Expand All @@ -57,7 +58,7 @@ class Timer {

std::future<void> mThreadRes;
mutable std::mutex mThreadRunningMux;
bool mIsThreadRunning = true;
bool mIsThreadRunning = false;
mutable std::condition_variable mCV;

#ifdef APSARA_UNIT_TEST_MAIN
Expand Down
6 changes: 6 additions & 0 deletions core/host_monitor/HostMonitorInputRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,17 @@ void HostMonitorInputRunner::UpdateCollector(const std::vector<std::string>& new
for (auto& collector : newCollectors) {
auto oldCollector = mRegisteredCollectorMap.find(collector);
if (oldCollector == mRegisteredCollectorMap.end()) {
if (mCollectorInstanceMap.find(collector) == mCollectorInstanceMap.end()) {
LOG_ERROR(sLogger, ("host monitor", "collector not found")("collector", collector));
continue;
}
mRegisteredCollectorMap[collector] = true;
HostMonitorTimerEvent::CollectConfig collectConfig(
collector, processQueueKey, inputIndex, std::chrono::seconds(DEFAULT_SCHEDULE_INTERVAL));
auto now = std::chrono::steady_clock::now();
auto event = std::make_unique<HostMonitorTimerEvent>(now + collectConfig.mInterval, collectConfig);
Timer::GetInstance()->PushEvent(std::move(event));
LOG_INFO(sLogger, ("host monitor", "add new collector")("collector", collector));
} else {
// config removed and added again, timer event is still in the queue
if (!oldCollector->second) {
Expand Down Expand Up @@ -122,6 +127,7 @@ void HostMonitorInputRunner::ScheduleOnce(HostMonitorTimerEvent::CollectConfig&
return;
}

LOG_DEBUG(sLogger, ("host monitor collect", "collector")(config.mCollectorName, group.GetEvents().size()));
if (group.GetEvents().size() > 0) {
bool result = ProcessorRunner::GetInstance()->PushQueue(
config.mProcessQueueKey, config.mInputIndex, std::move(group));
Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputHostMeta.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ bool InputHostMeta::Init(const Json::Value& config, Json::Value& optionalGoPipel
bool InputHostMeta::Start() {
LOG_INFO(sLogger, ("input host meta start", mContext->GetConfigName()));
HostMonitorInputRunner::GetInstance()->Init();
HostMonitorInputRunner::GetInstance()->UpdateCollector({"process"}, mContext->GetProcessQueueKey(), mIndex);
HostMonitorInputRunner::GetInstance()->UpdateCollector({"process_entity"}, mContext->GetProcessQueueKey(), mIndex);
return true;
}

Expand Down
2 changes: 0 additions & 2 deletions core/prometheus/PrometheusInputRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -137,7 +137,6 @@ void PrometheusInputRunner::Init() {
mIsStarted = true;

#ifndef APSARA_UNIT_TEST_MAIN
mTimer->Init();
AsynCurlRunner::GetInstance()->Init();
#endif

Expand Down Expand Up @@ -203,7 +202,6 @@ void PrometheusInputRunner::Stop() {
}

#ifndef APSARA_UNIT_TEST_MAIN
mTimer->Stop();
LOG_INFO(sLogger, ("PrometheusInputRunner", "stop asyn curl runner"));
AsynCurlRunner::GetInstance()->Stop();
#endif
Expand Down
1 change: 0 additions & 1 deletion core/prometheus/PrometheusInputRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,6 @@ class PrometheusInputRunner : public InputRunner {
std::string mPodName;

std::unique_ptr<sdk::CurlClient> mClient;
std::shared_ptr<Timer> mTimer;
EventPool mEventPool;

mutable ReadWriteLock mSubscriberMapRWLock;
Expand Down
1 change: 0 additions & 1 deletion core/prometheus/schedulers/BaseScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ bool BaseScheduler::IsCancelled() {
}

void BaseScheduler::SetComponent(EventPool* eventPool) {
mTimer = Timer::GetInstance();
mEventPool = eventPool;
}
} // namespace logtail
4 changes: 2 additions & 2 deletions core/prometheus/schedulers/BaseScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,8 @@ class BaseScheduler {

std::chrono::steady_clock::time_point GetNextExecTime();

void SetFirstExecTime(std::chrono::steady_clock::time_point firstExecTime,std::chrono::system_clock::time_point firstScrapeTime);
void SetFirstExecTime(std::chrono::steady_clock::time_point firstExecTime,
std::chrono::system_clock::time_point firstScrapeTime);
void DelayExecTime(uint64_t delaySeconds);
virtual void Cancel();

Expand All @@ -44,7 +45,6 @@ class BaseScheduler {
std::shared_ptr<PromFuture<HttpResponse&, uint64_t>> mFuture;
std::shared_ptr<PromFuture<>> mIsContextValidFuture;

Timer* mTimer;
EventPool* mEventPool = nullptr;
};
} // namespace logtail
6 changes: 2 additions & 4 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -190,7 +190,7 @@ void ScrapeScheduler::ScheduleNext() {
}

auto event = BuildScrapeTimerEvent(GetNextExecTime());
mTimer->PushEvent(std::move(event));
Timer::GetInstance()->PushEvent(std::move(event));
}

void ScrapeScheduler::ScrapeOnce(std::chrono::steady_clock::time_point execTime) {
Expand All @@ -201,9 +201,7 @@ void ScrapeScheduler::ScrapeOnce(std::chrono::steady_clock::time_point execTime)
});
mFuture = future;
auto event = BuildScrapeTimerEvent(execTime);
if (mTimer) {
mTimer->PushEvent(std::move(event));
}
Timer::GetInstance()->PushEvent(std::move(event));
}

std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::steady_clock::time_point execTime) {
Expand Down
40 changes: 18 additions & 22 deletions core/prometheus/schedulers/TargetSubscriberScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -106,25 +106,23 @@ void TargetSubscriberScheduler::UpdateScrapeScheduler(
for (const auto& [k, v] : newScrapeSchedulerMap) {
if (mScrapeSchedulerMap.find(k) == mScrapeSchedulerMap.end()) {
mScrapeSchedulerMap[k] = v;
if (mTimer) {
auto tmpCurrentMilliSeconds = GetCurrentTimeInMilliSeconds();
auto tmpRandSleepMilliSec = GetRandSleepMilliSec(
v->GetId(), mScrapeConfigPtr->mScrapeIntervalSeconds, tmpCurrentMilliSeconds);

// zero-cost upgrade
if (mUnRegisterMs > 0
&& (tmpCurrentMilliSeconds + tmpRandSleepMilliSec
- (uint64_t)mScrapeConfigPtr->mScrapeIntervalSeconds * 1000
> mUnRegisterMs)
&& (tmpCurrentMilliSeconds + tmpRandSleepMilliSec
- (uint64_t)mScrapeConfigPtr->mScrapeIntervalSeconds * 1000 * 2
< mUnRegisterMs)) {
// scrape once just now
LOG_INFO(sLogger, ("scrape zero cost", ToString(tmpCurrentMilliSeconds)));
v->ScrapeOnce(std::chrono::steady_clock::now());
}
v->ScheduleNext();
auto tmpCurrentMilliSeconds = GetCurrentTimeInMilliSeconds();
auto tmpRandSleepMilliSec = GetRandSleepMilliSec(
v->GetId(), mScrapeConfigPtr->mScrapeIntervalSeconds, tmpCurrentMilliSeconds);

// zero-cost upgrade
if (mUnRegisterMs > 0
&& (tmpCurrentMilliSeconds + tmpRandSleepMilliSec
- (uint64_t)mScrapeConfigPtr->mScrapeIntervalSeconds * 1000
> mUnRegisterMs)
&& (tmpCurrentMilliSeconds + tmpRandSleepMilliSec
- (uint64_t)mScrapeConfigPtr->mScrapeIntervalSeconds * 1000 * 2
< mUnRegisterMs)) {
// scrape once just now
LOG_INFO(sLogger, ("scrape zero cost", ToString(tmpCurrentMilliSeconds)));
v->ScrapeOnce(std::chrono::steady_clock::now());
}
v->ScheduleNext();
}
}
}
Expand Down Expand Up @@ -261,7 +259,7 @@ void TargetSubscriberScheduler::ScheduleNext() {
}

auto event = BuildSubscriberTimerEvent(GetNextExecTime());
mTimer->PushEvent(std::move(event));
Timer::GetInstance()->PushEvent(std::move(event));
}

void TargetSubscriberScheduler::Cancel() {
Expand All @@ -281,9 +279,7 @@ void TargetSubscriberScheduler::SubscribeOnce(std::chrono::steady_clock::time_po
});
mFuture = future;
auto event = BuildSubscriberTimerEvent(execTime);
if (mTimer) {
mTimer->PushEvent(std::move(event));
}
Timer::GetInstance()->PushEvent(std::move(event));
}

std::unique_ptr<TimerEvent>
Expand Down
8 changes: 6 additions & 2 deletions core/unittest/host_monitor/HostMonitorInputRunnerUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,12 @@ class HostMonitorInputRunnerUnittest : public testing::Test {
public:
void TestUpdateAndRemoveCollector() const;
void TestScheduleOnce() const;

private:
void TearDown() override {
HostMonitorInputRunner::GetInstance()->Stop();
Timer::GetInstance()->Clear();
}
};

void HostMonitorInputRunnerUnittest::TestUpdateAndRemoveCollector() const {
Expand Down Expand Up @@ -67,8 +73,6 @@ void HostMonitorInputRunnerUnittest::TestScheduleOnce() const {
APSARA_TEST_TRUE_FATAL(ProcessQueueManager::GetInstance()->PopItem(0, item, configName));
APSARA_TEST_EQUAL_FATAL("test", configName);

// verify schdule next
APSARA_TEST_EQUAL_FATAL(Timer::GetInstance()->mQueue.size(), 1);
runner->mThreadPool.Stop();
runner->Stop();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ class ProcessEntityCollectorUnittest : public testing::Test {
void ProcessEntityCollectorUnittest::TestGetNewProcessStat() const {
PROCESS_DIR = ".";
auto collector = ProcessEntityCollector();
auto ptr = collector.GetNewProcessStat(1);
auto ptr = collector.ReadNewProcessStat(1);
APSARA_TEST_NOT_EQUAL(nullptr, ptr);
APSARA_TEST_EQUAL(1, ptr->pid);
APSARA_TEST_EQUAL("cat", ptr->name);
Expand Down
2 changes: 2 additions & 0 deletions core/unittest/prometheus/ScrapeSchedulerUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ class ScrapeSchedulerUnittest : public testing::Test {
mScrapeConfig->mRequestHeaders = {{"Authorization", "Bearer xxxxx"}};
}

void TearDown() override { Timer::GetInstance()->Clear(); }

private:
std::shared_ptr<ScrapeConfig> mScrapeConfig;
};
Expand Down

0 comments on commit 0693bb1

Please sign in to comment.