diff --git a/core/models/MetricValue.cpp b/core/models/MetricValue.cpp index c262367e07..1fc05f76fd 100644 --- a/core/models/MetricValue.cpp +++ b/core/models/MetricValue.cpp @@ -56,11 +56,11 @@ void UntypedMultiDoubleValues::DelValue(StringView key) { mValues.erase(key); } -std::map::const_iterator UntypedMultiDoubleValues::ValusBegin() const { +std::map::const_iterator UntypedMultiDoubleValues::ValuesBegin() const { return mValues.begin(); } -std::map::const_iterator UntypedMultiDoubleValues::ValusEnd() const { +std::map::const_iterator UntypedMultiDoubleValues::ValuesEnd() const { return mValues.end(); } diff --git a/core/models/MetricValue.h b/core/models/MetricValue.h index b3caef108c..a18059cec0 100644 --- a/core/models/MetricValue.h +++ b/core/models/MetricValue.h @@ -58,8 +58,8 @@ struct UntypedMultiDoubleValues { void SetValueNoCopy(StringView key, double val); void DelValue(StringView key); - std::map::const_iterator ValusBegin() const; - std::map::const_iterator ValusEnd() const; + std::map::const_iterator ValuesBegin() const; + std::map::const_iterator ValuesEnd() const; size_t ValusSize() const; size_t DataSize() const; diff --git a/core/monitor/metric_constants/MetricConstants.h b/core/monitor/metric_constants/MetricConstants.h index 95a6eb1af2..c98522f4d1 100644 --- a/core/monitor/metric_constants/MetricConstants.h +++ b/core/monitor/metric_constants/MetricConstants.h @@ -162,6 +162,7 @@ extern const std::string METRIC_PLUGIN_FLUSHER_TOTAL_PACKAGE_TIME_MS; extern const std::string METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL; extern const std::string METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL; extern const std::string METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL; +extern const std::string METRIC_PLUGIN_FLUSHER_DISCARD_TOTAL; extern const std::string METRIC_PLUGIN_FLUSHER_NETWORK_ERROR_TOTAL; extern const std::string METRIC_PLUGIN_FLUSHER_SERVER_ERROR_TOTAL; extern const std::string METRIC_PLUGIN_FLUSHER_UNAUTH_ERROR_TOTAL; diff --git a/core/monitor/metric_constants/PluginMetrics.cpp b/core/monitor/metric_constants/PluginMetrics.cpp index 329e487539..aa04551423 100644 --- a/core/monitor/metric_constants/PluginMetrics.cpp +++ b/core/monitor/metric_constants/PluginMetrics.cpp @@ -140,6 +140,7 @@ const string METRIC_PLUGIN_FLUSHER_TOTAL_PACKAGE_TIME_MS = "total_package_time_m const string METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL = "send_total"; const string METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL = "send_done_total"; const string METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL = "success_total"; +const string METRIC_PLUGIN_FLUSHER_DISCARD_TOTAL = "discard_total"; const string METRIC_PLUGIN_FLUSHER_NETWORK_ERROR_TOTAL = "network_error_total"; const string METRIC_PLUGIN_FLUSHER_SERVER_ERROR_TOTAL = "server_error_total"; const string METRIC_PLUGIN_FLUSHER_UNAUTH_ERROR_TOTAL = "unauth_error_total"; diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 3ed21f7d2b..8d4c2a75d6 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -317,10 +317,11 @@ bool Pipeline::Init(PipelineConfig&& config) { ProcessQueueManager::GetInstance()->SetDownStreamQueues(mContext.GetProcessQueueKey(), std::move(senderQueues)); } - WriteMetrics::GetInstance()->PrepareMetricsRecordRef( - mMetricsRecordRef, - MetricCategory::METRIC_CATEGORY_PIPELINE, - {{METRIC_LABEL_KEY_PROJECT, mContext.GetProjectName()}, {METRIC_LABEL_KEY_PIPELINE_NAME, mName}}); + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, + MetricCategory::METRIC_CATEGORY_PIPELINE, + {{METRIC_LABEL_KEY_PROJECT, mContext.GetProjectName()}, + {METRIC_LABEL_KEY_PIPELINE_NAME, mName}, + {METRIC_LABEL_KEY_LOGSTORE, mContext.GetLogstoreName()}}); mStartTime = mMetricsRecordRef.CreateIntGauge(METRIC_PIPELINE_START_TIME); mProcessorsInEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENTS_TOTAL); mProcessorsInGroupsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_TOTAL); diff --git a/core/pipeline/plugin/interface/Plugin.h b/core/pipeline/plugin/interface/Plugin.h index 999b10b921..6aef73b37a 100644 --- a/core/pipeline/plugin/interface/Plugin.h +++ b/core/pipeline/plugin/interface/Plugin.h @@ -42,6 +42,7 @@ class Plugin { MetricCategory::METRIC_CATEGORY_PLUGIN, {{METRIC_LABEL_KEY_PROJECT, mContext->GetProjectName()}, {METRIC_LABEL_KEY_PIPELINE_NAME, mContext->GetConfigName()}, + {METRIC_LABEL_KEY_LOGSTORE, mContext->GetLogstoreName()}, {METRIC_LABEL_KEY_PLUGIN_TYPE, name}, {METRIC_LABEL_KEY_PLUGIN_ID, id}}); } diff --git a/core/pipeline/serializer/JsonSerializer.cpp b/core/pipeline/serializer/JsonSerializer.cpp index c83b45e282..8535a735a2 100644 --- a/core/pipeline/serializer/JsonSerializer.cpp +++ b/core/pipeline/serializer/JsonSerializer.cpp @@ -83,8 +83,8 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str eventJson[METRIC_RESERVED_KEY_VALUE] = e.GetValue()->mValue; } else if (e.Is()) { eventJson[METRIC_RESERVED_KEY_VALUE] = Json::Value(); - for (auto value = e.GetValue()->ValusBegin(); - value != e.GetValue()->ValusEnd(); + for (auto value = e.GetValue()->ValuesBegin(); + value != e.GetValue()->ValuesEnd(); value++) { eventJson[METRIC_RESERVED_KEY_VALUE][value->first.to_string()] = value->second; } diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 3ecb22a0be..765318b77f 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -126,12 +126,14 @@ shared_ptr FlusherSLS::GetLogstoreConcurrencyLimiter(const s auto iter = sLogstoreConcurrencyLimiterMap.find(key); if (iter == sLogstoreConcurrencyLimiterMap.end()) { - auto limiter = make_shared(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency()); + auto limiter = make_shared(sName + "#quota#logstore#" + key, + AppConfig::GetInstance()->GetSendRequestConcurrency()); sLogstoreConcurrencyLimiterMap.try_emplace(key, limiter); return limiter; } if (iter->second.expired()) { - auto limiter = make_shared(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency()); + auto limiter = make_shared(sName + "#quota#logstore#" + key, + AppConfig::GetInstance()->GetSendRequestConcurrency()); iter->second = limiter; return limiter; } @@ -142,12 +144,14 @@ shared_ptr FlusherSLS::GetProjectConcurrencyLimiter(const st lock_guard lock(sMux); auto iter = sProjectConcurrencyLimiterMap.find(project); if (iter == sProjectConcurrencyLimiterMap.end()) { - auto limiter = make_shared(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency()); + auto limiter = make_shared(sName + "#quota#project#" + project, + AppConfig::GetInstance()->GetSendRequestConcurrency()); sProjectConcurrencyLimiterMap.try_emplace(project, limiter); return limiter; } if (iter->second.expired()) { - auto limiter = make_shared(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency()); + auto limiter = make_shared(sName + "#quota#project#" + project, + AppConfig::GetInstance()->GetSendRequestConcurrency()); iter->second = limiter; return limiter; } @@ -158,12 +162,20 @@ shared_ptr FlusherSLS::GetRegionConcurrencyLimiter(const str lock_guard lock(sMux); auto iter = sRegionConcurrencyLimiterMap.find(region); if (iter == sRegionConcurrencyLimiterMap.end()) { - auto limiter = make_shared(sName + "#network#region#" + region, AppConfig::GetInstance()->GetSendRequestConcurrency(), AppConfig::GetInstance()->GetSendRequestConcurrency()*AppConfig::GetInstance()->GetGlobalConcurrencyFreePercentageForOneRegion()); + auto limiter = make_shared( + sName + "#network#region#" + region, + AppConfig::GetInstance()->GetSendRequestConcurrency(), + AppConfig::GetInstance()->GetSendRequestConcurrency() + * AppConfig::GetInstance()->GetGlobalConcurrencyFreePercentageForOneRegion()); sRegionConcurrencyLimiterMap.try_emplace(region, limiter); return limiter; } if (iter->second.expired()) { - auto limiter = make_shared(sName + "#network#region#" + region, AppConfig::GetInstance()->GetSendRequestConcurrency(), AppConfig::GetInstance()->GetSendRequestConcurrency()*AppConfig::GetInstance()->GetGlobalConcurrencyFreePercentageForOneRegion()); + auto limiter = make_shared( + sName + "#network#region#" + region, + AppConfig::GetInstance()->GetSendRequestConcurrency(), + AppConfig::GetInstance()->GetSendRequestConcurrency() + * AppConfig::GetInstance()->GetGlobalConcurrencyFreePercentageForOneRegion()); iter->second = limiter; return limiter; } @@ -524,6 +536,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL); mSendDoneCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL); mSuccessCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL); + mDiscardCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_DISCARD_TOTAL); mNetworkErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_NETWORK_ERROR_TOTAL); mServerErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SERVER_ERROR_TOTAL); mShardWriteQuotaErrorCnt @@ -878,6 +891,9 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) DealSenderQueueItemAfterSend(item, true); break; case OperationOnFail::DISCARD: + if (mDiscardCnt) { + mDiscardCnt->Add(1); + } default: LOG_WARNING(sLogger, LOG_PATTERN); if (!isProfileData) { diff --git a/core/plugin/flusher/sls/FlusherSLS.h b/core/plugin/flusher/sls/FlusherSLS.h index 6a71b0f526..25291a7cc4 100644 --- a/core/plugin/flusher/sls/FlusherSLS.h +++ b/core/plugin/flusher/sls/FlusherSLS.h @@ -128,6 +128,7 @@ class FlusherSLS : public HttpFlusher { CounterPtr mSendCnt; CounterPtr mSendDoneCnt; CounterPtr mSuccessCnt; + CounterPtr mDiscardCnt; CounterPtr mNetworkErrorCnt; CounterPtr mServerErrorCnt; CounterPtr mShardWriteQuotaErrorCnt; diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index 614e6dd415..89df992071 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ b/core/unittest/pipeline/PipelineUnittest.cpp @@ -124,7 +124,7 @@ void PipelineUnittest::OnSuccessfulInit() const { APSARA_TEST_EQUAL(QueueKeyManager::GetInstance()->GetKey("test_config-flusher_sls-test_project#test_logstore"), pipeline->GetContext().GetLogstoreKey()); APSARA_TEST_EQUAL(0, pipeline->mInProcessCnt.load()); - APSARA_TEST_EQUAL(2U, pipeline->mMetricsRecordRef->GetLabels()->size()); + APSARA_TEST_EQUAL(3U, pipeline->mMetricsRecordRef->GetLabels()->size()); APSARA_TEST_TRUE(pipeline->mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_PIPELINE_NAME, configName)); APSARA_TEST_TRUE(pipeline->mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_PROJECT, "test_project"));