Skip to content

Commit 0544cfc

Browse files
authored
feat: prom stream scrape and stream process (#1925)
1 parent 9800d9e commit 0544cfc

19 files changed

+568
-215
lines changed

core/CMakeLists.txt

+1-1
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ set(SUB_DIRECTORIES_LIST
127127
runner runner/sink/http
128128
protobuf/sls protobuf/models
129129
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
130-
prometheus prometheus/labels prometheus/schedulers prometheus/async
130+
prometheus prometheus/labels prometheus/schedulers prometheus/async prometheus/component
131131
ebpf ebpf/observer ebpf/security ebpf/handler
132132
parser sls_control sdk
133133
)

core/models/PipelineEventGroup.h

+2
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,8 @@ enum class EventGroupMetaKey {
5858
PROMETHEUS_SAMPLES_SCRAPED,
5959
PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC,
6060
PROMETHEUS_UP_STATE,
61+
PROMETHEUS_STREAM_ID,
62+
PROMETHEUS_STREAM_TOTAL,
6163

6264
SOURCE_ID
6365
};

core/plugin/processor/inner/ProcessorPromRelabelMetricNative.cpp

+64-48
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,12 @@ void ProcessorPromRelabelMetricNative::Process(PipelineEventGroup& metricGroup)
7171
metricGroup.DelTag(k);
7272
}
7373

74-
AddAutoMetrics(metricGroup);
74+
if (metricGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_TOTAL)) {
75+
auto autoMetric = prom::AutoMetric();
76+
UpdateAutoMetrics(metricGroup, autoMetric);
77+
AddAutoMetrics(metricGroup, autoMetric);
78+
}
79+
7580

7681
// delete all tags
7782
for (const auto& [k, v] : targetTags) {
@@ -138,78 +143,89 @@ vector<StringView> ProcessorPromRelabelMetricNative::GetToDeleteTargetLabels(con
138143
return toDelete;
139144
}
140145

141-
void ProcessorPromRelabelMetricNative::AddAutoMetrics(PipelineEventGroup& metricGroup) {
142-
// if up is set, then add self monitor metrics
143-
if (metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_UP_STATE).empty()) {
144-
return;
146+
void ProcessorPromRelabelMetricNative::UpdateAutoMetrics(const PipelineEventGroup& eGroup,
147+
prom::AutoMetric& autoMetric) const {
148+
if (eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_DURATION)) {
149+
autoMetric.mScrapeDurationSeconds
150+
= StringTo<double>(eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_DURATION).to_string());
145151
}
152+
if (eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_RESPONSE_SIZE)) {
153+
autoMetric.mScrapeResponseSizeBytes
154+
= StringTo<uint64_t>(eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_RESPONSE_SIZE).to_string());
155+
}
156+
autoMetric.mScrapeSamplesLimit = mScrapeConfigPtr->mSampleLimit;
157+
if (eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_SAMPLES_SCRAPED)) {
158+
autoMetric.mScrapeSamplesScraped
159+
= StringTo<uint64_t>(eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SAMPLES_SCRAPED).to_string());
160+
}
161+
autoMetric.mScrapeTimeoutSeconds = mScrapeConfigPtr->mScrapeTimeoutSeconds;
146162

147-
auto targetTags = metricGroup.GetTags();
163+
if (eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE)) {
164+
autoMetric.mScrapeState = eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE).to_string();
165+
}
166+
167+
if (eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_UP_STATE)) {
168+
autoMetric.mUp = StringTo<bool>(eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_UP_STATE).to_string());
169+
}
170+
}
171+
172+
void ProcessorPromRelabelMetricNative::AddAutoMetrics(PipelineEventGroup& eGroup,
173+
const prom::AutoMetric& autoMetric) const {
174+
auto targetTags = eGroup.GetTags();
175+
if (!eGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC)) {
176+
LOG_ERROR(sLogger, ("scrape_timestamp_milliseconds is not set", ""));
177+
return;
178+
}
148179

149-
StringView scrapeTimestampMilliSecStr
150-
= metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC);
180+
StringView scrapeTimestampMilliSecStr = eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC);
151181
auto timestampMilliSec = StringTo<uint64_t>(scrapeTimestampMilliSecStr.to_string());
152182
auto timestamp = timestampMilliSec / 1000;
153183
auto nanoSec = timestampMilliSec % 1000 * 1000000;
154184

