Skip to content

Commit

Permalink
read blocked event as soon as process queue triggers feedback to impr…
Browse files Browse the repository at this point in the history
…ove file reading performance (#1863)
  • Loading branch information
henryzhx8 authored Nov 7, 2024
1 parent 2ca86bc commit 9759b52
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 22 deletions.
8 changes: 6 additions & 2 deletions core/file_server/event/BlockEventManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
#include "common/Flags.h"
#include "common/HashUtil.h"
#include "common/StringTools.h"
#include "file_server/event_handler/LogInput.h"
#include "logger/Logger.h"
#include "pipeline/queue/ProcessQueueManager.h"

Expand Down Expand Up @@ -69,8 +70,11 @@ BlockedEventManager::~BlockedEventManager() {
}

void BlockedEventManager::Feedback(int64_t key) {
lock_guard<mutex> lock(mFeedbackQueueMux);
mFeedbackQueue.emplace_back(key);
{
lock_guard<mutex> lock(mFeedbackQueueMux);
mFeedbackQueue.emplace_back(key);
}
LogInput::GetInstance()->Trigger();
}

void BlockedEventManager::UpdateBlockEvent(
Expand Down
40 changes: 20 additions & 20 deletions core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,12 @@ void LogInput::Start() {
mInteruptFlag = false;

mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME);
mRegisterdHandlersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL);
mActiveReadersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL);
mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG);
mRegisterdHandlersTotal
= FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL);
mActiveReadersTotal
= FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL);
mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(
METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG);

new Thread([this]() { ProcessLoop(); });
}
Expand Down Expand Up @@ -118,19 +121,14 @@ void LogInput::TryReadEvents(bool forceRead) {
if (mInteruptFlag)
return;

if (!forceRead) {
int64_t curMicroSeconds = GetCurrentTimeInMicroSeconds();
if (curMicroSeconds - mLastReadEventMicroSeconds >= INT64_FLAG(read_fs_events_interval))
mLastReadEventMicroSeconds = curMicroSeconds;
else
return;
} else
mLastReadEventMicroSeconds = GetCurrentTimeInMicroSeconds();

vector<Event*> inotifyEvents;
EventDispatcher::GetInstance()->ReadInotifyEvents(inotifyEvents);
if (inotifyEvents.size() > 0) {
PushEventQueue(inotifyEvents);
int64_t curMicroSeconds = GetCurrentTimeInMicroSeconds();
if (forceRead || curMicroSeconds - mLastReadEventMicroSeconds >= INT64_FLAG(read_fs_events_interval)) {
vector<Event*> inotifyEvents;
EventDispatcher::GetInstance()->ReadInotifyEvents(inotifyEvents);
if (inotifyEvents.size() > 0) {
PushEventQueue(inotifyEvents);
}
mLastReadEventMicroSeconds = curMicroSeconds;
}

vector<Event*> feedbackEvents;
Expand Down Expand Up @@ -212,8 +210,7 @@ bool LogInput::ReadLocalEvents() {
}
// set discard old data flag, so that history data will not be dropped.
BOOL_FLAG(ilogtail_discard_old_data) = false;
LOG_INFO(sLogger,
("load local events", GetLocalEventDataFileName())("event count", localEventJson.size()));
LOG_INFO(sLogger, ("load local events", GetLocalEventDataFileName())("event count", localEventJson.size()));
for (Json::ValueIterator iter = localEventJson.begin(); iter != localEventJson.end(); ++iter) {
const Json::Value& eventItem = *iter;
if (!eventItem.isObject()) {
Expand Down Expand Up @@ -395,8 +392,11 @@ void* LogInput::ProcessLoop() {
delete ev;
else
ProcessEvent(dispatcher, ev);
} else
usleep(INT32_FLAG(log_input_thread_wait_interval));
} else {
unique_lock<mutex> lock(mFeedbackMux);
mFeedbackCV.wait_for(lock, chrono::microseconds(INT32_FLAG(log_input_thread_wait_interval)));
}

if (mIdleFlag)
continue;

Expand Down
5 changes: 5 additions & 0 deletions core/file_server/event_handler/LogInput.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ class LogInput : public LogRunnable {

int32_t GetLastReadEventTime() { return mLastReadEventTime; }

void Trigger() { mFeedbackCV.notify_one(); }

private:
LogInput();
~LogInput();
Expand Down Expand Up @@ -89,6 +91,9 @@ class LogInput : public LogRunnable {
mutable std::mutex mThreadRunningMux;
mutable std::condition_variable mStopCV;

mutable std::mutex mFeedbackMux;
mutable std::condition_variable mFeedbackCV;

#ifdef APSARA_UNIT_TEST_MAIN
friend class LogInputUnittest;
friend class EventDispatcherTest;
Expand Down

0 comments on commit 9759b52

Please sign in to comment.