Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: prom agent info for HPA and Rebalance #2028

Merged
merged 58 commits into from
Feb 28, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
7af6044
feat: prom agent basic info and targets info
catdogpandas Jan 6, 2025
39c7284
feat: prom agent info basic health value
catdogpandas Jan 6, 2025
27a1849
chore: remove unnecessory logs
catdogpandas Jan 6, 2025
bb81d0e
feat: use response size instead of series
catdogpandas Jan 8, 2025
78c78a8
chore: remove job name in targets info
catdogpandas Jan 8, 2025
08e51b9
Merge branch 'main' into feat/prom-agent-info
catdogpandas Jan 8, 2025
4399c2b
feat: update labels hash calc
catdogpandas Jan 8, 2025
51fe67d
Merge branch 'main' into feat/prom-agent-info
catdogpandas Jan 8, 2025
f479d2d
feat: update instance calc
catdogpandas Jan 9, 2025
f9ac8bc
Revert "feat: update instance calc"
catdogpandas Jan 9, 2025
bd68622
Merge branch 'main' into feat/prom-agent-info
catdogpandas Jan 9, 2025
32f3a57
chore: update instance
catdogpandas Jan 9, 2025
d811148
chore: update
catdogpandas Jan 9, 2025
c74b96c
feat: calc hash
catdogpandas Jan 9, 2025
5ca13cb
chore: update
catdogpandas Jan 9, 2025
2cb6eb3
update
catdogpandas Jan 13, 2025
0874105
update
catdogpandas Jan 13, 2025
60d1a82
update
catdogpandas Jan 14, 2025
3c43d20
feat: update health calc
catdogpandas Jan 14, 2025
502b9d6
update
catdogpandas Jan 14, 2025
6c9d94c
update
catdogpandas Jan 14, 2025
1a63a12
update
catdogpandas Jan 14, 2025
9f15143
update
catdogpandas Jan 14, 2025
dbdc8c6
update
catdogpandas Jan 14, 2025
7eea950
feat: add lc_target_hash
catdogpandas Jan 14, 2025
eea0acd
Merge branch 'main' into feat/prom-agent-info
catdogpandas Jan 14, 2025
46cfaf4
update
catdogpandas Jan 15, 2025
728a34d
update
catdogpandas Jan 15, 2025
4db3f54
prom job
catdogpandas Jan 15, 2025
0a6d8c8
update
catdogpandas Jan 15, 2025
7ff69db
update
catdogpandas Jan 15, 2025
58d713c
chore: update ut
catdogpandas Jan 17, 2025
f89dc8c
chore: update ut
catdogpandas Jan 17, 2025
574fd92
update
catdogpandas Jan 17, 2025
640729b
update
catdogpandas Jan 17, 2025
b67091a
chore: update code style
catdogpandas Jan 23, 2025
9eacb7b
Merge branch 'main' into feat/prom-agent-info
catdogpandas Jan 23, 2025
b24f0f6
update code style
catdogpandas Jan 23, 2025
bcb3c42
update code style
catdogpandas Jan 23, 2025
123ff49
update code style
catdogpandas Jan 23, 2025
ecb992d
chore: update
catdogpandas Jan 24, 2025
3c61cf7
chore: update
catdogpandas Jan 24, 2025
3b82b6d
chore: update code style
catdogpandas Jan 24, 2025
408cf35
Merge branch 'main' into feat/prom-agent-info
catdogpandas Feb 11, 2025
8e02f74
chore: remove health ut
catdogpandas Feb 11, 2025
94558d0
Merge branch 'main' into feat/prom-agent-info
catdogpandas Feb 19, 2025
6d508cd
chore: update
catdogpandas Feb 19, 2025
315925f
Merge branch 'main' into feat/prom-agent-info
catdogpandas Feb 25, 2025
ccb5f20
chore: fix ut
catdogpandas Feb 25, 2025
cb0d2ea
chore: add ut
catdogpandas Feb 25, 2025
0c0acba
Merge branch 'main' into feat/prom-agent-info
catdogpandas Feb 27, 2025
690e009
chore: update
catdogpandas Feb 27, 2025
2794943
feat: add http sink info
catdogpandas Feb 27, 2025
a9711a3
chore: update
catdogpandas Feb 27, 2025
c444ee8
Merge branch 'main' into feat/prom-agent-info
catdogpandas Feb 27, 2025
baa25ae
chore: update
catdogpandas Feb 27, 2025
2718e8e
chore: update
catdogpandas Feb 27, 2025
a643527
chore: update
catdogpandas Feb 27, 2025
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -173,18 +173,22 @@ void ProcessorPromRelabelMetricNative::AddAutoMetrics(PipelineEventGroup& eGroup
LOG_ERROR(sLogger, ("scrape_timestamp_milliseconds is not set", ""));
return;
}
if (!eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID)) {
LOG_ERROR(sLogger, ("prometheus stream id", ""));
return;
}
targetTags[prometheus::LC_TARGET_HASH] = eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID);

