Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Jul 19, 2024
1 parent fbe133b commit e477fdc
Showing 1 changed file with 36 additions and 27 deletions.
63 changes: 36 additions & 27 deletions core/event_handler/EventHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,7 +363,24 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,
"new log reader queue count", mNameReaderMap.size() + 1));
}
LogFileReaderPtrArray& readerArray = mNameReaderMap[name];
if (readerArray.size() >= readerConfig.first->mRotatorQueueSize) {

LOG_INFO(sLogger,
("start to create log reader, project",
readerConfig.second->GetProjectName())("logstore", readerConfig.second->GetLogstoreName())(
"config", readerConfig.second->GetConfigName())("log reader queue name", PathJoin(path, name))(
"file device", ToString(devInode.dev))("file inode", ToString(devInode.inode)));

LogFileReaderPtr readerPtr(LogFileReader::CreateLogFileReader(path,
name,
devInode,
readerConfig,
multilineConfig,
discoveryConfig,
exactlyonceConcurrency,
forceBeginingFlag));

if (readerArray.size() >= readerConfig.first->mRotatorQueueSize
&& readerPtr->GetIdxInReaderArrayFromLastCpt() == -1) {
int32_t nowTime = time(NULL);
if (nowTime - mLastOverflowErrorTime > INT32_FLAG(rotate_overflow_error_interval)) {
mLastOverflowErrorTime = nowTime;
Expand All @@ -386,23 +403,24 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,
return LogFileReaderPtr();
}

LOG_INFO(sLogger,
("start to create log reader, project",
readerConfig.second->GetProjectName())("logstore", readerConfig.second->GetLogstoreName())(
"config", readerConfig.second->GetConfigName())("log reader queue name", PathJoin(path, name))(
"file device", ToString(devInode.dev))("file inode", ToString(devInode.inode)));

LogFileReaderPtr readerPtr(LogFileReader::CreateLogFileReader(path,
name,
devInode,
readerConfig,
multilineConfig,
discoveryConfig,
exactlyonceConcurrency,
forceBeginingFlag));
if (readerPtr.get() == NULL)
return LogFileReaderPtr();

// new log
bool backFlag = false;
if (readerPtr->GetRealLogPath().empty() || readerPtr->GetRealLogPath() == readerPtr->GetHostLogPath()) {
backFlag = true;
// if reader is a new file(not from checkpoint), and file is rotate file, reset file pos
if (readerArray.size() > 0 && !readerPtr->IsFromCheckPoint()) {
LOG_DEBUG(sLogger, ("file rotate, reset new reader pos", PathJoin(path, name)));
readerPtr->ResetLastFilePos();
}
} else {
backFlag = false;
// rotate log, push front
LOG_DEBUG(sLogger, ("rotator log, push front", readerPtr->GetRealLogPath()));
}

// need check skip flag first and if flag is false then open fd
if (!readerPtr->NeedSkipFirstModify()) {
if (!readerPtr->UpdateFilePtr()) {
Expand Down Expand Up @@ -436,19 +454,10 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,

int32_t idx = readerPtr->GetIdxInReaderArrayFromLastCpt();
// new reader
if (idx == -1) {
if (!(readerPtr->GetRealLogPath().empty() || readerPtr->GetRealLogPath() == readerPtr->GetHostLogPath())) {
LOG_ERROR(sLogger,
("unexpected real log path", readerPtr->GetRealLogPath())("host log path",
readerPtr->GetHostLogPath()));
}
// if reader is a new file(not from checkpoint), and file is rotate file, reset file pos
if (readerArray.size() > 0 && !readerPtr->IsFromCheckPoint()) {
LOG_DEBUG(sLogger, ("file rotate, reset new reader pos", PathJoin(path, name)));
readerPtr->ResetLastFilePos();
}
if (backFlag) {
readerArray.push_back(readerPtr);
mDevInodeReaderMap[devInode] = readerPtr;
readerPtr->SetReaderArray(&readerArray);
// reader not in reader array
} else if (idx == -2) {
mRotatorReaderMap[devInode] = readerPtr;
Expand All @@ -457,13 +466,13 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path,
readerArray.push_back(readerPtr);
mDevInodeReaderMap[devInode] = readerPtr;
std::sort(readerArray.begin(), readerArray.end(), ModifyHandler::CompareReaderByIdxFromCpt);
readerPtr->SetReaderArray(&readerArray);
} else {
LOG_ERROR(sLogger,
("unexpected idx", idx)("real log path", readerPtr->GetRealLogPath())("host log path",
readerPtr->GetHostLogPath()));
return LogFileReaderPtr();
}
readerPtr->SetReaderArray(&readerArray);

LOG_INFO(sLogger,
("log reader creation succeed",
Expand Down

0 comments on commit e477fdc

Please sign in to comment.