Skip to content

Commit

Permalink
feat: zero copy from response to logGroup
Browse files Browse the repository at this point in the history
  • Loading branch information
catdogpandas committed Nov 18, 2024
1 parent f49f501 commit df6c66e
Show file tree
Hide file tree
Showing 15 changed files with 187 additions and 45 deletions.
43 changes: 35 additions & 8 deletions core/models/EventPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,15 @@ RawEvent* EventPool::AcquireRawEvent(PipelineEventGroup* ptr) {
return AcquireEventNoLock(ptr, mRawEventPool, mMinUnusedRawEventsCnt);
}

// Hands out a RawEvent associated with the given event group, taken from the
// raw-event pool via AcquireEventNoLock.
//
// ptr is forwarded unchanged to AcquireEventNoLock; the returned pointer is
// whatever that helper returns.
//
// When locking is disabled the pool is accessed directly. When locking is
// enabled, TransferPoolIfEmpty is invoked on the primary/backup raw pools
// first (presumably to refill the primary pool from the backup one — confirm
// against its definition), and only then is mPoolMux taken for the acquire.
//
// NOTE(review): the lines immediately preceding this definition look like the
// tail of another AcquireRawEvent(PipelineEventGroup*) definition — confirm
// this is not a redefinition.
RawEvent* EventPool::AcquireRawEvent(PipelineEventGroup* ptr) {
    if (!mEnableLock) {
        return AcquireEventNoLock(ptr, mRawEventPool, mMinUnusedRawEventsCnt);
    }

    // Must stay ahead of the lock_guard below, matching the original order.
    TransferPoolIfEmpty(mRawEventPool, mRawEventPoolBak);
    lock_guard<mutex> guard(mPoolMux);
    return AcquireEventNoLock(ptr, mRawEventPool, mMinUnusedRawEventsCnt);
}

void EventPool::Release(vector<LogEvent*>&& obj) {
if (mEnableLock) {
lock_guard<mutex> lock(mPoolBakMux);
Expand Down Expand Up @@ -112,6 +121,15 @@ void EventPool::Release(vector<RawEvent*>&& obj) {
}
}

// Returns a batch of RawEvent pointers to the pool.
//
// Without locking, the pointers are appended straight to mRawEventPool.
// With locking, they are appended to the backup pool (mRawEventPoolBak) while
// holding mPoolBakMux. The pointers are copied out of obj; obj itself is not
// cleared here.
//
// NOTE(review): the lines immediately preceding this definition look like the
// tail of another Release(vector<RawEvent*>&&) definition — confirm this is
// not a redefinition.
void EventPool::Release(vector<RawEvent*>&& obj) {
    if (!mEnableLock) {
        mRawEventPool.insert(mRawEventPool.end(), obj.begin(), obj.end());
        return;
    }

    lock_guard<mutex> guard(mPoolBakMux);
    mRawEventPoolBak.insert(mRawEventPoolBak.end(), obj.begin(), obj.end());
}

template <class T>
void DoGC(vector<T*>& pool, vector<T*>& poolBak, size_t& minUnusedCnt, mutex* mux, const string& type) {
if (minUnusedCnt <= pool.size() || minUnusedCnt == numeric_limits<size_t>::max()) {
Expand Down Expand Up @@ -146,15 +164,15 @@ void EventPool::CheckGC() {
if (time(nullptr) - mLastGCTime > INT32_FLAG(event_pool_gc_interval_secs)) {
if (mEnableLock) {
lock_guard<mutex> lock(mPoolMux);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux, "span");
DoGC(mRawEventPool, mRawEventPoolBak, mMinUnusedRawEventsCnt, &mPoolBakMux, "raw");
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux);
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux);
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux);
DoGC(mRawEventPool, mRawEventPoolBak, mMinUnusedRawEventsCnt, &mPoolBakMux);
} else {
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr, "span");
DoGC(mRawEventPool, mRawEventPoolBak, mMinUnusedRawEventsCnt, nullptr, "raw");
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr);
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr);
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr);
DoGC(mRawEventPool, mRawEventPoolBak, mMinUnusedRawEventsCnt, nullptr);
}
mLastGCTime = time(nullptr);
}
Expand All @@ -173,6 +191,9 @@ void EventPool::DestroyAllEventPool() {
for (auto& item : mRawEventPool) {
delete item;
}
for (auto& item : mRawEventPool) {
delete item;
}
}

