Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

do not separate event group when group size excceeds min batch size to improve performance #1850

Merged
merged 15 commits into from
Nov 7, 2024
30 changes: 17 additions & 13 deletions core/models/EventPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,26 @@ EventPool::~EventPool() {
}

LogEvent* EventPool::AcquireLogEvent(PipelineEventGroup* ptr) {
TransferPoolIfEmpty(mLogEventPool, mLogEventPoolBak);

if (mEnableLock) {
TransferPoolIfEmpty(mLogEventPool, mLogEventPoolBak);
lock_guard<mutex> lock(mPoolMux);
return AcquireEventNoLock(ptr, mLogEventPool, mMinUnusedLogEventsCnt);
}
return AcquireEventNoLock(ptr, mLogEventPool, mMinUnusedLogEventsCnt);
}

MetricEvent* EventPool::AcquireMetricEvent(PipelineEventGroup* ptr) {
TransferPoolIfEmpty(mMetricEventPool, mMetricEventPoolBak);

if (mEnableLock) {
TransferPoolIfEmpty(mMetricEventPool, mMetricEventPoolBak);
lock_guard<mutex> lock(mPoolMux);
return AcquireEventNoLock(ptr, mMetricEventPool, mMinUnusedMetricEventsCnt);
}
return AcquireEventNoLock(ptr, mMetricEventPool, mMinUnusedMetricEventsCnt);
}

