Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Dec 26, 2024
1 parent ada2037 commit fdf8b3a
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 6 deletions.
1 change: 1 addition & 0 deletions core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ class PipelineManager {
friend class CircularProcessQueueUnittest;
friend class CommonConfigProviderUnittest;
friend class FlusherUnittest;
friend class PipelineUnittest;
#endif
};

Expand Down
19 changes: 13 additions & 6 deletions core/unittest/pipeline/PipelineUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2702,7 +2702,8 @@ void PipelineUnittest::TestProcess() const {
processor->Init(Json::Value(), ctx);
pipeline.mProcessorLine.emplace_back(std::move(processor));

WriteMetrics::GetInstance()->PrepareMetricsRecordRef(pipeline.mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_UNKNOWN, {});
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
pipeline.mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_UNKNOWN, {});
pipeline.mProcessorsInEventsTotal
= pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENTS_TOTAL);
pipeline.mProcessorsInGroupsTotal
Expand Down Expand Up @@ -2750,7 +2751,8 @@ void PipelineUnittest::TestSend() const {
configs.emplace_back(1, nullptr);
pipeline.mRouter.Init(configs, ctx);

WriteMetrics::GetInstance()->PrepareMetricsRecordRef(pipeline.mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_UNKNOWN, {});
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
pipeline.mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_UNKNOWN, {});
pipeline.mFlushersInGroupsTotal
= pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL);
pipeline.mFlushersInEventsTotal
Expand Down Expand Up @@ -2816,7 +2818,8 @@ void PipelineUnittest::TestSend() const {
configs.emplace_back(configJson.size(), nullptr);
pipeline.mRouter.Init(configs, ctx);

WriteMetrics::GetInstance()->PrepareMetricsRecordRef(pipeline.mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_UNKNOWN, {});
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
pipeline.mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_UNKNOWN, {});
pipeline.mFlushersInGroupsTotal
= pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL);
pipeline.mFlushersInEventsTotal
Expand Down Expand Up @@ -2893,15 +2896,19 @@ void PipelineUnittest::TestInProcessingCount() const {
vector<PipelineEventGroup> group;
group.emplace_back(make_shared<SourceBuffer>());

auto pipeline2 = make_shared<Pipeline>();
PipelineManager::GetInstance()->mPipelineNameEntityMap[""] = pipeline2;
processQueue->EnablePop();
processQueue->Push(GenerateProcessItem(pipeline));
APSARA_TEST_EQUAL(0, pipeline->mInProcessCnt.load());
APSARA_TEST_EQUAL(0, pipeline2->mInProcessCnt.load());
unique_ptr<ProcessQueueItem> item;
APSARA_TEST_TRUE(processQueue->Pop(item));
APSARA_TEST_EQUAL(1, pipeline->mInProcessCnt.load());

pipeline->SubInProcessCnt();
APSARA_TEST_EQUAL(0, pipeline->mInProcessCnt.load());
APSARA_TEST_EQUAL(1, pipeline2->mInProcessCnt.load());

pipeline2->SubInProcessCnt();
APSARA_TEST_EQUAL(0, pipeline2->mInProcessCnt.load());
}

void PipelineUnittest::TestWaitAllItemsInProcessFinished() const {
Expand Down

0 comments on commit fdf8b3a

Please sign in to comment.