void EventPool::DestroyAllEventPoolBak() {
Expand All @@ -188,6 +209,9 @@ void EventPool::DestroyAllEventPoolBak() {
for (auto& item : mRawEventPoolBak) {
delete item;
}
for (auto& item : mRawEventPoolBak) {
delete item;
}
}

#ifdef APSARA_UNIT_TEST_MAIN
Expand All @@ -199,10 +223,12 @@ void EventPool::Clear() {
mMetricEventPool.clear();
mSpanEventPool.clear();
mRawEventPool.clear();
mRawEventPool.clear();
mMinUnusedLogEventsCnt = numeric_limits<size_t>::max();
mMinUnusedMetricEventsCnt = numeric_limits<size_t>::max();
mMinUnusedSpanEventsCnt = numeric_limits<size_t>::max();
mMinUnusedRawEventsCnt = numeric_limits<size_t>::max();
mMinUnusedRawEventsCnt = numeric_limits<size_t>::max();
}
{
lock_guard<mutex> lock(mPoolBakMux);
Expand All @@ -211,6 +237,7 @@ void EventPool::Clear() {
mMetricEventPoolBak.clear();
mSpanEventPoolBak.clear();
mRawEventPoolBak.clear();
mRawEventPoolBak.clear();
}
mLastGCTime = 0;
}
Expand Down
7 changes: 6 additions & 1 deletion core/pipeline/serializer/SLSSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
#include "plugin/flusher/sls/FlusherSLS.h"
#include "protobuf/sls/LogGroupSerializer.h"

DECLARE_FLAG_INT32(max_send_log_group_size);
DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024);

using namespace std;

Expand Down Expand Up @@ -192,15 +192,20 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri
for (const auto& tag : group.mTags.mInner) {
if (tag.first == LOG_RESERVED_KEY_TOPIC) {
serializer.AddTopic(tag.second);
serializer.AddTopic(tag.second);
} else if (tag.first == LOG_RESERVED_KEY_SOURCE) {
serializer.AddSource(tag.second);
serializer.AddSource(tag.second);
} else if (tag.first == LOG_RESERVED_KEY_MACHINE_UUID) {
serializer.AddMachineUUID(tag.second);
serializer.AddMachineUUID(tag.second);
} else {
serializer.AddLogTag(tag.first, tag.second);
serializer.AddLogTag(tag.first, tag.second);
}
}
res = std::move(serializer.GetResult());
res = std::move(serializer.GetResult());
return true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@
#include <json/json.h>

#include "common/StringTools.h"
#include "models/LogEvent.h"
#include "models/MetricEvent.h"
#include "models/PipelineEventGroup.h"
#include "models/PipelineEventPtr.h"
#include "models/RawEvent.h"
#include "prometheus/Constants.h"

using namespace std;
Expand Down Expand Up @@ -43,7 +43,7 @@ void ProcessorPromParseMetricNative::Process(PipelineEventGroup& eGroup) {
}

// Reports whether the given event is of a type this processor accepts.
//
// NOTE(review): two return statements appear back to back. Only the first one
// (the LogEvent check) can ever execute; the RawEvent check after it is
// unreachable. This looks like the old and new revisions of the same line kept
// side by side — confirm which event type is intended and drop the other.
bool ProcessorPromParseMetricNative::IsSupportedEvent(const PipelineEventPtr& e) const {
return e.Is<LogEvent>();
return e.Is<RawEvent>();
}

