From 14b47f6e6678bd60546b8d88d0f59dd7f99fa8ae Mon Sep 17 00:00:00 2001 From: liqiang Date: Thu, 28 Nov 2024 12:37:27 +0000 Subject: [PATCH] update --- .../inner/ProcessorPromParseMetricNative.cpp | 21 ------------------- .../inner/ProcessorPromParseMetricNative.h | 16 -------------- .../prometheus/schedulers/ScrapeScheduler.cpp | 4 ++++ core/prometheus/schedulers/ScrapeScheduler.h | 1 + 4 files changed, 5 insertions(+), 37 deletions(-) diff --git a/core/plugin/processor/inner/ProcessorPromParseMetricNative.cpp b/core/plugin/processor/inner/ProcessorPromParseMetricNative.cpp index 532ba3d880..9950e9df29 100644 --- a/core/plugin/processor/inner/ProcessorPromParseMetricNative.cpp +++ b/core/plugin/processor/inner/ProcessorPromParseMetricNative.cpp @@ -43,27 +43,6 @@ void ProcessorPromParseMetricNative::Process(PipelineEventGroup& eGroup) { for (auto& rawEvent : rawEvents) { ProcessEvent(rawEvent, events, eGroup, parser); } - - auto streamID = eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID).to_string(); - // cache the metrics count - { - Lock(); - mStreamCounter.Add(streamID); - mMetricCountCache[streamID] += events.size(); - if (eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_TOTAL)) { - mStreamCounter.SetTotal( - streamID, - StringTo(eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_TOTAL).to_string())); - } - // add auto metric,if this is the last one of the stream - if (mStreamCounter.IsLast(streamID)) { - eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SAMPLES_SCRAPED, ToString(mMetricCountCache[streamID])); - // erase the cache - mMetricCountCache.erase(streamID); - mStreamCounter.Erase(streamID); - } - UnLock(); - } } bool ProcessorPromParseMetricNative::IsSupportedEvent(const PipelineEventPtr& e) const { diff --git a/core/plugin/processor/inner/ProcessorPromParseMetricNative.h b/core/plugin/processor/inner/ProcessorPromParseMetricNative.h index dff53e648a..f72327ce62 100644 --- a/core/plugin/processor/inner/ProcessorPromParseMetricNative.h +++ b/core/plugin/processor/inner/ProcessorPromParseMetricNative.h @@ -1,12 +1,10 @@ #pragma once #include -#include #include "models/PipelineEventGroup.h" #include "models/PipelineEventPtr.h" #include "pipeline/plugin/interface/Processor.h" -#include "prometheus/labels/StreamCounter.h" #include "prometheus/labels/TextParser.h" #include "prometheus/schedulers/ScrapeConfig.h" @@ -29,22 +27,8 @@ class ProcessorPromParseMetricNative : public Processor { void AddEvent(const char* data, size_t size, EventsContainer& events, PipelineEventGroup& eGroup, TextParser& parser); - void Lock() { - if (INT32_FLAG(process_thread_count) > 1) { - mStreamMutex.lock(); - } - } - void UnLock() { - if (INT32_FLAG(process_thread_count) > 1) { - mStreamMutex.unlock(); - } - } std::unique_ptr mScrapeConfigPtr; - std::mutex mStreamMutex; - prom::StreamCounter mStreamCounter; - std::unordered_map mMetricCountCache; - #ifdef APSARA_UNIT_TEST_MAIN friend class InputPrometheusUnittest; #endif diff --git a/core/prometheus/schedulers/ScrapeScheduler.cpp b/core/prometheus/schedulers/ScrapeScheduler.cpp index 4b25e3a2da..f6c2b557c3 100644 --- a/core/prometheus/schedulers/ScrapeScheduler.cpp +++ b/core/prometheus/schedulers/ScrapeScheduler.cpp @@ -78,6 +78,7 @@ size_t ScrapeScheduler::PromMetricWriteCallback(char* buffer, size_t size, size_ body->SetTargetLabels(body->mEventGroup); body->PushEventGroup(std::move(body->mEventGroup)); + body->mScrapeSamplesScraped += body->mEventGroup.GetEvents().size(); body->mStreamIndex++; body->mEventGroup = PipelineEventGroup(std::make_shared()); body->mCurrStreamSize = 0; @@ -126,6 +127,7 @@ void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t timestampM auto& responseBody = *response.GetBody(); responseBody.FlushCache(); mStreamIndex++; + responseBody.mScrapeSamplesScraped += responseBody.mEventGroup.GetEvents().size(); mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_EVENTS_TOTAL, response.GetStatusCode()); mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_SIZE_BYTES, response.GetStatusCode(), responseBody.mRawSize); mSelfMonitor->AddCounter( @@ -154,12 +156,14 @@ void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t timestampM responseBody.mCurrStreamSize = 0; responseBody.mCache.clear(); responseBody.mStreamIndex = 0; + responseBody.mScrapeSamplesScraped = 0; mPluginTotalDelayMs->Add(GetCurrentTimeInMilliSeconds() - timestampMilliSec); } void ScrapeScheduler::SetAutoMetricMeta(PipelineEventGroup& eGroup) const { eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC, ToString(mScrapeTimestampMilliSec)); + eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SAMPLES_SCRAPED, ToString(mScrapeSamplesScraped)); eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_DURATION, ToString(mScrapeDurationSeconds)); eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_RESPONSE_SIZE, ToString(mScrapeResponseSizeBytes)); eGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_UP_STATE, ToString(mUpState)); diff --git a/core/prometheus/schedulers/ScrapeScheduler.h b/core/prometheus/schedulers/ScrapeScheduler.h index e8535fefdc..bc4ecc3f4f 100644 --- a/core/prometheus/schedulers/ScrapeScheduler.h +++ b/core/prometheus/schedulers/ScrapeScheduler.h @@ -83,6 +83,7 @@ class ScrapeScheduler : public BaseScheduler { // auto metrics uint64_t mScrapeTimestampMilliSec = 0; + uint64_t mScrapeSamplesScraped = 0; double mScrapeDurationSeconds = 0; uint64_t mScrapeResponseSizeBytes = 0; bool mUpState = true;