From b208161367bc1baa02b87137d2d0b6dd9f59c3b3 Mon Sep 17 00:00:00 2001 From: linrunqi08 Date: Mon, 23 Dec 2024 06:09:06 +0000 Subject: [PATCH] refine limiter code --- core/app_config/AppConfig.cpp | 9 ++ core/app_config/AppConfig.h | 3 + core/pipeline/limiter/ConcurrencyLimiter.cpp | 96 +++++++++----- core/pipeline/limiter/ConcurrencyLimiter.h | 45 ++++--- .../queue/BoundedSenderQueueInterface.cpp | 7 - .../queue/BoundedSenderQueueInterface.h | 1 - core/plugin/flusher/sls/FlusherSLS.cpp | 39 +++--- core/runner/FlusherRunner.cpp | 4 +- core/runner/sink/http/HttpSink.cpp | 2 +- .../pipeline/ConcurrencyLimiterUnittest.cpp | 125 +++++++++++++----- 10 files changed, 218 insertions(+), 113 deletions(-) diff --git a/core/app_config/AppConfig.cpp b/core/app_config/AppConfig.cpp index 202d9f67f1..a3ebc6a9ff 100644 --- a/core/app_config/AppConfig.cpp +++ b/core/app_config/AppConfig.cpp @@ -1190,6 +1190,15 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) { mBindInterface.clear(); LOG_INFO(sLogger, ("bind_interface", mBindInterface)); } + + // mSendRequestConcurrency was limited in [15, 80] + if (mSendRequestConcurrency < 15) { + mSendRequestConcurrency = 15; + } + if (mSendRequestConcurrency > 80) { + mSendRequestConcurrency = 80; + } + mSendRequestGlobalConcurrency = mSendRequestConcurrency * 1.5; } bool AppConfig::CheckAndResetProxyEnv() { diff --git a/core/app_config/AppConfig.h b/core/app_config/AppConfig.h index a1f0af7ec5..2ed376884b 100644 --- a/core/app_config/AppConfig.h +++ b/core/app_config/AppConfig.h @@ -132,6 +132,7 @@ class AppConfig { int32_t mNumOfBufferFile; int32_t mLocalFileSize; int32_t mSendRequestConcurrency; + int32_t mSendRequestGlobalConcurrency; std::string mBufferFilePath; // checkpoint @@ -438,6 +439,8 @@ class AppConfig { int32_t GetSendRequestConcurrency() const { return mSendRequestConcurrency; } + int32_t GetSendRequestGlobalConcurrency() const { return mSendRequestGlobalConcurrency; } + int32_t GetProcessThreadCount() const { return mProcessThreadCount; } // const std::string& GetMappingConfigPath() const { return mMappingConfigPath; } diff --git a/core/pipeline/limiter/ConcurrencyLimiter.cpp b/core/pipeline/limiter/ConcurrencyLimiter.cpp index ac75c8de27..7979ccfc22 100644 --- a/core/pipeline/limiter/ConcurrencyLimiter.cpp +++ b/core/pipeline/limiter/ConcurrencyLimiter.cpp @@ -27,10 +27,6 @@ uint32_t ConcurrencyLimiter::GetCurrentLimit() const { return mCurrenctConcurrency; } -uint32_t ConcurrencyLimiter::GetCurrentInterval() const { - lock_guard lock(mLimiterMux); - return mRetryIntervalSecs; -} void ConcurrencyLimiter::SetCurrentLimit(uint32_t limit) { lock_guard lock(mLimiterMux); mCurrenctConcurrency = limit; @@ -42,19 +38,15 @@ void ConcurrencyLimiter::SetInSendingCount(uint32_t count) { uint32_t ConcurrencyLimiter::GetInSendingCount() const { return mInSendingCnt.load(); } + +uint32_t ConcurrencyLimiter::GetStatisticThreshold() const { + return mStatisticThreshold; +} + #endif bool ConcurrencyLimiter::IsValidToPop() { lock_guard lock(mLimiterMux); - if (mCurrenctConcurrency == 0) { - auto curTime = std::chrono::system_clock::now(); - if (chrono::duration_cast(curTime - mLastCheckTime).count() > mRetryIntervalSecs) { - mLastCheckTime = curTime; - return true; - } else { - return false; - } - } if (mCurrenctConcurrency > mInSendingCnt.load()) { return true; } @@ -69,14 +61,22 @@ void ConcurrencyLimiter::OnSendDone() { --mInSendingCnt; } -void ConcurrencyLimiter::OnSuccess() { +void ConcurrencyLimiter::OnSuccess(std::chrono::system_clock::time_point time) { + AdjustConcurrency(true, time); +} + +void ConcurrencyLimiter::OnFail(std::chrono::system_clock::time_point time) { + AdjustConcurrency(false, time); +} + + + +void ConcurrencyLimiter::Increase() { lock_guard lock(mLimiterMux); - if (mCurrenctConcurrency <= 0) { - mRetryIntervalSecs = mMinRetryIntervalSecs; - LOG_INFO(sLogger, ("reset send retry interval, type", mDescription)); - } if (mCurrenctConcurrency != mMaxConcurrency) { ++mCurrenctConcurrency; + LOG_INFO(sLogger, + ("increase send concurrency", mDescription)("concurrency", mCurrenctConcurrency)); if (mCurrenctConcurrency == mMaxConcurrency) { LOG_INFO(sLogger, ("increase send concurrency to maximum, type", mDescription)("concurrency", mCurrenctConcurrency)); @@ -88,22 +88,58 @@ void ConcurrencyLimiter::OnSuccess() { } } -void ConcurrencyLimiter::OnFail() { +void ConcurrencyLimiter::Decrease(bool fastFallBack) { lock_guard lock(mLimiterMux); - if (mCurrenctConcurrency != 0) { - auto old = mCurrenctConcurrency; - mCurrenctConcurrency = static_cast(mCurrenctConcurrency * mConcurrencyDownRatio); - LOG_INFO(sLogger, ("decrease send concurrency, type", mDescription)("from", old)("to", mCurrenctConcurrency)); + if (fastFallBack) { + if (mCurrenctConcurrency != mMinConcurrency) { + auto old = mCurrenctConcurrency; + mCurrenctConcurrency = std::max(static_cast(mCurrenctConcurrency * mConcurrencyDownFastRatio), mMinConcurrency); + LOG_INFO(sLogger, ("decrease send concurrency, type", mDescription)("from", old)("to", mCurrenctConcurrency)); + } } else { - if (mRetryIntervalSecs != mMaxRetryIntervalSecs) { - auto old = mRetryIntervalSecs; - mRetryIntervalSecs - = min(mMaxRetryIntervalSecs, static_cast(mRetryIntervalSecs * mRetryIntervalUpRatio)); - LOG_INFO(sLogger, - ("increase send retry interval, type", - mDescription)("from", ToString(old) + "s")("to", ToString(mRetryIntervalSecs) + "s")); + if (mCurrenctConcurrency != mMinConcurrency) { + mCurrenctConcurrency = std::max(static_cast(mCurrenctConcurrency * mConcurrencyDownSlowRatio), mMinConcurrency); + LOG_INFO(sLogger, ("decrease send concurrency, type", mDescription)("to", mCurrenctConcurrency)); + } else { + if (mMinConcurrency == 0) { + mCurrenctConcurrency = 1; + LOG_INFO(sLogger, ("decrease send concurrency to min, type", mDescription)("to", mCurrenctConcurrency)); + } } } } + +void ConcurrencyLimiter::AdjustConcurrency(bool success, std::chrono::system_clock::time_point time) { + lock_guard lock(mStatisticsMux); + mStatisticsTotal ++; + if (!success) { + mStatisticsFailTotal ++; + } + if (mLastStatisticsTime == std::chrono::system_clock::time_point()) { + mLastStatisticsTime = time; + } + + if (mStatisticsTotal == mStatisticThreshold || chrono::duration_cast(time - mLastStatisticsTime).count() > mStatisticIntervalThresholdSeconds) { + uint32_t failPercentage = mStatisticsFailTotal*100/mStatisticsTotal; + LOG_DEBUG(sLogger,("AdjustConcurrency", mDescription)("mStatisticsFailTotal", mStatisticsFailTotal)("mStatisticsTotal", mStatisticsTotal)); + mStatisticsTotal = 0; + mStatisticsFailTotal = 0; + mLastStatisticsTime = time; + if (failPercentage == 0) { + // 成功 + Increase(); + } else if (failPercentage <= 10) { + // 不调整 + } else if (failPercentage <= 40) { + // 慢回退 + Decrease(false); + } else { + // 快速回退 + Decrease(true); + } + } +} + + } // namespace logtail diff --git a/core/pipeline/limiter/ConcurrencyLimiter.h b/core/pipeline/limiter/ConcurrencyLimiter.h index 1191326b04..8816c16198 100644 --- a/core/pipeline/limiter/ConcurrencyLimiter.h +++ b/core/pipeline/limiter/ConcurrencyLimiter.h @@ -30,25 +30,23 @@ class ConcurrencyLimiter { public: ConcurrencyLimiter(const std::string& description, uint32_t maxConcurrency, - uint32_t maxRetryIntervalSecs = 3600, - uint32_t minRetryIntervalSecs = 30, - double retryIntervalUpRatio = 1.5, - double concurrencyDownRatio = 0.5) + uint32_t minConcurrency = 0, + double concurrencyDownFastRatio = 0.5, + double concurrencyDownSlowRatio = 0.8) : mDescription(description), mMaxConcurrency(maxConcurrency), + mMinConcurrency(minConcurrency), mCurrenctConcurrency(maxConcurrency), - mMaxRetryIntervalSecs(maxRetryIntervalSecs), - mMinRetryIntervalSecs(minRetryIntervalSecs), - mRetryIntervalSecs(minRetryIntervalSecs), - mRetryIntervalUpRatio(retryIntervalUpRatio), - mConcurrencyDownRatio(concurrencyDownRatio) {} + mConcurrencyDownFastRatio(concurrencyDownFastRatio), + mConcurrencyDownSlowRatio(concurrencyDownSlowRatio) {} bool IsValidToPop(); void PostPop(); void OnSendDone(); - void OnSuccess(); - void OnFail(); + void OnSuccess(std::chrono::system_clock::time_point time); + void OnFail(std::chrono::system_clock::time_point time); + static std::string GetLimiterMetricName(const std::string& limiter) { if (limiter == "region") { @@ -64,10 +62,10 @@ class ConcurrencyLimiter { #ifdef APSARA_UNIT_TEST_MAIN uint32_t GetCurrentLimit() const; - uint32_t GetCurrentInterval() const; void SetCurrentLimit(uint32_t limit); void SetInSendingCount(uint32_t count); uint32_t GetInSendingCount() const; + uint32_t GetStatisticThreshold() const; #endif @@ -77,19 +75,30 @@ class ConcurrencyLimiter { std::atomic_uint32_t mInSendingCnt = 0U; uint32_t mMaxConcurrency = 0; + uint32_t mMinConcurrency = 0; mutable std::mutex mLimiterMux; uint32_t mCurrenctConcurrency = 0; - uint32_t mMaxRetryIntervalSecs = 0; - uint32_t mMinRetryIntervalSecs = 0; + double mConcurrencyDownFastRatio = 0.0; + double mConcurrencyDownSlowRatio = 0.0; - uint32_t mRetryIntervalSecs = 0; + std::chrono::system_clock::time_point mLastCheckTime; - double mRetryIntervalUpRatio = 0.0; - double mConcurrencyDownRatio = 0.0; + mutable std::mutex mStatisticsMux; + std::chrono::system_clock::time_point mLastStatisticsTime; + uint32_t mStatisticsTotal = 0; + uint32_t mStatisticsFailTotal = 0; + + // 统计10个数据,最多等3s + const uint32_t mStatisticThreshold = 10; + const uint32_t mStatisticIntervalThresholdSeconds = 3; + + + void Increase(); + void Decrease(bool fastFallBack); + void AdjustConcurrency(bool success, std::chrono::system_clock::time_point time); - std::chrono::system_clock::time_point mLastCheckTime; }; } // namespace logtail diff --git a/core/pipeline/queue/BoundedSenderQueueInterface.cpp b/core/pipeline/queue/BoundedSenderQueueInterface.cpp index bbf258189d..4d9f0821a1 100644 --- a/core/pipeline/queue/BoundedSenderQueueInterface.cpp +++ b/core/pipeline/queue/BoundedSenderQueueInterface.cpp @@ -56,13 +56,6 @@ void BoundedSenderQueueInterface::SetConcurrencyLimiters(std::unordered_mapOnSuccess(); - } - } -} void BoundedSenderQueueInterface::DecreaseSendingCnt() { for (auto& limiter : mConcurrencyLimiters) { diff --git a/core/pipeline/queue/BoundedSenderQueueInterface.h b/core/pipeline/queue/BoundedSenderQueueInterface.h index 71a1f0c85e..826d574a5b 100644 --- a/core/pipeline/queue/BoundedSenderQueueInterface.h +++ b/core/pipeline/queue/BoundedSenderQueueInterface.h @@ -48,7 +48,6 @@ class BoundedSenderQueueInterface : public BoundedQueueInterface& items, int32_t limit) = 0; void DecreaseSendingCnt(); - void OnSendingSuccess(); void SetRateLimiter(uint32_t maxRate); void SetConcurrencyLimiters(std::unordered_map>&& concurrencyLimitersMap); virtual void SetPipelineForItems(const std::shared_ptr& p) const = 0; diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 47977e47a6..ccc05c3533 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -123,12 +123,12 @@ shared_ptr FlusherSLS::GetLogstoreConcurrencyLimiter(const s auto iter = sLogstoreConcurrencyLimiterMap.find(key); if (iter == sLogstoreConcurrencyLimiterMap.end()) { - auto limiter = GetConcurrencyLimiter(sName + "#quota#logstore#" + key); + auto limiter = make_shared(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency(), 1); sLogstoreConcurrencyLimiterMap.try_emplace(key, limiter); return limiter; } if (iter->second.expired()) { - auto limiter = GetConcurrencyLimiter(sName + "#quota#logstore#" + key); + auto limiter = make_shared(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency(), 1); iter->second = limiter; return limiter; } @@ -139,12 +139,12 @@ shared_ptr FlusherSLS::GetProjectConcurrencyLimiter(const st lock_guard lock(sMux); auto iter = sProjectConcurrencyLimiterMap.find(project); if (iter == sProjectConcurrencyLimiterMap.end()) { - auto limiter = GetConcurrencyLimiter(sName + "#quota#project#" + project); + auto limiter = make_shared(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency(), 1); sProjectConcurrencyLimiterMap.try_emplace(project, limiter); return limiter; } if (iter->second.expired()) { - auto limiter = GetConcurrencyLimiter(sName + "#quota#project#" + project); + auto limiter = make_shared(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency(), 1); iter->second = limiter; return limiter; } @@ -155,12 +155,12 @@ shared_ptr FlusherSLS::GetRegionConcurrencyLimiter(const str lock_guard lock(sMux); auto iter = sRegionConcurrencyLimiterMap.find(region); if (iter == sRegionConcurrencyLimiterMap.end()) { - auto limiter = GetConcurrencyLimiter(sName + "#network#region#" + region); + auto limiter = make_shared(sName + "#network#region#" + region, AppConfig::GetInstance()->GetSendRequestConcurrency(), AppConfig::GetInstance()->GetSendRequestConcurrency()*0.5); sRegionConcurrencyLimiterMap.try_emplace(region, limiter); return limiter; } if (iter->second.expired()) { - auto limiter = GetConcurrencyLimiter(sName + "#network#region#" + region); + auto limiter = make_shared(sName + "#network#region#" + region, AppConfig::GetInstance()->GetSendRequestConcurrency(), AppConfig::GetInstance()->GetSendRequestConcurrency()*0.5); iter->second = limiter; return limiter; } @@ -637,6 +637,9 @@ bool FlusherSLS::BuildRequest(SenderQueueItem* item, unique_ptr item, data->mShardHashKey); } + if (req) { + req->mMaxTryCnt = 1; + } if (!req) { *keepItem = true; return false; @@ -690,9 +693,9 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) ToString(chrono::duration_cast(curSystemTime - item->mFirstEnqueTime).count()) + "ms")("try cnt", data->mTryCnt)("endpoint", data->mCurrentEndpoint)("is profile data", isProfileData)); - GetRegionConcurrencyLimiter(mRegion)->OnSuccess(); - GetProjectConcurrencyLimiter(mProject)->OnSuccess(); - GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(); + GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime); + GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime); + GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime); SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey); DealSenderQueueItemAfterSend(item, false); if (mSuccessCnt) { @@ -733,17 +736,17 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) } } operation = data->mBufferOrNot ? OperationOnFail::RETRY_LATER : OperationOnFail::DISCARD; - GetRegionConcurrencyLimiter(mRegion)->OnFail(); - GetProjectConcurrencyLimiter(mProject)->OnSuccess(); - GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(); + GetRegionConcurrencyLimiter(mRegion)->OnFail(curSystemTime); + GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime); + GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime); } else if (sendResult == SEND_QUOTA_EXCEED) { BOOL_FLAG(global_network_success) = true; if (slsResponse.mErrorCode == sdk::LOGE_SHARD_WRITE_QUOTA_EXCEED) { failDetail << "shard write quota exceed"; suggestion << "Split logstore shards. https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources"; - GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnFail(); - GetRegionConcurrencyLimiter(mRegion)->OnSuccess(); - GetProjectConcurrencyLimiter(mProject)->OnSuccess(); + GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnFail(curSystemTime); + GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime); + GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime); if (mShardWriteQuotaErrorCnt) { mShardWriteQuotaErrorCnt->Add(1); } @@ -751,9 +754,9 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) failDetail << "project write quota exceed"; suggestion << "Submit quota modification request. " "https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources"; - GetProjectConcurrencyLimiter(mProject)->OnFail(); - GetRegionConcurrencyLimiter(mRegion)->OnSuccess(); - GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(); + GetProjectConcurrencyLimiter(mProject)->OnFail(curSystemTime); + GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime); + GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime); if (mProjectQuotaErrorCnt) { mProjectQuotaErrorCnt->Add(1); } diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index 38b09d78de..3b77dc17f5 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -115,7 +115,7 @@ void FlusherRunner::DecreaseHttpSendingCnt() { void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) { // TODO: use semaphore instead while (withLimit && !Application::GetInstance()->IsExiting() - && GetSendingBufferCount() >= AppConfig::GetInstance()->GetSendRequestConcurrency()) { + && GetSendingBufferCount() >= AppConfig::GetInstance()->GetSendRequestGlobalConcurrency()) { this_thread::sleep_for(chrono::milliseconds(10)); } @@ -155,7 +155,7 @@ void FlusherRunner::Run() { vector items; int32_t limit - = Application::GetInstance()->IsExiting() ? -1 : AppConfig::GetInstance()->GetSendRequestConcurrency(); + = Application::GetInstance()->IsExiting() ? -1 : AppConfig::GetInstance()->GetSendRequestGlobalConcurrency(); SenderQueueManager::GetInstance()->GetAvailableItems(items, limit); if (items.empty()) { SenderQueueManager::GetInstance()->Wait(1000); diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index 2bd2e77cb9..2e85a84c8d 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -54,7 +54,7 @@ bool HttpSink::Init() { mSendConcurrency = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_SINK_SEND_CONCURRENCY); // TODO: should be dynamic - mSendConcurrency->Set(AppConfig::GetInstance()->GetSendRequestConcurrency()); + mSendConcurrency->Set(AppConfig::GetInstance()->GetSendRequestGlobalConcurrency()); mThreadRes = async(launch::async, &HttpSink::Run, this); return true; diff --git a/core/unittest/pipeline/ConcurrencyLimiterUnittest.cpp b/core/unittest/pipeline/ConcurrencyLimiterUnittest.cpp index e593e8db68..973b1b06b3 100644 --- a/core/unittest/pipeline/ConcurrencyLimiterUnittest.cpp +++ b/core/unittest/pipeline/ConcurrencyLimiterUnittest.cpp @@ -25,65 +25,118 @@ class ConcurrencyLimiterUnittest : public testing::Test { }; void ConcurrencyLimiterUnittest::TestLimiter() const { - shared_ptr sConcurrencyLimiter = make_shared("", 80); - // comcurrency = 10, count = 0 + auto curSystemTime = chrono::system_clock::now(); + int maxConcurrency = 80; + int minConcurrency = 20; + + shared_ptr sConcurrencyLimiter = make_shared("", maxConcurrency, minConcurrency); + // fastFallBack APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop()); - sConcurrencyLimiter->PostPop(); - APSARA_TEST_EQUAL(1U, sConcurrencyLimiter->GetInSendingCount()); - sConcurrencyLimiter->OnFail(); - sConcurrencyLimiter->OnSendDone(); + for (uint32_t i = 0; i < sConcurrencyLimiter->GetStatisticThreshold(); i++) { + sConcurrencyLimiter->PostPop(); + APSARA_TEST_EQUAL(1U, sConcurrencyLimiter->GetInSendingCount()); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnFail(curSystemTime); + sConcurrencyLimiter->OnSendDone(); + } APSARA_TEST_EQUAL(40U, sConcurrencyLimiter->GetCurrentLimit()); APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetInSendingCount()); - APSARA_TEST_EQUAL(30U, sConcurrencyLimiter->GetCurrentInterval()); - // count = 10, comcurrency = 10 + // success one time APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop()); - int num = 10; - for (int i = 0; i < num; i++) { + for (uint32_t i = 0; i < sConcurrencyLimiter->GetStatisticThreshold(); i++) { APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop()); sConcurrencyLimiter->PostPop(); } APSARA_TEST_EQUAL(10U, sConcurrencyLimiter->GetInSendingCount()); - for (int i = 0; i < num; i++) { - sConcurrencyLimiter->OnSuccess(); + for (uint32_t i = 0; i < sConcurrencyLimiter->GetStatisticThreshold(); i++) { + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnSuccess(curSystemTime); sConcurrencyLimiter->OnSendDone(); } + APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetInSendingCount()); - APSARA_TEST_EQUAL(50U, sConcurrencyLimiter->GetCurrentLimit()); - APSARA_TEST_EQUAL(30U, sConcurrencyLimiter->GetCurrentInterval()); + APSARA_TEST_EQUAL(41U, sConcurrencyLimiter->GetCurrentLimit()); - // limit = 50/2/2/2/2/2/2/2 = 25/2/2/2/2/2/2 = 3/2/2/2 = 1/2/2 = 0 - // interval = 30 * 1.5 = 45 - num = 7; - for (int i = 0; i < num; i++) { - APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop()); + // slowFallBack + for (uint32_t i = 0; i < sConcurrencyLimiter->GetStatisticThreshold() - 2; i++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnSuccess(curSystemTime); + sConcurrencyLimiter->OnSendDone(); + } + for (int i = 0; i < 2; i++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnFail(curSystemTime); + sConcurrencyLimiter->OnSendDone(); + } + uint32_t expect = 41*0.8; + APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetInSendingCount()); + APSARA_TEST_EQUAL(expect, sConcurrencyLimiter->GetCurrentLimit()); + + // no FallBack + for (uint32_t i = 0; i < sConcurrencyLimiter->GetStatisticThreshold() - 1; i++) { sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnSuccess(curSystemTime); + sConcurrencyLimiter->OnSendDone(); } - APSARA_TEST_EQUAL(7U, sConcurrencyLimiter->GetInSendingCount()); - for (int i = 0; i < num; i++) { - sConcurrencyLimiter->OnFail(); + for (int i = 0; i < 1; i++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnFail(curSystemTime); sConcurrencyLimiter->OnSendDone(); } APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetInSendingCount()); - APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetCurrentLimit()); - APSARA_TEST_EQUAL(45U, sConcurrencyLimiter->GetCurrentInterval()); - - num = 3; - for (int i = 0; i < num; i++) { - if (i == 0) { - APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop()); - } else { - APSARA_TEST_EQUAL(false, sConcurrencyLimiter->IsValidToPop()); + APSARA_TEST_EQUAL(expect, sConcurrencyLimiter->GetCurrentLimit()); + + // test FallBack to minConcurrency + for (int i = 0; i < 10; i++) { + for (uint32_t j = 0; j < sConcurrencyLimiter->GetStatisticThreshold(); j++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnFail(curSystemTime); + sConcurrencyLimiter->OnSendDone(); } } + APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetInSendingCount()); + APSARA_TEST_EQUAL(minConcurrency, sConcurrencyLimiter->GetCurrentLimit()); - sConcurrencyLimiter->PostPop(); - sConcurrencyLimiter->OnSuccess(); - sConcurrencyLimiter->OnSendDone(); + // test limit by concurrency + for (int i = 0; i < minConcurrency; i++) { + APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop()); + sConcurrencyLimiter->PostPop(); + } + APSARA_TEST_EQUAL(false, sConcurrencyLimiter->IsValidToPop()); + for (int i = 0; i < minConcurrency; i++) { + sConcurrencyLimiter->OnSendDone(); + } + // test time exceed interval; 8 success, 1 fail, and last one timeout + sConcurrencyLimiter->SetCurrentLimit(40); + for (uint32_t i = 0; i < sConcurrencyLimiter->GetStatisticThreshold() - 3; i++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnSuccess(curSystemTime); + sConcurrencyLimiter->OnSendDone(); + } + for (int i = 0; i < 1; i++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnFail(curSystemTime); + sConcurrencyLimiter->OnSendDone(); + } + sleep(4); + for (int i = 0; i < 1; i++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnSuccess(curSystemTime); + sConcurrencyLimiter->OnSendDone(); + } + expect = 40*0.8; APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetInSendingCount()); - APSARA_TEST_EQUAL(1U, sConcurrencyLimiter->GetCurrentLimit()); - APSARA_TEST_EQUAL(30U, sConcurrencyLimiter->GetCurrentInterval()); + APSARA_TEST_EQUAL(expect, sConcurrencyLimiter->GetCurrentLimit()); } UNIT_TEST_CASE(ConcurrencyLimiterUnittest, TestLimiter)