Skip to content

Commit

Permalink
fix comments
Browse files Browse the repository at this point in the history
  • Loading branch information
linrunqi08 committed Dec 27, 2024
1 parent 9544127 commit fe0d205
Show file tree
Hide file tree
Showing 7 changed files with 42 additions and 31 deletions.
25 changes: 20 additions & 5 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -193,6 +193,21 @@ DEFINE_FLAG_STRING(sls_observer_ebpf_host_path,
namespace logtail {
constexpr int32_t kDefaultMaxSendBytePerSec = 25 * 1024 * 1024; // the max send speed per sec, realtime thread


// 全局并发度保留余量百分比
const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION = 0.5;
// 单地域并发度最小值
const int32_t MIN_SEND_REQUEST_CONCURRENCY = 15;
// 单地域并发度最大值
const int32_t MAX_SEND_REQUEST_CONCURRENCY = 80;
// 并发度统计数量&&时间间隔
const uint32_t CONCURRENCY_STATISTIC_THRESHOLD = 10;
const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS = 3;
// 并发度不回退百分比阈值
const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE = 10;
// 并发度慢回退百分比阈值
const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE = 40;

std::string AppConfig::sLocalConfigDir = "local";
void CreateAgentDir() {
try {
Expand Down Expand Up @@ -1193,13 +1208,13 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) {
}

// mSendRequestConcurrency was limited
if (mSendRequestConcurrency < mMinSendRequestConcurrency) {
mSendRequestConcurrency = mMinSendRequestConcurrency;
if (mSendRequestConcurrency < MIN_SEND_REQUEST_CONCURRENCY) {
mSendRequestConcurrency = MIN_SEND_REQUEST_CONCURRENCY;
}
if (mSendRequestConcurrency > mMaxSendRequestConcurrency) {
mSendRequestConcurrency = mMaxSendRequestConcurrency;
if (mSendRequestConcurrency > MAX_SEND_REQUEST_CONCURRENCY) {
mSendRequestConcurrency = MAX_SEND_REQUEST_CONCURRENCY;
}
mSendRequestGlobalConcurrency = mSendRequestConcurrency * (1 + mGlobalConcurrencyFreePercentageForOneRegion);
mSendRequestGlobalConcurrency = mSendRequestConcurrency * (1 + GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION);
}

bool AppConfig::CheckAndResetProxyEnv() {
Expand Down
15 changes: 10 additions & 5 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
namespace logtail {
extern const int32_t kDefaultMaxSendBytePerSec;

extern const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION;
extern const int32_t MIN_SEND_REQUEST_CONCURRENCY;
extern const int32_t MAX_SEND_REQUEST_CONCURRENCY;
extern const uint32_t CONCURRENCY_STATISTIC_THRESHOLD;
extern const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS;
extern const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE;
extern const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE;

void CreateAgentDir();

std::string GetAgentLogDir();
Expand Down Expand Up @@ -209,10 +217,7 @@ class AppConfig {

std::string mBindInterface;

// 全局并发度对单地域占比保留的余量
const double mGlobalConcurrencyFreePercentageForOneRegion = 0.5;
const int32_t mMinSendRequestConcurrency = 15;
const int32_t mMaxSendRequestConcurrency = 80;


// /**
// * @brief Load ConfigServer, DataServer and network interface
Expand Down Expand Up @@ -446,7 +451,7 @@ class AppConfig {
// 全局并发度
int32_t GetSendRequestGlobalConcurrency() const { return mSendRequestGlobalConcurrency; }

double GetGlobalConcurrencyFreePercentageForOneRegion() const { return mGlobalConcurrencyFreePercentageForOneRegion; }
double GetGlobalConcurrencyFreePercentageForOneRegion() const { return GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION; }

int32_t GetProcessThreadCount() const { return mProcessThreadCount; }

Expand Down
9 changes: 4 additions & 5 deletions core/pipeline/limiter/ConcurrencyLimiter.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@
using namespace std;

namespace logtail {

#ifdef APSARA_UNIT_TEST_MAIN
uint32_t ConcurrencyLimiter::GetCurrentLimit() const {
lock_guard<mutex> lock(mLimiterMux);
Expand All @@ -40,7 +39,7 @@ uint32_t ConcurrencyLimiter::GetInSendingCount() const {
}

uint32_t ConcurrencyLimiter::GetStatisticThreshold() const {
return mStatisticThreshold;
return CONCURRENCY_STATISTIC_THRESHOLD;
}

#endif
Expand Down Expand Up @@ -111,7 +110,7 @@ void ConcurrencyLimiter::AdjustConcurrency(bool success, std::chrono::system_clo
if (mLastStatisticsTime == std::chrono::system_clock::time_point()) {
mLastStatisticsTime = currentTime;
}
if (mStatisticsTotal == mStatisticThreshold || chrono::duration_cast<chrono::seconds>(currentTime - mLastStatisticsTime).count() > mStatisticIntervalThresholdSeconds) {
if (mStatisticsTotal == CONCURRENCY_STATISTIC_THRESHOLD || chrono::duration_cast<chrono::seconds>(currentTime - mLastStatisticsTime).count() > CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS) {
failPercentage = mStatisticsFailTotal*100/mStatisticsTotal;
LOG_DEBUG(sLogger,("AdjustConcurrency", mDescription)("mStatisticsFailTotal", mStatisticsFailTotal)("mStatisticsTotal", mStatisticsTotal));
mStatisticsTotal = 0;
Expand All @@ -124,9 +123,9 @@ void ConcurrencyLimiter::AdjustConcurrency(bool success, std::chrono::system_clo
if (failPercentage == 0) {
// 成功
Increase();
} else if (failPercentage <= mNoFallBackFailPercentage) {
} else if (failPercentage <= NO_FALL_BACK_FAIL_PERCENTAGE) {
// 不调整
} else if (failPercentage <= mSlowFallBackFailPercentage) {
} else if (failPercentage <= SLOW_FALL_BACK_FAIL_PERCENTAGE) {
// 慢回退
Decrease(mConcurrencySlowFallBackRatio);
} else {
Expand Down
10 changes: 1 addition & 9 deletions core/pipeline/limiter/ConcurrencyLimiter.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,10 @@
#include <mutex>
#include <string>

#include "app_config/AppConfig.h"
#include "monitor/metric_constants/MetricConstants.h"

namespace logtail {

class ConcurrencyLimiter {
public:
ConcurrencyLimiter(const std::string& description,
Expand Down Expand Up @@ -90,14 +90,6 @@ class ConcurrencyLimiter {
uint32_t mStatisticsTotal = 0;
uint32_t mStatisticsFailTotal = 0;

// 统计10个数据,最多等3s
const uint32_t mStatisticThreshold = 10;
const uint32_t mStatisticIntervalThresholdSeconds = 3;

const uint32_t mNoFallBackFailPercentage = 10;
const uint32_t mSlowFallBackFailPercentage = 40;


void Increase();
void Decrease(double fallBackRatio);
void AdjustConcurrency(bool success, std::chrono::system_clock::time_point currentTime);
Expand Down
3 changes: 0 additions & 3 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -637,9 +637,6 @@ bool FlusherSLS::BuildRequest(SenderQueueItem* item, unique_ptr<HttpSinkRequest>
item,
data->mShardHashKey);
}
if (req) {
req->mMaxTryCnt = 1;
}
if (!req) {
*keepItem = true;
return false;
Expand Down
7 changes: 5 additions & 2 deletions core/runner/sink/http/HttpSinkRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -32,8 +32,11 @@ struct HttpSinkRequest : public AsynHttpRequest {
const std::string& query,
const std::map<std::string, std::string>& header,
const std::string& body,
SenderQueueItem* item)
: AsynHttpRequest(method, httpsFlag, host, port, url, query, header, body), mItem(item) {}
SenderQueueItem* item,
uint32_t timeout = static_cast<uint32_t>(INT32_FLAG(default_http_request_timeout_secs)),
uint32_t maxTryCnt = static_cast<uint32_t>(INT32_FLAG(default_http_request_max_try_cnt))
)
: AsynHttpRequest(method, httpsFlag, host, port, url, query, header, body, HttpResponse(), timeout, maxTryCnt), mItem(item) {}

bool IsContextValid() const override { return true; }
void OnSendDone(HttpResponse& response) override {}
Expand Down
4 changes: 2 additions & 2 deletions core/sdk/Client.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -266,7 +266,7 @@ namespace sdk {
SetCommonHeader(httpHeader, (int32_t)(body.length()), "");
string signature = GetUrlSignature(HTTP_POST, operation, httpHeader, parameterList, body, accessKeySecret);
httpHeader[AUTHORIZATION] = LOG_HEADSIGNATURE_PREFIX + accessKeyId + ':' + signature;
return make_unique<HttpSinkRequest>(HTTP_POST, mUsingHTTPS, host, mPort, operation, "", httpHeader, body, item);
return make_unique<HttpSinkRequest>(HTTP_POST, mUsingHTTPS, host, mPort, operation, "", httpHeader, body, item, INT32_FLAG(default_http_request_timeout_secs), 1);
}

unique_ptr<HttpSinkRequest>
Expand Down Expand Up @@ -312,7 +312,7 @@ namespace sdk {
GetQueryString(parameterList, queryString);

return make_unique<HttpSinkRequest>(
HTTP_POST, mUsingHTTPS, host, mPort, operation, queryString, httpHeader, body, item);
HTTP_POST, mUsingHTTPS, host, mPort, operation, queryString, httpHeader, body, item, INT32_FLAG(default_http_request_timeout_secs), 1);
}

PostLogStoreLogsResponse
Expand Down

0 comments on commit fe0d205

Please sign in to comment.