Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[backport]: fix GetLastLine core #1996

Open
wants to merge 12 commits into
base: 2.1
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
154 changes: 113 additions & 41 deletions core/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@
#include "monitor/LogFileProfiler.h"
#include "monitor/LogtailAlarm.h"
#include "monitor/MetricConstants.h"
#include "monitor/MetricConstants.h"
#include "processor/inner/ProcessorParseContainerLogNative.h"
#include "rapidjson/document.h"
#include "reader/JsonLogFileReader.h"
Expand Down Expand Up @@ -2022,7 +2021,8 @@ LogFileReader::FileCompareResult LogFileReader::CompareToFile(const string& file
3. continue\nend\ncontinue\nend\n -> continue\nxxx\nend
5. mLogEndRegPtr != NULL
1. xxx\nend\n -> xxx\nend
1. xxx\nend\nxxx\n -> xxx\nend
2. xxx\nend\nxxx\n -> xxx\nend
3. xxx\nend -> ""
*/
/*
return: the number of bytes left, including \n
Expand All @@ -2041,6 +2041,7 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
rollbackLineFeedCount = 0;
if (mReaderConfig.first->mInputType == FileReaderOptions::InputType::InputContainerStdio) {
// Multiline rollback
bool foundEnd = false;
if (mMultilineConfig.first->IsMultiline()) {
std::string exception;
while (endPs >= 0) {
Expand All @@ -2051,6 +2052,8 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
content.data.size(),
*mMultilineConfig.first->GetEndPatternReg(),
exception)) {
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
foundEnd = true;
// Ensure the end line is complete
if (buffer[content.lineEnd] == '\n') {
return content.lineEnd + 1;
Expand All @@ -2062,14 +2065,19 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
*mMultilineConfig.first->GetStartPatternReg(),
exception)) {
// start + continue, start
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
rollbackLineFeedCount += content.rollbackLineFeedCount;
// Keep all the buffer if rollback all
return content.lineBegin;
}
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
rollbackLineFeedCount += content.rollbackLineFeedCount;
endPs = content.lineBegin - 1;
}
}
if (mMultilineConfig.first->GetEndPatternReg() && foundEnd) {
return 0;
}
// Single line rollback or all unmatch rollback
rollbackLineFeedCount = 0;
if (buffer[size - 1] == '\n') {
Expand All @@ -2079,13 +2087,16 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
}
LineInfo content = NewGetLastLine(StringView(buffer, size), endPs, true);
// 最后一行是完整行,且以 \n 结尾
if (content.fullLine && buffer[endPs] == '\n') {
return size;
if (content.fullLine && buffer[content.lineEnd] == '\n') {
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
return content.lineEnd + 1;
}
content = NewGetLastLine(StringView(buffer, size), endPs, false);
rollbackLineFeedCount = content.rollbackLineFeedCount;
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
rollbackLineFeedCount += content.rollbackLineFeedCount;
return content.lineBegin;
} else {
bool foundEnd = false;
// Multiline rollback
if (mMultilineConfig.first->IsMultiline()) {
std::string exception;
Expand All @@ -2095,6 +2106,7 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
// start + end, continue + end, end
if (BoostRegexSearch(
content.data(), content.size(), *mMultilineConfig.first->GetEndPatternReg(), exception)) {
foundEnd = true;
// Ensure the end line is complete
if (buffer[endPs] == '\n') {
return endPs + 1;
Expand All @@ -2114,6 +2126,9 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
endPs = content.data() - buffer - 1;
}
}
if (mMultilineConfig.first->GetEndPatternReg() && foundEnd) {
return 0;
}
// Single line rollback or all unmatch rollback
rollbackLineFeedCount = 0;
if (buffer[size - 1] == '\n') {
Expand Down Expand Up @@ -2281,26 +2296,38 @@ LineInfo RawTextParser::NewGetLastLine(StringView buffer,
bool needSingleLine,
std::vector<BaseLineParse*>* lineParsers) {
if (end == 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = 0,
.fullLine = false,
.forceRollbackLineFeedCount = 0};
}
if (protocolFunctionIndex != 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = 0,
.fullLine = false,
.forceRollbackLineFeedCount = 0};
}

for (int32_t begin = end; begin > 0; --begin) {
if (begin == 0 || buffer[begin - 1] == '\n') {
if (buffer[begin - 1] == '\n') {
return {.data = StringView(buffer.data() + begin, end - begin),
.lineBegin = begin,
.lineEnd = end,
.rollbackLineFeedCount = 1,
.fullLine = true};
.fullLine = true,
.forceRollbackLineFeedCount = 0};
}
}
return {.data = StringView(buffer.data(), end),
.lineBegin = 0,
.lineEnd = end,
.rollbackLineFeedCount = 1,
.fullLine = true};
.fullLine = true,
.forceRollbackLineFeedCount = 0};
}

LineInfo DockerJsonFileParser::NewGetLastLine(StringView buffer,
Expand All @@ -2309,36 +2336,55 @@ LineInfo DockerJsonFileParser::NewGetLastLine(StringView buffer,
bool needSingleLine,
std::vector<BaseLineParse*>* lineParsers) {
if (end == 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = 0,
.fullLine = false,
.forceRollbackLineFeedCount = 0};
}
if (protocolFunctionIndex == 0) {
// 异常情况, DockerJsonFileParse不允许在最后一个解析器
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = 0,
.fullLine = false,
.forceRollbackLineFeedCount = 0};
}

size_t nextProtocolFunctionIndex = protocolFunctionIndex - 1;
LineInfo finalLine;
while (!finalLine.fullLine) {
LineInfo rawLine = (*lineParsers)[nextProtocolFunctionIndex]->NewGetLastLine(
buffer, end, nextProtocolFunctionIndex, needSingleLine, lineParsers);
if (rawLine.data.back() == '\n') {
if (rawLine.data.size() > 0 && rawLine.data.back() == '\n') {
rawLine.data = StringView(rawLine.data.data(), rawLine.data.size() - 1);
}

LineInfo line;
parseLine(rawLine, line);
finalLine.data = line.data;
finalLine.fullLine = line.fullLine;
finalLine.lineBegin = line.lineBegin;
finalLine.rollbackLineFeedCount += line.rollbackLineFeedCount;
finalLine.dataRaw = line.dataRaw;
if (finalLine.lineEnd == 0) {
finalLine.lineEnd = line.lineEnd;
int32_t rollbackLineFeedCount = 0;
int32_t forceRollbackLineFeedCount = 0;
if (line.fullLine) {
rollbackLineFeedCount = line.rollbackLineFeedCount;
forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount;
} else {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

这里再看一下,有没有遗漏原来的forceRollbackLine

Suggested change
} else {
line.forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount;

forceRollbackLineFeedCount
= finalLine.forceRollbackLineFeedCount + line.forceRollbackLineFeedCount + line.rollbackLineFeedCount;
rollbackLineFeedCount = 0;
}
finalLine = std::move(line);
finalLine.rollbackLineFeedCount = rollbackLineFeedCount;
finalLine.forceRollbackLineFeedCount = forceRollbackLineFeedCount;
if (!finalLine.fullLine) {
if (finalLine.lineBegin == 0) {
finalLine.data = StringView();
return finalLine;
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = finalLine.rollbackLineFeedCount,
.fullLine = false,
.forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount};
}
end = finalLine.lineBegin - 1;
}
Expand All @@ -2349,7 +2395,9 @@ LineInfo DockerJsonFileParser::NewGetLastLine(StringView buffer,
bool DockerJsonFileParser::parseLine(LineInfo rawLine, LineInfo& paseLine) {
paseLine = rawLine;
paseLine.fullLine = false;

if (rawLine.data.size() == 0) {
return false;
}
rapidjson::Document doc;
doc.Parse(rawLine.data.data(), rawLine.data.size());

Expand Down Expand Up @@ -2387,39 +2435,58 @@ LineInfo ContainerdTextParser::NewGetLastLine(StringView buffer,
bool needSingleLine,
std::vector<BaseLineParse*>* lineParsers) {
if (end == 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = 0,
.fullLine = false,
.forceRollbackLineFeedCount = 0};
}
if (protocolFunctionIndex == 0) {
// 异常情况, DockerJsonFileParse不允许在最后一个解析器
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
// 异常情况, ContainerdTextParser不允许在最后一个解析器
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = 0,
.fullLine = false,
.forceRollbackLineFeedCount = 0};
}
LineInfo finalLine;
finalLine.fullLine = false;
// 跳过最后的连续P
size_t nextProtocolFunctionIndex = protocolFunctionIndex - 1;

// 跳过最后的连续P
while (!finalLine.fullLine) {
LineInfo rawLine = (*lineParsers)[nextProtocolFunctionIndex]->NewGetLastLine(
buffer, end, nextProtocolFunctionIndex, needSingleLine, lineParsers);
if (rawLine.data.back() == '\n') {
if (rawLine.data.size() > 0 && rawLine.data.back() == '\n') {
rawLine.data = StringView(rawLine.data.data(), rawLine.data.size() - 1);
}

LineInfo line;
parseLine(rawLine, line);
// containerd 不需要外层协议的 dataRaw
finalLine.data = line.data;
finalLine.fullLine = line.fullLine;
finalLine.lineBegin = line.lineBegin;
finalLine.rollbackLineFeedCount += line.rollbackLineFeedCount;
mergeLines(finalLine, finalLine, true);
if (finalLine.lineEnd == 0) {
finalLine.lineEnd = line.lineEnd;
int32_t rollbackLineFeedCount = 0;
int32_t forceRollbackLineFeedCount = 0;
if (line.fullLine) {
rollbackLineFeedCount = line.rollbackLineFeedCount;
forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount;
} else {
forceRollbackLineFeedCount
= finalLine.forceRollbackLineFeedCount + line.forceRollbackLineFeedCount + line.rollbackLineFeedCount;
rollbackLineFeedCount = 0;
}
finalLine = std::move(line);
finalLine.rollbackLineFeedCount = rollbackLineFeedCount;
finalLine.forceRollbackLineFeedCount = forceRollbackLineFeedCount;
mergeLines(finalLine, finalLine, true);
if (!finalLine.fullLine) {
if (finalLine.lineBegin == 0) {
finalLine.data = StringView();
return finalLine;
return {.data = StringView(),
.lineBegin = 0,
.lineEnd = 0,
.rollbackLineFeedCount = finalLine.rollbackLineFeedCount,
.fullLine = false,
.forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount};
}
end = finalLine.lineBegin - 1;
}
Expand All @@ -2442,7 +2509,7 @@ LineInfo ContainerdTextParser::NewGetLastLine(StringView buffer,
LineInfo previousLine;
LineInfo rawLine = (*lineParsers)[nextProtocolFunctionIndex]->NewGetLastLine(
buffer, finalLine.lineBegin - 1, nextProtocolFunctionIndex, needSingleLine, lineParsers);
if (rawLine.data.back() == '\n') {
if (rawLine.data.size() > 0 && rawLine.data.back() == '\n') {
rawLine.data = StringView(rawLine.data.data(), rawLine.data.size() - 1);
}

Expand Down Expand Up @@ -2479,9 +2546,10 @@ void ContainerdTextParser::parseLine(LineInfo rawLine, LineInfo& paseLine) {
const char* lineEnd = rawLine.data.data() + rawLine.data.size();
paseLine = rawLine;
paseLine.fullLine = true;

if (rawLine.data.size() == 0) {
return;
}
// 寻找第一个分隔符位置 time
StringView timeValue;
const char* pch1 = std::find(rawLine.data.data(), lineEnd, ProcessorParseContainerLogNative::CONTAINERD_DELIMITER);
if (pch1 == lineEnd) {
return;
Expand All @@ -2497,6 +2565,10 @@ void ContainerdTextParser::parseLine(LineInfo rawLine, LineInfo& paseLine) {
return;
}
// 如果既不以 P 开头,也不以 F 开头
if (pch2 + 1 >= lineEnd) {
paseLine.data = StringView(pch2 + 1, lineEnd - pch2 - 1);
return;
}
if (*(pch2 + 1) != ProcessorParseContainerLogNative::CONTAINERD_PART_TAG
&& *(pch2 + 1) != ProcessorParseContainerLogNative::CONTAINERD_FULL_TAG) {
paseLine.data = StringView(pch2 + 1, lineEnd - pch2 - 1);
Expand Down
8 changes: 5 additions & 3 deletions core/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,6 @@
#include "event/Event.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/FileServer.h"
#include "file_server/FileServer.h"
#include "file_server/MultilineOptions.h"
#include "log_pb/sls_logs.pb.h"
#include "logger/Logger.h"
Expand All @@ -57,16 +56,19 @@ struct LineInfo {
int32_t lineEnd;
int32_t rollbackLineFeedCount;
bool fullLine;
int32_t forceRollbackLineFeedCount;
LineInfo(StringView data = StringView(),
int32_t lineBegin = 0,
int32_t lineEnd = 0,
int32_t rollbackLineFeedCount = 0,
bool fullLine = false)
bool fullLine = false,
int32_t forceRollbackLineFeedCount = 0)
: data(data),
lineBegin(lineBegin),
lineEnd(lineEnd),
rollbackLineFeedCount(rollbackLineFeedCount),
fullLine(fullLine) {}
fullLine(fullLine),
forceRollbackLineFeedCount(forceRollbackLineFeedCount) {}
};

class BaseLineParse {
Expand Down
Loading
Loading