Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

change metric struct #1862

Merged
merged 9 commits into from
Nov 13, 2024
Merged
Show file tree
Hide file tree
Changes from 6 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/common/compression/Compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ namespace logtail {

void Compressor::SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef, std::move(labels), std::move(dynamicLabels));
mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_COMPONENT, std::move(labels), std::move(dynamicLabels));
mInItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_TOTAL);
mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_SIZE_BYTES);
mOutItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_TOTAL);
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_SIZE_BYTES);
mTotalProcessMs = mMetricsRecordRef.CreateTimeCounter(METRIC_COMPONENT_TOTAL_PROCESS_TIME_MS);
mDiscardedItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_TOTAL);
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES);
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_SIZE_BYTES);
}

bool Compressor::DoCompress(const string& input, string& output, string& errorMsg) {
Expand Down
3 changes: 1 addition & 2 deletions core/common/compression/CompressorFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
#include "common/compression/CompressorFactory.h"

#include "common/ParamExtractor.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "common/compression/LZ4Compressor.h"
#include "common/compression/ZstdCompressor.h"
#include "monitor/metric_constants/MetricConstants.h"

using namespace std;

Expand Down Expand Up @@ -64,7 +64,6 @@ unique_ptr<Compressor> CompressorFactory::Create(const Json::Value& config,
compressor->SetMetricRecordRef({{METRIC_LABEL_KEY_PROJECT, ctx.GetProjectName()},
{METRIC_LABEL_KEY_PIPELINE_NAME, ctx.GetConfigName()},
{METRIC_LABEL_KEY_COMPONENT_NAME, METRIC_LABEL_VALUE_COMPONENT_NAME_COMPRESSOR},
{METRIC_LABEL_KEY_METRIC_CATEGORY, METRIC_LABEL_KEY_METRIC_CATEGORY_COMPONENT},
{METRIC_LABEL_KEY_FLUSHER_PLUGIN_ID, flusherId}});
return compressor;
}
Expand Down
2 changes: 1 addition & 1 deletion core/container_manager/ContainerDiscoveryOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ bool ContainerDiscoveryOptions::Init(const Json::Value& config, const PipelineCo

void ContainerDiscoveryOptions::GenerateContainerMetaFetchingGoPipeline(Json::Value& res,
const FileDiscoveryOptions* fileDiscovery,
const PluginInstance::PluginMeta pluginMeta) const {
const PluginInstance::PluginMeta& pluginMeta) const {
Json::Value plugin(Json::objectValue);
Json::Value detail(Json::objectValue);
Json::Value object(Json::objectValue);
Expand Down
2 changes: 1 addition & 1 deletion core/container_manager/ContainerDiscoveryOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct ContainerDiscoveryOptions {
bool Init(const Json::Value& config, const PipelineContext& ctx, const std::string& pluginType);
void GenerateContainerMetaFetchingGoPipeline(Json::Value& res,
const FileDiscoveryOptions* fileDiscovery = nullptr,
const PluginInstance::PluginMeta pluginMeta = {"0"}) const;
const PluginInstance::PluginMeta& pluginMeta = {"0"}) const;
};

using ContainerDiscoveryConfig = std::pair<const ContainerDiscoveryOptions*, const PipelineContext*>;
Expand Down
2 changes: 1 addition & 1 deletion core/ebpf/SelfMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ void NetworkObserverSelfMonitor::HandleStatistic(nami::eBPFStatistics& stats) {

eBPFSelfMonitorMgr::eBPFSelfMonitorMgr() : mSelfMonitors({}), mInited({}) {}

void eBPFSelfMonitorMgr::Init(const nami::PluginType type, std::shared_ptr<PluginMetricManager> mgr, const std::string& name, const std::string& logstore) {
void eBPFSelfMonitorMgr::Init(const nami::PluginType type, PluginMetricManagerPtr mgr, const std::string& name, const std::string& logstore) {
if (mInited[int(type)]) return;

WriteLock lk(mLock);
Expand Down
2 changes: 1 addition & 1 deletion core/ebpf/SelfMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class FileSecuritySelfMonitor : public BaseBPFMonitor {
class eBPFSelfMonitorMgr {
public:
eBPFSelfMonitorMgr();
void Init(const nami::PluginType type, std::shared_ptr<PluginMetricManager> mgr, const std::string& name, const std::string& project);
void Init(const nami::PluginType type, PluginMetricManagerPtr mgr, const std::string& name, const std::string& project);
void Release(const nami::PluginType type);
void Suspend(const nami::PluginType type);
void HandleStatistic(std::vector<nami::eBPFStatistics>&& stats);
Expand Down
8 changes: 4 additions & 4 deletions core/ebpf/eBPFServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ void eBPFServer::Init() {
DynamicMetricLabels dynamicLabels;
dynamicLabels.emplace_back(METRIC_LABEL_KEY_PROJECT, [this]() -> std::string { return this->GetAllProjects(); });
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mRef,
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_EBPF_SERVER},
{METRIC_LABEL_KEY_METRIC_CATEGORY, METRIC_LABEL_KEY_METRIC_CATEGORY_RUNNER}},
MetricCategory::METRIC_CATEGORY_RUNNER,
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_EBPF_SERVER}},
std::move(dynamicLabels));

mStartPluginTotal = mRef.CreateCounter(METRIC_RUNNER_EBPF_START_PLUGIN_TOTAL);
Expand Down Expand Up @@ -202,7 +202,7 @@ void eBPFServer::Stop() {
bool eBPFServer::StartPluginInternal(const std::string& pipeline_name, uint32_t plugin_index,
nami::PluginType type,
const logtail::PipelineContext* ctx,
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, std::shared_ptr<PluginMetricManager> mgr) {
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, PluginMetricManagerPtr mgr) {

std::string prev_pipeline_name = CheckLoadedPipelineName(type);
if (prev_pipeline_name.size() && prev_pipeline_name != pipeline_name) {
Expand Down Expand Up @@ -314,7 +314,7 @@ bool eBPFServer::HasRegisteredPlugins() const {
bool eBPFServer::EnablePlugin(const std::string& pipeline_name, uint32_t plugin_index,
nami::PluginType type,
const PipelineContext* ctx,
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, std::shared_ptr<PluginMetricManager> mgr) {
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, PluginMetricManagerPtr mgr) {
if (!IsSupportedEnv(type)) {
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions core/ebpf/eBPFServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class eBPFServer : public InputRunner {
bool EnablePlugin(const std::string& pipeline_name, uint32_t plugin_index,
nami::PluginType type,
const logtail::PipelineContext* ctx,
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, std::shared_ptr<PluginMetricManager> mgr);
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, PluginMetricManagerPtr mgr);

bool DisablePlugin(const std::string& pipeline_name, nami::PluginType type);

Expand All @@ -88,7 +88,7 @@ class eBPFServer : public InputRunner {
bool StartPluginInternal(const std::string& pipeline_name, uint32_t plugin_index,
nami::PluginType type,
const logtail::PipelineContext* ctx,
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, std::shared_ptr<PluginMetricManager> mgr);
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, PluginMetricManagerPtr mgr);
eBPFServer() = default;
~eBPFServer() = default;

Expand Down
4 changes: 2 additions & 2 deletions core/file_server/FileServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ namespace logtail {
FileServer::FileServer() {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef,
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_FILE_SERVER},
{METRIC_LABEL_KEY_METRIC_CATEGORY, METRIC_LABEL_KEY_METRIC_CATEGORY_RUNNER}});
MetricCategory::METRIC_CATEGORY_RUNNER,
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_FILE_SERVER}});
}

// 启动文件服务,包括加载配置、处理检查点、注册事件等
Expand Down
138 changes: 87 additions & 51 deletions core/monitor/LogtailMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@ using namespace sls_logs;

namespace logtail {

const std::string LABEL_PREFIX = "label.";
const std::string VALUE_PREFIX = "value.";
const std::string METRIC_KEY_LABEL = "label";
const std::string METRIC_KEY_VALUE = "value";
const std::string METRIC_KEY_CATEGORY = "category";
const std::string MetricCategory::METRIC_CATEGORY_UNKNOWN = "unknown";
const std::string MetricCategory::METRIC_CATEGORY_AGENT = "agent";
const std::string MetricCategory::METRIC_CATEGORY_RUNNER = "runner";
const std::string MetricCategory::METRIC_CATEGORY_PIPELINE = "pipeline";
const std::string MetricCategory::METRIC_CATEGORY_COMPONENT = "component";
const std::string MetricCategory::METRIC_CATEGORY_PLUGIN = "plugin";
const std::string MetricCategory::METRIC_CATEGORY_PLUGIN_SOURCE = "plugin_source";

MetricsRecord::MetricsRecord(MetricLabelsPtr labels, DynamicMetricLabelsPtr dynamicLabels)
: mLabels(labels), mDynamicLabels(dynamicLabels), mDeleted(false) {
MetricsRecord::MetricsRecord(const std::string& category, MetricLabelsPtr labels, DynamicMetricLabelsPtr dynamicLabels)
: mCategory(category), mLabels(labels), mDynamicLabels(dynamicLabels), mDeleted(false) {
}

CounterPtr MetricsRecord::CreateCounter(const std::string& name) {
Expand Down Expand Up @@ -64,6 +72,10 @@ bool MetricsRecord::IsDeleted() const {
return mDeleted;
}

const std::string& MetricsRecord::GetCategory() const {
return mCategory;
}

const MetricLabelsPtr& MetricsRecord::GetLabels() const {
return mLabels;
}
Expand All @@ -89,7 +101,7 @@ const std::vector<DoubleGaugePtr>& MetricsRecord::GetDoubleGauges() const {
}

MetricsRecord* MetricsRecord::Collect() {
MetricsRecord* metrics = new MetricsRecord(mLabels, mDynamicLabels);
MetricsRecord* metrics = new MetricsRecord(mCategory, mLabels, mDynamicLabels);
for (auto& item : mCounters) {
CounterPtr newPtr(item->Collect());
metrics->mCounters.emplace_back(newPtr);
Expand Down Expand Up @@ -127,6 +139,10 @@ void MetricsRecordRef::SetMetricsRecord(MetricsRecord* metricRecord) {
mMetrics = metricRecord;
}

const std::string& MetricsRecordRef::GetCategory() const {
return mMetrics->GetCategory();
}

const MetricLabelsPtr& MetricsRecordRef::GetLabels() const {
return mMetrics->GetLabels();
}
Expand Down Expand Up @@ -171,8 +187,12 @@ bool MetricsRecordRef::HasLabel(const std::string& key, const std::string& value
#endif

// ReentrantMetricsRecord相关操作可以无锁,因为mCounters、mGauges只在初始化时会添加内容,后续只允许Get操作
void ReentrantMetricsRecord::Init(MetricLabels& labels, std::unordered_map<std::string, MetricType>& metricKeys) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, std::move(labels));
void ReentrantMetricsRecord::Init(const std::string& category,
MetricLabels& labels,
DynamicMetricLabels& dynamicLabels,
std::unordered_map<std::string, MetricType>& metricKeys) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef, category, std::move(labels), std::move(dynamicLabels));
for (auto metric : metricKeys) {
switch (metric.second) {
case MetricType::METRIC_TYPE_COUNTER:
Expand Down Expand Up @@ -237,17 +257,19 @@ WriteMetrics::~WriteMetrics() {
}

void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref,
const std::string& category,
MetricLabels&& labels,
DynamicMetricLabels&& dynamicLabels) {
CreateMetricsRecordRef(ref, std::move(labels), std::move(dynamicLabels));
CreateMetricsRecordRef(ref, category, std::move(labels), std::move(dynamicLabels));
CommitMetricsRecordRef(ref);
}

void WriteMetrics::CreateMetricsRecordRef(MetricsRecordRef& ref,
const std::string& category,
MetricLabels&& labels,
DynamicMetricLabels&& dynamicLabels) {
MetricsRecord* cur = new MetricsRecord(std::make_shared<MetricLabels>(labels),
std::make_shared<DynamicMetricLabels>(dynamicLabels));
MetricsRecord* cur = new MetricsRecord(
category, std::make_shared<MetricLabels>(labels), std::make_shared<DynamicMetricLabels>(dynamicLabels));
ref.SetMetricsRecord(cur);
}

Expand Down Expand Up @@ -395,42 +417,53 @@ void ReadMetrics::ReadAsLogGroup(const std::string& regionFieldName,
auto now = GetCurrentLogtailTime();
SetLogTime(logPtr,
AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec);
for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) {
std::pair<std::string, std::string> pair = *item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(LABEL_PREFIX + pair.first);
contentPtr->set_value(pair.second);
}
for (auto item = tmp->GetDynamicLabels()->begin(); item != tmp->GetDynamicLabels()->end(); ++item) {
std::pair<std::string, std::function<std::string()>> pair = *item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(LABEL_PREFIX + pair.first);
contentPtr->set_value(pair.second());
}

for (auto& item : tmp->GetCounters()) {
CounterPtr counter = item;
{ // category
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(VALUE_PREFIX + counter->GetName());
contentPtr->set_value(ToString(counter->GetValue()));
contentPtr->set_key(METRIC_KEY_CATEGORY);
contentPtr->set_value(tmp->GetCategory());
}
for (auto& item : tmp->GetTimeCounters()) {
TimeCounterPtr counter = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(VALUE_PREFIX + counter->GetName());
contentPtr->set_value(ToString(counter->GetValue()));
}
for (auto& item : tmp->GetIntGauges()) {
IntGaugePtr gauge = item;
{ // label
Json::Value metricsRecordLabel;
for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) {
std::pair<std::string, std::string> pair = *item;
metricsRecordLabel[pair.first] = pair.second;
}
for (auto item = tmp->GetDynamicLabels()->begin(); item != tmp->GetDynamicLabels()->end(); ++item) {
std::pair<std::string, std::function<std::string()>> pair = *item;
metricsRecordLabel[pair.first] = pair.second();
}
Json::StreamWriterBuilder writer;
writer["indentation"] = "";
std::string jsonString = Json::writeString(writer, metricsRecordLabel);
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(VALUE_PREFIX + gauge->GetName());
contentPtr->set_value(ToString(gauge->GetValue()));
contentPtr->set_key(METRIC_KEY_LABEL);
contentPtr->set_value(jsonString);
}
for (auto& item : tmp->GetDoubleGauges()) {
DoubleGaugePtr gauge = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(VALUE_PREFIX + gauge->GetName());
contentPtr->set_value(ToString(gauge->GetValue()));
{ // value
for (auto& item : tmp->GetCounters()) {
CounterPtr counter = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(counter->GetName());
contentPtr->set_value(ToString(counter->GetValue()));
}
for (auto& item : tmp->GetTimeCounters()) {
TimeCounterPtr counter = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(counter->GetName());
contentPtr->set_value(ToString(counter->GetValue()));
}
for (auto& item : tmp->GetIntGauges()) {
IntGaugePtr gauge = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(gauge->GetName());
contentPtr->set_value(ToString(gauge->GetValue()));
}
for (auto& item : tmp->GetDoubleGauges()) {
DoubleGaugePtr gauge = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(gauge->GetName());
contentPtr->set_value(ToString(gauge->GetValue()));
}
}
tmp = tmp->GetNext();
}
Expand All @@ -443,40 +476,43 @@ void ReadMetrics::ReadAsFileBuffer(std::string& metricsContent) const {

MetricsRecord* tmp = mHead;
while (tmp) {
Json::Value metricsRecordValue;
Json::Value metricsRecordJson, metricsRecordLabel;
auto now = GetCurrentLogtailTime();
metricsRecordValue["time"]
metricsRecordJson["time"]
= AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec;

metricsRecordJson[METRIC_KEY_CATEGORY] = tmp->GetCategory();

for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) {
std::pair<std::string, std::string> pair = *item;
metricsRecordValue[LABEL_PREFIX + pair.first] = pair.second;
metricsRecordLabel[pair.first] = pair.second;
}
for (auto item = tmp->GetDynamicLabels()->begin(); item != tmp->GetDynamicLabels()->end(); ++item) {
std::pair<std::string, std::function<std::string()>> pair = *item;
metricsRecordValue[LABEL_PREFIX + pair.first] = pair.second();
metricsRecordLabel[pair.first] = pair.second();
}
metricsRecordJson[METRIC_KEY_LABEL] = metricsRecordLabel;

for (auto& item : tmp->GetCounters()) {
CounterPtr counter = item;
metricsRecordValue[VALUE_PREFIX + counter->GetName()] = ToString(counter->GetValue());
metricsRecordJson[counter->GetName()] = ToString(counter->GetValue());
}
for (auto& item : tmp->GetTimeCounters()) {
TimeCounterPtr counter = item;
metricsRecordValue[VALUE_PREFIX + counter->GetName()] = ToString(counter->GetValue());
metricsRecordJson[counter->GetName()] = ToString(counter->GetValue());
}
for (auto& item : tmp->GetIntGauges()) {
IntGaugePtr gauge = item;
metricsRecordValue[VALUE_PREFIX + gauge->GetName()] = ToString(gauge->GetValue());
metricsRecordJson[gauge->GetName()] = ToString(gauge->GetValue());
}
for (auto& item : tmp->GetDoubleGauges()) {
DoubleGaugePtr gauge = item;
metricsRecordValue[VALUE_PREFIX + gauge->GetName()] = ToString(gauge->GetValue());
metricsRecordJson[gauge->GetName()] = ToString(gauge->GetValue());
}

Json::StreamWriterBuilder writer;
writer["indentation"] = "";
std::string jsonString = Json::writeString(writer, metricsRecordValue);
std::string jsonString = Json::writeString(writer, metricsRecordJson);
oss << jsonString << '\n';

tmp = tmp->GetNext();
Expand Down
Loading
Loading