Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: build EventGroup in libCurl Callback #1860

Merged
merged 12 commits into from
Nov 13, 2024
46 changes: 2 additions & 44 deletions core/prometheus/async/PromHttpRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,9 @@
#include <string>

#include "common/http/HttpRequest.h"
#include "prometheus/Constants.h"

namespace logtail {

// size_t PromWriteCallback(char* buffer, size_t size, size_t nmemb, void* data) {
// unsigned long sizes = size * nmemb;

// if (buffer == nullptr) {
// return 0;
// }

// PromResponseBody* body = static_cast<PromResponseBody*>(data);

// size_t begin = 0;
// while (begin < sizes) {
// for (size_t end = begin; end < sizes; ++end) {
// if (buffer[end] == '\n') {
// if (begin == 0) {
// body->mCache.append(buffer, end);
// if (!body->mCache.empty()) {
// auto e = body->mEventGroup.AddLogEvent();
// auto sb = body->mEventGroup.GetSourceBuffer()->CopyString(body->mCache);
// body->mCache.clear();
// e->SetContentNoCopy(prometheus::PROMETHEUS, StringView(sb.data, sb.size));
// }
// } else if (begin != end) {
// auto e = body->mEventGroup.AddLogEvent();
// auto sb = body->mEventGroup.GetSourceBuffer()->CopyString(buffer + begin, end - begin);
// e->SetContentNoCopy(prometheus::PROMETHEUS, StringView(sb.data, sb.size));
// }
// begin += end - begin + 1;
// continue;
// }
// }
// break;
// }
// if (begin < sizes) {
// body->mCache.append(buffer + begin, sizes - begin);
// }
// body->mRawSize += sizes;
// return sizes;
// }

PromHttpRequest::PromHttpRequest(const std::string& method,
bool httpsFlag,
const std::string& host,
Expand All @@ -56,6 +16,7 @@ PromHttpRequest::PromHttpRequest(const std::string& method,
const std::string& query,
const std::map<std::string, std::string>& header,
const std::string& body,
HttpResponse&& response,
uint32_t timeout,
uint32_t maxTryCnt,
std::shared_ptr<PromFuture<HttpResponse&, uint64_t>> future,
Expand All @@ -68,10 +29,7 @@ PromHttpRequest::PromHttpRequest(const std::string& method,
query,
header,
body,
// HttpResponse(
// new PromResponseBody(), [](void* ptr) { delete static_cast<PromResponseBody*>(ptr); },
// PromWriteCallback),
HttpResponse(),
std::move(response),
timeout,
maxTryCnt),
mFuture(std::move(future)),
Expand Down
9 changes: 1 addition & 8 deletions core/prometheus/async/PromHttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class PromHttpRequest : public AsynHttpRequest {
const std::string& query,
const std::map<std::string, std::string>& header,
const std::string& body,
HttpResponse&& response,
uint32_t timeout,
uint32_t maxTryCnt,
std::shared_ptr<PromFuture<HttpResponse&, uint64_t>> future,
Expand All @@ -36,12 +37,4 @@ class PromHttpRequest : public AsynHttpRequest {
std::shared_ptr<PromFuture<>> mIsContextValidFuture;
};

struct PromResponseBody {
PipelineEventGroup mEventGroup;
std::string mCache;
size_t mRawSize = 0;

PromResponseBody() : mEventGroup(std::make_shared<SourceBuffer>()) {};
};

} // namespace logtail
18 changes: 0 additions & 18 deletions core/prometheus/labels/TextParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,6 @@ PipelineEventGroup TextParser::Parse(const string& content, uint64_t defaultTime
return eGroup;
}

PipelineEventGroup TextParser::BuildLogGroup(const string& content) {
PipelineEventGroup eGroup(std::make_shared<SourceBuffer>());

vector<StringView> lines;
// pre-reserve vector size by 1024 which is experience value per line
lines.reserve(content.size() / 1024);
SplitStringView(content, '\n', lines);
for (const auto& line : lines) {
if (!IsValidMetric(line)) {
continue;
}
auto* logEvent = eGroup.AddLogEvent();
logEvent->SetContent(prometheus::PROMETHEUS, line);
}

return eGroup;
}

bool TextParser::ParseLine(StringView line, MetricEvent& metricEvent) {
mLine = line;
mPos = 0;
Expand Down
1 change: 0 additions & 1 deletion core/prometheus/labels/TextParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class TextParser {
void SetDefaultTimestamp(uint64_t defaultTimestamp, uint32_t defaultNanoSec);

PipelineEventGroup Parse(const std::string& content, uint64_t defaultTimestamp, uint32_t defaultNanoSec);
PipelineEventGroup BuildLogGroup(const std::string& content);

bool ParseLine(StringView line, MetricEvent& metricEvent);

Expand Down
82 changes: 58 additions & 24 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKey.h"
#include "prometheus/Constants.h"
#include "prometheus/Utils.h"
#include "prometheus/async/PromFuture.h"
#include "prometheus/async/PromHttpRequest.h"
#include "sdk/Common.h"
Expand All @@ -38,6 +39,35 @@ using namespace std;

namespace logtail {

size_t MetricWriteCallback(char* buffer, size_t size, size_t nmemb, void* data) {
uint64_t sizes = size * nmemb;

if (buffer == nullptr) {
return 0;
}

auto* body = static_cast<MetricResponseBody*>(data);

size_t begin = 0;
for (size_t end = begin; end < sizes; ++end) {
if (buffer[end] == '\n') {
if (begin == 0 && !body->mCache.empty()) {
body->mCache.append(buffer, end);
body->AddEvent(body->mCache.data(), body->mCache.size());
body->mCache.clear();
} else if (begin != end) {
body->AddEvent(buffer + begin, end - begin);
}
begin = end + 1;
}
}
if (begin < sizes) {
body->mCache.append(buffer + begin, sizes - begin);
}
body->mRawSize += sizes;
return sizes;
}

ScrapeScheduler::ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
std::string host,
int32_t port,
Expand All @@ -61,26 +91,29 @@ ScrapeScheduler::ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
}

void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t timestampMilliSec) {
auto& responseBody = *response.GetBody<string>();
auto& responseBody = *response.GetBody<MetricResponseBody>();
responseBody.FlushCache();
mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_EVENTS_TOTAL, response.GetStatusCode());
mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_SIZE_BYTES, response.GetStatusCode(), responseBody.size());
mSelfMonitor->AddCounter(
METRIC_PLUGIN_PROM_SCRAPE_TIME_MS, response.GetStatusCode(), GetCurrentTimeInMilliSeconds() - timestampMilliSec);
mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_SIZE_BYTES, response.GetStatusCode(), responseBody.mRawSize);
mSelfMonitor->AddCounter(METRIC_PLUGIN_PROM_SCRAPE_TIME_MS,
response.GetStatusCode(),
GetCurrentTimeInMilliSeconds() - timestampMilliSec);

mScrapeTimestampMilliSec = timestampMilliSec;
mScrapeDurationSeconds = 1.0 * (GetCurrentTimeInMilliSeconds() - timestampMilliSec) / 1000;
mScrapeResponseSizeBytes = responseBody.size();
mScrapeResponseSizeBytes = responseBody.mRawSize;
mUpState = response.GetStatusCode() == 200;
if (response.GetStatusCode() != 200) {
mScrapeResponseSizeBytes = 0;
string headerStr;
for (const auto& [k, v] : mScrapeConfigPtr->mRequestHeaders) {
headerStr.append(k).append(":").append(v).append(";");
}
LOG_WARNING(sLogger,
("scrape failed, status code", response.GetStatusCode())("target", mHash)("http header", headerStr));
LOG_WARNING(
sLogger,
("scrape failed, status code", response.GetStatusCode())("target", mHash)("http header", headerStr));
}
auto eventGroup = BuildPipelineEventGroup(responseBody);
auto eventGroup = std::move(responseBody.mEventGroup);
catdogpandas marked this conversation as resolved.
Show resolved Hide resolved

SetAutoMetricMeta(eventGroup);
SetTargetLabels(eventGroup);
Expand All @@ -99,10 +132,6 @@ void ScrapeScheduler::SetTargetLabels(PipelineEventGroup& eGroup) {
mTargetLabels.Range([&eGroup](const std::string& key, const std::string& value) { eGroup.SetTag(key, value); });
}

PipelineEventGroup ScrapeScheduler::BuildPipelineEventGroup(const std::string& content) {
return mParser->BuildLogGroup(content);
}

void ScrapeScheduler::PushEventGroup(PipelineEventGroup&& eGroup) {
auto item = make_unique<ProcessQueueItem>(std::move(eGroup), mInputIndex);
#ifdef APSARA_UNIT_TEST_MAIN
Expand Down Expand Up @@ -175,18 +204,23 @@ std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::
if (retry > 0) {
retry -= 1;
}
auto request = std::make_unique<PromHttpRequest>(sdk::HTTP_GET,
mScrapeConfigPtr->mScheme == prometheus::HTTPS,
mHost,
mPort,
mScrapeConfigPtr->mMetricsPath,
mScrapeConfigPtr->mQueryString,
mScrapeConfigPtr->mRequestHeaders,
"",
mScrapeConfigPtr->mScrapeTimeoutSeconds,
retry,
this->mFuture,
this->mIsContextValidFuture);
auto request
= std::make_unique<PromHttpRequest>(sdk::HTTP_GET,
mScrapeConfigPtr->mScheme == prometheus::HTTPS,
mHost,
mPort,
mScrapeConfigPtr->mMetricsPath,
mScrapeConfigPtr->mQueryString,
mScrapeConfigPtr->mRequestHeaders,
"",
HttpResponse(
new MetricResponseBody(),
[](void* ptr) { delete static_cast<MetricResponseBody*>(ptr); },
MetricWriteCallback),
mScrapeConfigPtr->mScrapeTimeoutSeconds,
retry,
this->mFuture,
this->mIsContextValidFuture);
auto timerEvent = std::make_unique<HttpRequestTimerEvent>(execTime, std::move(request));
return timerEvent;
}
Expand Down
25 changes: 23 additions & 2 deletions core/prometheus/schedulers/ScrapeScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
#include "models/PipelineEventGroup.h"
#include "monitor/LoongCollectorMetricTypes.h"
#include "pipeline/queue/QueueKey.h"
#include "prometheus/Constants.h"
#include "prometheus/PromSelfMonitor.h"
#include "prometheus/Utils.h"
#include "prometheus/labels/TextParser.h"
#include "prometheus/schedulers/ScrapeConfig.h"

Expand All @@ -35,6 +37,27 @@

namespace logtail {

size_t MetricWriteCallback(char* buffer, size_t size, size_t nmemb, void* data);

struct MetricResponseBody {
catdogpandas marked this conversation as resolved.
Show resolved Hide resolved
PipelineEventGroup mEventGroup;
std::string mCache;
size_t mRawSize = 0;

MetricResponseBody() : mEventGroup(std::make_shared<SourceBuffer>()) {};
void AddEvent(char* line, size_t len) {
if (IsValidMetric(StringView(line, len))) {
auto* e = mEventGroup.AddLogEvent();
auto sb = mEventGroup.GetSourceBuffer()->CopyString(line, len);
e->SetContentNoCopy(prometheus::PROMETHEUS, StringView(sb.data, sb.size));
}
}
void FlushCache() {
AddEvent(mCache.data(), mCache.size());
mCache.clear();
}
};

class ScrapeScheduler : public BaseScheduler {
public:
ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
Expand All @@ -61,8 +84,6 @@ class ScrapeScheduler : public BaseScheduler {
void SetAutoMetricMeta(PipelineEventGroup& eGroup);
void SetTargetLabels(PipelineEventGroup& eGroup);

PipelineEventGroup BuildPipelineEventGroup(const std::string& content);

std::unique_ptr<TimerEvent> BuildScrapeTimerEvent(std::chrono::steady_clock::time_point execTime);

std::shared_ptr<ScrapeConfig> mScrapeConfigPtr;
Expand Down
6 changes: 4 additions & 2 deletions core/prometheus/schedulers/TargetSubscriberScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ bool TargetSubscriberScheduler::operator<(const TargetSubscriberScheduler& other

void TargetSubscriberScheduler::OnSubscription(HttpResponse& response, uint64_t timestampMilliSec) {
mSelfMonitor->AddCounter(METRIC_PLUGIN_PROM_SUBSCRIBE_TOTAL, response.GetStatusCode());
mSelfMonitor->AddCounter(
METRIC_PLUGIN_PROM_SUBSCRIBE_TIME_MS, response.GetStatusCode(), GetCurrentTimeInMilliSeconds() - timestampMilliSec);
mSelfMonitor->AddCounter(METRIC_PLUGIN_PROM_SUBSCRIBE_TIME_MS,
response.GetStatusCode(),
GetCurrentTimeInMilliSeconds() - timestampMilliSec);
if (response.GetStatusCode() == 304) {
// not modified
return;
Expand Down Expand Up @@ -304,6 +305,7 @@ TargetSubscriberScheduler::BuildSubscriberTimerEvent(std::chrono::steady_clock::
"collector_id=" + mPodName,
httpHeader,
"",
HttpResponse(),
prometheus::RefeshIntervalSeconds,
1,
this->mFuture);
Expand Down
2 changes: 1 addition & 1 deletion core/unittest/prometheus/PromAsynUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void PromAsynUnittest::TestExecTime() {
return true;
});
auto request = std::make_shared<PromHttpRequest>(
"http", false, "127.0.0.1", 8080, "/", "", map<string, string>(), "", 10, 3, future);
"http", false, "127.0.0.1", 8080, "/", "", map<string, string>(), "", HttpResponse(), 10, 3, future);
auto asynRequest = std::dynamic_pointer_cast<AsynHttpRequest>(request);
asynRequest->mLastSendTime = now;
auto response = HttpResponse{};
Expand Down
Loading
Loading