Skip to content

Commit

Permalink
do not separate event group when group size exceeds min batch size t…
Browse files Browse the repository at this point in the history
…o improve performance (#1850)
  • Loading branch information
henryzhx8 authored Nov 7, 2024
1 parent a34c7dd commit 2ca86bc
Show file tree
Hide file tree
Showing 26 changed files with 374 additions and 281 deletions.
30 changes: 17 additions & 13 deletions core/models/EventPool.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -41,29 +41,26 @@ EventPool::~EventPool() {
}

// Acquires a LogEvent owned by the given group, recycling from the pool.
// The backup pool is only consulted on the locked path: when locking is
// disabled the pool is used from a single thread and the backup pool is
// not involved. (NOTE(review): assumes TransferPoolIfEmpty synchronizes
// on the backup-pool mutex internally — confirm against its definition.)
LogEvent* EventPool::AcquireLogEvent(PipelineEventGroup* ptr) {
    if (mEnableLock) {
        // Refill the primary pool from the backup pool before taking mPoolMux.
        TransferPoolIfEmpty(mLogEventPool, mLogEventPoolBak);
        lock_guard<mutex> lock(mPoolMux);
        return AcquireEventNoLock(ptr, mLogEventPool, mMinUnusedLogEventsCnt);
    }
    return AcquireEventNoLock(ptr, mLogEventPool, mMinUnusedLogEventsCnt);
}

// Acquires a MetricEvent owned by the given group, recycling from the pool.
// Mirrors AcquireLogEvent: the backup pool is only consulted on the locked
// path, since the unlocked configuration is single-threaded and never
// populates the backup pool.
MetricEvent* EventPool::AcquireMetricEvent(PipelineEventGroup* ptr) {
    if (mEnableLock) {
        // Refill the primary pool from the backup pool before taking mPoolMux.
        TransferPoolIfEmpty(mMetricEventPool, mMetricEventPoolBak);
        lock_guard<mutex> lock(mPoolMux);
        return AcquireEventNoLock(ptr, mMetricEventPool, mMinUnusedMetricEventsCnt);
    }
    return AcquireEventNoLock(ptr, mMetricEventPool, mMinUnusedMetricEventsCnt);
}

SpanEvent* EventPool::AcquireSpanEvent(PipelineEventGroup* ptr) {
TransferPoolIfEmpty(mSpanEventPool, mSpanEventPoolBak);

if (mEnableLock) {
TransferPoolIfEmpty(mSpanEventPool, mSpanEventPoolBak);
lock_guard<mutex> lock(mPoolMux);
return AcquireEventNoLock(ptr, mSpanEventPool, mMinUnusedSpanEventsCnt);
}
Expand Down Expand Up @@ -98,20 +95,27 @@ void EventPool::Release(vector<SpanEvent*>&& obj) {
}

template <class T>
void DoGC(vector<T*>& pool, vector<T*>& poolBak, size_t& minUnusedCnt, mutex* mux) {
void DoGC(vector<T*>& pool, vector<T*>& poolBak, size_t& minUnusedCnt, mutex* mux, const string& type) {
if (minUnusedCnt <= pool.size() || minUnusedCnt == numeric_limits<size_t>::max()) {
auto sz = minUnusedCnt == numeric_limits<size_t>::max() ? pool.size() : minUnusedCnt;
for (size_t i = 0; i < sz; ++i) {
delete pool.back();
pool.pop_back();
}
size_t bakSZ = 0;
if (mux) {
lock_guard<mutex> lock(*mux);
bakSZ = poolBak.size();
for (auto& item : poolBak) {
delete item;
}
poolBak.clear();
}
if (sz != 0 || bakSZ != 0) {
LOG_INFO(
sLogger,
("event pool gc", "done")("event type", type)("gc event cnt", sz + bakSZ)("pool size", pool.size()));
}
} else {
LOG_ERROR(sLogger,
("unexpected error", "min unused event cnt is greater than pool size")(
Expand All @@ -124,13 +128,13 @@ void EventPool::CheckGC() {
if (time(nullptr) - mLastGCTime > INT32_FLAG(event_pool_gc_interval_secs)) {
if (mEnableLock) {
lock_guard<mutex> lock(mPoolMux);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux);
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux);
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux, "span");
} else {
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr);
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr);
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr, "span");
}
mLastGCTime = time(nullptr);
}
Expand Down
16 changes: 15 additions & 1 deletion core/models/PipelineEventGroup.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,24 @@ namespace logtail {
template <class T>
void DestroyEvents(vector<PipelineEventPtr>&& events) {
unordered_map<EventPool*, vector<T*>> eventsPoolMap;
// for most cases, all events have the same origin. So we cache the pool pointer and iterator for efficiency
EventPool* cachedPoolPtr = nullptr;
typename unordered_map<EventPool*, vector<T*>>::iterator cachedIt;
bool firstEvent = true;
for (auto& item : events) {
if (item && item.IsFromEventPool()) {
item->Reset();
eventsPoolMap[item.GetEventPool()].emplace_back(static_cast<T*>(item.Release()));
if (firstEvent || item.GetEventPool() != cachedPoolPtr) {
cachedPoolPtr = item.GetEventPool();
cachedIt = eventsPoolMap.find(cachedPoolPtr);
if (cachedIt == eventsPoolMap.end()) {
eventsPoolMap.emplace(cachedPoolPtr, vector<T*>());
cachedIt = eventsPoolMap.find(cachedPoolPtr);
cachedIt->second.reserve(events.size());
}
firstEvent = false;
}
cachedIt->second.emplace_back(static_cast<T*>(item.Release()));
}
}
for (auto& item : eventsPoolMap) {
Expand Down
4 changes: 2 additions & 2 deletions core/monitor/metric_constants/ComponentMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -74,8 +74,8 @@ const string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES = "component_extra_b
const string METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_TOTAL = "component_discarded_events_total";

const string METRIC_COMPONENT_QUEUE_FETCHED_ITEMS_TOTAL = "component_fetched_items_total";
const string METRIC_COMPONENT_QUEUE_FETCH_ATTEMPTS_TOTAL = "component_fetch_attempts_total";
const string METRIC_COMPONENT_QUEUE_SUCCESSFUL_FETCH_TIMES_TOTAL = "component_successful_fetch_times_total";
const string METRIC_COMPONENT_QUEUE_FETCH_TIMES_TOTAL = "component_fetch_times_total";
const string METRIC_COMPONENT_QUEUE_VALID_FETCH_TIMES_TOTAL = "component_valid_fetch_times_total";
const string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_REGION_LIMITER_TIMES_TOTAL
= "component_fetch_rejected_by_region_limiter_times_total";
const string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_PROJECT_LIMITER_TIMES_TOTAL
Expand Down
4 changes: 2 additions & 2 deletions core/monitor/metric_constants/MetricConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,8 +261,8 @@ extern const std::string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES;
extern const std::string METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_TOTAL;

extern const std::string METRIC_COMPONENT_QUEUE_FETCHED_ITEMS_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_ATTEMPTS_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_SUCCESSFUL_FETCH_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_VALID_FETCH_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_REGION_LIMITER_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_PROJECT_LIMITER_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_LOGSTORE_LIMITER_TIMES_TOTAL;
Expand Down
8 changes: 4 additions & 4 deletions core/pipeline/batch/BatchItem.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ class GroupBatchItem {

// Appends a batched group to this item and refreshes the batch status.
// NOTE(review): enqueue-time accumulation is intentionally disabled (see the
// matching disabled code in EventBatchItem::Add); totalEnqueTimeMs is kept in
// the signature so existing callers remain source-compatible.
void Add(BatchedEvents&& g, int64_t totalEnqueTimeMs) {
    mEventsCnt += g.mEvents.size();
    // mTotalEnqueTimeMs += totalEnqueTimeMs;
    mGroups.emplace_back(std::move(g));
    // Update status from the group we just stored, after the move.
    mStatus.Update(mGroups.back());
}
Expand Down Expand Up @@ -94,9 +94,9 @@ class EventBatchItem {
// Appends a single event to the batch and refreshes the batch status.
// NOTE(review): per-event enqueue-timestamp accumulation is intentionally
// disabled (it invoked system_clock::now() on every event); the code is kept
// commented out as a marker of the removed feature.
void Add(PipelineEventPtr&& e) {
    mBatch.mEvents.emplace_back(std::move(e));
    // Update status from the event we just stored, after the move.
    mStatus.Update(mBatch.mEvents.back());
    // mTotalEnqueTimeMs += std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
    //                          .time_since_epoch()
    //                          .count();
}

void Flush(GroupBatchItem& res) {
Expand Down
16 changes: 15 additions & 1 deletion core/pipeline/batch/BatchedEvents.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -23,10 +23,24 @@ namespace logtail {
template <class T>
void DestroyEvents(vector<PipelineEventPtr>&& events) {
unordered_map<EventPool*, vector<T*>> eventsPoolMap;
// for most cases, all events have the same origin. So we cache the pool pointer and iterator for efficiency
EventPool* cachedPoolPtr = nullptr;
typename unordered_map<EventPool*, vector<T*>>::iterator cachedIt;
bool firstEvent = true;
for (auto& item : events) {
if (item && item.IsFromEventPool()) {
item->Reset();
eventsPoolMap[item.GetEventPool()].emplace_back(static_cast<T*>(item.Release()));
if (firstEvent || item.GetEventPool() != cachedPoolPtr) {
cachedPoolPtr = item.GetEventPool();
cachedIt = eventsPoolMap.find(cachedPoolPtr);
if (cachedIt == eventsPoolMap.end()) {
eventsPoolMap.emplace(cachedPoolPtr, vector<T*>());
cachedIt = eventsPoolMap.find(cachedPoolPtr);
cachedIt->second.reserve(events.size());
}
firstEvent = false;
}
cachedIt->second.emplace_back(static_cast<T*>(item.Release()));
}
}
for (auto& item : eventsPoolMap) {
Expand Down
Loading

0 comments on commit 2ca86bc

Please sign in to comment.