Skip to content

Commit

Permalink
fix: Correct parsing errors in ProcessorParseApsaraNative with large …
Browse files Browse the repository at this point in the history
…buffer input (alibaba#1255)

* fix 越界问题

* add comment
  • Loading branch information
quzard authored Dec 7, 2023
1 parent e740f55 commit ff0d88d
Show file tree
Hide file tree
Showing 9 changed files with 1,517 additions and 312 deletions.
2 changes: 2 additions & 0 deletions core/common/JsonUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,8 @@ std::string CompactJson(const std::string& inJson) {
case '\\':
if (++ch == inJson.end()) { // skip next char after escape char
--ch;
} else {
outJson << '\\';
}
break;
default:
Expand Down
12 changes: 6 additions & 6 deletions core/config/Config.h
Original file line number Diff line number Diff line change
Expand Up @@ -139,26 +139,26 @@ class Config {
std::shared_ptr<LogFilterRule> mFilterRule;
bool mLocalStorage;
int mVersion;
bool mDiscardNoneUtf8;
bool mDiscardNoneUtf8 = false;
std::string mAliuid;
std::string mRegion;
std::string mStreamLogTag;
bool mDiscardUnmatch;
bool mDiscardUnmatch = false;
std::vector<std::string> mColumnKeys;
std::string mSeparator;
char mQuote;
// for delimiter log, accept logs without enough keys or not
// eg, keys -> [a, b, c], raw log "xx|yy", log -> [a->xx, b->yy]
bool mAcceptNoEnoughKeys;
bool mAutoExtend;
bool mAcceptNoEnoughKeys = false;
bool mAutoExtend = true;
std::string mTimeKey;
std::vector<std::string> mShardHashKey;
bool mTailExisted;
std::unordered_map<std::string, std::vector<SensitiveWordCastOption>> mSensitiveWordCastOptions;
bool mUploadRawLog; // true to update raw log to sls
bool mUploadRawLog = false; // true to update raw log to sls
bool mSimpleLogFlag;
bool mTimeZoneAdjust;
int mLogTimeZoneOffsetSecond;
int mLogTimeZoneOffsetSecond = 0;
int32_t mCreateTime; // create time of this config
int32_t
mMaxSendBytesPerSecond; // limit for logstore, not just this config. so if we have multi configs with different
Expand Down
113 changes: 96 additions & 17 deletions core/processor/ProcessorParseApsaraNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,14 @@ void ProcessorParseApsaraNative::Process(PipelineEventGroup& logGroup) {
return;
}

/*
* 处理单个日志事件。
* @param logPath - 日志文件的路径。
* @param e - 指向待处理日志事件的智能指针。
* @param lastLogTime - 上一条日志的时间戳(秒)。
* @param timeStrCache - 缓存时间字符串,用于比较和更新。
* @return 如果事件被处理且保留,则返回true,如果事件被丢弃,则返回false。
*/
bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, PipelineEventPtr& e, LogtailTime& lastLogTime, StringView& timeStrCache) {
if (!IsSupportedEvent(e)) {
return true;
Expand All @@ -79,6 +87,9 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
return true;
}
StringView buffer = sourceEvent.GetContent(mSourceKey);
if (buffer.size() == 0) {
return true;
}
mProcParseInSizeBytes->Add(buffer.size());
int64_t logTime_in_micro = 0;
time_t logTime = ApsaraEasyReadLogTimeParser(buffer, timeStrCache, lastLogTime, logTime_in_micro);
Expand Down Expand Up @@ -148,13 +159,14 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
index = ParseApsaraBaseFields(buffer, sourceEvent);
bool sourceKeyOverwritten = mSourceKeyOverwritten;
bool rawLogTagOverwritten = false;
if (buffer.data()[index] != 0) {
do {
++index;
if (buffer.data()[index] == '\t' || buffer.data()[index] == '\0') {
int32_t length = buffer.size();
if (index < length) {
for (index = index + 1; index <= length; ++index) {
if (index == length || buffer.data()[index] == '\t') {
if (colon_index >= 0) {
StringView key(buffer.data() + beg_index, colon_index - beg_index);
AddLog(key, StringView(buffer.data() + colon_index + 1, index - colon_index - 1), sourceEvent);
StringView data(buffer.data() + colon_index + 1, index - colon_index - 1);
AddLog(key, data, sourceEvent);
if (key == mSourceKey) {
sourceKeyOverwritten = true;
}
Expand All @@ -167,7 +179,7 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
} else if (buffer.data()[index] == ':' && colon_index == -1) {
colon_index = index;
}
} while (buffer.data()[index]);
}
}
// TODO: deprecated
if (mAdjustApsaraMicroTimezone) {
Expand All @@ -189,14 +201,28 @@ bool ProcessorParseApsaraNative::ProcessEvent(const StringView& logPath, Pipelin
return true;
}

/*
* 解析Apsara格式日志的时间。
* @param buffer - 包含日志数据的字符串视图。
* @param timeStr - 解析后的时间字符串。
* @param lastLogTime - 上一条日志的时间戳(秒)。
* @param microTime - 解析出的微秒时间戳。
* @return 解析出的时间戳(秒),如果解析失败,则返回0。
*/
time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffer, StringView& timeStr, LogtailTime& lastLogTime, int64_t& microTime) {
if (buffer[0] != '[') {
return 0;
}
if (buffer[1] == '1') // for normal time, e.g 1378882630, starts with '1'
{
int nanosecondLength = 0;
auto strptimeResult = Strptime(buffer.data() + 1, "%s", &lastLogTime, nanosecondLength);
size_t pos = buffer.find(']', 1);
if (pos == std::string::npos) {
LOG_WARNING(sLogger, ("parse apsara log time", "fail")("string", buffer));
return 0;
}
std::string strTime = buffer.substr(1, pos).to_string();
auto strptimeResult = Strptime(strTime.c_str(), "%s", &lastLogTime, nanosecondLength);
if (NULL == strptimeResult || strptimeResult[0] != ']') {
LOG_WARNING(sLogger,
("parse apsara log time", "fail")("string", buffer)("timeformat", "%s"));
Expand All @@ -207,14 +233,20 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
}
// test other date format case
{
if (IsPrefixString(buffer.data() + 1, timeStr) == true) {
size_t pos = buffer.find(']', 1);
if (pos == std::string::npos) {
LOG_WARNING(sLogger, ("parse apsara log time", "fail")("string", buffer));
return 0;
}
std::string strTime = buffer.substr(1, pos).to_string();
if (IsPrefixString(strTime.c_str(), timeStr) == true) {
microTime = (int64_t)lastLogTime.tv_sec * 1000000 + lastLogTime.tv_nsec / 1000;
return lastLogTime.tv_sec;
}
struct tm tm;
memset(&tm, 0, sizeof(tm));
int nanosecondLength = 0;
auto strptimeResult = Strptime(buffer.data() + 1, "%Y-%m-%d %H:%M:%S.%f", &lastLogTime, nanosecondLength);
auto strptimeResult = Strptime(strTime.c_str(), "%Y-%m-%d %H:%M:%S.%f", &lastLogTime, nanosecondLength);
if (NULL == strptimeResult || strptimeResult[0] != ']') {
LOG_WARNING(sLogger,
("parse apsara log time", "fail")("string", buffer)("timeformat", "%Y-%m-%d %H:%M:%S.%f"));
Expand All @@ -227,6 +259,12 @@ time_t ProcessorParseApsaraNative::ApsaraEasyReadLogTimeParser(StringView& buffe
}
}

/*
* 检查字符串是否包含指定的前缀。
* @param all - 完整的字符串。
* @param prefix - 要检查的前缀。
* @return 如果字符串以指定前缀开头,则返回true;否则返回false。
*/
bool ProcessorParseApsaraNative::IsPrefixString(const char* all, const StringView& prefix) {
if (prefix.size() == 0)
return false;
Expand All @@ -239,13 +277,20 @@ bool ProcessorParseApsaraNative::IsPrefixString(const char* all, const StringVie
return true;
}

static int32_t FindBaseFields(StringView& buffer, int32_t beginIndexArray[], int32_t endIndexArray[]) {
/*
* 查找Apsara格式日志的基础字段。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndexArray - 字段开始索引的数组。
* @param endIndexArray - 字段结束索引的数组。
* @return 解析到的基础字段数量。
*/
static int32_t FindBaseFields(const StringView& buffer, int32_t beginIndexArray[], int32_t endIndexArray[]) {
int32_t baseFieldNum = 0;
for (int32_t i = 0; buffer[i] != 0; i++) {
for (size_t i = 0; i < buffer.size(); i++) {
if (buffer[i] == '[') {
beginIndexArray[baseFieldNum] = i + 1;
} else if (buffer[i] == ']') {
if (buffer[i + 1] == '\t' || buffer[i + 1] == '\0' || buffer[i + 1] == '\n') {
if (buffer[i + 1] == '\t' || buffer[i + 1] == '\n') {
endIndexArray[baseFieldNum] = i;
baseFieldNum++;
}
Expand All @@ -260,7 +305,14 @@ static int32_t FindBaseFields(StringView& buffer, int32_t beginIndexArray[], int
return baseFieldNum;
}

static bool IsFieldLevel(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
/*
* 检查是否为日志级别字段。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndex - 字段开始的索引。
* @param endIndex - 字段结束的索引。
* @return 如果字段是日志级别,则返回true;否则返回false。
*/
static bool IsFieldLevel(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] > 'Z' || buffer[i] < 'A') {
return false;
Expand All @@ -269,7 +321,14 @@ static bool IsFieldLevel(StringView& buffer, int32_t beginIndex, int32_t endInde
return true;
}

static bool IsFieldThread(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
/*
* 检查是否为线程ID字段。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndex - 字段开始的索引。
* @param endIndex - 字段结束的索引。
* @return 如果字段是线程ID,则返回true;否则返回false。
*/
static bool IsFieldThread(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] > '9' || buffer[i] < '0') {
return false;
Expand All @@ -278,7 +337,14 @@ static bool IsFieldThread(StringView& buffer, int32_t beginIndex, int32_t endInd
return true;
}

static bool IsFieldFileLine(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
/*
* 检查是否为文件和行号字段。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndex - 字段开始的索引。
* @param endIndex - 字段结束的索引。
* @return 如果字段是文件和行号,则返回true;否则返回false。
*/
static bool IsFieldFileLine(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] == '/' || buffer[i] == '.') {
return true;
Expand All @@ -287,7 +353,14 @@ static bool IsFieldFileLine(StringView& buffer, int32_t beginIndex, int32_t endI
return false;
}

static int32_t FindColonIndex(StringView& buffer, int32_t beginIndex, int32_t endIndex) {
/*
* 查找冒号字符的索引。
* @param buffer - 包含日志数据的字符串视图。
* @param beginIndex - 字段开始的索引。
* @param endIndex - 字段结束的索引。
* @return 冒号字符的索引,如果未找到,则返回endIndex。
*/
static int32_t FindColonIndex(const StringView& buffer, int32_t beginIndex, int32_t endIndex) {
for (int32_t i = beginIndex; i < endIndex; i++) {
if (buffer[i] == ':') {
return i;
Expand All @@ -296,7 +369,13 @@ static int32_t FindColonIndex(StringView& buffer, int32_t beginIndex, int32_t en
return endIndex;
}

int32_t ProcessorParseApsaraNative::ParseApsaraBaseFields(StringView& buffer, LogEvent& sourceEvent) {
/*
* 解析Apsara日志的基础字段并添加到日志事件中。
* @param buffer - 包含日志数据的字符串视图。
* @param sourceEvent - 引用到日志事件对象,用于添加解析出的字段。
* @return 返回处理完基础字段后的索引位置。
*/
int32_t ProcessorParseApsaraNative::ParseApsaraBaseFields(const StringView& buffer, LogEvent& sourceEvent) {
int32_t beginIndexArray[LogParser::MAX_BASE_FIELD_NUM] = {0};
int32_t endIndexArray[LogParser::MAX_BASE_FIELD_NUM] = {0};
int32_t baseFieldNum = FindBaseFields(buffer, beginIndexArray, endIndexArray);
Expand Down
2 changes: 1 addition & 1 deletion core/processor/ProcessorParseApsaraNative.h
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@ class ProcessorParseApsaraNative : public Processor {
time_t ApsaraEasyReadLogTimeParser(StringView& buffer, StringView& timeStr, LogtailTime& lastLogTime, int64_t& microTime);
int32_t GetApsaraLogMicroTime(StringView& buffer);
bool IsPrefixString(const char* all, const StringView& prefix);
int32_t ParseApsaraBaseFields(StringView& buffer, LogEvent& sourceEvent);
int32_t ParseApsaraBaseFields(const StringView& buffer, LogEvent& sourceEvent);

std::string mSourceKey;
std::string mRawLogTag;
Expand Down
Loading

0 comments on commit ff0d88d

Please sign in to comment.