
Commit

Merge remote-tracking branch 'byted/main' into fix/flusher-prometheus-ut
Change-Id: Iba388c30ac22206c5cb238355809348e70d8a7b5
shunjiazhu committed Nov 12, 2024
2 parents 47d9d3b + 4741ec8 commit 9c31c81
Showing 57 changed files with 855 additions and 1,213 deletions.
7 changes: 7 additions & 0 deletions .github/workflows/benchmark.yaml
@@ -37,6 +37,7 @@ jobs:
strategy:
matrix:
go-version: [ 1.19.10 ]
python-version: [ 3.8 ]
runner: [ ubuntu-latest ]
fail-fast: true
permissions:
@@ -62,6 +63,11 @@ jobs:
with:
go-version: ${{ matrix.go-version }}

- name: Set up Python ${{ matrix.python-version }}
uses: actions/setup-python@v5
with:
python-version: ${{ matrix.python-version }}

- name: Check out code
uses: actions/checkout@v2
with:
@@ -83,6 +89,7 @@ jobs:
BUILD_LOGTAIL_UT: OFF
WITHOUTGDB: ON
run: |
pip3 install -r test/requirements.txt
make benchmark
git stash
8 changes: 6 additions & 2 deletions core/file_server/event/BlockEventManager.cpp
@@ -17,6 +17,7 @@
#include "common/Flags.h"
#include "common/HashUtil.h"
#include "common/StringTools.h"
#include "file_server/event_handler/LogInput.h"
#include "logger/Logger.h"
#include "pipeline/queue/ProcessQueueManager.h"

@@ -69,8 +70,11 @@ BlockedEventManager::~BlockedEventManager() {
}

void BlockedEventManager::Feedback(int64_t key) {
lock_guard<mutex> lock(mFeedbackQueueMux);
mFeedbackQueue.emplace_back(key);
{
lock_guard<mutex> lock(mFeedbackQueueMux);
mFeedbackQueue.emplace_back(key);
}
LogInput::GetInstance()->Trigger();
}

void BlockedEventManager::UpdateBlockEvent(
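The change above scopes the queue lock so that the newly added Trigger() call runs after mFeedbackQueueMux is released; the consumer is woken without contending for the producer's lock. A minimal, self-contained sketch of that pattern, using illustrative names (gQueueMux, gQueue, gWakeCV) rather than the project's members:

#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <vector>

std::mutex gQueueMux;            // protects gQueue only
std::vector<int64_t> gQueue;
std::condition_variable gWakeCV; // owned by the consumer loop (here: LogInput)

void Feedback(int64_t key) {
    {
        // Narrow critical section: only the queue mutation is locked.
        std::lock_guard<std::mutex> lock(gQueueMux);
        gQueue.emplace_back(key);
    }
    // Wake the consumer after releasing the lock, so the woken thread is not
    // forced to block on gQueueMux right away.
    gWakeCV.notify_one();
}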
40 changes: 20 additions & 20 deletions core/file_server/event_handler/LogInput.cpp
@@ -88,9 +88,12 @@ void LogInput::Start() {
mInteruptFlag = false;

mLastRunTime = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME);
mRegisterdHandlersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL);
mActiveReadersTotal = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL);
mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG);
mRegisterdHandlersTotal
= FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_WATCHED_DIRS_TOTAL);
mActiveReadersTotal
= FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(METRIC_RUNNER_FILE_ACTIVE_READERS_TOTAL);
mEnableFileIncludedByMultiConfigs = FileServer::GetInstance()->GetMetricsRecordRef().CreateIntGauge(
METRIC_RUNNER_FILE_ENABLE_FILE_INCLUDED_BY_MULTI_CONFIGS_FLAG);

new Thread([this]() { ProcessLoop(); });
}
@@ -118,19 +121,14 @@ void LogInput::TryReadEvents(bool forceRead) {
if (mInteruptFlag)
return;

if (!forceRead) {
int64_t curMicroSeconds = GetCurrentTimeInMicroSeconds();
if (curMicroSeconds - mLastReadEventMicroSeconds >= INT64_FLAG(read_fs_events_interval))
mLastReadEventMicroSeconds = curMicroSeconds;
else
return;
} else
mLastReadEventMicroSeconds = GetCurrentTimeInMicroSeconds();

vector<Event*> inotifyEvents;
EventDispatcher::GetInstance()->ReadInotifyEvents(inotifyEvents);
if (inotifyEvents.size() > 0) {
PushEventQueue(inotifyEvents);
int64_t curMicroSeconds = GetCurrentTimeInMicroSeconds();
if (forceRead || curMicroSeconds - mLastReadEventMicroSeconds >= INT64_FLAG(read_fs_events_interval)) {
vector<Event*> inotifyEvents;
EventDispatcher::GetInstance()->ReadInotifyEvents(inotifyEvents);
if (inotifyEvents.size() > 0) {
PushEventQueue(inotifyEvents);
}
mLastReadEventMicroSeconds = curMicroSeconds;
}

vector<Event*> feedbackEvents;
@@ -212,8 +210,7 @@ bool LogInput::ReadLocalEvents() {
}
// clear the discard-old-data flag so that history data will not be dropped.
BOOL_FLAG(ilogtail_discard_old_data) = false;
LOG_INFO(sLogger,
("load local events", GetLocalEventDataFileName())("event count", localEventJson.size()));
LOG_INFO(sLogger, ("load local events", GetLocalEventDataFileName())("event count", localEventJson.size()));
for (Json::ValueIterator iter = localEventJson.begin(); iter != localEventJson.end(); ++iter) {
const Json::Value& eventItem = *iter;
if (!eventItem.isObject()) {
@@ -395,8 +392,11 @@ void* LogInput::ProcessLoop() {
delete ev;
else
ProcessEvent(dispatcher, ev);
} else
usleep(INT32_FLAG(log_input_thread_wait_interval));
} else {
unique_lock<mutex> lock(mFeedbackMux);
mFeedbackCV.wait_for(lock, chrono::microseconds(INT32_FLAG(log_input_thread_wait_interval)));
}

if (mIdleFlag)
continue;

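The ProcessLoop() change replaces a fixed usleep() on the idle path with a condition-variable wait that Trigger() can cut short, so feedback from BlockedEventManager is picked up without sleeping out the full interval. A minimal sketch of the idle path, assuming a kWaitIntervalUs constant in place of INT32_FLAG(log_input_thread_wait_interval):

#include <chrono>
#include <condition_variable>
#include <mutex>

constexpr int kWaitIntervalUs = 50 * 1000; // placeholder for the flag value

std::mutex gFeedbackMux;
std::condition_variable gFeedbackCV;

// Called by a producer (e.g. BlockedEventManager::Feedback) to end the wait early.
void Trigger() {
    gFeedbackCV.notify_one();
}

void IdleWait() {
    // Before: usleep(kWaitIntervalUs) always slept the full interval.
    // After: wait_for() still caps the idle time, but a notify wakes it immediately.
    std::unique_lock<std::mutex> lock(gFeedbackMux);
    gFeedbackCV.wait_for(lock, std::chrono::microseconds(kWaitIntervalUs));
}

Spurious or early wakeups are harmless here because the loop re-checks its event queues on every iteration; the wait only bounds how long the thread stays idle.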
5 changes: 5 additions & 0 deletions core/file_server/event_handler/LogInput.h
@@ -60,6 +60,8 @@ class LogInput : public LogRunnable {

int32_t GetLastReadEventTime() { return mLastReadEventTime; }

void Trigger() { mFeedbackCV.notify_one(); }

private:
LogInput();
~LogInput();
@@ -89,6 +91,9 @@ class LogInput : public LogRunnable {
mutable std::mutex mThreadRunningMux;
mutable std::condition_variable mStopCV;

mutable std::mutex mFeedbackMux;
mutable std::condition_variable mFeedbackCV;

#ifdef APSARA_UNIT_TEST_MAIN
friend class LogInputUnittest;
friend class EventDispatcherTest;
30 changes: 17 additions & 13 deletions core/models/EventPool.cpp
@@ -41,29 +41,26 @@ EventPool::~EventPool() {
}

LogEvent* EventPool::AcquireLogEvent(PipelineEventGroup* ptr) {
TransferPoolIfEmpty(mLogEventPool, mLogEventPoolBak);

if (mEnableLock) {
TransferPoolIfEmpty(mLogEventPool, mLogEventPoolBak);
lock_guard<mutex> lock(mPoolMux);
return AcquireEventNoLock(ptr, mLogEventPool, mMinUnusedLogEventsCnt);
}
return AcquireEventNoLock(ptr, mLogEventPool, mMinUnusedLogEventsCnt);
}

MetricEvent* EventPool::AcquireMetricEvent(PipelineEventGroup* ptr) {
TransferPoolIfEmpty(mMetricEventPool, mMetricEventPoolBak);

if (mEnableLock) {
TransferPoolIfEmpty(mMetricEventPool, mMetricEventPoolBak);
lock_guard<mutex> lock(mPoolMux);
return AcquireEventNoLock(ptr, mMetricEventPool, mMinUnusedMetricEventsCnt);
}
return AcquireEventNoLock(ptr, mMetricEventPool, mMinUnusedMetricEventsCnt);
}

SpanEvent* EventPool::AcquireSpanEvent(PipelineEventGroup* ptr) {
TransferPoolIfEmpty(mSpanEventPool, mSpanEventPoolBak);

if (mEnableLock) {
TransferPoolIfEmpty(mSpanEventPool, mSpanEventPoolBak);
lock_guard<mutex> lock(mPoolMux);
return AcquireEventNoLock(ptr, mSpanEventPool, mMinUnusedSpanEventsCnt);
}
@@ -98,20 +95,27 @@ void EventPool::Release(vector<SpanEvent*>&& obj) {
}

template <class T>
void DoGC(vector<T*>& pool, vector<T*>& poolBak, size_t& minUnusedCnt, mutex* mux) {
void DoGC(vector<T*>& pool, vector<T*>& poolBak, size_t& minUnusedCnt, mutex* mux, const string& type) {
if (minUnusedCnt <= pool.size() || minUnusedCnt == numeric_limits<size_t>::max()) {
auto sz = minUnusedCnt == numeric_limits<size_t>::max() ? pool.size() : minUnusedCnt;
for (size_t i = 0; i < sz; ++i) {
delete pool.back();
pool.pop_back();
}
size_t bakSZ = 0;
if (mux) {
lock_guard<mutex> lock(*mux);
bakSZ = poolBak.size();
for (auto& item : poolBak) {
delete item;
}
poolBak.clear();
}
if (sz != 0 || bakSZ != 0) {
LOG_INFO(
sLogger,
("event pool gc", "done")("event type", type)("gc event cnt", sz + bakSZ)("pool size", pool.size()));
}
} else {
LOG_ERROR(sLogger,
("unexpected error", "min unused event cnt is greater than pool size")(
@@ -124,13 +128,13 @@ void EventPool::CheckGC() {
if (time(nullptr) - mLastGCTime > INT32_FLAG(event_pool_gc_interval_secs)) {
if (mEnableLock) {
lock_guard<mutex> lock(mPoolMux);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux);
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux);
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, &mPoolBakMux, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, &mPoolBakMux, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, &mPoolBakMux, "span");
} else {
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr);
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr);
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr);
DoGC(mLogEventPool, mLogEventPoolBak, mMinUnusedLogEventsCnt, nullptr, "log");
DoGC(mMetricEventPool, mMetricEventPoolBak, mMinUnusedMetricEventsCnt, nullptr, "metric");
DoGC(mSpanEventPool, mSpanEventPoolBak, mMinUnusedSpanEventsCnt, nullptr, "span");
}
mLastGCTime = time(nullptr);
}
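DoGC() now takes a type label and logs only when events were actually reclaimed. A simplified, standalone sketch of the threshold logic; the real function also drains the lock-protected backup pool and writes through the project logger rather than std::cout:

#include <cstddef>
#include <iostream>
#include <limits>
#include <string>
#include <vector>

template <class T>
void GcSketch(std::vector<T*>& pool, std::size_t minUnusedCnt, const std::string& type) {
    if (minUnusedCnt <= pool.size() || minUnusedCnt == std::numeric_limits<std::size_t>::max()) {
        // max() means the pool was never drained since the last GC, so all idle events are reclaimable.
        std::size_t sz
            = minUnusedCnt == std::numeric_limits<std::size_t>::max() ? pool.size() : minUnusedCnt;
        for (std::size_t i = 0; i < sz; ++i) {
            delete pool.back();
            pool.pop_back();
        }
        if (sz != 0) {
            // The new "type" argument tags the log line so log/metric/span pools are distinguishable.
            std::cout << "event pool gc done, type=" << type << ", gc event cnt=" << sz
                      << ", pool size=" << pool.size() << std::endl;
        }
    }
}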
16 changes: 15 additions & 1 deletion core/models/PipelineEventGroup.cpp
@@ -34,10 +34,24 @@ namespace logtail {
template <class T>
void DestroyEvents(vector<PipelineEventPtr>&& events) {
unordered_map<EventPool*, vector<T*>> eventsPoolMap;
// In most cases, all events come from the same pool, so we cache the pool pointer and iterator for efficiency
EventPool* cachedPoolPtr = nullptr;
typename unordered_map<EventPool*, vector<T*>>::iterator cachedIt;
bool firstEvent = true;
for (auto& item : events) {
if (item && item.IsFromEventPool()) {
item->Reset();
eventsPoolMap[item.GetEventPool()].emplace_back(static_cast<T*>(item.Release()));
if (firstEvent || item.GetEventPool() != cachedPoolPtr) {
cachedPoolPtr = item.GetEventPool();
cachedIt = eventsPoolMap.find(cachedPoolPtr);
if (cachedIt == eventsPoolMap.end()) {
eventsPoolMap.emplace(cachedPoolPtr, vector<T*>());
cachedIt = eventsPoolMap.find(cachedPoolPtr);
cachedIt->second.reserve(events.size());
}
firstEvent = false;
}
cachedIt->second.emplace_back(static_cast<T*>(item.Release()));
}
}
for (auto& item : eventsPoolMap) {
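DestroyEvents() now caches the last pool pointer and its map iterator, so the common case (all events originating from a single pool) performs one hash lookup instead of one per event. A standalone sketch of that grouping pattern with illustrative Pool/Item types:

#include <cstddef>
#include <unordered_map>
#include <vector>

struct Pool {};
struct Item {
    Pool* pool; // originating pool, analogous to PipelineEventPtr::GetEventPool()
};

std::unordered_map<Pool*, std::vector<Item*>> GroupByPool(const std::vector<Item*>& items) {
    std::unordered_map<Pool*, std::vector<Item*>> byPool;
    Pool* cachedPool = nullptr;
    std::unordered_map<Pool*, std::vector<Item*>>::iterator cachedIt;
    bool first = true;
    for (Item* item : items) {
        if (first || item->pool != cachedPool) {
            // Only re-hash when the pool actually changes.
            cachedPool = item->pool;
            auto res = byPool.try_emplace(cachedPool);
            cachedIt = res.first;
            if (res.second) {
                cachedIt->second.reserve(items.size()); // reserve once per new pool
            }
            first = false;
        }
        cachedIt->second.push_back(item);
    }
    return byPool;
}

Reusing cachedIt is safe because it is refreshed immediately after every insertion and the map is not otherwise modified between lookups. The same pattern is applied again in core/pipeline/batch/BatchedEvents.cpp below.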
4 changes: 2 additions & 2 deletions core/monitor/metric_constants/ComponentMetrics.cpp
@@ -74,8 +74,8 @@ const string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES = "component_extra_b
const string METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_TOTAL = "component_discarded_events_total";

const string METRIC_COMPONENT_QUEUE_FETCHED_ITEMS_TOTAL = "component_fetched_items_total";
const string METRIC_COMPONENT_QUEUE_FETCH_ATTEMPTS_TOTAL = "component_fetch_attempts_total";
const string METRIC_COMPONENT_QUEUE_SUCCESSFUL_FETCH_TIMES_TOTAL = "component_successful_fetch_times_total";
const string METRIC_COMPONENT_QUEUE_FETCH_TIMES_TOTAL = "component_fetch_times_total";
const string METRIC_COMPONENT_QUEUE_VALID_FETCH_TIMES_TOTAL = "component_valid_fetch_times_total";
const string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_REGION_LIMITER_TIMES_TOTAL
= "component_fetch_rejected_by_region_limiter_times_total";
const string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_PROJECT_LIMITER_TIMES_TOTAL
4 changes: 2 additions & 2 deletions core/monitor/metric_constants/MetricConstants.h
@@ -261,8 +261,8 @@ extern const std::string METRIC_COMPONENT_QUEUE_EXTRA_BUFFER_SIZE_BYTES;
extern const std::string METRIC_COMPONENT_QUEUE_DISCARDED_EVENTS_TOTAL;

extern const std::string METRIC_COMPONENT_QUEUE_FETCHED_ITEMS_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_ATTEMPTS_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_SUCCESSFUL_FETCH_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_VALID_FETCH_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_REGION_LIMITER_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_PROJECT_LIMITER_TIMES_TOTAL;
extern const std::string METRIC_COMPONENT_QUEUE_FETCH_REJECTED_BY_LOGSTORE_LIMITER_TIMES_TOTAL;
8 changes: 4 additions & 4 deletions core/pipeline/batch/BatchItem.h
@@ -34,7 +34,7 @@ class GroupBatchItem {

void Add(BatchedEvents&& g, int64_t totalEnqueTimeMs) {
mEventsCnt += g.mEvents.size();
mTotalEnqueTimeMs += totalEnqueTimeMs;
// mTotalEnqueTimeMs += totalEnqueTimeMs;
mGroups.emplace_back(std::move(g));
mStatus.Update(mGroups.back());
}
@@ -94,9 +94,9 @@ class EventBatchItem {
void Add(PipelineEventPtr&& e) {
mBatch.mEvents.emplace_back(std::move(e));
mStatus.Update(mBatch.mEvents.back());
mTotalEnqueTimeMs += std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
.time_since_epoch()
.count();
// mTotalEnqueTimeMs += std::chrono::time_point_cast<std::chrono::milliseconds>(std::chrono::system_clock::now())
// .time_since_epoch()
// .count();
}

void Flush(GroupBatchItem& res) {
16 changes: 15 additions & 1 deletion core/pipeline/batch/BatchedEvents.cpp
@@ -23,10 +23,24 @@ namespace logtail {
template <class T>
void DestroyEvents(vector<PipelineEventPtr>&& events) {
unordered_map<EventPool*, vector<T*>> eventsPoolMap;
// In most cases, all events come from the same pool, so we cache the pool pointer and iterator for efficiency
EventPool* cachedPoolPtr = nullptr;
typename unordered_map<EventPool*, vector<T*>>::iterator cachedIt;
bool firstEvent = true;
for (auto& item : events) {
if (item && item.IsFromEventPool()) {
item->Reset();
eventsPoolMap[item.GetEventPool()].emplace_back(static_cast<T*>(item.Release()));
if (firstEvent || item.GetEventPool() != cachedPoolPtr) {
cachedPoolPtr = item.GetEventPool();
cachedIt = eventsPoolMap.find(cachedPoolPtr);
if (cachedIt == eventsPoolMap.end()) {
eventsPoolMap.emplace(cachedPoolPtr, vector<T*>());
cachedIt = eventsPoolMap.find(cachedPoolPtr);
cachedIt->second.reserve(events.size());
}
firstEvent = false;
}
cachedIt->second.emplace_back(static_cast<T*>(item.Release()));
}
}
for (auto& item : eventsPoolMap) {