diff --git a/core/event_handler/EventHandler.cpp b/core/event_handler/EventHandler.cpp index 23c45c2bdf..c3e979cd7e 100644 --- a/core/event_handler/EventHandler.cpp +++ b/core/event_handler/EventHandler.cpp @@ -363,7 +363,24 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path, "new log reader queue count", mNameReaderMap.size() + 1)); } LogFileReaderPtrArray& readerArray = mNameReaderMap[name]; - if (readerArray.size() >= readerConfig.first->mRotatorQueueSize) { + + LOG_INFO(sLogger, + ("start to create log reader, project", + readerConfig.second->GetProjectName())("logstore", readerConfig.second->GetLogstoreName())( + "config", readerConfig.second->GetConfigName())("log reader queue name", PathJoin(path, name))( + "file device", ToString(devInode.dev))("file inode", ToString(devInode.inode))); + + LogFileReaderPtr readerPtr(LogFileReader::CreateLogFileReader(path, + name, + devInode, + readerConfig, + multilineConfig, + discoveryConfig, + exactlyonceConcurrency, + forceBeginingFlag)); + + if (readerArray.size() >= readerConfig.first->mRotatorQueueSize + && readerPtr->GetIdxInReaderArrayFromLastCpt() == -1) { int32_t nowTime = time(NULL); if (nowTime - mLastOverflowErrorTime > INT32_FLAG(rotate_overflow_error_interval)) { mLastOverflowErrorTime = nowTime; @@ -386,23 +403,24 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path, return LogFileReaderPtr(); } - LOG_INFO(sLogger, - ("start to create log reader, project", - readerConfig.second->GetProjectName())("logstore", readerConfig.second->GetLogstoreName())( - "config", readerConfig.second->GetConfigName())("log reader queue name", PathJoin(path, name))( - "file device", ToString(devInode.dev))("file inode", ToString(devInode.inode))); - - LogFileReaderPtr readerPtr(LogFileReader::CreateLogFileReader(path, - name, - devInode, - readerConfig, - multilineConfig, - discoveryConfig, - exactlyonceConcurrency, - forceBeginingFlag)); if (readerPtr.get() == NULL) return LogFileReaderPtr(); + // new log + bool backFlag = false; + if (readerPtr->GetRealLogPath().empty() || readerPtr->GetRealLogPath() == readerPtr->GetHostLogPath()) { + backFlag = true; + // if reader is a new file(not from checkpoint), and file is rotate file, reset file pos + if (readerArray.size() > 0 && !readerPtr->IsFromCheckPoint()) { + LOG_DEBUG(sLogger, ("file rotate, reset new reader pos", PathJoin(path, name))); + readerPtr->ResetLastFilePos(); + } + } else { + backFlag = false; + // rotate log, push front + LOG_DEBUG(sLogger, ("rotator log, push front", readerPtr->GetRealLogPath())); + } + // need check skip flag first and if flag is false then open fd if (!readerPtr->NeedSkipFirstModify()) { if (!readerPtr->UpdateFilePtr()) { @@ -436,19 +454,10 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path, int32_t idx = readerPtr->GetIdxInReaderArrayFromLastCpt(); // new reader - if (idx == -1) { - if (!(readerPtr->GetRealLogPath().empty() || readerPtr->GetRealLogPath() == readerPtr->GetHostLogPath())) { - LOG_ERROR(sLogger, - ("unexpected real log path", readerPtr->GetRealLogPath())("host log path", - readerPtr->GetHostLogPath())); - } - // if reader is a new file(not from checkpoint), and file is rotate file, reset file pos - if (readerArray.size() > 0 && !readerPtr->IsFromCheckPoint()) { - LOG_DEBUG(sLogger, ("file rotate, reset new reader pos", PathJoin(path, name))); - readerPtr->ResetLastFilePos(); - } + if (backFlag) { readerArray.push_back(readerPtr); mDevInodeReaderMap[devInode] = readerPtr; + readerPtr->SetReaderArray(&readerArray); // reader not in reader array } else if (idx == -2) { mRotatorReaderMap[devInode] = readerPtr; @@ -457,13 +466,13 @@ LogFileReaderPtr ModifyHandler::CreateLogFileReaderPtr(const string& path, readerArray.push_back(readerPtr); mDevInodeReaderMap[devInode] = readerPtr; std::sort(readerArray.begin(), readerArray.end(), ModifyHandler::CompareReaderByIdxFromCpt); + readerPtr->SetReaderArray(&readerArray); } else { LOG_ERROR(sLogger, ("unexpected idx", idx)("real log path", readerPtr->GetRealLogPath())("host log path", readerPtr->GetHostLogPath())); return LogFileReaderPtr(); } - readerPtr->SetReaderArray(&readerArray); LOG_INFO(sLogger, ("log reader creation succeed",