From 19c681fc162d7b326340fb21fc28791bae179dd2 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 25 Oct 2024 17:35:13 +0800 Subject: [PATCH] improve send log and adjust runner exit timeout (#1832) --- core/CMakeLists.txt | 5 +- core/application/Application.cpp | 2 +- core/checkpoint/RangeCheckpoint.h | 2 - core/common/http/AsynCurlRunner.cpp | 22 ++- core/common/http/Curl.cpp | 42 +++-- core/common/http/HttpRequest.h | 2 +- .../InstanceConfigManager.cpp | 2 +- .../InstanceConfigManager.h | 0 .../common_provider/CommonConfigProvider.cpp | 2 +- core/deps/README.md | 17 -- core/deps/build.sh | 157 ------------------ core/models/PipelineEventGroup.h | 2 + core/monitor/Monitor.cpp | 1 - .../metric_constants/MetricConstants.h | 5 + .../metric_constants/PipelineMetrics.cpp | 4 + .../metric_constants/PluginMetrics.cpp | 1 + core/pipeline/Pipeline.cpp | 12 ++ core/pipeline/Pipeline.h | 4 + core/pipeline/PipelineManager.cpp | 3 - .../plugin/instance/FlusherInstance.cpp | 12 +- .../plugin/instance/FlusherInstance.h | 2 + .../queue/ExactlyOnceQueueManager.cpp | 16 -- core/pipeline/queue/ExactlyOnceQueueManager.h | 4 - core/pipeline/queue/ProcessQueueManager.cpp | 17 -- core/pipeline/queue/ProcessQueueManager.h | 4 - core/pipeline/queue/SenderQueueItem.h | 2 +- core/plugin/flusher/sls/DiskBufferWriter.cpp | 23 ++- core/plugin/flusher/sls/FlusherSLS.cpp | 59 ++++--- core/plugin/flusher/sls/SLSClientManager.cpp | 10 +- .../inner/ProcessorSplitLogStringNative.cpp | 4 - ...ProcessorSplitMultilineLogStringNative.cpp | 4 - core/runner/FlusherRunner.cpp | 15 +- core/runner/FlusherRunner.h | 4 +- core/runner/ProcessorRunner.cpp | 4 +- core/runner/sink/http/HttpSink.cpp | 67 +++++--- .../config/CommonConfigProviderUnittest.cpp | 12 +- .../InstanceConfigManagerUnittest.cpp | 2 +- .../models/PipelineEventGroupUnittest.cpp | 7 + core/unittest/pipeline/PipelineUnittest.cpp | 20 +++ 39 files changed, 235 insertions(+), 338 deletions(-) rename core/{instance_config => config}/InstanceConfigManager.cpp (97%) rename core/{instance_config => config}/InstanceConfigManager.h (100%) delete mode 100644 core/deps/README.md delete mode 100755 core/deps/build.sh diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index b48a538935..6b197647e3 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -113,10 +113,9 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake) # Subdirectories (modules). except for common, input, processor, flusher, observer, helper, spl, and provider. set(SUB_DIRECTORIES_LIST - application app_config checkpoint container_manager logger go_pipeline monitor monitor/metric_constants profile_sender models + application app_config checkpoint container_manager metadata logger go_pipeline monitor monitor/metric_constants profile_sender models config config/watcher - instance_config - metadata pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer + pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer runner runner/sink/http protobuf/sls protobuf/models file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling diff --git a/core/application/Application.cpp b/core/application/Application.cpp index 14548eb09d..11fe230a67 100644 --- a/core/application/Application.cpp +++ b/core/application/Application.cpp @@ -31,6 +31,7 @@ #include "common/UUIDUtil.h" #include "common/version.h" #include "config/ConfigDiff.h" +#include "config/InstanceConfigManager.h" #include "config/watcher/ConfigWatcher.h" #include "config/watcher/InstanceConfigWatcher.h" #include "file_server/ConfigManager.h" @@ -38,7 +39,6 @@ #include "file_server/FileServer.h" #include "file_server/event_handler/LogInput.h" #include "go_pipeline/LogtailPlugin.h" -#include "instance_config/InstanceConfigManager.h" #include "logger/Logger.h" #include "monitor/LogFileProfiler.h" #include "monitor/MetricExportor.h" diff --git a/core/checkpoint/RangeCheckpoint.h b/core/checkpoint/RangeCheckpoint.h index ad2f9cc4dd..30880dde6f 100644 --- a/core/checkpoint/RangeCheckpoint.h +++ b/core/checkpoint/RangeCheckpoint.h @@ -29,10 +29,8 @@ class RangeCheckpoint { std::string key; QueueKey fbKey; RangeCheckpointPB data; - std::vector> positions; inline void Prepare() { - positions.clear(); data.set_committed(false); save(); } diff --git a/core/common/http/AsynCurlRunner.cpp b/core/common/http/AsynCurlRunner.cpp index f9aa2b5440..9be67bb5cb 100644 --- a/core/common/http/AsynCurlRunner.cpp +++ b/core/common/http/AsynCurlRunner.cpp @@ -182,34 +182,42 @@ void AsynCurlRunner::HandleCompletedRequests(int& runningHandlers) { CURL* handler = msg->easy_handle; AsynHttpRequest* request = nullptr; curl_easy_getinfo(handler, CURLINFO_PRIVATE, &request); - LOG_DEBUG(sLogger, - ("send http request completed, request address", - request)("response time",ToString(chrono::duration_cast(chrono::system_clock::now()- request->mLastSendTime).count()) + "ms") - ("try cnt", ToString(request->mTryCnt))); + auto responseTime + = chrono::duration_cast(chrono::system_clock::now() - request->mLastSendTime) + .count(); switch (msg->data.result) { case CURLE_OK: { long statusCode = 0; curl_easy_getinfo(handler, CURLINFO_RESPONSE_CODE, &statusCode); request->mResponse.mStatusCode = (int32_t)statusCode; request->OnSendDone(request->mResponse); + LOG_DEBUG(sLogger, + ("send http request succeeded, request address", + request)("response time", ToString(responseTime) + "ms")("try cnt", + ToString(request->mTryCnt))); break; } default: // considered as network error - if (request->mTryCnt <= request->mMaxTryCnt) { + if (request->mTryCnt < request->mMaxTryCnt) { LOG_WARNING(sLogger, - ("failed to send request", "retry immediately")("retryCnt", request->mTryCnt++)( - "errMsg", curl_easy_strerror(msg->data.result))); + ("failed to send http request", "retry immediately")("request address", request)( + "try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(msg->data.result))); // free first,becase mPrivateData will be reset in AddRequestToClient if (request->mPrivateData) { curl_slist_free_all((curl_slist*)request->mPrivateData); request->mPrivateData = nullptr; } + ++request->mTryCnt; AddRequestToClient(unique_ptr(request)); ++runningHandlers; requestReused = true; } else { request->OnSendDone(request->mResponse); + LOG_DEBUG( + sLogger, + ("failed to send http request", "abort")("request address", request)( + "response time", ToString(responseTime) + "ms")("try cnt", ToString(request->mTryCnt))); } break; } diff --git a/core/common/http/Curl.cpp b/core/common/http/Curl.cpp index af02793437..7c0d8c6f49 100644 --- a/core/common/http/Curl.cpp +++ b/core/common/http/Curl.cpp @@ -18,10 +18,10 @@ #include #include -#include "common/DNSCache.h" #include "app_config/AppConfig.h" -#include "logger/Logger.h" +#include "common/DNSCache.h" #include "common/http/HttpResponse.h" +#include "logger/Logger.h" using namespace std; @@ -139,24 +139,25 @@ CURL* CreateCurlHandler(const std::string& method, bool SendHttpRequest(std::unique_ptr&& request, HttpResponse& response) { curl_slist* headers = NULL; CURL* curl = CreateCurlHandler(request->mMethod, - request->mHTTPSFlag, - request->mHost, - request->mPort, - request->mUrl, - request->mQueryString, - request->mHeader, - request->mBody, - response, - headers, - request->mTimeout, - AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(), - AppConfig::GetInstance()->GetBindInterface()); + request->mHTTPSFlag, + request->mHost, + request->mPort, + request->mUrl, + request->mQueryString, + request->mHeader, + request->mBody, + response, + headers, + request->mTimeout, + AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(), + AppConfig::GetInstance()->GetBindInterface()); if (curl == NULL) { - LOG_ERROR(sLogger, ("failed to init curl handler", "failed to init curl client")("request address", request.get())); + LOG_ERROR(sLogger, + ("failed to init curl handler", "failed to init curl client")("request address", request.get())); return false; } bool success = false; - while (request->mTryCnt <= request->mMaxTryCnt) { + while (true) { CURLcode res = curl_easy_perform(curl); if (res == CURLE_OK) { long http_code = 0; @@ -164,10 +165,15 @@ bool SendHttpRequest(std::unique_ptr&& request, HttpResponse& respo response.mStatusCode = (int32_t)http_code; success = true; break; + } else if (request->mTryCnt < request->mMaxTryCnt) { + LOG_WARNING(sLogger, + ("failed to send http request", "retry immediately")("request address", request.get())( + "try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(res))); + ++request->mTryCnt; } else { - LOG_WARNING(sLogger,("failed to send request", "retry immediately")("retryCnt", request->mTryCnt++)("errMsg", curl_easy_strerror(res))("request address", request.get())); + break; } - } + } if (headers != NULL) { curl_slist_free_all(headers); } diff --git a/core/common/http/HttpRequest.h b/core/common/http/HttpRequest.h index bd12b9f971..13fc9ec12f 100644 --- a/core/common/http/HttpRequest.h +++ b/core/common/http/HttpRequest.h @@ -72,7 +72,7 @@ struct HttpRequest { struct AsynHttpRequest : public HttpRequest { HttpResponse mResponse; void* mPrivateData = nullptr; - time_t mEnqueTime = 0; + std::chrono::system_clock::time_point mEnqueTime; AsynHttpRequest(const std::string& method, bool httpsFlag, diff --git a/core/instance_config/InstanceConfigManager.cpp b/core/config/InstanceConfigManager.cpp similarity index 97% rename from core/instance_config/InstanceConfigManager.cpp rename to core/config/InstanceConfigManager.cpp index 986cc5ba5f..24673cea85 100644 --- a/core/instance_config/InstanceConfigManager.cpp +++ b/core/config/InstanceConfigManager.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "instance_config/InstanceConfigManager.h" +#include "config/InstanceConfigManager.h" #include "app_config/AppConfig.h" #include "config/feedbacker/ConfigFeedbackReceiver.h" diff --git a/core/instance_config/InstanceConfigManager.h b/core/config/InstanceConfigManager.h similarity index 100% rename from core/instance_config/InstanceConfigManager.h rename to core/config/InstanceConfigManager.h diff --git a/core/config/common_provider/CommonConfigProvider.cpp b/core/config/common_provider/CommonConfigProvider.cpp index cc62335208..52186d411d 100644 --- a/core/config/common_provider/CommonConfigProvider.cpp +++ b/core/config/common_provider/CommonConfigProvider.cpp @@ -45,7 +45,7 @@ namespace logtail { std::string CommonConfigProvider::configVersion = "version"; void CommonConfigProvider::Init(const string& dir) { - sName = "CommonConfigProvider"; + sName = "common config provider"; ConfigProvider::Init(dir); LoadConfigFile(); diff --git a/core/deps/README.md b/core/deps/README.md deleted file mode 100644 index 72da636884..0000000000 --- a/core/deps/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# Dependencies - -- [gflags](https://github.com/gflags/gflags): v2.2.1 -- [spdlog](https://github.com/gabime/spdlog/): v1.1.0 -- [jsoncpp](https://github.com/open-source-parsers/jsoncpp): v1.8.4 -- [protobuf](https://github.com/protocolbuffers/protobuf): v2.5.0 -- [libunwind](https://www.nongnu.org/libunwind/): v1.2.1 -- [re2](https://github.com/google/re2): release/2018-09-01 -- [boost](https://www.boost.org/): v1.6.8 -- [rapidjson](https://github.com/Tencent/rapidjson/commit/c0ca05f6ddb690c943a7fc59efab9ca9cfc39736): latest -- [cityhash](https://github.com/google/cityhash/commit/8af9b8c2b889d80c22d6bc26ba0df1afb79a30db): latest -- [liblz4](https://github.com/lz4/lz4): v1.8.3 -- [zlib](https://zlib.net/): v1.2.11 -- libuuid - - get_dmi_uuid -- libpthread -- libdl diff --git a/core/deps/build.sh b/core/deps/build.sh deleted file mode 100755 index 76878ea89b..0000000000 --- a/core/deps/build.sh +++ /dev/null @@ -1,157 +0,0 @@ -#!/bin/bash -# Copyright 2022 iLogtail Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -DESTINATION=/opt/logtail/deps -LIB_DIR=$DESTINATION/lib -LIB64_DIR=$DESTINATION/lib64 - -enter_cmake_build_dir() -{ - # rm -rf build-static - mkdir build-static - cd build-static -} - -remove_old_lib() -{ - sudo rm -f $LIB_DIR/$1* - sudo rm -f $LIB64_DIR/$1* -} - -# gtest -remove_old_lib libgtest -remove_old_lib libgmock -rm -rf googletest-release-1.8.1 -tar xf gtest-1.8.1.tar.gz -cd googletest-release-1.8.1 -enter_cmake_build_dir -cmake -DCMAKE_INSTALL_PREFIX=$DESTINATION -DCMAKE_BUILD_TYPE=Release .. -make -j24 -sudo make install -sudo ln -s $LIB64_DIR/libgtest.a $LIB_DIR/libgtest.a -cd ../.. - -# protobuf -remove_old_lib libprotobuf -rm -rf protobuf-3.6.1 -tar xf protobuf-cpp-3.6.1.tar.gz -cd protobuf-3.6.1 -ln -s `pwd`/../googletest-release-1.8.1 gtest -./configure --prefix=$DESTINATION --disable-shared --enable-static -make -j24 -sudo make install -cd .. - -# re2 -remove_old_lib libre2 -rm -rf re2-2018-09-01 -tar xf re2-2018-09-01.tar.gz -cd re2-2018-09-01 -enter_cmake_build_dir -cmake -DCMAKE_INSTALL_PREFIX=$DESTINATION -DCMAKE_BUILD_TYPE=Release .. -make -j24 -sudo make install -cd ../.. - -# tcmalloc -remove_old_lib libtcmalloc -rm -rf gperftools-gperftools-2.7 -tar xf gperftools-2.7.tar.gz -cd gperftools-gperftools-2.7 -./autogen.sh -./configure --prefix=$DESTINATION --disable-shared --enable-static -make -j24 -sudo make install -cd .. - -# cityhash -remove_old_lib libcityhash -cd cityhash -sudo make clean -./configure --prefix=$DESTINATION --disable-shared --enable-static -make -j24 -sudo make install -cd .. - -# gflags -remove_old_lib libgflags -rm -rf gflags-2.2.1 -tar xf gflags-2.2.1.tar.gz -cd gflags-2.2.1 -enter_cmake_build_dir -cmake -DCMAKE_INSTALL_PREFIX=$DESTINATION -DCMAKE_BUILD_TYPE=Release .. -make -j24 -sudo make install -cd ../.. - -# jsoncpp -remove_old_lib libjsoncpp -rm -rf jsoncpp-1.8.4 -tar xf jsoncpp-1.8.4.tar.gz -cd jsoncpp-1.8.4 -enter_cmake_build_dir -cmake -DCMAKE_INSTALL_PREFIX=$DESTINATION -DCMAKE_BUILD_TYPE=Release .. -make -j24 -sudo make install -sudo ln -s $LIB64_DIR/libjsoncpp.a $LIB_DIR/libjsoncpp.a -cd ../../ - -# boost -remove_old_lib libboost -rm -rf boost_1_68_0 -tar xf boost_1_68_0.tar.gz -cd boost_1_68_0 -./bootstrap.sh --prefix=$DESTINATION -sudo env PATH=$PATH ./b2 install -j24 -cd .. - -# lz4 -remove_old_lib liblz4 -rm -rf lz4-1.8.3 -tar xf lz4-1.8.3.tar.gz -cd lz4-1.8.3 -make -j24 -sudo make prefix=$DESTINATION install -cd .. - -# zlib -remove_old_lib libz -rm -rf zlib-1.2.11 -tar xf zlib-1.2.11.tar.gz -cd zlib-1.2.11 -./configure --prefix=$DESTINATION --static -make -j24 -sudo make install -cd .. - -# curl -remove_old_lib libcurl -rm -rf curl-7.61.1 -tar xf curl-7.61.1.tar.gz -cd curl-7.61.1 -./configure --prefix=$DESTINATION --disable-shared --enable-static CFLAGS="-std=c90 -D_POSIX_C_SOURCE -Werror=implicit-function-declaration" -make -j24 -sudo make install -cd .. - -# unwind -remove_old_lib libunwind -cd libunwind-1.2.1 -sudo make clean -./autogen.sh -./configure --prefix=$DESTINATION --disable-shared --enable-static -make -j24 -sudo make install diff --git a/core/models/PipelineEventGroup.h b/core/models/PipelineEventGroup.h index a83be6e61d..cecba0f315 100644 --- a/core/models/PipelineEventGroup.h +++ b/core/models/PipelineEventGroup.h @@ -86,6 +86,8 @@ class PipelineEventGroup { MetricEvent* AddMetricEvent(); SpanEvent* AddSpanEvent(); void SwapEvents(EventsContainer& other) { mEvents.swap(other); } + void ReserveEvents(size_t size) { mEvents.reserve(size); } + std::shared_ptr& GetSourceBuffer() { return mSourceBuffer; } void SetMetadata(EventGroupMetaKey key, StringView val); diff --git a/core/monitor/Monitor.cpp b/core/monitor/Monitor.cpp index 74bbf70aa4..e66c3769c1 100644 --- a/core/monitor/Monitor.cpp +++ b/core/monitor/Monitor.cpp @@ -718,7 +718,6 @@ void LoongCollectorMonitor::Init() { mAgentGoRoutinesTotal = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_GO_ROUTINES_TOTAL); mAgentOpenFdTotal = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_OPEN_FD_TOTAL); mAgentConfigTotal = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_PIPELINE_CONFIG_TOTAL); - LOG_INFO(sLogger, ("LoongCollectorMonitor", "started")); } void LoongCollectorMonitor::Stop() { diff --git a/core/monitor/metric_constants/MetricConstants.h b/core/monitor/metric_constants/MetricConstants.h index b3650a3359..46da86ceba 100644 --- a/core/monitor/metric_constants/MetricConstants.h +++ b/core/monitor/metric_constants/MetricConstants.h @@ -66,6 +66,10 @@ extern const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENTS_TOTAL; extern const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_TOTAL; extern const std::string METRIC_PIPELINE_PROCESSORS_IN_SIZE_BYTES; extern const std::string METRIC_PIPELINE_PROCESSORS_TOTAL_PROCESS_TIME_MS; +extern const std::string METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL; +extern const std::string METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL; +extern const std::string METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES; +extern const std::string METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS; extern const std::string METRIC_PIPELINE_START_TIME; ////////////////////////////////////////////////////////////////////////// @@ -125,6 +129,7 @@ extern const std::string METRIC_PLUGIN_OUT_SUCCESSFUL_EVENTS_TOTAL; /********************************************************** * all flusher (所有发送插件通用指标) **********************************************************/ +extern const std::string METRIC_PLUGIN_FLUSHER_TOTAL_PACKAGE_TIME_MS; extern const std::string METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL; extern const std::string METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL; extern const std::string METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL; diff --git a/core/monitor/metric_constants/PipelineMetrics.cpp b/core/monitor/metric_constants/PipelineMetrics.cpp index 7f1bf75e3a..85fd7f3e8f 100644 --- a/core/monitor/metric_constants/PipelineMetrics.cpp +++ b/core/monitor/metric_constants/PipelineMetrics.cpp @@ -31,6 +31,10 @@ const string METRIC_PIPELINE_PROCESSORS_IN_EVENTS_TOTAL = "pipeline_processors_i const string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_TOTAL = "pipeline_processors_in_event_groups_total"; const string METRIC_PIPELINE_PROCESSORS_IN_SIZE_BYTES = "pipeline_processors_in_size_bytes"; const string METRIC_PIPELINE_PROCESSORS_TOTAL_PROCESS_TIME_MS = "pipeline_processors_total_process_time_ms"; +const string METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL = "pipeline_flushers_in_events_total"; +const string METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL = "pipeline_flushers_in_event_groups_total"; +const string METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES = "pipeline_flushers_in_size_bytes"; +const string METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS = "pipeline_flushers_total_package_time_ms"; const string METRIC_PIPELINE_START_TIME = "pipeline_start_time"; } // namespace logtail diff --git a/core/monitor/metric_constants/PluginMetrics.cpp b/core/monitor/metric_constants/PluginMetrics.cpp index 47499ca957..1a7923531d 100644 --- a/core/monitor/metric_constants/PluginMetrics.cpp +++ b/core/monitor/metric_constants/PluginMetrics.cpp @@ -100,6 +100,7 @@ const string METRIC_PLUGIN_PARSE_STDOUT_TOTAL = "plugin_parse_stdout_total"; /********************************************************** * all flusher (所有发送插件通用指标) **********************************************************/ +const string METRIC_PLUGIN_FLUSHER_TOTAL_PACKAGE_TIME_MS = "plugin_flusher_total_package_time_ms"; const string METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL = "plugin_flusher_send_total"; const string METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL = "plugin_flusher_send_done_total"; const string METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL = "plugin_flusher_success_total"; diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index f85463282d..db517363f8 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -327,6 +327,10 @@ bool Pipeline::Init(PipelineConfig&& config) { mProcessorsInSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_SIZE_BYTES); mProcessorsTotalProcessTimeMs = mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_PROCESSORS_TOTAL_PROCESS_TIME_MS); + mFlushersInGroupsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL); + mFlushersInEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL); + mFlushersInSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES); + mFlushersTotalPackageTimeMs = mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS); return true; } @@ -375,6 +379,13 @@ void Pipeline::Process(vector& logGroupList, size_t inputInd } bool Pipeline::Send(vector&& groupList) { + for (const auto& group : groupList) { + mFlushersInEventsTotal->Add(group.GetEvents().size()); + mFlushersInSizeBytes->Add(group.DataSize()); + } + mFlushersInGroupsTotal->Add(groupList.size()); + + auto before = chrono::system_clock::now(); bool allSucceeded = true; for (auto& group : groupList) { auto res = mRouter.Route(group); @@ -388,6 +399,7 @@ bool Pipeline::Send(vector&& groupList) { allSucceeded = mFlushers[item.first]->Send(std::move(item.second)) && allSucceeded; } } + mFlushersTotalPackageTimeMs->Add(chrono::system_clock::now() - before); return allSucceeded; } diff --git a/core/pipeline/Pipeline.h b/core/pipeline/Pipeline.h index 815b9907bf..9e4b0dd1be 100644 --- a/core/pipeline/Pipeline.h +++ b/core/pipeline/Pipeline.h @@ -109,6 +109,10 @@ class Pipeline { CounterPtr mProcessorsInGroupsTotal; CounterPtr mProcessorsInSizeBytes; TimeCounterPtr mProcessorsTotalProcessTimeMs; + CounterPtr mFlushersInGroupsTotal; + CounterPtr mFlushersInEventsTotal; + CounterPtr mFlushersInSizeBytes; + TimeCounterPtr mFlushersTotalPackageTimeMs; #ifdef APSARA_UNIT_TEST_MAIN friend class PipelineMock; diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index b683e20f79..da33c8bd20 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -200,9 +200,6 @@ void PipelineManager::StopAllPipelines() { LogtailPlugin::GetInstance()->StopAllPipelines(false); - // TODO: make it common - FlusherSLS::RecycleResourceIfNotUsed(); - // Sender should be stopped after profiling threads are stopped. LOG_INFO(sLogger, ("stop all pipelines", "succeeded")); } diff --git a/core/pipeline/plugin/instance/FlusherInstance.cpp b/core/pipeline/plugin/instance/FlusherInstance.cpp index 04ddd12b06..38cb9dd3b8 100644 --- a/core/pipeline/plugin/instance/FlusherInstance.cpp +++ b/core/pipeline/plugin/instance/FlusherInstance.cpp @@ -16,7 +16,10 @@ #include "monitor/metric_constants/MetricConstants.h" +using namespace std; + namespace logtail { + bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, Json::Value& optionalGoPipeline) { mPlugin->SetContext(context); mPlugin->SetPluginID(PluginID()); @@ -25,15 +28,22 @@ bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, return false; } + mInGroupsTotal = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_IN_EVENT_GROUPS_TOTAL); mInEventsTotal = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_IN_EVENTS_TOTAL); mInSizeBytes = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_IN_SIZE_BYTES); + mTotalPackageTimeMs = mPlugin->GetMetricsRecordRef().CreateTimeCounter(METRIC_PLUGIN_FLUSHER_TOTAL_PACKAGE_TIME_MS); return true; } bool FlusherInstance::Send(PipelineEventGroup&& g) { + mInGroupsTotal->Add(1); mInEventsTotal->Add(g.GetEvents().size()); mInSizeBytes->Add(g.DataSize()); - return mPlugin->Send(std::move(g)); + + auto before = chrono::system_clock::now(); + auto res = mPlugin->Send(std::move(g)); + mTotalPackageTimeMs->Add(chrono::system_clock::now() - before); + return res; } } // namespace logtail diff --git a/core/pipeline/plugin/instance/FlusherInstance.h b/core/pipeline/plugin/instance/FlusherInstance.h index 6ce367051d..69bbba3db2 100644 --- a/core/pipeline/plugin/instance/FlusherInstance.h +++ b/core/pipeline/plugin/instance/FlusherInstance.h @@ -46,8 +46,10 @@ class FlusherInstance : public PluginInstance { private: std::unique_ptr mPlugin; + CounterPtr mInGroupsTotal; CounterPtr mInEventsTotal; CounterPtr mInSizeBytes; + TimeCounterPtr mTotalPackageTimeMs; }; } // namespace logtail diff --git a/core/pipeline/queue/ExactlyOnceQueueManager.cpp b/core/pipeline/queue/ExactlyOnceQueueManager.cpp index 7446f6a9bc..35e8271e6b 100644 --- a/core/pipeline/queue/ExactlyOnceQueueManager.cpp +++ b/core/pipeline/queue/ExactlyOnceQueueManager.cpp @@ -263,22 +263,6 @@ void ExactlyOnceQueueManager::ClearTimeoutQueues() { } } -uint32_t ExactlyOnceQueueManager::GetInvalidProcessQueueCnt() const { - uint32_t res = 0; - lock_guard lock(mProcessQueueMux); - for (const auto& q : mProcessQueues) { - if (q.second->IsValidToPush()) { - ++res; - } - } - return res; -} - -uint32_t ExactlyOnceQueueManager::GetProcessQueueCnt() const { - lock_guard lock(mProcessQueueMux); - return mProcessQueues.size(); -} - void ExactlyOnceQueueManager::SetPipelineForSenderItems(QueueKey key, const std::shared_ptr& p) { lock_guard lock(mSenderQueueMux); auto iter = mSenderQueues.find(key); diff --git a/core/pipeline/queue/ExactlyOnceQueueManager.h b/core/pipeline/queue/ExactlyOnceQueueManager.h index d53be58bd0..6fb51ab0d3 100644 --- a/core/pipeline/queue/ExactlyOnceQueueManager.h +++ b/core/pipeline/queue/ExactlyOnceQueueManager.h @@ -71,10 +71,6 @@ class ExactlyOnceQueueManager { void ClearTimeoutQueues(); - // TODO: should be removed when self-telemetry is refactored - uint32_t GetInvalidProcessQueueCnt() const; - uint32_t GetProcessQueueCnt() const; - #ifdef APSARA_UNIT_TEST_MAIN void Clear(); #endif diff --git a/core/pipeline/queue/ProcessQueueManager.cpp b/core/pipeline/queue/ProcessQueueManager.cpp index a7e32324cc..a7b13dccfe 100644 --- a/core/pipeline/queue/ProcessQueueManager.cpp +++ b/core/pipeline/queue/ProcessQueueManager.cpp @@ -329,23 +329,6 @@ void ProcessQueueManager::ResetCurrentQueueIndex() { mCurrentQueueIndex.second = mPriorityQueue[0].begin(); } -uint32_t ProcessQueueManager::GetInvalidCnt() const { - uint32_t res = 0; - lock_guard lock(mQueueMux); - for (const auto& q : mQueues) { - if (q.second.second == QueueType::BOUNDED - && static_cast(q.second.first->get())->IsValidToPush()) { - ++res; - } - } - return res; -} - -uint32_t ProcessQueueManager::GetCnt() const { - lock_guard lock(mQueueMux); - return mQueues.size(); -} - #ifdef APSARA_UNIT_TEST_MAIN void ProcessQueueManager::Clear() { lock_guard lock(mQueueMux); diff --git a/core/pipeline/queue/ProcessQueueManager.h b/core/pipeline/queue/ProcessQueueManager.h index ab877a2c7d..a77a5769d2 100644 --- a/core/pipeline/queue/ProcessQueueManager.h +++ b/core/pipeline/queue/ProcessQueueManager.h @@ -68,10 +68,6 @@ class ProcessQueueManager : public FeedbackInterface { bool Wait(uint64_t ms); void Trigger(); - // TODO: should be removed when self-telemetry is refactored - uint32_t GetInvalidCnt() const; - uint32_t GetCnt() const; - private: ProcessQueueManager(); ~ProcessQueueManager() = default; diff --git a/core/pipeline/queue/SenderQueueItem.h b/core/pipeline/queue/SenderQueueItem.h index 151abad0db..e791479f19 100644 --- a/core/pipeline/queue/SenderQueueItem.h +++ b/core/pipeline/queue/SenderQueueItem.h @@ -43,7 +43,7 @@ struct SenderQueueItem { std::atomic mStatus; std::chrono::system_clock::time_point mFirstEnqueTime; - time_t mLastSendTime = 0; + std::chrono::system_clock::time_point mLastSendTime; uint32_t mTryCnt = 1; SenderQueueItem(std::string&& data, diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index 3d7c86a338..6be224f86a 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -22,16 +22,16 @@ #include "common/FileSystemUtil.h" #include "common/RuntimeUtil.h" #include "common/StringTools.h" -#include "plugin/flusher/sls/FlusherSLS.h" -#include "plugin/flusher/sls/SLSClientManager.h" -#include "protobuf/sls/sls_logs.pb.h" #include "logger/Logger.h" #include "monitor/LogtailAlarm.h" -#include "provider/Provider.h" +#include "pipeline/limiter/RateLimiter.h" #include "pipeline/queue/QueueKeyManager.h" #include "pipeline/queue/SLSSenderQueueItem.h" +#include "plugin/flusher/sls/FlusherSLS.h" +#include "plugin/flusher/sls/SLSClientManager.h" +#include "protobuf/sls/sls_logs.pb.h" +#include "provider/Provider.h" #include "sdk/Exception.h" -#include "pipeline/limiter/RateLimiter.h" #include "sls_control/SLSControl.h" DEFINE_FLAG_INT32(write_secondary_wait_timeout, "interval of dump seconary buffer from memory to file, seconds", 2); @@ -68,7 +68,7 @@ void DiskBufferWriter::Stop() { } mStopCV.notify_one(); { - future_status s = mBufferWriterThreadRes.wait_for(chrono::seconds(3)); + future_status s = mBufferWriterThreadRes.wait_for(chrono::seconds(5)); if (s == future_status::ready) { LOG_INFO(sLogger, ("disk buffer writer", "stopped successfully")); } else { @@ -76,7 +76,8 @@ void DiskBufferWriter::Stop() { } } { - future_status s = mBufferSenderThreadRes.wait_for(chrono::seconds(1)); + // timeout should be larger than network timeout, which is 15 for now + future_status s = mBufferSenderThreadRes.wait_for(chrono::seconds(20)); if (s == future_status::ready) { LOG_INFO(sLogger, ("disk buffer sender", "stopped successfully")); } else { @@ -158,6 +159,7 @@ void DiskBufferWriter::BufferSenderThread() { } continue; } + lock.unlock(); // mIsSendingBuffer = true; int32_t fileToSendCount = int32_t(filesToSend.size()); int32_t bufferFileNumValue = AppConfig::GetInstance()->GetNumOfBufferFile(); @@ -197,6 +199,7 @@ void DiskBufferWriter::BufferSenderThread() { } } // mIsSendingBuffer = false; + lock.lock(); if (mStopCV.wait_for(lock, chrono::seconds(mCheckPeriod), [this]() { return !mIsSendBufferThreadRunning; })) { break; } @@ -509,6 +512,12 @@ void DiskBufferWriter::SendEncryptionBuffer(const std::string& filename, int32_t filename); if (!sendResult) writeBack = true; + { + lock_guard lock(mBufferSenderThreadRunningMux); + if (!mIsSendBufferThreadRunning) { + return; + } + } } if (!writeBack) { remove(filename.c_str()); diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 9f655c5463..9f5b4cd08b 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -120,7 +120,8 @@ shared_ptr GetConcurrencyLimiter() { return make_shared(AppConfig::GetInstance()->GetSendRequestConcurrency()); } -shared_ptr FlusherSLS::GetLogstoreConcurrencyLimiter(const std::string& project, const std::string& logstore) { +shared_ptr FlusherSLS::GetLogstoreConcurrencyLimiter(const std::string& project, + const std::string& logstore) { lock_guard lock(sMux); std::string key = project + "-" + logstore; @@ -509,11 +510,9 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mQueueKey, mPluginID, *mContext, - { - {"region", GetRegionConcurrencyLimiter(mRegion)}, - {"project", GetProjectConcurrencyLimiter(mProject)}, - {"logstore", GetLogstoreConcurrencyLimiter(mProject, mLogstore)} - }, + {{"region", GetRegionConcurrencyLimiter(mRegion)}, + {"project", GetProjectConcurrencyLimiter(mProject)}, + {"logstore", GetLogstoreConcurrencyLimiter(mProject, mLogstore)}}, mMaxSendRate); } @@ -537,12 +536,14 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mSuccessCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL); mNetworkErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_NETWORK_ERROR_TOTAL); mServerErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SERVER_ERROR_TOTAL); - mShardWriteQuotaErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_SHARD_WRITE_QUOTA_ERROR_TOTAL); + mShardWriteQuotaErrorCnt + = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_SHARD_WRITE_QUOTA_ERROR_TOTAL); mProjectQuotaErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_PROJECT_QUOTA_ERROR_TOTAL); mUnauthErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_UNAUTH_ERROR_TOTAL); mParamsErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_PARAMS_ERROR_TOTAL); mSequenceIDErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_SEQUENCE_ID_ERROR_TOTAL); - mRequestExpiredErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_REQUEST_EXPRIRED_ERROR_TOTAL); + mRequestExpiredErrorCnt + = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_REQUEST_EXPRIRED_ERROR_TOTAL); mOtherErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OTHER_ERROR_TOTAL); return true; @@ -686,17 +687,20 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) cpt->Commit(); cpt->IncreaseSequenceID(); } - LOG_DEBUG(sLogger, - ("send data to sls succeeded, item address", - item)("request id", slsResponse.mRequestId)("config", configName)("region", mRegion)( - "project", mProject)("logstore", data->mLogstore)("response time", curTime - data->mLastSendTime)( - "total send time", - ToString(chrono::duration_cast(curSystemTime - item->mFirstEnqueTime).count()) - + "ms")("try cnt", data->mTryCnt)("endpoint", data->mCurrentEndpoint)("is profile data", - isProfileData)); + LOG_DEBUG( + sLogger, + ("send data to sls succeeded, item address", item)("request id", slsResponse.mRequestId)( + "config", configName)("region", mRegion)("project", mProject)("logstore", data->mLogstore)( + "response time", + ToString(chrono::duration_cast(curSystemTime - item->mLastSendTime).count()) + + "ms")( + "total send time", + ToString(chrono::duration_cast(curSystemTime - item->mFirstEnqueTime).count()) + + "ms")("try cnt", data->mTryCnt)("endpoint", data->mCurrentEndpoint)("is profile data", + isProfileData)); GetRegionConcurrencyLimiter(mRegion)->OnSuccess(); GetProjectConcurrencyLimiter(mProject)->OnSuccess(); - GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(); + GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(); SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey); DealSenderQueueItemAfterSend(item, false); if (mSuccessCnt) { @@ -871,14 +875,16 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) } #define LOG_PATTERN \ - ("failed to send request", failDetail.str())("operation", GetOperationString(operation))( \ - "suggestion", suggestion.str())("item address", item)("request id", slsResponse.mRequestId)( \ - "status code", slsResponse.mStatusCode)("error code", slsResponse.mErrorCode)( \ - "errMsg", slsResponse.mErrorMsg)("config", configName)("region", mRegion)("project", mProject)( \ - "logstore", data->mLogstore)("try cnt", data->mTryCnt)("response time", curTime - data->mLastSendTime)( \ - "total send time", \ - ToString(chrono::duration_cast(curSystemTime - data->mFirstEnqueTime).count()) \ - + "ms")("endpoint", data->mCurrentEndpoint)("is profile data", isProfileData) + ("failed to send request", failDetail.str())("operation", GetOperationString(operation))("suggestion", \ + suggestion.str())( \ + "item address", item)("request id", slsResponse.mRequestId)("status code", slsResponse.mStatusCode)( \ + "error code", slsResponse.mErrorCode)("errMsg", slsResponse.mErrorMsg)("config", configName)( \ + "region", mRegion)("project", mProject)("logstore", data->mLogstore)("try cnt", data->mTryCnt)( \ + "response time", \ + ToString(chrono::duration_cast(curSystemTime - data->mLastSendTime).count()) \ + + "ms")("total send time", \ + ToString(chrono::duration_cast(curSystemTime - data->mFirstEnqueTime).count()) \ + + "ms")("endpoint", data->mCurrentEndpoint)("is profile data", isProfileData) switch (operation) { case OperationOnFail::RETRY_IMMEDIATELY: @@ -941,7 +947,8 @@ bool FlusherSLS::Send(string&& data, const string& shardHashKey, const string& l key = QueueKeyManager::GetInstance()->GetKey(mProject + "-" + mLogstore); if (SenderQueueManager::GetInstance()->GetQueue(key) == nullptr) { PipelineContext ctx; - SenderQueueManager::GetInstance()->CreateQueue(key, "", ctx, std::unordered_map>()); + SenderQueueManager::GetInstance()->CreateQueue( + key, "", ctx, std::unordered_map>()); } } return Flusher::PushToQueue(make_unique(std::move(compressedData), diff --git a/core/plugin/flusher/sls/SLSClientManager.cpp b/core/plugin/flusher/sls/SLSClientManager.cpp index c3b73467cb..33a0381bef 100644 --- a/core/plugin/flusher/sls/SLSClientManager.cpp +++ b/core/plugin/flusher/sls/SLSClientManager.cpp @@ -152,17 +152,17 @@ void SLSClientManager::Stop() { if (mDataServerSwitchPolicy == EndpointSwitchPolicy::DESIGNATED_FIRST) { future_status s = mProbeNetworkThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { - LOG_INFO(sLogger, ("test endpoint thread", "stopped successfully")); + LOG_INFO(sLogger, ("sls endpoint probe", "stopped successfully")); } else { - LOG_WARNING(sLogger, ("test endpoint thread", "forced to stopped")); + LOG_WARNING(sLogger, ("sls endpoint probe", "forced to stopped")); } } if (BOOL_FLAG(send_prefer_real_ip)) { future_status s = mUpdateRealIpThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { - LOG_INFO(sLogger, ("update real ip thread", "stopped successfully")); + LOG_INFO(sLogger, ("sls real ip update", "stopped successfully")); } else { - LOG_WARNING(sLogger, ("update real ip thread", "forced to stopped")); + LOG_WARNING(sLogger, ("sls real ip update", "forced to stopped")); } } } @@ -385,6 +385,7 @@ bool SLSClientManager::HasNetworkAvailable() { } void SLSClientManager::ProbeNetworkThread() { + LOG_INFO(sLogger, ("sls endpoint probe", "started")); // pair represents the weight of each endpoint map>> unavaliableEndpoints; set unavaliableRegions; @@ -539,6 +540,7 @@ void SLSClientManager::UpdateSendClientRealIp(sdk::Client* client, const string& } void SLSClientManager::UpdateRealIpThread() { + LOG_INFO(sLogger, ("sls real ip update", "started")); int32_t lastUpdateRealIpTime = 0; vector regionEndpointArray; vector regionArray; diff --git a/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp b/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp index 35716caf59..38adf9571f 100644 --- a/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp +++ b/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp @@ -148,10 +148,6 @@ void ProcessorSplitLogStringNative::ProcessEvent(PipelineEventGroup& logGroup, StringBuffer offsetStr = logGroup.GetSourceBuffer()->CopyString(ToString(offset)); targetEvent->SetContentNoCopy(LOG_RESERVED_KEY_FILE_OFFSET, StringView(offsetStr.data, offsetStr.size)); } - // TODO: remove the following code after the flusher refactorization - if (logGroup.GetExactlyOnceCheckpoint() != nullptr) { - logGroup.GetExactlyOnceCheckpoint()->positions.emplace_back(offset, content.size()); - } newEvents.emplace_back(std::move(targetEvent)); begin += content.size() + 1; } diff --git a/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp b/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp index c5970b6dc2..cfd178e0fe 100644 --- a/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp +++ b/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp @@ -317,10 +317,6 @@ void ProcessorSplitMultilineLogStringNative::CreateNewEvent(const StringView& co StringBuffer offsetStr = logGroup.GetSourceBuffer()->CopyString(ToString(offset)); targetEvent->SetContentNoCopy(LOG_RESERVED_KEY_FILE_OFFSET, StringView(offsetStr.data, offsetStr.size)); } - // TODO: remove the following code after the flusher refactorization - if (logGroup.GetExactlyOnceCheckpoint() != nullptr) { - logGroup.GetExactlyOnceCheckpoint()->positions.emplace_back(offset, content.size()); - } newEvents.emplace_back(std::move(targetEvent)); } diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index 1723cca935..ae1d55dc1a 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -36,6 +36,7 @@ using namespace std; DEFINE_FLAG_INT32(check_send_client_timeout_interval, "", 600); DEFINE_FLAG_BOOL(enable_flow_control, "if enable flow control", true); DEFINE_FLAG_BOOL(enable_send_tps_smoothing, "avoid web server load burst", true); +DEFINE_FLAG_INT32(flusher_runner_exit_timeout_secs, "", 60); static const int SEND_BLOCK_COST_TIME_ALARM_INTERVAL_SECOND = 3; @@ -76,8 +77,8 @@ bool FlusherRunner::LoadModuleConfig(bool isInit) { if (isInit) { // Only handle parameters that do not allow hot loading } - auto maxBytePerSec = AppConfig::GetInstance()->MergeInt32(kDefaultMaxSendBytePerSec, - AppConfig::GetInstance()->GetMaxBytePerSec(), "max_bytes_per_sec", ValidateFn); + auto maxBytePerSec = AppConfig::GetInstance()->MergeInt32( + kDefaultMaxSendBytePerSec, AppConfig::GetInstance()->GetMaxBytePerSec(), "max_bytes_per_sec", ValidateFn); AppConfig::GetInstance()->SetMaxBytePerSec(maxBytePerSec); UpdateSendFlowControl(); return true; @@ -103,7 +104,7 @@ void FlusherRunner::UpdateSendFlowControl() { void FlusherRunner::Stop() { mIsFlush = true; SenderQueueManager::GetInstance()->Trigger(); - future_status s = mThreadRes.wait_for(chrono::seconds(10)); + future_status s = mThreadRes.wait_for(chrono::seconds(INT32_FLAG(flusher_runner_exit_timeout_secs))); if (s == future_status::ready) { LOG_INFO(sLogger, ("flusher runner", "stopped successfully")); } else { @@ -143,10 +144,13 @@ void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) { } auto req = static_cast(item->mFlusher)->BuildRequest(item); - item->mLastSendTime = time(nullptr); - req->mEnqueTime = item->mLastSendTime; + req->mEnqueTime = item->mLastSendTime = chrono::system_clock::now(); HttpSink::GetInstance()->AddRequest(std::move(req)); ++mHttpSendingCnt; + LOG_DEBUG(sLogger, + ("send item to http sink, item address", item)("config-flusher-dst", + QueueKeyManager::GetInstance()->GetName(item->mQueueKey))( + "sending cnt", ToString(mHttpSendingCnt.load()))); } void FlusherRunner::Run() { @@ -162,6 +166,7 @@ void FlusherRunner::Run() { if (items.empty()) { SenderQueueManager::GetInstance()->Wait(1000); } else { + LOG_DEBUG(sLogger, ("got items from sender queue, cnt", items.size())); for (auto itr = items.begin(); itr != items.end(); ++itr) { mInItemDataSizeBytes->Add((*itr)->mData.size()); mInItemRawDataSizeBytes->Add((*itr)->mRawSize); diff --git a/core/runner/FlusherRunner.h b/core/runner/FlusherRunner.h index 1ce797868a..4c25d1e350 100644 --- a/core/runner/FlusherRunner.h +++ b/core/runner/FlusherRunner.h @@ -45,7 +45,7 @@ class FlusherRunner { // TODO: should be private void PushToHttpSink(SenderQueueItem* item, bool withLimit = true); - int32_t GetSendingBufferCount() { return mHttpSendingCnt; } + int32_t GetSendingBufferCount() { return mHttpSendingCnt.load(); } bool LoadModuleConfig(bool isInit); @@ -62,7 +62,7 @@ class FlusherRunner { std::future mThreadRes; std::atomic_bool mIsFlush = false; - std::atomic_int mHttpSendingCnt{0}; + std::atomic_int32_t mHttpSendingCnt{0}; // TODO: temporarily here int32_t mLastCheckSendClientTime = 0; diff --git a/core/runner/ProcessorRunner.cpp b/core/runner/ProcessorRunner.cpp index 21f8f23e1a..1291e1fa2f 100644 --- a/core/runner/ProcessorRunner.cpp +++ b/core/runner/ProcessorRunner.cpp @@ -39,6 +39,7 @@ using namespace std; DEFINE_FLAG_BOOL(enable_chinese_tag_path, "Enable Chinese __tag__.__path__", true); #endif DEFINE_FLAG_INT32(default_flush_merged_buffer_interval, "default flush merged buffer, seconds", 1); +DEFINE_FLAG_INT32(processor_runner_exit_timeout_secs, "", 60); namespace logtail { @@ -62,7 +63,8 @@ void ProcessorRunner::Stop() { mIsFlush = true; ProcessQueueManager::GetInstance()->Trigger(); for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) { - future_status s = mThreadRes[threadNo].wait_for(chrono::seconds(1)); + future_status s + = mThreadRes[threadNo].wait_for(chrono::seconds(INT32_FLAG(processor_runner_exit_timeout_secs))); if (s == future_status::ready) { LOG_INFO(sLogger, ("processor runner", "stopped successfully")("threadNo", threadNo)); } else { diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index 032ee441a2..06372e36e2 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -15,6 +15,7 @@ #include "runner/sink/http/HttpSink.h" #include "app_config/AppConfig.h" +#include "common/Flags.h" #include "common/StringTools.h" #include "common/http/Curl.h" #include "logger/Logger.h" @@ -24,6 +25,8 @@ #include "pipeline/queue/SenderQueueItem.h" #include "runner/FlusherRunner.h" +DEFINE_FLAG_INT32(http_sink_exit_timeout_secs, "", 5); + using namespace std; namespace logtail { @@ -43,8 +46,10 @@ bool HttpSink::Init() { mLastRunTime = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); mOutSuccessfulItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_OUT_SUCCESSFUL_ITEMS_TOTAL); mOutFailedItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_OUT_FAILED_ITEMS_TOTAL); - mSuccessfulItemTotalResponseTimeMs = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_SUCCESSFUL_ITEM_TOTAL_RESPONSE_TIME_MS); - mFailedItemTotalResponseTimeMs = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_FAILED_ITEM_TOTAL_RESPONSE_TIME_MS); + mSuccessfulItemTotalResponseTimeMs + = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_SUCCESSFUL_ITEM_TOTAL_RESPONSE_TIME_MS); + mFailedItemTotalResponseTimeMs + = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_FAILED_ITEM_TOTAL_RESPONSE_TIME_MS); mSendingItemsTotal = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_SINK_SENDING_ITEMS_TOTAL); mSendConcurrency = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_SINK_SEND_CONCURRENCY); @@ -57,7 +62,7 @@ bool HttpSink::Init() { void HttpSink::Stop() { mIsFlush = true; - future_status s = mThreadRes.wait_for(chrono::seconds(1)); + future_status s = mThreadRes.wait_for(chrono::seconds(INT32_FLAG(http_sink_exit_timeout_secs))); if (s == future_status::ready) { LOG_INFO(sLogger, ("http sink", "stopped successfully")); } else { @@ -73,11 +78,13 @@ void HttpSink::Run() { unique_ptr request; if (mQueue.WaitAndPop(request, 500)) { mInItemsTotal->Add(1); - LOG_DEBUG( - sLogger, - ("got item from flusher runner, item address", request->mItem)( - "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( - "wait time", ToString(time(nullptr) - request->mEnqueTime))("try cnt", ToString(request->mTryCnt))); + LOG_DEBUG(sLogger, + ("got item from flusher runner, item address", request->mItem)( + "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "wait time", + ToString(chrono::duration_cast(chrono::system_clock::now() + - request->mEnqueTime) + .count()))("try cnt", ToString(request->mTryCnt))); if (!AddRequestToClient(std::move(request))) { continue; } @@ -117,7 +124,8 @@ bool HttpSink::AddRequestToClient(unique_ptr&& request) { LOG_ERROR(sLogger, ("failed to send request", "failed to init curl handler")( "action", "put sender queue item back to sender queue")("item address", request->mItem)( - "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))); + "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); return false; } @@ -135,7 +143,8 @@ bool HttpSink::AddRequestToClient(unique_ptr&& request) { ("failed to send request", "failed to add the easy curl handle to multi_handle")("errMsg", curl_multi_strerror(res))( "action", "put sender queue item back to sender queue")("item address", request->mItem)( - "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))); + "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); return false; } // let sink destruct the request @@ -161,11 +170,13 @@ void HttpSink::DoRun() { unique_ptr request; if (mQueue.TryPop(request)) { mInItemsTotal->Add(1); - LOG_DEBUG( - sLogger, - ("got item from flusher runner, item address", request->mItem)( - "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( - "wait time", ToString(time(nullptr) - request->mEnqueTime))("try cnt", ToString(request->mTryCnt))); + LOG_DEBUG(sLogger, + ("got item from flusher runner, item address", request->mItem)( + "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "wait time", + ToString(chrono::duration_cast(chrono::system_clock::now() + - request->mEnqueTime) + .count()))("try cnt", ToString(request->mTryCnt))); if (AddRequestToClient(std::move(request))) { ++runningHandlers; mSendingItemsTotal->Add(1); @@ -222,10 +233,6 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { auto responseTime = chrono::duration_cast(chrono::system_clock::now() - request->mLastSendTime) .count(); - LOG_DEBUG(sLogger, - ("send http request completed, item address", request->mItem)( - "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( - "response time", ToString(responseTime) + "ms")("try cnt", ToString(request->mTryCnt))); switch (msg->data.result) { case CURLE_OK: { long statusCode = 0; @@ -236,22 +243,29 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { mOutSuccessfulItemsTotal->Add(1); mSuccessfulItemTotalResponseTimeMs->Add(responseTime); mSendingItemsTotal->Sub(1); + LOG_DEBUG( + sLogger, + ("send http request succeeded, item address", request->mItem)( + "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "response time", ToString(responseTime) + "ms")("try cnt", ToString(request->mTryCnt))( + "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); break; } default: // considered as network error - if (++request->mTryCnt <= request->mMaxTryCnt) { + if (request->mTryCnt <= request->mMaxTryCnt) { LOG_WARNING( sLogger, - ("failed to send request", "retry immediately")("item address", request->mItem)( - "try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(msg->data.result))( + ("failed to send http request", "retry immediately")("item address", request->mItem)( "config-flusher-dst", - QueueKeyManager::GetInstance()->GetName(request->mItem->mFlusher->GetQueueKey()))); + QueueKeyManager::GetInstance()->GetName(request->mItem->mFlusher->GetQueueKey()))( + "try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(msg->data.result))); // free first,becase mPrivateData will be reset in AddRequestToClient if (request->mPrivateData) { curl_slist_free_all((curl_slist*)request->mPrivateData); request->mPrivateData = nullptr; } + ++request->mTryCnt; AddRequestToClient(unique_ptr(request)); ++runningHandlers; requestReused = true; @@ -259,6 +273,13 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { static_cast(request->mItem->mFlusher) ->OnSendDone(request->mResponse, request->mItem); FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); + LOG_DEBUG( + sLogger, + ("failed to send http request", "abort")("item address", request->mItem)( + "config-flusher-dst", + QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "response time", ToString(responseTime) + "ms")("try cnt", ToString(request->mTryCnt))( + "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); } mOutFailedItemsTotal->Add(1); mFailedItemTotalResponseTimeMs->Add(responseTime); diff --git a/core/unittest/config/CommonConfigProviderUnittest.cpp b/core/unittest/config/CommonConfigProviderUnittest.cpp index 66b1b17298..fe41260944 100644 --- a/core/unittest/config/CommonConfigProviderUnittest.cpp +++ b/core/unittest/config/CommonConfigProviderUnittest.cpp @@ -19,11 +19,11 @@ #include "common/FileSystemUtil.h" #include "common/version.h" #include "config/ConfigDiff.h" +#include "config/InstanceConfigManager.h" #include "config/common_provider/CommonConfigProvider.h" #include "config/watcher/ConfigWatcher.h" #include "config/watcher/InstanceConfigWatcher.h" #include "gmock/gmock.h" -#include "instance_config/InstanceConfigManager.h" #include "pipeline/PipelineManager.h" #include "unittest/Unittest.h" @@ -132,7 +132,7 @@ void CommonConfigProviderUnittest::TestInit() { MockCommonConfigProvider provider; provider.Init("common_v2"); APSARA_TEST_EQUAL(provider.mSequenceNum, 0); - APSARA_TEST_EQUAL(provider.sName, "CommonConfigProvider"); + APSARA_TEST_EQUAL(provider.sName, "common config provider"); APSARA_TEST_EQUAL(provider.mConfigServerAvailable, true); APSARA_TEST_EQUAL(provider.mConfigServerAddresses.size(), 1); APSARA_TEST_EQUAL(provider.mConfigServerAddresses[0].host, "test.config.com"); @@ -170,7 +170,7 @@ void CommonConfigProviderUnittest::TestInit() { MockCommonConfigProvider provider; provider.Init("common_v2"); APSARA_TEST_EQUAL(provider.mSequenceNum, 0); - APSARA_TEST_EQUAL(provider.sName, "CommonConfigProvider"); + APSARA_TEST_EQUAL(provider.sName, "common config provider"); APSARA_TEST_EQUAL(provider.mConfigServerAvailable, true); APSARA_TEST_EQUAL(provider.mConfigServerAddresses.size(), 2); APSARA_TEST_EQUAL(provider.mConfigServerAddresses[0].host, "test.config.com"); @@ -235,7 +235,7 @@ void CommonConfigProviderUnittest::TestInit() { MockCommonConfigProvider provider; provider.Init("common_v2"); APSARA_TEST_EQUAL(provider.mSequenceNum, 0); - APSARA_TEST_EQUAL(provider.sName, "CommonConfigProvider"); + APSARA_TEST_EQUAL(provider.sName, "common config provider"); APSARA_TEST_EQUAL(provider.mConfigServerAvailable, false); APSARA_TEST_EQUAL(provider.mConfigServerAddresses.size(), 0); APSARA_TEST_EQUAL(provider.mConfigServerTags.size(), 0); @@ -407,7 +407,7 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { provider.Init("common_v2"); - APSARA_TEST_EQUAL(provider.sName, "CommonConfigProvider"); + APSARA_TEST_EQUAL(provider.sName, "common config provider"); APSARA_TEST_EQUAL(provider.mConfigServerAvailable, true); APSARA_TEST_EQUAL(provider.mConfigServerAddresses.size(), 1); APSARA_TEST_EQUAL(provider.mConfigServerAddresses[0].host, "test.config.com"); @@ -625,7 +625,7 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { provider.Init("common_v2"); - APSARA_TEST_EQUAL(provider.sName, "CommonConfigProvider"); + APSARA_TEST_EQUAL(provider.sName, "common config provider"); APSARA_TEST_EQUAL(provider.mConfigServerAvailable, true); APSARA_TEST_EQUAL(provider.mConfigServerAddresses.size(), 1); APSARA_TEST_EQUAL(provider.mConfigServerAddresses[0].host, "test.config.com"); diff --git a/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp b/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp index df4e479674..5adb906538 100644 --- a/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp +++ b/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp @@ -16,7 +16,7 @@ #include "app_config/AppConfig.h" #include "common/JsonUtil.h" #include "config/InstanceConfig.h" -#include "instance_config/InstanceConfigManager.h" +#include "config/InstanceConfigManager.h" #include "runner/FlusherRunner.h" #include "unittest/Unittest.h" diff --git a/core/unittest/models/PipelineEventGroupUnittest.cpp b/core/unittest/models/PipelineEventGroupUnittest.cpp index b0ffd53a45..4a96d31024 100644 --- a/core/unittest/models/PipelineEventGroupUnittest.cpp +++ b/core/unittest/models/PipelineEventGroupUnittest.cpp @@ -23,6 +23,7 @@ namespace logtail { class PipelineEventGroupUnittest : public ::testing::Test { public: void TestSwapEvents(); + void TestReserveEvents(); void TestCopy(); void TestSetMetadata(); void TestDelMetadata(); @@ -49,6 +50,11 @@ void PipelineEventGroupUnittest::TestSwapEvents() { APSARA_TEST_EQUAL_FATAL(0U, mEventGroup->GetEvents().size()); } +void PipelineEventGroupUnittest::TestReserveEvents() { + mEventGroup->ReserveEvents(10); + APSARA_TEST_EQUAL(10U, mEventGroup->GetEvents().capacity()); +} + void PipelineEventGroupUnittest::TestCopy() { mEventGroup->AddLogEvent(); auto res = mEventGroup->Copy(); @@ -136,6 +142,7 @@ void PipelineEventGroupUnittest::TestFromJsonToJson() { } UNIT_TEST_CASE(PipelineEventGroupUnittest, TestSwapEvents) +UNIT_TEST_CASE(PipelineEventGroupUnittest, TestReserveEvents) UNIT_TEST_CASE(PipelineEventGroupUnittest, TestCopy) UNIT_TEST_CASE(PipelineEventGroupUnittest, TestSetMetadata) UNIT_TEST_CASE(PipelineEventGroupUnittest, TestDelMetadata) diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index 270acf1b33..743bfdf7e1 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ b/core/unittest/pipeline/PipelineUnittest.cpp @@ -2749,6 +2749,16 @@ void PipelineUnittest::TestSend() const { configs.emplace_back(0, nullptr); configs.emplace_back(1, nullptr); pipeline.mRouter.Init(configs, ctx); + + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(pipeline.mMetricsRecordRef, {}); + pipeline.mFlushersInGroupsTotal + = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL); + pipeline.mFlushersInEventsTotal + = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL); + pipeline.mFlushersInSizeBytes + = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES); + pipeline.mFlushersTotalPackageTimeMs + = pipeline.mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS); { // all valid vector group; @@ -2804,6 +2814,16 @@ void PipelineUnittest::TestSend() const { configs.emplace_back(configJson.size(), nullptr); pipeline.mRouter.Init(configs, ctx); + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(pipeline.mMetricsRecordRef, {}); + pipeline.mFlushersInGroupsTotal + = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL); + pipeline.mFlushersInEventsTotal + = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL); + pipeline.mFlushersInSizeBytes + = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES); + pipeline.mFlushersTotalPackageTimeMs + = pipeline.mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS); + { vector group; group.emplace_back(make_shared());