Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Optimize the limiter code to meet better isolation and recovery scenarios #1985

Merged
merged 7 commits into from
Dec 27, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 25 additions & 0 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -173,6 +173,7 @@ DEFINE_FLAG_STRING(logtail_snapshot_dir, "snapshot dir on local disk", "snapshot
DEFINE_FLAG_STRING(logtail_profile_snapshot, "reader profile on local disk", "logtail_profile_snapshot");
DEFINE_FLAG_STRING(ilogtail_config_env_name, "config file path", "ALIYUN_LOGTAIL_CONFIG");


#if defined(__linux__)
DEFINE_FLAG_STRING(adhoc_check_point_file_dir, "", "/tmp/logtail_adhoc_checkpoint");
#elif defined(_MSC_VER)
Expand All @@ -192,6 +193,21 @@ DEFINE_FLAG_STRING(sls_observer_ebpf_host_path,
namespace logtail {
constexpr int32_t kDefaultMaxSendBytePerSec = 25 * 1024 * 1024; // the max send speed per sec, realtime thread


// Extra headroom added on top of the per-region send concurrency when deriving the
// global send concurrency, expressed as a fraction (0.5 means +50%).
const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION = 0.5;
// Lower bound of the send concurrency for a single region.
const int32_t MIN_SEND_REQUEST_CONCURRENCY = 15;
// Upper bound of the send concurrency for a single region.
const int32_t MAX_SEND_REQUEST_CONCURRENCY = 80;
// Concurrency statistics window: sample count and time interval (in seconds).
const uint32_t CONCURRENCY_STATISTIC_THRESHOLD = 10;
const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS = 3;
// Failure percentage at or below which the concurrency is not decreased.
const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE = 10;
// Failure percentage at or below which the concurrency is decreased slowly;
// anything above it triggers the fast decrease.
const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE = 40;

std::string AppConfig::sLocalConfigDir = "local";
void CreateAgentDir() {
try {
Expand Down Expand Up @@ -1190,6 +1206,15 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) {
mBindInterface.clear();
LOG_INFO(sLogger, ("bind_interface", mBindInterface));
}

// Clamp the per-region send concurrency into
// [MIN_SEND_REQUEST_CONCURRENCY, MAX_SEND_REQUEST_CONCURRENCY].
if (mSendRequestConcurrency < MIN_SEND_REQUEST_CONCURRENCY) {
mSendRequestConcurrency = MIN_SEND_REQUEST_CONCURRENCY;
}
if (mSendRequestConcurrency > MAX_SEND_REQUEST_CONCURRENCY) {
mSendRequestConcurrency = MAX_SEND_REQUEST_CONCURRENCY;
}
// Global concurrency = clamped per-region concurrency plus the reserved headroom fraction.
// The product is computed in double and implicitly converted when stored into the integer member.
mSendRequestGlobalConcurrency = mSendRequestConcurrency * (1 + GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION);
}

bool AppConfig::CheckAndResetProxyEnv() {
Expand Down
17 changes: 16 additions & 1 deletion core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
namespace logtail {
extern const int32_t kDefaultMaxSendBytePerSec;

// Send-concurrency tuning constants; the values are defined in AppConfig.cpp.
// Headroom fraction applied to the per-region concurrency to obtain the global concurrency.
extern const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION;
// Bounds applied to the per-region send concurrency.
extern const int32_t MIN_SEND_REQUEST_CONCURRENCY;
extern const int32_t MAX_SEND_REQUEST_CONCURRENCY;
// Statistics window used when adjusting concurrency: sample count and interval in seconds.
extern const uint32_t CONCURRENCY_STATISTIC_THRESHOLD;
extern const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS;
// Failure-percentage thresholds selecting between no, slow and fast concurrency fall-back.
extern const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE;
extern const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE;

void CreateAgentDir();

std::string GetAgentLogDir();
Expand Down Expand Up @@ -132,6 +140,7 @@ class AppConfig {
int32_t mNumOfBufferFile;
int32_t mLocalFileSize;
int32_t mSendRequestConcurrency;
int32_t mSendRequestGlobalConcurrency;
std::string mBufferFilePath;

// checkpoint
Expand Down Expand Up @@ -208,6 +217,8 @@ class AppConfig {

std::string mBindInterface;



// /**
// * @brief Load ConfigServer, DataServer and network interface
// *
Expand Down Expand Up @@ -435,8 +446,12 @@ class AppConfig {
int32_t GetLocalFileSize() const { return mLocalFileSize; }

const std::string& GetBufferFilePath() const { return mBufferFilePath; }

// Send concurrency for a single region.
int32_t GetSendRequestConcurrency() const { return mSendRequestConcurrency; }
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
// Global send concurrency.
int32_t GetSendRequestGlobalConcurrency() const { return mSendRequestGlobalConcurrency; }

// Returns the GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION constant (not a per-instance member).
double GetGlobalConcurrencyFreePercentageForOneRegion() const { return GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION; }

int32_t GetProcessThreadCount() const { return mProcessThreadCount; }

Expand Down
92 changes: 61 additions & 31 deletions core/pipeline/limiter/ConcurrencyLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,12 @@
using namespace std;

namespace logtail {

#ifdef APSARA_UNIT_TEST_MAIN
uint32_t ConcurrencyLimiter::GetCurrentLimit() const {
lock_guard<mutex> lock(mLimiterMux);
return mCurrenctConcurrency;
}

uint32_t ConcurrencyLimiter::GetCurrentInterval() const {
lock_guard<mutex> lock(mLimiterMux);
return mRetryIntervalSecs;
}
void ConcurrencyLimiter::SetCurrentLimit(uint32_t limit) {
lock_guard<mutex> lock(mLimiterMux);
mCurrenctConcurrency = limit;
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
Expand All @@ -42,19 +37,15 @@ void ConcurrencyLimiter::SetInSendingCount(uint32_t count) {
// Unit-test accessor: atomically loads and returns the value of mInSendingCnt.
uint32_t ConcurrencyLimiter::GetInSendingCount() const {
    const uint32_t sendingCount = mInSendingCnt.load();
    return sendingCount;
}

// Unit-test accessor: exposes the CONCURRENCY_STATISTIC_THRESHOLD constant.
uint32_t ConcurrencyLimiter::GetStatisticThreshold() const {
    const uint32_t threshold = CONCURRENCY_STATISTIC_THRESHOLD;
    return threshold;
}

#endif

bool ConcurrencyLimiter::IsValidToPop() {
lock_guard<mutex> lock(mLimiterMux);
if (mCurrenctConcurrency == 0) {
auto curTime = std::chrono::system_clock::now();
if (chrono::duration_cast<chrono::seconds>(curTime - mLastCheckTime).count() > mRetryIntervalSecs) {
mLastCheckTime = curTime;
return true;
} else {
return false;
}
}
if (mCurrenctConcurrency > mInSendingCnt.load()) {
return true;
}
Expand All @@ -69,16 +60,20 @@ void ConcurrencyLimiter::OnSendDone() {
--mInSendingCnt;
}

void ConcurrencyLimiter::OnSuccess() {
// Feeds one successful send, observed at currentTime, into the concurrency adjustment logic.
void ConcurrencyLimiter::OnSuccess(std::chrono::system_clock::time_point currentTime) {
    const bool succeeded = true;
    AdjustConcurrency(succeeded, currentTime);
}

// Feeds one failed send, observed at currentTime, into the concurrency adjustment logic.
void ConcurrencyLimiter::OnFail(std::chrono::system_clock::time_point currentTime) {
    const bool succeeded = false;
    AdjustConcurrency(succeeded, currentTime);
}

void ConcurrencyLimiter::Increase() {
lock_guard<mutex> lock(mLimiterMux);
if (mCurrenctConcurrency <= 0) {
mRetryIntervalSecs = mMinRetryIntervalSecs;
LOG_INFO(sLogger, ("reset send retry interval, type", mDescription));
}
if (mCurrenctConcurrency != mMaxConcurrency) {
++mCurrenctConcurrency;
if (mCurrenctConcurrency == mMaxConcurrency) {
LOG_INFO(sLogger,
LOG_DEBUG(sLogger,
("increase send concurrency to maximum, type", mDescription)("concurrency", mCurrenctConcurrency));
} else {
LOG_DEBUG(sLogger,
Expand All @@ -88,22 +83,57 @@ void ConcurrencyLimiter::OnSuccess() {
}
}

void ConcurrencyLimiter::OnFail() {
void ConcurrencyLimiter::Decrease(double fallBackRatio) {
lock_guard<mutex> lock(mLimiterMux);
linrunqi08 marked this conversation as resolved.
Show resolved Hide resolved
if (mCurrenctConcurrency != 0) {
if (mCurrenctConcurrency != mMinConcurrency) {
auto old = mCurrenctConcurrency;
mCurrenctConcurrency = static_cast<uint32_t>(mCurrenctConcurrency * mConcurrencyDownRatio);
LOG_INFO(sLogger, ("decrease send concurrency, type", mDescription)("from", old)("to", mCurrenctConcurrency));
mCurrenctConcurrency = std::max(static_cast<uint32_t>(mCurrenctConcurrency * fallBackRatio), mMinConcurrency);
LOG_DEBUG(sLogger, ("decrease send concurrency, type", mDescription)("from", old)("to", mCurrenctConcurrency));
} else {
if (mRetryIntervalSecs != mMaxRetryIntervalSecs) {
auto old = mRetryIntervalSecs;
mRetryIntervalSecs
= min(mMaxRetryIntervalSecs, static_cast<uint32_t>(mRetryIntervalSecs * mRetryIntervalUpRatio));
LOG_INFO(sLogger,
("increase send retry interval, type",
mDescription)("from", ToString(old) + "s")("to", ToString(mRetryIntervalSecs) + "s"));
if (mMinConcurrency == 0) {
mCurrenctConcurrency = 1;
LOG_INFO(sLogger, ("decrease send concurrency to min, type", mDescription)("to", mCurrenctConcurrency));
}
}
}


void ConcurrencyLimiter::AdjustConcurrency(bool success, std::chrono::system_clock::time_point currentTime) {
uint32_t failPercentage = 0;
bool finishStatistics = false;
{
lock_guard<mutex> lock(mStatisticsMux);
mStatisticsTotal ++;
if (!success) {
mStatisticsFailTotal ++;
}
if (mLastStatisticsTime == std::chrono::system_clock::time_point()) {
mLastStatisticsTime = currentTime;
}
if (mStatisticsTotal == CONCURRENCY_STATISTIC_THRESHOLD || chrono::duration_cast<chrono::seconds>(currentTime - mLastStatisticsTime).count() > CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS) {
failPercentage = mStatisticsFailTotal*100/mStatisticsTotal;
LOG_DEBUG(sLogger,("AdjustConcurrency", mDescription)("mStatisticsFailTotal", mStatisticsFailTotal)("mStatisticsTotal", mStatisticsTotal));
mStatisticsTotal = 0;
mStatisticsFailTotal = 0;
mLastStatisticsTime = currentTime;
finishStatistics = true;
}
}
if (finishStatistics) {
if (failPercentage == 0) {
// 成功
Increase();
} else if (failPercentage <= NO_FALL_BACK_FAIL_PERCENTAGE) {
// 不调整
} else if (failPercentage <= SLOW_FALL_BACK_FAIL_PERCENTAGE) {
// 慢回退
Decrease(mConcurrencySlowFallBackRatio);
} else {
// 快速回退
Decrease(mConcurrencyFastFallBackRatio);
}
}
}


} // namespace logtail
42 changes: 23 additions & 19 deletions core/pipeline/limiter/ConcurrencyLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,33 +22,31 @@
#include <mutex>
#include <string>

#include "app_config/AppConfig.h"
#include "monitor/metric_constants/MetricConstants.h"

namespace logtail {

class ConcurrencyLimiter {
public:
ConcurrencyLimiter(const std::string& description,
uint32_t maxConcurrency,
uint32_t maxRetryIntervalSecs = 3600,
uint32_t minRetryIntervalSecs = 30,
double retryIntervalUpRatio = 1.5,
double concurrencyDownRatio = 0.5)
uint32_t minConcurrency = 1,
double concurrencyFastFallBackRatio = 0.5,
double concurrencySlowFallBackRatio = 0.8)
: mDescription(description),
mMaxConcurrency(maxConcurrency),
mMinConcurrency(minConcurrency),
mCurrenctConcurrency(maxConcurrency),
mMaxRetryIntervalSecs(maxRetryIntervalSecs),
mMinRetryIntervalSecs(minRetryIntervalSecs),
mRetryIntervalSecs(minRetryIntervalSecs),
mRetryIntervalUpRatio(retryIntervalUpRatio),
mConcurrencyDownRatio(concurrencyDownRatio) {}
mConcurrencyFastFallBackRatio(concurrencyFastFallBackRatio),
mConcurrencySlowFallBackRatio(concurrencySlowFallBackRatio) {}

bool IsValidToPop();
void PostPop();
void OnSendDone();

void OnSuccess();
void OnFail();
void OnSuccess(std::chrono::system_clock::time_point currentTime);
void OnFail(std::chrono::system_clock::time_point currentTime);


static std::string GetLimiterMetricName(const std::string& limiter) {
if (limiter == "region") {
Expand All @@ -64,10 +62,10 @@ class ConcurrencyLimiter {
#ifdef APSARA_UNIT_TEST_MAIN

uint32_t GetCurrentLimit() const;
uint32_t GetCurrentInterval() const;
void SetCurrentLimit(uint32_t limit);
void SetInSendingCount(uint32_t count);
uint32_t GetInSendingCount() const;
uint32_t GetStatisticThreshold() const;

#endif

Expand All @@ -77,19 +75,25 @@ class ConcurrencyLimiter {
std::atomic_uint32_t mInSendingCnt = 0U;

uint32_t mMaxConcurrency = 0;
uint32_t mMinConcurrency = 0;

mutable std::mutex mLimiterMux;
uint32_t mCurrenctConcurrency = 0;

uint32_t mMaxRetryIntervalSecs = 0;
uint32_t mMinRetryIntervalSecs = 0;
double mConcurrencyFastFallBackRatio = 0.0;
double mConcurrencySlowFallBackRatio = 0.0;

uint32_t mRetryIntervalSecs = 0;
std::chrono::system_clock::time_point mLastCheckTime;

double mRetryIntervalUpRatio = 0.0;
double mConcurrencyDownRatio = 0.0;
mutable std::mutex mStatisticsMux;
std::chrono::system_clock::time_point mLastStatisticsTime;
uint32_t mStatisticsTotal = 0;
uint32_t mStatisticsFailTotal = 0;

void Increase();
void Decrease(double fallBackRatio);
void AdjustConcurrency(bool success, std::chrono::system_clock::time_point currentTime);

std::chrono::system_clock::time_point mLastCheckTime;
};

} // namespace logtail
7 changes: 0 additions & 7 deletions core/pipeline/queue/BoundedSenderQueueInterface.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,13 +56,6 @@ void BoundedSenderQueueInterface::SetConcurrencyLimiters(std::unordered_map<std:
}
}

// Calls OnSuccess() on every non-null limiter stored in mConcurrencyLimiters.
void BoundedSenderQueueInterface::OnSendingSuccess() {
    for (auto& entry : mConcurrencyLimiters) {
        auto& limiterPtr = entry.first;
        if (limiterPtr == nullptr) {
            continue;
        }
        limiterPtr->OnSuccess();
    }
}

void BoundedSenderQueueInterface::DecreaseSendingCnt() {
for (auto& limiter : mConcurrencyLimiters) {
Expand Down
1 change: 0 additions & 1 deletion core/pipeline/queue/BoundedSenderQueueInterface.h
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,6 @@ class BoundedSenderQueueInterface : public BoundedQueueInterface<std::unique_ptr
virtual void GetAvailableItems(std::vector<SenderQueueItem*>& items, int32_t limit) = 0;

void DecreaseSendingCnt();
void OnSendingSuccess();
void SetRateLimiter(uint32_t maxRate);
void SetConcurrencyLimiters(std::unordered_map<std::string, std::shared_ptr<ConcurrencyLimiter>>&& concurrencyLimitersMap);
virtual void SetPipelineForItems(const std::shared_ptr<Pipeline>& p) const = 0;
Expand Down
Loading
Loading