From 14a65e9a5ca905c6fecb97bc0e0b4ba364095b7a Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Thu, 7 Nov 2024 03:20:10 +0000 Subject: [PATCH 1/2] read blocked event as soon as process queue triggers feedback to improve file reading performance --- core/file_server/event/BlockEventManager.cpp | 8 +++- core/file_server/event_handler/LogInput.cpp | 41 ++++++++++---------- core/file_server/event_handler/LogInput.h | 4 ++ 3 files changed, 31 insertions(+), 22 deletions(-) diff --git a/core/file_server/event/BlockEventManager.cpp b/core/file_server/event/BlockEventManager.cpp index b583d25e6a..006fbff7fd 100644 --- a/core/file_server/event/BlockEventManager.cpp +++ b/core/file_server/event/BlockEventManager.cpp @@ -17,6 +17,7 @@ #include "common/Flags.h" #include "common/HashUtil.h" #include "common/StringTools.h" +#include "file_server/event_handler/LogInput.h" #include "logger/Logger.h" #include "pipeline/queue/ProcessQueueManager.h" @@ -69,8 +70,11 @@ BlockedEventManager::~BlockedEventManager() { } void BlockedEventManager::Feedback(int64_t key) { - lock_guard lock(mFeedbackQueueMux); - mFeedbackQueue.emplace_back(key); + { + lock_guard lock(mFeedbackQueueMux); + mFeedbackQueue.emplace_back(key); + } + LogInput::GetInstance()->Trigger(); } void BlockedEventManager::UpdateBlockEvent( diff --git a/core/file_server/event_handler/LogInput.cpp b/core/file_server/event_handler/LogInput.cpp index 38b6e4ab36..72980522df 100644 --- a/core/file_server/event_handler/LogInput.cpp +++ b/core/file_server/event_handler/LogInput.cpp @@ -88,9 +88,12 @@ void LogInput::Start() { mInteruptFlag = false; mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); - mRegisterdHandlersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL); - mActiveReadersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL); - mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG); + mRegisterdHandlersTotal + = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL); + mActiveReadersTotal + = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL); + mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge( + METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG); new Thread([this]() { ProcessLoop(); }); } @@ -118,19 +121,14 @@ void LogInput::TryReadEvents(bool forceRead) { if (mInteruptFlag) return; - if (!forceRead) { - int64_t curMicroSeconds = GetCurrentTimeInMicroSeconds(); - if (curMicroSeconds - mLastReadEventMicroSeconds >= INT64_FLAG(read_fs_events_interval)) - mLastReadEventMicroSeconds = curMicroSeconds; - else - return; - } else - mLastReadEventMicroSeconds = GetCurrentTimeInMicroSeconds(); - - vector inotifyEvents; - EventDispatcher::GetInstance()->ReadInotifyEvents(inotifyEvents); - if (inotifyEvents.size() > 0) { - PushEventQueue(inotifyEvents); + int64_t curMicroSeconds = GetCurrentTimeInMicroSeconds(); + if (forceRead || curMicroSeconds - mLastReadEventMicroSeconds >= INT64_FLAG(read_fs_events_interval)) { + vector inotifyEvents; + EventDispatcher::GetInstance()->ReadInotifyEvents(inotifyEvents); + if (inotifyEvents.size() > 0) { + PushEventQueue(inotifyEvents); + } + mLastReadEventMicroSeconds = curMicroSeconds; } vector feedbackEvents; @@ -212,8 +210,7 @@ bool LogInput::ReadLocalEvents() { } // set discard old data flag, so that history data will not be dropped. BOOL_FLAG(ilogtail_discard_old_data) = false; - LOG_INFO(sLogger, - ("load local events", GetLocalEventDataFileName())("event count", localEventJson.size())); + LOG_INFO(sLogger, ("load local events", GetLocalEventDataFileName())("event count", localEventJson.size())); for (Json::ValueIterator iter = localEventJson.begin(); iter != localEventJson.end(); ++iter) { const Json::Value& eventItem = *iter; if (!eventItem.isObject()) { @@ -395,8 +392,12 @@ void* LogInput::ProcessLoop() { delete ev; else ProcessEvent(dispatcher, ev); - } else - usleep(INT32_FLAG(log_input_thread_wait_interval)); + } else { + mutex mux; + unique_lock lock(mux); + mFeedbackCV.wait_for(lock, chrono::microseconds(INT32_FLAG(log_input_thread_wait_interval))); + } + if (mIdleFlag) continue; diff --git a/core/file_server/event_handler/LogInput.h b/core/file_server/event_handler/LogInput.h index 05a6ebbc6f..3b66900a17 100644 --- a/core/file_server/event_handler/LogInput.h +++ b/core/file_server/event_handler/LogInput.h @@ -60,6 +60,8 @@ class LogInput : public LogRunnable { int32_t GetLastReadEventTime() { return mLastReadEventTime; } + void Trigger() { mFeedbackCV.notify_one(); } + private: LogInput(); ~LogInput(); @@ -89,6 +91,8 @@ class LogInput : public LogRunnable { mutable std::mutex mThreadRunningMux; mutable std::condition_variable mStopCV; + mutable std::condition_variable mFeedbackCV; + #ifdef APSARA_UNIT_TEST_MAIN friend class LogInputUnittest; friend class EventDispatcherTest; From e89c791a9a00b74895d1c838ac74b05b814c5492 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Thu, 7 Nov 2024 07:42:23 +0000 Subject: [PATCH 2/2] polish --- core/file_server/event_handler/LogInput.cpp | 3 +-- core/file_server/event_handler/LogInput.h | 1 + 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/core/file_server/event_handler/LogInput.cpp b/core/file_server/event_handler/LogInput.cpp index 72980522df..299e660c68 100644 --- a/core/file_server/event_handler/LogInput.cpp +++ b/core/file_server/event_handler/LogInput.cpp @@ -393,8 +393,7 @@ void* LogInput::ProcessLoop() { else ProcessEvent(dispatcher, ev); } else { - mutex mux; - unique_lock lock(mux); + unique_lock lock(mFeedbackMux); mFeedbackCV.wait_for(lock, chrono::microseconds(INT32_FLAG(log_input_thread_wait_interval))); } diff --git a/core/file_server/event_handler/LogInput.h b/core/file_server/event_handler/LogInput.h index 3b66900a17..aadeb17082 100644 --- a/core/file_server/event_handler/LogInput.h +++ b/core/file_server/event_handler/LogInput.h @@ -91,6 +91,7 @@ class LogInput : public LogRunnable { mutable std::mutex mThreadRunningMux; mutable std::condition_variable mStopCV; + mutable std::mutex mFeedbackMux; mutable std::condition_variable mFeedbackCV; #ifdef APSARA_UNIT_TEST_MAIN