From ebeb0f9f2448d812e70ae53241e02dd44acb6efe Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 1 Nov 2024 10:07:08 +0000 Subject: [PATCH 01/14] do not separate event group when group size excceeds min batch size --- core/pipeline/batch/BatchItem.h | 8 +- core/pipeline/batch/Batcher.h | 159 +++++++++++------- core/pipeline/batch/FlushStrategy.h | 34 ++-- core/plugin/flusher/sls/FlusherSLS.cpp | 23 +-- core/plugin/flusher/sls/FlusherSLS.h | 1 - core/unittest/batch/BatcherUnittest.cpp | 103 ++++++++---- core/unittest/batch/FlushStrategyUnittest.cpp | 17 +- core/unittest/flusher/FlusherSLSUnittest.cpp | 22 +-- 8 files changed, 218 insertions(+), 149 deletions(-) diff --git a/core/pipeline/batch/BatchItem.h b/core/pipeline/batch/BatchItem.h index 9fa956dbef..d76dda00c1 100644 --- a/core/pipeline/batch/BatchItem.h +++ b/core/pipeline/batch/BatchItem.h @@ -34,7 +34,7 @@ class GroupBatchItem { void Add(BatchedEvents&& g, int64_t totalEnqueTimeMs) { mEventsCnt += g.mEvents.size(); - mTotalEnqueTimeMs += totalEnqueTimeMs; + // mTotalEnqueTimeMs += totalEnqueTimeMs; mGroups.emplace_back(std::move(g)); mStatus.Update(mGroups.back()); } @@ -94,9 +94,9 @@ class EventBatchItem { void Add(PipelineEventPtr&& e) { mBatch.mEvents.emplace_back(std::move(e)); mStatus.Update(mBatch.mEvents.back()); - mTotalEnqueTimeMs += std::chrono::time_point_cast(std::chrono::system_clock::now()) - .time_since_epoch() - .count(); + // mTotalEnqueTimeMs += std::chrono::time_point_cast(std::chrono::system_clock::now()) + // .time_since_epoch() + // .count(); } void Flush(GroupBatchItem& res) { diff --git a/core/pipeline/batch/Batcher.h b/core/pipeline/batch/Batcher.h index 523e390d1f..aeca80b600 100644 --- a/core/pipeline/batch/Batcher.h +++ b/core/pipeline/batch/Batcher.h @@ -47,12 +47,12 @@ class Batcher { std::string errorMsg; PipelineContext& ctx = flusher->GetContext(); - uint32_t maxSizeBytes = strategy.mMaxSizeBytes; - if (!GetOptionalUIntParam(config, "MaxSizeBytes", maxSizeBytes, errorMsg)) { + uint32_t minSizeBytes = strategy.mMinSizeBytes; + if (!GetOptionalUIntParam(config, "MinSizeBytes", minSizeBytes, errorMsg)) { PARAM_WARNING_DEFAULT(ctx.GetLogger(), ctx.GetAlarm(), errorMsg, - maxSizeBytes, + minSizeBytes, flusher->Name(), ctx.GetConfigName(), ctx.GetProjectName(), @@ -60,12 +60,12 @@ class Batcher { ctx.GetRegion()); } - uint32_t maxCnt = strategy.mMaxCnt; - if (!GetOptionalUIntParam(config, "MaxCnt", maxCnt, errorMsg)) { + uint32_t minCnt = strategy.mMinCnt; + if (!GetOptionalUIntParam(config, "MinCnt", minCnt, errorMsg)) { PARAM_WARNING_DEFAULT(ctx.GetLogger(), ctx.GetAlarm(), errorMsg, - maxCnt, + minCnt, flusher->Name(), ctx.GetConfigName(), ctx.GetProjectName(), @@ -88,14 +88,15 @@ class Batcher { if (enableGroupBatch) { uint32_t groupTimeout = timeoutSecs / 2; - mGroupFlushStrategy = GroupFlushStrategy(maxSizeBytes, groupTimeout); + mGroupFlushStrategy = GroupFlushStrategy(minSizeBytes, groupTimeout); mGroupQueue = GroupBatchItem(); mEventFlushStrategy.SetTimeoutSecs(timeoutSecs - groupTimeout); } else { mEventFlushStrategy.SetTimeoutSecs(timeoutSecs); } - mEventFlushStrategy.SetMaxSizeBytes(maxSizeBytes); - mEventFlushStrategy.SetMaxCnt(maxCnt); + mEventFlushStrategy.SetMaxSizeBytes(strategy.mMaxSizeBytes); + mEventFlushStrategy.SetMinSizeBytes(minSizeBytes); + mEventFlushStrategy.SetMinCnt(minCnt); mFlusher = flusher; @@ -114,7 +115,7 @@ class Batcher { mInEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_EVENTS_TOTAL); mInGroupDataSizeBytes = 
mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_SIZE_BYTES); mOutEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_EVENTS_TOTAL); - mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS); + // mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_TOTAL_DELAY_MS); mEventBatchItemsTotal = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_EVENT_BATCHES_TOTAL); mBufferedGroupsTotal = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_BUFFERED_GROUPS_TOTAL); mBufferedEventsTotal = mMetricsRecordRef.CreateIntGauge(METRIC_COMPONENT_BATCHER_BUFFERED_EVENTS_TOTAL); @@ -134,51 +135,79 @@ class Batcher { mInGroupDataSizeBytes->Add(g.DataSize()); mEventBatchItemsTotal->Set(mEventQueueMap.size()); - size_t eventsSize = g.GetEvents().size(); - for (size_t i = 0; i < eventsSize; ++i) { - PipelineEventPtr& e = g.MutableEvents()[i]; - if (!item.IsEmpty() && mEventFlushStrategy.NeedFlushByTime(item.GetStatus(), e)) { - if (!mGroupQueue) { - UpdateMetricsOnFlushingEventQueue(item); + if (g.DataSize() > mEventFlushStrategy.GetMinSizeBytes()) { + // for group size larger than min batch size, separate group only if size is larger than max batch size + if (!item.IsEmpty()) { + UpdateMetricsOnFlushingEventQueue(item); + item.Flush(res); + } + for (auto& e : g.MutableEvents()) { + // should consider time condition here because sls require this + if (!item.IsEmpty() && mEventFlushStrategy.NeedFlushByTime(item.GetStatus(), e)) { + mOutEventsTotal->Add(item.EventSize()); + item.Flush(res); + } + if (item.IsEmpty()) { + item.Reset(g.GetSizedTags(), + g.GetSourceBuffer(), + g.GetExactlyOnceCheckpoint(), + g.GetMetadata(EventGroupMetaKey::SOURCE_ID)); + } + item.Add(std::move(e)); + if (mEventFlushStrategy.SizeReachingUpperLimit(item.GetStatus())) { + mOutEventsTotal->Add(item.EventSize()); item.Flush(res); - } else { - if (!mGroupQueue->IsEmpty() && mGroupFlushStrategy->NeedFlushByTime(mGroupQueue->GetStatus())) { - UpdateMetricsOnFlushingGroupQueue(); - mGroupQueue->Flush(res); - } - if (mGroupQueue->IsEmpty()) { - TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(), - 0, - 0, - mGroupFlushStrategy->GetTimeoutSecs(), - mFlusher); - } - item.Flush(mGroupQueue.value()); - if (mGroupFlushStrategy->NeedFlushBySize(mGroupQueue->GetStatus())) { - UpdateMetricsOnFlushingGroupQueue(); - mGroupQueue->Flush(res); - } } } - if (item.IsEmpty()) { - item.Reset(g.GetSizedTags(), - g.GetSourceBuffer(), - g.GetExactlyOnceCheckpoint(), - g.GetMetadata(EventGroupMetaKey::SOURCE_ID)); - TimeoutFlushManager::GetInstance()->UpdateRecord( - mFlusher->GetContext().GetConfigName(), 0, key, mEventFlushStrategy.GetTimeoutSecs(), mFlusher); - mBufferedGroupsTotal->Add(1); - mBufferedDataSizeByte->Add(item.DataSize()); - } else if (i == 0) { - item.AddSourceBuffer(g.GetSourceBuffer()); - } - mBufferedEventsTotal->Add(1); - mBufferedDataSizeByte->Add(e->DataSize()); - item.Add(std::move(e)); - if (mEventFlushStrategy.NeedFlushBySize(item.GetStatus()) - || mEventFlushStrategy.NeedFlushByCnt(item.GetStatus())) { - UpdateMetricsOnFlushingEventQueue(item); - item.Flush(res); + mOutEventsTotal->Add(item.EventSize()); + item.Flush(res); + } else { + size_t eventsSize = g.GetEvents().size(); + for (size_t i = 0; i < eventsSize; ++i) { + PipelineEventPtr& e = g.MutableEvents()[i]; + if (!item.IsEmpty() && mEventFlushStrategy.NeedFlushByTime(item.GetStatus(), e)) { + if (!mGroupQueue) { + UpdateMetricsOnFlushingEventQueue(item); + item.Flush(res); 
+ } else { + if (!mGroupQueue->IsEmpty() && mGroupFlushStrategy->NeedFlushByTime(mGroupQueue->GetStatus())) { + UpdateMetricsOnFlushingGroupQueue(); + mGroupQueue->Flush(res); + } + if (mGroupQueue->IsEmpty()) { + TimeoutFlushManager::GetInstance()->UpdateRecord(mFlusher->GetContext().GetConfigName(), + 0, + 0, + mGroupFlushStrategy->GetTimeoutSecs(), + mFlusher); + } + item.Flush(mGroupQueue.value()); + if (mGroupFlushStrategy->NeedFlushBySize(mGroupQueue->GetStatus())) { + UpdateMetricsOnFlushingGroupQueue(); + mGroupQueue->Flush(res); + } + } + } + if (item.IsEmpty()) { + item.Reset(g.GetSizedTags(), + g.GetSourceBuffer(), + g.GetExactlyOnceCheckpoint(), + g.GetMetadata(EventGroupMetaKey::SOURCE_ID)); + TimeoutFlushManager::GetInstance()->UpdateRecord( + mFlusher->GetContext().GetConfigName(), 0, key, mEventFlushStrategy.GetTimeoutSecs(), mFlusher); + mBufferedGroupsTotal->Add(1); + mBufferedDataSizeByte->Add(item.DataSize()); + } else if (i == 0) { + item.AddSourceBuffer(g.GetSourceBuffer()); + } + mBufferedEventsTotal->Add(1); + mBufferedDataSizeByte->Add(e->DataSize()); + item.Add(std::move(e)); + if (mEventFlushStrategy.NeedFlushBySize(item.GetStatus()) + || mEventFlushStrategy.NeedFlushByCnt(item.GetStatus())) { + UpdateMetricsOnFlushingEventQueue(item); + item.Flush(res); + } } } mTotalAddTimeMs->Add(std::chrono::system_clock::now() - before); @@ -260,12 +289,12 @@ class Batcher { private: void UpdateMetricsOnFlushingEventQueue(const EventBatchItem& item) { mOutEventsTotal->Add(item.EventSize()); - mTotalDelayMs->Add( - item.EventSize() - * std::chrono::time_point_cast(std::chrono::system_clock::now()) - .time_since_epoch() - .count() - - item.TotalEnqueTimeMs()); + // mTotalDelayMs->Add( + // item.EventSize() + // * std::chrono::time_point_cast(std::chrono::system_clock::now()) + // .time_since_epoch() + // .count() + // - item.TotalEnqueTimeMs()); mBufferedGroupsTotal->Sub(1); mBufferedEventsTotal->Sub(item.EventSize()); mBufferedDataSizeByte->Sub(item.DataSize()); @@ -273,12 +302,12 @@ class Batcher { void UpdateMetricsOnFlushingGroupQueue() { mOutEventsTotal->Add(mGroupQueue->EventSize()); - mTotalDelayMs->Add( - mGroupQueue->EventSize() - * std::chrono::time_point_cast(std::chrono::system_clock::now()) - .time_since_epoch() - .count() - - mGroupQueue->TotalEnqueTimeMs()); + // mTotalDelayMs->Add( + // mGroupQueue->EventSize() + // * std::chrono::time_point_cast(std::chrono::system_clock::now()) + // .time_since_epoch() + // .count() + // - mGroupQueue->TotalEnqueTimeMs()); mBufferedGroupsTotal->Sub(mGroupQueue->GroupSize()); mBufferedEventsTotal->Sub(mGroupQueue->EventSize()); mBufferedDataSizeByte->Sub(mGroupQueue->DataSize()); @@ -297,7 +326,7 @@ class Batcher { CounterPtr mInEventsTotal; CounterPtr mInGroupDataSizeBytes; CounterPtr mOutEventsTotal; - CounterPtr mTotalDelayMs; + // CounterPtr mTotalDelayMs; IntGaugePtr mEventBatchItemsTotal; IntGaugePtr mBufferedGroupsTotal; IntGaugePtr mBufferedEventsTotal; diff --git a/core/pipeline/batch/FlushStrategy.h b/core/pipeline/batch/FlushStrategy.h index d248cc7f10..e2a718244e 100644 --- a/core/pipeline/batch/FlushStrategy.h +++ b/core/pipeline/batch/FlushStrategy.h @@ -20,15 +20,17 @@ #include #include +#include -#include "pipeline/batch/BatchStatus.h" #include "models/PipelineEventPtr.h" +#include "pipeline/batch/BatchStatus.h" namespace logtail { struct DefaultFlushStrategyOptions { - uint32_t mMaxSizeBytes = 0; - uint32_t mMaxCnt = 0; + uint32_t mMaxSizeBytes = std::numeric_limits::max(); + uint32_t mMinSizeBytes = 0; + 
uint32_t mMinCnt = 0; uint32_t mTimeoutSecs = 0; }; @@ -36,49 +38,53 @@ template class EventFlushStrategy { public: void SetMaxSizeBytes(uint32_t size) { mMaxSizeBytes = size; } - void SetMaxCnt(uint32_t cnt) { mMaxCnt = cnt; } + void SetMinSizeBytes(uint32_t size) { mMinSizeBytes = size; } + void SetMinCnt(uint32_t cnt) { mMinCnt = cnt; } void SetTimeoutSecs(uint32_t secs) { mTimeoutSecs = secs; } uint32_t GetMaxSizeBytes() const { return mMaxSizeBytes; } - uint32_t GetMaxCnt() const { return mMaxCnt; } + uint32_t GetMinSizeBytes() const { return mMinSizeBytes; } + uint32_t GetMinCnt() const { return mMinCnt; } uint32_t GetTimeoutSecs() const { return mTimeoutSecs; } // should be called after event is added - bool NeedFlushBySize(const T& status) { return status.GetSize() >= mMaxSizeBytes; } - bool NeedFlushByCnt(const T& status) { return status.GetCnt() == mMaxCnt; } + bool NeedFlushBySize(const T& status) { return status.GetSize() >= mMinSizeBytes; } + bool NeedFlushByCnt(const T& status) { return status.GetCnt() == mMinCnt; } // should be called before event is added bool NeedFlushByTime(const T& status, const PipelineEventPtr& e) { return time(nullptr) - status.GetCreateTime() >= mTimeoutSecs; } + bool SizeReachingUpperLimit(const T& status) { return status.GetSize() >= mMaxSizeBytes; } private: uint32_t mMaxSizeBytes = 0; - uint32_t mMaxCnt = 0; + uint32_t mMinSizeBytes = 0; + uint32_t mMinCnt = 0; uint32_t mTimeoutSecs = 0; }; class GroupFlushStrategy { public: - GroupFlushStrategy(uint32_t size, uint32_t timeout) : mMaxSizeBytes(size), mTimeoutSecs(timeout) {} + GroupFlushStrategy(uint32_t size, uint32_t timeout) : mMinSizeBytes(size), mTimeoutSecs(timeout) {} - void SetMaxSizeBytes(uint32_t size) { mMaxSizeBytes = size; } + void SetMinSizeBytes(uint32_t size) { mMinSizeBytes = size; } void SetTimeoutSecs(uint32_t secs) { mTimeoutSecs = secs; } - uint32_t GetMaxSizeBytes() const { return mMaxSizeBytes; } + uint32_t GetMinSizeBytes() const { return mMinSizeBytes; } uint32_t GetTimeoutSecs() const { return mTimeoutSecs; } // should be called after event is added - bool NeedFlushBySize(const GroupBatchStatus& status) { return status.GetSize() >= mMaxSizeBytes; } + bool NeedFlushBySize(const GroupBatchStatus& status) { return status.GetSize() >= mMinSizeBytes; } // should be called before event is added bool NeedFlushByTime(const GroupBatchStatus& status) { return time(nullptr) - status.GetCreateTime() >= mTimeoutSecs; } private: - uint32_t mMaxSizeBytes = 0; + uint32_t mMinSizeBytes = 0; uint32_t mTimeoutSecs = 0; }; template <> bool EventFlushStrategy::NeedFlushByTime(const SLSEventBatchStatus& status, - const PipelineEventPtr& e); + const PipelineEventPtr& e); } // namespace logtail diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 9f5b4cd08b..f4c31ee7df 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -59,6 +59,7 @@ DEFINE_FLAG_INT32(profile_data_send_retrytimes, "how many times should retry if DEFINE_FLAG_INT32(unknow_error_try_max, "discard data when try times > this value", 5); DEFINE_FLAG_BOOL(global_network_success, "global network success flag, default false", false); DEFINE_FLAG_BOOL(enable_metricstore_channel, "only works for metrics data for enhance metrics query performance", true); +DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024); DECLARE_FLAG_BOOL(send_prefer_real_ip); @@ -374,8 +375,10 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& 
optionalGoPipeline #endif mEndpoint = TrimString(mEndpoint); if (!mEndpoint.empty()) { - SLSClientManager::GetInstance()->AddEndpointEntry( - mRegion, StandardizeEndpoint(mEndpoint, mEndpoint), false, SLSClientManager::EndpointSourceType::LOCAL); + SLSClientManager::GetInstance()->AddEndpointEntry(mRegion, + StandardizeEndpoint(mEndpoint, mEndpoint), + false, + SLSClientManager::EndpointSourceType::LOCAL); } } #ifdef __ENTERPRISE__ @@ -472,7 +475,8 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mContext->GetRegion()); } - DefaultFlushStrategyOptions strategy{static_cast(INT32_FLAG(batch_send_metric_size)), + DefaultFlushStrategyOptions strategy{static_cast(INT32_FLAG(max_send_log_group_size)), + static_cast(INT32_FLAG(batch_send_metric_size)), static_cast(INT32_FLAG(merge_log_count_limit)), static_cast(INT32_FLAG(batch_send_interval))}; if (!mBatcher.Init( @@ -516,19 +520,6 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mMaxSendRate); } - // (Deprecated) FlowControlExpireTime - if (!GetOptionalUIntParam(config, "FlowControlExpireTime", mFlowControlExpireTime, errorMsg)) { - PARAM_WARNING_DEFAULT(mContext->GetLogger(), - mContext->GetAlarm(), - errorMsg, - mFlowControlExpireTime, - sName, - mContext->GetConfigName(), - mContext->GetProjectName(), - mContext->GetLogstoreName(), - mContext->GetRegion()); - } - GenerateGoPlugin(config, optionalGoPipeline); mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL); diff --git a/core/plugin/flusher/sls/FlusherSLS.h b/core/plugin/flusher/sls/FlusherSLS.h index c9afca958a..aa6c7986e7 100644 --- a/core/plugin/flusher/sls/FlusherSLS.h +++ b/core/plugin/flusher/sls/FlusherSLS.h @@ -81,7 +81,6 @@ class FlusherSLS : public HttpFlusher { sls_logs::SlsTelemetryType mTelemetryType = sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS; std::vector mShardHashKeys; uint32_t mMaxSendRate = 0; // preserved only for exactly once - uint32_t mFlowControlExpireTime = 0; // TODO: temporarily public for profile std::unique_ptr mCompressor; diff --git a/core/unittest/batch/BatcherUnittest.cpp b/core/unittest/batch/BatcherUnittest.cpp index d75b2ed9ec..a2ecef0b39 100644 --- a/core/unittest/batch/BatcherUnittest.cpp +++ b/core/unittest/batch/BatcherUnittest.cpp @@ -28,6 +28,7 @@ class BatcherUnittest : public ::testing::Test { void TestInitWithGroupBatch(); void TestAddWithoutGroupBatch(); void TestAddWithGroupBatch(); + void TestAddWithOversizedGroup(); void TestFlushEventQueueWithoutGroupBatch(); void TestFlushEventQueueWithGroupBatch(); void TestFlushGroupQueue(); @@ -59,16 +60,18 @@ unique_ptr BatcherUnittest::sFlusher; void BatcherUnittest::TestParamInit() { DefaultFlushStrategyOptions strategy; - strategy.mMaxCnt = 1; - strategy.mMaxSizeBytes = 100; + strategy.mMinCnt = 1; + strategy.mMaxSizeBytes = 300; + strategy.mMinSizeBytes = 100; strategy.mTimeoutSecs = 3; { // empty config Batcher<> batch; batch.Init(Json::Value(), sFlusher.get(), strategy); - APSARA_TEST_EQUAL(1U, batch.mEventFlushStrategy.GetMaxCnt()); - APSARA_TEST_EQUAL(100U, batch.mEventFlushStrategy.GetMaxSizeBytes()); + APSARA_TEST_EQUAL(1U, batch.mEventFlushStrategy.GetMinCnt()); + APSARA_TEST_EQUAL(100U, batch.mEventFlushStrategy.GetMinSizeBytes()); APSARA_TEST_EQUAL(3U, batch.mEventFlushStrategy.GetTimeoutSecs()); + APSARA_TEST_EQUAL(300U, batch.mEventFlushStrategy.GetMaxSizeBytes()); } { // invalid param @@ -85,9 +88,10 @@ void BatcherUnittest::TestParamInit() { Batcher<> batch; 
batch.Init(configJson, sFlusher.get(), strategy); - APSARA_TEST_EQUAL(1U, batch.mEventFlushStrategy.GetMaxCnt()); - APSARA_TEST_EQUAL(100U, batch.mEventFlushStrategy.GetMaxSizeBytes()); + APSARA_TEST_EQUAL(1U, batch.mEventFlushStrategy.GetMinCnt()); + APSARA_TEST_EQUAL(100U, batch.mEventFlushStrategy.GetMinSizeBytes()); APSARA_TEST_EQUAL(3U, batch.mEventFlushStrategy.GetTimeoutSecs()); + APSARA_TEST_EQUAL(300U, batch.mEventFlushStrategy.GetMaxSizeBytes()); } } @@ -105,9 +109,10 @@ void BatcherUnittest::TestInitWithoutGroupBatch() { Batcher<> batch; batch.Init(configJson, sFlusher.get(), DefaultFlushStrategyOptions()); - APSARA_TEST_EQUAL(10U, batch.mEventFlushStrategy.GetMaxCnt()); - APSARA_TEST_EQUAL(1000U, batch.mEventFlushStrategy.GetMaxSizeBytes()); + APSARA_TEST_EQUAL(10U, batch.mEventFlushStrategy.GetMinCnt()); + APSARA_TEST_EQUAL(1000U, batch.mEventFlushStrategy.GetMinSizeBytes()); APSARA_TEST_EQUAL(5U, batch.mEventFlushStrategy.GetTimeoutSecs()); + APSARA_TEST_EQUAL(numeric_limits::max(), batch.mEventFlushStrategy.GetMaxSizeBytes()); APSARA_TEST_EQUAL(sFlusher.get(), batch.mFlusher); } @@ -125,11 +130,12 @@ void BatcherUnittest::TestInitWithGroupBatch() { Batcher<> batch; batch.Init(configJson, sFlusher.get(), DefaultFlushStrategyOptions(), true); - APSARA_TEST_EQUAL(10U, batch.mEventFlushStrategy.GetMaxCnt()); - APSARA_TEST_EQUAL(1000U, batch.mEventFlushStrategy.GetMaxSizeBytes()); + APSARA_TEST_EQUAL(10U, batch.mEventFlushStrategy.GetMinCnt()); + APSARA_TEST_EQUAL(1000U, batch.mEventFlushStrategy.GetMinSizeBytes()); APSARA_TEST_EQUAL(3U, batch.mEventFlushStrategy.GetTimeoutSecs()); + APSARA_TEST_EQUAL(numeric_limits::max(), batch.mEventFlushStrategy.GetMaxSizeBytes()); APSARA_TEST_TRUE(batch.mGroupFlushStrategy); - APSARA_TEST_EQUAL(1000U, batch.mGroupFlushStrategy->GetMaxSizeBytes()); + APSARA_TEST_EQUAL(1000U, batch.mGroupFlushStrategy->GetMinSizeBytes()); APSARA_TEST_EQUAL(2U, batch.mGroupFlushStrategy->GetTimeoutSecs()); APSARA_TEST_TRUE(batch.mGroupQueue); APSARA_TEST_EQUAL(sFlusher.get(), batch.mFlusher); @@ -137,8 +143,8 @@ void BatcherUnittest::TestInitWithGroupBatch() { void BatcherUnittest::TestAddWithoutGroupBatch() { DefaultFlushStrategyOptions strategy; - strategy.mMaxCnt = 3; - strategy.mMaxSizeBytes = 1000; + strategy.mMinCnt = 3; + strategy.mMinSizeBytes = 1000; strategy.mTimeoutSecs = 3; Batcher<> batch; @@ -186,7 +192,7 @@ void BatcherUnittest::TestAddWithoutGroupBatch() { // flush by time then by size res.clear(); batch.mEventFlushStrategy.SetTimeoutSecs(0); - batch.mEventFlushStrategy.SetMaxSizeBytes(10); + batch.mEventFlushStrategy.SetMinSizeBytes(10); PipelineEventGroup group3 = CreateEventGroup(1); SourceBuffer* buffer3 = group3.GetSourceBuffer().get(); RangeCheckpoint* eoo3 = group3.GetExactlyOnceCheckpoint().get(); @@ -218,8 +224,8 @@ void BatcherUnittest::TestAddWithoutGroupBatch() { void BatcherUnittest::TestAddWithGroupBatch() { DefaultFlushStrategyOptions strategy; - strategy.mMaxCnt = 3; - strategy.mMaxSizeBytes = 1000; + strategy.mMinCnt = 3; + strategy.mMinSizeBytes = 1000; strategy.mTimeoutSecs = 3; Batcher<> batch; @@ -298,7 +304,7 @@ void BatcherUnittest::TestAddWithGroupBatch() { // flush by time to group batch, and then group flush by size res.clear(); batch.mGroupFlushStrategy->SetTimeoutSecs(3); - batch.mGroupFlushStrategy->SetMaxSizeBytes(10); + batch.mGroupFlushStrategy->SetMinSizeBytes(10); PipelineEventGroup group5 = CreateEventGroup(1); SourceBuffer* buffer5 = group5.GetSourceBuffer().get(); RangeCheckpoint* eoo5 = 
group5.GetExactlyOnceCheckpoint().get(); @@ -328,7 +334,7 @@ void BatcherUnittest::TestAddWithGroupBatch() { // flush by size res.clear(); - batch.mEventFlushStrategy.SetMaxSizeBytes(10); + batch.mEventFlushStrategy.SetMinSizeBytes(10); batch.mEventFlushStrategy.SetTimeoutSecs(3); PipelineEventGroup group6 = CreateEventGroup(1); SourceBuffer* buffer6 = group6.GetSourceBuffer().get(); @@ -349,10 +355,35 @@ void BatcherUnittest::TestAddWithGroupBatch() { updateTime - 1); } +void BatcherUnittest::TestAddWithOversizedGroup() { + DefaultFlushStrategyOptions strategy; + strategy.mMaxSizeBytes = 500; + strategy.mMinCnt = 3; + strategy.mMinSizeBytes = 200; + strategy.mTimeoutSecs = 3; + + Batcher<> batch; + batch.Init(Json::Value(), sFlusher.get(), strategy); + + vector res; + PipelineEventGroup group1 = CreateEventGroup(2); + size_t key = group1.GetTagsHash(); + batch.Add(std::move(group1), res); + + PipelineEventGroup group2 = CreateEventGroup(20); + batch.Add(std::move(group2), res); + APSARA_TEST_EQUAL(1U, batch.mEventQueueMap.size()); + APSARA_TEST_EQUAL(0U, batch.mEventQueueMap[key].mBatch.mEvents.size()); + APSARA_TEST_EQUAL(3U, res.size()); + APSARA_TEST_EQUAL(2U, res[0][0].mEvents.size()); + APSARA_TEST_EQUAL(13U, res[1][0].mEvents.size()); + APSARA_TEST_EQUAL(7U, res[2][0].mEvents.size()); +} + void BatcherUnittest::TestFlushEventQueueWithoutGroupBatch() { DefaultFlushStrategyOptions strategy; - strategy.mMaxCnt = 3; - strategy.mMaxSizeBytes = 1000; + strategy.mMinCnt = 3; + strategy.mMinSizeBytes = 1000; strategy.mTimeoutSecs = 3; Batcher<> batch; @@ -385,8 +416,8 @@ void BatcherUnittest::TestFlushEventQueueWithoutGroupBatch() { void BatcherUnittest::TestFlushEventQueueWithGroupBatch() { DefaultFlushStrategyOptions strategy; - strategy.mMaxCnt = 10; - strategy.mMaxSizeBytes = 1000; + strategy.mMinCnt = 10; + strategy.mMinSizeBytes = 1000; strategy.mTimeoutSecs = 3; Batcher<> batch; @@ -415,7 +446,7 @@ void BatcherUnittest::TestFlushEventQueueWithGroupBatch() { // flush to group item, and group is flushed by time then by size batch.mGroupFlushStrategy->SetTimeoutSecs(0); - batch.mGroupFlushStrategy->SetMaxSizeBytes(10); + batch.mGroupFlushStrategy->SetMinSizeBytes(10); PipelineEventGroup group2 = CreateEventGroup(2); SourceBuffer* buffer2 = group2.GetSourceBuffer().get(); RangeCheckpoint* eoo2 = group2.GetExactlyOnceCheckpoint().get(); @@ -441,8 +472,8 @@ void BatcherUnittest::TestFlushEventQueueWithGroupBatch() { void BatcherUnittest::TestFlushGroupQueue() { DefaultFlushStrategyOptions strategy; - strategy.mMaxCnt = 3; - strategy.mMaxSizeBytes = 1000; + strategy.mMinCnt = 3; + strategy.mMinSizeBytes = 1000; strategy.mTimeoutSecs = 3; { // no group item @@ -481,8 +512,8 @@ void BatcherUnittest::TestFlushGroupQueue() { void BatcherUnittest::TestFlushAllWithoutGroupBatch() { DefaultFlushStrategyOptions strategy; - strategy.mMaxCnt = 3; - strategy.mMaxSizeBytes = 1000; + strategy.mMinCnt = 3; + strategy.mMinSizeBytes = 1000; strategy.mTimeoutSecs = 3; Batcher<> batch; @@ -509,8 +540,8 @@ void BatcherUnittest::TestFlushAllWithoutGroupBatch() { void BatcherUnittest::TestFlushAllWithGroupBatch() { DefaultFlushStrategyOptions strategy; - strategy.mMaxCnt = 3; - strategy.mMaxSizeBytes = 1000; + strategy.mMinCnt = 3; + strategy.mMinSizeBytes = 1000; strategy.mTimeoutSecs = 3; Batcher<> batch; @@ -537,7 +568,7 @@ void BatcherUnittest::TestFlushAllWithGroupBatch() { // flush all by time then by size batch.mGroupFlushStrategy->SetTimeoutSecs(0); - batch.mGroupFlushStrategy->SetMaxSizeBytes(10); + 
batch.mGroupFlushStrategy->SetMinSizeBytes(10); vector res; batch.FlushAll(res); APSARA_TEST_EQUAL(0U, batch.mEventQueueMap.size()); @@ -563,8 +594,8 @@ void BatcherUnittest::TestFlushAllWithGroupBatch() { void BatcherUnittest::TestMetric() { { DefaultFlushStrategyOptions strategy; - strategy.mMaxCnt = 2; - strategy.mMaxSizeBytes = 1000; + strategy.mMinCnt = 2; + strategy.mMinSizeBytes = 1000; strategy.mTimeoutSecs = 3; Batcher<> batch; @@ -579,7 +610,8 @@ void BatcherUnittest::TestMetric() { APSARA_TEST_EQUAL(6U, batch.mMetricsRecordRef->GetLabels()->size()); APSARA_TEST_TRUE(batch.mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_PROJECT, "")); APSARA_TEST_TRUE(batch.mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_PIPELINE_NAME, "test_config")); - APSARA_TEST_TRUE(batch.mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_COMPONENT_NAME, METRIC_LABEL_VALUE_COMPONENT_NAME_BATCHER)); + APSARA_TEST_TRUE(batch.mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_COMPONENT_NAME, + METRIC_LABEL_VALUE_COMPONENT_NAME_BATCHER)); APSARA_TEST_TRUE(batch.mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_FLUSHER_PLUGIN_ID, "1")); APSARA_TEST_TRUE(batch.mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_GROUP_BATCH_ENABLED, "false")); APSARA_TEST_EQUAL(3U, batch.mInEventsTotal->GetValue()); @@ -592,8 +624,8 @@ void BatcherUnittest::TestMetric() { } { DefaultFlushStrategyOptions strategy; - strategy.mMaxCnt = 2; - strategy.mMaxSizeBytes = 1000; + strategy.mMinCnt = 2; + strategy.mMinSizeBytes = 1000; strategy.mTimeoutSecs = 3; Batcher<> batch; @@ -632,6 +664,7 @@ PipelineEventGroup BatcherUnittest::CreateEventGroup(size_t cnt) { UNIT_TEST_CASE(BatcherUnittest, TestParamInit) UNIT_TEST_CASE(BatcherUnittest, TestInitWithoutGroupBatch) UNIT_TEST_CASE(BatcherUnittest, TestInitWithGroupBatch) +UNIT_TEST_CASE(BatcherUnittest, TestAddWithOversizedGroup) UNIT_TEST_CASE(BatcherUnittest, TestAddWithoutGroupBatch) UNIT_TEST_CASE(BatcherUnittest, TestAddWithGroupBatch) UNIT_TEST_CASE(BatcherUnittest, TestFlushEventQueueWithoutGroupBatch) diff --git a/core/unittest/batch/FlushStrategyUnittest.cpp b/core/unittest/batch/FlushStrategyUnittest.cpp index 102b5fbfc2..b19e6c7cba 100644 --- a/core/unittest/batch/FlushStrategyUnittest.cpp +++ b/core/unittest/batch/FlushStrategyUnittest.cpp @@ -27,8 +27,9 @@ class EventFlushStrategyUnittest : public ::testing::Test { protected: void SetUp() override { - mStrategy.SetMaxCnt(2); - mStrategy.SetMaxSizeBytes(100); + mStrategy.SetMinCnt(2); + mStrategy.SetMaxSizeBytes(200); + mStrategy.SetMinSizeBytes(100); mStrategy.SetTimeoutSecs(3); } @@ -45,6 +46,7 @@ void EventFlushStrategyUnittest::TestNeedFlush() { APSARA_TEST_TRUE(mStrategy.NeedFlushByCnt(status)); APSARA_TEST_FALSE(mStrategy.NeedFlushBySize(status)); APSARA_TEST_FALSE(mStrategy.NeedFlushByTime(status, PipelineEventPtr())); + APSARA_TEST_FALSE(mStrategy.SizeReachingUpperLimit(status)); status.mCnt = 1; status.mSizeBytes = 100; @@ -52,6 +54,7 @@ void EventFlushStrategyUnittest::TestNeedFlush() { APSARA_TEST_FALSE(mStrategy.NeedFlushByCnt(status)); APSARA_TEST_TRUE(mStrategy.NeedFlushBySize(status)); APSARA_TEST_FALSE(mStrategy.NeedFlushByTime(status, PipelineEventPtr())); + APSARA_TEST_FALSE(mStrategy.SizeReachingUpperLimit(status)); status.mCnt = 1; status.mSizeBytes = 50; @@ -59,6 +62,12 @@ void EventFlushStrategyUnittest::TestNeedFlush() { APSARA_TEST_FALSE(mStrategy.NeedFlushByCnt(status)); APSARA_TEST_FALSE(mStrategy.NeedFlushBySize(status)); APSARA_TEST_TRUE(mStrategy.NeedFlushByTime(status, PipelineEventPtr())); + 
APSARA_TEST_FALSE(mStrategy.SizeReachingUpperLimit(status)); + + status.mSizeBytes = 300; + status.mCreateTime = time(nullptr) - 1; + APSARA_TEST_FALSE(mStrategy.NeedFlushByTime(status, PipelineEventPtr())); + APSARA_TEST_TRUE(mStrategy.SizeReachingUpperLimit(status)); } UNIT_TEST_CASE(EventFlushStrategyUnittest, TestNeedFlush) @@ -91,8 +100,8 @@ class SLSEventFlushStrategyUnittest : public ::testing::Test { protected: void SetUp() override { - mStrategy.SetMaxCnt(2); - mStrategy.SetMaxSizeBytes(100); + mStrategy.SetMinCnt(2); + mStrategy.SetMinSizeBytes(100); mStrategy.SetTimeoutSecs(3); } diff --git a/core/unittest/flusher/FlusherSLSUnittest.cpp b/core/unittest/flusher/FlusherSLSUnittest.cpp index a760f5b8cd..1e1251fd51 100644 --- a/core/unittest/flusher/FlusherSLSUnittest.cpp +++ b/core/unittest/flusher/FlusherSLSUnittest.cpp @@ -107,15 +107,17 @@ void FlusherSLSUnittest::OnSuccessfulInit() { APSARA_TEST_EQUAL(sls_logs::SlsTelemetryType::SLS_TELEMETRY_TYPE_LOGS, flusher->mTelemetryType); APSARA_TEST_TRUE(flusher->mShardHashKeys.empty()); APSARA_TEST_EQUAL(static_cast(INT32_FLAG(merge_log_count_limit)), - flusher->mBatcher.GetEventFlushStrategy().GetMaxCnt()); - APSARA_TEST_EQUAL(static_cast(INT32_FLAG(batch_send_metric_size)), + flusher->mBatcher.GetEventFlushStrategy().GetMinCnt()); + APSARA_TEST_EQUAL(static_cast(INT32_FLAG(max_send_log_group_size)), flusher->mBatcher.GetEventFlushStrategy().GetMaxSizeBytes()); + APSARA_TEST_EQUAL(static_cast(INT32_FLAG(batch_send_metric_size)), + flusher->mBatcher.GetEventFlushStrategy().GetMinSizeBytes()); uint32_t timeout = static_cast(INT32_FLAG(batch_send_interval)) / 2; APSARA_TEST_EQUAL(static_cast(INT32_FLAG(batch_send_interval)) - timeout, flusher->mBatcher.GetEventFlushStrategy().GetTimeoutSecs()); APSARA_TEST_TRUE(flusher->mBatcher.GetGroupFlushStrategy().has_value()); APSARA_TEST_EQUAL(static_cast(INT32_FLAG(batch_send_metric_size)), - flusher->mBatcher.GetGroupFlushStrategy()->GetMaxSizeBytes()); + flusher->mBatcher.GetGroupFlushStrategy()->GetMinSizeBytes()); APSARA_TEST_EQUAL(timeout, flusher->mBatcher.GetGroupFlushStrategy()->GetTimeoutSecs()); APSARA_TEST_TRUE(flusher->mGroupSerializer); APSARA_TEST_TRUE(flusher->mGroupListSerializer); @@ -645,7 +647,7 @@ void FlusherSLSUnittest::TestSend() { } { // non-replay group - flusher.mBatcher.GetEventFlushStrategy().SetMaxCnt(1); + flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(1); PipelineEventGroup group(make_shared()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname"); @@ -730,7 +732,7 @@ void FlusherSLSUnittest::TestSend() { } { // group - flusher.mBatcher.GetEventFlushStrategy().SetMaxCnt(1); + flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(1); PipelineEventGroup group(make_shared()); group.SetMetadata(EventGroupMetaKey::SOURCE_ID, string("source-id")); group.SetTag(LOG_RESERVED_KEY_HOSTNAME, "hostname"); @@ -778,19 +780,19 @@ void FlusherSLSUnittest::TestSend() { APSARA_TEST_EQUAL("content_value", logGroup.logs(0).contents(0).value()); SenderQueueManager::GetInstance()->RemoveItem(item->mQueueKey, item); - flusher.mBatcher.GetEventFlushStrategy().SetMaxCnt(4000); + flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(4000); } { // oversized group INT32_FLAG(max_send_log_group_size) = 1; - flusher.mBatcher.GetEventFlushStrategy().SetMaxCnt(1); + flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(1); PipelineEventGroup group(make_shared()); auto e = group.AddLogEvent(); e->SetTimestamp(1234567890); 
e->SetContent(string("content_key"), string("content_value")); APSARA_TEST_FALSE(flusher.Send(std::move(group))); INT32_FLAG(max_send_log_group_size) = 10 * 1024 * 1024; - flusher.mBatcher.GetEventFlushStrategy().SetMaxCnt(4000); + flusher.mBatcher.GetEventFlushStrategy().SetMinCnt(4000); } } { @@ -829,7 +831,7 @@ void FlusherSLSUnittest::TestSend() { e->SetTimestamp(1234567990); e->SetContent(string("content_key"), string("content_value")); } - flusher.mBatcher.GetGroupFlushStrategy()->SetMaxSizeBytes(group.DataSize()); + flusher.mBatcher.GetGroupFlushStrategy()->SetMinSizeBytes(group.DataSize()); // flush the above two events from group item by the following event { auto e = group.AddLogEvent(); @@ -892,7 +894,7 @@ void FlusherSLSUnittest::TestSend() { for (auto& tmp : res) { SenderQueueManager::GetInstance()->RemoveItem(tmp->mQueueKey, tmp); } - flusher.mBatcher.GetGroupFlushStrategy()->SetMaxSizeBytes(256 * 1024); + flusher.mBatcher.GetGroupFlushStrategy()->SetMinSizeBytes(256 * 1024); } } From 2d5efd7232a6c733fd32d1782e4caa179479dbcb Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 1 Nov 2024 11:43:51 +0000 Subject: [PATCH 02/14] polish --- core/pipeline/queue/BoundedProcessQueue.cpp | 1 + core/pipeline/queue/CircularProcessQueue.cpp | 1 + core/pipeline/queue/ProcessQueueInterface.cpp | 1 + core/pipeline/queue/ProcessQueueInterface.h | 2 ++ 4 files changed, 5 insertions(+) diff --git a/core/pipeline/queue/BoundedProcessQueue.cpp b/core/pipeline/queue/BoundedProcessQueue.cpp index 170e4444f0..5ae43e49f9 100644 --- a/core/pipeline/queue/BoundedProcessQueue.cpp +++ b/core/pipeline/queue/BoundedProcessQueue.cpp @@ -49,6 +49,7 @@ bool BoundedProcessQueue::Push(unique_ptr&& item) { } bool BoundedProcessQueue::Pop(unique_ptr& item) { + mFetchAttemptsCnt->Add(1); if (!IsValidToPop()) { return false; } diff --git a/core/pipeline/queue/CircularProcessQueue.cpp b/core/pipeline/queue/CircularProcessQueue.cpp index 992fade5f7..330ebdb4ee 100644 --- a/core/pipeline/queue/CircularProcessQueue.cpp +++ b/core/pipeline/queue/CircularProcessQueue.cpp @@ -56,6 +56,7 @@ bool CircularProcessQueue::Push(unique_ptr&& item) { } bool CircularProcessQueue::Pop(unique_ptr& item) { + mFetchAttemptsCnt->Add(1); if (!IsValidToPop()) { return false; } diff --git a/core/pipeline/queue/ProcessQueueInterface.cpp b/core/pipeline/queue/ProcessQueueInterface.cpp index 3f28864625..19a236b378 100644 --- a/core/pipeline/queue/ProcessQueueInterface.cpp +++ b/core/pipeline/queue/ProcessQueueInterface.cpp @@ -23,6 +23,7 @@ namespace logtail { ProcessQueueInterface::ProcessQueueInterface(int64_t key, size_t cap, uint32_t priority, const PipelineContext& ctx) : QueueInterface(key, cap, ctx), mPriority(priority), mConfigName(ctx.GetConfigName()) { mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_COMPONENT_NAME, METRIC_LABEL_VALUE_COMPONENT_NAME_PROCESS_QUEUE}}); + mFetchAttemptsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_QUEUE_FETCH_ATTEMPTS_TOTAL); } void ProcessQueueInterface::SetDownStreamQueues(vector&& ques) { diff --git a/core/pipeline/queue/ProcessQueueInterface.h b/core/pipeline/queue/ProcessQueueInterface.h index b8664cbdda..5ba3054f58 100644 --- a/core/pipeline/queue/ProcessQueueInterface.h +++ b/core/pipeline/queue/ProcessQueueInterface.h @@ -52,6 +52,8 @@ class ProcessQueueInterface : virtual public QueueInterface Date: Fri, 1 Nov 2024 12:43:52 +0000 Subject: [PATCH 03/14] polish --- core/pipeline/serializer/SLSSerializer.cpp | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git 
a/core/pipeline/serializer/SLSSerializer.cpp b/core/pipeline/serializer/SLSSerializer.cpp index f2a655869f..5fa59ea2f6 100644 --- a/core/pipeline/serializer/SLSSerializer.cpp +++ b/core/pipeline/serializer/SLSSerializer.cpp @@ -20,8 +20,7 @@ #include "common/compression/CompressType.h" #include "plugin/flusher/sls/FlusherSLS.h" - -DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024); +DECLARE_FLAG_INT32(max_send_log_group_size); const std::string METRIC_RESERVED_KEY_NAME = "__name__"; const std::string METRIC_RESERVED_KEY_LABELS = "__labels__"; From 077739c910f4895d72b1e84d1f4d2cb0d627d7c8 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 1 Nov 2024 13:09:26 +0000 Subject: [PATCH 04/14] polish --- core/unittest/batch/BatchItemUnittest.cpp | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/core/unittest/batch/BatchItemUnittest.cpp b/core/unittest/batch/BatchItemUnittest.cpp index 0aac5d30b3..5773cc94f7 100644 --- a/core/unittest/batch/BatchItemUnittest.cpp +++ b/core/unittest/batch/BatchItemUnittest.cpp @@ -92,7 +92,7 @@ void EventBatchItemUnittest::TestAdd() { APSARA_TEST_EQUAL(1U, mItem.mBatch.mEvents.size()); APSARA_TEST_EQUAL(1U, mItem.GetStatus().GetCnt()); APSARA_TEST_EQUAL(size, mItem.GetStatus().GetSize()); - APSARA_TEST_NOT_EQUAL(0, mItem.mTotalEnqueTimeMs); + // APSARA_TEST_NOT_EQUAL(0, mItem.mTotalEnqueTimeMs); } void EventBatchItemUnittest::TestFlushEmpty() { @@ -133,7 +133,7 @@ void EventBatchItemUnittest::TestFlushGroupBatchItem() { APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetCnt()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); - APSARA_TEST_EQUAL(0, mItem.mTotalEnqueTimeMs); + // APSARA_TEST_EQUAL(0, mItem.mTotalEnqueTimeMs); } void EventBatchItemUnittest::TestFlushBatchedEvensList() { @@ -161,7 +161,7 @@ void EventBatchItemUnittest::TestFlushBatchedEvensList() { APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetCnt()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); - APSARA_TEST_EQUAL(0, mItem.mTotalEnqueTimeMs); + // APSARA_TEST_EQUAL(0, mItem.mTotalEnqueTimeMs); } void EventBatchItemUnittest::TestFlushBatchedEvensLists() { @@ -190,7 +190,7 @@ void EventBatchItemUnittest::TestFlushBatchedEvensLists() { APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetCnt()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); - APSARA_TEST_EQUAL(0, mItem.mTotalEnqueTimeMs); + // APSARA_TEST_EQUAL(0, mItem.mTotalEnqueTimeMs); } void EventBatchItemUnittest::TestExactlyOnce() { From 3a2a59ffa0d046e2ddac15bbb15fb9858769f196 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 1 Nov 2024 13:22:52 +0000 Subject: [PATCH 05/14] polish --- core/unittest/batch/BatchItemUnittest.cpp | 2 +- core/unittest/batch/BatcherUnittest.cpp | 12 ++++++------ 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/core/unittest/batch/BatchItemUnittest.cpp b/core/unittest/batch/BatchItemUnittest.cpp index 5773cc94f7..b60c0e356f 100644 --- a/core/unittest/batch/BatchItemUnittest.cpp +++ b/core/unittest/batch/BatchItemUnittest.cpp @@ -252,7 +252,7 @@ void GroupBatchItemUnittest::TestAdd() { APSARA_TEST_EQUAL(1U, mItem.mGroups.size()); APSARA_TEST_EQUAL(size, mItem.GetStatus().GetSize()); - APSARA_TEST_EQUAL(1234567890000, mItem.TotalEnqueTimeMs()); + // APSARA_TEST_EQUAL(1234567890000, mItem.TotalEnqueTimeMs()); APSARA_TEST_EQUAL(1U, mItem.EventSize()); APSARA_TEST_EQUAL(1U, mItem.GroupSize()); 
APSARA_TEST_EQUAL(100U, mItem.DataSize()); diff --git a/core/unittest/batch/BatcherUnittest.cpp b/core/unittest/batch/BatcherUnittest.cpp index a2ecef0b39..492654227d 100644 --- a/core/unittest/batch/BatcherUnittest.cpp +++ b/core/unittest/batch/BatcherUnittest.cpp @@ -79,8 +79,8 @@ void BatcherUnittest::TestParamInit() { string configStr, errorMsg; configStr = R"( { - "MaxSizeBytes": "1000", - "MaxCnt": "10", + "MinSizeBytes": "1000", + "MinCnt": "10", "TimeoutSecs": "5" } )"; @@ -100,8 +100,8 @@ void BatcherUnittest::TestInitWithoutGroupBatch() { string configStr, errorMsg; configStr = R"( { - "MaxSizeBytes": 1000, - "MaxCnt": 10, + "MinSizeBytes": 1000, + "MinCnt": 10, "TimeoutSecs": 5 } )"; @@ -121,8 +121,8 @@ void BatcherUnittest::TestInitWithGroupBatch() { string configStr, errorMsg; configStr = R"( { - "MaxSizeBytes": 1000, - "MaxCnt": 10, + "MinSizeBytes": 1000, + "MinCnt": 10, "TimeoutSecs": 5 } )"; From b360476f1241df68317e3d6a4b643b636d5ac8c4 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 1 Nov 2024 14:55:17 +0000 Subject: [PATCH 06/14] polish --- core/unittest/batch/BatcherUnittest.cpp | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) diff --git a/core/unittest/batch/BatcherUnittest.cpp b/core/unittest/batch/BatcherUnittest.cpp index 492654227d..5e105d6f90 100644 --- a/core/unittest/batch/BatcherUnittest.cpp +++ b/core/unittest/batch/BatcherUnittest.cpp @@ -334,21 +334,25 @@ void BatcherUnittest::TestAddWithGroupBatch() { // flush by size res.clear(); - batch.mEventFlushStrategy.SetMinSizeBytes(10); + batch.mEventFlushStrategy.SetMinSizeBytes(159); batch.mEventFlushStrategy.SetTimeoutSecs(3); PipelineEventGroup group6 = CreateEventGroup(1); SourceBuffer* buffer6 = group6.GetSourceBuffer().get(); batch.Add(std::move(group6), res); + PipelineEventGroup group7 = CreateEventGroup(2); + SourceBuffer* buffer7 = group7.GetSourceBuffer().get(); + batch.Add(std::move(group7), res); APSARA_TEST_EQUAL(1U, batch.mEventQueueMap.size()); - APSARA_TEST_EQUAL(0U, batch.mEventQueueMap[key].mBatch.mEvents.size()); + APSARA_TEST_EQUAL(1U, batch.mEventQueueMap[key].mBatch.mEvents.size()); APSARA_TEST_EQUAL(1U, res.size()); APSARA_TEST_EQUAL(1U, res[0].size()); - APSARA_TEST_EQUAL(2U, res[0][0].mEvents.size()); + APSARA_TEST_EQUAL(3U, res[0][0].mEvents.size()); APSARA_TEST_EQUAL(1U, res[0][0].mTags.mInner.size()); APSARA_TEST_STREQ("val", res[0][0].mTags.mInner["key"].data()); - APSARA_TEST_EQUAL(2U, res[0][0].mSourceBuffers.size()); + APSARA_TEST_EQUAL(3U, res[0][0].mSourceBuffers.size()); APSARA_TEST_EQUAL(buffer5, res[0][0].mSourceBuffers[0].get()); APSARA_TEST_EQUAL(buffer6, res[0][0].mSourceBuffers[1].get()); + APSARA_TEST_EQUAL(buffer7, res[0][0].mSourceBuffers[2].get()); APSARA_TEST_EQUAL(eoo5, res[0][0].mExactlyOnceCheckpoint.get()); APSARA_TEST_STREQ("pack_id", res[0][0].mPackIdPrefix.data()); APSARA_TEST_GT(TimeoutFlushManager::GetInstance()->mTimeoutRecords["test_config"].at(make_pair(0, key)).mUpdateTime, @@ -375,8 +379,11 @@ void BatcherUnittest::TestAddWithOversizedGroup() { APSARA_TEST_EQUAL(1U, batch.mEventQueueMap.size()); APSARA_TEST_EQUAL(0U, batch.mEventQueueMap[key].mBatch.mEvents.size()); APSARA_TEST_EQUAL(3U, res.size()); + APSARA_TEST_EQUAL(1U, res[0].size()); APSARA_TEST_EQUAL(2U, res[0][0].mEvents.size()); + APSARA_TEST_EQUAL(1U, res[1].size()); APSARA_TEST_EQUAL(13U, res[1][0].mEvents.size()); + APSARA_TEST_EQUAL(1U, res[2].size()); APSARA_TEST_EQUAL(7U, res[2][0].mEvents.size()); } From ae9a01a58e93f7c6704dffe3dcbae5a054a13799 Mon Sep 17 00:00:00 
2001 From: henryzhx8 Date: Sat, 2 Nov 2024 03:13:37 +0000 Subject: [PATCH 07/14] polish --- core/runner/FlusherRunner.cpp | 91 ++++++------------- core/runner/FlusherRunner.h | 6 +- .../InstanceConfigManagerUnittest.cpp | 9 +- 3 files changed, 33 insertions(+), 73 deletions(-) diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index ae1d55dc1a..095241c6ef 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -31,19 +31,18 @@ #include "plugin/flusher/sls/PackIdManager.h" #include "plugin/flusher/sls/SLSClientManager.h" -using namespace std; - -DEFINE_FLAG_INT32(check_send_client_timeout_interval, "", 600); -DEFINE_FLAG_BOOL(enable_flow_control, "if enable flow control", true); -DEFINE_FLAG_BOOL(enable_send_tps_smoothing, "avoid web server load burst", true); DEFINE_FLAG_INT32(flusher_runner_exit_timeout_secs, "", 60); +DEFINE_FLAG_INT32(check_send_client_timeout_interval, "", 600); -static const int SEND_BLOCK_COST_TIME_ALARM_INTERVAL_SECOND = 3; +using namespace std; namespace logtail { bool FlusherRunner::Init() { - srand(time(nullptr)); + LoadModuleConfig(true); + mCallback = [this]() { return LoadModuleConfig(false); }; + AppConfig::GetInstance()->RegisterCallback("max_bytes_per_sec", &mCallback); + WriteMetrics::GetInstance()->PrepareMetricsRecordRef( mMetricsRecordRef, {{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_FLUSHER}, @@ -58,9 +57,7 @@ bool FlusherRunner::Init() { mThreadRes = async(launch::async, &FlusherRunner::Run, this); mLastCheckSendClientTime = time(nullptr); - LoadModuleConfig(true); - mCallback = [this]() { return LoadModuleConfig(false); }; - AppConfig::GetInstance()->RegisterCallback("max_bytes_per_sec", &mCallback); + return true; } @@ -87,18 +84,11 @@ bool FlusherRunner::LoadModuleConfig(bool isInit) { void FlusherRunner::UpdateSendFlowControl() { // when inflow exceed 30MB/s, FlowControl lose precision if (AppConfig::GetInstance()->GetMaxBytePerSec() >= 30 * 1024 * 1024) { - if (mSendFlowControl) - mSendFlowControl = false; - if (mSendRandomSleep) - mSendRandomSleep = false; - } else { - mSendRandomSleep = BOOL_FLAG(enable_send_tps_smoothing); - mSendFlowControl = BOOL_FLAG(enable_flow_control); + mEnableRateLimiter = false; } LOG_INFO(sLogger, ("send byte per second limit", AppConfig::GetInstance()->GetMaxBytePerSec())( - "send flow control", mSendFlowControl ? "enable" : "disable")( - "send random sleep", mSendRandomSleep ? "enable" : "disable")); + "send flow control", mEnableRateLimiter ? "enable" : "disable")); } void FlusherRunner::Stop() { @@ -118,29 +108,10 @@ void FlusherRunner::DecreaseHttpSendingCnt() { } void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) { - if (!BOOL_FLAG(enable_full_drain_mode) && item->mFlusher->Name() == "flusher_sls" - && Application::GetInstance()->IsExiting()) { - DiskBufferWriter::GetInstance()->PushToDiskBuffer(item, 3); - SenderQueueManager::GetInstance()->RemoveItem(item->mFlusher->GetQueueKey(), item); - return; - } - - int32_t beforeSleepTime = time(NULL); + // TODO: use semaphore instead while (withLimit && !Application::GetInstance()->IsExiting() && GetSendingBufferCount() >= AppConfig::GetInstance()->GetSendRequestConcurrency()) { - usleep(10 * 1000); - } - int32_t afterSleepTime = time(NULL); - int32_t blockCostTime = afterSleepTime - beforeSleepTime; - if (blockCostTime > SEND_BLOCK_COST_TIME_ALARM_INTERVAL_SECOND) { - LOG_WARNING(sLogger, - ("sending log group blocked too long because send concurrency reached limit. 
current " - "concurrency used", - GetSendingBufferCount())("max concurrency", AppConfig::GetInstance()->GetSendRequestConcurrency())( - "blocked time", blockCostTime)); - LogtailAlarm::GetInstance()->SendAlarm(SENDING_COSTS_TOO_MUCH_TIME_ALARM, - "sending log group blocked for too much time, cost " - + ToString(blockCostTime)); + this_thread::sleep_for(chrono::milliseconds(10)); } auto req = static_cast(item->mFlusher)->BuildRequest(item); @@ -173,30 +144,19 @@ void FlusherRunner::Run() { } mInItemsTotal->Add(items.size()); mWaitingItemsTotal->Add(items.size()); - - // smoothing send tps, walk around webserver load burst - uint32_t bufferPackageCount = items.size(); - if (!Application::GetInstance()->IsExiting() && mSendRandomSleep) { - int64_t sleepMicroseconds = 0; - if (bufferPackageCount < 20) - sleepMicroseconds = (rand() % 30) * 10000; // 0ms ~ 300ms - else if (bufferPackageCount < 30) - sleepMicroseconds = (rand() % 20) * 10000; // 0ms ~ 200ms - else if (bufferPackageCount < 40) - sleepMicroseconds = (rand() % 10) * 10000; // 0ms ~ 100ms - if (sleepMicroseconds > 0) - usleep(sleepMicroseconds); - } } for (auto itr = items.begin(); itr != items.end(); ++itr) { - auto waitTime = chrono::duration_cast(curTime - (*itr)->mFirstEnqueTime); - LOG_DEBUG(sLogger, - ("got item from sender queue, item address", - *itr)("config-flusher-dst", QueueKeyManager::GetInstance()->GetName((*itr)->mQueueKey))( - "wait time", ToString(waitTime.count()) + "ms")("try cnt", ToString((*itr)->mTryCnt))); - - if (!Application::GetInstance()->IsExiting() && mSendFlowControl) { + LOG_DEBUG( + sLogger, + ("got item from sender queue, item address", + *itr)("config-flusher-dst", QueueKeyManager::GetInstance()->GetName((*itr)->mQueueKey))( + "wait time", + ToString(chrono::duration_cast(curTime - (*itr)->mFirstEnqueTime).count()) + + "ms")("try cnt", ToString((*itr)->mTryCnt))); + + // TODO: use rate limiter instead + if (!Application::GetInstance()->IsExiting() && mEnableRateLimiter) { RateLimiter::FlowControl((*itr)->mRawSize, mSendLastTime, mSendLastByte, true); } @@ -222,7 +182,14 @@ void FlusherRunner::Run() { void FlusherRunner::Dispatch(SenderQueueItem* item) { switch (item->mFlusher->GetSinkType()) { case SinkType::HTTP: - PushToHttpSink(item); + // TODO: make it common for all http flushers + if (!BOOL_FLAG(enable_full_drain_mode) && Application::GetInstance()->IsExiting() + && item->mFlusher->Name() == "flusher_sls") { + DiskBufferWriter::GetInstance()->PushToDiskBuffer(item, 3); + SenderQueueManager::GetInstance()->RemoveItem(item->mFlusher->GetQueueKey(), item); + } else { + PushToHttpSink(item); + } break; default: SenderQueueManager::GetInstance()->RemoveItem(item->mFlusher->GetQueueKey(), item); diff --git a/core/runner/FlusherRunner.h b/core/runner/FlusherRunner.h index 4c25d1e350..e8fbf23e0f 100644 --- a/core/runner/FlusherRunner.h +++ b/core/runner/FlusherRunner.h @@ -47,14 +47,13 @@ class FlusherRunner { int32_t GetSendingBufferCount() { return mHttpSendingCnt.load(); } - bool LoadModuleConfig(bool isInit); - private: FlusherRunner() = default; ~FlusherRunner() = default; void Run(); void Dispatch(SenderQueueItem* item); + bool LoadModuleConfig(bool isInit); void UpdateSendFlowControl(); std::function mCallback; @@ -69,8 +68,7 @@ class FlusherRunner { int64_t mSendLastTime = 0; int32_t mSendLastByte = 0; - bool mSendRandomSleep; - bool mSendFlowControl; + bool mEnableRateLimiter = true; mutable MetricsRecordRef mMetricsRecordRef; CounterPtr mInItemsTotal; diff --git 
a/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp b/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp index 5adb906538..f3bc3c7b54 100644 --- a/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp +++ b/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp @@ -22,9 +22,6 @@ using namespace std; -DECLARE_FLAG_BOOL(enable_send_tps_smoothing); -DECLARE_FLAG_BOOL(enable_flow_control); - namespace logtail { class InstanceConfigManagerUnittest : public testing::Test { @@ -151,8 +148,7 @@ void InstanceConfigManagerUnittest::TestUpdateInstanceConfigs() { APSARA_TEST_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test3")); } APSARA_TEST_EQUAL(kDefaultMaxSendBytePerSec, AppConfig::GetInstance()->GetMaxBytePerSec()); - APSARA_TEST_EQUAL(true, FlusherRunner::GetInstance()->mSendRandomSleep); - APSARA_TEST_EQUAL(true, FlusherRunner::GetInstance()->mSendFlowControl); + APSARA_TEST_EQUAL(true, FlusherRunner::GetInstance()->mEnableRateLimiter); // Modified status = 1; { @@ -206,8 +202,7 @@ void InstanceConfigManagerUnittest::TestUpdateInstanceConfigs() { APSARA_TEST_EQUAL(nullptr, InstanceConfigManager::GetInstance()->FindConfigByName("test3")); } APSARA_TEST_EQUAL(31457280, AppConfig::GetInstance()->GetMaxBytePerSec()); - APSARA_TEST_EQUAL(false, FlusherRunner::GetInstance()->mSendRandomSleep); - APSARA_TEST_EQUAL(false, FlusherRunner::GetInstance()->mSendFlowControl); + APSARA_TEST_EQUAL(false, FlusherRunner::GetInstance()->mEnableRateLimiter); // Removed status = 2; { From c340561d59ff57f40d341360b643130740b0cffe Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Sat, 2 Nov 2024 03:46:42 +0000 Subject: [PATCH 08/14] polish --- core/runner/ProcessorRunner.cpp | 21 ++++++++++++--------- 1 file changed, 12 insertions(+), 9 deletions(-) diff --git a/core/runner/ProcessorRunner.cpp b/core/runner/ProcessorRunner.cpp index d7b15ba311..17892f44b4 100644 --- a/core/runner/ProcessorRunner.cpp +++ b/core/runner/ProcessorRunner.cpp @@ -27,10 +27,6 @@ #include "queue/ProcessQueueManager.h" #include "queue/QueueKeyManager.h" -DECLARE_FLAG_INT32(max_send_log_group_size); - -using namespace std; - #if defined(_MSC_VER) // On Windows, if Chinese config base path is used, the log path will be converted to GBK, // so the __tag__.__path__ have to be converted back to UTF8 to avoid bad display. 
@@ -40,6 +36,10 @@ DEFINE_FLAG_BOOL(enable_chinese_tag_path, "Enable Chinese __tag__.__path__", tru DEFINE_FLAG_INT32(default_flush_merged_buffer_interval, "default flush merged buffer, seconds", 1); DEFINE_FLAG_INT32(processor_runner_exit_timeout_secs, "", 60); +DECLARE_FLAG_INT32(max_send_log_group_size); + +using namespace std; + namespace logtail { thread_local MetricsRecordRef ProcessorRunner::sMetricsRecordRef; @@ -90,7 +90,7 @@ bool ProcessorRunner::PushQueue(QueueKey key, size_t inputIndex, PipelineEventGr } void ProcessorRunner::Run(uint32_t threadNo) { - LOG_INFO(sLogger, ("processor runner", "started")("threadNo", threadNo)); + LOG_INFO(sLogger, ("processor runner", "started")("thread no", threadNo)); // thread local metrics should be initialized in each thread WriteMetrics::GetInstance()->PrepareMetricsRecordRef( @@ -103,12 +103,12 @@ void ProcessorRunner::Run(uint32_t threadNo) { sInGroupDataSizeBytes = sMetricsRecordRef.CreateCounter(METRIC_RUNNER_IN_SIZE_BYTES); sLastRunTime = sMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); - static int32_t lastMergeTime = 0; + static int32_t lastFlushBatchTime = 0; while (true) { - int32_t curTime = time(NULL); - if (threadNo == 0 && curTime - lastMergeTime >= INT32_FLAG(default_flush_merged_buffer_interval)) { + int32_t curTime = time(nullptr); + if (threadNo == 0 && curTime - lastFlushBatchTime >= INT32_FLAG(default_flush_merged_buffer_interval)) { TimeoutFlushManager::GetInstance()->FlushTimeoutBatch(); - lastMergeTime = curTime; + lastFlushBatchTime = curTime; } sLastRunTime->Set(curTime); @@ -144,6 +144,9 @@ void ProcessorRunner::Run(uint32_t threadNo) { pipeline->Process(eventGroupList, item->mInputIndex); if (pipeline->IsFlushingThroughGoPipeline()) { + // TODO: + // 1. allow all event types to be sent to Go pipelines + // 2. 
use event group protobuf instead if (isLog) { for (auto& group : eventGroupList) { string res, errorMsg; From c284559eec4a7d9c6de707b738e016a5f68122d3 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Sat, 2 Nov 2024 06:20:53 +0000 Subject: [PATCH 09/14] polish --- core/monitor/metric_constants/ComponentMetrics.cpp | 4 ++-- core/monitor/metric_constants/MetricConstants.h | 4 ++-- core/pipeline/queue/BoundedProcessQueue.cpp | 6 +++++- core/pipeline/queue/CircularProcessQueue.cpp | 6 +++++- core/pipeline/queue/ProcessQueueInterface.cpp | 5 +++-- core/pipeline/queue/ProcessQueueInterface.h | 3 ++- core/pipeline/queue/SenderQueue.cpp | 8 ++++---- core/pipeline/queue/SenderQueue.h | 4 ++-- 8 files changed, 25 insertions(+), 15 deletions(-) diff --git a/core/monitor/metric_constants/ComponentMetrics.cpp b/core/monitor/metric_constants/ComponentMetrics.cpp index 006602e751..cbbb8ed40b 100644 --- a/core/monitor/metric_constants/ComponentMetrics.cpp +++ b/core/monitor/metric_constants/ComponentMetrics.cpp @@ -74,8 +74,8 @@ const string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES = "component_extra_b const string METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_TOTAL = "component_discarded_events_total"; const string METRIC_COMPONENT_QUEUE_FETCHED_ITEMS_TOTAL = "component_fetched_items_total"; -const string METRIC_COMPONENT_QUEUE_FETCH_ATTEMPTS_TOTAL = "component_fetch_attempts_total"; -const string METRIC_COMPONENT_QUEUE_SUCCESSFUL_FETCH_TIMES_TOTAL = "component_successful_fetch_times_total"; +const string METRIC_COMPONENT_QUEUE_FETCH_TIMES_TOTAL = "component_fetch_times_total"; +const string METRIC_COMPONENT_QUEUE_VALID_FETCH_TIMES_TOTAL = "component_valid_fetch_times_total"; const string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_REGION_LIMITER_TIMES_TOTAL = "component_fetch_rejected_by_region_limiter_times_total"; const string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_PROJECT_LIMITER_TIMES_TOTAL diff --git a/core/monitor/metric_constants/MetricConstants.h b/core/monitor/metric_constants/MetricConstants.h index 988c8b4afa..84740a09b9 100644 --- a/core/monitor/metric_constants/MetricConstants.h +++ b/core/monitor/metric_constants/MetricConstants.h @@ -261,8 +261,8 @@ extern const std::string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES; extern const std::string METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_TOTAL; extern const std::string METRIC_COMPONENT_QUEUE_FETCHED_ITEMS_TOTAL; -extern const std::string METRIC_COMPONENT_QUEUE_FETCH_ATTEMPTS_TOTAL; -extern const std::string METRIC_COMPONENT_QUEUE_SUCCESSFUL_FETCH_TIMES_TOTAL; +extern const std::string METRIC_COMPONENT_QUEUE_FETCH_TIMES_TOTAL; +extern const std::string METRIC_COMPONENT_QUEUE_VALID_FETCH_TIMES_TOTAL; extern const std::string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_REGION_LIMITER_TIMES_TOTAL; extern const std::string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_PROJECT_LIMITER_TIMES_TOTAL; extern const std::string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_LOGSTORE_LIMITER_TIMES_TOTAL; diff --git a/core/pipeline/queue/BoundedProcessQueue.cpp b/core/pipeline/queue/BoundedProcessQueue.cpp index 5ae43e49f9..ee7532f9b2 100644 --- a/core/pipeline/queue/BoundedProcessQueue.cpp +++ b/core/pipeline/queue/BoundedProcessQueue.cpp @@ -49,7 +49,11 @@ bool BoundedProcessQueue::Push(unique_ptr&& item) { } bool BoundedProcessQueue::Pop(unique_ptr& item) { - mFetchAttemptsCnt->Add(1); + mFetchTimesCnt->Add(1); + if (Empty()) { + return false; + } + mValidFetchTimesCnt->Add(1); if (!IsValidToPop()) { return false; } diff --git 
a/core/pipeline/queue/CircularProcessQueue.cpp b/core/pipeline/queue/CircularProcessQueue.cpp index 330ebdb4ee..6c4f57ba74 100644 --- a/core/pipeline/queue/CircularProcessQueue.cpp +++ b/core/pipeline/queue/CircularProcessQueue.cpp @@ -56,7 +56,11 @@ bool CircularProcessQueue::Push(unique_ptr&& item) { } bool CircularProcessQueue::Pop(unique_ptr& item) { - mFetchAttemptsCnt->Add(1); + mFetchTimesCnt->Add(1); + if (Empty()) { + return false; + } + mValidFetchTimesCnt->Add(1); if (!IsValidToPop()) { return false; } diff --git a/core/pipeline/queue/ProcessQueueInterface.cpp b/core/pipeline/queue/ProcessQueueInterface.cpp index 19a236b378..01acae6d69 100644 --- a/core/pipeline/queue/ProcessQueueInterface.cpp +++ b/core/pipeline/queue/ProcessQueueInterface.cpp @@ -23,7 +23,8 @@ namespace logtail { ProcessQueueInterface::ProcessQueueInterface(int64_t key, size_t cap, uint32_t priority, const PipelineContext& ctx) : QueueInterface(key, cap, ctx), mPriority(priority), mConfigName(ctx.GetConfigName()) { mMetricsRecordRef.AddLabels({{METRIC_LABEL_KEY_COMPONENT_NAME, METRIC_LABEL_VALUE_COMPONENT_NAME_PROCESS_QUEUE}}); - mFetchAttemptsCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_QUEUE_FETCH_ATTEMPTS_TOTAL); + mFetchTimesCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_QUEUE_FETCH_TIMES_TOTAL); + mValidFetchTimesCnt = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_QUEUE_VALID_FETCH_TIMES_TOTAL); } void ProcessQueueInterface::SetDownStreamQueues(vector&& ques) { @@ -38,7 +39,7 @@ void ProcessQueueInterface::SetDownStreamQueues(vectorCommitMetricsRecordRef(mMetricsRecordRef); } @@ -111,7 +111,7 @@ bool SenderQueue::Remove(SenderQueueItem* item) { } void SenderQueue::GetAvailableItems(vector& items, int32_t limit) { - mFetchAttemptsCnt->Add(1); + mFetchTimesCnt->Add(1); if (Empty()) { return; } @@ -173,7 +173,7 @@ void SenderQueue::GetAvailableItems(vector& items, int32_t lim } } if (hasAvailableItem) { - mSuccessfulFetchTimesCnt->Add(1); + mValidFetchTimesCnt->Add(1); } } diff --git a/core/pipeline/queue/SenderQueue.h b/core/pipeline/queue/SenderQueue.h index 0e18bdb700..4c3a714d09 100644 --- a/core/pipeline/queue/SenderQueue.h +++ b/core/pipeline/queue/SenderQueue.h @@ -47,8 +47,8 @@ class SenderQueue : public BoundedSenderQueueInterface { size_t mRead = 0; size_t mSize = 0; - CounterPtr mSuccessfulFetchTimesCnt; - CounterPtr mFetchAttemptsCnt; + CounterPtr mFetchTimesCnt; + CounterPtr mValidFetchTimesCnt; CounterPtr mFetchedItemsCnt; #ifdef APSARA_UNIT_TEST_MAIN From 267fd0a0b3b134a05939cd1c983445c5688efbca Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Sat, 2 Nov 2024 06:24:31 +0000 Subject: [PATCH 10/14] polish --- core/models/EventPool.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/core/models/EventPool.cpp b/core/models/EventPool.cpp index 8941137c86..4f7e084ed9 100644 --- a/core/models/EventPool.cpp +++ b/core/models/EventPool.cpp @@ -105,13 +105,18 @@ void DoGC(vector& pool, vector& poolBak, size_t& minUnusedCnt, mutex* mu delete pool.back(); pool.pop_back(); } + size_t bakSZ = 0; if (mux) { lock_guard lock(*mux); + bakSZ = poolBak.size(); for (auto& item : poolBak) { delete item; } poolBak.clear(); } + if (sz != 0 || bakSZ != 0) { + LOG_INFO(sLogger, ("event pool gc", "done")("gc event cnt", sz + bakSZ)("pool size", pool.size())); + } } else { LOG_ERROR(sLogger, ("unexpected error", "min unused event cnt is greater than pool size")( From 315f8bdebaa6171cf04cbb68812b84126d6fe844 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Sat, 2 Nov 2024 10:50:21 +0000 Subject: 
[PATCH 11/14] polish --- core/models/EventPool.cpp | 27 +++++++++++++-------------- core/models/PipelineEventGroup.cpp | 16 +++++++++++++++- core/pipeline/batch/BatchedEvents.cpp | 16 +++++++++++++++- 3 files changed, 43 insertions(+), 16 deletions(-) diff --git a/core/models/EventPool.cpp b/core/models/EventPool.cpp index 4f7e084ed9..1308bd75be 100644 --- a/core/models/EventPool.cpp +++ b/core/models/EventPool.cpp @@ -41,9 +41,8 @@ EventPool::~EventPool() { } LogEvent* EventPool::AcquireLogEvent(PipelineEventGroup* ptr) { - TransferPoolIfEmpty(mLogEventPool, mLogEventPoolBak); - if (mEnableLock) { + TransferPoolIfEmpty(mLogEventPool, mLogEventPoolBak); lock_guard lock(mPoolMux); return AcquireEventNoLock(ptr, mLogEventPool, mMinUnusedLogEventsCnt); } @@ -51,9 +50,8 @@ LogEvent* EventPool::AcquireLogEvent(PipelineEventGroup* ptr) { } MetricEvent* EventPool::AcquireMetricEvent(PipelineEventGroup* ptr) { - TransferPoolIfEmpty(mMetricEventPool, mMetricEventPoolBak); - if (mEnableLock) { + TransferPoolIfEmpty(mMetricEventPool, mMetricEventPoolBak); lock_guard lock(mPoolMux); return AcquireEventNoLock(ptr, mMetricEventPool, mMinUnusedMetricEventsCnt); } @@ -61,9 +59,8 @@ MetricEvent* EventPool::AcquireMetricEvent(PipelineEventGroup* ptr) { } SpanEvent* EventPool::AcquireSpanEvent(PipelineEventGroup* ptr) { - TransferPoolIfEmpty(mSpanEventPool, mSpanEventPoolBak); - if (mEnableLock) { + TransferPoolIfEmpty(mMetricEventPool, mMetricEventPoolBak); lock_guard lock(mPoolMux); return AcquireEventNoLock(ptr, mSpanEventPool, mMinUnusedSpanEventsCnt); } @@ -98,7 +95,7 @@ void EventPool::Release(vector&& obj) { } template -void DoGC(vector& pool, vector& poolBak, size_t& minUnusedCnt, mutex* mux) { +void DoGC(vector& pool, vector& poolBak, size_t& minUnusedCnt, mutex* mux, const string& type) { if (minUnusedCnt <= pool.size() || minUnusedCnt == numeric_limits::max()) { auto sz = minUnusedCnt == numeric_limits::max() ? 
pool.size() : minUnusedCnt; for (size_t i = 0; i < sz; ++i) { @@ -115,7 +112,9 @@ void DoGC(vector& pool, vector& poolBak, size_t& minUnusedCnt, mutex* mu poolBak.clear(); } if (sz != 0 || bakSZ != 0) { - LOG_INFO(sLogger, ("event pool gc", "done")("gc event cnt", sz + bakSZ)("pool size", pool.size())); + LOG_INFO( + sLogger, + ("event pool gc", "done")("event type", type)("gc event cnt", sz + bakSZ)("pool size", pool.size())); } } else { LOG_ERROR(sLogger, @@ -129,13 +128,13 @@ void EventPool::CheckGC() { if (time(nullptr) - mLastGCTime > INT32_FLAG(event_pool_gc_interval_secs)) { if (mEnableLock) { lock_guard lock(mPoolMux); - DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux); - DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux); - DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux); + DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux, "log"); + DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux, "metric"); + DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux, "span"); } else { - DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr); - DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr); - DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr); + DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr, "log"); + DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr, "metric"); + DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr, "span"); } mLastGCTime = time(nullptr); } diff --git a/core/models/PipelineEventGroup.cpp b/core/models/PipelineEventGroup.cpp index 16f194fd03..35ebd35a0d 100644 --- a/core/models/PipelineEventGroup.cpp +++ b/core/models/PipelineEventGroup.cpp @@ -34,10 +34,24 @@ namespace logtail { template void DestroyEvents(vector&& events) { unordered_map> eventsPoolMap; + // for most cases, all events have the same origin. So we cache the pool pointer and iterator for efficiency + EventPool* cachedPoolPtr = nullptr; + typename unordered_map>::iterator cachedIt; + bool firstEvent = true; for (auto& item : events) { if (item && item.IsFromEventPool()) { item->Reset(); - eventsPoolMap[item.GetEventPool()].emplace_back(static_cast(item.Release())); + if (firstEvent || item.GetEventPool() != cachedPoolPtr) { + cachedPoolPtr = item.GetEventPool(); + cachedIt = eventsPoolMap.find(cachedPoolPtr); + if (cachedIt == eventsPoolMap.end()) { + eventsPoolMap.emplace(cachedPoolPtr, vector()); + cachedIt = eventsPoolMap.find(cachedPoolPtr); + cachedIt->second.reserve(events.size()); + } + firstEvent = false; + } + cachedIt->second.emplace_back(static_cast(item.Release())); } } for (auto& item : eventsPoolMap) { diff --git a/core/pipeline/batch/BatchedEvents.cpp b/core/pipeline/batch/BatchedEvents.cpp index 02ccc25905..43a63ea9e4 100644 --- a/core/pipeline/batch/BatchedEvents.cpp +++ b/core/pipeline/batch/BatchedEvents.cpp @@ -23,10 +23,24 @@ namespace logtail { template void DestroyEvents(vector&& events) { unordered_map> eventsPoolMap; + // for most cases, all events have the same origin. 
So we cache the pool pointer and iterator for efficiency + EventPool* cachedPoolPtr = nullptr; + typename unordered_map>::iterator cachedIt; + bool firstEvent = true; for (auto& item : events) { if (item && item.IsFromEventPool()) { item->Reset(); - eventsPoolMap[item.GetEventPool()].emplace_back(static_cast(item.Release())); + if (firstEvent || item.GetEventPool() != cachedPoolPtr) { + cachedPoolPtr = item.GetEventPool(); + cachedIt = eventsPoolMap.find(cachedPoolPtr); + if (cachedIt == eventsPoolMap.end()) { + eventsPoolMap.emplace(cachedPoolPtr, vector()); + cachedIt = eventsPoolMap.find(cachedPoolPtr); + cachedIt->second.reserve(events.size()); + } + firstEvent = false; + } + cachedIt->second.emplace_back(static_cast(item.Release())); } } for (auto& item : eventsPoolMap) { From 0336a6ad12a8090351729055e1afe46306b162ea Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Sat, 2 Nov 2024 12:35:55 +0000 Subject: [PATCH 12/14] polish --- core/models/EventPool.cpp | 2 +- core/runner/sink/http/HttpSink.cpp | 21 +++++++++++++-------- 2 files changed, 14 insertions(+), 9 deletions(-) diff --git a/core/models/EventPool.cpp b/core/models/EventPool.cpp index 1308bd75be..0a9035af71 100644 --- a/core/models/EventPool.cpp +++ b/core/models/EventPool.cpp @@ -60,7 +60,7 @@ MetricEvent* EventPool::AcquireMetricEvent(PipelineEventGroup* ptr) { SpanEvent* EventPool::AcquireSpanEvent(PipelineEventGroup* ptr) { if (mEnableLock) { - TransferPoolIfEmpty(mMetricEventPool, mMetricEventPoolBak); + TransferPoolIfEmpty(mSpanEventPool, mSpanEventPoolBak); lock_guard lock(mPoolMux); return AcquireEventNoLock(ptr, mSpanEventPool, mMinUnusedSpanEventsCnt); } diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index d1670ffeb9..2969ee5bf9 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -168,7 +168,8 @@ void HttpSink::DoRun() { HandleCompletedRequests(runningHandlers); unique_ptr request; - if (mQueue.TryPop(request)) { + bool hasRequest = false; + while (mQueue.TryPop(request)) { mInItemsTotal->Add(1); LOG_DEBUG(sLogger, ("got item from flusher runner, item address", request->mItem)( @@ -180,8 +181,12 @@ void HttpSink::DoRun() { if (AddRequestToClient(std::move(request))) { ++runningHandlers; mSendingItemsTotal->Add(1); + hasRequest = true; } } + if (hasRequest) { + continue; + } struct timeval timeout { 1, 0 @@ -272,13 +277,13 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { static_cast(request->mItem->mFlusher) ->OnSendDone(request->mResponse, request->mItem); FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); - LOG_DEBUG( - sLogger, - ("failed to send http request", "abort")("item address", request->mItem)( - "config-flusher-dst", - QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( - "response time", ToString(responseTimeMs) + "ms")("try cnt", ToString(request->mTryCnt))( - "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); + LOG_DEBUG(sLogger, + ("failed to send http request", "abort")("item address", request->mItem)( + "config-flusher-dst", + QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "response time", ToString(responseTimeMs) + "ms")("try cnt", + ToString(request->mTryCnt))( + "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); } mOutFailedItemsTotal->Add(1); mFailedItemTotalResponseTimeMs->Add(responseTime); From 01f98ec2a29320aabe90acd67893057224ac716e Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Wed, 6 Nov 2024 
02:07:45 +0000 Subject: [PATCH 13/14] polish --- core/pipeline/serializer/SLSSerializer.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/pipeline/serializer/SLSSerializer.cpp b/core/pipeline/serializer/SLSSerializer.cpp index 3f48dc961a..a6b8d7b803 100644 --- a/core/pipeline/serializer/SLSSerializer.cpp +++ b/core/pipeline/serializer/SLSSerializer.cpp @@ -19,7 +19,7 @@ #include "plugin/flusher/sls/FlusherSLS.h" #include "protobuf/sls/LogGroupSerializer.h" -DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024); +DECLARE_FLAG_INT32(max_send_log_group_size); using namespace std; From 22c67a896a3533b3dfefc40f71d2ced0e107abd2 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Thu, 7 Nov 2024 03:05:11 +0000 Subject: [PATCH 14/14] polish --- core/plugin/flusher/sls/FlusherSLS.cpp | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index f4c31ee7df..b0fd4d629a 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -60,6 +60,7 @@ DEFINE_FLAG_INT32(unknow_error_try_max, "discard data when try times > this valu DEFINE_FLAG_BOOL(global_network_success, "global network success flag, default false", false); DEFINE_FLAG_BOOL(enable_metricstore_channel, "only works for metrics data for enhance metrics query performance", true); DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024); +DEFINE_FLAG_INT32(sls_serialize_size_expansion_ratio, "", 1.15); DECLARE_FLAG_BOOL(send_prefer_real_ip); @@ -475,10 +476,11 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mContext->GetRegion()); } - DefaultFlushStrategyOptions strategy{static_cast(INT32_FLAG(max_send_log_group_size)), - static_cast(INT32_FLAG(batch_send_metric_size)), - static_cast(INT32_FLAG(merge_log_count_limit)), - static_cast(INT32_FLAG(batch_send_interval))}; + DefaultFlushStrategyOptions strategy{ + static_cast(INT32_FLAG(max_send_log_group_size) / INT32_FLAG(sls_serialize_size_expansion_ratio)), + static_cast(INT32_FLAG(batch_send_metric_size)), + static_cast(INT32_FLAG(merge_log_count_limit)), + static_cast(INT32_FLAG(batch_send_interval))}; if (!mBatcher.Init( itr ? *itr : Json::Value(), this, strategy, !mContext->IsExactlyOnceEnabled() && mShardHashKeys.empty())) { // when either exactly once is enabled or ShardHashKeys is not empty, we don't enable group batch
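
For context on the final hunk above: the batcher's size limit is derated by a serialization expansion ratio so that a batch which fills up to the limit still serializes to less than max_send_log_group_size once converted to the SLS log group format. Note that an int32 flag truncates a fractional default, so declaring the ratio with DEFINE_FLAG_INT32 and a default of 1.15 would evaluate to 1 and make the division a no-op; the sketch below is therefore only an illustration of the intended arithmetic, assuming a floating-point ratio and using hypothetical standalone constants in place of the flag macros from the patch.

    #include <cstdint>

    // Hypothetical stand-ins for the flags referenced in the patch.
    constexpr uint32_t kMaxSendLogGroupSize = 10 * 1024 * 1024; // max_send_log_group_size
    constexpr double kSerializeExpansionRatio = 1.15;           // sls_serialize_size_expansion_ratio

    // Derate the raw batch size limit so that, after serialization expands the
    // payload by roughly kSerializeExpansionRatio, the resulting log group still
    // stays under kMaxSendLogGroupSize.
    constexpr uint32_t DeratedBatchSizeLimit() {
        return static_cast<uint32_t>(kMaxSendLogGroupSize / kSerializeExpansionRatio);
    }

    // With the defaults above this is about 9.1 MB of raw event data per batch
    // (10,485,760 / 1.15 ≈ 9,118,052 bytes).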