Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add metrics for logtail mode #2001

Merged
merged 2 commits into from
Dec 31, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/models/MetricValue.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,11 @@ void UntypedMultiDoubleValues::DelValue(StringView key) {
mValues.erase(key);
}

std::map<StringView, double>::const_iterator UntypedMultiDoubleValues::ValusBegin() const {
std::map<StringView, double>::const_iterator UntypedMultiDoubleValues::ValuesBegin() const {
return mValues.begin();
}

std::map<StringView, double>::const_iterator UntypedMultiDoubleValues::ValusEnd() const {
std::map<StringView, double>::const_iterator UntypedMultiDoubleValues::ValuesEnd() const {
return mValues.end();
}

Expand Down
4 changes: 2 additions & 2 deletions core/models/MetricValue.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,8 +58,8 @@ struct UntypedMultiDoubleValues {
void SetValueNoCopy(StringView key, double val);
void DelValue(StringView key);

std::map<StringView, double>::const_iterator ValusBegin() const;
std::map<StringView, double>::const_iterator ValusEnd() const;
std::map<StringView, double>::const_iterator ValuesBegin() const;
std::map<StringView, double>::const_iterator ValuesEnd() const;
size_t ValusSize() const;

size_t DataSize() const;
Expand Down
1 change: 1 addition & 0 deletions core/monitor/metric_constants/MetricConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ extern const std::string METRIC_PLUGIN_FLUSHER_TOTAL_PACKAGE_TIME_MS;
extern const std::string METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL;
extern const std::string METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL;
extern const std::string METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL;
extern const std::string METRIC_PLUGIN_FLUSHER_DISCARD_TOTAL;
extern const std::string METRIC_PLUGIN_FLUSHER_NETWORK_ERROR_TOTAL;
extern const std::string METRIC_PLUGIN_FLUSHER_SERVER_ERROR_TOTAL;
extern const std::string METRIC_PLUGIN_FLUSHER_UNAUTH_ERROR_TOTAL;
Expand Down
1 change: 1 addition & 0 deletions core/monitor/metric_constants/PluginMetrics.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ const string METRIC_PLUGIN_FLUSHER_TOTAL_PACKAGE_TIME_MS = "total_package_time_m
const string METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL = "send_total";
const string METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL = "send_done_total";
const string METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL = "success_total";
const string METRIC_PLUGIN_FLUSHER_DISCARD_TOTAL = "discard_total";
const string METRIC_PLUGIN_FLUSHER_NETWORK_ERROR_TOTAL = "network_error_total";
const string METRIC_PLUGIN_FLUSHER_SERVER_ERROR_TOTAL = "server_error_total";
const string METRIC_PLUGIN_FLUSHER_UNAUTH_ERROR_TOTAL = "unauth_error_total";
Expand Down
9 changes: 5 additions & 4 deletions core/pipeline/Pipeline.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -317,10 +317,11 @@ bool Pipeline::Init(PipelineConfig&& config) {
ProcessQueueManager::GetInstance()->SetDownStreamQueues(mContext.GetProcessQueueKey(), std::move(senderQueues));
}

WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef,
MetricCategory::METRIC_CATEGORY_PIPELINE,
{{METRIC_LABEL_KEY_PROJECT, mContext.GetProjectName()}, {METRIC_LABEL_KEY_PIPELINE_NAME, mName}});
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef,
MetricCategory::METRIC_CATEGORY_PIPELINE,
{{METRIC_LABEL_KEY_PROJECT, mContext.GetProjectName()},
{METRIC_LABEL_KEY_PIPELINE_NAME, mName},
{METRIC_LABEL_KEY_LOGSTORE, mContext.GetLogstoreName()}});
mStartTime = mMetricsRecordRef.CreateIntGauge(METRIC_PIPELINE_START_TIME);
mProcessorsInEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENTS_TOTAL);
mProcessorsInGroupsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_TOTAL);
Expand Down
1 change: 1 addition & 0 deletions core/pipeline/plugin/interface/Plugin.h
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ class Plugin {
MetricCategory::METRIC_CATEGORY_PLUGIN,
{{METRIC_LABEL_KEY_PROJECT, mContext->GetProjectName()},
{METRIC_LABEL_KEY_PIPELINE_NAME, mContext->GetConfigName()},
{METRIC_LABEL_KEY_LOGSTORE, mContext->GetLogstoreName()},
{METRIC_LABEL_KEY_PLUGIN_TYPE, name},
{METRIC_LABEL_KEY_PLUGIN_ID, id}});
}
Expand Down
4 changes: 2 additions & 2 deletions core/pipeline/serializer/JsonSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -83,8 +83,8 @@ bool JsonEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, str
eventJson[METRIC_RESERVED_KEY_VALUE] = e.GetValue<UntypedSingleValue>()->mValue;
} else if (e.Is<UntypedMultiDoubleValues>()) {
eventJson[METRIC_RESERVED_KEY_VALUE] = Json::Value();
for (auto value = e.GetValue<UntypedMultiDoubleValues>()->ValusBegin();
value != e.GetValue<UntypedMultiDoubleValues>()->ValusEnd();
for (auto value = e.GetValue<UntypedMultiDoubleValues>()->ValuesBegin();
value != e.GetValue<UntypedMultiDoubleValues>()->ValuesEnd();
value++) {
eventJson[METRIC_RESERVED_KEY_VALUE][value->first.to_string()] = value->second;
}
Expand Down
28 changes: 22 additions & 6 deletions core/plugin/flusher/sls/FlusherSLS.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -126,12 +126,14 @@ shared_ptr<ConcurrencyLimiter> FlusherSLS::GetLogstoreConcurrencyLimiter(const s

auto iter = sLogstoreConcurrencyLimiterMap.find(key);
if (iter == sLogstoreConcurrencyLimiterMap.end()) {
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency());
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#logstore#" + key,
AppConfig::GetInstance()->GetSendRequestConcurrency());
sLogstoreConcurrencyLimiterMap.try_emplace(key, limiter);
return limiter;
}
if (iter->second.expired()) {
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#logstore#" + key, AppConfig::GetInstance()->GetSendRequestConcurrency());
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#logstore#" + key,
AppConfig::GetInstance()->GetSendRequestConcurrency());
iter->second = limiter;
return limiter;
}
Expand All @@ -142,12 +144,14 @@ shared_ptr<ConcurrencyLimiter> FlusherSLS::GetProjectConcurrencyLimiter(const st
lock_guard<mutex> lock(sMux);
auto iter = sProjectConcurrencyLimiterMap.find(project);
if (iter == sProjectConcurrencyLimiterMap.end()) {
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency());
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#project#" + project,
AppConfig::GetInstance()->GetSendRequestConcurrency());
sProjectConcurrencyLimiterMap.try_emplace(project, limiter);
return limiter;
}
if (iter->second.expired()) {
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#project#" + project, AppConfig::GetInstance()->GetSendRequestConcurrency());
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#quota#project#" + project,
AppConfig::GetInstance()->GetSendRequestConcurrency());
iter->second = limiter;
return limiter;
}
Expand All @@ -158,12 +162,20 @@ shared_ptr<ConcurrencyLimiter> FlusherSLS::GetRegionConcurrencyLimiter(const str
lock_guard<mutex> lock(sMux);
auto iter = sRegionConcurrencyLimiterMap.find(region);
if (iter == sRegionConcurrencyLimiterMap.end()) {
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#network#region#" + region, AppConfig::GetInstance()->GetSendRequestConcurrency(), AppConfig::GetInstance()->GetSendRequestConcurrency()*AppConfig::GetInstance()->GetGlobalConcurrencyFreePercentageForOneRegion());
auto limiter = make_shared<ConcurrencyLimiter>(
sName + "#network#region#" + region,
AppConfig::GetInstance()->GetSendRequestConcurrency(),
AppConfig::GetInstance()->GetSendRequestConcurrency()
* AppConfig::GetInstance()->GetGlobalConcurrencyFreePercentageForOneRegion());
sRegionConcurrencyLimiterMap.try_emplace(region, limiter);
return limiter;
}
if (iter->second.expired()) {
auto limiter = make_shared<ConcurrencyLimiter>(sName + "#network#region#" + region, AppConfig::GetInstance()->GetSendRequestConcurrency(), AppConfig::GetInstance()->GetSendRequestConcurrency()*AppConfig::GetInstance()->GetGlobalConcurrencyFreePercentageForOneRegion());
auto limiter = make_shared<ConcurrencyLimiter>(
sName + "#network#region#" + region,
AppConfig::GetInstance()->GetSendRequestConcurrency(),
AppConfig::GetInstance()->GetSendRequestConcurrency()
* AppConfig::GetInstance()->GetGlobalConcurrencyFreePercentageForOneRegion());
iter->second = limiter;
return limiter;
}
Expand Down Expand Up @@ -524,6 +536,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline
mSendCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL);
mSendDoneCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL);
mSuccessCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL);
mDiscardCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_DISCARD_TOTAL);
mNetworkErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_NETWORK_ERROR_TOTAL);
mServerErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SERVER_ERROR_TOTAL);
mShardWriteQuotaErrorCnt
Expand Down Expand Up @@ -878,6 +891,9 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item)
DealSenderQueueItemAfterSend(item, true);
break;
case OperationOnFail::DISCARD:
if (mDiscardCnt) {
mDiscardCnt->Add(1);
}
default:
LOG_WARNING(sLogger, LOG_PATTERN);
if (!isProfileData) {
Expand Down
1 change: 1 addition & 0 deletions core/plugin/flusher/sls/FlusherSLS.h
Original file line number Diff line number Diff line change
Expand Up @@ -128,6 +128,7 @@ class FlusherSLS : public HttpFlusher {
CounterPtr mSendCnt;
CounterPtr mSendDoneCnt;
CounterPtr mSuccessCnt;
CounterPtr mDiscardCnt;
CounterPtr mNetworkErrorCnt;
CounterPtr mServerErrorCnt;
CounterPtr mShardWriteQuotaErrorCnt;
Expand Down
2 changes: 1 addition & 1 deletion core/unittest/pipeline/PipelineUnittest.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,7 +124,7 @@ void PipelineUnittest::OnSuccessfulInit() const {
APSARA_TEST_EQUAL(QueueKeyManager::GetInstance()->GetKey("test_config-flusher_sls-test_project#test_logstore"),
pipeline->GetContext().GetLogstoreKey());
APSARA_TEST_EQUAL(0, pipeline->mInProcessCnt.load());
APSARA_TEST_EQUAL(2U, pipeline->mMetricsRecordRef->GetLabels()->size());
APSARA_TEST_EQUAL(3U, pipeline->mMetricsRecordRef->GetLabels()->size());
APSARA_TEST_TRUE(pipeline->mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_PIPELINE_NAME, configName));
APSARA_TEST_TRUE(pipeline->mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_PROJECT, "test_project"));

Expand Down
Loading