Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: change metric event map tags to vector tags #2008

Open
wants to merge 7 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions core/models/MetricEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@

#include "models/MetricEvent.h"

#include <algorithm>

using namespace std;

namespace logtail {
Expand Down Expand Up @@ -48,15 +50,16 @@ void MetricEvent::SetNameNoCopy(StringView name) {
}

StringView MetricEvent::GetTag(StringView key) const {
auto it = mTags.mInner.find(key);
auto it = std::find_if(mTags.mInner.begin(), mTags.mInner.end(), [&key](const auto& p) { return p.first == key; });
if (it != mTags.mInner.end()) {
return it->second;
}
return gEmptyStringView;
}

bool MetricEvent::HasTag(StringView key) const {
return mTags.mInner.find(key) != mTags.mInner.end();
auto it = std::find_if(mTags.mInner.begin(), mTags.mInner.end(), [&key](const auto& p) { return p.first == key; });
return it != mTags.mInner.end();
}

void MetricEvent::SetTag(StringView key, StringView val) {
Expand Down
20 changes: 15 additions & 5 deletions core/models/MetricEvent.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,14 @@

#pragma once

#include <cstddef>

#include <map>
#include <string>
#include <utility>
#include <vector>

#include "StringView.h"
#include "common/memory/SourceBuffer.h"
#include "models/MetricValue.h"
#include "models/PipelineEvent.h"
Expand Down Expand Up @@ -71,20 +76,25 @@ class MetricEvent : public PipelineEvent {
mValue = UntypedMultiDoubleValues{multiDoubleValues.mValues, this};
}

StringView GetTag(StringView key) const;
bool HasTag(StringView key) const;
[[nodiscard]] StringView GetTag(StringView key) const;
[[nodiscard]] bool HasTag(StringView key) const;
void SetTag(StringView key, StringView val);
void SetTag(const std::string& key, const std::string& val);
void SetTagNoCopy(const StringBuffer& key, const StringBuffer& val);
void SetTagNoCopy(StringView key, StringView val);

void DelTag(StringView key);
void EraseIf(const std::function<bool(std::pair<StringView, StringView>)>& condition) { mTags.EraseIf(condition); }
void SortTags() { std::sort(mTags.mInner.begin(), mTags.mInner.end()); };

std::vector<std::pair<StringView, StringView>>::const_iterator TagsBegin() const { return mTags.mInner.begin(); }
std::vector<std::pair<StringView, StringView>>::const_iterator TagsEnd() const { return mTags.mInner.end(); }

std::map<StringView, StringView>::const_iterator TagsBegin() const { return mTags.mInner.begin(); }
std::map<StringView, StringView>::const_iterator TagsEnd() const { return mTags.mInner.end(); }
size_t TagsSize() const { return mTags.mInner.size(); }

size_t DataSize() const override;


#ifdef APSARA_UNIT_TEST_MAIN
Json::Value ToJson(bool enableEventMeta = false) const override;
bool FromJson(const Json::Value&) override;
Expand All @@ -95,7 +105,7 @@ class MetricEvent : public PipelineEvent {

StringView mName;
MetricValue mValue;
SizedMap mTags;
SizedVectorTags mTags;
};

} // namespace logtail
47 changes: 47 additions & 0 deletions core/models/SizedContainer.h
Original file line number Diff line number Diff line change
Expand Up @@ -59,4 +59,51 @@ class SizedMap {
size_t mAllocatedSize = 0;
};

class SizedVectorTags {
Copy link
Collaborator

@messixukejia messixukejia Jan 12, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

基础的数据结构是否有足够用例保证?特别是边界测试

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

其他意见详见待办意见

public:
void Insert(StringView key, StringView val) {
auto iter = std::find_if(mInner.begin(), mInner.end(), [key](const auto& item) { return item.first == key; });
if (iter != mInner.end()) {
mAllocatedSize += val.size() - iter->second.size();
iter->second = val;
} else {
mAllocatedSize += key.size() + val.size();
mInner.emplace_back(key, val);
}
}

void Erase(StringView key) {
auto iter = std::find_if(mInner.begin(), mInner.end(), [key](const auto& item) { return item.first == key; });
if (iter != mInner.end()) {
mAllocatedSize -= (iter->first.size() + iter->second.size());
mInner.erase(iter);
}
}

void EraseIf(const std::function<bool(std::pair<StringView, StringView>)>& condition) {
size_t index = 0;
mAllocatedSize = 0;
for (const auto& item : mInner) {
if (condition(item)) {
continue;
}
mInner[index++] = item;
mAllocatedSize += item.first.size() + item.second.size();
}
mInner.resize(index);
}

size_t DataSize() const { return sizeof(decltype(mInner)) + mAllocatedSize; }

void Clear() {
mInner.clear();
mAllocatedSize = 0;
}

std::vector<std::pair<StringView, StringView>> mInner;

private:
size_t mAllocatedSize = 0;
};

} // namespace logtail
3 changes: 2 additions & 1 deletion core/pipeline/serializer/SLSSerializer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -235,12 +235,13 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri
break;
case PipelineEvent::Type::METRIC:
for (size_t i = 0; i < group.mEvents.size(); ++i) {
const auto& e = group.mEvents[i].Cast<MetricEvent>();
auto& e = group.mEvents[i].Cast<MetricEvent>();
if (e.Is<std::monostate>()) {
continue;
}
serializer.StartToAddLog(logSZ[i]);
serializer.AddLogTime(e.GetTimestamp());
e.SortTags();
serializer.AddLogContentMetricLabel(e, metricEventContentCache[i].second);
serializer.AddLogContentMetricTimeNano(e);
serializer.AddLogContent(METRIC_RESERVED_KEY_VALUE, metricEventContentCache[i].first);
Expand Down
93 changes: 33 additions & 60 deletions core/plugin/processor/inner/ProcessorPromRelabelMetricNative.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@

#include "json/json.h"

#include "common/Flags.h"
#include "StringView.h"
#include "common/StringTools.h"
#include "models/MetricEvent.h"
#include "models/PipelineEventGroup.h"
Expand All @@ -28,8 +28,6 @@

using namespace std;

DECLARE_FLAG_STRING(_pod_name_);

namespace logtail {

const string ProcessorPromRelabelMetricNative::sName = "processor_prom_relabel_metric_native";
Expand All @@ -42,21 +40,18 @@ bool ProcessorPromRelabelMetricNative::Init(const Json::Value& config) {
return false;
}

mLoongCollectorScraper = STRING_FLAG(_pod_name_);

return true;
}

void ProcessorPromRelabelMetricNative::Process(PipelineEventGroup& metricGroup) {
// if mMetricRelabelConfigs is empty and honor_labels is true, skip it
auto targetTags = metricGroup.GetTags();
auto toDelete = GetToDeleteTargetLabels(targetTags);

if (!mScrapeConfigPtr->mMetricRelabelConfigs.Empty() || !targetTags.empty()) {
EventsContainer& events = metricGroup.MutableEvents();
size_t wIdx = 0;
for (size_t rIdx = 0; rIdx < events.size(); ++rIdx) {
if (ProcessEvent(events[rIdx], targetTags, toDelete)) {
if (ProcessEvent(events[rIdx], targetTags)) {
if (wIdx != rIdx) {
events[wIdx] = std::move(events[rIdx]);
}
Expand All @@ -66,18 +61,12 @@ void ProcessorPromRelabelMetricNative::Process(PipelineEventGroup& metricGroup)
events.resize(wIdx);
}

// delete mTags when key starts with __
for (const auto& k : toDelete) {
metricGroup.DelTag(k);
}

if (metricGroup.HasMetadata(EventGroupMetaKey::PROMETHEUS_STREAM_TOTAL)) {
auto autoMetric = prom::AutoMetric();
UpdateAutoMetrics(metricGroup, autoMetric);
AddAutoMetrics(metricGroup, autoMetric);
}


// delete all tags
for (const auto& [k, v] : targetTags) {
metricGroup.DelTag(k);
Expand All @@ -88,75 +77,59 @@ bool ProcessorPromRelabelMetricNative::IsSupportedEvent(const PipelineEventPtr&
return e.Is<MetricEvent>();
}

bool ProcessorPromRelabelMetricNative::ProcessEvent(PipelineEventPtr& e,
const GroupTags& targetTags,
const vector<StringView>& toDelete) {
bool ProcessorPromRelabelMetricNative::ProcessEvent(PipelineEventPtr& e, const GroupTags& targetTags) {
if (!IsSupportedEvent(e)) {
return false;
}
auto& sourceEvent = e.Cast<MetricEvent>();

for (const auto& [k, v] : targetTags) {
if (sourceEvent.HasTag(k)) {
if (!mScrapeConfigPtr->mHonorLabels) {
// metric event labels is secondary
// if confiliction, then rename it exported_<label_name>
auto key = prometheus::EXPORTED_PREFIX + k.to_string();
auto b = sourceEvent.GetSourceBuffer()->CopyString(key);
sourceEvent.SetTagNoCopy(StringView(b.data, b.size), sourceEvent.GetTag(k));
sourceEvent.SetTagNoCopy(k, v);
}
} else {
sourceEvent.SetTagNoCopy(k, v);
}
AppendLabel(k, v, sourceEvent, mScrapeConfigPtr->mHonorLabels);
}

vector<string> toDeleteInRelabel;
if (!mScrapeConfigPtr->mMetricRelabelConfigs.Empty()
&& !mScrapeConfigPtr->mMetricRelabelConfigs.Process(sourceEvent, toDeleteInRelabel)) {
&& !mScrapeConfigPtr->mMetricRelabelConfigs.Process(sourceEvent)) {
return false;
}
// set metricEvent name
sourceEvent.SetNameNoCopy(sourceEvent.GetTag(prometheus::NAME));

for (const auto& k : toDelete) {
sourceEvent.DelTag(k);
}
for (const auto& k : toDeleteInRelabel) {
sourceEvent.DelTag(k);
}
sourceEvent.EraseIf([](const std::pair<StringView, StringView>& tag) -> bool {
if (tag.first.starts_with("__") && tag.first != prometheus::NAME) {
return true;
}
return false;
});

for (const auto& [k, v] : mScrapeConfigPtr->mExternalLabels) {
if (sourceEvent.HasTag(k)) {
if (!mScrapeConfigPtr->mHonorLabels) {
// metric event labels is secondary
// if confiliction, then rename it exported_<label_name>
auto key = prometheus::EXPORTED_PREFIX + k;
auto b = sourceEvent.GetSourceBuffer()->CopyString(key);
sourceEvent.SetTagNoCopy(StringView(b.data, b.size), sourceEvent.GetTag(k));
sourceEvent.SetTagNoCopy(k, v);
}
} else {
sourceEvent.SetTagNoCopy(k, v);
}
// the lifetime of mExternalLabels is longer than the sourceEvent
AppendLabel(k, v, sourceEvent, mScrapeConfigPtr->mHonorLabels);
}

// set metricEvent name
sourceEvent.SetTagNoCopy(prometheus::NAME, sourceEvent.GetName());

return true;
}

vector<StringView> ProcessorPromRelabelMetricNative::GetToDeleteTargetLabels(const GroupTags& targetTags) const {
// delete tag which starts with __
vector<StringView> toDelete;
for (const auto& [k, v] : targetTags) {
if (k.starts_with("__")) {
toDelete.push_back(k);
void ProcessorPromRelabelMetricNative::AppendLabel(StringView k, StringView v, MetricEvent& e, bool honorLabels) const {
auto tagValue = e.GetTag(k);
if (!tagValue.empty()) {
if (!honorLabels) {
// metric event labels is secondary
// if confiliction, then rename it exported_<label_name>
auto exportedKey = prometheus::EXPORTED_PREFIX + k.to_string();
auto exportedTagValue = e.GetTag(exportedKey);
if (!exportedTagValue.empty()) {
auto exportedExportedKey = prometheus::EXPORTED_PREFIX + exportedKey;
auto sb = e.GetSourceBuffer()->CopyString(exportedExportedKey);
e.SetTagNoCopy(StringView(sb.data, sb.size), exportedTagValue);
}
auto sb = e.GetSourceBuffer()->CopyString(exportedKey);
e.SetTagNoCopy(StringView(sb.data, sb.size), tagValue);
e.SetTagNoCopy(k, v);
}
} else {
e.SetTagNoCopy(k, v);
}
return toDelete;
}
};

void ProcessorPromRelabelMetricNative::UpdateAutoMetrics(const PipelineEventGroup& eGroup,
prom::AutoMetric& autoMetric) const {
Expand Down Expand Up @@ -247,7 +220,7 @@ void ProcessorPromRelabelMetricNative::AddMetric(PipelineEventGroup& metricGroup
metricEvent->SetTimestamp(timestamp, nanoSec);
metricEvent->SetTag(prometheus::NAME, name);
for (const auto& [k, v] : targetTags) {
metricEvent->SetTag(k, v);
metricEvent->SetTagNoCopy(k, v);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,8 +50,7 @@ class ProcessorPromRelabelMetricNative : public Processor {
bool IsSupportedEvent(const PipelineEventPtr& e) const override;

private:
bool ProcessEvent(PipelineEventPtr& e, const GroupTags& targetTags, const std::vector<StringView>& toDelete);
std::vector<StringView> GetToDeleteTargetLabels(const GroupTags& targetTags) const;
bool ProcessEvent(PipelineEventPtr& e, const GroupTags& targetTags);

void AddAutoMetrics(PipelineEventGroup& eGroup, const prom::AutoMetric& autoMetric) const;
void UpdateAutoMetrics(const PipelineEventGroup& eGroup, prom::AutoMetric& autoMetric) const;
Expand All @@ -61,9 +60,9 @@ class ProcessorPromRelabelMetricNative : public Processor {
time_t timestamp,
uint32_t nanoSec,
const GroupTags& targetTags) const;
void AppendLabel(StringView k, StringView v, MetricEvent& e, bool honorLabels) const;

std::unique_ptr<ScrapeConfig> mScrapeConfigPtr;
std::string mLoongCollectorScraper;

#ifdef APSARA_UNIT_TEST_MAIN
friend class ProcessorPromRelabelMetricNativeUnittest;
Expand Down
Loading
Loading