bool ProcessorPromParseMetricNative::ProcessEvent(PipelineEventPtr& e,
Expand All @@ -53,9 +53,9 @@ bool ProcessorPromParseMetricNative::ProcessEvent(PipelineEventPtr& e,
if (!IsSupportedEvent(e)) {
return false;
}
auto& sourceEvent = e.Cast<LogEvent>();
auto& sourceEvent = e.Cast<RawEvent>();
std::unique_ptr<MetricEvent> metricEvent = eGroup.CreateMetricEvent(true);
if (parser.ParseLine(sourceEvent.GetContent(prometheus::PROMETHEUS), *metricEvent)) {
if (parser.ParseLine(sourceEvent.GetContent(), *metricEvent)) {
metricEvent->SetTag(string(prometheus::NAME), metricEvent->GetName());
newEvents.emplace_back(std::move(metricEvent), true, nullptr);
}
Expand Down
9 changes: 8 additions & 1 deletion core/prometheus/PrometheusInputRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ PrometheusInputRunner::PrometheusInputRunner()
mUnRegisterMs(0) {
mClient = std::make_unique<sdk::CurlClient>();
mTimer = std::make_shared<Timer>();
mEventPool = std::make_shared<EventPool>(true);

// self monitor
MetricLabels labels;
Expand Down Expand Up @@ -83,7 +84,7 @@ void PrometheusInputRunner::UpdateScrapeInput(std::shared_ptr<TargetSubscriberSc
targetSubscriber->InitSelfMonitor(defaultLabels);

targetSubscriber->mUnRegisterMs = mUnRegisterMs.load();
targetSubscriber->SetTimer(mTimer);
targetSubscriber->SetComponent(mTimer, mEventPool);
auto randSleepMilliSec = GetRandSleepMilliSec(
targetSubscriber->GetId(), prometheus::RefeshIntervalSeconds, GetCurrentTimeInMilliSeconds());
auto firstExecTime = std::chrono::steady_clock::now() + std::chrono::milliseconds(randSleepMilliSec);
Expand Down Expand Up @@ -294,4 +295,10 @@ string PrometheusInputRunner::GetAllProjects() {
}
return result;
}

// Forwards a garbage-collection check to the runner's event pool.
// Does nothing when no event pool is set.
void PrometheusInputRunner::CheckGC() {
    if (!mEventPool) {
        return;
    }
    mEventPool->CheckGC();
}
}; // namespace logtail
6 changes: 3 additions & 3 deletions core/prometheus/PrometheusInputRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@

#include "common/Lock.h"
#include "common/timer/Timer.h"
#include "monitor/MetricManager.h"
#include "monitor/MetricTypes.h"
#include "prometheus/schedulers/TargetSubscriberScheduler.h"
#include "runner/InputRunner.h"
Expand All @@ -42,6 +41,7 @@ class PrometheusInputRunner : public InputRunner {
static PrometheusInputRunner sInstance;
return &sInstance;
}
void CheckGC();

// input plugin update
void UpdateScrapeInput(std::shared_ptr<TargetSubscriberScheduler> targetSubscriber,
Expand Down Expand Up @@ -70,13 +70,13 @@ class PrometheusInputRunner : public InputRunner {
std::atomic<bool> mIsThreadRunning = true;
std::future<void> mThreadRes;

std::unique_ptr<sdk::CurlClient> mClient;

std::string mServiceHost;
int32_t mServicePort;
std::string mPodName;

std::unique_ptr<sdk::CurlClient> mClient;
std::shared_ptr<Timer> mTimer;
std::shared_ptr<EventPool> mEventPool;

mutable ReadWriteLock mSubscriberMapRWLock;
std::map<std::string, std::shared_ptr<TargetSubscriberScheduler>> mTargetSubscriberSchedulerMap;
Expand Down
10 changes: 10 additions & 0 deletions core/prometheus/schedulers/BaseScheduler.cpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,10 @@
#include "prometheus/schedulers/BaseScheduler.h"

#include "common/timer/Timer.h"
#include "models/EventPool.h"

using namespace std;

namespace logtail {
void BaseScheduler::ExecDone() {
mExecCount++;
Expand Down Expand Up @@ -28,4 +33,9 @@ bool BaseScheduler::IsCancelled() {
ReadLock lock(mLock);
return !mValidState;
}

// Installs the shared components used by this scheduler.
//
// Both arguments are taken by value and moved into the corresponding members,
// so the scheduler keeps its own shared_ptr reference to each:
//   timer     -> mTimer
//   eventPool -> mEventPool
// Any previously held timer / event pool reference is replaced.
void BaseScheduler::SetComponent(shared_ptr<Timer> timer, shared_ptr<EventPool> eventPool) {
mTimer = std::move(timer);
mEventPool = std::move(eventPool);
}
} // namespace logtail
8 changes: 7 additions & 1 deletion core/prometheus/schedulers/BaseScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
#include <memory>

#include "common/http/HttpResponse.h"
#include "common/timer/Timer.h"
#include "models/EventPool.h"
#include "prometheus/async/PromFuture.h"

namespace logtail {
Expand All @@ -20,9 +22,10 @@ class BaseScheduler {

void SetFirstExecTime(std::chrono::steady_clock::time_point firstExecTime);
void DelayExecTime(uint64_t delaySeconds);

virtual void Cancel();

void SetComponent(std::shared_ptr<Timer> timer, std::shared_ptr<EventPool> eventPool);

protected:
bool IsCancelled();

Expand All @@ -35,5 +38,8 @@ class BaseScheduler {
bool mValidState = true;
std::shared_ptr<PromFuture<HttpResponse&, uint64_t>> mFuture;
std::shared_ptr<PromFuture<>> mIsContextValidFuture;

std::shared_ptr<Timer> mTimer;
std::shared_ptr<EventPool> mEventPool;
};
} // namespace logtail
14 changes: 3 additions & 11 deletions core/prometheus/schedulers/ScrapeScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,13 +24,11 @@
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "common/timer/HttpRequestTimerEvent.h"
#include "common/timer/Timer.h"
#include "logger/Logger.h"
#include "pipeline/queue/ProcessQueueItem.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKey.h"
#include "prometheus/Constants.h"
#include "prometheus/Utils.h"
#include "prometheus/async/PromFuture.h"
#include "prometheus/async/PromHttpRequest.h"
#include "sdk/Common.h"
Expand Down Expand Up @@ -86,8 +84,6 @@ ScrapeScheduler::ScrapeScheduler(std::shared_ptr<ScrapeConfig> scrapeConfigPtr,
mHash = mScrapeConfigPtr->mJobName + tmpTargetURL + ToString(mTargetLabels.Hash());
mInstance = mHost + ":" + ToString(mPort);
mInterval = mScrapeConfigPtr->mScrapeIntervalSeconds;

mParser = make_unique<TextParser>();
}

void ScrapeScheduler::OnMetricResult(HttpResponse& response, uint64_t timestampMilliSec) {
Expand Down Expand Up @@ -214,9 +210,9 @@ std::unique_ptr<TimerEvent> ScrapeScheduler::BuildScrapeTimerEvent(std::chrono::
mScrapeConfigPtr->mRequestHeaders,
"",
HttpResponse(
new PromMetricResponseBody(),
[](void* ptr) { delete static_cast<PromMetricResponseBody*>(ptr); },
PromMetricWriteCallback),
new MetricResponseBody(mEventPool),
[](void* ptr) { delete static_cast<MetricResponseBody*>(ptr); },
MetricWriteCallback),
mScrapeConfigPtr->mScrapeTimeoutSeconds,
retry,
this->mFuture,
Expand All @@ -238,10 +234,6 @@ void ScrapeScheduler::Cancel() {
}
}

// Stores the timer for this scheduler: the by-value shared_ptr argument is
// moved into mTimer, replacing whatever reference was held before.
void ScrapeScheduler::SetTimer(std::shared_ptr<Timer> timer) {
mTimer = std::move(timer);
}

void ScrapeScheduler::InitSelfMonitor(const MetricLabels& defaultLabels) {
mSelfMonitor = std::make_shared<PromSelfMonitorUnsafe>();
MetricLabels labels = defaultLabels;
Expand Down
16 changes: 6 additions & 10 deletions core/prometheus/schedulers/ScrapeScheduler.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,15 @@

#include <memory>
#include <string>
#include <utility>

#include "BaseScheduler.h"
#include "common/http/HttpResponse.h"
#include "common/timer/Timer.h"
#include "models/PipelineEventGroup.h"
#include "monitor/MetricTypes.h"
#include "pipeline/queue/QueueKey.h"
#include "prometheus/Constants.h"
#include "prometheus/PromSelfMonitor.h"
#include "prometheus/Utils.h"
#include "prometheus/labels/TextParser.h"
#include "prometheus/schedulers/ScrapeConfig.h"

#ifdef APSARA_UNIT_TEST_MAIN
Expand All @@ -43,13 +41,15 @@ struct PromMetricResponseBody {
PipelineEventGroup mEventGroup;
std::string mCache;
size_t mRawSize = 0;
std::shared_ptr<EventPool> mEventPool;

PromMetricResponseBody() : mEventGroup(std::make_shared<SourceBuffer>()) {};
explicit PromMetricResponseBody(std::shared_ptr<EventPool> eventPool)
: mEventGroup(std::make_shared<SourceBuffer>()), mEventPool(std::move(eventPool)) {};
void AddEvent(char* line, size_t len) {
if (IsValidMetric(StringView(line, len))) {
auto* e = mEventGroup.AddLogEvent();
auto* e = mEventGroup.AddLogEvent(true, mEventPool.get());
auto sb = mEventGroup.GetSourceBuffer()->CopyString(line, len);
e->SetContentNoCopy(prometheus::PROMETHEUS, StringView(sb.data, sb.size));
e->SetContentNoCopy(sb);
}
}
void FlushCache() {
Expand All @@ -70,7 +70,6 @@ class ScrapeScheduler : public BaseScheduler {
~ScrapeScheduler() override = default;

void OnMetricResult(HttpResponse&, uint64_t timestampMilliSec);
void SetTimer(std::shared_ptr<Timer> timer);

std::string GetId() const;

Expand All @@ -94,11 +93,8 @@ class ScrapeScheduler : public BaseScheduler {
std::string mInstance;
Labels mTargetLabels;

std::unique_ptr<TextParser> mParser;

QueueKey mQueueKey;
size_t mInputIndex;
std::shared_ptr<Timer> mTimer;

// auto metrics
uint64_t mScrapeTimestampMilliSec = 0;
Expand Down
5 changes: 1 addition & 4 deletions core/prometheus/schedulers/TargetSubscriberScheduler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -223,7 +223,7 @@ TargetSubscriberScheduler::BuildScrapeSchedulerSet(std::vector<Labels>& targetGr
auto scrapeScheduler
= std::make_shared<ScrapeScheduler>(mScrapeConfigPtr, host, port, resultLabel, mQueueKey, mInputIndex);

scrapeScheduler->SetTimer(mTimer);
scrapeScheduler->SetComponent(mTimer, mEventPool);

auto randSleepMilliSec = GetRandSleepMilliSec(
scrapeScheduler->GetId(), mScrapeConfigPtr->mScrapeIntervalSeconds, GetCurrentTimeInMilliSeconds());
Expand All @@ -236,9 +236,6 @@ TargetSubscriberScheduler::BuildScrapeSchedulerSet(std::vector<Labels>& targetGr
return scrapeSchedulerMap;
}

// Stores the timer for this scheduler: the by-value shared_ptr argument is
// moved into mTimer, replacing whatever reference was held before.
void TargetSubscriberScheduler::SetTimer(shared_ptr<Timer> timer) {
mTimer = std::move(timer);
}

string TargetSubscriberScheduler::GetId() const {
return mJobName;
Expand Down
Loading

0 comments on commit df6c66e

Please sign in to comment.