SpanEvent* EventPool::AcquireSpanEvent(PipelineEventGroup* ptr) {
TransferPoolIfEmpty(mSpanEventPool, mSpanEventPoolBak);

if (mEnableLock) {
TransferPoolIfEmpty(mSpanEventPool, mSpanEventPoolBak);
lock_guard<mutex> lock(mPoolMux);
return AcquireEventNoLock(ptr, mSpanEventPool, mMinUnusedSpanEventsCnt);
}
Expand Down Expand Up @@ -98,20 +95,27 @@ void EventPool::Release(vector<SpanEvent*>&& obj) {
}

template <class T>
void DoGC(vector<T*>& pool, vector<T*>& poolBak, size_t& minUnusedCnt, mutex* mux) {
void DoGC(vector<T*>& pool, vector<T*>& poolBak, size_t& minUnusedCnt, mutex* mux, const string& type) {
if (minUnusedCnt <= pool.size() || minUnusedCnt == numeric_limits<size_t>::max()) {
auto sz = minUnusedCnt == numeric_limits<size_t>::max() ? pool.size() : minUnusedCnt;
for (size_t i = 0; i < sz; ++i) {
delete pool.back();
pool.pop_back();
}
size_t bakSZ = 0;
if (mux) {
lock_guard<mutex> lock(*mux);
bakSZ = poolBak.size();
for (auto& item : poolBak) {
delete item;
}
poolBak.clear();
}
if (sz != 0 || bakSZ != 0) {
LOG_INFO(
sLogger,
("event pool gc", "done")("event type", type)("gc event cnt", sz + bakSZ)("pool size", pool.size()));
}
} else {
LOG_ERROR(sLogger,
("unexpected error", "min unused event cnt is greater than pool size")(
Expand All @@ -124,13 +128,13 @@ void EventPool::CheckGC() {
if (time(nullptr) - mLastGCTime > INT32_FLAG(event_pool_gc_interval_secs)) {
if (mEnableLock) {
lock_guard<mutex> lock(mPoolMux);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux);
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux);
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux, "span");
} else {
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr);
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr);
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr, "span");
}
mLastGCTime = time(nullptr);
}
Expand Down
16 changes: 15 additions & 1 deletion core/models/PipelineEventGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,24 @@ namespace logtail {
template <class T>
void DestroyEvents(vector<PipelineEventPtr>&& events) {
unordered_map<EventPool*, vector<T*>> eventsPoolMap;
// for most cases, all events have the same origin. So we cache the pool pointer and iterator for efficiency
EventPool* cachedPoolPtr = nullptr;
typename unordered_map<EventPool*, vector<T*>>::iterator cachedIt;
bool firstEvent = true;
for (auto& item : events) {
if (item && item.IsFromEventPool()) {
item->Reset();
eventsPoolMap[item.GetEventPool()].emplace_back(static_cast<T*>(item.Release()));
if (firstEvent || item.GetEventPool() != cachedPoolPtr) {
cachedPoolPtr = item.GetEventPool();
cachedIt = eventsPoolMap.find(cachedPoolPtr);
if (cachedIt == eventsPoolMap.end()) {
eventsPoolMap.emplace(cachedPoolPtr, vector<T*>());
cachedIt = eventsPoolMap.find(cachedPoolPtr);
cachedIt->second.reserve(events.size());
}
firstEvent = false;
}
cachedIt->second.emplace_back(static_cast<T*>(item.Release()));
}
}
for (auto& item : eventsPoolMap) {
Expand Down
4 changes: 2 additions & 2 deletions core/monitor/metric_constants/ComponentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ const string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES = "component_extra_b
const string METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_TOTAL = "component_discarded_events_total";

const string METRIC_COMPONENT_QUEUE_FETCHED_ITEMS_TOTAL = "component_fetched_items_total";
const string METRIC_COMPONENT_QUEUE_FETCH_ATTEMPTS_TOTAL = "component_fetch_attempts_total";
const string METRIC_COMPONENT_QUEUE_SUCCESSFUL_FETCH_TIMES_TOTAL = "component_successful_fetch_times_total";
const string METRIC_COMPONENT_QUEUE_FETCH_TIMES_TOTAL = "component_fetch_times_total";
const string METRIC_COMPONENT_QUEUE_VALID_FETCH_TIMES_TOTAL = "component_valid_fetch_times_total";
const string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_REGION_LIMITER_TIMES_TOTAL
= "component_fetch_rejected_by_region_limiter_times_total";
const string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_PROJECT_LIMITER_TIMES_TOTAL
Expand Down
4 changes: 2 additions & 2 deletions core/monitor/metric_constants/MetricConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ extern const std::string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES;
extern const std::string METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_TOTAL;

extern const std::string METRIC_COMPONENT_QUEUE_FETCHED_ITEMS_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_ATTEMPTS_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_SUCCESSFUL_FETCH_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_VALID_FETCH_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_REGION_LIMITER_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_PROJECT_LIMITER_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_LOGSTORE_LIMITER_TIMES_TOTAL;
Expand Down
8 changes: 4 additions & 4 deletions core/pipeline/batch/BatchItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class GroupBatchItem {

void Add(BatchedEvents&& g, int64_t totalEnqueTimeMs) {
mEventsCnt += g.mEvents.size();
mTotalEnqueTimeMs += totalEnqueTimeMs;
// mTotalEnqueTimeMs += totalEnqueTimeMs;
mGroups.emplace_back(std::move(g));
mStatus.Update(mGroups.back());
}
Expand Down Expand Up @@ -94,9 +94,9 @@ class EventBatchItem {
void Add(PipelineEventPtr&& e) {
mBatch.mEvents.emplace_back(std::move(e));
mStatus.Update(mBatch.mEvents.back());
mTotalEnqueTimeMs += std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
.time_since_epoch()
.count();
// mTotalEnqueTimeMs += std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
// .time_since_epoch()
// .count();
}

void Flush(GroupBatchItem& res) {
Expand Down
16 changes: 15 additions & 1 deletion core/pipeline/batch/BatchedEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,24 @@ namespace logtail {
template <class T>
void DestroyEvents(vector<PipelineEventPtr>&& events) {
unordered_map<EventPool*, vector<T*>> eventsPoolMap;
// for most cases, all events have the same origin. So we cache the pool pointer and iterator for efficiency
EventPool* cachedPoolPtr = nullptr;
typename unordered_map<EventPool*, vector<T*>>::iterator cachedIt;
bool firstEvent = true;
for (auto& item : events) {
if (item && item.IsFromEventPool()) {
item->Reset();
eventsPoolMap[item.GetEventPool()].emplace_back(static_cast<T*>(item.Release()));
if (firstEvent || item.GetEventPool() != cachedPoolPtr) {
cachedPoolPtr = item.GetEventPool();
cachedIt = eventsPoolMap.find(cachedPoolPtr);
if (cachedIt == eventsPoolMap.end()) {
eventsPoolMap.emplace(cachedPoolPtr, vector<T*>());
cachedIt = eventsPoolMap.find(cachedPoolPtr);
cachedIt->second.reserve(events.size());
}
firstEvent = false;
}
cachedIt->second.emplace_back(static_cast<T*>(item.Release()));
}
}
for (auto& item : eventsPoolMap) {
Expand Down
Loading
Loading