diff --git a/core/ebpf/handler/ObserveHandler.cpp b/core/ebpf/handler/ObserveHandler.cpp index e3e47bc01c..695568f873 100644 --- a/core/ebpf/handler/ObserveHandler.cpp +++ b/core/ebpf/handler/ObserveHandler.cpp @@ -31,69 +31,80 @@ namespace logtail { namespace ebpf { #define ADD_STATUS_METRICS(METRIC_NAME, FIELD_NAME, VALUE) \ - {if (!inner->FIELD_NAME) return; \ - auto event = group.AddMetricEvent(); \ - for (const auto& tag : measure->tags_) { \ - event->SetTag(tag.first, tag.second); \ - } \ - event->SetTag(std::string("status_code"), std::string(VALUE)); \ - event->SetName(METRIC_NAME); \ - event->SetTimestamp(ts); \ - event->SetValue(UntypedSingleValue{(double)inner->FIELD_NAME});} \ + { \ + if (!inner->FIELD_NAME) { \ + return; \ + } \ + auto* event = group.AddMetricEvent(); \ + for (const auto& tag : measure->tags_) { \ + event->SetTag(tag.first, tag.second); \ + } \ + event->SetTag(std::string("status_code"), std::string(VALUE)); \ + event->SetName(METRIC_NAME); \ + event->SetTimestamp(ts); \ + event->SetValue(UntypedSingleValue{(double)inner->FIELD_NAME}); \ + } #define GENERATE_METRICS(FUNC_NAME, MEASURE_TYPE, INNER_TYPE, METRIC_NAME, FIELD_NAME) \ -void FUNC_NAME(PipelineEventGroup& group, std::unique_ptr& measure, uint64_t ts) { \ - if (measure->type_ != MEASURE_TYPE) return; \ - auto inner = static_cast(measure->inner_measure_.get()); \ - if (!inner->FIELD_NAME) return; \ - auto event = group.AddMetricEvent(); \ - for (const auto& tag : measure->tags_) { \ - event->SetTag(tag.first, tag.second); \ - } \ - event->SetName(METRIC_NAME); \ - event->SetTimestamp(ts); \ - event->SetValue(UntypedSingleValue{(double)inner->FIELD_NAME}); \ -} - -void OtelMeterHandler::handle(std::vector>& measures, uint64_t timestamp) { - if (measures.empty()) return; + void FUNC_NAME(PipelineEventGroup& group, const std::unique_ptr& measure, uint64_t ts) { \ + if (measure->type_ != (MEASURE_TYPE)) { \ + return; \ + } \ + const auto* inner = static_cast(measure->inner_measure_.get()); \ + if (!inner->FIELD_NAME) { \ + return; \ + } \ + auto* event = group.AddMetricEvent(); \ + for (const auto& tag : measure->tags_) { \ + event->SetTag(tag.first, tag.second); \ + } \ + event->SetName(METRIC_NAME); \ + event->SetTimestamp(ts); \ + event->SetValue(UntypedSingleValue{(double)inner->FIELD_NAME}); \ + } - for (const auto& appBatchMeasures : measures) { - PipelineEventGroup eventGroup(std::make_shared()); - for (const auto& measure : appBatchMeasures->measures_) { - auto type = measure->type_; - if (type == MeasureType::MEASURE_TYPE_APP) { - auto inner = static_cast(measure->inner_measure_.get()); - auto event = eventGroup.AddMetricEvent(); - for (const auto& tag : measure->tags_) { - event->SetTag(tag.first, tag.second); + void OtelMeterHandler::handle(const std::vector>& measures, + uint64_t timestamp) { + if (measures.empty()) { + return; + } + for (const auto& appBatchMeasures : measures) { + PipelineEventGroup eventGroup(std::make_shared()); + for (const auto& measure : appBatchMeasures->measures_) { + auto type = measure->type_; + if (type == MeasureType::MEASURE_TYPE_APP) { + auto* inner = static_cast(measure->inner_measure_.get()); + auto* event = eventGroup.AddMetricEvent(); + for (const auto& tag : measure->tags_) { + event->SetTag(tag.first, tag.second); + } + event->SetName("service_requests_total"); + event->SetTimestamp(timestamp); + event->SetValue(UntypedSingleValue{(double)inner->request_total_}); } - event->SetName("service_requests_total"); - event->SetTimestamp(timestamp); - event->SetValue(UntypedSingleValue{(double)inner->request_total_}); + mProcessTotalCnt++; } - mProcessTotalCnt++; - } #ifdef APSARA_UNIT_TEST_MAIN continue; #endif std::unique_ptr item = std::make_unique(std::move(eventGroup), mPluginIdx); if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item))) { - LOG_WARNING(sLogger, ("configName", mCtx->GetConfigName())("pluginIdx",mPluginIdx)("[Otel Metrics] push queue failed!", "")); + LOG_WARNING(sLogger, + ("configName", mCtx->GetConfigName())("pluginIdx", + mPluginIdx)("[Otel Metrics] push queue failed!", "")); } - } - return; -} - -void OtelSpanHandler::handle(std::vector>& spans) { - if (spans.empty()) return; + } +void OtelSpanHandler::handle(const std::vector>& spans) { + if (spans.empty()) { + return; + } for (const auto& span : spans) { std::shared_ptr sourceBuffer = std::make_shared(); PipelineEventGroup eventGroup(sourceBuffer); for (const auto& x : span->single_spans_) { - auto spanEvent = eventGroup.AddSpanEvent(); + auto* spanEvent = eventGroup.AddSpanEvent(); for (const auto& tag : x->tags_) { spanEvent->SetTag(tag.first, tag.second); } @@ -110,30 +121,35 @@ void OtelSpanHandler::handle(std::vector>& #endif std::unique_ptr item = std::make_unique(std::move(eventGroup), mPluginIdx); if (ProcessQueueManager::GetInstance()->PushQueue(mQueueKey, std::move(item))) { - LOG_WARNING(sLogger, ("configName", mCtx->GetConfigName())("pluginIdx",mPluginIdx)("[Span] push queue failed!", "")); + LOG_WARNING( + sLogger, + ("configName", mCtx->GetConfigName())("pluginIdx", mPluginIdx)("[Span] push queue failed!", "")); } - } - - return; } -void EventHandler::handle(std::vector>& events) { - if (events.empty()) return; - +void EventHandler::handle(const std::vector>& events) { + if (events.empty()) { + return; + } for (const auto& appEvents : events) { - if (!appEvents || appEvents->events_.empty()) continue; + if (!appEvents || appEvents->events_.empty()) { + continue; + } std::shared_ptr sourceBuffer = std::make_shared(); PipelineEventGroup eventGroup(sourceBuffer); for (const auto& event : appEvents->events_) { - if (!event || event->GetAllTags().empty()) continue; - auto logEvent = eventGroup.AddLogEvent(); + if (!event || event->GetAllTags().empty()) { + continue; + } + auto* logEvent = eventGroup.AddLogEvent(); for (const auto& tag : event->GetAllTags()) { logEvent->SetContent(tag.first, tag.second); - auto seconds = std::chrono::duration_cast(std::chrono::nanoseconds(event->GetTimestamp())); + auto seconds + = std::chrono::duration_cast(std::chrono::nanoseconds(event->GetTimestamp())); logEvent->SetTimestamp(seconds.count(), event->GetTimestamp() - seconds.count() * 1e9); } - mProcessTotalCnt ++; + mProcessTotalCnt++; } for (const auto& tag : appEvents->tags_) { eventGroup.SetTag(tag.first, tag.second); @@ -169,9 +185,11 @@ GENERATE_METRICS(GenerateRequestsSlowMetrics, MeasureType::MEASURE_TYPE_APP, App GENERATE_METRICS(GenerateRequestsErrorMetrics, MeasureType::MEASURE_TYPE_APP, AppSingleMeasure, rpc_request_err_count, error_total_) GENERATE_METRICS(GenerateRequestsDurationSumMetrics, MeasureType::MEASURE_TYPE_APP, AppSingleMeasure, rpc_request_status_count, duration_ms_sum_) -void GenerateRequestsStatusMetrics(PipelineEventGroup& group, std::unique_ptr& measure, uint64_t ts) { - if (measure->type_ != MeasureType::MEASURE_TYPE_APP) return; - auto inner = static_cast(measure->inner_measure_.get()); +void GenerateRequestsStatusMetrics(PipelineEventGroup& group, const std::unique_ptr& measure, uint64_t ts) { + if (measure->type_ != MeasureType::MEASURE_TYPE_APP) { + return; + } + const auto* inner = static_cast(measure->inner_measure_.get()); ADD_STATUS_METRICS(rpc_request_status_count, status_2xx_count_, status_2xx_key); ADD_STATUS_METRICS(rpc_request_status_count, status_3xx_count_, status_3xx_key); ADD_STATUS_METRICS(rpc_request_status_count, status_4xx_count_, status_4xx_key); @@ -195,15 +213,16 @@ GENERATE_METRICS(GenerateTcpRecvBytesTotalMetrics, MeasureType::MEASURE_TYPE_NET GENERATE_METRICS(GenerateTcpSendPktsTotalMetrics, MeasureType::MEASURE_TYPE_NET, NetSingleMeasure, npm_send_pkt_total, send_pkt_total_) GENERATE_METRICS(GenerateTcpSendBytesTotalMetrics, MeasureType::MEASURE_TYPE_NET, NetSingleMeasure, npm_send_byte_total, send_byte_total_) -void ArmsSpanHandler::handle(std::vector>& spans) { - if (spans.empty()) return; - +void ArmsSpanHandler::handle(const std::vector>& spans) { + if (spans.empty()) { + return; + } for (const auto& span : spans) { std::shared_ptr sourceBuffer = std::make_shared(); PipelineEventGroup eventGroup(sourceBuffer); eventGroup.SetTag(app_id_key, span->app_id_); for (const auto& x : span->single_spans_) { - auto spanEvent = eventGroup.AddSpanEvent(); + auto* spanEvent = eventGroup.AddSpanEvent(); for (const auto& tag : x->tags_) { spanEvent->SetTag(tag.first, tag.second); } @@ -223,13 +242,13 @@ void ArmsSpanHandler::handle(std::vector>& LOG_WARNING(sLogger, ("configName", mCtx->GetConfigName())("pluginIdx",mPluginIdx)("[Span] push queue failed!", "")); } } - - return; } -void ArmsMeterHandler::handle(std::vector>& measures, uint64_t timestamp) { - if (measures.empty()) return; - +void ArmsMeterHandler::handle(const std::vector>& measures, + uint64_t timestamp) { + if (measures.empty()) { + return; + } for (const auto& appBatchMeasures : measures) { std::shared_ptr sourceBuffer = std::make_shared();; PipelineEventGroup eventGroup(sourceBuffer); @@ -264,9 +283,7 @@ void ArmsMeterHandler::handle(std::vectorPushQueue(mQueueKey, std::move(item))) { LOG_WARNING(sLogger, ("configName", mCtx->GetConfigName())("pluginIdx",mPluginIdx)("[Metrics] push queue failed!", "")); } - } - return; } #endif diff --git a/core/ebpf/handler/ObserveHandler.h b/core/ebpf/handler/ObserveHandler.h index d70cf33470..a1cc53f711 100644 --- a/core/ebpf/handler/ObserveHandler.h +++ b/core/ebpf/handler/ObserveHandler.h @@ -26,31 +26,31 @@ class MeterHandler : public AbstractHandler { public: MeterHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : AbstractHandler(ctx, key, idx) {} - virtual void handle(std::vector>&, uint64_t) = 0; + virtual void handle(const std::vector>&, uint64_t) = 0; }; class OtelMeterHandler : public MeterHandler { public: OtelMeterHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : MeterHandler(ctx, key, idx) {} - void handle(std::vector>& measures, uint64_t timestamp) override; + void handle(const std::vector>& measures, uint64_t timestamp) override; }; class SpanHandler : public AbstractHandler { public: SpanHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : AbstractHandler(ctx, key, idx) {} - virtual void handle(std::vector>&) = 0; + virtual void handle(const std::vector>&) = 0; }; class OtelSpanHandler : public SpanHandler { public: OtelSpanHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : SpanHandler(ctx, key, idx) {} - void handle(std::vector>&) override; + void handle(const std::vector>&) override; }; class EventHandler : public AbstractHandler { public: EventHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : AbstractHandler(ctx, key, idx) {} - void handle(std::vector>&); + void handle(const std::vector>&); }; #ifdef __ENTERPRISE__ @@ -58,13 +58,13 @@ class EventHandler : public AbstractHandler { class ArmsMeterHandler : public MeterHandler { public: ArmsMeterHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : MeterHandler(ctx, key, idx) {} - void handle(std::vector>& measures, uint64_t timestamp) override; + void handle(const std::vector>& measures, uint64_t timestamp) override; }; class ArmsSpanHandler : public SpanHandler { public: ArmsSpanHandler(const logtail::PipelineContext* ctx, QueueKey key, uint32_t idx) : SpanHandler(ctx, key, idx) {} - void handle(std::vector>&) override; + void handle(const std::vector>&) override; }; #endif