StringView scrapeTimestampMilliSecStr = eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC);
auto timestampMilliSec = StringTo<uint64_t>(scrapeTimestampMilliSecStr.to_string());
auto timestamp = timestampMilliSec / 1000;
auto nanoSec = timestampMilliSec % 1000 * 1000000;


AddMetric(
eGroup, prometheus::SCRAPE_DURATION_SECONDS, autoMetric.mScrapeDurationSeconds, timestamp, nanoSec, targetTags);

AddMetric(eGroup,
prometheus::SCRAPE_RESPONSE_SIZE_BYTES,
prometheus::SCRAPE_BODY_SIZE_BYTES,
autoMetric.mScrapeResponseSizeBytes,
timestamp,
nanoSec,
Expand Down
17 changes: 16 additions & 1 deletion core/prometheus/Constants.h
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ const uint64_t RefeshIntervalSeconds = 5;
const char* const META = "__meta_";
const char* const UNDEFINED = "undefined";
const std::string PROMETHEUS = "prometheus";
const char* const LC_TARGET_HASH = "lc_target_hash";

// relabel config
const char* const SOURCE_LABELS = "source_labels";
Expand Down Expand Up @@ -100,11 +101,13 @@ const char* const SCHEME_LABEL_NAME = "__scheme__";
const char* const METRICS_PATH_LABEL_NAME = "__metrics_path__";
const char* const PARAM_LABEL_NAME = "__param_";
const char* const LABELS = "labels";
const char* const TARGET_HASH = "hash";
const char* const REBALANCE_MS = "rebalance_ms";

// auto metrics
const char* const SCRAPE_STATE = "scrape_state";
const char* const SCRAPE_DURATION_SECONDS = "scrape_duration_seconds";
const char* const SCRAPE_RESPONSE_SIZE_BYTES = "scrape_response_size_bytes";
const char* const SCRAPE_BODY_SIZE_BYTES = "scrape_body_size_bytes";
const char* const SCRAPE_SAMPLES_LIMIT = "scrape_samples_limit";
const char* const SCRAPE_SAMPLES_POST_METRIC_RELABELING = "scrape_samples_post_metric_relabeling";
const char* const SCRAPE_SAMPLES_SCRAPED = "scrape_samples_scraped";
Expand All @@ -119,4 +122,16 @@ const char* const ACCEPT_ENCODING = "Accept-Encoding";
const char* const GZIP = "gzip";
const char* const IDENTITY = "identity";

const char* const AGENT_INFO = "AgentInfo";
const char* const TARGETS_INFO = "TargetsInfo";
const char* const CPU_LIMIT = "CpuLimit";
const char* const CPU_USAGE = "CpuUsage";
const char* const MEM_LIMIT = "MemLimit";
const char* const MEM_USAGE = "MemUsage";
const char* const HTTP_SINK_IN_ITEMS_TOTAL = "HttpSinkInItemsTotal";
const char* const HTTP_SINK_OUT_FAILED = "HttpSinkOutFailed";
const char* const HASH = "Hash";
const char* const SIZE = "Size";
const char* const SCRAPE_DELAY_SECONDS = "ScrapeDelaySeconds";

} // namespace logtail::prometheus
4 changes: 2 additions & 2 deletions core/prometheus/PrometheusInputRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,14 +25,12 @@
#include "common/Flags.h"
#include "common/JsonUtil.h"
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "common/http/AsynCurlRunner.h"
#include "common/http/Constant.h"
#include "common/http/Curl.h"
#include "common/timer/Timer.h"
#include "logger/Logger.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "prometheus/Constants.h"
#include "prometheus/Utils.h"

