Skip to content

Commit

Permalink
refine limiter code
Browse files Browse the repository at this point in the history
  • Loading branch information
linrunqi08 committed Dec 23, 2024
1 parent fc6820f commit b208161
Show file tree
Hide file tree
Showing 10 changed files with 218 additions and 113 deletions.
9 changes: 9 additions & 0 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -1190,6 +1190,15 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) {
mBindInterface.clear();
LOG_INFO(sLogger, ("bind_interface", mBindInterface));
}

// mSendRequestConcurrency was limited in [15, 80]
if (mSendRequestConcurrency < 15) {
mSendRequestConcurrency = 15;
}
if (mSendRequestConcurrency > 80) {
mSendRequestConcurrency = 80;
}
mSendRequestGlobalConcurrency = mSendRequestConcurrency * 1.5;
}

bool AppConfig::CheckAndResetProxyEnv() {
Expand Down
3 changes: 3 additions & 0 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,7 @@ class AppConfig {
int32_t mNumOfBufferFile;
int32_t mLocalFileSize;
int32_t mSendRequestConcurrency;
int32_t mSendRequestGlobalConcurrency;
std::string mBufferFilePath;

// checkpoint
Expand Down Expand Up @@ -438,6 +439,8 @@ class AppConfig {

int32_t GetSendRequestConcurrency() const { return mSendRequestConcurrency; }

int32_t GetSendRequestGlobalConcurrency() const { return mSendRequestGlobalConcurrency; }

int32_t GetProcessThreadCount() const { return mProcessThreadCount; }

// const std::string& GetMappingConfigPath() const { return mMappingConfigPath; }
Expand Down
96 changes: 66 additions & 30 deletions core/pipeline/limiter/ConcurrencyLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ uint32_t ConcurrencyLimiter::GetCurrentLimit() const {
return mCurrenctConcurrency;
}

uint32_t ConcurrencyLimiter::GetCurrentInterval() const {
lock_guard<mutex> lock(mLimiterMux);
return mRetryIntervalSecs;
}
void ConcurrencyLimiter::SetCurrentLimit(uint32_t limit) {
lock_guard<mutex> lock(mLimiterMux);
mCurrenctConcurrency = limit;
Expand All @@ -42,19 +38,15 @@ void ConcurrencyLimiter::SetInSendingCount(uint32_t count) {
uint32_t ConcurrencyLimiter::GetInSendingCount() const {
return mInSendingCnt.load();
}

uint32_t ConcurrencyLimiter::GetStatisticThreshold() const {
return mStatisticThreshold;
}

#endif

bool ConcurrencyLimiter::IsValidToPop() {
lock_guard<mutex> lock(mLimiterMux);
if (mCurrenctConcurrency == 0) {
auto curTime = std::chrono::system_clock::now();
if (chrono::duration_cast<chrono::seconds>(curTime - mLastCheckTime).count() > mRetryIntervalSecs) {
mLastCheckTime = curTime;
return true;
} else {
return false;
}
}
if (mCurrenctConcurrency > mInSendingCnt.load()) {
return true;
}
Expand All @@ -69,14 +61,22 @@ void ConcurrencyLimiter::OnSendDone() {
--mInSendingCnt;
}

void ConcurrencyLimiter::OnSuccess() {
void ConcurrencyLimiter::OnSuccess(std::chrono::system_clock::time_point time) {
AdjustConcurrency(true, time);
}

void ConcurrencyLimiter::OnFail(std::chrono::system_clock::time_point time) {
AdjustConcurrency(false, time);
}



void ConcurrencyLimiter::Increase() {
lock_guard<mutex> lock(mLimiterMux);
if (mCurrenctConcurrency <= 0) {
mRetryIntervalSecs = mMinRetryIntervalSecs;
LOG_INFO(sLogger, ("reset send retry interval, type", mDescription));
}
if (mCurrenctConcurrency != mMaxConcurrency) {
++mCurrenctConcurrency;
LOG_INFO(sLogger,
("increase send concurrency", mDescription)("concurrency", mCurrenctConcurrency));
if (mCurrenctConcurrency == mMaxConcurrency) {
LOG_INFO(sLogger,
("increase send concurrency to maximum, type", mDescription)("concurrency", mCurrenctConcurrency));
Expand All @@ -88,22 +88,58 @@ void ConcurrencyLimiter::OnSuccess() {
}
}

void ConcurrencyLimiter::OnFail() {
void ConcurrencyLimiter::Decrease(bool fastFallBack) {
lock_guard<mutex> lock(mLimiterMux);
if (mCurrenctConcurrency != 0) {
auto old = mCurrenctConcurrency;
mCurrenctConcurrency = static_cast<uint32_t>(mCurrenctConcurrency * mConcurrencyDownRatio);
LOG_INFO(sLogger, ("decrease send concurrency, type", mDescription)("from", old)("to", mCurrenctConcurrency));
if (fastFallBack) {
if (mCurrenctConcurrency != mMinConcurrency) {
auto old = mCurrenctConcurrency;
mCurrenctConcurrency = std::max(static_cast<uint32_t>(mCurrenctConcurrency * mConcurrencyDownFastRatio), mMinConcurrency);
LOG_INFO(sLogger, ("decrease send concurrency, type", mDescription)("from", old)("to", mCurrenctConcurrency));
}
} else {
if (mRetryIntervalSecs != mMaxRetryIntervalSecs) {
auto old = mRetryIntervalSecs;
mRetryIntervalSecs
= min(mMaxRetryIntervalSecs, static_cast<uint32_t>(mRetryIntervalSecs * mRetryIntervalUpRatio));
LOG_INFO(sLogger,
("increase send retry interval, type",
mDescription)("from", ToString(old) + "s")("to", ToString(mRetryIntervalSecs) + "s"));
if (mCurrenctConcurrency != mMinConcurrency) {
mCurrenctConcurrency = std::max(static_cast<uint32_t>(mCurrenctConcurrency * mConcurrencyDownSlowRatio), mMinConcurrency);
LOG_INFO(sLogger, ("decrease send concurrency, type", mDescription)("to", mCurrenctConcurrency));
} else {
if (mMinConcurrency == 0) {
mCurrenctConcurrency = 1;
LOG_INFO(sLogger, ("decrease send concurrency to min, type", mDescription)("to", mCurrenctConcurrency));
}
}
}
}


void ConcurrencyLimiter::AdjustConcurrency(bool success, std::chrono::system_clock::time_point time) {
lock_guard<mutex> lock(mStatisticsMux);
mStatisticsTotal ++;
if (!success) {
mStatisticsFailTotal ++;
}
if (mLastStatisticsTime == std::chrono::system_clock::time_point()) {
mLastStatisticsTime = time;
}

if (mStatisticsTotal == mStatisticThreshold || chrono::duration_cast<chrono::seconds>(time - mLastStatisticsTime).count() > mStatisticIntervalThresholdSeconds) {
uint32_t failPercentage = mStatisticsFailTotal*100/mStatisticsTotal;
LOG_DEBUG(sLogger,("AdjustConcurrency", mDescription)("mStatisticsFailTotal", mStatisticsFailTotal)("mStatisticsTotal", mStatisticsTotal));
mStatisticsTotal = 0;
mStatisticsFailTotal = 0;
mLastStatisticsTime = time;
if (failPercentage == 0) {
// 成功
Increase();
} else if (failPercentage <= 10) {
// 不调整
} else if (failPercentage <= 40) {
// 慢回退
Decrease(false);
} else {
// 快速回退
Decrease(true);
}
}
}


} // namespace logtail
45 changes: 27 additions & 18 deletions core/pipeline/limiter/ConcurrencyLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,25 +30,23 @@ class ConcurrencyLimiter {
public:
ConcurrencyLimiter(const std::string& description,
uint32_t maxConcurrency,
uint32_t maxRetryIntervalSecs = 3600,
uint32_t minRetryIntervalSecs = 30,
double retryIntervalUpRatio = 1.5,
double concurrencyDownRatio = 0.5)
uint32_t minConcurrency = 0,
double concurrencyDownFastRatio = 0.5,
double concurrencyDownSlowRatio = 0.8)
: mDescription(description),
mMaxConcurrency(maxConcurrency),
mMinConcurrency(minConcurrency),
mCurrenctConcurrency(maxConcurrency),
mMaxRetryIntervalSecs(maxRetryIntervalSecs),
mMinRetryIntervalSecs(minRetryIntervalSecs),
mRetryIntervalSecs(minRetryIntervalSecs),
mRetryIntervalUpRatio(retryIntervalUpRatio),
mConcurrencyDownRatio(concurrencyDownRatio) {}
mConcurrencyDownFastRatio(concurrencyDownFastRatio),
mConcurrencyDownSlowRatio(concurrencyDownSlowRatio) {}

bool IsValidToPop();
void PostPop();
void OnSendDone();

void OnSuccess();
void OnFail();
void OnSuccess(std::chrono::system_clock::time_point time);
void OnFail(std::chrono::system_clock::time_point time);


static std::string GetLimiterMetricName(const std::string& limiter) {
if (limiter == "region") {
Expand All @@ -64,10 +62,10 @@ class ConcurrencyLimiter {
#ifdef APSARA_UNIT_TEST_MAIN

uint32_t GetCurrentLimit() const;
uint32_t GetCurrentInterval() const;
void SetCurrentLimit(uint32_t limit);
void SetInSendingCount(uint32_t count);
uint32_t GetInSendingCount() const;
uint32_t GetStatisticThreshold() const;

#endif

Expand All @@ -77,19 +75,30 @@ class ConcurrencyLimiter {
std::atomic_uint32_t mInSendingCnt = 0U;

uint32_t mMaxConcurrency = 0;
uint32_t mMinConcurrency = 0;

mutable std::mutex mLimiterMux;
uint32_t mCurrenctConcurrency = 0;

uint32_t mMaxRetryIntervalSecs = 0;
uint32_t mMinRetryIntervalSecs = 0;
double mConcurrencyDownFastRatio = 0.0;
double mConcurrencyDownSlowRatio = 0.0;

uint32_t mRetryIntervalSecs = 0;
std::chrono::system_clock::time_point mLastCheckTime;

double mRetryIntervalUpRatio = 0.0;
double mConcurrencyDownRatio = 0.0;
mutable std::mutex mStatisticsMux;
std::chrono::system_clock::time_point mLastStatisticsTime;
uint32_t mStatisticsTotal = 0;
uint32_t mStatisticsFailTotal = 0;

// 统计10个数据,最多等3s
const uint32_t mStatisticThreshold = 10;
const uint32_t mStatisticIntervalThresholdSeconds = 3;


void Increase();
void Decrease(bool fastFallBack);
void AdjustConcurrency(bool success, std::chrono::system_clock::time_point time);

std::chrono::system_clock::time_point mLastCheckTime;
};

} // namespace logtail
7 changes: 0 additions & 7 deletions core/pipeline/queue/BoundedSenderQueueInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@ void BoundedSenderQueueInterface::SetConcurrencyLimiters(std::unordered_map<std:
}
}

void BoundedSenderQueueInterface::OnSendingSuccess() {
for (auto& limiter : mConcurrencyLimiters) {
if (limiter.first != nullptr) {
limiter.first->OnSuccess();
}
}
}

void BoundedSenderQueueInterface::DecreaseSendingCnt() {
for (auto& limiter : mConcurrencyLimiters) {
Expand Down
1 change: 0 additions & 1 deletion core/pipeline/queue/BoundedSenderQueueInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class BoundedSenderQueueInterface : public BoundedQueueInterface<std::unique_ptr
virtual void GetAvailableItems(std::vector<SenderQueueItem*>& items, int32_t limit) = 0;

void DecreaseSendingCnt();
void OnSendingSuccess();
void SetRateLimiter(uint32_t maxRate);
void SetConcurrencyLimiters(std::unordered_map<std::string, std::shared_ptr<ConcurrencyLimiter>>&& concurrencyLimitersMap);
virtual void SetPipelineForItems(const std::shared_ptr<Pipeline>& p) const = 0;
Expand Down
39 changes: 21 additions & 18 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ shared_ptr<ConcurrencyLimiter> FlusherSLS::GetLogstoreConcurrencyLimiter(const s

auto iter = sLogstoreConcurrencyLimiterMap.find(key);
if (iter == sLogstoreConcurrencyLimiterMap.end()) {
auto limiter = GetConcurrencyLimiter(sName + "#quota#logstore#" + key);
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency(), 1);
sLogstoreConcurrencyLimiterMap.try_emplace(key, limiter);
return limiter;
}
if (iter->second.expired()) {
auto limiter = GetConcurrencyLimiter(sName + "#quota#logstore#" + key);
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency(), 1);
iter->second = limiter;
return limiter;
}
Expand All @@ -139,12 +139,12 @@ shared_ptr<ConcurrencyLimiter> FlusherSLS::GetProjectConcurrencyLimiter(const st
lock_guard<mutex> lock(sMux);
auto iter = sProjectConcurrencyLimiterMap.find(project);
if (iter == sProjectConcurrencyLimiterMap.end()) {
auto limiter = GetConcurrencyLimiter(sName + "#quota#project#" + project);
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency(), 1);
sProjectConcurrencyLimiterMap.try_emplace(project, limiter);
return limiter;
}
if (iter->second.expired()) {
auto limiter = GetConcurrencyLimiter(sName + "#quota#project#" + project);
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency(), 1);
iter->second = limiter;
return limiter;
}
Expand All @@ -155,12 +155,12 @@ shared_ptr<ConcurrencyLimiter> FlusherSLS::GetRegionConcurrencyLimiter(const str
lock_guard<mutex> lock(sMux);
auto iter = sRegionConcurrencyLimiterMap.find(region);
if (iter == sRegionConcurrencyLimiterMap.end()) {
auto limiter = GetConcurrencyLimiter(sName + "#network#region#" + region);
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#network#region#" + region, AppConfig::GetInstance()->GetSendRequestConcurrency(), AppConfig::GetInstance()->GetSendRequestConcurrency()*0.5);
sRegionConcurrencyLimiterMap.try_emplace(region, limiter);
return limiter;
}
if (iter->second.expired()) {
auto limiter = GetConcurrencyLimiter(sName + "#network#region#" + region);
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#network#region#" + region, AppConfig::GetInstance()->GetSendRequestConcurrency(), AppConfig::GetInstance()->GetSendRequestConcurrency()*0.5);
iter->second = limiter;
return limiter;
}
Expand Down Expand Up @@ -637,6 +637,9 @@ bool FlusherSLS::BuildRequest(SenderQueueItem* item, unique_ptr<HttpSinkRequest>
item,
data->mShardHashKey);
}
if (req) {
req->mMaxTryCnt = 1;
}
if (!req) {
*keepItem = true;
return false;
Expand Down Expand Up @@ -690,9 +693,9 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)
ToString(chrono::duration_cast<chrono::milliseconds>(curSystemTime - item->mFirstEnqueTime).count())
+ "ms")("try cnt", data->mTryCnt)("endpoint", data->mCurrentEndpoint)("is profile data",
isProfileData));
GetRegionConcurrencyLimiter(mRegion)->OnSuccess();
GetProjectConcurrencyLimiter(mProject)->OnSuccess();
GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess();
GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime);
GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime);
GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime);
SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey);
DealSenderQueueItemAfterSend(item, false);
if (mSuccessCnt) {
Expand Down Expand Up @@ -733,27 +736,27 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)
}
}
operation = data->mBufferOrNot ? OperationOnFail::RETRY_LATER : OperationOnFail::DISCARD;
GetRegionConcurrencyLimiter(mRegion)->OnFail();
GetProjectConcurrencyLimiter(mProject)->OnSuccess();
GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess();
GetRegionConcurrencyLimiter(mRegion)->OnFail(curSystemTime);
GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime);
GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime);
} else if (sendResult == SEND_QUOTA_EXCEED) {
BOOL_FLAG(global_network_success) = true;
if (slsResponse.mErrorCode == sdk::LOGE_SHARD_WRITE_QUOTA_EXCEED) {
failDetail << "shard write quota exceed";
suggestion << "Split logstore shards. https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources";
GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnFail();
GetRegionConcurrencyLimiter(mRegion)->OnSuccess();
GetProjectConcurrencyLimiter(mProject)->OnSuccess();
GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnFail(curSystemTime);
GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime);
GetProjectConcurrencyLimiter(mProject)->OnSuccess(curSystemTime);
if (mShardWriteQuotaErrorCnt) {
mShardWriteQuotaErrorCnt->Add(1);
}
} else {
failDetail << "project write quota exceed";
suggestion << "Submit quota modification request. "
"https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources";
GetProjectConcurrencyLimiter(mProject)->OnFail();
GetRegionConcurrencyLimiter(mRegion)->OnSuccess();
GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess();
GetProjectConcurrencyLimiter(mProject)->OnFail(curSystemTime);
GetRegionConcurrencyLimiter(mRegion)->OnSuccess(curSystemTime);
GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(curSystemTime);
if (mProjectQuotaErrorCnt) {
mProjectQuotaErrorCnt->Add(1);
}
Expand Down
4 changes: 2 additions & 2 deletions core/runner/FlusherRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ void FlusherRunner::DecreaseHttpSendingCnt() {
void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) {
// TODO: use semaphore instead
while (withLimit && !Application::GetInstance()->IsExiting()
&& GetSendingBufferCount() >= AppConfig::GetInstance()->GetSendRequestConcurrency()) {
&& GetSendingBufferCount() >= AppConfig::GetInstance()->GetSendRequestGlobalConcurrency()) {
this_thread::sleep_for(chrono::milliseconds(10));
}

Expand Down Expand Up @@ -155,7 +155,7 @@ void FlusherRunner::Run() {

vector<SenderQueueItem*> items;
int32_t limit
= Application::GetInstance()->IsExiting() ? -1 : AppConfig::GetInstance()->GetSendRequestConcurrency();
= Application::GetInstance()->IsExiting() ? -1 : AppConfig::GetInstance()->GetSendRequestGlobalConcurrency();
SenderQueueManager::GetInstance()->GetAvailableItems(items, limit);
if (items.empty()) {
SenderQueueManager::GetInstance()->Wait(1000);
Expand Down
Loading

0 comments on commit b208161

Please sign in to comment.