diff --git a/core/file_server/event/BlockEventManager.cpp b/core/file_server/event/BlockEventManager.cpp index b583d25e6a..006fbff7fd 100644 --- a/core/file_server/event/BlockEventManager.cpp +++ b/core/file_server/event/BlockEventManager.cpp @@ -17,6 +17,7 @@ #include "common/Flags.h" #include "common/HashUtil.h" #include "common/StringTools.h" +#include "file_server/event_handler/LogInput.h" #include "logger/Logger.h" #include "pipeline/queue/ProcessQueueManager.h" @@ -69,8 +70,11 @@ BlockedEventManager::~BlockedEventManager() { } void BlockedEventManager::Feedback(int64_t key) { - lock_guard lock(mFeedbackQueueMux); - mFeedbackQueue.emplace_back(key); + { + lock_guard lock(mFeedbackQueueMux); + mFeedbackQueue.emplace_back(key); + } + LogInput::GetInstance()->Trigger(); } void BlockedEventManager::UpdateBlockEvent( diff --git a/core/file_server/event_handler/LogInput.cpp b/core/file_server/event_handler/LogInput.cpp index 38b6e4ab36..299e660c68 100644 --- a/core/file_server/event_handler/LogInput.cpp +++ b/core/file_server/event_handler/LogInput.cpp @@ -88,9 +88,12 @@ void LogInput::Start() { mInteruptFlag = false; mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); - mRegisterdHandlersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL); - mActiveReadersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL); - mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG); + mRegisterdHandlersTotal + = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL); + mActiveReadersTotal + = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL); + mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge( + METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG); new Thread([this]() { ProcessLoop(); }); } @@ -118,19 +121,14 @@ void LogInput::TryReadEvents(bool forceRead) { if (mInteruptFlag) return; - if (!forceRead) { - int64_t curMicroSeconds = GetCurrentTimeInMicroSeconds(); - if (curMicroSeconds - mLastReadEventMicroSeconds >= INT64_FLAG(read_fs_events_interval)) - mLastReadEventMicroSeconds = curMicroSeconds; - else - return; - } else - mLastReadEventMicroSeconds = GetCurrentTimeInMicroSeconds(); - - vector inotifyEvents; - EventDispatcher::GetInstance()->ReadInotifyEvents(inotifyEvents); - if (inotifyEvents.size() > 0) { - PushEventQueue(inotifyEvents); + int64_t curMicroSeconds = GetCurrentTimeInMicroSeconds(); + if (forceRead || curMicroSeconds - mLastReadEventMicroSeconds >= INT64_FLAG(read_fs_events_interval)) { + vector inotifyEvents; + EventDispatcher::GetInstance()->ReadInotifyEvents(inotifyEvents); + if (inotifyEvents.size() > 0) { + PushEventQueue(inotifyEvents); + } + mLastReadEventMicroSeconds = curMicroSeconds; } vector feedbackEvents; @@ -212,8 +210,7 @@ bool LogInput::ReadLocalEvents() { } // set discard old data flag, so that history data will not be dropped. BOOL_FLAG(ilogtail_discard_old_data) = false; - LOG_INFO(sLogger, - ("load local events", GetLocalEventDataFileName())("event count", localEventJson.size())); + LOG_INFO(sLogger, ("load local events", GetLocalEventDataFileName())("event count", localEventJson.size())); for (Json::ValueIterator iter = localEventJson.begin(); iter != localEventJson.end(); ++iter) { const Json::Value& eventItem = *iter; if (!eventItem.isObject()) { @@ -395,8 +392,11 @@ void* LogInput::ProcessLoop() { delete ev; else ProcessEvent(dispatcher, ev); - } else - usleep(INT32_FLAG(log_input_thread_wait_interval)); + } else { + unique_lock lock(mFeedbackMux); + mFeedbackCV.wait_for(lock, chrono::microseconds(INT32_FLAG(log_input_thread_wait_interval))); + } + if (mIdleFlag) continue; diff --git a/core/file_server/event_handler/LogInput.h b/core/file_server/event_handler/LogInput.h index 05a6ebbc6f..aadeb17082 100644 --- a/core/file_server/event_handler/LogInput.h +++ b/core/file_server/event_handler/LogInput.h @@ -60,6 +60,8 @@ class LogInput : public LogRunnable { int32_t GetLastReadEventTime() { return mLastReadEventTime; } + void Trigger() { mFeedbackCV.notify_one(); } + private: LogInput(); ~LogInput(); @@ -89,6 +91,9 @@ class LogInput : public LogRunnable { mutable std::mutex mThreadRunningMux; mutable std::condition_variable mStopCV; + mutable std::mutex mFeedbackMux; + mutable std::condition_variable mFeedbackCV; + #ifdef APSARA_UNIT_TEST_MAIN friend class LogInputUnittest; friend class EventDispatcherTest;