155-
uint64_t samplesPostMetricRelabel = metricGroup.GetEvents().size();
156-
157-
auto scrapeDurationSeconds
158-
= StringTo<double>(metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_DURATION).to_string());
159-
160-
AddMetric(metricGroup, prometheus::SCRAPE_DURATION_SECONDS, scrapeDurationSeconds, timestamp, nanoSec, targetTags);
161185

162-
auto scrapeResponseSize
163-
= StringTo<uint64_t>(metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_RESPONSE_SIZE).to_string());
164-
AddMetric(metricGroup, prometheus::SCRAPE_RESPONSE_SIZE_BYTES, scrapeResponseSize, timestamp, nanoSec, targetTags);
186+
AddMetric(
187+
eGroup, prometheus::SCRAPE_DURATION_SECONDS, autoMetric.mScrapeDurationSeconds, timestamp, nanoSec, targetTags);
165188

166-
if (mScrapeConfigPtr->mSampleLimit > 0) {
167-
AddMetric(metricGroup,
168-
prometheus::SCRAPE_SAMPLES_LIMIT,
169-
mScrapeConfigPtr->mSampleLimit,
170-
timestamp,
171-
nanoSec,
172-
targetTags);
173-
}
174-
175-
AddMetric(metricGroup,
176-
prometheus::SCRAPE_SAMPLES_POST_METRIC_RELABELING,
177-
samplesPostMetricRelabel,
189+
AddMetric(eGroup,
190+
prometheus::SCRAPE_RESPONSE_SIZE_BYTES,
191+
autoMetric.mScrapeResponseSizeBytes,
178192
timestamp,
179193
nanoSec,
180194
targetTags);
181195

182-
auto samplesScraped
183-
= StringTo<uint64_t>(metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SAMPLES_SCRAPED).to_string());
196+
if (autoMetric.mScrapeSamplesLimit > 0) {
197+
AddMetric(
198+
eGroup, prometheus::SCRAPE_SAMPLES_LIMIT, autoMetric.mScrapeSamplesLimit, timestamp, nanoSec, targetTags);
199+
}
184200

185-
AddMetric(metricGroup, prometheus::SCRAPE_SAMPLES_SCRAPED, samplesScraped, timestamp, nanoSec, targetTags);
201+
// AddMetric(eGroup,
202+
// prometheus::SCRAPE_SAMPLES_POST_METRIC_RELABELING,
203+
// autoMetric.mPostRelabel,
204+
// timestamp,
205+
// nanoSec,
206+
// targetTags);
186207

187-
AddMetric(metricGroup,
188-
prometheus::SCRAPE_TIMEOUT_SECONDS,
189-
mScrapeConfigPtr->mScrapeTimeoutSeconds,
190-
timestamp,
191-
nanoSec,
192-
targetTags);
208+
AddMetric(
209+
eGroup, prometheus::SCRAPE_SAMPLES_SCRAPED, autoMetric.mScrapeSamplesScraped, timestamp, nanoSec, targetTags);
193210

194-
// up metric must be the last one
195-
bool upState = StringTo<bool>(metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_UP_STATE).to_string());
211+
AddMetric(
212+
eGroup, prometheus::SCRAPE_TIMEOUT_SECONDS, autoMetric.mScrapeTimeoutSeconds, timestamp, nanoSec, targetTags);
196213

197-
if (metricGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE)) {
198-
auto scrapeState = metricGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE);
199-
AddMetric(metricGroup, prometheus::SCRAPE_STATE, 1.0 * upState, timestamp, nanoSec, targetTags);
200-
auto& last = metricGroup.MutableEvents()[metricGroup.GetEvents().size() - 1];
201-
last.Cast<MetricEvent>().SetTag(METRIC_LABEL_KEY_STATUS, scrapeState);
202-
}
214+
AddMetric(eGroup, prometheus::SCRAPE_STATE, 1.0 * autoMetric.mUp, timestamp, nanoSec, targetTags);
215+
auto& last = eGroup.MutableEvents()[eGroup.GetEvents().size() - 1];
216+
auto scrapeState = eGroup.GetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE);
217+
last.Cast<MetricEvent>().SetTag(METRIC_LABEL_KEY_STATUS, scrapeState);
203218

