Improve sender log condition to record valuable debug info (alibaba#1359)

* Improve sender log condition to record valuable debug info
yyuuttaaoo authored Mar 26, 2024
1 parent c404132 commit 5536f76
Showing 5 changed files with 67 additions and 48 deletions.
4 changes: 2 additions & 2 deletions core/common/LogFileCollectOffsetIndicator.cpp
@@ -111,11 +111,11 @@ void LogFileCollectOffsetIndicator::RecordFileOffset(LoggroupTimeValue* data) {
devInode,
data->mLogGroupContext.mFuseMode,
fd,
data->mLastUpdateTime);
data->mEnqueueTime);
iter = mLogFileOffsetInfoMap.insert(std::make_pair(logFileInfo, logFileOffsetInfo)).first;
}
LogFileOffsetInfo* logFileOffsetInfo = iter->second;
logFileOffsetInfo->mLastUpdateTime = data->mLastUpdateTime;
logFileOffsetInfo->mLastUpdateTime = data->mEnqueueTime;

LogFileOffsetInfoNode node(seqNum,
fileInfoPtr->offset,
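
The rename from mLastUpdateTime to mEnqueueTime is more than cosmetic: the last hunk of Sender.cpp below also drops the line that refreshed the timestamp right before every send, so the field now records when the log group entered the sender queue and keeps that value across retries. A minimal sketch of the intended split between the two timestamps; the struct and function names here are illustrative, not part of the patch:

    #include <ctime>
    #include <cstdint>

    // Reduced model of LoggroupTimeValue's timestamps after this change (illustrative only).
    struct QueueItemModel {
        int32_t mEnqueueTime = 0;  // set once when the item is built; never touched by retries
        int32_t mLastSendTime = 0; // refreshed on every send attempt
    };

    void EnqueueModel(QueueItemModel& item) {
        // Measuring from enqueue keeps "TotalSendCost" and the discard_send_fail_interval
        // check meaningful even after many retries.
        item.mEnqueueTime = static_cast<int32_t>(time(nullptr));
    }

    void SendAttemptModel(QueueItemModel& item) {
        item.mLastSendTime = static_cast<int32_t>(time(nullptr)); // only the attempt time moves
    }
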
3 changes: 0 additions & 3 deletions core/common/LogstoreSenderQueue.cpp
@@ -142,8 +142,6 @@ bool LogstoreSenderInfo::RecordSendResult(SendResult rst, LogstoreSenderStatisti
if (++mLastNetworkErrorCount >= INT32_FLAG(max_client_send_error_count)) {
mLastNetworkErrorCount = INT32_FLAG(max_client_send_error_count);
mNetworkValidFlag = false;
LOG_WARNING(sLogger,
("Network fail, disable ", this->mRegion)("retry interval", mNetworkRetryInterval));
}
break;
case LogstoreSenderInfo::SendResult_QuotaFail:
@@ -155,7 +153,6 @@ bool LogstoreSenderInfo::RecordSendResult(SendResult rst, LogstoreSenderStatisti
if (++mLastQuotaExceedCount >= INT32_FLAG(max_client_quota_exceed_count)) {
mLastQuotaExceedCount = INT32_FLAG(max_client_quota_exceed_count);
mQuotaValidFlag = false;
LOG_WARNING(sLogger, ("QuotaF fail, disable ", this->mRegion)("retry interval", mQuotaRetryInterval));
}
break;
default:
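
The two warnings deleted here were emitted from RecordSendResult, which only sees region-level counters and so could not name the affected project or logstore; the header below re-emits them from OnSendDone, which holds the failing item. A small self-contained model of that split, under the assumption that this was the motivation (all names in the model are illustrative):

    #include <iostream>
    #include <string>

    // Region-level state: can decide to pause, but has no item context.
    struct RegionStateModel {
        int errorCount = 0;
        bool networkValid = true;
        void RecordNetworkError(int maxErrors) {
            if (++errorCount >= maxErrors) {
                errorCount = maxErrors;
                networkValid = false; // pause the region
            }
        }
    };

    // Item-level callback: the only place that can log project/logstore along with the pause.
    void OnSendDoneModel(RegionStateModel& region, const std::string& project, const std::string& logstore) {
        region.RecordNetworkError(/*maxErrors=*/10);
        if (!region.networkValid) {
            std::cerr << "Network fail, pause logstore " << logstore << ", project " << project << std::endl;
        }
    }
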
28 changes: 20 additions & 8 deletions core/common/LogstoreSenderQueue.h
@@ -47,7 +47,7 @@ struct LogstoreSenderStatistics {
};

struct LoggroupTimeValue {
int32_t mLastUpdateTime;
int32_t mEnqueueTime;
SEND_DATA_TYPE mDataType;
std::string mLogData;
int32_t mRawSize;
@@ -63,6 +63,7 @@ struct LoggroupTimeValue {

int32_t mSendRetryTimes;
int32_t mLastSendTime;
int32_t mLastLogWarningTime;
std::string mAliuid;
std::string mRegion;
std::string mShardHashKey;
@@ -99,9 +100,10 @@ struct LoggroupTimeValue {
mDataType = dataType;
mLogLines = lines;
mRawSize = rawSize;
mLastUpdateTime = lastUpdateTime;
mEnqueueTime = lastUpdateTime;
mSendRetryTimes = 0;
mLastSendTime = 0;
mLastLogWarningTime = 0;
mLogData.clear();
mShardHashKey = shardHashKey;
mStatus = LoggroupSendStatus_Idle;
@@ -400,11 +402,11 @@ class SingleLogstoreSenderManager : public SingleLogstoreFeedbackQueue<LoggroupT
continue;
}

if (item->mLastUpdateTime < minSendTime) {
minSendTime = item->mLastUpdateTime;
if (item->mEnqueueTime < minSendTime) {
minSendTime = item->mEnqueueTime;
}
if (item->mLastUpdateTime > maxSendTime) {
maxSendTime = item->mLastUpdateTime;
if (item->mEnqueueTime > maxSendTime) {
maxSendTime = item->mEnqueueTime;
}
++statisticsItem.mSendQueueSize;
}
@@ -417,15 +419,25 @@

int32_t OnSendDone(LoggroupTimeValue* item, LogstoreSenderInfo::SendResult sendRst, bool& needTrigger) {
needTrigger = mSenderInfo.RecordSendResult(sendRst, mSenderStatistics);
if (!mSenderInfo.mNetworkValidFlag) {
LOG_WARNING(sLogger,
("Network fail, pause logstore", item->mLogstore)("project", item->mProjectName)(
"region", mSenderInfo.mRegion)("retry interval", mSenderInfo.mNetworkRetryInterval));
}
if (!mSenderInfo.mQuotaValidFlag) {
LOG_WARNING(sLogger,
("Quota fail, pause logstore", item->mLogstore)("project", item->mProjectName)(
"region", mSenderInfo.mRegion)("retry interval", mSenderInfo.mQuotaRetryInterval));
}
// if send error, reset status to idle, and wait to send again
// network fail or quota fail
if (sendRst != LogstoreSenderInfo::SendResult_OK && sendRst != LogstoreSenderInfo::SendResult_Buffered
&& sendRst != LogstoreSenderInfo::SendResult_DiscardFail) {
item->mStatus = LoggroupSendStatus_Idle;
return 0;
}
if (mSenderStatistics.mMaxSendSuccessTime < item->mLastUpdateTime) {
mSenderStatistics.mMaxSendSuccessTime = item->mLastUpdateTime;
if (mSenderStatistics.mMaxSendSuccessTime < item->mEnqueueTime) {
mSenderStatistics.mMaxSendSuccessTime = item->mEnqueueTime;
}
// else remove item except buffered
return RemoveItem(item, sendRst != LogstoreSenderInfo::SendResult_Buffered);
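
Taken together, LoggroupTimeValue now carries three timestamps with distinct roles. A short annotated summary reconstructed from these hunks and from Sender.cpp below (the struct name is illustrative):

    #include <cstdint>

    struct LoggroupTimesModel {
        int32_t mEnqueueTime;        // set once in the constructor; basis for TotalSendCost, the
                                     // discard_send_fail_interval check and queue-wait warnings
        int32_t mLastSendTime;       // refreshed before each send attempt; basis for ResponseTime
        int32_t mLastLogWarningTime; // when an OnFail warning was last logged for this item;
                                     // throttles repeated RECORD_ERROR_WHEN_FAIL warnings
    };
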
3 changes: 2 additions & 1 deletion core/monitor/LogIntegrity.cpp
@@ -273,7 +273,8 @@ void LogIntegrity::Notify(LoggroupTimeValue* data, bool flag) {
PTScopedLock lock(mLogIntegrityMapLock);
LogIntegrityInfo* info = NULL;
if (FindLogIntegrityInfo(region, projectName, logstore, filename, info)) {
info->mLastUpdateTime = data->mLastUpdateTime;
info->mLastUpdateTime = data->mEnqueueTime;

info->SetStatus(data->mLogGroupContext.mSeqNum,
data->mLogLines,
flag ? LogTimeInfo::LogIntegrityStatus_SendOK : LogTimeInfo::LogIntegrityStatus_SendFail);
77 changes: 43 additions & 34 deletions core/sender/Sender.cpp
@@ -95,10 +95,9 @@ DEFINE_FLAG_INT32(reset_region_concurrency_error_count,
5);
DEFINE_FLAG_INT32(unknow_error_try_max, "discard data when try times > this value", 5);
DEFINE_FLAG_INT32(test_unavailable_endpoint_interval, "test unavailable endpoint interval", 60);
DEFINE_FLAG_INT32(sending_cost_time_alarm_interval, "sending log group cost too much time, second", 3);
DEFINE_FLAG_INT32(log_group_wait_in_queue_alarm_interval,
"log group wait in queue alarm interval, may blocked by concurrency or quota, second",
6);
static const int SEND_BLOCK_COST_TIME_ALARM_INTERVAL_SECOND = 3;
static const int LOG_GROUP_WAIT_IN_QUEUE_ALARM_INTERVAL_SECOND = 6;
static const int ON_FAIL_LOG_WARNING_INTERVAL_SECOND = 10;
DEFINE_FLAG_STRING(data_endpoint_policy,
"policy for switching between data server endpoints, possible options include "
"'designated_first'(default) and 'designated_locked'",
@@ -127,7 +126,7 @@ void SendClosure::OnSuccess(sdk::Response* response) {
("SendSucess", "OK")("RequestId", response->requestId)("StatusCode", response->statusCode)(
"ResponseTime", curTime - mDataPtr->mLastSendTime)("Region", mDataPtr->mRegion)(
"Project", mDataPtr->mProjectName)("Logstore", mDataPtr->mLogstore)("Config", mDataPtr->mConfigName)(
"RetryTimes", mDataPtr->mSendRetryTimes)("TotalSendCost", curTime - mDataPtr->mLastUpdateTime)(
"RetryTimes", mDataPtr->mSendRetryTimes)("TotalSendCost", curTime - mDataPtr->mEnqueueTime)(
"LogLines", mDataPtr->mLogLines)("Bytes", mDataPtr->mLogData.size())(
"Endpoint", mDataPtr->mCurrentEndpoint)("IsProfileData", isProfileData));
}
@@ -173,6 +172,19 @@ static const char* GetOperationString(OperationOnFail op) {
}
}

/*
* @brief OnFail callback if send failed
* There are 3 possible outcomes:
* 1. RETRY_ASYNC_WHEN_FAIL: Resend the item immediately. It will be kept in the sender queue with its mStatus Sending.
* All RETRY_ASYNC_WHEN_FAIL must fall back to RECORD_ERROR_WHEN_FAIL after several retries.
* 2. RECORD_ERROR_WHEN_FAIL: Resend the item later. It will be kept in the sender queue with its mStatus resetting to
* Idle. The item will be fetched and resent on the next round when its sender queue is visited.
* 3. DISCARD_WHEN_FAIL: Do not resend the item; delete it from the sender queue.
* @param response response from server (may be empty)
* @param errorCode defined in sdk/Common.cpp
* @param errorMessage error message from server
*
*/
void SendClosure::OnFail(sdk::Response* response, const string& errorCode, const string& errorMessage) {
// test
LOG_DEBUG(sLogger, ("send failed, error code", errorCode)("error msg", errorMessage));
@@ -231,20 +243,19 @@ void SendClosure::OnFail(sdk::Response* response, const string& errorCode, const
BOOL_FLAG(global_network_success) = true;
if (errorCode == sdk::LOGE_SHARD_WRITE_QUOTA_EXCEED) {
failDetail << "shard write quota exceed";
suggestion << "split logstore shards. https://help.aliyun.com/document_detail/48998.html";
suggestion << "Split logstore shards. https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources";
} else {
failDetail << "project write quota exceed";
suggestion << "create ticket or raise issue in support chat group";
suggestion << "Submit quota modification request. "
"https://help.aliyun.com/zh/sls/user-guide/expansion-of-resources";
}
Sender::Instance()->IncTotalSendStatistic(mDataPtr->mProjectName, mDataPtr->mLogstore, curTime);
if (curTime - mDataPtr->mLastUpdateTime > INT32_FLAG(sending_cost_time_alarm_interval)) {
LogtailAlarm::GetInstance()->SendAlarm(SEND_QUOTA_EXCEED_ALARM,
"error_code: " + errorCode + ", error_message: " + errorMessage
+ ", request_id:" + response->requestId,
mDataPtr->mProjectName,
mDataPtr->mLogstore,
mDataPtr->mRegion);
}
LogtailAlarm::GetInstance()->SendAlarm(SEND_QUOTA_EXCEED_ALARM,
"error_code: " + errorCode + ", error_message: " + errorMessage
+ ", request_id:" + response->requestId,
mDataPtr->mProjectName,
mDataPtr->mLogstore,
mDataPtr->mRegion);
operation = RECORD_ERROR_WHEN_FAIL;
recordRst = LogstoreSenderInfo::SendResult_QuotaFail;
} else if (sendResult == SEND_UNAUTHORIZED) {
@@ -266,7 +277,7 @@ void SendClosure::OnFail(sdk::Response* response, const string& errorCode, const
mDataPtr->mAliuid, false, sendClient, lastUpdateTime))
operation = RETRY_ASYNC_WHEN_FAIL;
else if (curTime - lastUpdateTime < INT32_FLAG(unauthorized_allowed_delay_after_reset))
operation = RETRY_ASYNC_WHEN_FAIL;
operation = RECORD_ERROR_WHEN_FAIL;
else
operation = DISCARD_WHEN_FAIL;
#ifdef __ENTERPRISE__
@@ -343,7 +354,7 @@ void SendClosure::OnFail(sdk::Response* response, const string& errorCode, const
// when retry times > unknow_error_try_max, we will drop this data
operation = DefaultOperation();
}
if (curTime - mDataPtr->mLastUpdateTime > INT32_FLAG(discard_send_fail_interval)) {
if (curTime - mDataPtr->mEnqueueTime > INT32_FLAG(discard_send_fail_interval)) {
operation = DISCARD_WHEN_FAIL;
}
bool isProfileData = Sender::IsProfileData(mDataPtr->mRegion, mDataPtr->mProjectName, mDataPtr->mLogstore);
@@ -356,25 +367,24 @@ void SendClosure::OnFail(sdk::Response* response, const string& errorCode, const
"RequestId", response->requestId)("StatusCode", response->statusCode)("ErrorCode", errorCode)( \
"ErrorMessage", errorMessage)("ResponseTime", curTime - mDataPtr->mLastSendTime)("Region", mDataPtr->mRegion)( \
"Project", mDataPtr->mProjectName)("Logstore", mDataPtr->mLogstore)("Config", mDataPtr->mConfigName)( \
"RetryTimes", mDataPtr->mSendRetryTimes)("TotalSendCost", curTime - mDataPtr->mLastUpdateTime)( \
"LogLines", mDataPtr->mLogLines)("Bytes", mDataPtr->mLogData.size())("Endpoint", mDataPtr->mCurrentEndpoint)( \
"IsProfileData", isProfileData)
"RetryTimes", mDataPtr->mSendRetryTimes)("TotalSendCost", \
curTime - mDataPtr->mEnqueueTime)("LogLines", mDataPtr->mLogLines)( \
"Bytes", mDataPtr->mLogData.size())("Endpoint", mDataPtr->mCurrentEndpoint)("IsProfileData", isProfileData)

// Log warning if retry for too long or will discard data
switch (operation) {
case RETRY_ASYNC_WHEN_FAIL:
if (curTime - mDataPtr->mLastUpdateTime > INT32_FLAG(sending_cost_time_alarm_interval)
|| errorCode == sdk::LOGE_REQUEST_TIMEOUT) {
if (errorCode == sdk::LOGE_REQUEST_TIMEOUT) {
// retry on network timeout should be recorded, because this may lead to data duplication
LOG_WARNING(sLogger, LOG_PATTERN);
}
Sender::Instance()->SendToNetAsync(mDataPtr);
break;
case RECORD_ERROR_WHEN_FAIL:
if (curTime - mDataPtr->mLastUpdateTime > INT32_FLAG(sending_cost_time_alarm_interval)
|| errorCode == sdk::LOGE_REQUEST_TIMEOUT) {
// retry on network timeout should be recorded, because this may lead to data duplication
if (errorCode == sdk::LOGE_REQUEST_TIMEOUT
|| curTime - mDataPtr->mLastLogWarningTime > ON_FAIL_LOG_WARNING_INTERVAL_SECOND) {
LOG_WARNING(sLogger, LOG_PATTERN);
mDataPtr->mLastLogWarningTime = curTime;
}
// Sender::Instance()->PutIntoSecondaryBuffer(mDataPtr, 10);
Sender::Instance()->SubSendingBufferCount();
@@ -631,7 +641,7 @@ bool Sender::WriteToFile(LoggroupTimeValue* value, bool sendPerformance) {
outfile << value->mProjectName << "\t" << value->mLogstore << "\t" << time(NULL) << "\t" << value->mRawSize
<< "\t" << value->mLogLines << endl;
else
outfile << value->mProjectName << "\t" << value->mLogstore << "\t" << value->mLastUpdateTime << "\t"
outfile << value->mProjectName << "\t" << value->mLogstore << "\t" << value->mEnqueueTime << "\t"
<< value->mRawSize << "\t" << value->mLogLines << endl;
outfile.close();
return true;
@@ -1487,9 +1497,9 @@ void Sender::DaemonSender() {

for (vector<LoggroupTimeValue*>::iterator itr = logGroupToSend.begin(); itr != logGroupToSend.end(); ++itr) {
LoggroupTimeValue* data = *itr;
int32_t logGroupWaitTime = curTime - data->mLastUpdateTime;
int32_t logGroupWaitTime = curTime - data->mEnqueueTime;

if (logGroupWaitTime > INT32_FLAG(log_group_wait_in_queue_alarm_interval)) {
if (logGroupWaitTime > LOG_GROUP_WAIT_IN_QUEUE_ALARM_INTERVAL_SECOND) {
LOG_WARNING(sLogger,
("log group wait in queue for too long, may blocked by concurrency or quota, region",
data->mRegion)("project", data->mProjectName)("logstore", data->mLogstore)(
@@ -1520,17 +1530,17 @@
usleep(10 * 1000);
}
int32_t afterSleepTime = time(NULL);
int32_t sendCostTime = afterSleepTime - beforeSleepTime;
if (sendCostTime > INT32_FLAG(sending_cost_time_alarm_interval)) {
int32_t blockCostTime = afterSleepTime - beforeSleepTime;
if (blockCostTime > SEND_BLOCK_COST_TIME_ALARM_INTERVAL_SECOND) {
LOG_WARNING(sLogger,
("sending log group blocked too long because send concurrency reached limit. current "
"concurrency used",
GetSendingBufferCount())("max concurrency",
AppConfig::GetInstance()->GetSendRequestConcurrency())(
"blocked time", sendCostTime));
"blocked time", blockCostTime));
LogtailAlarm::GetInstance()->SendAlarm(SENDING_COSTS_TOO_MUCH_TIME_ALARM,
"sending log group costs too much time, blocked time "
+ ToString(sendCostTime),
"sending log group blocked for too much time, cost "
+ ToString(blockCostTime),
data->mProjectName,
data->mLogstore,
data->mRegion);
Expand All @@ -1540,7 +1550,6 @@ void Sender::DaemonSender() {
sendBufferBytes += data->mRawSize;
sendNetBodyBytes += data->mLogData.size();
sendLines += data->mLogLines;
data->mLastUpdateTime = time(NULL); // set last update time before sending
SendToNetAsync(data);
#ifdef __ENTERPRISE__
}
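
The warning in the RECORD_ERROR_WHEN_FAIL branch is now rate-limited per item instead of being gated on the removed sending_cost_time_alarm_interval flag. A self-contained sketch of that throttle condition; the helper name and the error-code string are stand-ins, and the 10-second constant matches ON_FAIL_LOG_WARNING_INTERVAL_SECOND from this patch:

    #include <cstdint>
    #include <string>

    static const int ON_FAIL_LOG_WARNING_INTERVAL_SECOND = 10;

    // Mirrors the new condition in SendClosure::OnFail: always warn on a request timeout
    // (a retried timeout may duplicate data), otherwise warn at most once per interval per item.
    bool ShouldLogFailWarning(const std::string& errorCode, int32_t lastLogWarningTime, int32_t curTime) {
        if (errorCode == "RequestTimeout") { // stand-in for sdk::LOGE_REQUEST_TIMEOUT
            return true;
        }
        return curTime - lastLogWarningTime > ON_FAIL_LOG_WARNING_INTERVAL_SECOND;
    }

The caller updates mLastLogWarningTime only when a warning is actually emitted, so a log group stuck in retries produces roughly one warning every ten seconds rather than one per attempt.
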
