Skip to content

Commit

Permalink
[backport]: fix GetLastLine core (#1996)
Browse files Browse the repository at this point in the history
  • Loading branch information
quzard authored Dec 31, 2024
1 parent 48d786a commit 839de1a
Show file tree
Hide file tree
Showing 4 changed files with 1,025 additions and 88 deletions.
154 changes: 113 additions & 41 deletions core/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
#include "monitor/LogFileProfiler.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/MetricConstants.h"
#include "monitor/MetricConstants.h"
#include "processor/inner/ProcessorParseContainerLogNative.h"
#include "rapidjson/document.h"
#include "reader/JsonLogFileReader.h"
Expand Down Expand Up @@ -2022,7 +2021,8 @@ LogFileReader::FileCompareResult LogFileReader::CompareToFile(const string& file
3. continue\nend\ncontinue\nend\n -> continue\nxxx\nend
5. mLogEndRegPtr != NULL
1. xxx\nend\n -> xxx\nend
1. xxx\nend\nxxx\n -> xxx\nend
2. xxx\nend\nxxx\n -> xxx\nend
3. xxx\nend -> ""
*/
/*
return: the number of bytes left, including \n
Expand All @@ -2041,6 +2041,7 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
rollbackLineFeedCount = 0;
if (mReaderConfig.first->mInputType == FileReaderOptions::InputType::InputContainerStdio) {
// Multiline rollback
bool foundEnd = false;
if (mMultilineConfig.first->IsMultiline()) {
std::string exception;
while (endPs >= 0) {
Expand All @@ -2051,6 +2052,8 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
content.data.size(),
*mMultilineConfig.first->GetEndPatternReg(),
exception)) {
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
foundEnd = true;
// Ensure the end line is complete
if (buffer[content.lineEnd] == '\n') {
return content.lineEnd + 1;
Expand All @@ -2062,14 +2065,19 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
*mMultilineConfig.first->GetStartPatternReg(),
exception)) {
// start + continue, start
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
rollbackLineFeedCount += content.rollbackLineFeedCount;
// Keep all the buffer if rollback all
return content.lineBegin;
}
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
rollbackLineFeedCount += content.rollbackLineFeedCount;
endPs = content.lineBegin - 1;
}
}
if (mMultilineConfig.first->GetEndPatternReg() && foundEnd) {
return 0;
}
// Single line rollback or all unmatch rollback
rollbackLineFeedCount = 0;
if (buffer[size - 1] == '\n') {
Expand All @@ -2079,13 +2087,16 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
}
LineInfo content = NewGetLastLine(StringView(buffer, size), endPs, true);
// 最后一行是完整行,且以 \n 结尾
if (content.fullLine && buffer[endPs] == '\n') {
return size;
if (content.fullLine && buffer[content.lineEnd] == '\n') {
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
return content.lineEnd + 1;
}
content = NewGetLastLine(StringView(buffer, size), endPs, false);
rollbackLineFeedCount = content.rollbackLineFeedCount;
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
rollbackLineFeedCount += content.rollbackLineFeedCount;
return content.lineBegin;
} else {
bool foundEnd = false;
// Multiline rollback
if (mMultilineConfig.first->IsMultiline()) {
std::string exception;
Expand All @@ -2095,6 +2106,7 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
// start + end, continue + end, end
if (BoostRegexSearch(
content.data(), content.size(), *mMultilineConfig.first->GetEndPatternReg(), exception)) {
foundEnd = true;
// Ensure the end line is complete
if (buffer[endPs] == '\n') {
return endPs + 1;
Expand All @@ -2114,6 +2126,9 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
endPs = content.data() - buffer - 1;
}
}
if (mMultilineConfig.first->GetEndPatternReg() && foundEnd) {
return 0;
}
// Single line rollback or all unmatch rollback
rollbackLineFeedCount = 0;
if (buffer[size - 1] == '\n') {
Expand Down Expand Up @@ -2281,26 +2296,38 @@ LineInfo RawTextParser::NewGetLastLine(StringView buffer,
bool needSingleLine,
std::vector<BaseLineParse*>* lineParsers) {
if (end == 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = 0,
.fullLine = false,
.forceRollbackLineFeedCount = 0};
}
if (protocolFunctionIndex != 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = 0,
.fullLine = false,
.forceRollbackLineFeedCount = 0};
}

for (int32_t begin = end; begin > 0; --begin) {
if (begin == 0 || buffer[begin - 1] == '\n') {
if (buffer[begin - 1] == '\n') {
return {.data = StringView(buffer.data() + begin, end - begin),
.lineBegin = begin,
.lineEnd = end,
.rollbackLineFeedCount = 1,
.fullLine = true};
.fullLine = true,
.forceRollbackLineFeedCount = 0};
}
}
return {.data = StringView(buffer.data(), end),
.lineBegin = 0,
.lineEnd = end,
.rollbackLineFeedCount = 1,
.fullLine = true};
.fullLine = true,
.forceRollbackLineFeedCount = 0};
}

LineInfo DockerJsonFileParser::NewGetLastLine(StringView buffer,
Expand All @@ -2309,36 +2336,55 @@ LineInfo DockerJsonFileParser::NewGetLastLine(StringView buffer,
bool needSingleLine,
std::vector<BaseLineParse*>* lineParsers) {
if (end == 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = 0,
.fullLine = false,
.forceRollbackLineFeedCount = 0};
}
if (protocolFunctionIndex == 0) {
        // Abnormal case: DockerJsonFileParser must not be the last (innermost) parser in the chain
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = 0,
.fullLine = false,
.forceRollbackLineFeedCount = 0};
}

size_t nextProtocolFunctionIndex = protocolFunctionIndex - 1;
LineInfo finalLine;
while (!finalLine.fullLine) {
LineInfo rawLine = (*lineParsers)[nextProtocolFunctionIndex]->NewGetLastLine(
buffer, end, nextProtocolFunctionIndex, needSingleLine, lineParsers);
if (rawLine.data.back() == '\n') {
if (rawLine.data.size() > 0 && rawLine.data.back() == '\n') {
rawLine.data = StringView(rawLine.data.data(), rawLine.data.size() - 1);
}

LineInfo line;
parseLine(rawLine, line);
finalLine.data = line.data;
finalLine.fullLine = line.fullLine;
finalLine.lineBegin = line.lineBegin;
finalLine.rollbackLineFeedCount += line.rollbackLineFeedCount;
finalLine.dataRaw = line.dataRaw;
if (finalLine.lineEnd == 0) {
finalLine.lineEnd = line.lineEnd;
int32_t rollbackLineFeedCount = 0;
int32_t forceRollbackLineFeedCount = 0;
if (line.fullLine) {
rollbackLineFeedCount = line.rollbackLineFeedCount;
forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount;
} else {
forceRollbackLineFeedCount
= finalLine.forceRollbackLineFeedCount + line.forceRollbackLineFeedCount + line.rollbackLineFeedCount;
rollbackLineFeedCount = 0;
}
finalLine = std::move(line);
finalLine.rollbackLineFeedCount = rollbackLineFeedCount;
finalLine.forceRollbackLineFeedCount = forceRollbackLineFeedCount;
if (!finalLine.fullLine) {
if (finalLine.lineBegin == 0) {
finalLine.data = StringView();
return finalLine;
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = finalLine.rollbackLineFeedCount,
.fullLine = false,
.forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount};
}
end = finalLine.lineBegin - 1;
}
Expand All @@ -2349,7 +2395,9 @@ LineInfo DockerJsonFileParser::NewGetLastLine(StringView buffer,
bool DockerJsonFileParser::parseLine(LineInfo rawLine, LineInfo& paseLine) {
paseLine = rawLine;
paseLine.fullLine = false;

if (rawLine.data.size() == 0) {
return false;
}
rapidjson::Document doc;
doc.Parse(rawLine.data.data(), rawLine.data.size());

Expand Down Expand Up @@ -2387,39 +2435,58 @@ LineInfo ContainerdTextParser::NewGetLastLine(StringView buffer,
bool needSingleLine,
std::vector<BaseLineParse*>* lineParsers) {
if (end == 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = 0,
.fullLine = false,
.forceRollbackLineFeedCount = 0};
}
if (protocolFunctionIndex == 0) {
// 异常情况, DockerJsonFileParse不允许在最后一个解析器
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
// 异常情况, ContainerdTextParser不允许在最后一个解析器
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = 0,
.fullLine = false,
.forceRollbackLineFeedCount = 0};
}
LineInfo finalLine;
finalLine.fullLine = false;
// 跳过最后的连续P
size_t nextProtocolFunctionIndex = protocolFunctionIndex - 1;

// 跳过最后的连续P
while (!finalLine.fullLine) {
LineInfo rawLine = (*lineParsers)[nextProtocolFunctionIndex]->NewGetLastLine(
buffer, end, nextProtocolFunctionIndex, needSingleLine, lineParsers);
if (rawLine.data.back() == '\n') {
if (rawLine.data.size() > 0 && rawLine.data.back() == '\n') {
rawLine.data = StringView(rawLine.data.data(), rawLine.data.size() - 1);
}

LineInfo line;
parseLine(rawLine, line);
// containerd 不需要外层协议的 dataRaw
finalLine.data = line.data;
finalLine.fullLine = line.fullLine;
finalLine.lineBegin = line.lineBegin;
finalLine.rollbackLineFeedCount += line.rollbackLineFeedCount;
mergeLines(finalLine, finalLine, true);
if (finalLine.lineEnd == 0) {
finalLine.lineEnd = line.lineEnd;
int32_t rollbackLineFeedCount = 0;
int32_t forceRollbackLineFeedCount = 0;
if (line.fullLine) {
rollbackLineFeedCount = line.rollbackLineFeedCount;
forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount;
} else {
forceRollbackLineFeedCount
= finalLine.forceRollbackLineFeedCount + line.forceRollbackLineFeedCount + line.rollbackLineFeedCount;
rollbackLineFeedCount = 0;
}
finalLine = std::move(line);
finalLine.rollbackLineFeedCount = rollbackLineFeedCount;
finalLine.forceRollbackLineFeedCount = forceRollbackLineFeedCount;
mergeLines(finalLine, finalLine, true);
if (!finalLine.fullLine) {
if (finalLine.lineBegin == 0) {
finalLine.data = StringView();
return finalLine;
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = finalLine.rollbackLineFeedCount,
.fullLine = false,
.forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount};
}
end = finalLine.lineBegin - 1;
}
Expand All @@ -2442,7 +2509,7 @@ LineInfo ContainerdTextParser::NewGetLastLine(StringView buffer,
LineInfo previousLine;
LineInfo rawLine = (*lineParsers)[nextProtocolFunctionIndex]->NewGetLastLine(
buffer, finalLine.lineBegin - 1, nextProtocolFunctionIndex, needSingleLine, lineParsers);
if (rawLine.data.back() == '\n') {
if (rawLine.data.size() > 0 && rawLine.data.back() == '\n') {
rawLine.data = StringView(rawLine.data.data(), rawLine.data.size() - 1);
}

Expand Down Expand Up @@ -2479,9 +2546,10 @@ void ContainerdTextParser::parseLine(LineInfo rawLine, LineInfo& paseLine) {
const char* lineEnd = rawLine.data.data() + rawLine.data.size();
paseLine = rawLine;
paseLine.fullLine = true;

if (rawLine.data.size() == 0) {
return;
}
// 寻找第一个分隔符位置 time
StringView timeValue;
const char* pch1 = std::find(rawLine.data.data(), lineEnd, ProcessorParseContainerLogNative::CONTAINERD_DELIMITER);
if (pch1 == lineEnd) {
return;
Expand All @@ -2497,6 +2565,10 @@ void ContainerdTextParser::parseLine(LineInfo rawLine, LineInfo& paseLine) {
return;
}
// 如果既不以 P 开头,也不以 F 开头
if (pch2 + 1 >= lineEnd) {
paseLine.data = StringView(pch2 + 1, lineEnd - pch2 - 1);
return;
}
if (*(pch2 + 1) != ProcessorParseContainerLogNative::CONTAINERD_PART_TAG
&& *(pch2 + 1) != ProcessorParseContainerLogNative::CONTAINERD_FULL_TAG) {
paseLine.data = StringView(pch2 + 1, lineEnd - pch2 - 1);
Expand Down
8 changes: 5 additions & 3 deletions core/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "event/Event.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/FileServer.h"
#include "file_server/FileServer.h"
#include "file_server/MultilineOptions.h"
#include "log_pb/sls_logs.pb.h"
#include "logger/Logger.h"
Expand All @@ -57,16 +56,19 @@ struct LineInfo {
int32_t lineEnd;
int32_t rollbackLineFeedCount;
bool fullLine;
int32_t forceRollbackLineFeedCount;
LineInfo(StringView data = StringView(),
int32_t lineBegin = 0,
int32_t lineEnd = 0,
int32_t rollbackLineFeedCount = 0,
bool fullLine = false)
bool fullLine = false,
int32_t forceRollbackLineFeedCount = 0)
: data(data),
lineBegin(lineBegin),
lineEnd(lineEnd),
rollbackLineFeedCount(rollbackLineFeedCount),
fullLine(fullLine) {}
fullLine(fullLine),
forceRollbackLineFeedCount(forceRollbackLineFeedCount) {}
};

class BaseLineParse {
Expand Down
Loading

0 comments on commit 839de1a

Please sign in to comment.