204-
AddMetric(metricGroup, prometheus::UP, 1.0 * upState, timestamp, nanoSec, targetTags);
219+
// up metric must be the last one
220+
AddMetric(eGroup, prometheus::UP, 1.0 * autoMetric.mUp, timestamp, nanoSec, targetTags);
205221
}
206222

207223
void ProcessorPromRelabelMetricNative::AddMetric(PipelineEventGroup& metricGroup,
208224
const string& name,
209225
double value,
210226
time_t timestamp,
211227
uint32_t nanoSec,
212-
const GroupTags& targetTags) {
228+
const GroupTags& targetTags) const {
213229
auto* metricEvent = metricGroup.AddMetricEvent(true);
214230
metricEvent->SetName(name);
215231
metricEvent->SetValue<UntypedSingleValue>(value);

core/plugin/processor/inner/ProcessorPromRelabelMetricNative.h

+17-2
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,20 @@
2424
#include "prometheus/schedulers/ScrapeConfig.h"
2525

2626
namespace logtail {
27+
28+
namespace prom {
29+
struct AutoMetric {
30+
double mScrapeDurationSeconds;
31+
uint64_t mScrapeResponseSizeBytes;
32+
uint64_t mScrapeSamplesLimit;
33+
// uint64_t mPostRelabel;
34+
uint64_t mScrapeSamplesScraped;
35+
uint64_t mScrapeTimeoutSeconds;
36+
std::string mScrapeState;
37+
bool mUp;
38+
};
39+
} // namespace prom
40+
2741
class ProcessorPromRelabelMetricNative : public Processor {
2842
public:
2943
static const std::string sName;
@@ -39,13 +53,14 @@ class ProcessorPromRelabelMetricNative : public Processor {
3953
bool ProcessEvent(PipelineEventPtr& e, const GroupTags& targetTags, const std::vector<StringView>& toDelete);
4054
std::vector<StringView> GetToDeleteTargetLabels(const GroupTags& targetTags) const;
4155

42-
void AddAutoMetrics(PipelineEventGroup& metricGroup);
56+
void AddAutoMetrics(PipelineEventGroup& eGroup, const prom::AutoMetric& autoMetric) const;
57+
void UpdateAutoMetrics(const PipelineEventGroup& eGroup, prom::AutoMetric& autoMetric) const;
4358
void AddMetric(PipelineEventGroup& metricGroup,
4459
const std::string& name,
4560
double value,
4661
time_t timestamp,
4762
uint32_t nanoSec,
48-
const GroupTags& targetTags);
63+
const GroupTags& targetTags) const;
4964

5065
std::unique_ptr<ScrapeConfig> mScrapeConfigPtr;
5166
std::string mLoongCollectorScraper;

core/prometheus/async/PromHttpRequest.h

-1
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
#include <string>
55

66
#include "common/http/HttpRequest.h"
7-
#include "models/PipelineEventGroup.h"
87
#include "prometheus/async/PromFuture.h"
98

109
namespace logtail {
+132
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
#include "prometheus/component/StreamScraper.h"
2+
3+
#include <cstddef>
4+
#include <memory>
5+
#include <string>
6+
#include <utility>
7+
8+
#include "Flags.h"
9+
#include "Labels.h"
10+
#include "common/StringTools.h"
11+
#include "models/PipelineEventGroup.h"
12+
#include "pipeline/queue/ProcessQueueItem.h"
13+
#include "pipeline/queue/ProcessQueueManager.h"
14+
#include "prometheus/Utils.h"
15+
16+
DEFINE_FLAG_INT64(prom_stream_bytes_size, "stream bytes size", 1024 * 1024);
17+
18+
DEFINE_FLAG_BOOL(enable_prom_stream_scrape, "enable prom stream scrape", true);
19+
20+
using namespace std;
21+
22+
namespace logtail::prom {
23+
size_t StreamScraper::MetricWriteCallback(char* buffer, size_t size, size_t nmemb, void* data) {
24+
uint64_t sizes = size * nmemb;
25+
26+
if (buffer == nullptr || data == nullptr) {
27+
return 0;
28+
}
29+
30+
auto* body = static_cast<StreamScraper*>(data);
31+
32+
size_t begin = 0;
33+
for (size_t end = begin; end < sizes; ++end) {
34+
if (buffer[end] == '\n') {
35+
if (begin == 0 && !body->mCache.empty()) {
36+
body->mCache.append(buffer, end);
37+
body->AddEvent(body->mCache.data(), body->mCache.size());
38+
body->mCache.clear();
39+
} else if (begin != end) {
40+
body->AddEvent(buffer + begin, end - begin);
41+
}
42+
begin = end + 1;
43+
}
44+
}
45+
46+
if (begin < sizes) {
47+
body->mCache.append(buffer + begin, sizes - begin);
48+
}
49+
body->mRawSize += sizes;
50+
body->mCurrStreamSize += sizes;
51+
52+
if (BOOL_FLAG(enable_prom_stream_scrape) && body->mCurrStreamSize >= (size_t)INT64_FLAG(prom_stream_bytes_size)) {
53+
body->mStreamIndex++;
54+
body->SendMetrics();
55+
}
56+
57+
return sizes;
58+
}
59+
60+
void StreamScraper::AddEvent(const char* line, size_t len) {
61+
if (IsValidMetric(StringView(line, len))) {
62+
auto* e = mEventGroup.AddRawEvent(true, mEventPool);
63+
auto sb = mEventGroup.GetSourceBuffer()->CopyString(line, len);
64+
e->SetContentNoCopy(sb);
65+
mScrapeSamplesScraped++;
66+
}
67+
}
68+
69+
void StreamScraper::FlushCache() {
70+
if (!mCache.empty()) {
71+
AddEvent(mCache.data(), mCache.size());
72+
mCache.clear();
73+
}
74+
}
75+
76+
void StreamScraper::SetTargetLabels(PipelineEventGroup& eGroup) const {
77+
mTargetLabels.Range([&eGroup](const std::string& key, const std::string& value) { eGroup.SetTag(key, value); });
78+
}
79+
80+
void StreamScraper::PushEventGroup(PipelineEventGroup&& eGroup) const {
81+
auto item = make_unique<ProcessQueueItem>(std::move(eGroup), mInputIndex);
82+
#ifdef APSARA_UNIT_TEST_MAIN
83+
mItem.emplace_back(std::move(item));
84+
return;
85+
#endif
86+
while (true) {
87+
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item)) == 0) {
88+
break;
89+
}
90+
usleep(10 * 1000);
91+
}
92+
}
93+
94+
void StreamScraper::SendMetrics() {
95+
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC,
96+
ToString(mScrapeTimestampMilliSec));
97+
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID, GetId() + ToString(mScrapeTimestampMilliSec));
98+
99+
SetTargetLabels(mEventGroup);
100+
PushEventGroup(std::move(mEventGroup));
101+
mEventGroup = PipelineEventGroup(std::make_shared<SourceBuffer>());
102+
mCurrStreamSize = 0;
103+
}
104+
105+
void StreamScraper::Reset() {
106+
mEventGroup = PipelineEventGroup(std::make_shared<SourceBuffer>());
107+
mRawSize = 0;
108+
mCurrStreamSize = 0;
109+
mCache.clear();
110+
mStreamIndex = 0;
111+
mScrapeSamplesScraped = 0;
112+
}
113+
114+
void StreamScraper::SetAutoMetricMeta(double scrapeDurationSeconds, bool upState, const string& scrapeState) {
115+
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_STATE, scrapeState);
116+
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_TIMESTAMP_MILLISEC,
117+
ToString(mScrapeTimestampMilliSec));
118+
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SAMPLES_SCRAPED, ToString(mScrapeSamplesScraped));
119+
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_DURATION, ToString(scrapeDurationSeconds));
120+
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_SCRAPE_RESPONSE_SIZE, ToString(mRawSize));
121+
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_UP_STATE, ToString(upState));
122+
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_ID, GetId() + ToString(mScrapeTimestampMilliSec));
123+
mEventGroup.SetMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_TOTAL, ToString(mStreamIndex));
124+
}
125+
std::string StreamScraper::GetId() {
126+
return mHash;
127+
}
128+
void StreamScraper::SetScrapeTime(std::chrono::system_clock::time_point scrapeTime) {
129+
mScrapeTimestampMilliSec
130+
= std::chrono::duration_cast<std::chrono::milliseconds>(scrapeTime.time_since_epoch()).count();
131+
}
132+
} // namespace logtail::prom

0 commit comments

Comments
 (0)