Skip to content

Commit

Permalink
recover readers exactly from checkpoint (alibaba#1620)
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc authored Jul 23, 2024
1 parent b111b38 commit 939937a
Show file tree
Hide file tree
Showing 7 changed files with 337 additions and 80 deletions.
13 changes: 8 additions & 5 deletions core/checkpoint/CheckPointManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,13 +15,15 @@
*/

#pragma once
#include <string>
#include <memory>
#include <unordered_map>
#include <set>
#include <ctime>
#include <json/json.h>

#include <boost/optional.hpp>
#include <ctime>
#include <memory>
#include <set>
#include <string>
#include <unordered_map>

#include "common/DevInode.h"
#include "common/EncodingConverter.h"
#include "common/SplitedFilePath.h"
Expand All @@ -47,6 +49,7 @@ class CheckPoint {
std::string mConfigName;
std::string mFileName;
std::string mRealFileName;
int32_t mPositionInReaderArray = -1; // default not in the reader queue

CheckPoint() {}

Expand Down
61 changes: 44 additions & 17 deletions core/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,8 @@ ModifyHandler::ModifyHandler(const std::string& configName, const FileDiscoveryC
: mConfigName(configName) {
if (pConfig.first && pConfig.second->GetGlobalConfig().mProcessPriority > 0
&& pConfig.second->GetGlobalConfig().mProcessPriority <= ProcessQueueManager::sMaxPriority) {
mReadFileTimeSlice = (1 << (ProcessQueueManager::sMaxPriority - pConfig.second->GetGlobalConfig().mProcessPriority + 1))
mReadFileTimeSlice
= (1 << (ProcessQueueManager::sMaxPriority - pConfig.second->GetGlobalConfig().mProcessPriority + 1))
* INT64_FLAG(read_file_time_slice);
} else {
mReadFileTimeSlice = INT64_FLAG(read_file_time_slice);
Expand Down Expand Up @@ -362,7 +363,20 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,
"new log reader queue count", mNameReaderMap.size() + 1));
}
LogFileReaderPtrArray& readerArray = mNameReaderMap[name];
if (readerArray.size() >= readerConfig.first->mRotatorQueueSize) {

LogFileReaderPtr readerPtr(LogFileReader::CreateLogFileReader(path,
name,
devInode,
readerConfig,
multilineConfig,
discoveryConfig,
exactlyonceConcurrency,
forceBeginingFlag));
if (readerPtr.get() == NULL)
return LogFileReaderPtr();

if (readerArray.size() >= readerConfig.first->mRotatorQueueSize
&& readerPtr->GetIdxInReaderArrayFromLastCpt() == LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY) {
int32_t nowTime = time(NULL);
if (nowTime - mLastOverflowErrorTime > INT32_FLAG(rotate_overflow_error_interval)) {
mLastOverflowErrorTime = nowTime;
Expand Down Expand Up @@ -391,17 +405,6 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,
"config", readerConfig.second->GetConfigName())("log reader queue name", PathJoin(path, name))(
"file device", ToString(devInode.dev))("file inode", ToString(devInode.inode)));

LogFileReaderPtr readerPtr(LogFileReader::CreateLogFileReader(path,
name,
devInode,
readerConfig,
multilineConfig,
discoveryConfig,
exactlyonceConcurrency,
forceBeginingFlag));
if (readerPtr.get() == NULL)
return LogFileReaderPtr();

// new log
bool backFlag = false;
if (readerPtr->GetRealLogPath().empty() || readerPtr->GetRealLogPath() == readerPtr->GetHostLogPath()) {
Expand Down Expand Up @@ -448,9 +451,26 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,
}
}

backFlag ? readerArray.push_back(readerPtr) : readerArray.push_front(readerPtr);
int32_t idx = readerPtr->GetIdxInReaderArrayFromLastCpt();
// new reader
if (backFlag) {
readerArray.push_back(readerPtr);
mDevInodeReaderMap[devInode] = readerPtr;
// reader not in reader array
} else if (idx == LogFileReader::CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY) {
mRotatorReaderMap[devInode] = readerPtr;
// reader in reader array
} else if (idx >= 0) {
readerArray.push_back(readerPtr);
mDevInodeReaderMap[devInode] = readerPtr;
std::stable_sort(readerArray.begin(), readerArray.end(), ModifyHandler::CompareReaderByIdxFromCpt);
} else {
LOG_ERROR(sLogger,
("unexpected idx", idx)("real log path", readerPtr->GetRealLogPath())("host log path",
readerPtr->GetHostLogPath()));
return LogFileReaderPtr();
}
readerPtr->SetReaderArray(&readerArray);
mDevInodeReaderMap[devInode] = readerPtr;

LOG_INFO(sLogger,
("log reader creation succeed",
Expand Down Expand Up @@ -947,11 +967,18 @@ void ModifyHandler::HandleTimeOut() {
bool ModifyHandler::DumpReaderMeta(bool isRotatorReader, bool checkConfigFlag) {
if (!isRotatorReader) {
for (DevInodeLogFileReaderMap::iterator it = mDevInodeReaderMap.begin(); it != mDevInodeReaderMap.end(); ++it) {
it->second->DumpMetaToMem(checkConfigFlag);
int32_t idxInReaderArray = -2;
for (size_t i = 0; i < it->second->GetReaderArray()->size(); ++i) {
if (it->second->GetReaderArray()->at(i) == it->second) {
idxInReaderArray = i;
break;
}
}
it->second->DumpMetaToMem(checkConfigFlag, idxInReaderArray);
}
} else {
for (DevInodeLogFileReaderMap::iterator it = mRotatorReaderMap.begin(); it != mRotatorReaderMap.end(); ++it) {
it->second->DumpMetaToMem(checkConfigFlag);
it->second->DumpMetaToMem(checkConfigFlag, -2);
}
}
return true;
Expand Down
15 changes: 15 additions & 0 deletions core/event_handler/EventHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,21 @@ class ModifyHandler : public EventHandler {
return left->GetLastUpdateTime() < right->GetLastUpdateTime();
}

static bool CompareReaderByIdxFromCpt(const std::shared_ptr<LogFileReader> left,
const std::shared_ptr<LogFileReader> right) {
if (left->GetIdxInReaderArrayFromLastCpt() == right->GetIdxInReaderArrayFromLastCpt()) {
return false;
}
// new reader is always at the end of the array
if (left->GetIdxInReaderArrayFromLastCpt() == LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY) {
return false;
}
if (right->GetIdxInReaderArrayFromLastCpt() == LogFileReader::CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY) {
return true;
}
return left->GetIdxInReaderArrayFromLastCpt() < right->GetIdxInReaderArrayFromLastCpt();
}

LogFileReaderPtr CreateLogFileReaderPtr(const std::string& path,
const std::string& name,
const DevInode& devInode,
Expand Down
14 changes: 9 additions & 5 deletions core/reader/LogFileReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ void LogFileReader::SetMetrics() {
mInputFileOffsetBytesGauge = mMetricsRecordRef->GetGauge(METRIC_INPUT_FILE_OFFSET_BYTES);
}

void LogFileReader::DumpMetaToMem(bool checkConfigFlag) {
void LogFileReader::DumpMetaToMem(bool checkConfigFlag, int32_t idxInReaderArray) {
if (checkConfigFlag) {
size_t index = mHostLogPath.rfind(PATH_SEPARATOR);
if (index == string::npos || index == mHostLogPath.size() - 1) {
Expand Down Expand Up @@ -264,6 +264,7 @@ void LogFileReader::DumpMetaToMem(bool checkConfigFlag) {
// use last event time as checkpoint's last update time
checkPointPtr->mLastUpdateTime = mLastEventTime;
checkPointPtr->mCache = mCache;
checkPointPtr->mPositionInReaderArray = idxInReaderArray;
CheckPointManager::Instance()->AddCheckPoint(checkPointPtr);
}

Expand Down Expand Up @@ -307,12 +308,15 @@ void LogFileReader::InitReader(bool tailExisted, FileReadPolicy policy, uint32_t
mRealLogPath = checkPointPtr->mRealFileName;
mLastEventTime = checkPointPtr->mLastUpdateTime;
mContainerStopped = checkPointPtr->mContainerStopped;
// new property to recover reader exactly from checkpoint
mIdxInReaderArrayFromLastCpt = checkPointPtr->mPositionInReaderArray;
LOG_INFO(sLogger,
("recover log reader status from checkpoint, project", GetProject())("logstore", GetLogstore())(
"config", GetConfigName())("log reader queue name", mHostLogPath)(
"file device", ToString(mDevInode.dev))("file inode", ToString(mDevInode.inode))(
"file signature", mLastFileSignatureHash)("file signature size", mLastFileSignatureSize)(
"real file path", mRealLogPath)("last file position", mLastFilePos));
"config", GetConfigName())("log reader queue name", mHostLogPath)("file device",
ToString(mDevInode.dev))(
"file inode", ToString(mDevInode.inode))("file signature", mLastFileSignatureHash)(
"file signature size", mLastFileSignatureSize)("real file path", mRealLogPath)(
"last file position", mLastFilePos)("index in reader array", mIdxInReaderArrayFromLastCpt));
// if file is open or
// last update time is new and the file's container is not stopped we
// we should use first modify
Expand Down
10 changes: 9 additions & 1 deletion core/reader/LogFileReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class LogFileReader {
LogFormat mFileLogFormat = LogFormat::TEXT;

static size_t BUFFER_SIZE;
static const int32_t CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY = -1;
static const int32_t CHECKPOINT_IDX_OF_NOT_IN_READER_ARRAY = -2;
std::vector<BaseLineParse*> mLineParsers = {};
template <typename T>
T* GetParser(size_t size) {
Expand Down Expand Up @@ -239,6 +241,10 @@ class LogFileReader {

int64_t GetLastFilePos() const { return mLastFilePos; }

int32_t GetIdxInReaderArrayFromLastCpt() const { return mIdxInReaderArrayFromLastCpt; }

void SetIdxInReaderArrayFromLastCpt(int32_t idx) { mIdxInReaderArrayFromLastCpt = idx; }

void ResetLastFilePos() { mLastFilePos = 0; }

bool NeedSkipFirstModify() const { return mSkipFirstModify; }
Expand All @@ -258,7 +264,7 @@ class LogFileReader {
void
InitReader(bool tailExisted = false, FileReadPolicy policy = BACKWARD_TO_FIXED_POS, uint32_t eoConcurrency = 0);

void DumpMetaToMem(bool checkConfigFlag = false);
void DumpMetaToMem(bool checkConfigFlag = false, int32_t idxInReaderArray = -1);

std::string GetSourceId() { return mSourceId; }

Expand Down Expand Up @@ -479,6 +485,8 @@ class LogFileReader {
int64_t mLastFileSize = 0;
time_t mLastMTime = 0;
std::string mCache;
// >= 0: index of reader array, -1: new reader, -2: not in reader array
int32_t mIdxInReaderArrayFromLastCpt = CHECKPOINT_IDX_OF_NEW_READER_IN_ARRAY;
// std::string mProjectName;
std::string mTopicName;
time_t mLastUpdateTime;
Expand Down
4 changes: 2 additions & 2 deletions core/unittest/event_handler/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@ project(event_handler_unittest)
# add_executable(create_modify_handler_unittest CreateModifyHandlerUnittest.cpp)
# target_link_libraries(create_modify_handler_unittest ${UT_BASE_TARGET})

# add_executable(modify_handler_unittest ModifyHandlerUnittest.cpp)
# target_link_libraries(modify_handler_unittest ${UT_BASE_TARGET})
add_executable(modify_handler_unittest ModifyHandlerUnittest.cpp)
target_link_libraries(modify_handler_unittest unittest_base)

add_executable(log_input_unittest LogInputUnittest.cpp)
target_link_libraries(log_input_unittest ${UT_BASE_TARGET})
Expand Down
Loading

0 comments on commit 939937a

Please sign in to comment.