Skip to content

Commit

Permalink
fix GetLastLine core (#2000)
Browse files Browse the repository at this point in the history
  • Loading branch information
quzard authored Dec 31, 2024
1 parent 62c55e4 commit bd3ad05
Show file tree
Hide file tree
Showing 4 changed files with 1,052 additions and 121 deletions.
178 changes: 100 additions & 78 deletions core/file_server/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,26 +350,25 @@ void LogFileReader::InitReader(bool tailExisted, FileReadPolicy policy, uint32_t

namespace detail {

void updatePrimaryCheckpoint(const std::string& key, PrimaryCheckpointPB& cpt, const std::string& field) {
cpt.set_update_time(time(NULL));
if (CheckpointManagerV2::GetInstance()->SetPB(key, cpt)) {
LOG_INFO(sLogger, ("update primary checkpoint", key)("field", field)("checkpoint", cpt.DebugString()));
} else {
LOG_WARNING(sLogger,
("update primary checkpoint error", key)("field", field)("checkpoint", cpt.DebugString()));
}
void updatePrimaryCheckpoint(const std::string& key, PrimaryCheckpointPB& cpt, const std::string& field) {
cpt.set_update_time(time(NULL));
if (CheckpointManagerV2::GetInstance()->SetPB(key, cpt)) {
LOG_INFO(sLogger, ("update primary checkpoint", key)("field", field)("checkpoint", cpt.DebugString()));
} else {
LOG_WARNING(sLogger, ("update primary checkpoint error", key)("field", field)("checkpoint", cpt.DebugString()));
}
}

std::pair<size_t, size_t> getPartitionRange(size_t idx, size_t concurrency, size_t totalPartitionCount) {
auto base = totalPartitionCount / concurrency;
auto extra = totalPartitionCount % concurrency;
if (extra == 0) {
return std::make_pair(idx * base, (idx + 1) * base - 1);
}
size_t min = idx <= extra ? idx * (base + 1) : extra * (base + 1) + (idx - extra) * base;
size_t max = idx < extra ? min + base : min + base - 1;
return std::make_pair(min, max);
std::pair<size_t, size_t> getPartitionRange(size_t idx, size_t concurrency, size_t totalPartitionCount) {
auto base = totalPartitionCount / concurrency;
auto extra = totalPartitionCount % concurrency;
if (extra == 0) {
return std::make_pair(idx * base, (idx + 1) * base - 1);
}
size_t min = idx <= extra ? idx * (base + 1) : extra * (base + 1) + (idx - extra) * base;
size_t max = idx < extra ? min + base : min + base - 1;
return std::make_pair(min, max);
}

} // namespace detail

Expand Down Expand Up @@ -688,7 +687,7 @@ void LogFileReader::SetFilePosBackwardToFixedPos(LogFileOperator& op) {

void LogFileReader::checkContainerType(LogFileOperator& op) {
// 判断container类型
char containerBOMBuffer[1] = {0};
char containerBOMBuffer[2] = {0};
size_t readBOMByte = 1;
int64_t filePos = 0;
TruncateInfo* truncateInfo = NULL;
Expand Down Expand Up @@ -1992,7 +1991,8 @@ LogFileReader::FileCompareResult LogFileReader::CompareToFile(const string& file
3. continue\nend\ncontinue\nend\n -> continue\nxxx\nend
5. mLogEndRegPtr != NULL
1. xxx\nend\n -> xxx\nend
1. xxx\nend\nxxx\n -> xxx\nend
2. xxx\nend\nxxx\n -> xxx\nend
3. xxx\nend -> ""
*/
/*
return: the number of bytes left, including \n
Expand All @@ -2010,6 +2010,7 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
}
rollbackLineFeedCount = 0;
// Multiline rollback
bool foundEnd = false;
if (mMultilineConfig.first->IsMultiline()) {
std::string exception;
while (endPs >= 0) {
Expand All @@ -2020,6 +2021,8 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
content.data.size(),
*mMultilineConfig.first->GetEndPatternReg(),
exception)) {
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
foundEnd = true;
// Ensure the end line is complete
if (buffer[content.lineEnd] == '\n') {
return content.lineEnd + 1;
Expand All @@ -2031,14 +2034,19 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
*mMultilineConfig.first->GetStartPatternReg(),
exception)) {
// start + continue, start
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
rollbackLineFeedCount += content.rollbackLineFeedCount;
// Keep all the buffer if rollback all
return content.lineBegin;
}
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
rollbackLineFeedCount += content.rollbackLineFeedCount;
endPs = content.lineBegin - 1;
}
}
if (mMultilineConfig.first->GetEndPatternReg() && foundEnd) {
return 0;
}
// Single line rollback or all unmatch rollback
rollbackLineFeedCount = 0;
if (buffer[size - 1] == '\n') {
Expand All @@ -2048,11 +2056,13 @@ LogFileReader::RemoveLastIncompleteLog(char* buffer, int32_t size, int32_t& roll
}
LineInfo content = GetLastLine(StringView(buffer, size), endPs, true);
// 最后一行是完整行,且以 \n 结尾
if (content.fullLine && buffer[endPs] == '\n') {
return size;
if (content.fullLine && buffer[content.lineEnd] == '\n') {
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
return content.lineEnd + 1;
}
content = GetLastLine(StringView(buffer, size), endPs, false);
rollbackLineFeedCount = content.rollbackLineFeedCount;
rollbackLineFeedCount += content.forceRollbackLineFeedCount;
rollbackLineFeedCount += content.rollbackLineFeedCount;
return content.lineBegin;
}

Expand Down Expand Up @@ -2186,32 +2196,31 @@ StringBuffer* BaseLineParse::GetStringBuffer() {
return &mStringBuffer;
}

/*
params:
buffer: all read logs
end: the end position of current line, \n or \0
return:
last line (backward), without \n or \0
*/
LineInfo RawTextParser::GetLastLine(StringView buffer,
int32_t end,
size_t protocolFunctionIndex,
bool needSingleLine,
std::vector<BaseLineParse*>* lineParsers) {
if (end == 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return LineInfo(StringView(), 0, 0, 0, false, 0);
}
if (protocolFunctionIndex != 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return LineInfo(StringView(), 0, 0, 0, false, 0);
}

for (int32_t begin = end; begin > 0; --begin) {
if (begin == 0 || buffer[begin - 1] == '\n') {
return {.data = StringView(buffer.data() + begin, end - begin),
.lineBegin = begin,
.lineEnd = end,
.rollbackLineFeedCount = 1,
.fullLine = true};
}
}
return {.data = StringView(buffer.data(), end),
.lineBegin = 0,
.lineEnd = end,
.rollbackLineFeedCount = 1,
.fullLine = true};
if (buffer[begin - 1] == '\n') {
return LineInfo(StringView(buffer.data() + begin, end - begin), begin, end, 1, true, 0);
}
}
return LineInfo(StringView(buffer.data(), end), 0, end, 1, true, 0);
}

LineInfo DockerJsonFileParser::GetLastLine(StringView buffer,
Expand All @@ -2220,38 +2229,41 @@ LineInfo DockerJsonFileParser::GetLastLine(StringView buffer,
bool needSingleLine,
std::vector<BaseLineParse*>* lineParsers) {
if (end == 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return LineInfo(StringView(), 0, 0, 0, false, 0);
}
if (protocolFunctionIndex == 0) {
// 异常情况, DockerJsonFileParse不允许在最后一个解析器
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return LineInfo(StringView(), 0, 0, 0, false, 0);
}

size_t nextProtocolFunctionIndex = protocolFunctionIndex - 1;
LineInfo finalLine
= {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
LineInfo finalLine;
while (!finalLine.fullLine) {
LineInfo rawLine = (*lineParsers)[nextProtocolFunctionIndex]->GetLastLine(
buffer, end, nextProtocolFunctionIndex, needSingleLine, lineParsers);
if (rawLine.data.back() == '\n') {
if (rawLine.data.size() > 0 && rawLine.data.back() == '\n') {
rawLine.data = StringView(rawLine.data.data(), rawLine.data.size() - 1);
}

LineInfo line
= {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
LineInfo line;
parseLine(rawLine, line);
finalLine.data = line.data;
finalLine.fullLine = line.fullLine;
finalLine.lineBegin = line.lineBegin;
finalLine.rollbackLineFeedCount += line.rollbackLineFeedCount;
finalLine.dataRaw = line.dataRaw;
if (finalLine.lineEnd == 0) {
finalLine.lineEnd = line.lineEnd;
int32_t rollbackLineFeedCount = 0;
int32_t forceRollbackLineFeedCount = 0;
if (line.fullLine) {
rollbackLineFeedCount = line.rollbackLineFeedCount;
forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount;
} else {
forceRollbackLineFeedCount
= finalLine.forceRollbackLineFeedCount + line.forceRollbackLineFeedCount + line.rollbackLineFeedCount;
rollbackLineFeedCount = 0;
}
finalLine = std::move(line);
finalLine.rollbackLineFeedCount = rollbackLineFeedCount;
finalLine.forceRollbackLineFeedCount = forceRollbackLineFeedCount;
if (!finalLine.fullLine) {
if (finalLine.lineBegin == 0) {
finalLine.data = StringView();
return finalLine;
return LineInfo(
StringView(), 0, 0, finalLine.rollbackLineFeedCount, false, finalLine.forceRollbackLineFeedCount);
}
end = finalLine.lineBegin - 1;
}
Expand All @@ -2262,7 +2274,9 @@ LineInfo DockerJsonFileParser::GetLastLine(StringView buffer,
bool DockerJsonFileParser::parseLine(LineInfo rawLine, LineInfo& paseLine) {
paseLine = rawLine;
paseLine.fullLine = false;

if (rawLine.data.size() == 0) {
return false;
}
rapidjson::Document doc;
doc.Parse(rawLine.data.data(), rawLine.data.size());

Expand Down Expand Up @@ -2300,40 +2314,44 @@ LineInfo ContainerdTextParser::GetLastLine(StringView buffer,
bool needSingleLine,
std::vector<BaseLineParse*>* lineParsers) {
if (end == 0) {
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
return LineInfo(StringView(), 0, 0, 0, false, 0);
}
if (protocolFunctionIndex == 0) {
// 异常情况, DockerJsonFileParse不允许在最后一个解析器
return {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
// 异常情况, ContainerdTextParser不允许在最后一个解析器
return LineInfo(StringView(), 0, 0, 0, false, 0);
}
LineInfo finalLine
= {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
// 跳过最后的连续P
LineInfo finalLine;
finalLine.fullLine = false;
size_t nextProtocolFunctionIndex = protocolFunctionIndex - 1;

// 跳过最后的连续P
while (!finalLine.fullLine) {
LineInfo rawLine = (*lineParsers)[nextProtocolFunctionIndex]->GetLastLine(
buffer, end, nextProtocolFunctionIndex, needSingleLine, lineParsers);
if (rawLine.data.back() == '\n') {
if (rawLine.data.size() > 0 && rawLine.data.back() == '\n') {
rawLine.data = StringView(rawLine.data.data(), rawLine.data.size() - 1);
}

LineInfo line
= {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
LineInfo line;
parseLine(rawLine, line);
// containerd 不需要外层协议的 dataRaw
finalLine.data = line.data;
finalLine.fullLine = line.fullLine;
finalLine.lineBegin = line.lineBegin;
finalLine.rollbackLineFeedCount += line.rollbackLineFeedCount;
mergeLines(finalLine, finalLine, true);
if (finalLine.lineEnd == 0) {
finalLine.lineEnd = line.lineEnd;
int32_t rollbackLineFeedCount = 0;
int32_t forceRollbackLineFeedCount = 0;
if (line.fullLine) {
rollbackLineFeedCount = line.rollbackLineFeedCount;
forceRollbackLineFeedCount = finalLine.forceRollbackLineFeedCount;
} else {
forceRollbackLineFeedCount
= finalLine.forceRollbackLineFeedCount + line.forceRollbackLineFeedCount + line.rollbackLineFeedCount;
rollbackLineFeedCount = 0;
}
finalLine = std::move(line);
finalLine.rollbackLineFeedCount = rollbackLineFeedCount;
finalLine.forceRollbackLineFeedCount = forceRollbackLineFeedCount;
mergeLines(finalLine, finalLine, true);
if (!finalLine.fullLine) {
if (finalLine.lineBegin == 0) {
finalLine.data = StringView();
return finalLine;
return LineInfo(
StringView(), 0, 0, finalLine.rollbackLineFeedCount, false, finalLine.forceRollbackLineFeedCount);
}
end = finalLine.lineBegin - 1;
}
Expand All @@ -2353,11 +2371,10 @@ LineInfo ContainerdTextParser::GetLastLine(StringView buffer,
break;
}

LineInfo previousLine
= {.data = StringView(), .lineBegin = 0, .lineEnd = 0, .rollbackLineFeedCount = 0, .fullLine = false};
LineInfo previousLine;
LineInfo rawLine = (*lineParsers)[nextProtocolFunctionIndex]->GetLastLine(
buffer, finalLine.lineBegin - 1, nextProtocolFunctionIndex, needSingleLine, lineParsers);
if (rawLine.data.back() == '\n') {
if (rawLine.data.size() > 0 && rawLine.data.back() == '\n') {
rawLine.data = StringView(rawLine.data.data(), rawLine.data.size() - 1);
}

Expand Down Expand Up @@ -2394,9 +2411,10 @@ void ContainerdTextParser::parseLine(LineInfo rawLine, LineInfo& paseLine) {
const char* lineEnd = rawLine.data.data() + rawLine.data.size();
paseLine = rawLine;
paseLine.fullLine = true;

if (rawLine.data.size() == 0) {
return;
}
// 寻找第一个分隔符位置 time
StringView timeValue;
const char* pch1 = std::find(rawLine.data.data(), lineEnd, ProcessorParseContainerLogNative::CONTAINERD_DELIMITER);
if (pch1 == lineEnd) {
return;
Expand All @@ -2412,6 +2430,10 @@ void ContainerdTextParser::parseLine(LineInfo rawLine, LineInfo& paseLine) {
return;
}
// 如果既不以 P 开头,也不以 F 开头
if (pch2 + 1 >= lineEnd) {
paseLine.data = StringView(pch2 + 1, lineEnd - pch2 - 1);
return;
}
if (*(pch2 + 1) != ProcessorParseContainerLogNative::CONTAINERD_PART_TAG
&& *(pch2 + 1) != ProcessorParseContainerLogNative::CONTAINERD_FULL_TAG) {
paseLine.data = StringView(pch2 + 1, lineEnd - pch2 - 1);
Expand Down
22 changes: 18 additions & 4 deletions core/file_server/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,16 @@
#include "common/StringTools.h"
#include "common/TimeUtil.h"
#include "common/memory/SourceBuffer.h"
#include "file_server/event/Event.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/FileServer.h"
#include "file_server/MultilineOptions.h"
#include "protobuf/sls/sls_logs.pb.h"
#include "file_server/event/Event.h"
#include "file_server/reader/FileReaderOptions.h"
#include "logger/Logger.h"
#include "models/StringView.h"
#include "pipeline/queue/QueueKey.h"
#include "protobuf/sls/sls_logs.pb.h"
#include "rapidjson/allocators.h"
#include "file_server/reader/FileReaderOptions.h"

namespace logtail {

Expand All @@ -57,6 +57,19 @@ struct LineInfo {
int32_t lineEnd;
int32_t rollbackLineFeedCount;
bool fullLine;
int32_t forceRollbackLineFeedCount;
/// Builds a LineInfo by copying each argument into the member of the same name.
///
/// Every parameter has a default, so `LineInfo()` also serves as the default
/// constructor: an empty `data` view, `lineBegin` and `lineEnd` of 0, both
/// rollback counters 0, and `fullLine == false`.
///
/// @param data                        view stored in `data`
/// @param lineBegin                   stored in `lineBegin` (presumably the offset of the
///                                    line's first byte in the read buffer — TODO confirm)
/// @param lineEnd                     stored in `lineEnd` (presumably the offset of the
///                                    line's terminating position — TODO confirm)
/// @param rollbackLineFeedCount       stored in `rollbackLineFeedCount`
/// @param fullLine                    stored in `fullLine`
/// @param forceRollbackLineFeedCount  stored in `forceRollbackLineFeedCount`
///
/// NOTE(review): the constructor is not `explicit`, so a lone StringView
/// converts implicitly to a LineInfo — confirm that is intended.
/// NOTE(review): only the six members named in the initializer list are set
/// here; any member declared in the part of the struct outside this view is
/// left to its own default initialization — verify against the full struct.
LineInfo(StringView data = StringView(),
int32_t lineBegin = 0,
int32_t lineEnd = 0,
int32_t rollbackLineFeedCount = 0,
bool fullLine = false,
int32_t forceRollbackLineFeedCount = 0)
: data(data),
lineBegin(lineBegin),
lineEnd(lineEnd),
rollbackLineFeedCount(rollbackLineFeedCount),
fullLine(fullLine),
forceRollbackLineFeedCount(forceRollbackLineFeedCount) {}
};

class BaseLineParse {
Expand Down Expand Up @@ -234,7 +247,7 @@ class LogFileReader {

/// @return e.g. `/home/admin/access.log`
const std::string& GetConvertedPath() const;

const std::string& GetHostLogPathFile() const { return mHostLogPathFile; }

int64_t GetFileSize() const { return mLastFileSize; }
Expand Down Expand Up @@ -686,6 +699,7 @@ class LogFileReader {
friend class LogSplitNoDiscardUnmatchUnittest;
friend class RemoveLastIncompleteLogMultilineUnittest;
friend class LogFileReaderCheckpointUnittest;
friend class GetLastLineUnittest;
friend class LastMatchedContainerdTextLineUnittest;
friend class LastMatchedDockerJsonFileUnittest;
friend class LastMatchedContainerdTextWithDockerJsonUnittest;
Expand Down
Loading

0 comments on commit bd3ad05

Please sign in to comment.