Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add input raw event and event pool #1870

Merged
merged 42 commits into from
Nov 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
42 commits
Select commit Hold shift + click to select a range
36d29b7
feat: zero copy from response to logGroup
catdogpandas Oct 31, 2024
59d814d
polish
henryzhx8 Oct 31, 2024
08e2df9
polish
henryzhx8 Oct 31, 2024
d3dfeca
polish
henryzhx8 Oct 31, 2024
b7fe757
polish
henryzhx8 Oct 31, 2024
bf8b2af
polish
henryzhx8 Oct 31, 2024
21e70f8
polish
henryzhx8 Nov 1, 2024
1ef6890
polish
henryzhx8 Nov 1, 2024
0c17a66
polish
henryzhx8 Nov 1, 2024
b443b89
polish
henryzhx8 Nov 4, 2024
26c38bd
polish
henryzhx8 Nov 4, 2024
a6362ce
polish
henryzhx8 Nov 4, 2024
319030a
polish
henryzhx8 Nov 4, 2024
da113ec
add raw event type
henryzhx8 Nov 4, 2024
6278d68
Merge branch 'main' into feat/raw_event
henryzhx8 Nov 4, 2024
992be31
polish
henryzhx8 Nov 4, 2024
0f41c5b
polish
henryzhx8 Nov 5, 2024
c53a389
Merge branch 'main' into feat/prom-stream-scrape
catdogpandas Nov 5, 2024
6f44843
Merge branch 'main' into optimization/prom-curl-zero-copy
catdogpandas Nov 6, 2024
f48c3a4
chore: update code style
catdogpandas Nov 6, 2024
65e345b
Merge branch 'main' into feat/raw_event
henryzhx8 Nov 7, 2024
035229e
chore: add ut
catdogpandas Nov 8, 2024
400a9c0
feat: Input Event Pool
catdogpandas Nov 8, 2024
43db612
chore: update
catdogpandas Nov 11, 2024
3e94436
chore: update
catdogpandas Nov 11, 2024
9d18133
Merge remote-tracking branch 'upstream/main' into optimization/prom-c…
catdogpandas Nov 11, 2024
e3dfdc5
chore: update
catdogpandas Nov 11, 2024
a8dcf41
chore: update
catdogpandas Nov 11, 2024
91e0034
Merge remote-tracking branch 'upstream/main' into optimization/prom-c…
catdogpandas Nov 11, 2024
8ee371b
Merge remote-tracking branch 'upstream/feat/raw_event' into feat/prom…
catdogpandas Nov 11, 2024
5304db3
feat: use RawEvent pass raw data
catdogpandas Nov 11, 2024
cd0cab6
Merge commit '400a9c0ef3b1dbd8354eeabbd7bbb2625ba04470' into feat/pro…
catdogpandas Nov 11, 2024
d38ab1d
feat: input event pool
catdogpandas Nov 11, 2024
22f9bf5
Merge branch 'main' into feat/prom-raw-event
catdogpandas Nov 18, 2024
c95c52f
chore: update
catdogpandas Nov 18, 2024
1abcaf1
chore: update
catdogpandas Nov 18, 2024
7258ef6
chore: update
catdogpandas Nov 18, 2024
df82d22
chore: update
catdogpandas Nov 18, 2024
1f2f336
chore: update ut
catdogpandas Nov 18, 2024
05893b2
chore: update
catdogpandas Nov 18, 2024
83105da
chore: update
catdogpandas Nov 18, 2024
74c5671
chore: update ut
catdogpandas Nov 18, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "prometheus/PrometheusInputRunner.h"
#include "runner/FlusherRunner.h"
#include "runner/ProcessorRunner.h"
#include "runner/sink/http/HttpSink.h"
Expand Down Expand Up @@ -324,6 +325,7 @@ void Application::Start() { // GCOVR_EXCL_START

// destruct event handlers here so that it will not block file reading task
ConfigManager::GetInstance()->DeleteHandlers();
PrometheusInputRunner::GetInstance()->CheckGC();

this_thread::sleep_for(chrono::seconds(1));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
#include <json/json.h>

#include "common/StringTools.h"
#include "models/LogEvent.h"
#include "models/MetricEvent.h"
#include "models/PipelineEventGroup.h"
#include "models/PipelineEventPtr.h"
#include "models/RawEvent.h"
#include "prometheus/Constants.h"

using namespace std;
Expand Down Expand Up @@ -43,7 +43,7 @@ void ProcessorPromParseMetricNative::Process(PipelineEventGroup& eGroup) {
}

bool ProcessorPromParseMetricNative::IsSupportedEvent(const PipelineEventPtr& e) const {
return e.Is<LogEvent>();
return e.Is<RawEvent>();
}

bool ProcessorPromParseMetricNative::ProcessEvent(PipelineEventPtr& e,
Expand All @@ -53,9 +53,9 @@ bool ProcessorPromParseMetricNative::ProcessEvent(PipelineEventPtr& e,
if (!IsSupportedEvent(e)) {
return false;
}
auto& sourceEvent = e.Cast<LogEvent>();
auto& sourceEvent = e.Cast<RawEvent>();
std::unique_ptr<MetricEvent> metricEvent = eGroup.CreateMetricEvent(true);
if (parser.ParseLine(sourceEvent.GetContent(prometheus::PROMETHEUS), *metricEvent)) {
if (parser.ParseLine(sourceEvent.GetContent(), *metricEvent)) {
metricEvent->SetTag(string(prometheus::NAME), metricEvent->GetName());
newEvents.emplace_back(std::move(metricEvent), true, nullptr);
}
Expand Down
7 changes: 6 additions & 1 deletion core/prometheus/PrometheusInputRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@ PrometheusInputRunner::PrometheusInputRunner()
: mServiceHost(STRING_FLAG(loong_collector_operator_service)),
mServicePort(INT32_FLAG(loong_collector_operator_service_port)),
mPodName(STRING_FLAG(_pod_name_)),
mEventPool(true),
mUnRegisterMs(0) {
mClient = std::make_unique<sdk::CurlClient>();
mTimer = std::make_shared<Timer>();
Expand Down Expand Up @@ -83,7 +84,7 @@ void PrometheusInputRunner::UpdateScrapeInput(std::shared_ptr<TargetSubscriberSc
targetSubscriber->InitSelfMonitor(defaultLabels);

targetSubscriber->mUnRegisterMs = mUnRegisterMs.load();
targetSubscriber->SetTimer(mTimer);
targetSubscriber->SetComponent(mTimer, &mEventPool);
auto randSleepMilliSec = GetRandSleepMilliSec(
targetSubscriber->GetId(), prometheus::RefeshIntervalSeconds, GetCurrentTimeInMilliSeconds());
auto firstExecTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(randSleepMilliSec);
Expand Down Expand Up @@ -294,4 +295,8 @@ string PrometheusInputRunner::GetAllProjects() {
}
return result;
}

void PrometheusInputRunner::CheckGC() {
mEventPool.CheckGC();
}
}; // namespace logtail
6 changes: 3 additions & 3 deletions core/prometheus/PrometheusInputRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#include "common/Lock.h"
#include "common/timer/Timer.h"
#include "monitor/MetricManager.h"
#include "monitor/MetricTypes.h"
#include "prometheus/schedulers/TargetSubscriberScheduler.h"
#include "runner/InputRunner.h"
Expand All @@ -42,6 +41,7 @@ class PrometheusInputRunner : public InputRunner {
static PrometheusInputRunner sInstance;
return &sInstance;
}
void CheckGC();

// input plugin update
void UpdateScrapeInput(std::shared_ptr<TargetSubscriberScheduler> targetSubscriber,
Expand Down Expand Up @@ -70,13 +70,13 @@ class PrometheusInputRunner : public InputRunner {
std::atomic<bool> mIsThreadRunning = true;
std::future<void> mThreadRes;

std::unique_ptr<sdk::CurlClient> mClient;

std::string mServiceHost;
int32_t mServicePort;
std::string mPodName;

std::unique_ptr<sdk::CurlClient> mClient;
std::shared_ptr<Timer> mTimer;
EventPool mEventPool;

mutable ReadWriteLock mSubscriberMapRWLock;
std::map<std::string, std::shared_ptr<TargetSubscriberScheduler>> mTargetSubscriberSchedulerMap;
Expand Down
10 changes: 10 additions & 0 deletions core/prometheus/schedulers/BaseScheduler.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#include "prometheus/schedulers/BaseScheduler.h"

#include "common/timer/Timer.h"
#include "models/EventPool.h"

using namespace std;

namespace logtail {
void BaseScheduler::ExecDone() {
mExecCount++;
Expand Down Expand Up @@ -28,4 +33,9 @@ bool BaseScheduler::IsCancelled() {
ReadLock lock(mLock);
return !mValidState;
}

void BaseScheduler::SetComponent(shared_ptr<Timer> timer, EventPool* eventPool) {
mTimer = std::move(timer);
mEventPool = eventPool;
}
} // namespace logtail
8 changes: 7 additions & 1 deletion core/prometheus/schedulers/BaseScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <memory>

#include "common/http/HttpResponse.h"
#include "common/timer/Timer.h"
#include "models/EventPool.h"
#include "prometheus/async/PromFuture.h"

namespace logtail {
Expand All @@ -20,9 +22,10 @@ class BaseScheduler {

void SetFirstExecTime(std::chrono::steady_clock::time_point firstExecTime);
void DelayExecTime(uint64_t delaySeconds);

virtual void Cancel();

void SetComponent(std::shared_ptr<Timer> timer, EventPool* eventPool);

protected:
bool IsCancelled();

Expand All @@ -35,5 +38,8 @@ class BaseScheduler {
bool mValidState = true;
std::shared_ptr<PromFuture<HttpResponse&, uint64_t>> mFuture;
std::shared_ptr<PromFuture<>> mIsContextValidFuture;

std::shared_ptr<Timer> mTimer;
EventPool* mEventPool = nullptr;
};
} // namespace logtail
10 changes: 1 addition & 9 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "common/timer/HttpRequestTimerEvent.h"
#include "common/timer/Timer.h"
#include "logger/Logger.h"
#include "pipeline/queue/ProcessQueueItem.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKey.h"
#include "prometheus/Constants.h"
#include "prometheus/Utils.h"
#include "prometheus/async/PromFuture.h"
#include "prometheus/async/PromHttpRequest.h"
#include "sdk/Common.h"
Expand Down Expand Up @@ -86,8 +84,6 @@ ScrapeScheduler::ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
mHash = mScrapeConfigPtr->mJobName + tmpTargetURL + ToString(mTargetLabels.Hash());
mInstance = mHost + ":" + ToString(mPort);
mInterval = mScrapeConfigPtr->mScrapeIntervalSeconds;

mParser = make_unique<TextParser>();
}

void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t timestampMilliSec) {
Expand Down Expand Up @@ -214,7 +210,7 @@ std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::
mScrapeConfigPtr->mRequestHeaders,
"",
HttpResponse(
new PromMetricResponseBody(),
new PromMetricResponseBody(mEventPool),
[](void* ptr) { delete static_cast<PromMetricResponseBody*>(ptr); },
PromMetricWriteCallback),
mScrapeConfigPtr->mScrapeTimeoutSeconds,
Expand All @@ -238,10 +234,6 @@ void ScrapeScheduler::Cancel() {
}
}

void ScrapeScheduler::SetTimer(std::shared_ptr<Timer> timer) {
mTimer = std::move(timer);
}

void ScrapeScheduler::InitSelfMonitor(const MetricLabels& defaultLabels) {
mSelfMonitor = std::make_shared<PromSelfMonitorUnsafe>();
MetricLabels labels = defaultLabels;
Expand Down
15 changes: 5 additions & 10 deletions core/prometheus/schedulers/ScrapeScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,11 @@

#include "BaseScheduler.h"
#include "common/http/HttpResponse.h"
#include "common/timer/Timer.h"
#include "models/PipelineEventGroup.h"
#include "monitor/MetricTypes.h"
#include "pipeline/queue/QueueKey.h"
#include "prometheus/Constants.h"
#include "prometheus/PromSelfMonitor.h"
#include "prometheus/Utils.h"
#include "prometheus/labels/TextParser.h"
#include "prometheus/schedulers/ScrapeConfig.h"

#ifdef APSARA_UNIT_TEST_MAIN
Expand All @@ -43,13 +40,15 @@ struct PromMetricResponseBody {
PipelineEventGroup mEventGroup;
std::string mCache;
size_t mRawSize = 0;
EventPool* mEventPool = nullptr;

PromMetricResponseBody() : mEventGroup(std::make_shared<SourceBuffer>()) {};
explicit PromMetricResponseBody(EventPool* eventPool)
: mEventGroup(std::make_shared<SourceBuffer>()), mEventPool(eventPool) {};
void AddEvent(char* line, size_t len) {
if (IsValidMetric(StringView(line, len))) {
auto* e = mEventGroup.AddLogEvent();
auto* e = mEventGroup.AddRawEvent(true, mEventPool);
auto sb = mEventGroup.GetSourceBuffer()->CopyString(line, len);
e->SetContentNoCopy(prometheus::PROMETHEUS, StringView(sb.data, sb.size));
e->SetContentNoCopy(sb);
}
}
void FlushCache() {
Expand All @@ -70,7 +69,6 @@ class ScrapeScheduler : public BaseScheduler {
~ScrapeScheduler() override = default;

void OnMetricResult(HttpResponse&, uint64_t timestampMilliSec);
void SetTimer(std::shared_ptr<Timer> timer);

std::string GetId() const;

Expand All @@ -94,11 +92,8 @@ class ScrapeScheduler : public BaseScheduler {
std::string mInstance;
Labels mTargetLabels;

std::unique_ptr<TextParser> mParser;

QueueKey mQueueKey;
size_t mInputIndex;
std::shared_ptr<Timer> mTimer;

// auto metrics
uint64_t mScrapeTimestampMilliSec = 0;
Expand Down
5 changes: 1 addition & 4 deletions core/prometheus/schedulers/TargetSubscriberScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ TargetSubscriberScheduler::BuildScrapeSchedulerSet(std::vector<Labels>& targetGr
auto scrapeScheduler
= std::make_shared<ScrapeScheduler>(mScrapeConfigPtr, host, port, resultLabel, mQueueKey, mInputIndex);

scrapeScheduler->SetTimer(mTimer);
scrapeScheduler->SetComponent(mTimer, mEventPool);

auto randSleepMilliSec = GetRandSleepMilliSec(
scrapeScheduler->GetId(), mScrapeConfigPtr->mScrapeIntervalSeconds, GetCurrentTimeInMilliSeconds());
Expand All @@ -236,9 +236,6 @@ TargetSubscriberScheduler::BuildScrapeSchedulerSet(std::vector<Labels>& targetGr
return scrapeSchedulerMap;
}

void TargetSubscriberScheduler::SetTimer(shared_ptr<Timer> timer) {
mTimer = std::move(timer);
}

string TargetSubscriberScheduler::GetId() const {
return mJobName;
Expand Down
2 changes: 0 additions & 2 deletions core/prometheus/schedulers/TargetSubscriberScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,6 @@ class TargetSubscriberScheduler : public BaseScheduler {
bool operator<(const TargetSubscriberScheduler& other) const;

void OnSubscription(HttpResponse&, uint64_t);
void SetTimer(std::shared_ptr<Timer> timer);
void SubscribeOnce(std::chrono::steady_clock::time_point execTime);

std::string GetId() const;
Expand Down Expand Up @@ -79,7 +78,6 @@ class TargetSubscriberScheduler : public BaseScheduler {
std::unordered_map<std::string, std::shared_ptr<ScrapeScheduler>> mScrapeSchedulerMap;

std::string mJobName;
std::shared_ptr<Timer> mTimer;

std::string mETag;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -85,8 +85,8 @@ void ProcessorParsePrometheusMetricUnittest::TestProcess() {
if (newLine.empty() || newLine[0] == '#') {
continue;
}
auto* MetricEvent = eGroup.AddLogEvent();
MetricEvent->SetContent(prometheus::PROMETHEUS, newLine);
auto* metricEvent = eGroup.AddRawEvent();
metricEvent->SetContent(newLine);
}

return eGroup;
Expand Down
Loading
Loading