Expand Down Expand Up @@ -173,6 +171,8 @@ void PrometheusInputRunner::Init() {
mUnRegisterMs = 0;
} else {
mUnRegisterMs.store(StringTo<uint64_t>(tmpStr));
// adjust unRegisterMs to scrape targets for zero-cost
mUnRegisterMs -= 1000;
LOG_INFO(sLogger, ("unRegisterMs", ToString(mUnRegisterMs)));
}
}
Expand Down
4 changes: 2 additions & 2 deletions core/prometheus/component/StreamScraper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ void StreamScraper::PushEventGroup(PipelineEventGroup&& eGroup) const {
void StreamScraper::SendMetrics() {
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC,
ToString(mScrapeTimestampMilliSec));
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID, GetId() + ToString(mScrapeTimestampMilliSec));
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID, GetId());

SetTargetLabels(mEventGroup);
PushEventGroup(std::move(mEventGroup));
Expand All @@ -132,7 +132,7 @@ void StreamScraper::SetAutoMetricMeta(double scrapeDurationSeconds, bool upState
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_DURATION, ToString(scrapeDurationSeconds));
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_RESPONSE_SIZE, ToString(mRawSize));
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_UP_STATE, ToString(upState));
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID, GetId() + ToString(mScrapeTimestampMilliSec));
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID, GetId());
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_TOTAL, ToString(mStreamIndex));
}
std::string StreamScraper::GetId() {
Expand Down
24 changes: 20 additions & 4 deletions core/prometheus/labels/Labels.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -111,10 +111,26 @@ void Labels::Range(const std::function<void(const string& k, const string& v)>&
// Computes an order-independent 64-bit hash of the label set using the
// FNV-1a scheme (OFFSET64 basis, PRIME64 multiplier). Label names are
// sorted first so the result is stable regardless of the underlying
// container's iteration order; a 0xff byte separates each name/value
// component so adjacent strings cannot collide by concatenation.
uint64_t Labels::Hash() {
    uint64_t sum = prometheus::OFFSET64;
    // Collect and sort all label names for a deterministic fold order.
    vector<string> names;
    Range([&names](const string& k, const string&) { names.push_back(k); });
    sort(names.begin(), names.end());
    // Fold a single byte into the running FNV-1a state.
    auto calc = [](uint64_t h, uint64_t c) {
        h ^= (uint64_t)c;
        h *= prometheus::PRIME64;
        return h;
    };
    // Fold every byte of a string into the running FNV-1a state.
    // NOTE(review): if `char` is signed, bytes >= 0x80 sign-extend in the
    // cast below; this mirrors the existing behavior — confirm it matches
    // the server-side hash for non-ASCII label data.
    auto calcString = [](uint64_t h, const string& s) {
        for (auto c : s) {
            h ^= (uint64_t)c;
            h *= prometheus::PRIME64;
        }
        return h;
    };
    // hash = fold(name, 0xff, value, 0xff) for each label, in sorted order.
    for (const auto& name : names) {
        sum = calcString(sum, name);
        sum = calc(sum, 255);
        sum = calcString(sum, Get(name));
        sum = calc(sum, 255);
    }
    return sum;
}
Expand Down
4 changes: 4 additions & 0 deletions core/prometheus/schedulers/BaseScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,10 @@ using namespace std;
namespace logtail {
// Records the completion of one scheduled execution and advances the
// next planned execution/scrape times by whole intervals.
void BaseScheduler::ExecDone() {
    ++mExecCount;
    // If we fell behind by one or more full intervals (mLatestExecTime is
    // already past the slot implied by mExecCount), skip the missed slots
    // so the next execution is scheduled in the future, not immediately.
    for (;;) {
        auto scheduledSlot = mFirstExecTime + chrono::seconds(mExecCount * mInterval);
        if (mLatestExecTime <= scheduledSlot) {
            break;
        }
        ++mExecCount;
    }
    mLatestExecTime = mFirstExecTime + chrono::seconds(mExecCount * mInterval);
    mLatestScrapeTime = mFirstScrapeTime + chrono::seconds(mExecCount * mInterval);
}
Expand Down
39 changes: 20 additions & 19 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,27 +40,25 @@ using namespace std;
namespace logtail {

// Constructs a scheduler for a single scrape target. Endpoint identity
// (host/port/scheme/path) describes where to scrape; targetInfo carries the
// precomputed labels, instance, and hash used for IDs and self-monitoring.
// mScrapeResponseSizeBytes starts at -1 to mark "no scrape completed yet".
ScrapeScheduler::ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
                                 string host,
                                 int32_t port,
                                 std::string scheme,
                                 std::string metricsPath,
                                 uint64_t scrapeIntervalSeconds,
                                 uint64_t scrapeTimeoutSeconds,
                                 QueueKey queueKey,
                                 size_t inputIndex,
                                 const PromTargetInfo& targetInfo)
    : mScrapeConfigPtr(std::move(scrapeConfigPtr)),
      mHost(std::move(host)),
      mPort(port),
      mTargetInfo(targetInfo),
      mMetricsPath(std::move(metricsPath)),
      mScheme(std::move(scheme)),
      mScrapeTimeoutSeconds(scrapeTimeoutSeconds),
      mQueueKey(queueKey),
      mInputIndex(inputIndex),
      mScrapeResponseSizeBytes(-1) {
    // mInterval belongs to BaseScheduler and drives the scheduling cadence.
    mInterval = scrapeIntervalSeconds;
}

Expand All @@ -78,7 +76,9 @@ void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t) {

const auto& networkStatus = response.GetNetworkStatus();
string scrapeState;
mUpState = false;
auto scrapeDurationSeconds = scrapeDurationMilliSeconds * sRate;
auto upState = false;

if (networkStatus.mCode != NetworkCode::Ok) {
// not 0 means curl error
scrapeState = prom::NetworkCodeToState(networkStatus.mCode);
Expand All @@ -87,32 +87,31 @@ void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t) {
} else {
// 0 means success
scrapeState = prom::NetworkCodeToState(NetworkCode::Ok);
mUpState = true;
upState = true;
}

mScrapeDurationSeconds = scrapeDurationMilliSeconds * sRate;
if (response.GetStatusCode() != 200) {
LOG_WARNING(sLogger,
("scrape failed, status code",
response.GetStatusCode())("target", mHash)("curl msg", response.GetNetworkStatus().mMessage));
("scrape failed, status code", response.GetStatusCode())("target", mTargetInfo.mHash)(
"curl msg", response.GetNetworkStatus().mMessage));
}

auto mScrapeDurationSeconds = scrapeDurationMilliSeconds * sRate;
streamScraper->mStreamIndex++;

if (mUpState) {
streamScraper->mStreamIndex++;
if (upState) {
streamScraper->FlushCache();
}
streamScraper->SetAutoMetricMeta(mScrapeDurationSeconds, mUpState, scrapeState);
streamScraper->SetAutoMetricMeta(scrapeDurationSeconds, upState, scrapeState);
streamScraper->SendMetrics();
mScrapeResponseSizeBytes = streamScraper->mRawSize;
streamScraper->Reset();

ADD_COUNTER(mPluginTotalDelayMs, scrapeDurationMilliSeconds);
}


// Returns the unique identifier of this scrape target: the precomputed
// target hash carried in mTargetInfo.
string ScrapeScheduler::GetId() const {
    return mTargetInfo.mHash;
}

uint64_t ScrapeScheduler::GetScrapeIntervalSeconds() const {
Expand All @@ -137,6 +136,7 @@ void ScrapeScheduler::ScheduleNext() {
return true;
}
this->DelayExecTime(1);
this->mExecDelayCount++;
ADD_COUNTER(this->mPromDelayTotal, 1);
this->ScheduleNext();
return false;
Expand Down Expand Up @@ -185,7 +185,8 @@ std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::
mScrapeConfigPtr->mRequestHeaders,
"",
HttpResponse(
new prom::StreamScraper(mTargetLabels, mQueueKey, mInputIndex, mHash, mEventPool, mLatestScrapeTime),
new prom::StreamScraper(
mTargetInfo.mLabels, mQueueKey, mInputIndex, mTargetInfo.mHash, mEventPool, mLatestScrapeTime),
[](void* p) { delete static_cast<prom::StreamScraper*>(p); },
prom::StreamScraper::MetricWriteCallback),
mScrapeTimeoutSeconds,
Expand Down Expand Up @@ -215,7 +216,7 @@ void ScrapeScheduler::Cancel() {
void ScrapeScheduler::InitSelfMonitor(const MetricLabels& defaultLabels) {
mSelfMonitor = std::make_shared<PromSelfMonitorUnsafe>();
MetricLabels labels = defaultLabels;
labels.emplace_back(METRIC_LABEL_KEY_INSTANCE, mInstance);
labels.emplace_back(METRIC_LABEL_KEY_INSTANCE, mTargetInfo.mInstance);

static const std::unordered_map<std::string, MetricType> sScrapeMetricKeys
= {{METRIC_PLUGIN_OUT_EVENTS_TOTAL, MetricType::METRIC_TYPE_COUNTER},
Expand Down
27 changes: 16 additions & 11 deletions core/prometheus/schedulers/ScrapeScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@
#include "common/http/HttpResponse.h"
#include "monitor/metric_models/MetricTypes.h"
#include "prometheus/PromSelfMonitor.h"
#include "prometheus/component/StreamScraper.h"
#include "prometheus/schedulers/ScrapeConfig.h"

#ifdef APSARA_UNIT_TEST_MAIN
Expand All @@ -33,7 +32,16 @@

namespace logtail {

// Precomputed identity of a single Prometheus scrape target, assembled by
// the target subscriber and handed to ScrapeScheduler.
struct PromTargetInfo {
// Full label set attached to the target's scraped series.
Labels mLabels;
// Target instance identity — presumably "host:port"; set by the
// subscriber outside this diff, verify against the producer.
std::string mInstance;
// Hash uniquely identifying the target (used as the scheduler/stream id).
std::string mHash;
// Rebalance timestamp in milliseconds; 0 means unset — NOTE(review):
// exact semantics set by the target provider, confirm at the caller.
uint64_t mRebalanceMs = 0;
};

class ScrapeScheduler : public BaseScheduler {
friend class TargetSubscriberScheduler;

public:
ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
std::string host,
Expand All @@ -42,9 +50,9 @@ class ScrapeScheduler : public BaseScheduler {
std::string metricsPath,
uint64_t scrapeIntervalSeconds,
uint64_t scrapeTimeoutSeconds,
Labels labels,
QueueKey queueKey,
size_t inputIndex);
size_t inputIndex,
const PromTargetInfo& targetInfo);
ScrapeScheduler(const ScrapeScheduler&) = delete;
~ScrapeScheduler() override = default;

Expand All @@ -54,7 +62,9 @@ class ScrapeScheduler : public BaseScheduler {
uint64_t GetScrapeIntervalSeconds() const;

void SetComponent(EventPool* eventPool);
int64_t GetLastScrapeSize() const { return mScrapeResponseSizeBytes; }

uint64_t GetReBalanceMs() const { return mTargetInfo.mRebalanceMs; }
void ScheduleNext() override;
void ScrapeOnce(std::chrono::steady_clock::time_point execTime);
void Cancel() override;
Expand All @@ -64,10 +74,10 @@ class ScrapeScheduler : public BaseScheduler {
std::unique_ptr<TimerEvent> BuildScrapeTimerEvent(std::chrono::steady_clock::time_point execTime);

std::shared_ptr<ScrapeConfig> mScrapeConfigPtr;
std::string mHash;
std::atomic_int mExecDelayCount = 0;
std::string mHost;
int32_t mPort;
std::string mInstance;
PromTargetInfo mTargetInfo;
std::string mMetricsPath;
std::string mScheme;
uint64_t mScrapeTimeoutSeconds;
Expand All @@ -76,13 +86,8 @@ class ScrapeScheduler : public BaseScheduler {
QueueKey mQueueKey;
size_t mInputIndex;

Labels mTargetLabels;

// auto metrics
uint64_t mScrapeTimestampMilliSec = 0;
double mScrapeDurationSeconds = 0;
uint64_t mScrapeResponseSizeBytes = 0;
bool mUpState = true;
std::atomic_int mScrapeResponseSizeBytes;

// self monitor
std::shared_ptr<PromSelfMonitorUnsafe> mSelfMonitor;
Expand Down
Loading
Loading