From 2dc76058bbbba7062c17ac3befa8a152678acc95 Mon Sep 17 00:00:00 2001 From: linrunqi08 <90741255+linrunqi08@users.noreply.github.com> Date: Fri, 27 Dec 2024 16:49:39 +0800 Subject: [PATCH] Optimize the limiter code to meet better isolation and recovery scenarios (#1985) --- core/app_config/AppConfig.cpp | 25 ++++ core/app_config/AppConfig.h | 17 ++- core/pipeline/limiter/ConcurrencyLimiter.cpp | 92 ++++++++----- core/pipeline/limiter/ConcurrencyLimiter.h | 42 +++--- .../queue/BoundedSenderQueueInterface.cpp | 7 - .../queue/BoundedSenderQueueInterface.h | 1 - core/plugin/flusher/sls/FlusherSLS.cpp | 36 ++--- core/runner/FlusherRunner.cpp | 4 +- core/runner/sink/http/HttpSink.cpp | 2 +- core/runner/sink/http/HttpSinkRequest.h | 7 +- core/sdk/Client.cpp | 4 +- core/sdk/Common.h | 2 + .../pipeline/ConcurrencyLimiterUnittest.cpp | 125 +++++++++++++----- .../unittest/sender/FlusherRunnerUnittest.cpp | 2 + 14 files changed, 246 insertions(+), 120 deletions(-) diff --git a/core/app_config/AppConfig.cpp b/core/app_config/AppConfig.cpp index 27aa6116cf..1410714e3a 100644 --- a/core/app_config/AppConfig.cpp +++ b/core/app_config/AppConfig.cpp @@ -175,6 +175,7 @@ DEFINE_FLAG_STRING(logtail_snapshot_dir, "snapshot dir on local disk", "snapshot DEFINE_FLAG_STRING(logtail_profile_snapshot, "reader profile on local disk", "logtail_profile_snapshot"); DEFINE_FLAG_STRING(ilogtail_config_env_name, "config file path", "ALIYUN_LOGTAIL_CONFIG"); + #if defined(__linux__) DEFINE_FLAG_STRING(adhoc_check_point_file_dir, "", "/tmp/logtail_adhoc_checkpoint"); #elif defined(_MSC_VER) @@ -194,6 +195,21 @@ DEFINE_FLAG_STRING(sls_observer_ebpf_host_path, namespace logtail { constexpr int32_t kDefaultMaxSendBytePerSec = 25 * 1024 * 1024; // the max send speed per sec, realtime thread + +// 全局并发度保留余量百分比 +const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION = 0.5; +// 单地域并发度最小值 +const int32_t MIN_SEND_REQUEST_CONCURRENCY = 15; +// 单地域并发度最大值 +const int32_t MAX_SEND_REQUEST_CONCURRENCY = 80; +// 并发度统计数量&&时间间隔 +const uint32_t CONCURRENCY_STATISTIC_THRESHOLD = 10; +const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS = 3; +// 并发度不回退百分比阈值 +const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE = 10; +// 并发度慢回退百分比阈值 +const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE = 40; + std::string AppConfig::sLocalConfigDir = "local"; void CreateAgentDir() { try { @@ -1161,6 +1177,15 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) { mBindInterface.clear(); LOG_INFO(sLogger, ("bind_interface", mBindInterface)); } + + // mSendRequestConcurrency was limited + if (mSendRequestConcurrency < MIN_SEND_REQUEST_CONCURRENCY) { + mSendRequestConcurrency = MIN_SEND_REQUEST_CONCURRENCY; + } + if (mSendRequestConcurrency > MAX_SEND_REQUEST_CONCURRENCY) { + mSendRequestConcurrency = MAX_SEND_REQUEST_CONCURRENCY; + } + mSendRequestGlobalConcurrency = mSendRequestConcurrency * (1 + GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION); } bool AppConfig::CheckAndResetProxyEnv() { diff --git a/core/app_config/AppConfig.h b/core/app_config/AppConfig.h index 609c4379f6..d8f756ce03 100644 --- a/core/app_config/AppConfig.h +++ b/core/app_config/AppConfig.h @@ -31,6 +31,14 @@ namespace logtail { extern const int32_t kDefaultMaxSendBytePerSec; +extern const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION; +extern const int32_t MIN_SEND_REQUEST_CONCURRENCY; +extern const int32_t MAX_SEND_REQUEST_CONCURRENCY; +extern const uint32_t CONCURRENCY_STATISTIC_THRESHOLD; +extern const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS; +extern const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE; +extern const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE; + void CreateAgentDir(); std::string GetAgentLogDir(); @@ -131,6 +139,7 @@ class AppConfig { int32_t mNumOfBufferFile; int32_t mLocalFileSize; int32_t mSendRequestConcurrency; + int32_t mSendRequestGlobalConcurrency; std::string mBufferFilePath; // checkpoint @@ -207,6 +216,8 @@ class AppConfig { std::string mBindInterface; + + // /** // * @brief Load ConfigServer, DataServer and network interface // * @@ -434,8 +445,12 @@ class AppConfig { int32_t GetLocalFileSize() const { return mLocalFileSize; } const std::string& GetBufferFilePath() const { return mBufferFilePath; } - + // 单地域并发度 int32_t GetSendRequestConcurrency() const { return mSendRequestConcurrency; } + // 全局并发度 + int32_t GetSendRequestGlobalConcurrency() const { return mSendRequestGlobalConcurrency; } + + double GetGlobalConcurrencyFreePercentageForOneRegion() const { return GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION; } int32_t GetProcessThreadCount() const { return mProcessThreadCount; } diff --git a/core/pipeline/limiter/ConcurrencyLimiter.cpp b/core/pipeline/limiter/ConcurrencyLimiter.cpp index ac75c8de27..937004769b 100644 --- a/core/pipeline/limiter/ConcurrencyLimiter.cpp +++ b/core/pipeline/limiter/ConcurrencyLimiter.cpp @@ -20,17 +20,12 @@ using namespace std; namespace logtail { - #ifdef APSARA_UNIT_TEST_MAIN uint32_t ConcurrencyLimiter::GetCurrentLimit() const { lock_guard lock(mLimiterMux); return mCurrenctConcurrency; } -uint32_t ConcurrencyLimiter::GetCurrentInterval() const { - lock_guard lock(mLimiterMux); - return mRetryIntervalSecs; -} void ConcurrencyLimiter::SetCurrentLimit(uint32_t limit) { lock_guard lock(mLimiterMux); mCurrenctConcurrency = limit; @@ -42,19 +37,15 @@ void ConcurrencyLimiter::SetInSendingCount(uint32_t count) { uint32_t ConcurrencyLimiter::GetInSendingCount() const { return mInSendingCnt.load(); } + +uint32_t ConcurrencyLimiter::GetStatisticThreshold() const { + return CONCURRENCY_STATISTIC_THRESHOLD; +} + #endif bool ConcurrencyLimiter::IsValidToPop() { lock_guard lock(mLimiterMux); - if (mCurrenctConcurrency == 0) { - auto curTime = std::chrono::system_clock::now(); - if (chrono::duration_cast(curTime - mLastCheckTime).count() > mRetryIntervalSecs) { - mLastCheckTime = curTime; - return true; - } else { - return false; - } - } if (mCurrenctConcurrency > mInSendingCnt.load()) { return true; } @@ -69,16 +60,20 @@ void ConcurrencyLimiter::OnSendDone() { --mInSendingCnt; } -void ConcurrencyLimiter::OnSuccess() { +void ConcurrencyLimiter::OnSuccess(std::chrono::system_clock::time_point currentTime) { + AdjustConcurrency(true, currentTime); +} + +void ConcurrencyLimiter::OnFail(std::chrono::system_clock::time_point currentTime) { + AdjustConcurrency(false, currentTime); +} + +void ConcurrencyLimiter::Increase() { lock_guard lock(mLimiterMux); - if (mCurrenctConcurrency <= 0) { - mRetryIntervalSecs = mMinRetryIntervalSecs; - LOG_INFO(sLogger, ("reset send retry interval, type", mDescription)); - } if (mCurrenctConcurrency != mMaxConcurrency) { ++mCurrenctConcurrency; if (mCurrenctConcurrency == mMaxConcurrency) { - LOG_INFO(sLogger, + LOG_DEBUG(sLogger, ("increase send concurrency to maximum, type", mDescription)("concurrency", mCurrenctConcurrency)); } else { LOG_DEBUG(sLogger, @@ -88,22 +83,57 @@ void ConcurrencyLimiter::OnSuccess() { } } -void ConcurrencyLimiter::OnFail() { +void ConcurrencyLimiter::Decrease(double fallBackRatio) { lock_guard lock(mLimiterMux); - if (mCurrenctConcurrency != 0) { + if (mCurrenctConcurrency != mMinConcurrency) { auto old = mCurrenctConcurrency; - mCurrenctConcurrency = static_cast(mCurrenctConcurrency * mConcurrencyDownRatio); - LOG_INFO(sLogger, ("decrease send concurrency, type", mDescription)("from", old)("to", mCurrenctConcurrency)); + mCurrenctConcurrency = std::max(static_cast(mCurrenctConcurrency * fallBackRatio), mMinConcurrency); + LOG_DEBUG(sLogger, ("decrease send concurrency, type", mDescription)("from", old)("to", mCurrenctConcurrency)); } else { - if (mRetryIntervalSecs != mMaxRetryIntervalSecs) { - auto old = mRetryIntervalSecs; - mRetryIntervalSecs - = min(mMaxRetryIntervalSecs, static_cast(mRetryIntervalSecs * mRetryIntervalUpRatio)); - LOG_INFO(sLogger, - ("increase send retry interval, type", - mDescription)("from", ToString(old) + "s")("to", ToString(mRetryIntervalSecs) + "s")); + if (mMinConcurrency == 0) { + mCurrenctConcurrency = 1; + LOG_INFO(sLogger, ("decrease send concurrency to min, type", mDescription)("to", mCurrenctConcurrency)); } } } + +void ConcurrencyLimiter::AdjustConcurrency(bool success, std::chrono::system_clock::time_point currentTime) { + uint32_t failPercentage = 0; + bool finishStatistics = false; + { + lock_guard lock(mStatisticsMux); + mStatisticsTotal ++; + if (!success) { + mStatisticsFailTotal ++; + } + if (mLastStatisticsTime == std::chrono::system_clock::time_point()) { + mLastStatisticsTime = currentTime; + } + if (mStatisticsTotal == CONCURRENCY_STATISTIC_THRESHOLD || chrono::duration_cast(currentTime - mLastStatisticsTime).count() > CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS) { + failPercentage = mStatisticsFailTotal*100/mStatisticsTotal; + LOG_DEBUG(sLogger,("AdjustConcurrency", mDescription)("mStatisticsFailTotal", mStatisticsFailTotal)("mStatisticsTotal", mStatisticsTotal)); + mStatisticsTotal = 0; + mStatisticsFailTotal = 0; + mLastStatisticsTime = currentTime; + finishStatistics = true; + } + } + if (finishStatistics) { + if (failPercentage == 0) { + // 成功 + Increase(); + } else if (failPercentage <= NO_FALL_BACK_FAIL_PERCENTAGE) { + // 不调整 + } else if (failPercentage <= SLOW_FALL_BACK_FAIL_PERCENTAGE) { + // 慢回退 + Decrease(mConcurrencySlowFallBackRatio); + } else { + // 快速回退 + Decrease(mConcurrencyFastFallBackRatio); + } + } +} + + } // namespace logtail diff --git a/core/pipeline/limiter/ConcurrencyLimiter.h b/core/pipeline/limiter/ConcurrencyLimiter.h index 1191326b04..513cea0988 100644 --- a/core/pipeline/limiter/ConcurrencyLimiter.h +++ b/core/pipeline/limiter/ConcurrencyLimiter.h @@ -22,33 +22,31 @@ #include #include +#include "app_config/AppConfig.h" #include "monitor/metric_constants/MetricConstants.h" namespace logtail { - class ConcurrencyLimiter { public: ConcurrencyLimiter(const std::string& description, uint32_t maxConcurrency, - uint32_t maxRetryIntervalSecs = 3600, - uint32_t minRetryIntervalSecs = 30, - double retryIntervalUpRatio = 1.5, - double concurrencyDownRatio = 0.5) + uint32_t minConcurrency = 1, + double concurrencyFastFallBackRatio = 0.5, + double concurrencySlowFallBackRatio = 0.8) : mDescription(description), mMaxConcurrency(maxConcurrency), + mMinConcurrency(minConcurrency), mCurrenctConcurrency(maxConcurrency), - mMaxRetryIntervalSecs(maxRetryIntervalSecs), - mMinRetryIntervalSecs(minRetryIntervalSecs), - mRetryIntervalSecs(minRetryIntervalSecs), - mRetryIntervalUpRatio(retryIntervalUpRatio), - mConcurrencyDownRatio(concurrencyDownRatio) {} + mConcurrencyFastFallBackRatio(concurrencyFastFallBackRatio), + mConcurrencySlowFallBackRatio(concurrencySlowFallBackRatio) {} bool IsValidToPop(); void PostPop(); void OnSendDone(); - void OnSuccess(); - void OnFail(); + void OnSuccess(std::chrono::system_clock::time_point currentTime); + void OnFail(std::chrono::system_clock::time_point currentTime); + static std::string GetLimiterMetricName(const std::string& limiter) { if (limiter == "region") { @@ -64,10 +62,10 @@ class ConcurrencyLimiter { #ifdef APSARA_UNIT_TEST_MAIN uint32_t GetCurrentLimit() const; - uint32_t GetCurrentInterval() const; void SetCurrentLimit(uint32_t limit); void SetInSendingCount(uint32_t count); uint32_t GetInSendingCount() const; + uint32_t GetStatisticThreshold() const; #endif @@ -77,19 +75,25 @@ class ConcurrencyLimiter { std::atomic_uint32_t mInSendingCnt = 0U; uint32_t mMaxConcurrency = 0; + uint32_t mMinConcurrency = 0; mutable std::mutex mLimiterMux; uint32_t mCurrenctConcurrency = 0; - uint32_t mMaxRetryIntervalSecs = 0; - uint32_t mMinRetryIntervalSecs = 0; + double mConcurrencyFastFallBackRatio = 0.0; + double mConcurrencySlowFallBackRatio = 0.0; - uint32_t mRetryIntervalSecs = 0; + std::chrono::system_clock::time_point mLastCheckTime; - double mRetryIntervalUpRatio = 0.0; - double mConcurrencyDownRatio = 0.0; + mutable std::mutex mStatisticsMux; + std::chrono::system_clock::time_point mLastStatisticsTime; + uint32_t mStatisticsTotal = 0; + uint32_t mStatisticsFailTotal = 0; + + void Increase(); + void Decrease(double fallBackRatio); + void AdjustConcurrency(bool success, std::chrono::system_clock::time_point currentTime); - std::chrono::system_clock::time_point mLastCheckTime; }; } // namespace logtail diff --git a/core/pipeline/queue/BoundedSenderQueueInterface.cpp b/core/pipeline/queue/BoundedSenderQueueInterface.cpp index bbf258189d..4d9f0821a1 100644 --- a/core/pipeline/queue/BoundedSenderQueueInterface.cpp +++ b/core/pipeline/queue/BoundedSenderQueueInterface.cpp @@ -56,13 +56,6 @@ void BoundedSenderQueueInterface::SetConcurrencyLimiters(std::unordered_mapOnSuccess(); - } - } -} void BoundedSenderQueueInterface::DecreaseSendingCnt() { for (auto& limiter : mConcurrencyLimiters) { diff --git a/core/pipeline/queue/BoundedSenderQueueInterface.h b/core/pipeline/queue/BoundedSenderQueueInterface.h index 71a1f0c85e..826d574a5b 100644 --- a/core/pipeline/queue/BoundedSenderQueueInterface.h +++ b/core/pipeline/queue/BoundedSenderQueueInterface.h @@ -48,7 +48,6 @@ class BoundedSenderQueueInterface : public BoundedQueueInterface& items, int32_t limit) = 0; void DecreaseSendingCnt(); - void OnSendingSuccess(); void SetRateLimiter(uint32_t maxRate); void SetConcurrencyLimiters(std::unordered_map>&& concurrencyLimitersMap); virtual void SetPipelineForItems(const std::shared_ptr& p) const = 0; diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index e9c0e39861..4a5edeeec1 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -126,12 +126,12 @@ shared_ptr FlusherSLS::GetLogstoreConcurrencyLimiter(const s auto iter = sLogstoreConcurrencyLimiterMap.find(key); if (iter == sLogstoreConcurrencyLimiterMap.end()) { - auto limiter = GetConcurrencyLimiter(sName + "#quota#logstore#" + key); + auto limiter = make_shared(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency()); sLogstoreConcurrencyLimiterMap.try_emplace(key, limiter); return limiter; } if (iter->second.expired()) { - auto limiter = GetConcurrencyLimiter(sName + "#quota#logstore#" + key); + auto limiter = make_shared(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency()); iter->second = limiter; return limiter; } @@ -142,12 +142,12 @@ shared_ptr FlusherSLS::GetProjectConcurrencyLimiter(const st lock_guard lock(sMux); auto iter = sProjectConcurrencyLimiterMap.find(project); if (iter == sProjectConcurrencyLimiterMap.end()) { - auto limiter = GetConcurrencyLimiter(sName + "#quota#project#" + project); + auto limiter = make_shared(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency()); sProjectConcurrencyLimiterMap.try_emplace(project, limiter); return limiter; } if (iter->second.expired()) { - auto limiter = GetConcurrencyLimiter(sName + "#quota#project#" + project); + auto limiter = make_shared(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency()); iter->second = limiter; return limiter; } @@ -158,12 +158,12 @@ shared_ptr FlusherSLS::GetRegionConcurrencyLimiter(const str lock_guard lock(sMux); auto iter = sRegionConcurrencyLimiterMap.find(region); if (iter == sRegionConcurrencyLimiterMap.end()) { - auto limiter = GetConcurrencyLimiter(sName + "#network#region#" + region); + auto limiter = make_shared(sName + "#network#region#" + region, AppConfig::GetInstance()->GetSendRequestConcurrency(), AppConfig::GetInstance()->GetSendRequestConcurrency()*AppConfig::GetInstance()->GetGlobalConcurrencyFreePercentageForOneRegion()); sRegionConcurrencyLimiterMap.try_emplace(region, limiter); return limiter; } if (iter->second.expired()) { - auto limiter = GetConcurrencyLimiter(sName + "#network#region#" + region); + auto limiter = make_shared(sName + "#network#region#" + region, AppConfig::GetInstance()->GetSendRequestConcurrency(), AppConfig::GetInstance()->GetSendRequestConcurrency()*AppConfig::GetInstance()->GetGlobalConcurrencyFreePercentageForOneRegion()); iter->second = limiter; return limiter; } @@ -693,9 +693,9 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) ToString(chrono::duration_cast(curSystemTime - item->mFirstEnqueTime).count()) + "ms")("try cnt", data->mTryCnt)("endpoint", data->mCurrentEndpoint)("is profile data", isProfileData)); - GetRegionConcurrencyLimiter(mRegion)->OnSuccess(); - GetProjectConcurrencyLimiter(mProject)->OnSuccess(); - GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(); + GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime); + GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime); + GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime); SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey); if (mSuccessCnt) { mSuccessCnt->Add(1); @@ -736,17 +736,17 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) } } operation = data->mBufferOrNot ? OperationOnFail::RETRY_LATER : OperationOnFail::DISCARD; - GetRegionConcurrencyLimiter(mRegion)->OnFail(); - GetProjectConcurrencyLimiter(mProject)->OnSuccess(); - GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(); + GetRegionConcurrencyLimiter(mRegion)->OnFail(curSystemTime); + GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime); + GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime); } else if (sendResult == SEND_QUOTA_EXCEED) { BOOL_FLAG(global_network_success) = true; if (slsResponse.mErrorCode == sdk::LOGE_SHARD_WRITE_QUOTA_EXCEED) { failDetail << "shard write quota exceed"; suggestion << "Split logstore shards. https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources"; - GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnFail(); - GetRegionConcurrencyLimiter(mRegion)->OnSuccess(); - GetProjectConcurrencyLimiter(mProject)->OnSuccess(); + GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnFail(curSystemTime); + GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime); + GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime); if (mShardWriteQuotaErrorCnt) { mShardWriteQuotaErrorCnt->Add(1); } @@ -754,9 +754,9 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) failDetail << "project write quota exceed"; suggestion << "Submit quota modification request. " "https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources"; - GetProjectConcurrencyLimiter(mProject)->OnFail(); - GetRegionConcurrencyLimiter(mRegion)->OnSuccess(); - GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(); + GetProjectConcurrencyLimiter(mProject)->OnFail(curSystemTime); + GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime); + GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime); if (mProjectQuotaErrorCnt) { mProjectQuotaErrorCnt->Add(1); } diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index 38b09d78de..3b77dc17f5 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -115,7 +115,7 @@ void FlusherRunner::DecreaseHttpSendingCnt() { void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) { // TODO: use semaphore instead while (withLimit && !Application::GetInstance()->IsExiting() - && GetSendingBufferCount() >= AppConfig::GetInstance()->GetSendRequestConcurrency()) { + && GetSendingBufferCount() >= AppConfig::GetInstance()->GetSendRequestGlobalConcurrency()) { this_thread::sleep_for(chrono::milliseconds(10)); } @@ -155,7 +155,7 @@ void FlusherRunner::Run() { vector items; int32_t limit - = Application::GetInstance()->IsExiting() ? -1 : AppConfig::GetInstance()->GetSendRequestConcurrency(); + = Application::GetInstance()->IsExiting() ? -1 : AppConfig::GetInstance()->GetSendRequestGlobalConcurrency(); SenderQueueManager::GetInstance()->GetAvailableItems(items, limit); if (items.empty()) { SenderQueueManager::GetInstance()->Wait(1000); diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index 6777e3d3b0..e9951d4237 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -54,7 +54,7 @@ bool HttpSink::Init() { mSendConcurrency = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_SINK_SEND_CONCURRENCY); // TODO: should be dynamic - mSendConcurrency->Set(AppConfig::GetInstance()->GetSendRequestConcurrency()); + mSendConcurrency->Set(AppConfig::GetInstance()->GetSendRequestGlobalConcurrency()); mThreadRes = async(launch::async, &HttpSink::Run, this); return true; diff --git a/core/runner/sink/http/HttpSinkRequest.h b/core/runner/sink/http/HttpSinkRequest.h index 1f936a8f99..f8220f7722 100644 --- a/core/runner/sink/http/HttpSinkRequest.h +++ b/core/runner/sink/http/HttpSinkRequest.h @@ -32,8 +32,11 @@ struct HttpSinkRequest : public AsynHttpRequest { const std::string& query, const std::map& header, const std::string& body, - SenderQueueItem* item) - : AsynHttpRequest(method, httpsFlag, host, port, url, query, header, body), mItem(item) {} + SenderQueueItem* item, + uint32_t timeout = static_cast(INT32_FLAG(default_http_request_timeout_secs)), + uint32_t maxTryCnt = static_cast(INT32_FLAG(default_http_request_max_try_cnt)) + ) + : AsynHttpRequest(method, httpsFlag, host, port, url, query, header, body, HttpResponse(), timeout, maxTryCnt), mItem(item) {} bool IsContextValid() const override { return true; } void OnSendDone(HttpResponse& response) override {} diff --git a/core/sdk/Client.cpp b/core/sdk/Client.cpp index acfc49f95c..053e6da40a 100644 --- a/core/sdk/Client.cpp +++ b/core/sdk/Client.cpp @@ -289,7 +289,7 @@ namespace sdk { SetCommonHeader(httpHeader, (int32_t)(body.length()), ""); string signature = GetUrlSignature(HTTP_POST, operation, httpHeader, parameterList, body, accessKeySecret); httpHeader[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature; - return make_unique(HTTP_POST, mUsingHTTPS, host, mPort, operation, "", httpHeader, body, item); + return make_unique(HTTP_POST, mUsingHTTPS, host, mPort, operation, "", httpHeader, body, item, INT32_FLAG(default_http_request_timeout_secs), LOG_REQUEST_TRY_TIMES); } unique_ptr @@ -340,7 +340,7 @@ namespace sdk { GetQueryString(parameterList, queryString); return make_unique( - HTTP_POST, mUsingHTTPS, host, mPort, operation, queryString, httpHeader, body, item); + HTTP_POST, mUsingHTTPS, host, mPort, operation, queryString, httpHeader, body, item, INT32_FLAG(default_http_request_timeout_secs), LOG_REQUEST_TRY_TIMES); } PostLogStoreLogsResponse diff --git a/core/sdk/Common.h b/core/sdk/Common.h index 56ce512560..e37d2efa61 100644 --- a/core/sdk/Common.h +++ b/core/sdk/Common.h @@ -36,6 +36,8 @@ namespace sdk { const int64_t kFirstHashKeySeqID = 1; const uint32_t LOG_REQUEST_TIMEOUT = 20; + const uint32_t LOG_REQUEST_TRY_TIMES = 1; + const uint32_t MD5_BYTES = 16; #define DATE_FORMAT_RFC822 "%a, %d %b %Y %H:%M:%S GMT" ///< RFC822 date formate, GMT time. diff --git a/core/unittest/pipeline/ConcurrencyLimiterUnittest.cpp b/core/unittest/pipeline/ConcurrencyLimiterUnittest.cpp index e593e8db68..973b1b06b3 100644 --- a/core/unittest/pipeline/ConcurrencyLimiterUnittest.cpp +++ b/core/unittest/pipeline/ConcurrencyLimiterUnittest.cpp @@ -25,65 +25,118 @@ class ConcurrencyLimiterUnittest : public testing::Test { }; void ConcurrencyLimiterUnittest::TestLimiter() const { - shared_ptr sConcurrencyLimiter = make_shared("", 80); - // comcurrency = 10, count = 0 + auto curSystemTime = chrono::system_clock::now(); + int maxConcurrency = 80; + int minConcurrency = 20; + + shared_ptr sConcurrencyLimiter = make_shared("", maxConcurrency, minConcurrency); + // fastFallBack APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop()); - sConcurrencyLimiter->PostPop(); - APSARA_TEST_EQUAL(1U, sConcurrencyLimiter->GetInSendingCount()); - sConcurrencyLimiter->OnFail(); - sConcurrencyLimiter->OnSendDone(); + for (uint32_t i = 0; i < sConcurrencyLimiter->GetStatisticThreshold(); i++) { + sConcurrencyLimiter->PostPop(); + APSARA_TEST_EQUAL(1U, sConcurrencyLimiter->GetInSendingCount()); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnFail(curSystemTime); + sConcurrencyLimiter->OnSendDone(); + } APSARA_TEST_EQUAL(40U, sConcurrencyLimiter->GetCurrentLimit()); APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetInSendingCount()); - APSARA_TEST_EQUAL(30U, sConcurrencyLimiter->GetCurrentInterval()); - // count = 10, comcurrency = 10 + // success one time APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop()); - int num = 10; - for (int i = 0; i < num; i++) { + for (uint32_t i = 0; i < sConcurrencyLimiter->GetStatisticThreshold(); i++) { APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop()); sConcurrencyLimiter->PostPop(); } APSARA_TEST_EQUAL(10U, sConcurrencyLimiter->GetInSendingCount()); - for (int i = 0; i < num; i++) { - sConcurrencyLimiter->OnSuccess(); + for (uint32_t i = 0; i < sConcurrencyLimiter->GetStatisticThreshold(); i++) { + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnSuccess(curSystemTime); sConcurrencyLimiter->OnSendDone(); } + APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetInSendingCount()); - APSARA_TEST_EQUAL(50U, sConcurrencyLimiter->GetCurrentLimit()); - APSARA_TEST_EQUAL(30U, sConcurrencyLimiter->GetCurrentInterval()); + APSARA_TEST_EQUAL(41U, sConcurrencyLimiter->GetCurrentLimit()); - // limit = 50/2/2/2/2/2/2/2 = 25/2/2/2/2/2/2 = 3/2/2/2 = 1/2/2 = 0 - // interval = 30 * 1.5 = 45 - num = 7; - for (int i = 0; i < num; i++) { - APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop()); + // slowFallBack + for (uint32_t i = 0; i < sConcurrencyLimiter->GetStatisticThreshold() - 2; i++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnSuccess(curSystemTime); + sConcurrencyLimiter->OnSendDone(); + } + for (int i = 0; i < 2; i++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnFail(curSystemTime); + sConcurrencyLimiter->OnSendDone(); + } + uint32_t expect = 41*0.8; + APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetInSendingCount()); + APSARA_TEST_EQUAL(expect, sConcurrencyLimiter->GetCurrentLimit()); + + // no FallBack + for (uint32_t i = 0; i < sConcurrencyLimiter->GetStatisticThreshold() - 1; i++) { sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnSuccess(curSystemTime); + sConcurrencyLimiter->OnSendDone(); } - APSARA_TEST_EQUAL(7U, sConcurrencyLimiter->GetInSendingCount()); - for (int i = 0; i < num; i++) { - sConcurrencyLimiter->OnFail(); + for (int i = 0; i < 1; i++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnFail(curSystemTime); sConcurrencyLimiter->OnSendDone(); } APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetInSendingCount()); - APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetCurrentLimit()); - APSARA_TEST_EQUAL(45U, sConcurrencyLimiter->GetCurrentInterval()); - - num = 3; - for (int i = 0; i < num; i++) { - if (i == 0) { - APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop()); - } else { - APSARA_TEST_EQUAL(false, sConcurrencyLimiter->IsValidToPop()); + APSARA_TEST_EQUAL(expect, sConcurrencyLimiter->GetCurrentLimit()); + + // test FallBack to minConcurrency + for (int i = 0; i < 10; i++) { + for (uint32_t j = 0; j < sConcurrencyLimiter->GetStatisticThreshold(); j++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnFail(curSystemTime); + sConcurrencyLimiter->OnSendDone(); } } + APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetInSendingCount()); + APSARA_TEST_EQUAL(minConcurrency, sConcurrencyLimiter->GetCurrentLimit()); - sConcurrencyLimiter->PostPop(); - sConcurrencyLimiter->OnSuccess(); - sConcurrencyLimiter->OnSendDone(); + // test limit by concurrency + for (int i = 0; i < minConcurrency; i++) { + APSARA_TEST_EQUAL(true, sConcurrencyLimiter->IsValidToPop()); + sConcurrencyLimiter->PostPop(); + } + APSARA_TEST_EQUAL(false, sConcurrencyLimiter->IsValidToPop()); + for (int i = 0; i < minConcurrency; i++) { + sConcurrencyLimiter->OnSendDone(); + } + // test time exceed interval; 8 success, 1 fail, and last one timeout + sConcurrencyLimiter->SetCurrentLimit(40); + for (uint32_t i = 0; i < sConcurrencyLimiter->GetStatisticThreshold() - 3; i++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnSuccess(curSystemTime); + sConcurrencyLimiter->OnSendDone(); + } + for (int i = 0; i < 1; i++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnFail(curSystemTime); + sConcurrencyLimiter->OnSendDone(); + } + sleep(4); + for (int i = 0; i < 1; i++) { + sConcurrencyLimiter->PostPop(); + curSystemTime = chrono::system_clock::now(); + sConcurrencyLimiter->OnSuccess(curSystemTime); + sConcurrencyLimiter->OnSendDone(); + } + expect = 40*0.8; APSARA_TEST_EQUAL(0U, sConcurrencyLimiter->GetInSendingCount()); - APSARA_TEST_EQUAL(1U, sConcurrencyLimiter->GetCurrentLimit()); - APSARA_TEST_EQUAL(30U, sConcurrencyLimiter->GetCurrentInterval()); + APSARA_TEST_EQUAL(expect, sConcurrencyLimiter->GetCurrentLimit()); } UNIT_TEST_CASE(ConcurrencyLimiterUnittest, TestLimiter) diff --git a/core/unittest/sender/FlusherRunnerUnittest.cpp b/core/unittest/sender/FlusherRunnerUnittest.cpp index 9bb1b6e1fa..076b51fb07 100644 --- a/core/unittest/sender/FlusherRunnerUnittest.cpp +++ b/core/unittest/sender/FlusherRunnerUnittest.cpp @@ -47,6 +47,8 @@ void FlusherRunnerUnittest::TestDispatch() { flusher->SetMetricsRecordRef("name", "1"); flusher->Init(Json::Value(), tmp); + AppConfig::GetInstance()->mSendRequestGlobalConcurrency = 10; + auto item = make_unique("content", 10, flusher.get(), flusher->GetQueueKey()); auto realItem = item.get(); flusher->PushToQueue(std::move(item));