Skip to content

Commit

Permalink
feat: build EventGroup in libCurl Callback (alibaba#1860)
Browse files Browse the repository at this point in the history
  • Loading branch information
catdogpandas authored Nov 13, 2024
1 parent eaf69cb commit 9dd0592
Show file tree
Hide file tree
Showing 9 changed files with 159 additions and 135 deletions.
46 changes: 2 additions & 44 deletions core/prometheus/async/PromHttpRequest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -5,49 +5,9 @@
#include <string>

#include "common/http/HttpRequest.h"
#include "prometheus/Constants.h"

namespace logtail {

// size_t PromWriteCallback(char* buffer, size_t size, size_t nmemb, void* data) {
// unsigned long sizes = size * nmemb;

// if (buffer == nullptr) {
// return 0;
// }

// PromResponseBody* body = static_cast<PromResponseBody*>(data);

// size_t begin = 0;
// while (begin < sizes) {
// for (size_t end = begin; end < sizes; ++end) {
// if (buffer[end] == '\n') {
// if (begin == 0) {
// body->mCache.append(buffer, end);
// if (!body->mCache.empty()) {
// auto e = body->mEventGroup.AddLogEvent();
// auto sb = body->mEventGroup.GetSourceBuffer()->CopyString(body->mCache);
// body->mCache.clear();
// e->SetContentNoCopy(prometheus::PROMETHEUS, StringView(sb.data, sb.size));
// }
// } else if (begin != end) {
// auto e = body->mEventGroup.AddLogEvent();
// auto sb = body->mEventGroup.GetSourceBuffer()->CopyString(buffer + begin, end - begin);
// e->SetContentNoCopy(prometheus::PROMETHEUS, StringView(sb.data, sb.size));
// }
// begin += end - begin + 1;
// continue;
// }
// }
// break;
// }
// if (begin < sizes) {
// body->mCache.append(buffer + begin, sizes - begin);
// }
// body->mRawSize += sizes;
// return sizes;
// }

PromHttpRequest::PromHttpRequest(const std::string& method,
bool httpsFlag,
const std::string& host,
Expand All @@ -56,6 +16,7 @@ PromHttpRequest::PromHttpRequest(const std::string& method,
const std::string& query,
const std::map<std::string, std::string>& header,
const std::string& body,
HttpResponse&& response,
uint32_t timeout,
uint32_t maxTryCnt,
std::shared_ptr<PromFuture<HttpResponse&, uint64_t>> future,
Expand All @@ -68,10 +29,7 @@ PromHttpRequest::PromHttpRequest(const std::string& method,
query,
header,
body,
// HttpResponse(
// new PromResponseBody(), [](void* ptr) { delete static_cast<PromResponseBody*>(ptr); },
// PromWriteCallback),
HttpResponse(),
std::move(response),
timeout,
maxTryCnt),
mFuture(std::move(future)),
Expand Down
9 changes: 1 addition & 8 deletions core/prometheus/async/PromHttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ class PromHttpRequest : public AsynHttpRequest {
const std::string& query,
const std::map<std::string, std::string>& header,
const std::string& body,
HttpResponse&& response,
uint32_t timeout,
uint32_t maxTryCnt,
std::shared_ptr<PromFuture<HttpResponse&, uint64_t>> future,
Expand All @@ -36,12 +37,4 @@ class PromHttpRequest : public AsynHttpRequest {
std::shared_ptr<PromFuture<>> mIsContextValidFuture;
};

struct PromResponseBody {
PipelineEventGroup mEventGroup;
std::string mCache;
size_t mRawSize = 0;

PromResponseBody() : mEventGroup(std::make_shared<SourceBuffer>()) {};
};

} // namespace logtail
18 changes: 0 additions & 18 deletions core/prometheus/labels/TextParser.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,24 +68,6 @@ PipelineEventGroup TextParser::Parse(const string& content, uint64_t defaultTime
return eGroup;
}

PipelineEventGroup TextParser::BuildLogGroup(const string& content) {
PipelineEventGroup eGroup(std::make_shared<SourceBuffer>());

vector<StringView> lines;
// pre-reserve vector size by 1024 which is experience value per line
lines.reserve(content.size() / 1024);
SplitStringView(content, '\n', lines);
for (const auto& line : lines) {
if (!IsValidMetric(line)) {
continue;
}
auto* logEvent = eGroup.AddLogEvent();
logEvent->SetContent(prometheus::PROMETHEUS, line);
}

return eGroup;
}

bool TextParser::ParseLine(StringView line, MetricEvent& metricEvent) {
mLine = line;
mPos = 0;
Expand Down
1 change: 0 additions & 1 deletion core/prometheus/labels/TextParser.h
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ class TextParser {
void SetDefaultTimestamp(uint64_t defaultTimestamp, uint32_t defaultNanoSec);

PipelineEventGroup Parse(const std::string& content, uint64_t defaultTimestamp, uint32_t defaultNanoSec);
PipelineEventGroup BuildLogGroup(const std::string& content);

bool ParseLine(StringView line, MetricEvent& metricEvent);

Expand Down
82 changes: 58 additions & 24 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKey.h"
#include "prometheus/Constants.h"
#include "prometheus/Utils.h"
#include "prometheus/async/PromFuture.h"
#include "prometheus/async/PromHttpRequest.h"
#include "sdk/Common.h"
Expand All @@ -38,6 +39,35 @@ using namespace std;

namespace logtail {

size_t PromMetricWriteCallback(char* buffer, size_t size, size_t nmemb, void* data) {
uint64_t sizes = size * nmemb;

if (buffer == nullptr || data == nullptr) {
return 0;
}

auto* body = static_cast<PromMetricResponseBody*>(data);

size_t begin = 0;
for (size_t end = begin; end < sizes; ++end) {
if (buffer[end] == '\n') {
if (begin == 0 && !body->mCache.empty()) {
body->mCache.append(buffer, end);
body->AddEvent(body->mCache.data(), body->mCache.size());
body->mCache.clear();
} else if (begin != end) {
body->AddEvent(buffer + begin, end - begin);
}
begin = end + 1;
}
}
if (begin < sizes) {
body->mCache.append(buffer + begin, sizes - begin);
}
body->mRawSize += sizes;
return sizes;
}

ScrapeScheduler::ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
std::string host,
int32_t port,
Expand All @@ -61,26 +91,29 @@ ScrapeScheduler::ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
}

void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t timestampMilliSec) {
auto& responseBody = *response.GetBody<string>();
auto& responseBody = *response.GetBody<PromMetricResponseBody>();
responseBody.FlushCache();
mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_EVENTS_TOTAL, response.GetStatusCode());
mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_SIZE_BYTES, response.GetStatusCode(), responseBody.size());
mSelfMonitor->AddCounter(
METRIC_PLUGIN_PROM_SCRAPE_TIME_MS, response.GetStatusCode(), GetCurrentTimeInMilliSeconds() - timestampMilliSec);
mSelfMonitor->AddCounter(METRIC_PLUGIN_OUT_SIZE_BYTES, response.GetStatusCode(), responseBody.mRawSize);
mSelfMonitor->AddCounter(METRIC_PLUGIN_PROM_SCRAPE_TIME_MS,
response.GetStatusCode(),
GetCurrentTimeInMilliSeconds() - timestampMilliSec);

mScrapeTimestampMilliSec = timestampMilliSec;
mScrapeDurationSeconds = 1.0 * (GetCurrentTimeInMilliSeconds() - timestampMilliSec) / 1000;
mScrapeResponseSizeBytes = responseBody.size();
mScrapeResponseSizeBytes = responseBody.mRawSize;
mUpState = response.GetStatusCode() == 200;
if (response.GetStatusCode() != 200) {
mScrapeResponseSizeBytes = 0;
string headerStr;
for (const auto& [k, v] : mScrapeConfigPtr->mRequestHeaders) {
headerStr.append(k).append(":").append(v).append(";");
}
LOG_WARNING(sLogger,
("scrape failed, status code", response.GetStatusCode())("target", mHash)("http header", headerStr));
LOG_WARNING(
sLogger,
("scrape failed, status code", response.GetStatusCode())("target", mHash)("http header", headerStr));
}
auto eventGroup = BuildPipelineEventGroup(responseBody);
auto& eventGroup = responseBody.mEventGroup;

SetAutoMetricMeta(eventGroup);
SetTargetLabels(eventGroup);
Expand All @@ -99,10 +132,6 @@ void ScrapeScheduler::SetTargetLabels(PipelineEventGroup& eGroup) {
mTargetLabels.Range([&eGroup](const std::string& key, const std::string& value) { eGroup.SetTag(key, value); });
}

PipelineEventGroup ScrapeScheduler::BuildPipelineEventGroup(const std::string& content) {
return mParser->BuildLogGroup(content);
}

void ScrapeScheduler::PushEventGroup(PipelineEventGroup&& eGroup) {
auto item = make_unique<ProcessQueueItem>(std::move(eGroup), mInputIndex);
#ifdef APSARA_UNIT_TEST_MAIN
Expand Down Expand Up @@ -175,18 +204,23 @@ std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::
if (retry > 0) {
retry -= 1;
}
auto request = std::make_unique<PromHttpRequest>(sdk::HTTP_GET,
mScrapeConfigPtr->mScheme == prometheus::HTTPS,
mHost,
mPort,
mScrapeConfigPtr->mMetricsPath,
mScrapeConfigPtr->mQueryString,
mScrapeConfigPtr->mRequestHeaders,
"",
mScrapeConfigPtr->mScrapeTimeoutSeconds,
retry,
this->mFuture,
this->mIsContextValidFuture);
auto request
= std::make_unique<PromHttpRequest>(sdk::HTTP_GET,
mScrapeConfigPtr->mScheme == prometheus::HTTPS,
mHost,
mPort,
mScrapeConfigPtr->mMetricsPath,
mScrapeConfigPtr->mQueryString,
mScrapeConfigPtr->mRequestHeaders,
"",
HttpResponse(
new PromMetricResponseBody(),
[](void* ptr) { delete static_cast<PromMetricResponseBody*>(ptr); },
PromMetricWriteCallback),
mScrapeConfigPtr->mScrapeTimeoutSeconds,
retry,
this->mFuture,
this->mIsContextValidFuture);
auto timerEvent = std::make_unique<HttpRequestTimerEvent>(execTime, std::move(request));
return timerEvent;
}
Expand Down
25 changes: 23 additions & 2 deletions core/prometheus/schedulers/ScrapeScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,9 @@
#include "models/PipelineEventGroup.h"
#include "monitor/LoongCollectorMetricTypes.h"
#include "pipeline/queue/QueueKey.h"
#include "prometheus/Constants.h"
#include "prometheus/PromSelfMonitor.h"
#include "prometheus/Utils.h"
#include "prometheus/labels/TextParser.h"
#include "prometheus/schedulers/ScrapeConfig.h"

Expand All @@ -35,6 +37,27 @@

namespace logtail {

size_t PromMetricWriteCallback(char* buffer, size_t size, size_t nmemb, void* data);

struct PromMetricResponseBody {
PipelineEventGroup mEventGroup;
std::string mCache;
size_t mRawSize = 0;

PromMetricResponseBody() : mEventGroup(std::make_shared<SourceBuffer>()) {};
void AddEvent(char* line, size_t len) {
if (IsValidMetric(StringView(line, len))) {
auto* e = mEventGroup.AddLogEvent();
auto sb = mEventGroup.GetSourceBuffer()->CopyString(line, len);
e->SetContentNoCopy(prometheus::PROMETHEUS, StringView(sb.data, sb.size));
}
}
void FlushCache() {
AddEvent(mCache.data(), mCache.size());
mCache.clear();
}
};

class ScrapeScheduler : public BaseScheduler {
public:
ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
Expand All @@ -61,8 +84,6 @@ class ScrapeScheduler : public BaseScheduler {
void SetAutoMetricMeta(PipelineEventGroup& eGroup);
void SetTargetLabels(PipelineEventGroup& eGroup);

PipelineEventGroup BuildPipelineEventGroup(const std::string& content);

std::unique_ptr<TimerEvent> BuildScrapeTimerEvent(std::chrono::steady_clock::time_point execTime);

std::shared_ptr<ScrapeConfig> mScrapeConfigPtr;
Expand Down
6 changes: 4 additions & 2 deletions core/prometheus/schedulers/TargetSubscriberScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -59,8 +59,9 @@ bool TargetSubscriberScheduler::operator<(const TargetSubscriberScheduler& other

void TargetSubscriberScheduler::OnSubscription(HttpResponse& response, uint64_t timestampMilliSec) {
mSelfMonitor->AddCounter(METRIC_PLUGIN_PROM_SUBSCRIBE_TOTAL, response.GetStatusCode());
mSelfMonitor->AddCounter(
METRIC_PLUGIN_PROM_SUBSCRIBE_TIME_MS, response.GetStatusCode(), GetCurrentTimeInMilliSeconds() - timestampMilliSec);
mSelfMonitor->AddCounter(METRIC_PLUGIN_PROM_SUBSCRIBE_TIME_MS,
response.GetStatusCode(),
GetCurrentTimeInMilliSeconds() - timestampMilliSec);
if (response.GetStatusCode() == 304) {
// not modified
return;
Expand Down Expand Up @@ -304,6 +305,7 @@ TargetSubscriberScheduler::BuildSubscriberTimerEvent(std::chrono::steady_clock::
"collector_id=" + mPodName,
httpHeader,
"",
HttpResponse(),
prometheus::RefeshIntervalSeconds,
1,
this->mFuture);
Expand Down
2 changes: 1 addition & 1 deletion core/unittest/prometheus/PromAsynUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ void PromAsynUnittest::TestExecTime() {
return true;
});
auto request = std::make_shared<PromHttpRequest>(
"http", false, "127.0.0.1", 8080, "/", "", map<string, string>(), "", 10, 3, future);
"http", false, "127.0.0.1", 8080, "/", "", map<string, string>(), "", HttpResponse(), 10, 3, future);
auto asynRequest = std::dynamic_pointer_cast<AsynHttpRequest>(request);
asynRequest->mLastSendTime = now;
auto response = HttpResponse{};
Expand Down
Loading

0 comments on commit 9dd0592

Please sign in to comment.