diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index 162ed8a82b..6c36c26eeb 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -113,7 +113,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake) set(SUB_DIRECTORIES_LIST application app_config checkpoint container_manager logger go_pipeline monitor profile_sender models config config/feedbacker config/provider config/watcher - pipeline pipeline/batch pipeline/compression pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer + pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer runner runner/sink/http protobuf/config_server/v1 protobuf/config_server/v2 protobuf/sls file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling diff --git a/core/common/common.cmake b/core/common/common.cmake index e905a95fa3..caf8e45b98 100644 --- a/core/common/common.cmake +++ b/core/common/common.cmake @@ -30,7 +30,7 @@ list(APPEND THIS_SOURCE_FILES_LIST ${XX_HASH_SOURCE_FILES}) list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/memory/SourceBuffer.h) list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/http/AsynCurlRunner.cpp ${CMAKE_SOURCE_DIR}/common/http/Curl.cpp ${CMAKE_SOURCE_DIR}/common/http/HttpResponse.cpp) list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/timer/Timer.cpp ${CMAKE_SOURCE_DIR}/common/timer/HttpRequestTimerEvent.cpp) - +list(APPEND THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/compression/Compressor.cpp ${CMAKE_SOURCE_DIR}/common/compression/CompressorFactory.cpp ${CMAKE_SOURCE_DIR}/common/compression/LZ4Compressor.cpp ${CMAKE_SOURCE_DIR}/common/compression/ZstdCompressor.cpp) # remove several files in common list(REMOVE_ITEM THIS_SOURCE_FILES_LIST ${CMAKE_SOURCE_DIR}/common/BoostRegexValidator.cpp ${CMAKE_SOURCE_DIR}/common/GetUUID.cpp) diff --git a/core/pipeline/compression/CompressType.h b/core/common/compression/CompressType.h similarity index 85% rename from core/pipeline/compression/CompressType.h rename to core/common/compression/CompressType.h index 96f996ee9a..1580bb5967 100644 --- a/core/pipeline/compression/CompressType.h +++ b/core/common/compression/CompressType.h @@ -18,6 +18,14 @@ namespace logtail { -enum class CompressType { NONE, LZ4, ZSTD }; +enum class CompressType { + NONE, + LZ4, + ZSTD +#ifdef APSARA_UNIT_TEST_MAIN + , + MOCK +#endif +}; } // namespace logtail diff --git a/core/common/compression/Compressor.cpp b/core/common/compression/Compressor.cpp new file mode 100644 index 0000000000..39a7af4b57 --- /dev/null +++ b/core/common/compression/Compressor.cpp @@ -0,0 +1,59 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "common/compression/Compressor.h" + +#include + +#include "monitor/MetricConstants.h" + +using namespace std; + +namespace logtail { + +void Compressor::SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels) { + WriteMetrics::GetInstance()->PrepareMetricsRecordRef( + mMetricsRecordRef, std::move(labels), std::move(dynamicLabels)); + mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT); + mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES); + mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT); + mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEM_SIZE_BYTES); + mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter("discarded_items_cnt"); + mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter("discarded_item_size_bytes"); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); +} + +bool Compressor::DoCompress(const string& input, string& output, string& errorMsg) { + if (mMetricsRecordRef != nullptr) { + mInItemsCnt->Add(1); + mInItemSizeBytes->Add(input.size()); + } + + auto before = chrono::system_clock::now(); + auto res = Compress(input, output, errorMsg); + + if (mMetricsRecordRef != nullptr) { + mTotalDelayMs->Add(chrono::duration_cast(chrono::system_clock::now() - before).count()); + if (res) { + mOutItemsCnt->Add(1); + mOutItemSizeBytes->Add(output.size()); + } else { + mDiscardedItemsCnt->Add(1); + mDiscardedItemSizeBytes->Add(input.size()); + } + } + return res; +} + +} // namespace logtail diff --git a/core/pipeline/compression/Compressor.h b/core/common/compression/Compressor.h similarity index 64% rename from core/pipeline/compression/Compressor.h rename to core/common/compression/Compressor.h index 1694f6e6ad..0c32358ca5 100644 --- a/core/pipeline/compression/Compressor.h +++ b/core/common/compression/Compressor.h @@ -18,7 +18,8 @@ #include -#include "pipeline/compression/CompressType.h" +#include "monitor/LogtailMetric.h" +#include "common/compression/CompressType.h" namespace logtail { @@ -27,7 +28,7 @@ class Compressor { Compressor(CompressType type) : mType(type) {} virtual ~Compressor() = default; - virtual bool Compress(const std::string& input, std::string& output, std::string& errorMsg) = 0; + bool DoCompress(const std::string& input, std::string& output, std::string& errorMsg); #ifdef APSARA_UNIT_TEST_MAIN // buffer shoudl be reserved for output before calling this function @@ -35,9 +36,27 @@ class Compressor { #endif CompressType GetCompressType() const { return mType; } + void SetMetricRecordRef(MetricLabels&& labels, DynamicMetricLabels&& dynamicLabels = {}); + +protected: + mutable MetricsRecordRef mMetricsRecordRef; + CounterPtr mInItemsCnt; + CounterPtr mInItemSizeBytes; + CounterPtr mOutItemsCnt; + CounterPtr mOutItemSizeBytes; + CounterPtr mDiscardedItemsCnt; + CounterPtr mDiscardedItemSizeBytes; + CounterPtr mTotalDelayMs; private: + virtual bool Compress(const std::string& input, std::string& output, std::string& errorMsg) = 0; + CompressType mType = CompressType::NONE; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class CompressorUnittest; + friend class CompressorFactoryUnittest; +#endif }; } // namespace logtail diff --git a/core/pipeline/compression/CompressorFactory.cpp b/core/common/compression/CompressorFactory.cpp similarity index 71% rename from core/pipeline/compression/CompressorFactory.cpp rename to core/common/compression/CompressorFactory.cpp index 1c91f24035..0e3b27197b 100644 --- a/core/pipeline/compression/CompressorFactory.cpp +++ b/core/common/compression/CompressorFactory.cpp @@ -12,21 +12,24 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "pipeline/compression/CompressorFactory.h" +#include "common/compression/CompressorFactory.h" #include "common/ParamExtractor.h" -#include "pipeline/compression/LZ4Compressor.h" -#include "pipeline/compression/ZstdCompressor.h" +#include "monitor/MetricConstants.h" +#include "common/compression/LZ4Compressor.h" +#include "common/compression/ZstdCompressor.h" using namespace std; namespace logtail { unique_ptr CompressorFactory::Create(const Json::Value& config, - const PipelineContext& ctx, - const string& pluginType, - CompressType defaultType) { + const PipelineContext& ctx, + const string& pluginType, + const string& flusherId, + CompressType defaultType) { string compressType, errorMsg; + unique_ptr compressor; if (!GetOptionalStringParam(config, "CompressType", compressType, errorMsg)) { PARAM_WARNING_DEFAULT(ctx.GetLogger(), ctx.GetAlarm(), @@ -37,11 +40,11 @@ unique_ptr CompressorFactory::Create(const Json::Value& config, ctx.GetProjectName(), ctx.GetLogstoreName(), ctx.GetRegion()); - return Create(defaultType); + compressor = Create(defaultType); } else if (compressType == "lz4") { - return Create(CompressType::LZ4); + compressor = Create(CompressType::LZ4); } else if (compressType == "zstd") { - return Create(CompressType::ZSTD); + compressor = Create(CompressType::ZSTD); } else if (compressType == "none") { return nullptr; } else if (!compressType.empty()) { @@ -54,10 +57,15 @@ unique_ptr CompressorFactory::Create(const Json::Value& config, ctx.GetProjectName(), ctx.GetLogstoreName(), ctx.GetRegion()); - return Create(defaultType); + compressor = Create(defaultType); } else { - return Create(defaultType); + compressor = Create(defaultType); } + compressor->SetMetricRecordRef({{METRIC_LABEL_PROJECT, ctx.GetProjectName()}, + {METRIC_LABEL_CONFIG_NAME, ctx.GetConfigName()}, + {METRIC_LABEL_KEY_COMPONENT_NAME, "compressor"}, + {METRIC_LABEL_KEY_FLUSHER_NODE_ID, flusherId}}); + return compressor; } unique_ptr CompressorFactory::Create(CompressType type) { diff --git a/core/pipeline/compression/CompressorFactory.h b/core/common/compression/CompressorFactory.h similarity index 87% rename from core/pipeline/compression/CompressorFactory.h rename to core/common/compression/CompressorFactory.h index dbdc826f16..8311332b7a 100644 --- a/core/pipeline/compression/CompressorFactory.h +++ b/core/common/compression/CompressorFactory.h @@ -21,9 +21,9 @@ #include #include -#include "pipeline/compression/CompressType.h" -#include "pipeline/compression/Compressor.h" #include "pipeline/PipelineContext.h" +#include "common/compression/CompressType.h" +#include "common/compression/Compressor.h" namespace logtail { @@ -42,13 +42,13 @@ class CompressorFactory { std::unique_ptr Create(const Json::Value& config, const PipelineContext& ctx, const std::string& pluginType, + const std::string& flusherId, CompressType defaultType); + std::unique_ptr Create(CompressType type); private: CompressorFactory() = default; ~CompressorFactory() = default; - - std::unique_ptr Create(CompressType defaultType); }; } // namespace logtail diff --git a/core/pipeline/compression/LZ4Compressor.cpp b/core/common/compression/LZ4Compressor.cpp similarity index 97% rename from core/pipeline/compression/LZ4Compressor.cpp rename to core/common/compression/LZ4Compressor.cpp index 7063812610..ec54c1c4e4 100644 --- a/core/pipeline/compression/LZ4Compressor.cpp +++ b/core/common/compression/LZ4Compressor.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "pipeline/compression/LZ4Compressor.h" +#include "common/compression/LZ4Compressor.h" #include diff --git a/core/pipeline/compression/LZ4Compressor.h b/core/common/compression/LZ4Compressor.h similarity index 89% rename from core/pipeline/compression/LZ4Compressor.h rename to core/common/compression/LZ4Compressor.h index 6a64ea2af6..9fd453c163 100644 --- a/core/pipeline/compression/LZ4Compressor.h +++ b/core/common/compression/LZ4Compressor.h @@ -16,19 +16,20 @@ #pragma once -#include "pipeline/compression/Compressor.h" +#include "common/compression/Compressor.h" namespace logtail { class LZ4Compressor : public Compressor { public: - LZ4Compressor(CompressType type) : Compressor(type){}; + LZ4Compressor(CompressType type) : Compressor(type) {}; - bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override; - #ifdef APSARA_UNIT_TEST_MAIN bool UnCompress(const std::string& input, std::string& output, std::string& errorMsg) override; #endif + +private: + bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override; }; } // namespace logtail diff --git a/core/pipeline/compression/ZstdCompressor.cpp b/core/common/compression/ZstdCompressor.cpp similarity index 97% rename from core/pipeline/compression/ZstdCompressor.cpp rename to core/common/compression/ZstdCompressor.cpp index 6e47985b5e..c482c569a1 100644 --- a/core/pipeline/compression/ZstdCompressor.cpp +++ b/core/common/compression/ZstdCompressor.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "pipeline/compression/ZstdCompressor.h" +#include "common/compression/ZstdCompressor.h" #include diff --git a/core/pipeline/compression/ZstdCompressor.h b/core/common/compression/ZstdCompressor.h similarity index 92% rename from core/pipeline/compression/ZstdCompressor.h rename to core/common/compression/ZstdCompressor.h index 1e910a2b0d..bf53c7d96d 100644 --- a/core/pipeline/compression/ZstdCompressor.h +++ b/core/common/compression/ZstdCompressor.h @@ -16,21 +16,21 @@ #pragma once -#include "pipeline/compression/Compressor.h" +#include "common/compression/Compressor.h" namespace logtail { class ZstdCompressor : public Compressor { public: - ZstdCompressor(CompressType type, int32_t level = 1) : Compressor(type), mCompressionLevel(level){}; - - bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override; + ZstdCompressor(CompressType type, int32_t level = 1) : Compressor(type), mCompressionLevel(level) {}; #ifdef APSARA_UNIT_TEST_MAIN bool UnCompress(const std::string& input, std::string& output, std::string& errorMsg) override; #endif private: + bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override; + int32_t mCompressionLevel = 1; }; diff --git a/core/go_pipeline/LogtailPlugin.cpp b/core/go_pipeline/LogtailPlugin.cpp index a84ff893c9..bb90dcdecc 100644 --- a/core/go_pipeline/LogtailPlugin.cpp +++ b/core/go_pipeline/LogtailPlugin.cpp @@ -22,15 +22,15 @@ #include "common/JsonUtil.h" #include "common/LogtailCommonFlags.h" #include "common/TimeUtil.h" -#include "pipeline/compression/CompressorFactory.h" +#include "common/compression/CompressorFactory.h" #include "container_manager/ConfigContainerInfoUpdateCmd.h" #include "file_server/ConfigManager.h" #include "logger/Logger.h" #include "monitor/LogFileProfiler.h" #include "monitor/LogtailAlarm.h" #include "pipeline/PipelineManager.h" -#include "profile_sender/ProfileSender.h" #include "pipeline/queue/SenderQueueManager.h" +#include "profile_sender/ProfileSender.h" DEFINE_FLAG_BOOL(enable_sls_metrics_format, "if enable format metrics in SLS metricstore log pattern", false); DEFINE_FLAG_BOOL(enable_containerd_upper_dir_detect, @@ -53,16 +53,13 @@ LogtailPlugin::LogtailPlugin() { mPluginValid = false; mPluginAlarmConfig.mLogstore = "logtail_alarm"; mPluginAlarmConfig.mAliuid = STRING_FLAG(logtail_profile_aliuid); - mPluginAlarmConfig.mCompressor - = CompressorFactory::GetInstance()->Create(Json::Value(), PipelineContext(), "flusher_sls", CompressType::ZSTD); + mPluginAlarmConfig.mCompressor = CompressorFactory::GetInstance()->Create(CompressType::ZSTD); mPluginProfileConfig.mLogstore = "shennong_log_profile"; mPluginProfileConfig.mAliuid = STRING_FLAG(logtail_profile_aliuid); - mPluginProfileConfig.mCompressor - = CompressorFactory::GetInstance()->Create(Json::Value(), PipelineContext(), "flusher_sls", CompressType::ZSTD); + mPluginProfileConfig.mCompressor = CompressorFactory::GetInstance()->Create(CompressType::ZSTD); mPluginContainerConfig.mLogstore = "logtail_containers"; mPluginContainerConfig.mAliuid = STRING_FLAG(logtail_profile_aliuid); - mPluginContainerConfig.mCompressor - = CompressorFactory::GetInstance()->Create(Json::Value(), PipelineContext(), "flusher_sls", CompressType::ZSTD); + mPluginContainerConfig.mCompressor = CompressorFactory::GetInstance()->Create(CompressType::ZSTD); mPluginCfg["LogtailSysConfDir"] = AppConfig::GetInstance()->GetLogtailSysConfDir(); mPluginCfg["HostIP"] = LogFileProfiler::mIpAddr; @@ -470,7 +467,8 @@ void LogtailPlugin::ProcessLogGroup(const std::string& configName, } } -void LogtailPlugin::GetGoMetrics(std::vector>& metircsList, const string& metricType) { +void LogtailPlugin::GetGoMetrics(std::vector>& metircsList, + const string& metricType) { if (mGetGoMetricsFun != nullptr) { GoString type; type.n = metricType.size(); diff --git a/core/monitor/LogtailMetric.h b/core/monitor/LogtailMetric.h index d6d76666ed..6c98081b2a 100644 --- a/core/monitor/LogtailMetric.h +++ b/core/monitor/LogtailMetric.h @@ -58,6 +58,8 @@ class MetricsRecord { class MetricsRecordRef { friend class WriteMetrics; + friend bool operator==(const MetricsRecordRef& lhs, std::nullptr_t rhs); + friend bool operator==(std::nullptr_t rhs, const MetricsRecordRef& lhs); private: MetricsRecord* mMetrics = nullptr; @@ -83,6 +85,22 @@ class MetricsRecordRef { #endif }; +inline bool operator==(const MetricsRecordRef& lhs, std::nullptr_t rhs) { + return lhs.mMetrics == rhs; +} + +inline bool operator==(std::nullptr_t lhs, const MetricsRecordRef& rhs) { + return lhs == rhs.mMetrics; +} + +inline bool operator!=(const MetricsRecordRef& lhs, std::nullptr_t rhs) { + return !(lhs == rhs); +} + +inline bool operator!=(std::nullptr_t lhs, const MetricsRecordRef& rhs) { + return !(lhs == rhs); +} + class ReentrantMetricsRecord { private: MetricsRecordRef mMetricsRecordRef; diff --git a/core/monitor/MetricConstants.cpp b/core/monitor/MetricConstants.cpp index 52c8d1158b..24212c9d3c 100644 --- a/core/monitor/MetricConstants.cpp +++ b/core/monitor/MetricConstants.cpp @@ -137,8 +137,6 @@ const std::string METRIC_PROC_PARSE_STDOUT_TOTAL = "proc_parse_stdout_total"; const std::string METRIC_PROC_PARSE_STDERR_TOTAL = "proc_parse_stderr_total"; // flusher common metrics -const std::string METRIC_FLUSHER_IN_RECORDS_TOTAL = "flusher_in_records_total"; -const std::string METRIC_FLUSHER_IN_RECORDS_SIZE_BYTES = "flusher_in_records_size_bytes"; const std::string METRIC_FLUSHER_ERROR_TOTAL = "flusher_error_total"; const std::string METRIC_FLUSHER_DISCARD_RECORDS_TOTAL = "flusher_discard_records_total"; const std::string METRIC_FLUSHER_SUCCESS_RECORDS_TOTAL = "flusher_success_records_total"; @@ -151,4 +149,14 @@ const std::string METRIC_FLUSHER_QUOTA_ERROR_TOTAL = "flusher_quota_error_total" const std::string METRIC_FLUSHER_RETRIES_TOTAL = "flusher_retries_total"; const std::string METRIC_FLUSHER_RETRIES_ERROR_TOTAL = "flusher_retries_error_total"; +const std::string METRIC_IN_EVENTS_CNT = "in_events_cnt"; +const std::string METRIC_IN_ITEMS_CNT = "in_items_cnt"; +const std::string METRIC_IN_EVENT_GROUP_SIZE_BYTES = "in_event_group_data_size_bytes"; +const std::string METRIC_IN_ITEM_SIZE_BYTES = "in_item_data_size_bytes"; +const std::string METRIC_OUT_EVENTS_CNT = "out_events_cnt"; +const std::string METRIC_OUT_ITEMS_CNT = "out_items_cnt"; +const std::string METRIC_OUT_EVENT_GROUP_SIZE_BYTES = "out_event_group_data_size_bytes"; +const std::string METRIC_OUT_ITEM_SIZE_BYTES = "out_item_data_size_bytes"; +const std::string METRIC_TOTAL_DELAY_MS = "total_delay_ms"; + } // namespace logtail diff --git a/core/monitor/MetricConstants.h b/core/monitor/MetricConstants.h index 27f3a762e3..3b78a5eae0 100644 --- a/core/monitor/MetricConstants.h +++ b/core/monitor/MetricConstants.h @@ -130,8 +130,6 @@ extern const std::string METRIC_PROC_PARSE_STDOUT_TOTAL; extern const std::string METRIC_PROC_PARSE_STDERR_TOTAL; // flusher common metrics -extern const std::string METRIC_FLUSHER_IN_RECORDS_TOTAL; -extern const std::string METRIC_FLUSHER_IN_RECORDS_SIZE_BYTES; extern const std::string METRIC_FLUSHER_ERROR_TOTAL; extern const std::string METRIC_FLUSHER_DISCARD_RECORDS_TOTAL; extern const std::string METRIC_FLUSHER_SUCCESS_RECORDS_TOTAL; @@ -144,4 +142,14 @@ extern const std::string METRIC_FLUSHER_QUOTA_ERROR_TOTAL; extern const std::string METRIC_FLUSHER_RETRIES_TOTAL; extern const std::string METRIC_FLUSHER_RETRIES_ERROR_TOTAL; +extern const std::string METRIC_IN_EVENTS_CNT; +extern const std::string METRIC_IN_ITEMS_CNT; +extern const std::string METRIC_IN_EVENT_GROUP_SIZE_BYTES; +extern const std::string METRIC_IN_ITEM_SIZE_BYTES; +extern const std::string METRIC_OUT_EVENTS_CNT; +extern const std::string METRIC_OUT_ITEMS_CNT; +extern const std::string METRIC_OUT_EVENT_GROUP_SIZE_BYTES; +extern const std::string METRIC_OUT_ITEM_SIZE_BYTES; +extern const std::string METRIC_TOTAL_DELAY_MS; + } // namespace logtail diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 511b642b96..7d38880084 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -16,6 +16,7 @@ #include "pipeline/Pipeline.h" +#include #include #include @@ -314,10 +315,19 @@ bool Pipeline::Init(PipelineConfig&& config) { ProcessQueueManager::GetInstance()->SetDownStreamQueues(mContext.GetProcessQueueKey(), std::move(senderQueues)); } + WriteMetrics::GetInstance()->PrepareMetricsRecordRef( + mMetricsRecordRef, {{METRIC_LABEL_PROJECT, mContext.GetProjectName()}, {METRIC_LABEL_CONFIG_NAME, mName}}); + mStartTime = mMetricsRecordRef.CreateIntGauge("start_time"); + mProcessorsInEventsCnt = mMetricsRecordRef.CreateCounter("processors_in_events_cnt"); + mProcessorsInGroupsCnt = mMetricsRecordRef.CreateCounter("processors_in_event_groups_cnt"); + mProcessorsInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter("processors_in_event_group_data_size_bytes"); + mProcessorsTotalDelayMs = mMetricsRecordRef.CreateCounter("processors_total_delay_ms"); + return true; } void Pipeline::Start() { +#ifndef APSARA_UNIT_TEST_MAIN // TODO: 应该保证指定时间内返回,如果无法返回,将配置放入startDisabled里 for (const auto& flusher : mFlushers) { flusher->Start(); @@ -337,16 +347,27 @@ void Pipeline::Start() { input->Start(); } - LOG_INFO(sLogger, ("pipeline start", "succeeded")("config", mName)); + mStartTime->Set(chrono::duration_cast(chrono::system_clock::now().time_since_epoch()).count()); +#endif + LOG_INFO(sLogger, ("pipeline start", "succeeded")("config", mName)("ptr", mStartTime.get())); } void Pipeline::Process(vector& logGroupList, size_t inputIndex) { + for (const auto& logGroup : logGroupList) { + mProcessorsInEventsCnt->Add(logGroup.GetEvents().size()); + mProcessorsInGroupDataSizeBytes->Add(logGroup.DataSize()); + } + mProcessorsInGroupsCnt->Add(logGroupList.size()); + + auto before = chrono::system_clock::now(); for (auto& p : mInputs[inputIndex]->GetInnerProcessors()) { p->Process(logGroupList); } for (auto& p : mProcessorLine) { p->Process(logGroupList); } + mProcessorsTotalDelayMs->Add( + chrono::duration_cast(chrono::system_clock::now() - before).count()); } bool Pipeline::Send(vector&& groupList) { @@ -381,6 +402,7 @@ bool Pipeline::FlushBatch() { } void Pipeline::Stop(bool isRemoving) { +#ifndef APSARA_UNIT_TEST_MAIN // TODO: 应该保证指定时间内返回,如果无法返回,将配置放入stopDisabled里 for (const auto& input : mInputs) { input->Stop(isRemoving); @@ -403,7 +425,7 @@ void Pipeline::Stop(bool isRemoving) { for (const auto& flusher : mFlushers) { flusher->Stop(isRemoving); } - +#endif LOG_INFO(sLogger, ("pipeline stop", "succeeded")("config", mName)); } diff --git a/core/pipeline/Pipeline.h b/core/pipeline/Pipeline.h index 90bc5ada01..e3775f7b48 100644 --- a/core/pipeline/Pipeline.h +++ b/core/pipeline/Pipeline.h @@ -24,14 +24,15 @@ #include #include "config/PipelineConfig.h" -#include "plugin/input/InputContainerStdio.h" -#include "plugin/input/InputFile.h" #include "models/PipelineEventGroup.h" +#include "monitor/LogtailMetric.h" #include "pipeline/PipelineContext.h" #include "pipeline/plugin/instance/FlusherInstance.h" #include "pipeline/plugin/instance/InputInstance.h" #include "pipeline/plugin/instance/ProcessorInstance.h" #include "pipeline/route/Router.h" +#include "plugin/input/InputContainerStdio.h" +#include "plugin/input/InputFile.h" namespace logtail { @@ -84,8 +85,15 @@ class Pipeline { std::unique_ptr mConfig; std::atomic_uint16_t mPluginID; + mutable MetricsRecordRef mMetricsRecordRef; + IntGaugePtr mStartTime; + CounterPtr mProcessorsInEventsCnt; + CounterPtr mProcessorsInGroupsCnt; + CounterPtr mProcessorsInGroupDataSizeBytes; + CounterPtr mProcessorsTotalDelayMs; + #ifdef APSARA_UNIT_TEST_MAIN - friend class PipelineMock; + friend class PipelineMock; friend class PipelineUnittest; friend class InputContainerStdioUnittest; friend class InputFileUnittest; diff --git a/core/pipeline/batch/BatchItem.h b/core/pipeline/batch/BatchItem.h index b27fa0f5e8..9fa956dbef 100644 --- a/core/pipeline/batch/BatchItem.h +++ b/core/pipeline/batch/BatchItem.h @@ -20,11 +20,11 @@ #include #include +#include "models/PipelineEventGroup.h" +#include "models/StringView.h" #include "pipeline/batch/BatchStatus.h" #include "pipeline/batch/BatchedEvents.h" #include "pipeline/batch/FlushStrategy.h" -#include "models/PipelineEventGroup.h" -#include "models/StringView.h" namespace logtail { @@ -32,7 +32,9 @@ class GroupBatchItem { public: GroupBatchItem() { mStatus.Reset(); } - void Add(BatchedEvents&& g) { + void Add(BatchedEvents&& g, int64_t totalEnqueTimeMs) { + mEventsCnt += g.mEvents.size(); + mTotalEnqueTimeMs += totalEnqueTimeMs; mGroups.emplace_back(std::move(g)); mStatus.Update(mGroups.back()); } @@ -56,6 +58,10 @@ class GroupBatchItem { } GroupBatchStatus& GetStatus() { return mStatus; } + size_t GroupSize() const { return mGroups.size(); } + size_t EventSize() const { return mEventsCnt; } + size_t DataSize() const { return mStatus.GetSize(); } + int64_t TotalEnqueTimeMs() const { return mTotalEnqueTimeMs; } bool IsEmpty() { return mGroups.empty(); } @@ -63,11 +69,17 @@ class GroupBatchItem { void Clear() { mGroups.clear(); mStatus.Reset(); + mEventsCnt = 0; + mTotalEnqueTimeMs = 0; } std::vector mGroups; GroupBatchStatus mStatus; + size_t mEventsCnt = 0; + // if more than 10^6 events are contained in the batch, the value may overflow + // however, this is almost impossible in practice + int64_t mTotalEnqueTimeMs = 0; #ifdef APSARA_UNIT_TEST_MAIN friend class EventBatchItemUnittest; @@ -82,6 +94,9 @@ class EventBatchItem { void Add(PipelineEventPtr&& e) { mBatch.mEvents.emplace_back(std::move(e)); mStatus.Update(mBatch.mEvents.back()); + mTotalEnqueTimeMs += std::chrono::time_point_cast(std::chrono::system_clock::now()) + .time_since_epoch() + .count(); } void Flush(GroupBatchItem& res) { @@ -91,7 +106,8 @@ class EventBatchItem { if (mBatch.mExactlyOnceCheckpoint) { UpdateExactlyOnceLogPosition(); } - res.Add(std::move(mBatch)); + mBatch.mSizeBytes = DataSize(); + res.Add(std::move(mBatch), mTotalEnqueTimeMs); Clear(); } @@ -102,6 +118,7 @@ class EventBatchItem { if (mBatch.mExactlyOnceCheckpoint) { UpdateExactlyOnceLogPosition(); } + mBatch.mSizeBytes = DataSize(); res.emplace_back(std::move(mBatch)); Clear(); } @@ -114,6 +131,7 @@ class EventBatchItem { if (mBatch.mExactlyOnceCheckpoint) { UpdateExactlyOnceLogPosition(); } + mBatch.mSizeBytes = DataSize(); res.back().emplace_back(std::move(mBatch)); Clear(); } @@ -141,11 +159,16 @@ class EventBatchItem { bool IsEmpty() { return mBatch.mEvents.empty(); } + size_t DataSize() const { return sizeof(decltype(mBatch.mEvents)) + mStatus.GetSize() + mBatch.mTags.DataSize(); } + size_t EventSize() const { return mBatch.mEvents.size(); } + int64_t TotalEnqueTimeMs() const { return mTotalEnqueTimeMs; } + private: void Clear() { mBatch.Clear(); mSourceBuffers.clear(); mStatus.Reset(); + mTotalEnqueTimeMs = 0; } void UpdateExactlyOnceLogPosition() { @@ -159,6 +182,9 @@ class EventBatchItem { BatchedEvents mBatch; std::unordered_set mSourceBuffers; T mStatus; + // if more than 10^6 events are contained in the batch, the value may overflow + // however, this is almost impossible in practice + int64_t mTotalEnqueTimeMs = 0; #ifdef APSARA_UNIT_TEST_MAIN friend class EventBatchItemUnittest; diff --git a/core/pipeline/batch/BatchStatus.h b/core/pipeline/batch/BatchStatus.h index 18c2b7db6c..224a83dc78 100644 --- a/core/pipeline/batch/BatchStatus.h +++ b/core/pipeline/batch/BatchStatus.h @@ -67,7 +67,7 @@ class GroupBatchStatus { if (mCreateTime == 0) { mCreateTime = time(nullptr); } - mSizeBytes += g.DataSize(); + mSizeBytes += g.mSizeBytes; } uint32_t GetSize() const { return mSizeBytes; } diff --git a/core/pipeline/batch/BatchedEvents.h b/core/pipeline/batch/BatchedEvents.h index 9ec5d0ff37..e16d15be15 100644 --- a/core/pipeline/batch/BatchedEvents.h +++ b/core/pipeline/batch/BatchedEvents.h @@ -28,6 +28,7 @@ struct BatchedEvents { EventsContainer mEvents; SizedMap mTags; std::vector> mSourceBuffers; + size_t mSizeBytes = 0; // only set on completion // for flusher_sls only RangeCheckpointPtr mExactlyOnceCheckpoint; StringView mPackIdPrefix; @@ -45,20 +46,17 @@ struct BatchedEvents { mExactlyOnceCheckpoint(std::move(eoo)), mPackIdPrefix(packIdPrefix) { mSourceBuffers.emplace_back(std::move(sourceBuffer)); - } - - size_t DataSize() const { - size_t eventsSize = sizeof(decltype(mEvents)); + mSizeBytes = sizeof(decltype(mEvents)) + mTags.DataSize(); for (const auto& item : mEvents) { - eventsSize += item->DataSize(); + mSizeBytes += item->DataSize(); } - return eventsSize + mTags.DataSize(); } void Clear() { mEvents.clear(); mTags.Clear(); mSourceBuffers.clear(); + mSizeBytes = 0; mExactlyOnceCheckpoint.reset(); mPackIdPrefix = StringView(); } diff --git a/core/pipeline/batch/Batcher.h b/core/pipeline/batch/Batcher.h index b165bab370..ca01986ca5 100644 --- a/core/pipeline/batch/Batcher.h +++ b/core/pipeline/batch/Batcher.h @@ -24,14 +24,16 @@ #include #include -#include "pipeline/batch/BatchItem.h" -#include "pipeline/batch/BatchStatus.h" -#include "pipeline/batch/FlushStrategy.h" -#include "pipeline/batch/TimeoutFlushManager.h" #include "common/Flags.h" #include "common/ParamExtractor.h" #include "models/PipelineEventGroup.h" +#include "monitor/LogtailMetric.h" +#include "monitor/MetricConstants.h" #include "pipeline/PipelineContext.h" +#include "pipeline/batch/BatchItem.h" +#include "pipeline/batch/BatchStatus.h" +#include "pipeline/batch/FlushStrategy.h" +#include "pipeline/batch/TimeoutFlushManager.h" namespace logtail { @@ -97,6 +99,26 @@ class Batcher { mFlusher = flusher; + std::vector> labels{ + {METRIC_LABEL_PROJECT, ctx.GetProjectName()}, + {METRIC_LABEL_CONFIG_NAME, ctx.GetConfigName()}, + {METRIC_LABEL_KEY_COMPONENT_NAME, "batcher"}, + {METRIC_LABEL_KEY_FLUSHER_NODE_ID, flusher->GetNodeID()}}; + if (enableGroupBatch) { + labels.emplace_back("enable_group_batch", "true"); + } else { + labels.emplace_back("enable_group_batch", "false"); + } + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, std::move(labels)); + mInEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_EVENTS_CNT); + mInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_EVENT_GROUP_SIZE_BYTES); + mOutEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_EVENTS_CNT); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); + mEventBatchItemsCnt = mMetricsRecordRef.CreateIntGauge("event_batches_cnt"); + mBufferedGroupsCnt = mMetricsRecordRef.CreateIntGauge("buffered_groups_cnt"); + mBufferedEventsCnt = mMetricsRecordRef.CreateIntGauge("buffered_events_cnt"); + mBufferedDataSizeByte = mMetricsRecordRef.CreateIntGauge("buffered_data_size_bytes"); + return true; } @@ -105,15 +127,20 @@ class Batcher { std::lock_guard lock(mMux); size_t key = g.GetTagsHash(); EventBatchItem& item = mEventQueueMap[key]; + mInEventsCnt->Add(g.GetEvents().size()); + mInGroupDataSizeBytes->Add(g.DataSize()); + mEventBatchItemsCnt->Set(mEventQueueMap.size()); size_t eventsSize = g.GetEvents().size(); for (size_t i = 0; i < eventsSize; ++i) { PipelineEventPtr& e = g.MutableEvents()[i]; if (!item.IsEmpty() && mEventFlushStrategy.NeedFlushByTime(item.GetStatus(), e)) { if (!mGroupQueue) { + UpdateMetricsOnFlushingEventQueue(item); item.Flush(res); } else { if (!mGroupQueue->IsEmpty() && mGroupFlushStrategy->NeedFlushByTime(mGroupQueue->GetStatus())) { + UpdateMetricsOnFlushingGroupQueue(); mGroupQueue->Flush(res); } if (mGroupQueue->IsEmpty()) { @@ -125,6 +152,7 @@ class Batcher { } item.Flush(mGroupQueue.value()); if (mGroupFlushStrategy->NeedFlushBySize(mGroupQueue->GetStatus())) { + UpdateMetricsOnFlushingGroupQueue(); mGroupQueue->Flush(res); } } @@ -136,12 +164,17 @@ class Batcher { g.GetMetadata(EventGroupMetaKey::SOURCE_ID)); TimeoutFlushManager::GetInstance()->UpdateRecord( mFlusher->GetContext().GetConfigName(), 0, key, mEventFlushStrategy.GetTimeoutSecs(), mFlusher); + mBufferedGroupsCnt->Add(1); + mBufferedDataSizeByte->Add(item.DataSize()); } else if (i == 0) { item.AddSourceBuffer(g.GetSourceBuffer()); } + mBufferedEventsCnt->Add(1); + mBufferedDataSizeByte->Add(e->DataSize()); item.Add(std::move(e)); if (mEventFlushStrategy.NeedFlushBySize(item.GetStatus()) || mEventFlushStrategy.NeedFlushByCnt(item.GetStatus())) { + UpdateMetricsOnFlushingEventQueue(item); item.Flush(res); } } @@ -155,6 +188,7 @@ class Batcher { if (!mGroupQueue) { return; } + UpdateMetricsOnFlushingGroupQueue(); return mGroupQueue->Flush(res); } @@ -164,12 +198,15 @@ class Batcher { } if (!mGroupQueue) { + UpdateMetricsOnFlushingEventQueue(iter->second); iter->second.Flush(res); mEventQueueMap.erase(iter); + mEventBatchItemsCnt->Set(mEventQueueMap.size()); return; } if (!mGroupQueue->IsEmpty() && mGroupFlushStrategy->NeedFlushByTime(mGroupQueue->GetStatus())) { + UpdateMetricsOnFlushingGroupQueue(); mGroupQueue->Flush(res); } if (mGroupQueue->IsEmpty()) { @@ -178,7 +215,9 @@ class Batcher { } iter->second.Flush(mGroupQueue.value()); mEventQueueMap.erase(iter); + mEventBatchItemsCnt->Set(mEventQueueMap.size()); if (mGroupFlushStrategy->NeedFlushBySize(mGroupQueue->GetStatus())) { + UpdateMetricsOnFlushingGroupQueue(); mGroupQueue->Flush(res); } } @@ -187,20 +226,25 @@ class Batcher { std::lock_guard lock(mMux); for (auto& item : mEventQueueMap) { if (!mGroupQueue) { + UpdateMetricsOnFlushingEventQueue(item.second); item.second.Flush(res); } else { if (!mGroupQueue->IsEmpty() && mGroupFlushStrategy->NeedFlushByTime(mGroupQueue->GetStatus())) { + UpdateMetricsOnFlushingGroupQueue(); mGroupQueue->Flush(res); } item.second.Flush(mGroupQueue.value()); if (mGroupFlushStrategy->NeedFlushBySize(mGroupQueue->GetStatus())) { + UpdateMetricsOnFlushingGroupQueue(); mGroupQueue->Flush(res); } } } if (mGroupQueue) { + UpdateMetricsOnFlushingGroupQueue(); mGroupQueue->Flush(res); } + mEventBatchItemsCnt->Set(0); mEventQueueMap.clear(); } @@ -210,6 +254,32 @@ class Batcher { #endif private: + void UpdateMetricsOnFlushingEventQueue(const EventBatchItem& item) { + mOutEventsCnt->Add(item.EventSize()); + mTotalDelayMs->Add( + item.EventSize() + * std::chrono::time_point_cast(std::chrono::system_clock::now()) + .time_since_epoch() + .count() + - item.TotalEnqueTimeMs()); + mBufferedGroupsCnt->Sub(1); + mBufferedEventsCnt->Sub(item.EventSize()); + mBufferedDataSizeByte->Sub(item.DataSize()); + } + + void UpdateMetricsOnFlushingGroupQueue() { + mOutEventsCnt->Add(mGroupQueue->EventSize()); + mTotalDelayMs->Add( + mGroupQueue->EventSize() + * std::chrono::time_point_cast(std::chrono::system_clock::now()) + .time_since_epoch() + .count() + - mGroupQueue->TotalEnqueTimeMs()); + mBufferedGroupsCnt->Sub(mGroupQueue->GroupSize()); + mBufferedEventsCnt->Sub(mGroupQueue->EventSize()); + mBufferedDataSizeByte->Sub(mGroupQueue->DataSize()); + } + std::mutex mMux; std::map> mEventQueueMap; EventFlushStrategy mEventFlushStrategy; @@ -219,6 +289,16 @@ class Batcher { Flusher* mFlusher = nullptr; + mutable MetricsRecordRef mMetricsRecordRef; + CounterPtr mInEventsCnt; + CounterPtr mInGroupDataSizeBytes; + CounterPtr mOutEventsCnt; + CounterPtr mTotalDelayMs; + IntGaugePtr mEventBatchItemsCnt; + IntGaugePtr mBufferedGroupsCnt; + IntGaugePtr mBufferedEventsCnt; + IntGaugePtr mBufferedDataSizeByte; + #ifdef APSARA_UNIT_TEST_MAIN friend class BatcherUnittest; #endif diff --git a/core/pipeline/plugin/instance/FlusherInstance.cpp b/core/pipeline/plugin/instance/FlusherInstance.cpp index 9623a96b75..997f75fe4b 100644 --- a/core/pipeline/plugin/instance/FlusherInstance.cpp +++ b/core/pipeline/plugin/instance/FlusherInstance.cpp @@ -25,14 +25,14 @@ bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, return false; } - mFlusherInRecordsTotal = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_FLUSHER_IN_RECORDS_TOTAL); - mFlusherInRecordsSizeBytes = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_FLUSHER_IN_RECORDS_SIZE_BYTES); + mInEventsCnt = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_IN_EVENTS_CNT); + mInGroupDataSizeBytes = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_IN_EVENT_GROUP_SIZE_BYTES); return true; } bool FlusherInstance::Send(PipelineEventGroup&& g) { - mFlusherInRecordsTotal->Add(g.GetEvents().size()); - mFlusherInRecordsSizeBytes->Add(g.DataSize()); + mInEventsCnt->Add(g.GetEvents().size()); + mInGroupDataSizeBytes->Add(g.DataSize()); return mPlugin->Send(std::move(g)); } diff --git a/core/pipeline/plugin/instance/FlusherInstance.h b/core/pipeline/plugin/instance/FlusherInstance.h index 2900a594df..4abec0d766 100644 --- a/core/pipeline/plugin/instance/FlusherInstance.h +++ b/core/pipeline/plugin/instance/FlusherInstance.h @@ -46,8 +46,8 @@ class FlusherInstance : public PluginInstance { private: std::unique_ptr mPlugin; - CounterPtr mFlusherInRecordsTotal; - CounterPtr mFlusherInRecordsSizeBytes; + CounterPtr mInEventsCnt; + CounterPtr mInGroupDataSizeBytes; }; } // namespace logtail diff --git a/core/pipeline/plugin/instance/ProcessorInstance.cpp b/core/pipeline/plugin/instance/ProcessorInstance.cpp index 8790886cd4..70dbc5a172 100644 --- a/core/pipeline/plugin/instance/ProcessorInstance.cpp +++ b/core/pipeline/plugin/instance/ProcessorInstance.cpp @@ -49,11 +49,9 @@ void ProcessorInstance::Process(vector& logGroupList) { mProcInRecordsTotal->Add(logGroup.GetEvents().size()); } - uint64_t startTime = GetCurrentTimeInMilliSeconds(); + auto before = chrono::system_clock::now(); mPlugin->Process(logGroupList); - uint64_t durationTime = GetCurrentTimeInMilliSeconds() - startTime; - - mProcTimeMS->Add(durationTime); + mProcTimeMS->Add(chrono::duration_cast(chrono::system_clock::now() - before).count()); for (const auto& logGroup : logGroupList) { mProcOutRecordsTotal->Add(logGroup.GetEvents().size()); diff --git a/core/pipeline/queue/QueueInterface.h b/core/pipeline/queue/QueueInterface.h index 5c2e3b82cb..1c450013ad 100644 --- a/core/pipeline/queue/QueueInterface.h +++ b/core/pipeline/queue/QueueInterface.h @@ -33,10 +33,10 @@ class QueueInterface { {METRIC_LABEL_CONFIG_NAME, ctx.GetConfigName()}, }); - mInItemsCnt = mMetricsRecordRef.CreateCounter("in_items_cnt"); - mInItemDataSizeBytes = mMetricsRecordRef.CreateCounter("in_item_data_size_bytes"); - mOutItemsCnt = mMetricsRecordRef.CreateCounter("out_items_cnt"); - mTotalDelayMs = mMetricsRecordRef.CreateCounter("total_delay_ms"); + mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT); + mInItemDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES); + mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); mQueueSize = mMetricsRecordRef.CreateIntGauge("queue_size"); mQueueDataSizeByte = mMetricsRecordRef.CreateIntGauge("queue_data_size_bytes"); } diff --git a/core/pipeline/route/Router.cpp b/core/pipeline/route/Router.cpp index e9c4a817cd..73de16bcda 100644 --- a/core/pipeline/route/Router.cpp +++ b/core/pipeline/route/Router.cpp @@ -15,6 +15,7 @@ #include "pipeline/route/Router.h" #include "common/ParamExtractor.h" +#include "monitor/MetricConstants.h" #include "pipeline/Pipeline.h" #include "pipeline/plugin/interface/Flusher.h" @@ -33,10 +34,20 @@ bool Router::Init(std::vector> configs, const P mAlwaysMatchedFlusherIdx.push_back(item.first); } } + + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(mMetricsRecordRef, + {{METRIC_LABEL_PROJECT, ctx.GetProjectName()}, + {METRIC_LABEL_CONFIG_NAME, ctx.GetConfigName()}, + {METRIC_LABEL_KEY_COMPONENT_NAME, "router"}}); + mInEventsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_EVENTS_CNT); + mInGroupDataSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_EVENT_GROUP_SIZE_BYTES); return true; } vector Router::Route(const PipelineEventGroup& g) const { + mInEventsCnt->Add(g.GetEvents().size()); + mInGroupDataSizeBytes->Add(g.DataSize()); + vector res(mAlwaysMatchedFlusherIdx); for (size_t i = 0; i < mConditions.size(); ++i) { if (mConditions[i].second.Check(g)) { diff --git a/core/pipeline/route/Router.h b/core/pipeline/route/Router.h index c46dedca6e..b036256551 100644 --- a/core/pipeline/route/Router.h +++ b/core/pipeline/route/Router.h @@ -22,6 +22,7 @@ #include #include "models/PipelineEventGroup.h" +#include "monitor/LogtailMetric.h" #include "pipeline/route/Condition.h" namespace logtail { @@ -37,6 +38,10 @@ class Router { std::vector> mConditions; std::vector mAlwaysMatchedFlusherIdx; + mutable MetricsRecordRef mMetricsRecordRef; + CounterPtr mInEventsCnt; + CounterPtr mInGroupDataSizeBytes; + #ifdef APSARA_UNIT_TEST_MAIN friend class RouterUnittest; friend class PipelineUnittest; diff --git a/core/pipeline/serializer/SLSSerializer.cpp b/core/pipeline/serializer/SLSSerializer.cpp index 600e709f68..f24030c100 100644 --- a/core/pipeline/serializer/SLSSerializer.cpp +++ b/core/pipeline/serializer/SLSSerializer.cpp @@ -17,14 +17,14 @@ #include "application/Application.h" #include "common/Flags.h" #include "common/TimeUtil.h" -#include "pipeline/compression/CompressType.h" +#include "common/compression/CompressType.h" #include "plugin/flusher/sls/FlusherSLS.h" DEFINE_FLAG_INT32(max_send_log_group_size, "bytes", 10 * 1024 * 1024); const std::string METRIC_RESERVED_KEY_NAME = "__name__"; -const std::string METRIC_RESERVED_KEY_LABELS = "__labels__"; +const std::string METRIC_RESERVED_KEY_LABELS = "__labels__"; const std::string METRIC_RESERVED_KEY_VALUE = "__value__"; const std::string METRIC_RESERVED_KEY_TIME_NANO = "__time_nano__"; @@ -35,7 +35,31 @@ using namespace std; namespace logtail { +template <> +bool Serializer>::DoSerialize(vector&& p, + std::string& output, + std::string& errorMsg) { + auto inputSize = 0; + for (auto& item : p) { + inputSize += item.mData.size(); + } + mInItemsCnt->Add(1); + mInItemSizeBytes->Add(inputSize); + auto before = std::chrono::system_clock::now(); + auto res = Serialize(std::move(p), output, errorMsg); + mTotalDelayMs->Add( + std::chrono::duration_cast(std::chrono::system_clock::now() - before).count()); + + if (res) { + mOutItemsCnt->Add(1); + mOutItemSizeBytes->Add(output.size()); + } else { + mDiscardedItemsCnt->Add(1); + mDiscardedItemSizeBytes->Add(inputSize); + } + return res; +} bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, string& errorMsg) { sls_logs::LogGroup logGroup; @@ -56,7 +80,7 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri } else if (e.Is()) { const auto& metricEvent = e.Cast(); if (metricEvent.Is()) { - continue; + continue; } auto log = logGroup.add_logs(); std::ostringstream oss; @@ -78,9 +102,10 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri logPtr = log->add_contents(); logPtr->set_key(METRIC_RESERVED_KEY_TIME_NANO); if (metricEvent.GetTimestampNanosecond()) { - logPtr->set_value(std::to_string(metricEvent.GetTimestamp()) + NumberToDigitString(metricEvent.GetTimestampNanosecond().value(), 9)); + logPtr->set_value(std::to_string(metricEvent.GetTimestamp()) + + NumberToDigitString(metricEvent.GetTimestampNanosecond().value(), 9)); } else { - logPtr->set_value(std::to_string(metricEvent.GetTimestamp())); + logPtr->set_value(std::to_string(metricEvent.GetTimestamp())); } // set __value__ if (metricEvent.Is()) { @@ -88,7 +113,7 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri logPtr = log->add_contents(); logPtr->set_key(METRIC_RESERVED_KEY_VALUE); logPtr->set_value(std::to_string(value)); - } + } // set __name__ logPtr = log->add_contents(); logPtr->set_key(METRIC_RESERVED_KEY_NAME); @@ -122,9 +147,7 @@ bool SLSEventGroupSerializer::Serialize(BatchedEvents&& group, string& res, stri return true; } -bool SLSEventGroupListSerializer::Serialize(vector&& v, - string& res, - string& errorMsg) { +bool SLSEventGroupListSerializer::Serialize(vector&& v, string& res, string& errorMsg) { sls_logs::SlsLogPackageList logPackageList; for (const auto& item : v) { auto package = logPackageList.add_packages(); diff --git a/core/pipeline/serializer/SLSSerializer.h b/core/pipeline/serializer/SLSSerializer.h index f19d0f8e3a..c9bdb56798 100644 --- a/core/pipeline/serializer/SLSSerializer.h +++ b/core/pipeline/serializer/SLSSerializer.h @@ -27,6 +27,7 @@ class SLSEventGroupSerializer : public Serializer { public: SLSEventGroupSerializer(Flusher* f) : Serializer(f) {} +private: bool Serialize(BatchedEvents&& p, std::string& res, std::string& errorMsg) override; }; @@ -37,10 +38,14 @@ struct CompressedLogGroup { CompressedLogGroup(std::string&& data, size_t rawSize) : mData(std::move(data)), mRawSize(rawSize) {} }; +template<> +bool Serializer>::DoSerialize(std::vector&& p, std::string& output, std::string& errorMsg); + class SLSEventGroupListSerializer : public Serializer> { public: SLSEventGroupListSerializer(Flusher* f) : Serializer>(f) {} +private: bool Serialize(std::vector&& v, std::string& res, std::string& errorMsg) override; }; diff --git a/core/pipeline/serializer/Serializer.h b/core/pipeline/serializer/Serializer.h index 968f882a91..b632124b7a 100644 --- a/core/pipeline/serializer/Serializer.h +++ b/core/pipeline/serializer/Serializer.h @@ -16,27 +16,92 @@ #pragma once +#include #include -#include "pipeline/batch/BatchedEvents.h" #include "models/PipelineEventPtr.h" +#include "monitor/MetricConstants.h" +#include "pipeline/batch/BatchedEvents.h" #include "pipeline/plugin/interface/Flusher.h" namespace logtail { +inline size_t GetInputSize(const PipelineEventPtr& p) { + return p->DataSize(); +} + +inline size_t GetInputSize(const BatchedEvents& p) { + return p.mSizeBytes; +} + +inline size_t GetInputSize(const BatchedEventsList& p) { + size_t size = 0; + for (const auto& e : p) { + size += GetInputSize(e); + } + return size; +} + // T: PipelineEventPtr, BatchedEvents, BatchedEventsList template class Serializer { public: - Serializer() = default; - Serializer(Flusher* f) : mFlusher(f) {} + Serializer(Flusher* f) : mFlusher(f) { + WriteMetrics::GetInstance()->PrepareMetricsRecordRef( + mMetricsRecordRef, + {{METRIC_LABEL_PROJECT, f->GetContext().GetProjectName()}, + {METRIC_LABEL_CONFIG_NAME, f->GetContext().GetConfigName()}, + {METRIC_LABEL_KEY_COMPONENT_NAME, "serializer"}, + {METRIC_LABEL_KEY_FLUSHER_NODE_ID, f->GetNodeID()}}); + mInItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEMS_CNT); + mInItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_IN_ITEM_SIZE_BYTES); + mOutItemsCnt = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEMS_CNT); + mOutItemSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_OUT_ITEM_SIZE_BYTES); + mDiscardedItemsCnt = mMetricsRecordRef.CreateCounter("discarded_items_cnt"); + mDiscardedItemSizeBytes = mMetricsRecordRef.CreateCounter("discarded_item_data_size_bytes"); + mTotalDelayMs = mMetricsRecordRef.CreateCounter(METRIC_TOTAL_DELAY_MS); + } virtual ~Serializer() = default; - virtual bool Serialize(T&& p, std::string& res, std::string& errorMsg) = 0; + bool DoSerialize(T&& p, std::string& output, std::string& errorMsg) { + auto inputSize = GetInputSize(p); + mInItemsCnt->Add(1); + mInItemSizeBytes->Add(inputSize); + + auto before = std::chrono::system_clock::now(); + auto res = Serialize(std::move(p), output, errorMsg); + mTotalDelayMs->Add( + std::chrono::duration_cast(std::chrono::system_clock::now() - before).count()); + + if (res) { + mOutItemsCnt->Add(1); + mOutItemSizeBytes->Add(output.size()); + } else { + mDiscardedItemsCnt->Add(1); + mDiscardedItemSizeBytes->Add(inputSize); + } + return res; + } protected: // if serialized output contains output related info, it can be obtained via this member const Flusher* mFlusher = nullptr; + + mutable MetricsRecordRef mMetricsRecordRef; + CounterPtr mInItemsCnt; + CounterPtr mInItemSizeBytes; + CounterPtr mOutItemsCnt; + CounterPtr mOutItemSizeBytes; + CounterPtr mDiscardedItemsCnt; + CounterPtr mDiscardedItemSizeBytes; + CounterPtr mTotalDelayMs; + +private: + virtual bool Serialize(T&& p, std::string& res, std::string& errorMsg) = 0; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class SerializerUnittest; +#endif }; using EventSerializer = Serializer; diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 5df36854c9..5f965a4535 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -23,9 +23,9 @@ #include "common/LogtailCommonFlags.h" #include "common/ParamExtractor.h" #include "common/TimeUtil.h" +#include "common/compression/CompressorFactory.h" #include "pipeline/Pipeline.h" #include "pipeline/batch/FlushStrategy.h" -#include "pipeline/compression/CompressorFactory.h" #include "pipeline/queue/QueueKeyManager.h" #include "pipeline/queue/SLSSenderQueueItem.h" #include "pipeline/queue/SenderQueueManager.h" @@ -445,7 +445,7 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline // CompressType if (BOOL_FLAG(sls_client_send_compress)) { - mCompressor = CompressorFactory::GetInstance()->Create(config, *mContext, sName, CompressType::LZ4); + mCompressor = CompressorFactory::GetInstance()->Create(config, *mContext, sName, mNodeID, CompressType::LZ4); } mGroupSerializer = make_unique(this); @@ -809,7 +809,7 @@ bool FlusherSLS::Send(string&& data, const string& shardHashKey, const string& l string compressedData; if (mCompressor) { string errorMsg; - if (!mCompressor->Compress(data, compressedData, errorMsg)) { + if (!mCompressor->DoCompress(data, compressedData, errorMsg)) { LOG_WARNING(mContext->GetLogger(), ("failed to compress data", errorMsg)("action", "discard data")("plugin", sName)("config", mContext->GetConfigName())); @@ -866,7 +866,7 @@ bool FlusherSLS::SerializeAndPush(PipelineEventGroup&& group) { std::move(group.GetExactlyOnceCheckpoint())); AddPackId(g); string errorMsg; - if (!mGroupSerializer->Serialize(std::move(g), serializedData, errorMsg)) { + if (!mGroupSerializer->DoSerialize(std::move(g), serializedData, errorMsg)) { LOG_WARNING(mContext->GetLogger(), ("failed to serialize event group", errorMsg)("action", "discard data")("plugin", sName)("config", mContext->GetConfigName())); @@ -880,7 +880,7 @@ bool FlusherSLS::SerializeAndPush(PipelineEventGroup&& group) { return false; } if (mCompressor) { - if (!mCompressor->Compress(serializedData, compressedData, errorMsg)) { + if (!mCompressor->DoCompress(serializedData, compressedData, errorMsg)) { LOG_WARNING(mContext->GetLogger(), ("failed to compress event group", errorMsg)("action", "discard data")("plugin", sName)("config", mContext->GetConfigName())); @@ -926,7 +926,7 @@ bool FlusherSLS::SerializeAndPush(BatchedEventsList&& groupList) { } AddPackId(group); string errorMsg; - if (!mGroupSerializer->Serialize(std::move(group), serializedData, errorMsg)) { + if (!mGroupSerializer->DoSerialize(std::move(group), serializedData, errorMsg)) { LOG_WARNING(mContext->GetLogger(), ("failed to serialize event group", errorMsg)("action", "discard data")("plugin", sName)("config", mContext->GetConfigName())); @@ -941,7 +941,7 @@ bool FlusherSLS::SerializeAndPush(BatchedEventsList&& groupList) { continue; } if (mCompressor) { - if (!mCompressor->Compress(serializedData, compressedData, errorMsg)) { + if (!mCompressor->DoCompress(serializedData, compressedData, errorMsg)) { LOG_WARNING(mContext->GetLogger(), ("failed to compress event group", errorMsg)("action", "discard data")("plugin", sName)("config", mContext->GetConfigName())); @@ -991,7 +991,7 @@ bool FlusherSLS::SerializeAndPush(BatchedEventsList&& groupList) { } if (enablePackageList) { string errorMsg; - mGroupListSerializer->Serialize(std::move(compressedLogGroups), serializedData, errorMsg); + mGroupListSerializer->DoSerialize(std::move(compressedLogGroups), serializedData, errorMsg); allSucceeded = Flusher::PushToQueue(make_unique( std::move(serializedData), packageSize, this, mQueueKey, mLogstore, RawDataType::EVENT_GROUP_LIST)) diff --git a/core/plugin/flusher/sls/FlusherSLS.h b/core/plugin/flusher/sls/FlusherSLS.h index 2840b94fe4..7e3d4658a1 100644 --- a/core/plugin/flusher/sls/FlusherSLS.h +++ b/core/plugin/flusher/sls/FlusherSLS.h @@ -24,12 +24,12 @@ #include #include +#include "common/compression/Compressor.h" +#include "models/PipelineEventGroup.h" #include "pipeline/batch/BatchStatus.h" #include "pipeline/batch/Batcher.h" -#include "pipeline/compression/Compressor.h" -#include "models/PipelineEventGroup.h" -#include "pipeline/plugin/interface/HttpFlusher.h" #include "pipeline/limiter/ConcurrencyLimiter.h" +#include "pipeline/plugin/interface/HttpFlusher.h" #include "pipeline/serializer/SLSSerializer.h" namespace logtail { diff --git a/core/profile_sender/ProfileSender.cpp b/core/profile_sender/ProfileSender.cpp index 1121e91216..7098f31ce1 100644 --- a/core/profile_sender/ProfileSender.cpp +++ b/core/profile_sender/ProfileSender.cpp @@ -28,7 +28,7 @@ #include "sdk/Exception.h" #include "sls_control/SLSControl.h" // TODO: temporarily used -#include "pipeline/compression/CompressorFactory.h" +#include "common/compression/CompressorFactory.h" using namespace std; @@ -101,8 +101,8 @@ void ProfileSender::SetProfileProjectName(const string& region, const string& pr flusher.mAliuid = STRING_FLAG(logtail_profile_aliuid); // logstore is given at send time // TODO: temporarily used - flusher.mCompressor - = CompressorFactory::GetInstance()->Create(Json::Value(), PipelineContext(), "flusher_sls", CompressType::ZSTD); + flusher.mCompressor = CompressorFactory::GetInstance()->Create( + Json::Value(), PipelineContext(), "flusher_sls", "", CompressType::ZSTD); } FlusherSLS* ProfileSender::GetFlusher(const string& region) { diff --git a/core/unittest/batch/BatchItemUnittest.cpp b/core/unittest/batch/BatchItemUnittest.cpp index 71d2d7fa59..0aac5d30b3 100644 --- a/core/unittest/batch/BatchItemUnittest.cpp +++ b/core/unittest/batch/BatchItemUnittest.cpp @@ -92,6 +92,7 @@ void EventBatchItemUnittest::TestAdd() { APSARA_TEST_EQUAL(1U, mItem.mBatch.mEvents.size()); APSARA_TEST_EQUAL(1U, mItem.GetStatus().GetCnt()); APSARA_TEST_EQUAL(size, mItem.GetStatus().GetSize()); + APSARA_TEST_NOT_EQUAL(0, mItem.mTotalEnqueTimeMs); } void EventBatchItemUnittest::TestFlushEmpty() { @@ -116,10 +117,12 @@ void EventBatchItemUnittest::TestFlushGroupBatchItem() { sEventGroup->AddLogEvent(); PipelineEventPtr& e = sEventGroup->MutableEvents().back(); mItem.Add(std::move(e)); + auto size = mItem.DataSize(); GroupBatchItem res; mItem.Flush(res); APSARA_TEST_EQUAL(1U, res.mGroups.size()); + APSARA_TEST_EQUAL(size, res.mGroups[0].mSizeBytes); APSARA_TEST_TRUE(mItem.IsEmpty()); APSARA_TEST_TRUE(mItem.mBatch.mTags.mInner.empty()); @@ -130,12 +133,14 @@ void EventBatchItemUnittest::TestFlushGroupBatchItem() { APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetCnt()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); + APSARA_TEST_EQUAL(0, mItem.mTotalEnqueTimeMs); } void EventBatchItemUnittest::TestFlushBatchedEvensList() { sEventGroup->AddLogEvent(); PipelineEventPtr& e = sEventGroup->MutableEvents().back(); mItem.Add(std::move(e)); + auto size = mItem.DataSize(); BatchedEventsList res; mItem.Flush(res); @@ -145,6 +150,7 @@ void EventBatchItemUnittest::TestFlushBatchedEvensList() { APSARA_TEST_NOT_EQUAL(nullptr, res[0].mExactlyOnceCheckpoint); APSARA_TEST_STREQ("pack_id", res[0].mPackIdPrefix.data()); APSARA_TEST_EQUAL(1U, res[0].mSourceBuffers.size()); + APSARA_TEST_EQUAL(size, res[0].mSizeBytes); APSARA_TEST_TRUE(mItem.IsEmpty()); APSARA_TEST_TRUE(mItem.mBatch.mTags.mInner.empty()); @@ -155,12 +161,14 @@ void EventBatchItemUnittest::TestFlushBatchedEvensList() { APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetCnt()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); + APSARA_TEST_EQUAL(0, mItem.mTotalEnqueTimeMs); } void EventBatchItemUnittest::TestFlushBatchedEvensLists() { sEventGroup->AddLogEvent(); PipelineEventPtr& e = sEventGroup->MutableEvents().back(); mItem.Add(std::move(e)); + auto size = mItem.DataSize(); vector res; mItem.Flush(res); @@ -171,6 +179,7 @@ void EventBatchItemUnittest::TestFlushBatchedEvensLists() { APSARA_TEST_NOT_EQUAL(nullptr, res[0][0].mExactlyOnceCheckpoint); APSARA_TEST_STREQ("pack_id", res[0][0].mPackIdPrefix.data()); APSARA_TEST_EQUAL(1U, res[0][0].mSourceBuffers.size()); + APSARA_TEST_EQUAL(size, res[0][0].mSizeBytes); APSARA_TEST_TRUE(mItem.IsEmpty()); APSARA_TEST_TRUE(mItem.mBatch.mTags.mInner.empty()); @@ -181,6 +190,7 @@ void EventBatchItemUnittest::TestFlushBatchedEvensLists() { APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetCnt()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); + APSARA_TEST_EQUAL(0, mItem.mTotalEnqueTimeMs); } void EventBatchItemUnittest::TestExactlyOnce() { @@ -223,6 +233,7 @@ class GroupBatchItemUnittest : public ::testing::Test { mBatch.mEvents = std::move(eventGroup.MutableEvents()); mBatch.mSourceBuffers.emplace_back(std::move(eventGroup.GetSourceBuffer())); mBatch.mTags = std::move(eventGroup.GetSizedTags()); + mBatch.mSizeBytes = 100; } void TearDown() override { @@ -236,11 +247,15 @@ class GroupBatchItemUnittest : public ::testing::Test { }; void GroupBatchItemUnittest::TestAdd() { - size_t size = mBatch.DataSize(); - mItem.Add(std::move(mBatch)); + size_t size = mBatch.mSizeBytes; + mItem.Add(std::move(mBatch), 1234567890000); APSARA_TEST_EQUAL(1U, mItem.mGroups.size()); APSARA_TEST_EQUAL(size, mItem.GetStatus().GetSize()); + APSARA_TEST_EQUAL(1234567890000, mItem.TotalEnqueTimeMs()); + APSARA_TEST_EQUAL(1U, mItem.EventSize()); + APSARA_TEST_EQUAL(1U, mItem.GroupSize()); + APSARA_TEST_EQUAL(100U, mItem.DataSize()); } void GroupBatchItemUnittest::TestFlushEmpty() { @@ -257,7 +272,7 @@ void GroupBatchItemUnittest::TestFlushEmpty() { } void GroupBatchItemUnittest::TestFlushBatchedEvensList() { - mItem.Add(std::move(mBatch)); + mItem.Add(std::move(mBatch), 1234567890000); BatchedEventsList res; mItem.Flush(res); @@ -266,10 +281,14 @@ void GroupBatchItemUnittest::TestFlushBatchedEvensList() { APSARA_TEST_TRUE(mItem.IsEmpty()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); + APSARA_TEST_EQUAL(0, mItem.TotalEnqueTimeMs()); + APSARA_TEST_EQUAL(0U, mItem.EventSize()); + APSARA_TEST_EQUAL(0U, mItem.GroupSize()); + APSARA_TEST_EQUAL(0U, mItem.DataSize()); } void GroupBatchItemUnittest::TestFlushBatchedEvensLists() { - mItem.Add(std::move(mBatch)); + mItem.Add(std::move(mBatch), 1234567890000); vector res; mItem.Flush(res); @@ -279,6 +298,10 @@ void GroupBatchItemUnittest::TestFlushBatchedEvensLists() { APSARA_TEST_TRUE(mItem.IsEmpty()); APSARA_TEST_EQUAL(0U, mItem.GetStatus().GetSize()); APSARA_TEST_EQUAL(0, mItem.GetStatus().GetCreateTime()); + APSARA_TEST_EQUAL(0, mItem.TotalEnqueTimeMs()); + APSARA_TEST_EQUAL(0U, mItem.EventSize()); + APSARA_TEST_EQUAL(0U, mItem.GroupSize()); + APSARA_TEST_EQUAL(0U, mItem.DataSize()); } UNIT_TEST_CASE(GroupBatchItemUnittest, TestAdd) diff --git a/core/unittest/batch/BatchStatusUnittest.cpp b/core/unittest/batch/BatchStatusUnittest.cpp index 23932bef46..094333fd3d 100644 --- a/core/unittest/batch/BatchStatusUnittest.cpp +++ b/core/unittest/batch/BatchStatusUnittest.cpp @@ -124,6 +124,7 @@ class GroupBatchStatusUnittest : public ::testing::Test { void TestUpdate(); protected: + static void SetUpTestCase() { sBatch.mSizeBytes = 100; } void SetUp() override { mStatus.Reset(); } private: @@ -142,10 +143,10 @@ void GroupBatchStatusUnittest::TestReset() { void GroupBatchStatusUnittest::TestUpdate() { mStatus.Update(sBatch); time_t createTime = mStatus.GetCreateTime(); - APSARA_TEST_EQUAL(sBatch.DataSize(), mStatus.GetSize()); + APSARA_TEST_EQUAL(sBatch.mSizeBytes, mStatus.GetSize()); mStatus.Update(sBatch); - APSARA_TEST_EQUAL(2 * sBatch.DataSize(), mStatus.GetSize()); + APSARA_TEST_EQUAL(2 * sBatch.mSizeBytes, mStatus.GetSize()); APSARA_TEST_EQUAL(createTime, mStatus.GetCreateTime()); } diff --git a/core/unittest/batch/BatcherUnittest.cpp b/core/unittest/batch/BatcherUnittest.cpp index 8d40721565..799153692d 100644 --- a/core/unittest/batch/BatcherUnittest.cpp +++ b/core/unittest/batch/BatcherUnittest.cpp @@ -12,9 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. - -#include "pipeline/batch/Batcher.h" #include "common/JsonUtil.h" +#include "pipeline/batch/Batcher.h" #include "unittest/Unittest.h" #include "unittest/plugin/PluginMock.h" @@ -34,6 +33,7 @@ class BatcherUnittest : public ::testing::Test { void TestFlushGroupQueue(); void TestFlushAllWithoutGroupBatch(); void TestFlushAllWithGroupBatch(); + void TestMetric(); protected: static void SetUpTestCase() { sFlusher = make_unique(); } @@ -42,6 +42,7 @@ class BatcherUnittest : public ::testing::Test { mCtx.SetConfigName("test_config"); sFlusher->SetContext(mCtx); sFlusher->SetMetricsRecordRef(FlusherMock::sName, "1", "1", "1"); + sFlusher->SetNodeID("1"); } void TearDown() override { TimeoutFlushManager::GetInstance()->mTimeoutRecords.clear(); } @@ -559,6 +560,63 @@ void BatcherUnittest::TestFlushAllWithGroupBatch() { APSARA_TEST_STREQ("pack_id", res[1][0].mPackIdPrefix.data()); } +void BatcherUnittest::TestMetric() { + { + DefaultFlushStrategyOptions strategy; + strategy.mMaxCnt = 2; + strategy.mMaxSizeBytes = 1000; + strategy.mTimeoutSecs = 3; + + Batcher<> batch; + batch.Init(Json::Value(), sFlusher.get(), strategy, false); + + PipelineEventGroup g = CreateEventGroup(3); + auto groupSize = g.DataSize(); + auto batchSize = groupSize - 2 * g.GetEvents()[0]->DataSize(); + + vector res; + batch.Add(std::move(g), res); + APSARA_TEST_EQUAL(5U, batch.mMetricsRecordRef->GetLabels()->size()); + APSARA_TEST_TRUE(batch.mMetricsRecordRef.HasLabel(METRIC_LABEL_PROJECT, "")); + APSARA_TEST_TRUE(batch.mMetricsRecordRef.HasLabel(METRIC_LABEL_CONFIG_NAME, "test_config")); + APSARA_TEST_TRUE(batch.mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_COMPONENT_NAME, "batcher")); + APSARA_TEST_TRUE(batch.mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_FLUSHER_NODE_ID, "1")); + APSARA_TEST_TRUE(batch.mMetricsRecordRef.HasLabel("enable_group_batch", "false")); + APSARA_TEST_EQUAL(3U, batch.mInEventsCnt->GetValue()); + APSARA_TEST_EQUAL(groupSize, batch.mInGroupDataSizeBytes->GetValue()); + APSARA_TEST_EQUAL(2U, batch.mOutEventsCnt->GetValue()); + APSARA_TEST_EQUAL(1U, batch.mEventBatchItemsCnt->GetValue()); + APSARA_TEST_EQUAL(1U, batch.mBufferedGroupsCnt->GetValue()); + APSARA_TEST_EQUAL(1U, batch.mBufferedEventsCnt->GetValue()); + APSARA_TEST_EQUAL(batchSize, batch.mBufferedDataSizeByte->GetValue()); + } + { + DefaultFlushStrategyOptions strategy; + strategy.mMaxCnt = 2; + strategy.mMaxSizeBytes = 1000; + strategy.mTimeoutSecs = 3; + + Batcher<> batch; + batch.Init(Json::Value(), sFlusher.get(), strategy, true); + + PipelineEventGroup g = CreateEventGroup(3); + auto groupSize = g.DataSize(); + auto batchSize = groupSize - 2 * g.GetEvents()[0]->DataSize(); + + vector res; + batch.Add(std::move(g), res); + batch.FlushQueue(0, res[0]); + APSARA_TEST_TRUE(batch.mMetricsRecordRef.HasLabel("enable_group_batch", "true")); + APSARA_TEST_EQUAL(3U, batch.mInEventsCnt->GetValue()); + APSARA_TEST_EQUAL(groupSize, batch.mInGroupDataSizeBytes->GetValue()); + APSARA_TEST_EQUAL(2U, batch.mOutEventsCnt->GetValue()); + APSARA_TEST_EQUAL(1U, batch.mEventBatchItemsCnt->GetValue()); + APSARA_TEST_EQUAL(1U, batch.mBufferedGroupsCnt->GetValue()); + APSARA_TEST_EQUAL(1U, batch.mBufferedEventsCnt->GetValue()); + APSARA_TEST_EQUAL(batchSize, batch.mBufferedDataSizeByte->GetValue()); + } +} + PipelineEventGroup BatcherUnittest::CreateEventGroup(size_t cnt) { PipelineEventGroup group(make_shared()); group.SetTag(string("key"), string("val")); @@ -581,6 +639,7 @@ UNIT_TEST_CASE(BatcherUnittest, TestFlushEventQueueWithGroupBatch) UNIT_TEST_CASE(BatcherUnittest, TestFlushGroupQueue) UNIT_TEST_CASE(BatcherUnittest, TestFlushAllWithoutGroupBatch) UNIT_TEST_CASE(BatcherUnittest, TestFlushAllWithGroupBatch) +UNIT_TEST_CASE(BatcherUnittest, TestMetric) } // namespace logtail diff --git a/core/unittest/compression/CMakeLists.txt b/core/unittest/compression/CMakeLists.txt index 442a8db893..60f1655e5d 100644 --- a/core/unittest/compression/CMakeLists.txt +++ b/core/unittest/compression/CMakeLists.txt @@ -18,6 +18,9 @@ project(compression_unittest) add_executable(compressor_factory_unittest CompressorFactoryUnittest.cpp) target_link_libraries(compressor_factory_unittest ${UT_BASE_TARGET}) +add_executable(compressor_unittest CompressorUnittest.cpp) +target_link_libraries(compressor_unittest ${UT_BASE_TARGET}) + add_executable(lz4_compressor_unittest LZ4CompressorUnittest.cpp) target_link_libraries(lz4_compressor_unittest ${UT_BASE_TARGET}) @@ -26,5 +29,6 @@ target_link_libraries(zstd_compressor_unittest ${UT_BASE_TARGET}) include(GoogleTest) gtest_discover_tests(compressor_factory_unittest) +gtest_discover_tests(compressor_unittest) gtest_discover_tests(lz4_compressor_unittest) gtest_discover_tests(zstd_compressor_unittest) diff --git a/core/unittest/compression/CompressorFactoryUnittest.cpp b/core/unittest/compression/CompressorFactoryUnittest.cpp index 7093a37ff3..84e159ee34 100644 --- a/core/unittest/compression/CompressorFactoryUnittest.cpp +++ b/core/unittest/compression/CompressorFactoryUnittest.cpp @@ -12,7 +12,8 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "pipeline/compression/CompressorFactory.h" +#include "monitor/MetricConstants.h" +#include "common/compression/CompressorFactory.h" #include "unittest/Unittest.h" using namespace std; @@ -23,56 +24,64 @@ class CompressorFactoryUnittest : public ::testing::Test { public: void TestCreate(); void TestCompressTypeToString(); + void TestMetric(); protected: - void SetUp() { mCtx.SetConfigName("test_config"); } + void SetUp() { + mCtx.SetConfigName("test_config"); + mFlusherId = "1"; + } private: PipelineContext mCtx; + string mFlusherId; }; void CompressorFactoryUnittest::TestCreate() { { // use default - auto compressor - = CompressorFactory::GetInstance()->Create(Json::Value(), mCtx, "test_plugin", CompressType::LZ4); + auto compressor = CompressorFactory::GetInstance()->Create( + Json::Value(), mCtx, "test_plugin", mFlusherId, CompressType::LZ4); APSARA_TEST_EQUAL(CompressType::LZ4, compressor->GetCompressType()); } { // lz4 Json::Value config; config["CompressType"] = "lz4"; - auto compressor = CompressorFactory::GetInstance()->Create(config, mCtx, "test_plugin", CompressType::ZSTD); + auto compressor + = CompressorFactory::GetInstance()->Create(config, mCtx, "test_plugin", mFlusherId, CompressType::ZSTD); APSARA_TEST_EQUAL(CompressType::LZ4, compressor->GetCompressType()); } { // zstd Json::Value config; config["CompressType"] = "zstd"; - auto compressor = CompressorFactory::GetInstance()->Create(config, mCtx, "test_plugin", CompressType::LZ4); + auto compressor + = CompressorFactory::GetInstance()->Create(config, mCtx, "test_plugin", mFlusherId, CompressType::LZ4); APSARA_TEST_EQUAL(CompressType::ZSTD, compressor->GetCompressType()); } { // none Json::Value config; config["CompressType"] = "none"; - auto compressor = CompressorFactory::GetInstance()->Create(config, mCtx, "test_plugin", CompressType::LZ4); + auto compressor + = CompressorFactory::GetInstance()->Create(config, mCtx, "test_plugin", mFlusherId, CompressType::LZ4); APSARA_TEST_EQUAL(nullptr, compressor); } { // unknown Json::Value config; config["CompressType"] = "unknown"; - auto compressor - = CompressorFactory::GetInstance()->Create(Json::Value(), mCtx, "test_plugin", CompressType::LZ4); + auto compressor = CompressorFactory::GetInstance()->Create( + Json::Value(), mCtx, "test_plugin", mFlusherId, CompressType::LZ4); APSARA_TEST_EQUAL(CompressType::LZ4, compressor->GetCompressType()); } { // invalid Json::Value config; config["CompressType"] = 123; - auto compressor - = CompressorFactory::GetInstance()->Create(Json::Value(), mCtx, "test_plugin", CompressType::LZ4); + auto compressor = CompressorFactory::GetInstance()->Create( + Json::Value(), mCtx, "test_plugin", mFlusherId, CompressType::LZ4); APSARA_TEST_EQUAL(CompressType::LZ4, compressor->GetCompressType()); } } @@ -83,8 +92,19 @@ void CompressorFactoryUnittest::TestCompressTypeToString() { APSARA_TEST_STREQ("none", CompressTypeToString(CompressType::NONE).data()); } +void CompressorFactoryUnittest::TestMetric() { + auto compressor + = CompressorFactory::GetInstance()->Create(Json::Value(), mCtx, "test_plugin", mFlusherId, CompressType::LZ4); + APSARA_TEST_EQUAL(4U, compressor->mMetricsRecordRef->GetLabels()->size()); + APSARA_TEST_TRUE(compressor->mMetricsRecordRef.HasLabel(METRIC_LABEL_PROJECT, "")); + APSARA_TEST_TRUE(compressor->mMetricsRecordRef.HasLabel(METRIC_LABEL_CONFIG_NAME, "test_config")); + APSARA_TEST_TRUE(compressor->mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_COMPONENT_NAME, "compressor")); + APSARA_TEST_TRUE(compressor->mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_FLUSHER_NODE_ID, mFlusherId)); +} + UNIT_TEST_CASE(CompressorFactoryUnittest, TestCreate) UNIT_TEST_CASE(CompressorFactoryUnittest, TestCompressTypeToString) +UNIT_TEST_CASE(CompressorFactoryUnittest, TestMetric) } // namespace logtail diff --git a/core/unittest/compression/CompressorUnittest.cpp b/core/unittest/compression/CompressorUnittest.cpp new file mode 100644 index 0000000000..54576c63a4 --- /dev/null +++ b/core/unittest/compression/CompressorUnittest.cpp @@ -0,0 +1,79 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "common/compression/LZ4Compressor.h" +#include "monitor/MetricConstants.h" +#include "unittest/Unittest.h" + +using namespace std; + +namespace logtail { + +class CompressorMock : public Compressor { +public: + CompressorMock(CompressType type) : Compressor(type) {}; + + bool UnCompress(const std::string& input, std::string& output, std::string& errorMsg) override { return true; } + +private: + bool Compress(const std::string& input, std::string& output, std::string& errorMsg) override { + if (input == "failed") { + return false; + } + output = input.substr(0, input.size() / 2); + return true; + } +}; + +class CompressorUnittest : public ::testing::Test { +public: + void TestMetric(); +}; + +void CompressorUnittest::TestMetric() { + { + CompressorMock compressor(CompressType::MOCK); + compressor.SetMetricRecordRef({}); + string input = "hello world"; + string output; + string errorMsg; + compressor.DoCompress(input, output, errorMsg); + APSARA_TEST_EQUAL(1U, compressor.mInItemsCnt->GetValue()); + APSARA_TEST_EQUAL(input.size(), compressor.mInItemSizeBytes->GetValue()); + APSARA_TEST_EQUAL(1U, compressor.mOutItemsCnt->GetValue()); + APSARA_TEST_EQUAL(output.size(), compressor.mOutItemSizeBytes->GetValue()); + APSARA_TEST_EQUAL(0U, compressor.mDiscardedItemsCnt->GetValue()); + APSARA_TEST_EQUAL(0U, compressor.mDiscardedItemSizeBytes->GetValue()); + } + { + CompressorMock compressor(CompressType::MOCK); + compressor.SetMetricRecordRef({}); + string input = "failed"; + string output; + string errorMsg; + compressor.DoCompress(input, output, errorMsg); + APSARA_TEST_EQUAL(1U, compressor.mInItemsCnt->GetValue()); + APSARA_TEST_EQUAL(input.size(), compressor.mInItemSizeBytes->GetValue()); + APSARA_TEST_EQUAL(0U, compressor.mOutItemsCnt->GetValue()); + APSARA_TEST_EQUAL(0U, compressor.mOutItemSizeBytes->GetValue()); + APSARA_TEST_EQUAL(1U, compressor.mDiscardedItemsCnt->GetValue()); + APSARA_TEST_EQUAL(input.size(), compressor.mDiscardedItemSizeBytes->GetValue()); + } +} + +UNIT_TEST_CASE(CompressorUnittest, TestMetric) + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/compression/LZ4CompressorUnittest.cpp b/core/unittest/compression/LZ4CompressorUnittest.cpp index 7bd62300f7..e852c063cf 100644 --- a/core/unittest/compression/LZ4CompressorUnittest.cpp +++ b/core/unittest/compression/LZ4CompressorUnittest.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "pipeline/compression/LZ4Compressor.h" +#include "common/compression/LZ4Compressor.h" #include "unittest/Unittest.h" using namespace std; @@ -29,7 +29,7 @@ void LZ4CompressorUnittest::TestCompress() { string input = "hello world"; string output; string errorMsg; - APSARA_TEST_TRUE(compressor.Compress(input, output, errorMsg)); + APSARA_TEST_TRUE(compressor.DoCompress(input, output, errorMsg)); string decompressed; decompressed.resize(input.size()); APSARA_TEST_TRUE(compressor.UnCompress(output, decompressed, errorMsg)); diff --git a/core/unittest/compression/ZstdCompressorUnittest.cpp b/core/unittest/compression/ZstdCompressorUnittest.cpp index 6362907173..0e165b1987 100644 --- a/core/unittest/compression/ZstdCompressorUnittest.cpp +++ b/core/unittest/compression/ZstdCompressorUnittest.cpp @@ -12,7 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -#include "pipeline/compression/ZstdCompressor.h" +#include "common/compression/ZstdCompressor.h" #include "unittest/Unittest.h" using namespace std; @@ -29,7 +29,7 @@ void ZstdCompressorUnittest::TestCompress() { string input = "hello world"; string output; string errorMsg; - APSARA_TEST_TRUE(compressor.Compress(input, output, errorMsg)); + APSARA_TEST_TRUE(compressor.DoCompress(input, output, errorMsg)); string decompressed; decompressed.resize(input.size()); APSARA_TEST_TRUE(compressor.UnCompress(output, decompressed, errorMsg)); diff --git a/core/unittest/flusher/FlusherSLSUnittest.cpp b/core/unittest/flusher/FlusherSLSUnittest.cpp index 52a3d2f510..71e13cbcc6 100644 --- a/core/unittest/flusher/FlusherSLSUnittest.cpp +++ b/core/unittest/flusher/FlusherSLSUnittest.cpp @@ -22,10 +22,7 @@ #ifdef __ENTERPRISE__ #include "config/provider/EnterpriseConfigProvider.h" #endif -#include "pipeline/compression/CompressorFactory.h" -#include "plugin/flusher/sls/FlusherSLS.h" -#include "plugin/flusher/sls/PackIdManager.h" -#include "plugin/flusher/sls/SLSClientManager.h" +#include "common/compression/CompressorFactory.h" #include "pipeline/Pipeline.h" #include "pipeline/PipelineContext.h" #include "pipeline/queue/ExactlyOnceQueueManager.h" @@ -33,6 +30,9 @@ #include "pipeline/queue/QueueKeyManager.h" #include "pipeline/queue/SLSSenderQueueItem.h" #include "pipeline/queue/SenderQueueManager.h" +#include "plugin/flusher/sls/FlusherSLS.h" +#include "plugin/flusher/sls/PackIdManager.h" +#include "plugin/flusher/sls/SLSClientManager.h" #include "unittest/Unittest.h" DECLARE_FLAG_INT32(batch_send_interval); @@ -617,7 +617,7 @@ void FlusherSLSUnittest::TestSend() { APSARA_TEST_EQUAL(cpt, item->mExactlyOnceCheckpoint); auto compressor - = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", CompressType::LZ4); + = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4); string output, errorMsg; output.resize(item->mRawSize); APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg)); @@ -671,7 +671,7 @@ void FlusherSLSUnittest::TestSend() { APSARA_TEST_EQUAL(checkpoints[0], item->mExactlyOnceCheckpoint); auto compressor - = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", CompressType::LZ4); + = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4); string output, errorMsg; output.resize(item->mRawSize); APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg)); @@ -750,7 +750,7 @@ void FlusherSLSUnittest::TestSend() { APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore); auto compressor - = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", CompressType::LZ4); + = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4); string output, errorMsg; output.resize(item->mRawSize); APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg)); @@ -844,7 +844,7 @@ void FlusherSLSUnittest::TestSend() { APSARA_TEST_EQUAL(flusher.mLogstore, item->mLogstore); auto compressor - = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", CompressType::LZ4); + = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4); sls_logs::SlsLogPackageList packageList; APSARA_TEST_TRUE(packageList.ParseFromString(item->mData)); @@ -1017,7 +1017,7 @@ void FlusherSLSUnittest::OnGoPipelineSend() { APSARA_TEST_EQUAL("other_logstore", item->mLogstore); auto compressor - = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", CompressType::LZ4); + = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4); string output; output.resize(item->mRawSize); APSARA_TEST_TRUE(compressor->UnCompress(item->mData, output, errorMsg)); @@ -1040,7 +1040,7 @@ void FlusherSLSUnittest::OnGoPipelineSend() { flusher.mProject = "test_project"; flusher.mLogstore = "test_logstore"; flusher.mCompressor = CompressorFactory::GetInstance()->Create( - Json::Value(), PipelineContext(), "flusher_sls", CompressType::LZ4); + Json::Value(), PipelineContext(), "flusher_sls", "1", CompressType::LZ4); APSARA_TEST_TRUE(flusher.Send("content", "")); @@ -1060,7 +1060,7 @@ void FlusherSLSUnittest::OnGoPipelineSend() { APSARA_TEST_EQUAL("test_logstore", item->mLogstore); auto compressor - = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", CompressType::LZ4); + = CompressorFactory::GetInstance()->Create(Json::Value(), ctx, "flusher_sls", "1", CompressType::LZ4); string output; output.resize(item->mRawSize); string errorMsg; diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index 98b41a5ff6..d4d76b5138 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ b/core/unittest/pipeline/PipelineUnittest.cpp @@ -18,17 +18,17 @@ #include #include "app_config/AppConfig.h" -#include "pipeline/batch/TimeoutFlushManager.h" #include "common/JsonUtil.h" #include "config/PipelineConfig.h" -#include "plugin/input/InputFeedbackInterfaceRegistry.h" #include "pipeline/Pipeline.h" +#include "pipeline/batch/TimeoutFlushManager.h" #include "pipeline/plugin/PluginRegistry.h" -#include "plugin/processor/inner/ProcessorSplitLogStringNative.h" -#include "plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h" #include "pipeline/queue/BoundedProcessQueue.h" #include "pipeline/queue/ProcessQueueManager.h" #include "pipeline/queue/QueueKeyManager.h" +#include "plugin/input/InputFeedbackInterfaceRegistry.h" +#include "plugin/processor/inner/ProcessorSplitLogStringNative.h" +#include "plugin/processor/inner/ProcessorSplitMultilineLogStringNative.h" #include "unittest/Unittest.h" #include "unittest/plugin/PluginMock.h" @@ -112,6 +112,9 @@ void PipelineUnittest::OnSuccessfulInit() const { APSARA_TEST_EQUAL("test_region", pipeline->GetContext().GetRegion()); APSARA_TEST_EQUAL(QueueKeyManager::GetInstance()->GetKey("test_config-flusher_sls-test_project#test_logstore"), pipeline->GetContext().GetLogstoreKey()); + APSARA_TEST_EQUAL(2U, pipeline->mMetricsRecordRef->GetLabels()->size()); + APSARA_TEST_TRUE(pipeline->mMetricsRecordRef.HasLabel(METRIC_LABEL_CONFIG_NAME, configName)); + APSARA_TEST_TRUE(pipeline->mMetricsRecordRef.HasLabel(METRIC_LABEL_PROJECT, "test_project")); // without sls flusher configStr = R"( @@ -2684,12 +2687,24 @@ void PipelineUnittest::TestProcess() const { processor->Init(Json::Value(), ctx); pipeline.mProcessorLine.emplace_back(std::move(processor)); - vector group; - group.emplace_back(make_shared()); - pipeline.Process(group, 0); + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(pipeline.mMetricsRecordRef, {}); + pipeline.mProcessorsInEventsCnt = pipeline.mMetricsRecordRef.CreateCounter("processors_in_events_cnt"); + pipeline.mProcessorsInGroupsCnt = pipeline.mMetricsRecordRef.CreateCounter("processors_in_event_groups_cnt"); + pipeline.mProcessorsInGroupDataSizeBytes + = pipeline.mMetricsRecordRef.CreateCounter("processors_in_event_group_data_size_bytes"); + pipeline.mProcessorsTotalDelayMs = pipeline.mMetricsRecordRef.CreateCounter("processors_total_delay_ms"); + + vector groups; + groups.emplace_back(make_shared()); + groups.back().AddLogEvent(); + auto size = groups.back().DataSize(); + pipeline.Process(groups, 0); APSARA_TEST_EQUAL( 1U, static_cast(pipeline.mInputs[0]->GetInnerProcessors()[0]->mPlugin.get())->mCnt); APSARA_TEST_EQUAL(1U, static_cast(pipeline.mProcessorLine[0]->mPlugin.get())->mCnt); + APSARA_TEST_EQUAL(1U, pipeline.mProcessorsInEventsCnt->GetValue()); + APSARA_TEST_EQUAL(1U, pipeline.mProcessorsInGroupsCnt->GetValue()); + APSARA_TEST_EQUAL(size, pipeline.mProcessorsInGroupDataSizeBytes->GetValue()); } void PipelineUnittest::TestSend() const { diff --git a/core/unittest/route/RouterUnittest.cpp b/core/unittest/route/RouterUnittest.cpp index bee7bb1803..3e42ff722a 100644 --- a/core/unittest/route/RouterUnittest.cpp +++ b/core/unittest/route/RouterUnittest.cpp @@ -13,6 +13,7 @@ // limitations under the License. #include "common/JsonUtil.h" +#include "monitor/MetricConstants.h" #include "pipeline/Pipeline.h" #include "pipeline/route/Router.h" #include "unittest/Unittest.h" @@ -25,6 +26,10 @@ class RouterUnittest : public testing::Test { public: void TestInit(); void TestRoute(); + void TestMetric(); + +protected: + void SetUp() override { ctx.SetConfigName("test_config"); } private: PipelineContext ctx; @@ -124,8 +129,44 @@ void RouterUnittest::TestRoute() { } } +void RouterUnittest::TestMetric() { + Json::Value configJson; + string errorMsg; + string configStr = R"( + [ + { + "Type": "event_type", + "Value": "log" + } + ] + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + vector> configs; + for (Json::Value::ArrayIndex i = 0; i < configJson.size(); ++i) { + configs.emplace_back(i, &configJson[i]); + } + configs.emplace_back(configJson.size(), nullptr); + + Router router; + router.Init(configs, ctx); + + APSARA_TEST_EQUAL(3U, router.mMetricsRecordRef->GetLabels()->size()); + APSARA_TEST_TRUE(router.mMetricsRecordRef.HasLabel(METRIC_LABEL_PROJECT, "")); + APSARA_TEST_TRUE(router.mMetricsRecordRef.HasLabel(METRIC_LABEL_CONFIG_NAME, "test_config")); + APSARA_TEST_TRUE(router.mMetricsRecordRef.HasLabel(METRIC_LABEL_KEY_COMPONENT_NAME, "router")); + + PipelineEventGroup g(make_shared()); + g.AddLogEvent(); + auto size = g.DataSize(); + router.Route(g); + + APSARA_TEST_EQUAL(1U, router.mInEventsCnt->GetValue()); + APSARA_TEST_EQUAL(size, router.mInGroupDataSizeBytes->GetValue()); +} + UNIT_TEST_CASE(RouterUnittest, TestInit) UNIT_TEST_CASE(RouterUnittest, TestRoute) +UNIT_TEST_CASE(RouterUnittest, TestMetric) } // namespace logtail diff --git a/core/unittest/serializer/CMakeLists.txt b/core/unittest/serializer/CMakeLists.txt index 11c1436ee5..b471e3b81a 100644 --- a/core/unittest/serializer/CMakeLists.txt +++ b/core/unittest/serializer/CMakeLists.txt @@ -15,8 +15,12 @@ cmake_minimum_required(VERSION 3.22) project(serializer_unittest) +add_executable(serializer_unittest SerializerUnittest.cpp) +target_link_libraries(serializer_unittest ${UT_BASE_TARGET}) + add_executable(sls_serializer_unittest SLSSerializerUnittest.cpp) target_link_libraries(sls_serializer_unittest ${UT_BASE_TARGET}) include(GoogleTest) +gtest_discover_tests(serializer_unittest) gtest_discover_tests(sls_serializer_unittest) diff --git a/core/unittest/serializer/SLSSerializerUnittest.cpp b/core/unittest/serializer/SLSSerializerUnittest.cpp index a3695df4d1..7d57cef290 100644 --- a/core/unittest/serializer/SLSSerializerUnittest.cpp +++ b/core/unittest/serializer/SLSSerializerUnittest.cpp @@ -52,7 +52,7 @@ void SLSSerializerUnittest::TestSerializeEventGroup() { { // nano second disabled, and set string res, errorMsg; - APSARA_TEST_TRUE(serializer.Serialize(CreateBatchedEvents(false), res, errorMsg)); + APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedEvents(false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1, logGroup.logs_size()); @@ -72,7 +72,7 @@ void SLSSerializerUnittest::TestSerializeEventGroup() { // nano second enabled, and set const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; string res, errorMsg; - APSARA_TEST_TRUE(serializer.Serialize(CreateBatchedEvents(true), res, errorMsg)); + APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedEvents(true), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); @@ -83,7 +83,7 @@ void SLSSerializerUnittest::TestSerializeEventGroup() { // nano second enabled, not set const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; string res, errorMsg; - APSARA_TEST_TRUE(serializer.Serialize(CreateBatchedEvents(false), res, errorMsg)); + APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedEvents(false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); APSARA_TEST_EQUAL(1234567890U, logGroup.logs(0).time()); @@ -94,13 +94,13 @@ void SLSSerializerUnittest::TestSerializeEventGroup() { // log group exceed size limit INT32_FLAG(max_send_log_group_size) = 0; string res, errorMsg; - APSARA_TEST_FALSE(serializer.Serialize(CreateBatchedEvents(true), res, errorMsg)); + APSARA_TEST_FALSE(serializer.DoSerialize(CreateBatchedEvents(true), res, errorMsg)); INT32_FLAG(max_send_log_group_size) = 10 * 1024 * 1024; } { // metric event string res, errorMsg; - APSARA_TEST_TRUE(serializer.Serialize(CreateBatchedMetricEvents(false, 0, false), res, errorMsg)); + APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(false, 0, false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); @@ -125,7 +125,7 @@ void SLSSerializerUnittest::TestSerializeEventGroup() { const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; string res, errorMsg; - APSARA_TEST_TRUE(serializer.Serialize(CreateBatchedMetricEvents(true, 1, false), res, errorMsg)); + APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(true, 1, false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); @@ -151,7 +151,7 @@ void SLSSerializerUnittest::TestSerializeEventGroup() { const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; string res, errorMsg; - APSARA_TEST_TRUE(serializer.Serialize(CreateBatchedMetricEvents(true, 1999999999, false), res, errorMsg)); + APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(true, 1999999999, false), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); @@ -178,7 +178,7 @@ void SLSSerializerUnittest::TestSerializeEventGroup() { const_cast(mCtx.GetGlobalConfig()).mEnableTimestampNanosecond = true; string res, errorMsg; - APSARA_TEST_TRUE(serializer.Serialize(CreateBatchedMetricEvents(false, 0, true), res, errorMsg)); + APSARA_TEST_TRUE(serializer.DoSerialize(CreateBatchedMetricEvents(false, 0, true), res, errorMsg)); sls_logs::LogGroup logGroup; APSARA_TEST_TRUE(logGroup.ParseFromString(res)); @@ -192,7 +192,7 @@ void SLSSerializerUnittest::TestSerializeEventGroupList() { SLSEventGroupListSerializer serializer(sFlusher.get()); string res, errorMsg; - APSARA_TEST_TRUE(serializer.Serialize(std::move(v), res, errorMsg)); + APSARA_TEST_TRUE(serializer.DoSerialize(std::move(v), res, errorMsg)); sls_logs::SlsLogPackageList logPackageList; APSARA_TEST_TRUE(logPackageList.ParseFromString(res)); APSARA_TEST_EQUAL(1, logPackageList.packages_size()); diff --git a/core/unittest/serializer/SerializerUnittest.cpp b/core/unittest/serializer/SerializerUnittest.cpp new file mode 100644 index 0000000000..ed632e02ae --- /dev/null +++ b/core/unittest/serializer/SerializerUnittest.cpp @@ -0,0 +1,114 @@ +// Copyright 2024 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include "monitor/MetricConstants.h" +#include "pipeline/plugin/interface/Flusher.h" +#include "pipeline/serializer/Serializer.h" +#include "unittest/Unittest.h" +#include "unittest/plugin/PluginMock.h" + +using namespace std; + +namespace logtail { + +class SerializerMock : public Serializer { +public: + SerializerMock(Flusher* f) : Serializer(f) {}; + +private: + bool Serialize(BatchedEvents&& p, std::string& res, std::string& errorMsg) override { + if (p.mEvents.empty()) { + return false; + } + res = "result"; + return true; + } +}; + +class SerializerUnittest : public ::testing::Test { +public: + void TestMetric(); + +protected: + static void SetUpTestCase() { sFlusher = make_unique(); } + + void SetUp() override { + mCtx.SetConfigName("test_config"); + sFlusher->SetContext(mCtx); + sFlusher->SetMetricsRecordRef(FlusherMock::sName, "1", "1", "1"); + } + +private: + static unique_ptr sFlusher; + + BatchedEvents CreateBatchedMetricEvents(bool withEvents = true); + + PipelineContext mCtx; +}; + +unique_ptr SerializerUnittest::sFlusher; + +void SerializerUnittest::TestMetric() { + { + SerializerMock serializer(sFlusher.get()); + auto input = CreateBatchedMetricEvents(); + auto inputSize = input.mSizeBytes; + string output; + string errorMsg; + serializer.DoSerialize(std::move(input), output, errorMsg); + APSARA_TEST_EQUAL(1U, serializer.mInItemsCnt->GetValue()); + APSARA_TEST_EQUAL(inputSize, serializer.mInItemSizeBytes->GetValue()); + APSARA_TEST_EQUAL(1U, serializer.mOutItemsCnt->GetValue()); + APSARA_TEST_EQUAL(output.size(), serializer.mOutItemSizeBytes->GetValue()); + APSARA_TEST_EQUAL(0U, serializer.mDiscardedItemsCnt->GetValue()); + APSARA_TEST_EQUAL(0U, serializer.mDiscardedItemSizeBytes->GetValue()); + } + { + SerializerMock serializer(sFlusher.get()); + auto input = CreateBatchedMetricEvents(false); + auto inputSize = input.mSizeBytes; + string output; + string errorMsg; + serializer.DoSerialize(std::move(input), output, errorMsg); + APSARA_TEST_EQUAL(1U, serializer.mInItemsCnt->GetValue()); + APSARA_TEST_EQUAL(inputSize, serializer.mInItemSizeBytes->GetValue()); + APSARA_TEST_EQUAL(0U, serializer.mOutItemsCnt->GetValue()); + APSARA_TEST_EQUAL(0U, serializer.mOutItemSizeBytes->GetValue()); + APSARA_TEST_EQUAL(1U, serializer.mDiscardedItemsCnt->GetValue()); + APSARA_TEST_EQUAL(inputSize, serializer.mDiscardedItemSizeBytes->GetValue()); + } +} + +BatchedEvents SerializerUnittest::CreateBatchedMetricEvents(bool withEvents) { + PipelineEventGroup group(make_shared()); + group.SetTag(string("key"), string("value")); + StringBuffer b = group.GetSourceBuffer()->CopyString(string("pack_id")); + group.SetMetadataNoCopy(EventGroupMetaKey::SOURCE_ID, StringView(b.data, b.size)); + group.SetExactlyOnceCheckpoint(RangeCheckpointPtr(new RangeCheckpoint)); + if (withEvents) { + group.AddLogEvent(); + } + BatchedEvents batch(std::move(group.MutableEvents()), + std::move(group.GetSizedTags()), + std::move(group.GetSourceBuffer()), + group.GetMetadata(EventGroupMetaKey::SOURCE_ID), + std::move(group.GetExactlyOnceCheckpoint())); + return batch; +} + +UNIT_TEST_CASE(SerializerUnittest, TestMetric) + +} // namespace logtail + +UNIT_TEST_MAIN