Skip to content

Commit

Permalink
refine limiter code
Browse files Browse the repository at this point in the history
  • Loading branch information
linrunqi.lrq committed Dec 12, 2024
1 parent fc6820f commit 66dbe52
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 19 deletions.
44 changes: 31 additions & 13 deletions core/pipeline/limiter/ConcurrencyLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ void ConcurrencyLimiter::OnSuccess() {
}
if (mCurrenctConcurrency != mMaxConcurrency) {
++mCurrenctConcurrency;
LOG_INFO(sLogger,
("increase send concurrency", mDescription)("concurrency", mCurrenctConcurrency));
if (mCurrenctConcurrency == mMaxConcurrency) {
LOG_INFO(sLogger,
("increase send concurrency to maximum, type", mDescription)("concurrency", mCurrenctConcurrency));
Expand All @@ -88,22 +90,38 @@ void ConcurrencyLimiter::OnSuccess() {
}
}


void ConcurrencyLimiter::OnFail() {
lock_guard<mutex> lock(mLimiterMux);
if (mCurrenctConcurrency != 0) {
auto old = mCurrenctConcurrency;
mCurrenctConcurrency = static_cast<uint32_t>(mCurrenctConcurrency * mConcurrencyDownRatio);
LOG_INFO(sLogger, ("decrease send concurrency, type", mDescription)("from", old)("to", mCurrenctConcurrency));
} else {
if (mRetryIntervalSecs != mMaxRetryIntervalSecs) {
auto old = mRetryIntervalSecs;
mRetryIntervalSecs
= min(mMaxRetryIntervalSecs, static_cast<uint32_t>(mRetryIntervalSecs * mRetryIntervalUpRatio));
LOG_INFO(sLogger,
("increase send retry interval, type",
mDescription)("from", ToString(old) + "s")("to", ToString(mRetryIntervalSecs) + "s"));
}
switch (mFallbackMode) {
case (Fast):
if (mCurrenctConcurrency != 0) {
auto old = mCurrenctConcurrency;
mCurrenctConcurrency = static_cast<uint32_t>(mCurrenctConcurrency * mConcurrencyDownRatio);
LOG_INFO(sLogger, ("decrease send concurrency, type", mDescription)("from", old)("to", mCurrenctConcurrency));
} else {
if (mRetryIntervalSecs != mMaxRetryIntervalSecs) {
auto old = mRetryIntervalSecs;
mRetryIntervalSecs
= min(mMaxRetryIntervalSecs, static_cast<uint32_t>(mRetryIntervalSecs * mRetryIntervalUpRatio));
LOG_INFO(sLogger,
("increase send retry interval, type",
mDescription)("from", ToString(old) + "s")("to", ToString(mRetryIntervalSecs) + "s"));
}

}
break;
case (Slow):
if (mCurrenctConcurrency != 0) {
mCurrenctConcurrency = mCurrenctConcurrency - 1;
LOG_INFO(sLogger, ("decrease send concurrency, type", mDescription)("to", mCurrenctConcurrency));
} else {
mCurrenctConcurrency = 1;
LOG_INFO(sLogger, ("decrease send concurrency to min, type", mDescription)("to", mCurrenctConcurrency));
}
break;
}

}

} // namespace logtail
13 changes: 11 additions & 2 deletions core/pipeline/limiter/ConcurrencyLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,21 @@ namespace logtail {

class ConcurrencyLimiter {
public:
enum FallbackMode {
Fast,
Slow
};

ConcurrencyLimiter(const std::string& description,
uint32_t maxConcurrency,
uint32_t maxRetryIntervalSecs = 3600,
uint32_t minRetryIntervalSecs = 30,
FallbackMode fallbackMode = Fast,
uint32_t maxRetryIntervalSecs = 60,
uint32_t minRetryIntervalSecs = 5,
double retryIntervalUpRatio = 1.5,
double concurrencyDownRatio = 0.5)
: mDescription(description),
mMaxConcurrency(maxConcurrency),
mFallbackMode(fallbackMode),
mCurrenctConcurrency(maxConcurrency),
mMaxRetryIntervalSecs(maxRetryIntervalSecs),
mMinRetryIntervalSecs(minRetryIntervalSecs),
Expand Down Expand Up @@ -78,6 +85,8 @@ class ConcurrencyLimiter {

uint32_t mMaxConcurrency = 0;

FallbackMode mFallbackMode;

mutable std::mutex mLimiterMux;
uint32_t mCurrenctConcurrency = 0;

Expand Down
8 changes: 4 additions & 4 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -123,12 +123,12 @@ shared_ptr<ConcurrencyLimiter> FlusherSLS::GetLogstoreConcurrencyLimiter(const s

auto iter = sLogstoreConcurrencyLimiterMap.find(key);
if (iter == sLogstoreConcurrencyLimiterMap.end()) {
auto limiter = GetConcurrencyLimiter(sName + "#quota#logstore#" + key);
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency(), ConcurrencyLimiter::FallbackMode::Slow, 10, 5);
sLogstoreConcurrencyLimiterMap.try_emplace(key, limiter);
return limiter;
}
if (iter->second.expired()) {
auto limiter = GetConcurrencyLimiter(sName + "#quota#logstore#" + key);
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency(), ConcurrencyLimiter::FallbackMode::Slow, 10, 5);
iter->second = limiter;
return limiter;
}
Expand All @@ -139,12 +139,12 @@ shared_ptr<ConcurrencyLimiter> FlusherSLS::GetProjectConcurrencyLimiter(const st
lock_guard<mutex> lock(sMux);
auto iter = sProjectConcurrencyLimiterMap.find(project);
if (iter == sProjectConcurrencyLimiterMap.end()) {
auto limiter = GetConcurrencyLimiter(sName + "#quota#project#" + project);
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency(), ConcurrencyLimiter::FallbackMode::Slow, 10, 5);
sProjectConcurrencyLimiterMap.try_emplace(project, limiter);
return limiter;
}
if (iter->second.expired()) {
auto limiter = GetConcurrencyLimiter(sName + "#quota#project#" + project);
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency(), ConcurrencyLimiter::FallbackMode::Slow, 10, 5);
iter->second = limiter;
return limiter;
}
Expand Down

0 comments on commit 66dbe52

Please sign in to comment.