Skip to content

Commit

Permalink
fix enterprise code in ObserveHandler (#1961)
Browse files Browse the repository at this point in the history
  • Loading branch information
yyuuttaaoo authored Dec 12, 2024
1 parent fc6820f commit 13ab38a
Show file tree
Hide file tree
Showing 2 changed files with 96 additions and 79 deletions.
161 changes: 89 additions & 72 deletions core/ebpf/handler/ObserveHandler.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,69 +31,80 @@ namespace logtail {
namespace ebpf {

#define ADD_STATUS_METRICS(METRIC_NAME, FIELD_NAME, VALUE) \
{if (!inner->FIELD_NAME) return; \
auto event = group.AddMetricEvent(); \
for (const auto& tag : measure->tags_) { \
event->SetTag(tag.first, tag.second); \
} \
event->SetTag(std::string("status_code"), std::string(VALUE)); \
event->SetName(METRIC_NAME); \
event->SetTimestamp(ts); \
event->SetValue(UntypedSingleValue{(double)inner->FIELD_NAME});} \
{ \
if (!inner->FIELD_NAME) { \
return; \
} \
auto* event = group.AddMetricEvent(); \
for (const auto& tag : measure->tags_) { \
event->SetTag(tag.first, tag.second); \
} \
event->SetTag(std::string("status_code"), std::string(VALUE)); \
event->SetName(METRIC_NAME); \
event->SetTimestamp(ts); \
event->SetValue(UntypedSingleValue{(double)inner->FIELD_NAME}); \
}

#define GENERATE_METRICS(FUNC_NAME, MEASURE_TYPE, INNER_TYPE, METRIC_NAME, FIELD_NAME) \
void FUNC_NAME(PipelineEventGroup& group, std::unique_ptr<Measure>& measure, uint64_t ts) { \
if (measure->type_ != MEASURE_TYPE) return; \
auto inner = static_cast<INNER_TYPE*>(measure->inner_measure_.get()); \
if (!inner->FIELD_NAME) return; \
auto event = group.AddMetricEvent(); \
for (const auto& tag : measure->tags_) { \
event->SetTag(tag.first, tag.second); \
} \
event->SetName(METRIC_NAME); \
event->SetTimestamp(ts); \
event->SetValue(UntypedSingleValue{(double)inner->FIELD_NAME}); \
}

void OtelMeterHandler::handle(std::vector<std::unique_ptr<ApplicationBatchMeasure>>& measures, uint64_t timestamp) {
if (measures.empty()) return;
void FUNC_NAME(PipelineEventGroup& group, const std::unique_ptr<Measure>& measure, uint64_t ts) { \
if (measure->type_ != (MEASURE_TYPE)) { \
return; \
} \
const auto* inner = static_cast<INNER_TYPE*>(measure->inner_measure_.get()); \
if (!inner->FIELD_NAME) { \
return; \
} \
auto* event = group.AddMetricEvent(); \
for (const auto& tag : measure->tags_) { \
event->SetTag(tag.first, tag.second); \
} \
event->SetName(METRIC_NAME); \
event->SetTimestamp(ts); \
event->SetValue(UntypedSingleValue{(double)inner->FIELD_NAME}); \
}

for (const auto& appBatchMeasures : measures) {
PipelineEventGroup eventGroup(std::make_shared<SourceBuffer>());
for (const auto& measure : appBatchMeasures->measures_) {
auto type = measure->type_;
if (type == MeasureType::MEASURE_TYPE_APP) {
auto inner = static_cast<AppSingleMeasure*>(measure->inner_measure_.get());
auto event = eventGroup.AddMetricEvent();
for (const auto& tag : measure->tags_) {
event->SetTag(tag.first, tag.second);
void OtelMeterHandler::handle(const std::vector<std::unique_ptr<ApplicationBatchMeasure>>& measures,
uint64_t timestamp) {
if (measures.empty()) {
return;
}
for (const auto& appBatchMeasures : measures) {
PipelineEventGroup eventGroup(std::make_shared<SourceBuffer>());
for (const auto& measure : appBatchMeasures->measures_) {
auto type = measure->type_;
if (type == MeasureType::MEASURE_TYPE_APP) {
auto* inner = static_cast<AppSingleMeasure*>(measure->inner_measure_.get());
auto* event = eventGroup.AddMetricEvent();
for (const auto& tag : measure->tags_) {
event->SetTag(tag.first, tag.second);
}
event->SetName("service_requests_total");
event->SetTimestamp(timestamp);
event->SetValue(UntypedSingleValue{(double)inner->request_total_});
}
event->SetName("service_requests_total");
event->SetTimestamp(timestamp);
event->SetValue(UntypedSingleValue{(double)inner->request_total_});
mProcessTotalCnt++;
}
mProcessTotalCnt++;
}
#ifdef APSARA_UNIT_TEST_MAIN
continue;
#endif
std::unique_ptr<ProcessQueueItem> item = std::make_unique<ProcessQueueItem>(std::move(eventGroup), mPluginIdx);
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item))) {
LOG_WARNING(sLogger, ("configName", mCtx->GetConfigName())("pluginIdx",mPluginIdx)("[Otel Metrics] push queue failed!", ""));
LOG_WARNING(sLogger,
("configName", mCtx->GetConfigName())("pluginIdx",
mPluginIdx)("[Otel Metrics] push queue failed!", ""));
}

}
return;
}

void OtelSpanHandler::handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>& spans) {
if (spans.empty()) return;
}

void OtelSpanHandler::handle(const std::vector<std::unique_ptr<ApplicationBatchSpan>>& spans) {
if (spans.empty()) {
return;
}
for (const auto& span : spans) {
std::shared_ptr<SourceBuffer> sourceBuffer = std::make_shared<SourceBuffer>();
PipelineEventGroup eventGroup(sourceBuffer);
for (const auto& x : span->single_spans_) {
auto spanEvent = eventGroup.AddSpanEvent();
auto* spanEvent = eventGroup.AddSpanEvent();
for (const auto& tag : x->tags_) {
spanEvent->SetTag(tag.first, tag.second);
}
Expand All @@ -110,30 +121,35 @@ void OtelSpanHandler::handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>&
#endif
std::unique_ptr<ProcessQueueItem> item = std::make_unique<ProcessQueueItem>(std::move(eventGroup), mPluginIdx);
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item))) {
LOG_WARNING(sLogger, ("configName", mCtx->GetConfigName())("pluginIdx",mPluginIdx)("[Span] push queue failed!", ""));
LOG_WARNING(
sLogger,
("configName", mCtx->GetConfigName())("pluginIdx", mPluginIdx)("[Span] push queue failed!", ""));
}

}

return;
}

void EventHandler::handle(std::vector<std::unique_ptr<ApplicationBatchEvent>>& events) {
if (events.empty()) return;

void EventHandler::handle(const std::vector<std::unique_ptr<ApplicationBatchEvent>>& events) {
if (events.empty()) {
return;
}
for (const auto& appEvents : events) {
if (!appEvents || appEvents->events_.empty()) continue;
if (!appEvents || appEvents->events_.empty()) {
continue;
}
std::shared_ptr<SourceBuffer> sourceBuffer = std::make_shared<SourceBuffer>();
PipelineEventGroup eventGroup(sourceBuffer);
for (const auto& event : appEvents->events_) {
if (!event || event->GetAllTags().empty()) continue;
auto logEvent = eventGroup.AddLogEvent();
if (!event || event->GetAllTags().empty()) {
continue;
}
auto* logEvent = eventGroup.AddLogEvent();
for (const auto& tag : event->GetAllTags()) {
logEvent->SetContent(tag.first, tag.second);
auto seconds = std::chrono::duration_cast<std::chrono::seconds>(std::chrono::nanoseconds(event->GetTimestamp()));
auto seconds
= std::chrono::duration_cast<std::chrono::seconds>(std::chrono::nanoseconds(event->GetTimestamp()));
logEvent->SetTimestamp(seconds.count(), event->GetTimestamp() - seconds.count() * 1e9);
}
mProcessTotalCnt ++;
mProcessTotalCnt++;
}
for (const auto& tag : appEvents->tags_) {
eventGroup.SetTag(tag.first, tag.second);
Expand Down Expand Up @@ -169,9 +185,11 @@ GENERATE_METRICS(GenerateRequestsSlowMetrics, MeasureType::MEASURE_TYPE_APP, App
GENERATE_METRICS(GenerateRequestsErrorMetrics, MeasureType::MEASURE_TYPE_APP, AppSingleMeasure, rpc_request_err_count, error_total_)
GENERATE_METRICS(GenerateRequestsDurationSumMetrics, MeasureType::MEASURE_TYPE_APP, AppSingleMeasure, rpc_request_status_count, duration_ms_sum_)

void GenerateRequestsStatusMetrics(PipelineEventGroup& group, std::unique_ptr<Measure>& measure, uint64_t ts) {
if (measure->type_ != MeasureType::MEASURE_TYPE_APP) return;
auto inner = static_cast<AppSingleMeasure*>(measure->inner_measure_.get());
void GenerateRequestsStatusMetrics(PipelineEventGroup& group, const std::unique_ptr<Measure>& measure, uint64_t ts) {
if (measure->type_ != MeasureType::MEASURE_TYPE_APP) {
return;
}
const auto* inner = static_cast<const AppSingleMeasure*>(measure->inner_measure_.get());
ADD_STATUS_METRICS(rpc_request_status_count, status_2xx_count_, status_2xx_key);
ADD_STATUS_METRICS(rpc_request_status_count, status_3xx_count_, status_3xx_key);
ADD_STATUS_METRICS(rpc_request_status_count, status_4xx_count_, status_4xx_key);
Expand All @@ -195,15 +213,16 @@ GENERATE_METRICS(GenerateTcpRecvBytesTotalMetrics, MeasureType::MEASURE_TYPE_NET
GENERATE_METRICS(GenerateTcpSendPktsTotalMetrics, MeasureType::MEASURE_TYPE_NET, NetSingleMeasure, npm_send_pkt_total, send_pkt_total_)
GENERATE_METRICS(GenerateTcpSendBytesTotalMetrics, MeasureType::MEASURE_TYPE_NET, NetSingleMeasure, npm_send_byte_total, send_byte_total_)

void ArmsSpanHandler::handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>& spans) {
if (spans.empty()) return;

void ArmsSpanHandler::handle(const std::vector<std::unique_ptr<ApplicationBatchSpan>>& spans) {
if (spans.empty()) {
return;
}
for (const auto& span : spans) {
std::shared_ptr<SourceBuffer> sourceBuffer = std::make_shared<SourceBuffer>();
PipelineEventGroup eventGroup(sourceBuffer);
eventGroup.SetTag(app_id_key, span->app_id_);
for (const auto& x : span->single_spans_) {
auto spanEvent = eventGroup.AddSpanEvent();
auto* spanEvent = eventGroup.AddSpanEvent();
for (const auto& tag : x->tags_) {
spanEvent->SetTag(tag.first, tag.second);
}
Expand All @@ -223,13 +242,13 @@ void ArmsSpanHandler::handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>&
LOG_WARNING(sLogger, ("configName", mCtx->GetConfigName())("pluginIdx",mPluginIdx)("[Span] push queue failed!", ""));
}
}

return;
}

void ArmsMeterHandler::handle(std::vector<std::unique_ptr<ApplicationBatchMeasure>>& measures, uint64_t timestamp) {
if (measures.empty()) return;

void ArmsMeterHandler::handle(const std::vector<std::unique_ptr<ApplicationBatchMeasure>>& measures,
uint64_t timestamp) {
if (measures.empty()) {
return;
}
for (const auto& appBatchMeasures : measures) {
std::shared_ptr<SourceBuffer> sourceBuffer = std::make_shared<SourceBuffer>();;
PipelineEventGroup eventGroup(sourceBuffer);
Expand Down Expand Up @@ -264,9 +283,7 @@ void ArmsMeterHandler::handle(std::vector<std::unique_ptr<ApplicationBatchMeasur
if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item))) {
LOG_WARNING(sLogger, ("configName", mCtx->GetConfigName())("pluginIdx",mPluginIdx)("[Metrics] push queue failed!", ""));
}

}
return;
}

#endif
Expand Down
14 changes: 7 additions & 7 deletions core/ebpf/handler/ObserveHandler.h
Original file line number Diff line number Diff line change
Expand Up @@ -26,45 +26,45 @@ class MeterHandler : public AbstractHandler {
public:
MeterHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : AbstractHandler(ctx, key, idx) {}

virtual void handle(std::vector<std::unique_ptr<ApplicationBatchMeasure>>&, uint64_t) = 0;
virtual void handle(const std::vector<std::unique_ptr<ApplicationBatchMeasure>>&, uint64_t) = 0;
};

class OtelMeterHandler : public MeterHandler {
public:
OtelMeterHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : MeterHandler(ctx, key, idx) {}
void handle(std::vector<std::unique_ptr<ApplicationBatchMeasure>>& measures, uint64_t timestamp) override;
void handle(const std::vector<std::unique_ptr<ApplicationBatchMeasure>>& measures, uint64_t timestamp) override;
};

class SpanHandler : public AbstractHandler {
public:
SpanHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : AbstractHandler(ctx, key, idx) {}
virtual void handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>&) = 0;
virtual void handle(const std::vector<std::unique_ptr<ApplicationBatchSpan>>&) = 0;
};

class OtelSpanHandler : public SpanHandler {
public:
OtelSpanHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : SpanHandler(ctx, key, idx) {}
void handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>&) override;
void handle(const std::vector<std::unique_ptr<ApplicationBatchSpan>>&) override;
};

class EventHandler : public AbstractHandler {
public:
EventHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : AbstractHandler(ctx, key, idx) {}
void handle(std::vector<std::unique_ptr<ApplicationBatchEvent>>&);
void handle(const std::vector<std::unique_ptr<ApplicationBatchEvent>>&);
};

#ifdef __ENTERPRISE__

class ArmsMeterHandler : public MeterHandler {
public:
ArmsMeterHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : MeterHandler(ctx, key, idx) {}
void handle(std::vector<std::unique_ptr<ApplicationBatchMeasure>>& measures, uint64_t timestamp) override;
void handle(const std::vector<std::unique_ptr<ApplicationBatchMeasure>>& measures, uint64_t timestamp) override;
};

class ArmsSpanHandler : public SpanHandler {
public:
ArmsSpanHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : SpanHandler(ctx, key, idx) {}
void handle(std::vector<std::unique_ptr<ApplicationBatchSpan>>&) override;
void handle(const std::vector<std::unique_ptr<ApplicationBatchSpan>>&) override;
};

#endif
Expand Down

0 comments on commit 13ab38a

Please sign in to comment.