Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix GetLastLine core #2000

Merged
merged 4 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 100 additions & 78 deletions core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -349,26 +349,25 @@ void LogFileReader::InitReader(bool tailExisted, FileReadPolicy policy, uint32_t

namespace detail {

void updatePrimaryCheckpoint(const std::string& key, PrimaryCheckpointPB& cpt, const std::string& field) {
cpt.set_update_time(time(NULL));
if (CheckpointManagerV2::GetInstance()->SetPB(key, cpt)) {
LOG_INFO(sLogger, ("update primary checkpoint", key)("field", field)("checkpoint", cpt.DebugString()));
} else {
LOG_WARNING(sLogger,
("update primary checkpoint error", key)("field", field)("checkpoint", cpt.DebugString()));
}
void updatePrimaryCheckpoint(const std::string& key, PrimaryCheckpointPB& cpt, const std::string& field) {
cpt.set_update_time(time(NULL));
if (CheckpointManagerV2::GetInstance()->SetPB(key, cpt)) {
LOG_INFO(sLogger, ("update primary checkpoint", key)("field", field)("checkpoint", cpt.DebugString()));
} else {
LOG_WARNING(sLogger, ("update primary checkpoint error", key)("field", field)("checkpoint", cpt.DebugString()));
}
}

/// Computes the inclusive partition index range [first, second] owned by worker `idx`
/// when `totalPartitionCount` partitions are split across `concurrency` workers.
///
/// Every worker gets `totalPartitionCount / concurrency` partitions; the remainder is
/// handed out one extra partition each to the lowest-indexed workers.
///
/// @param idx                 Zero-based worker index.
/// @param concurrency         Number of workers. Must be non-zero (used as a divisor).
/// @param totalPartitionCount Total number of partitions to distribute.
/// @return                    {first partition, last partition}, both inclusive.
///
/// NOTE: the arithmetic is unsigned. When a worker's share is empty (base == 0) the
/// upper bound is computed as `min + 0 - 1`, so it is smaller than the lower bound, or
/// wraps around to SIZE_MAX when the lower bound is 0.
std::pair<size_t, size_t> getPartitionRange(size_t idx, size_t concurrency, size_t totalPartitionCount) {
    const size_t base = totalPartitionCount / concurrency;
    const size_t extra = totalPartitionCount % concurrency;
    if (extra == 0) {
        // Even split: every worker owns exactly `base` consecutive partitions.
        return std::make_pair(idx * base, (idx + 1) * base - 1);
    }
    // Workers before `extra` each own base + 1 partitions; `idx == extra` still starts
    // right after them, hence `<=` for the lower bound but `<` for the upper bound.
    const size_t min = idx <= extra ? idx * (base + 1) : extra * (base + 1) + (idx - extra) * base;
    const size_t max = idx < extra ? min + base : min + base - 1;
    return std::make_pair(min, max);
}

} // namespace detail

Expand Down Expand Up @@ -687,7 +686,7 @@ void LogFileReader::SetFilePosBackwardToFixedPos(LogFileOperator& op) {

void LogFileReader::checkContainerType(LogFileOperator& op) {
// 判断container类型
char containerBOMBuffer[1] = {0};
char containerBOMBuffer[2] = {0};
size_t readBOMByte = 1;
int64_t filePos = 0;
TruncateInfo* truncateInfo = NULL;
Expand Down Expand Up @@ -1991,7 +1990,8 @@ LogFileReader::FileCompareResult LogFileReader::CompareToFile(const string& file
3. continue\nend\ncontinue\nend\n -> continue\nxxx\nend
5. mLogEndRegPtr != NULL
1. xxx\nend\n -> xxx\nend
1. xxx\nend\nxxx\n -> xxx\nend
2. xxx\nend\nxxx\n -> xxx\nend
3. xxx\nend -> ""
*/
/*
return: the number of bytes left, including \n
Expand All @@ -2009,6 +2009,7 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
}
rollbackLineFeedCount = 0;
// Multiline rollback
bool foundEnd = false;
if (mMultilineConfig.first->IsMultiline()) {
std::string exception;
while (endPs >= 0) {
Expand All @@ -2019,6 +2020,8 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
content.data.size(),
*mMultilineConfig.first->GetEndPatternReg(),
exception)) {
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
foundEnd = true;
// Ensure the end line is complete
if (buffer[content.lineEnd] == '\n') {
return content.lineEnd + 1;
Expand All @@ -2030,14 +2033,19 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
*mMultilineConfig.first->GetStartPatternReg(),
exception)) {
// start + continue, start
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
rollbackLineFeedCount += content.rollbackLineFeedCount;
// Keep all the buffer if rollback all
return content.lineBegin;
}
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
rollbackLineFeedCount += content.rollbackLineFeedCount;
endPs = content.lineBegin - 1;
}
}
if (mMultilineConfig.first->GetEndPatternReg() && foundEnd) {
return 0;
}
// Single line rollback or all unmatch rollback
rollbackLineFeedCount = 0;
if (buffer[size - 1] == '\n') {
Expand All @@ -2047,11 +2055,13 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
}
LineInfo content = GetLastLine(StringView(buffer, size), endPs, true);
// 最后一行是完整行,且以 \n 结尾
if (content.fullLine && buffer[endPs] == '\n') {
return size;
if (content.fullLine && buffer[content.lineEnd] == '\n') {
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
return content.lineEnd + 1;
}
content = GetLastLine(StringView(buffer, size), endPs, false);
rollbackLineFeedCount = content.rollbackLineFeedCount;
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
rollbackLineFeedCount += content.rollbackLineFeedCount;
return content.lineBegin;
}

Expand Down Expand Up @@ -2185,32 +2195,31 @@ StringBuffer* BaseLineParse::GetStringBuffer() {
return &mStringBuffer;
}

/*
    Finds the last raw text line that ends at `end` by scanning backwards for the
    previous line feed.
    params:
        buffer: all read logs
        end: the end position of current line, \n or \0
        protocolFunctionIndex: index of this parser in lineParsers; any value other
                               than 0 yields an empty, non-full LineInfo
        needSingleLine: not used by this parser
        lineParsers: not used by this parser
    return:
        last line (backward), without \n or \0
        - end == 0 or protocolFunctionIndex != 0: empty LineInfo with fullLine == false
        - otherwise: the span [lineBegin, end) with rollbackLineFeedCount == 1,
          fullLine == true and forceRollbackLineFeedCount == 0
*/
LineInfo RawTextParser::GetLastLine(StringView buffer,
                                    int32_t end,
                                    size_t protocolFunctionIndex,
                                    bool needSingleLine,
                                    std::vector<BaseLineParse*>* lineParsers) {
    if (end == 0) {
        return LineInfo(StringView(), 0, 0, 0, false, 0);
    }
    if (protocolFunctionIndex != 0) {
        return LineInfo(StringView(), 0, 0, 0, false, 0);
    }

    // Walk backwards until the character just before `begin` is a line feed; the
    // line then starts at `begin`.
    for (int32_t begin = end; begin > 0; --begin) {
        if (buffer[begin - 1] == '\n') {
            return LineInfo(StringView(buffer.data() + begin, end - begin), begin, end, 1, true, 0);
        }
    }
    // No line feed before `end`: the line starts at the beginning of the buffer.
    return LineInfo(StringView(buffer.data(), end), 0, end, 1, true, 0);
}

LineInfo DockerJsonFileParser::GetLastLine(StringView buffer,
Expand All @@ -2219,38 +2228,41 @@ LineInfo DockerJsonFileParser::GetLastLine(StringView buffer,
bool needSingleLine,
std::vector<BaseLineParse*>* lineParsers) {
if (end == 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return LineInfo(StringView(), 0, 0, 0, false, 0);
}
if (protocolFunctionIndex == 0) {
// 异常情况, DockerJsonFileParse不允许在最后一个解析器
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return LineInfo(StringView(), 0, 0, 0, false, 0);
}

size_t nextProtocolFunctionIndex = protocolFunctionIndex - 1;
LineInfo finalLine
= {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
LineInfo finalLine;
while (!finalLine.fullLine) {
LineInfo rawLine = (*lineParsers)[nextProtocolFunctionIndex]->GetLastLine(
buffer, end, nextProtocolFunctionIndex, needSingleLine, lineParsers);
if (rawLine.data.back() == '\n') {
if (rawLine.data.size() > 0 && rawLine.data.back() == '\n') {
rawLine.data = StringView(rawLine.data.data(), rawLine.data.size() - 1);
}

LineInfo line
= {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
LineInfo line;
parseLine(rawLine, line);
finalLine.data = line.data;
finalLine.fullLine = line.fullLine;
finalLine.lineBegin = line.lineBegin;
finalLine.rollbackLineFeedCount += line.rollbackLineFeedCount;
finalLine.dataRaw = line.dataRaw;
if (finalLine.lineEnd == 0) {
finalLine.lineEnd = line.lineEnd;
int32_t rollbackLineFeedCount = 0;
int32_t forceRollbackLineFeedCount = 0;
if (line.fullLine) {
rollbackLineFeedCount = line.rollbackLineFeedCount;
forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount;
} else {
forceRollbackLineFeedCount
= finalLine.forceRollbackLineFeedCount + line.forceRollbackLineFeedCount + line.rollbackLineFeedCount;
rollbackLineFeedCount = 0;
}
finalLine = std::move(line);
finalLine.rollbackLineFeedCount = rollbackLineFeedCount;
finalLine.forceRollbackLineFeedCount = forceRollbackLineFeedCount;
if (!finalLine.fullLine) {
if (finalLine.lineBegin == 0) {
finalLine.data = StringView();
return finalLine;
return LineInfo(
StringView(), 0, 0, finalLine.rollbackLineFeedCount, false, finalLine.forceRollbackLineFeedCount);
}
end = finalLine.lineBegin - 1;
}
Expand All @@ -2261,7 +2273,9 @@ LineInfo DockerJsonFileParser::GetLastLine(StringView buffer,
bool DockerJsonFileParser::parseLine(LineInfo rawLine, LineInfo& paseLine) {
paseLine = rawLine;
paseLine.fullLine = false;

if (rawLine.data.size() == 0) {
return false;
}
rapidjson::Document doc;
doc.Parse(rawLine.data.data(), rawLine.data.size());

Expand Down Expand Up @@ -2299,40 +2313,44 @@ LineInfo ContainerdTextParser::GetLastLine(StringView buffer,
bool needSingleLine,
std::vector<BaseLineParse*>* lineParsers) {
if (end == 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return LineInfo(StringView(), 0, 0, 0, false, 0);
}
if (protocolFunctionIndex == 0) {
// 异常情况, DockerJsonFileParse不允许在最后一个解析器
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
// 异常情况, ContainerdTextParser不允许在最后一个解析器
return LineInfo(StringView(), 0, 0, 0, false, 0);
}
LineInfo finalLine
= {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
// 跳过最后的连续P
LineInfo finalLine;
finalLine.fullLine = false;
size_t nextProtocolFunctionIndex = protocolFunctionIndex - 1;

// 跳过最后的连续P
while (!finalLine.fullLine) {
LineInfo rawLine = (*lineParsers)[nextProtocolFunctionIndex]->GetLastLine(
buffer, end, nextProtocolFunctionIndex, needSingleLine, lineParsers);
if (rawLine.data.back() == '\n') {
if (rawLine.data.size() > 0 && rawLine.data.back() == '\n') {
rawLine.data = StringView(rawLine.data.data(), rawLine.data.size() - 1);
}

LineInfo line
= {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
LineInfo line;
parseLine(rawLine, line);
// containerd 不需要外层协议的 dataRaw
finalLine.data = line.data;
finalLine.fullLine = line.fullLine;
finalLine.lineBegin = line.lineBegin;
finalLine.rollbackLineFeedCount += line.rollbackLineFeedCount;
mergeLines(finalLine, finalLine, true);
if (finalLine.lineEnd == 0) {
finalLine.lineEnd = line.lineEnd;
int32_t rollbackLineFeedCount = 0;
int32_t forceRollbackLineFeedCount = 0;
if (line.fullLine) {
rollbackLineFeedCount = line.rollbackLineFeedCount;
forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount;
} else {
forceRollbackLineFeedCount
= finalLine.forceRollbackLineFeedCount + line.forceRollbackLineFeedCount + line.rollbackLineFeedCount;
rollbackLineFeedCount = 0;
}
finalLine = std::move(line);
finalLine.rollbackLineFeedCount = rollbackLineFeedCount;
finalLine.forceRollbackLineFeedCount = forceRollbackLineFeedCount;
mergeLines(finalLine, finalLine, true);
if (!finalLine.fullLine) {
if (finalLine.lineBegin == 0) {
finalLine.data = StringView();
return finalLine;
return LineInfo(
StringView(), 0, 0, finalLine.rollbackLineFeedCount, false, finalLine.forceRollbackLineFeedCount);
}
end = finalLine.lineBegin - 1;
}
Expand All @@ -2352,11 +2370,10 @@ LineInfo ContainerdTextParser::GetLastLine(StringView buffer,
break;
}

LineInfo previousLine
= {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
LineInfo previousLine;
LineInfo rawLine = (*lineParsers)[nextProtocolFunctionIndex]->GetLastLine(
buffer, finalLine.lineBegin - 1, nextProtocolFunctionIndex, needSingleLine, lineParsers);
if (rawLine.data.back() == '\n') {
if (rawLine.data.size() > 0 && rawLine.data.back() == '\n') {
rawLine.data = StringView(rawLine.data.data(), rawLine.data.size() - 1);
}

Expand Down Expand Up @@ -2393,9 +2410,10 @@ void ContainerdTextParser::parseLine(LineInfo rawLine, LineInfo& paseLine) {
const char* lineEnd = rawLine.data.data() + rawLine.data.size();
paseLine = rawLine;
paseLine.fullLine = true;

if (rawLine.data.size() == 0) {
return;
}
// 寻找第一个分隔符位置 time
StringView timeValue;
const char* pch1 = std::find(rawLine.data.data(), lineEnd, ProcessorParseContainerLogNative::CONTAINERD_DELIMITER);
if (pch1 == lineEnd) {
return;
Expand All @@ -2411,6 +2429,10 @@ void ContainerdTextParser::parseLine(LineInfo rawLine, LineInfo& paseLine) {
return;
}
// 如果既不以 P 开头,也不以 F 开头
if (pch2 + 1 >= lineEnd) {
paseLine.data = StringView(pch2 + 1, lineEnd - pch2 - 1);
return;
}
if (*(pch2 + 1) != ProcessorParseContainerLogNative::CONTAINERD_PART_TAG
&& *(pch2 + 1) != ProcessorParseContainerLogNative::CONTAINERD_FULL_TAG) {
paseLine.data = StringView(pch2 + 1, lineEnd - pch2 - 1);
Expand Down
22 changes: 18 additions & 4 deletions core/file_server/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "common/memory/SourceBuffer.h"
#include "file_server/event/Event.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/FileServer.h"
#include "file_server/MultilineOptions.h"
#include "protobuf/sls/sls_logs.pb.h"
#include "file_server/event/Event.h"
#include "file_server/reader/FileReaderOptions.h"
#include "logger/Logger.h"
#include "models/StringView.h"
#include "pipeline/queue/QueueKey.h"
#include "protobuf/sls/sls_logs.pb.h"
#include "rapidjson/allocators.h"
#include "file_server/reader/FileReaderOptions.h"

namespace logtail {

Expand All @@ -57,6 +57,19 @@ struct LineInfo {
int32_t lineEnd;
int32_t rollbackLineFeedCount;
bool fullLine;
int32_t forceRollbackLineFeedCount;
LineInfo(StringView data = StringView(),
int32_t lineBegin = 0,
int32_t lineEnd = 0,
int32_t rollbackLineFeedCount = 0,
bool fullLine = false,
int32_t forceRollbackLineFeedCount = 0)
: data(data),
lineBegin(lineBegin),
lineEnd(lineEnd),
rollbackLineFeedCount(rollbackLineFeedCount),
fullLine(fullLine),
forceRollbackLineFeedCount(forceRollbackLineFeedCount) {}
};

class BaseLineParse {
Expand Down Expand Up @@ -234,7 +247,7 @@ class LogFileReader {

/// @return e.g. `/home/admin/access.log`
const std::string& GetConvertedPath() const;

const std::string& GetHostLogPathFile() const { return mHostLogPathFile; }

int64_t GetFileSize() const { return mLastFileSize; }
Expand Down Expand Up @@ -686,6 +699,7 @@ class LogFileReader {
friend class LogSplitNoDiscardUnmatchUnittest;
friend class RemoveLastIncompleteLogMultilineUnittest;
friend class LogFileReaderCheckpointUnittest;
friend class GetLastLineUnittest;
friend class LastMatchedContainerdTextLineUnittest;
friend class LastMatchedDockerJsonFileUnittest;
friend class LastMatchedContainerdTextWithDockerJsonUnittest;
Expand Down
Loading
Loading