Skip to content

Commit

Permalink
change metric struct (#1862)
Browse files Browse the repository at this point in the history
  • Loading branch information
Takuka0311 authored Nov 13, 2024
1 parent 9005b10 commit eaf69cb
Show file tree
Hide file tree
Showing 60 changed files with 538 additions and 375 deletions.
4 changes: 2 additions & 2 deletions core/common/compression/Compressor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -24,14 +24,14 @@ namespace logtail {

void Compressor::SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef, std::move(labels), std::move(dynamicLabels));
mMetricsRecordRef, MetricCategory::METRIC_CATEGORY_COMPONENT, std::move(labels), std::move(dynamicLabels));
mInItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_ITEMS_TOTAL);
mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_IN_SIZE_BYTES);
mOutItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_ITEMS_TOTAL);
mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_OUT_SIZE_BYTES);
mTotalProcessMs = mMetricsRecordRef.CreateTimeCounter(METRIC_COMPONENT_TOTAL_PROCESS_TIME_MS);
mDiscardedItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_TOTAL);
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_ITEMS_SIZE_BYTES);
mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_COMPONENT_DISCARDED_SIZE_BYTES);
}

bool Compressor::DoCompress(const string& input, string& output, string& errorMsg) {
Expand Down
3 changes: 1 addition & 2 deletions core/common/compression/CompressorFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,9 +15,9 @@
#include "common/compression/CompressorFactory.h"

#include "common/ParamExtractor.h"
#include "monitor/metric_constants/MetricConstants.h"
#include "common/compression/LZ4Compressor.h"
#include "common/compression/ZstdCompressor.h"
#include "monitor/metric_constants/MetricConstants.h"

using namespace std;

Expand Down Expand Up @@ -64,7 +64,6 @@ unique_ptr<Compressor> CompressorFactory::Create(const Json::Value& config,
compressor->SetMetricRecordRef({{METRIC_LABEL_KEY_PROJECT, ctx.GetProjectName()},
{METRIC_LABEL_KEY_PIPELINE_NAME, ctx.GetConfigName()},
{METRIC_LABEL_KEY_COMPONENT_NAME, METRIC_LABEL_VALUE_COMPONENT_NAME_COMPRESSOR},
{METRIC_LABEL_KEY_METRIC_CATEGORY, METRIC_LABEL_KEY_METRIC_CATEGORY_COMPONENT},
{METRIC_LABEL_KEY_FLUSHER_PLUGIN_ID, flusherId}});
return compressor;
}
Expand Down
2 changes: 1 addition & 1 deletion core/container_manager/ContainerDiscoveryOptions.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,7 @@ bool ContainerDiscoveryOptions::Init(const Json::Value& config, const PipelineCo

void ContainerDiscoveryOptions::GenerateContainerMetaFetchingGoPipeline(Json::Value& res,
const FileDiscoveryOptions* fileDiscovery,
const PluginInstance::PluginMeta pluginMeta) const {
const PluginInstance::PluginMeta& pluginMeta) const {
Json::Value plugin(Json::objectValue);
Json::Value detail(Json::objectValue);
Json::Value object(Json::objectValue);
Expand Down
2 changes: 1 addition & 1 deletion core/container_manager/ContainerDiscoveryOptions.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ struct ContainerDiscoveryOptions {
bool Init(const Json::Value& config, const PipelineContext& ctx, const std::string& pluginType);
void GenerateContainerMetaFetchingGoPipeline(Json::Value& res,
const FileDiscoveryOptions* fileDiscovery = nullptr,
const PluginInstance::PluginMeta pluginMeta = {"0"}) const;
const PluginInstance::PluginMeta& pluginMeta = {"0"}) const;
};

using ContainerDiscoveryConfig = std::pair<const ContainerDiscoveryOptions*, const PipelineContext*>;
Expand Down
2 changes: 1 addition & 1 deletion core/ebpf/SelfMonitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ void NetworkObserverSelfMonitor::HandleStatistic(nami::eBPFStatistics& stats) {

eBPFSelfMonitorMgr::eBPFSelfMonitorMgr() : mSelfMonitors({}), mInited({}) {}

void eBPFSelfMonitorMgr::Init(const nami::PluginType type, std::shared_ptr<PluginMetricManager> mgr, const std::string& name, const std::string& logstore) {
void eBPFSelfMonitorMgr::Init(const nami::PluginType type, PluginMetricManagerPtr mgr, const std::string& name, const std::string& logstore) {
if (mInited[int(type)]) return;

WriteLock lk(mLock);
Expand Down
2 changes: 1 addition & 1 deletion core/ebpf/SelfMonitor.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ class FileSecuritySelfMonitor : public BaseBPFMonitor {
class eBPFSelfMonitorMgr {
public:
eBPFSelfMonitorMgr();
void Init(const nami::PluginType type, std::shared_ptr<PluginMetricManager> mgr, const std::string& name, const std::string& project);
void Init(const nami::PluginType type, PluginMetricManagerPtr mgr, const std::string& name, const std::string& project);
void Release(const nami::PluginType type);
void Suspend(const nami::PluginType type);
void HandleStatistic(std::vector<nami::eBPFStatistics>&& stats);
Expand Down
8 changes: 4 additions & 4 deletions core/ebpf/eBPFServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -152,8 +152,8 @@ void eBPFServer::Init() {
DynamicMetricLabels dynamicLabels;
dynamicLabels.emplace_back(METRIC_LABEL_KEY_PROJECT, [this]() -> std::string { return this->GetAllProjects(); });
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mRef,
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_EBPF_SERVER},
{METRIC_LABEL_KEY_METRIC_CATEGORY, METRIC_LABEL_KEY_METRIC_CATEGORY_RUNNER}},
MetricCategory::METRIC_CATEGORY_RUNNER,
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_EBPF_SERVER}},
std::move(dynamicLabels));

mStartPluginTotal = mRef.CreateCounter(METRIC_RUNNER_EBPF_START_PLUGIN_TOTAL);
Expand Down Expand Up @@ -202,7 +202,7 @@ void eBPFServer::Stop() {
bool eBPFServer::StartPluginInternal(const std::string& pipeline_name, uint32_t plugin_index,
nami::PluginType type,
const logtail::PipelineContext* ctx,
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, std::shared_ptr<PluginMetricManager> mgr) {
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, PluginMetricManagerPtr mgr) {

std::string prev_pipeline_name = CheckLoadedPipelineName(type);
if (prev_pipeline_name.size() && prev_pipeline_name != pipeline_name) {
Expand Down Expand Up @@ -314,7 +314,7 @@ bool eBPFServer::HasRegisteredPlugins() const {
bool eBPFServer::EnablePlugin(const std::string& pipeline_name, uint32_t plugin_index,
nami::PluginType type,
const PipelineContext* ctx,
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, std::shared_ptr<PluginMetricManager> mgr) {
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, PluginMetricManagerPtr mgr) {
if (!IsSupportedEnv(type)) {
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions core/ebpf/eBPFServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ class eBPFServer : public InputRunner {
bool EnablePlugin(const std::string& pipeline_name, uint32_t plugin_index,
nami::PluginType type,
const logtail::PipelineContext* ctx,
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, std::shared_ptr<PluginMetricManager> mgr);
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, PluginMetricManagerPtr mgr);

bool DisablePlugin(const std::string& pipeline_name, nami::PluginType type);

Expand All @@ -88,7 +88,7 @@ class eBPFServer : public InputRunner {
bool StartPluginInternal(const std::string& pipeline_name, uint32_t plugin_index,
nami::PluginType type,
const logtail::PipelineContext* ctx,
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, std::shared_ptr<PluginMetricManager> mgr);
const std::variant<SecurityOptions*, nami::ObserverNetworkOption*> options, PluginMetricManagerPtr mgr);
eBPFServer() = default;
~eBPFServer() = default;

Expand Down
4 changes: 2 additions & 2 deletions core/file_server/FileServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ namespace logtail {
FileServer::FileServer() {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef,
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_FILE_SERVER},
{METRIC_LABEL_KEY_METRIC_CATEGORY, METRIC_LABEL_KEY_METRIC_CATEGORY_RUNNER}});
MetricCategory::METRIC_CATEGORY_RUNNER,
{{METRIC_LABEL_KEY_RUNNER_NAME, METRIC_LABEL_VALUE_RUNNER_NAME_FILE_SERVER}});
}

// 启动文件服务,包括加载配置、处理检查点、注册事件等
Expand Down
138 changes: 87 additions & 51 deletions core/monitor/LogtailMetric.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,11 +25,19 @@ using namespace sls_logs;

namespace logtail {

const std::string LABEL_PREFIX = "label.";
const std::string VALUE_PREFIX = "value.";
const std::string METRIC_KEY_LABEL = "label";
const std::string METRIC_KEY_VALUE = "value";
const std::string METRIC_KEY_CATEGORY = "category";
const std::string MetricCategory::METRIC_CATEGORY_UNKNOWN = "unknown";
const std::string MetricCategory::METRIC_CATEGORY_AGENT = "agent";
const std::string MetricCategory::METRIC_CATEGORY_RUNNER = "runner";
const std::string MetricCategory::METRIC_CATEGORY_PIPELINE = "pipeline";
const std::string MetricCategory::METRIC_CATEGORY_COMPONENT = "component";
const std::string MetricCategory::METRIC_CATEGORY_PLUGIN = "plugin";
const std::string MetricCategory::METRIC_CATEGORY_PLUGIN_SOURCE = "plugin_source";

MetricsRecord::MetricsRecord(MetricLabelsPtr labels, DynamicMetricLabelsPtr dynamicLabels)
: mLabels(labels), mDynamicLabels(dynamicLabels), mDeleted(false) {
MetricsRecord::MetricsRecord(const std::string& category, MetricLabelsPtr labels, DynamicMetricLabelsPtr dynamicLabels)
: mCategory(category), mLabels(labels), mDynamicLabels(dynamicLabels), mDeleted(false) {
}

CounterPtr MetricsRecord::CreateCounter(const std::string& name) {
Expand Down Expand Up @@ -64,6 +72,10 @@ bool MetricsRecord::IsDeleted() const {
return mDeleted;
}

const std::string& MetricsRecord::GetCategory() const {
return mCategory;
}

const MetricLabelsPtr& MetricsRecord::GetLabels() const {
return mLabels;
}
Expand All @@ -89,7 +101,7 @@ const std::vector<DoubleGaugePtr>& MetricsRecord::GetDoubleGauges() const {
}

MetricsRecord* MetricsRecord::Collect() {
MetricsRecord* metrics = new MetricsRecord(mLabels, mDynamicLabels);
MetricsRecord* metrics = new MetricsRecord(mCategory, mLabels, mDynamicLabels);
for (auto& item : mCounters) {
CounterPtr newPtr(item->Collect());
metrics->mCounters.emplace_back(newPtr);
Expand Down Expand Up @@ -127,6 +139,10 @@ void MetricsRecordRef::SetMetricsRecord(MetricsRecord* metricRecord) {
mMetrics = metricRecord;
}

const std::string& MetricsRecordRef::GetCategory() const {
return mMetrics->GetCategory();
}

const MetricLabelsPtr& MetricsRecordRef::GetLabels() const {
return mMetrics->GetLabels();
}
Expand Down Expand Up @@ -171,8 +187,12 @@ bool MetricsRecordRef::HasLabel(const std::string& key, const std::string& value
#endif

// ReentrantMetricsRecord相关操作可以无锁,因为mCounters、mGauges只在初始化时会添加内容,后续只允许Get操作
void ReentrantMetricsRecord::Init(MetricLabels& labels, std::unordered_map<std::string, MetricType>& metricKeys) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, std::move(labels));
void ReentrantMetricsRecord::Init(const std::string& category,
MetricLabels& labels,
DynamicMetricLabels& dynamicLabels,
std::unordered_map<std::string, MetricType>& metricKeys) {
WriteMetrics::GetInstance()->PrepareMetricsRecordRef(
mMetricsRecordRef, category, std::move(labels), std::move(dynamicLabels));
for (auto metric : metricKeys) {
switch (metric.second) {
case MetricType::METRIC_TYPE_COUNTER:
Expand Down Expand Up @@ -237,17 +257,19 @@ WriteMetrics::~WriteMetrics() {
}

void WriteMetrics::PrepareMetricsRecordRef(MetricsRecordRef& ref,
const std::string& category,
MetricLabels&& labels,
DynamicMetricLabels&& dynamicLabels) {
CreateMetricsRecordRef(ref, std::move(labels), std::move(dynamicLabels));
CreateMetricsRecordRef(ref, category, std::move(labels), std::move(dynamicLabels));
CommitMetricsRecordRef(ref);
}

void WriteMetrics::CreateMetricsRecordRef(MetricsRecordRef& ref,
const std::string& category,
MetricLabels&& labels,
DynamicMetricLabels&& dynamicLabels) {
MetricsRecord* cur = new MetricsRecord(std::make_shared<MetricLabels>(labels),
std::make_shared<DynamicMetricLabels>(dynamicLabels));
MetricsRecord* cur = new MetricsRecord(
category, std::make_shared<MetricLabels>(labels), std::make_shared<DynamicMetricLabels>(dynamicLabels));
ref.SetMetricsRecord(cur);
}

Expand Down Expand Up @@ -395,42 +417,53 @@ void ReadMetrics::ReadAsLogGroup(const std::string& regionFieldName,
auto now = GetCurrentLogtailTime();
SetLogTime(logPtr,
AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec);
for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) {
std::pair<std::string, std::string> pair = *item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(LABEL_PREFIX + pair.first);
contentPtr->set_value(pair.second);
}
for (auto item = tmp->GetDynamicLabels()->begin(); item != tmp->GetDynamicLabels()->end(); ++item) {
std::pair<std::string, std::function<std::string()>> pair = *item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(LABEL_PREFIX + pair.first);
contentPtr->set_value(pair.second());
}

for (auto& item : tmp->GetCounters()) {
CounterPtr counter = item;
{ // category
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(VALUE_PREFIX + counter->GetName());
contentPtr->set_value(ToString(counter->GetValue()));
contentPtr->set_key(METRIC_KEY_CATEGORY);
contentPtr->set_value(tmp->GetCategory());
}
for (auto& item : tmp->GetTimeCounters()) {
TimeCounterPtr counter = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(VALUE_PREFIX + counter->GetName());
contentPtr->set_value(ToString(counter->GetValue()));
}
for (auto& item : tmp->GetIntGauges()) {
IntGaugePtr gauge = item;
{ // label
Json::Value metricsRecordLabel;
for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) {
std::pair<std::string, std::string> pair = *item;
metricsRecordLabel[pair.first] = pair.second;
}
for (auto item = tmp->GetDynamicLabels()->begin(); item != tmp->GetDynamicLabels()->end(); ++item) {
std::pair<std::string, std::function<std::string()>> pair = *item;
metricsRecordLabel[pair.first] = pair.second();
}
Json::StreamWriterBuilder writer;
writer["indentation"] = "";
std::string jsonString = Json::writeString(writer, metricsRecordLabel);
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(VALUE_PREFIX + gauge->GetName());
contentPtr->set_value(ToString(gauge->GetValue()));
contentPtr->set_key(METRIC_KEY_LABEL);
contentPtr->set_value(jsonString);
}
for (auto& item : tmp->GetDoubleGauges()) {
DoubleGaugePtr gauge = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(VALUE_PREFIX + gauge->GetName());
contentPtr->set_value(ToString(gauge->GetValue()));
{ // value
for (auto& item : tmp->GetCounters()) {
CounterPtr counter = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(counter->GetName());
contentPtr->set_value(ToString(counter->GetValue()));
}
for (auto& item : tmp->GetTimeCounters()) {
TimeCounterPtr counter = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(counter->GetName());
contentPtr->set_value(ToString(counter->GetValue()));
}
for (auto& item : tmp->GetIntGauges()) {
IntGaugePtr gauge = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(gauge->GetName());
contentPtr->set_value(ToString(gauge->GetValue()));
}
for (auto& item : tmp->GetDoubleGauges()) {
DoubleGaugePtr gauge = item;
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key(gauge->GetName());
contentPtr->set_value(ToString(gauge->GetValue()));
}
}
tmp = tmp->GetNext();
}
Expand All @@ -443,40 +476,43 @@ void ReadMetrics::ReadAsFileBuffer(std::string& metricsContent) const {

MetricsRecord* tmp = mHead;
while (tmp) {
Json::Value metricsRecordValue;
Json::Value metricsRecordJson, metricsRecordLabel;
auto now = GetCurrentLogtailTime();
metricsRecordValue["time"]
metricsRecordJson["time"]
= AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() : now.tv_sec;

metricsRecordJson[METRIC_KEY_CATEGORY] = tmp->GetCategory();

for (auto item = tmp->GetLabels()->begin(); item != tmp->GetLabels()->end(); ++item) {
std::pair<std::string, std::string> pair = *item;
metricsRecordValue[LABEL_PREFIX + pair.first] = pair.second;
metricsRecordLabel[pair.first] = pair.second;
}
for (auto item = tmp->GetDynamicLabels()->begin(); item != tmp->GetDynamicLabels()->end(); ++item) {
std::pair<std::string, std::function<std::string()>> pair = *item;
metricsRecordValue[LABEL_PREFIX + pair.first] = pair.second();
metricsRecordLabel[pair.first] = pair.second();
}
metricsRecordJson[METRIC_KEY_LABEL] = metricsRecordLabel;

for (auto& item : tmp->GetCounters()) {
CounterPtr counter = item;
metricsRecordValue[VALUE_PREFIX + counter->GetName()] = ToString(counter->GetValue());
metricsRecordJson[counter->GetName()] = ToString(counter->GetValue());
}
for (auto& item : tmp->GetTimeCounters()) {
TimeCounterPtr counter = item;
metricsRecordValue[VALUE_PREFIX + counter->GetName()] = ToString(counter->GetValue());
metricsRecordJson[counter->GetName()] = ToString(counter->GetValue());
}
for (auto& item : tmp->GetIntGauges()) {
IntGaugePtr gauge = item;
metricsRecordValue[VALUE_PREFIX + gauge->GetName()] = ToString(gauge->GetValue());
metricsRecordJson[gauge->GetName()] = ToString(gauge->GetValue());
}
for (auto& item : tmp->GetDoubleGauges()) {
DoubleGaugePtr gauge = item;
metricsRecordValue[VALUE_PREFIX + gauge->GetName()] = ToString(gauge->GetValue());
metricsRecordJson[gauge->GetName()] = ToString(gauge->GetValue());
}

Json::StreamWriterBuilder writer;
writer["indentation"] = "";
std::string jsonString = Json::writeString(writer, metricsRecordValue);
std::string jsonString = Json::writeString(writer, metricsRecordJson);
oss << jsonString << '\n';

tmp = tmp->GetNext();
Expand Down
Loading

0 comments on commit eaf69cb

Please sign in to comment.