From e1019cddf8d011ee69c7429b955cac6b6435e1f5 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Wed, 23 Oct 2024 08:37:53 +0000 Subject: [PATCH 01/15] improve send log and adjust runner exit timeout --- core/checkpoint/RangeCheckpoint.h | 2 - core/config/PipelineConfig.cpp | 23 ++--------- core/pipeline/PipelineManager.cpp | 3 -- .../queue/ExactlyOnceQueueManager.cpp | 16 -------- core/pipeline/queue/ExactlyOnceQueueManager.h | 4 -- core/pipeline/queue/ProcessQueueManager.cpp | 17 --------- core/pipeline/queue/ProcessQueueManager.h | 4 -- core/plugin/flusher/sls/DiskBufferWriter.cpp | 23 +++++++---- .../inner/ProcessorSplitLogStringNative.cpp | 4 -- ...ProcessorSplitMultilineLogStringNative.cpp | 4 -- core/runner/FlusherRunner.cpp | 12 ++++-- core/runner/FlusherRunner.h | 4 +- core/runner/ProcessorRunner.cpp | 4 +- core/runner/sink/http/HttpSink.cpp | 38 +++++++++++++------ 14 files changed, 61 insertions(+), 97 deletions(-) diff --git a/core/checkpoint/RangeCheckpoint.h b/core/checkpoint/RangeCheckpoint.h index ad2f9cc4dd..30880dde6f 100644 --- a/core/checkpoint/RangeCheckpoint.h +++ b/core/checkpoint/RangeCheckpoint.h @@ -29,10 +29,8 @@ class RangeCheckpoint { std::string key; QueueKey fbKey; RangeCheckpointPB data; - std::vector> positions; inline void Prepare() { - positions.clear(); data.set_committed(false); save(); } diff --git a/core/config/PipelineConfig.cpp b/core/config/PipelineConfig.cpp index b44d05df87..534cc73bee 100644 --- a/core/config/PipelineConfig.cpp +++ b/core/config/PipelineConfig.cpp @@ -138,7 +138,6 @@ bool PipelineConfig::Parse() { // inputs, processors and flushers module must be parsed first and parsed by order, since aggregators and // extensions module parsing will rely on their results. - bool hasObserverInput = false; bool hasFileInput = false; key = "inputs"; itr = mDetail->find(key.c_str(), key.c_str() + key.size()); @@ -237,15 +236,12 @@ bool PipelineConfig::Parse() { } mInputs.push_back(&plugin); // TODO: remove these special restrictions - if (pluginType == "input_observer_network") { - hasObserverInput = true; - } else if (pluginType == "input_file" || pluginType == "input_container_stdio") { + if (pluginType == "input_file" || pluginType == "input_container_stdio") { hasFileInput = true; } } // TODO: remove these special restrictions - bool hasSpecialInput = hasObserverInput || hasFileInput; - if (hasSpecialInput && (*mDetail)["inputs"].size() > 1) { + if (hasFileInput && (*mDetail)["inputs"].size() > 1) { PARAM_ERROR_RETURN(sLogger, alarm, "more than 1 input_file or input_container_stdio plugin is given", @@ -331,7 +327,7 @@ bool PipelineConfig::Parse() { if (isCurrentPluginNative) { if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { // TODO: remove these special restrictions - if (!hasObserverInput && !hasFileInput) { + if (!hasFileInput) { PARAM_ERROR_RETURN(sLogger, alarm, "extended processor plugins coexist with native input plugins other " @@ -365,17 +361,6 @@ bool PipelineConfig::Parse() { mRegion); } } else { - // TODO: remove these special restrictions - if (hasObserverInput) { - PARAM_ERROR_RETURN(sLogger, - alarm, - "native processor plugins coexist with input_observer_network", - noModule, - mName, - mProject, - mLogstore, - mRegion); - } mHasNativeProcessor = true; } } else { @@ -470,7 +455,7 @@ bool PipelineConfig::Parse() { const string pluginType = it->asString(); if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { // TODO: remove these special restrictions - if (mHasNativeInput && !hasFileInput && !hasObserverInput) { + if (mHasNativeInput && !hasFileInput) { PARAM_ERROR_RETURN(sLogger, alarm, "extended flusher plugins coexist with native input plugins other than " diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index b683e20f79..da33c8bd20 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -200,9 +200,6 @@ void PipelineManager::StopAllPipelines() { LogtailPlugin::GetInstance()->StopAllPipelines(false); - // TODO: make it common - FlusherSLS::RecycleResourceIfNotUsed(); - // Sender should be stopped after profiling threads are stopped. LOG_INFO(sLogger, ("stop all pipelines", "succeeded")); } diff --git a/core/pipeline/queue/ExactlyOnceQueueManager.cpp b/core/pipeline/queue/ExactlyOnceQueueManager.cpp index 7446f6a9bc..35e8271e6b 100644 --- a/core/pipeline/queue/ExactlyOnceQueueManager.cpp +++ b/core/pipeline/queue/ExactlyOnceQueueManager.cpp @@ -263,22 +263,6 @@ void ExactlyOnceQueueManager::ClearTimeoutQueues() { } } -uint32_t ExactlyOnceQueueManager::GetInvalidProcessQueueCnt() const { - uint32_t res = 0; - lock_guard lock(mProcessQueueMux); - for (const auto& q : mProcessQueues) { - if (q.second->IsValidToPush()) { - ++res; - } - } - return res; -} - -uint32_t ExactlyOnceQueueManager::GetProcessQueueCnt() const { - lock_guard lock(mProcessQueueMux); - return mProcessQueues.size(); -} - void ExactlyOnceQueueManager::SetPipelineForSenderItems(QueueKey key, const std::shared_ptr& p) { lock_guard lock(mSenderQueueMux); auto iter = mSenderQueues.find(key); diff --git a/core/pipeline/queue/ExactlyOnceQueueManager.h b/core/pipeline/queue/ExactlyOnceQueueManager.h index d53be58bd0..6fb51ab0d3 100644 --- a/core/pipeline/queue/ExactlyOnceQueueManager.h +++ b/core/pipeline/queue/ExactlyOnceQueueManager.h @@ -71,10 +71,6 @@ class ExactlyOnceQueueManager { void ClearTimeoutQueues(); - // TODO: should be removed when self-telemetry is refactored - uint32_t GetInvalidProcessQueueCnt() const; - uint32_t GetProcessQueueCnt() const; - #ifdef APSARA_UNIT_TEST_MAIN void Clear(); #endif diff --git a/core/pipeline/queue/ProcessQueueManager.cpp b/core/pipeline/queue/ProcessQueueManager.cpp index a7e32324cc..a7b13dccfe 100644 --- a/core/pipeline/queue/ProcessQueueManager.cpp +++ b/core/pipeline/queue/ProcessQueueManager.cpp @@ -329,23 +329,6 @@ void ProcessQueueManager::ResetCurrentQueueIndex() { mCurrentQueueIndex.second = mPriorityQueue[0].begin(); } -uint32_t ProcessQueueManager::GetInvalidCnt() const { - uint32_t res = 0; - lock_guard lock(mQueueMux); - for (const auto& q : mQueues) { - if (q.second.second == QueueType::BOUNDED - && static_cast(q.second.first->get())->IsValidToPush()) { - ++res; - } - } - return res; -} - -uint32_t ProcessQueueManager::GetCnt() const { - lock_guard lock(mQueueMux); - return mQueues.size(); -} - #ifdef APSARA_UNIT_TEST_MAIN void ProcessQueueManager::Clear() { lock_guard lock(mQueueMux); diff --git a/core/pipeline/queue/ProcessQueueManager.h b/core/pipeline/queue/ProcessQueueManager.h index ab877a2c7d..a77a5769d2 100644 --- a/core/pipeline/queue/ProcessQueueManager.h +++ b/core/pipeline/queue/ProcessQueueManager.h @@ -68,10 +68,6 @@ class ProcessQueueManager : public FeedbackInterface { bool Wait(uint64_t ms); void Trigger(); - // TODO: should be removed when self-telemetry is refactored - uint32_t GetInvalidCnt() const; - uint32_t GetCnt() const; - private: ProcessQueueManager(); ~ProcessQueueManager() = default; diff --git a/core/plugin/flusher/sls/DiskBufferWriter.cpp b/core/plugin/flusher/sls/DiskBufferWriter.cpp index 3d7c86a338..6be224f86a 100644 --- a/core/plugin/flusher/sls/DiskBufferWriter.cpp +++ b/core/plugin/flusher/sls/DiskBufferWriter.cpp @@ -22,16 +22,16 @@ #include "common/FileSystemUtil.h" #include "common/RuntimeUtil.h" #include "common/StringTools.h" -#include "plugin/flusher/sls/FlusherSLS.h" -#include "plugin/flusher/sls/SLSClientManager.h" -#include "protobuf/sls/sls_logs.pb.h" #include "logger/Logger.h" #include "monitor/LogtailAlarm.h" -#include "provider/Provider.h" +#include "pipeline/limiter/RateLimiter.h" #include "pipeline/queue/QueueKeyManager.h" #include "pipeline/queue/SLSSenderQueueItem.h" +#include "plugin/flusher/sls/FlusherSLS.h" +#include "plugin/flusher/sls/SLSClientManager.h" +#include "protobuf/sls/sls_logs.pb.h" +#include "provider/Provider.h" #include "sdk/Exception.h" -#include "pipeline/limiter/RateLimiter.h" #include "sls_control/SLSControl.h" DEFINE_FLAG_INT32(write_secondary_wait_timeout, "interval of dump seconary buffer from memory to file, seconds", 2); @@ -68,7 +68,7 @@ void DiskBufferWriter::Stop() { } mStopCV.notify_one(); { - future_status s = mBufferWriterThreadRes.wait_for(chrono::seconds(3)); + future_status s = mBufferWriterThreadRes.wait_for(chrono::seconds(5)); if (s == future_status::ready) { LOG_INFO(sLogger, ("disk buffer writer", "stopped successfully")); } else { @@ -76,7 +76,8 @@ void DiskBufferWriter::Stop() { } } { - future_status s = mBufferSenderThreadRes.wait_for(chrono::seconds(1)); + // timeout should be larger than network timeout, which is 15 for now + future_status s = mBufferSenderThreadRes.wait_for(chrono::seconds(20)); if (s == future_status::ready) { LOG_INFO(sLogger, ("disk buffer sender", "stopped successfully")); } else { @@ -158,6 +159,7 @@ void DiskBufferWriter::BufferSenderThread() { } continue; } + lock.unlock(); // mIsSendingBuffer = true; int32_t fileToSendCount = int32_t(filesToSend.size()); int32_t bufferFileNumValue = AppConfig::GetInstance()->GetNumOfBufferFile(); @@ -197,6 +199,7 @@ void DiskBufferWriter::BufferSenderThread() { } } // mIsSendingBuffer = false; + lock.lock(); if (mStopCV.wait_for(lock, chrono::seconds(mCheckPeriod), [this]() { return !mIsSendBufferThreadRunning; })) { break; } @@ -509,6 +512,12 @@ void DiskBufferWriter::SendEncryptionBuffer(const std::string& filename, int32_t filename); if (!sendResult) writeBack = true; + { + lock_guard lock(mBufferSenderThreadRunningMux); + if (!mIsSendBufferThreadRunning) { + return; + } + } } if (!writeBack) { remove(filename.c_str()); diff --git a/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp b/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp index 35716caf59..38adf9571f 100644 --- a/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp +++ b/core/plugin/processor/inner/ProcessorSplitLogStringNative.cpp @@ -148,10 +148,6 @@ void ProcessorSplitLogStringNative::ProcessEvent(PipelineEventGroup& logGroup, StringBuffer offsetStr = logGroup.GetSourceBuffer()->CopyString(ToString(offset)); targetEvent->SetContentNoCopy(LOG_RESERVED_KEY_FILE_OFFSET, StringView(offsetStr.data, offsetStr.size)); } - // TODO: remove the following code after the flusher refactorization - if (logGroup.GetExactlyOnceCheckpoint() != nullptr) { - logGroup.GetExactlyOnceCheckpoint()->positions.emplace_back(offset, content.size()); - } newEvents.emplace_back(std::move(targetEvent)); begin += content.size() + 1; } diff --git a/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp b/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp index c5970b6dc2..cfd178e0fe 100644 --- a/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp +++ b/core/plugin/processor/inner/ProcessorSplitMultilineLogStringNative.cpp @@ -317,10 +317,6 @@ void ProcessorSplitMultilineLogStringNative::CreateNewEvent(const StringView& co StringBuffer offsetStr = logGroup.GetSourceBuffer()->CopyString(ToString(offset)); targetEvent->SetContentNoCopy(LOG_RESERVED_KEY_FILE_OFFSET, StringView(offsetStr.data, offsetStr.size)); } - // TODO: remove the following code after the flusher refactorization - if (logGroup.GetExactlyOnceCheckpoint() != nullptr) { - logGroup.GetExactlyOnceCheckpoint()->positions.emplace_back(offset, content.size()); - } newEvents.emplace_back(std::move(targetEvent)); } diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index 1723cca935..06d9dcccf5 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -36,6 +36,7 @@ using namespace std; DEFINE_FLAG_INT32(check_send_client_timeout_interval, "", 600); DEFINE_FLAG_BOOL(enable_flow_control, "if enable flow control", true); DEFINE_FLAG_BOOL(enable_send_tps_smoothing, "avoid web server load burst", true); +DEFINE_FLAG_INT32(default_flusher_runner_exit_wait_time_secs, "", 60); static const int SEND_BLOCK_COST_TIME_ALARM_INTERVAL_SECOND = 3; @@ -76,8 +77,8 @@ bool FlusherRunner::LoadModuleConfig(bool isInit) { if (isInit) { // Only handle parameters that do not allow hot loading } - auto maxBytePerSec = AppConfig::GetInstance()->MergeInt32(kDefaultMaxSendBytePerSec, - AppConfig::GetInstance()->GetMaxBytePerSec(), "max_bytes_per_sec", ValidateFn); + auto maxBytePerSec = AppConfig::GetInstance()->MergeInt32( + kDefaultMaxSendBytePerSec, AppConfig::GetInstance()->GetMaxBytePerSec(), "max_bytes_per_sec", ValidateFn); AppConfig::GetInstance()->SetMaxBytePerSec(maxBytePerSec); UpdateSendFlowControl(); return true; @@ -103,7 +104,7 @@ void FlusherRunner::UpdateSendFlowControl() { void FlusherRunner::Stop() { mIsFlush = true; SenderQueueManager::GetInstance()->Trigger(); - future_status s = mThreadRes.wait_for(chrono::seconds(10)); + future_status s = mThreadRes.wait_for(chrono::seconds(INT32_FLAG(default_flusher_runner_exit_wait_time_secs))); if (s == future_status::ready) { LOG_INFO(sLogger, ("flusher runner", "stopped successfully")); } else { @@ -147,6 +148,10 @@ void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) { req->mEnqueTime = item->mLastSendTime; HttpSink::GetInstance()->AddRequest(std::move(req)); ++mHttpSendingCnt; + LOG_DEBUG(sLogger, + ("send item to http sink, item address", item)("config-flusher-dst", + QueueKeyManager::GetInstance()->GetName(item->mQueueKey))( + "sending cnt", ToString(mHttpSendingCnt.load()))); } void FlusherRunner::Run() { @@ -162,6 +167,7 @@ void FlusherRunner::Run() { if (items.empty()) { SenderQueueManager::GetInstance()->Wait(1000); } else { + LOG_DEBUG(sLogger, ("got items from sender queue, cnt", items.size())); for (auto itr = items.begin(); itr != items.end(); ++itr) { mInItemDataSizeBytes->Add((*itr)->mData.size()); mInItemRawDataSizeBytes->Add((*itr)->mRawSize); diff --git a/core/runner/FlusherRunner.h b/core/runner/FlusherRunner.h index 1ce797868a..4c25d1e350 100644 --- a/core/runner/FlusherRunner.h +++ b/core/runner/FlusherRunner.h @@ -45,7 +45,7 @@ class FlusherRunner { // TODO: should be private void PushToHttpSink(SenderQueueItem* item, bool withLimit = true); - int32_t GetSendingBufferCount() { return mHttpSendingCnt; } + int32_t GetSendingBufferCount() { return mHttpSendingCnt.load(); } bool LoadModuleConfig(bool isInit); @@ -62,7 +62,7 @@ class FlusherRunner { std::future mThreadRes; std::atomic_bool mIsFlush = false; - std::atomic_int mHttpSendingCnt{0}; + std::atomic_int32_t mHttpSendingCnt{0}; // TODO: temporarily here int32_t mLastCheckSendClientTime = 0; diff --git a/core/runner/ProcessorRunner.cpp b/core/runner/ProcessorRunner.cpp index 7cf1d50836..50b3d85392 100644 --- a/core/runner/ProcessorRunner.cpp +++ b/core/runner/ProcessorRunner.cpp @@ -39,6 +39,7 @@ using namespace std; DEFINE_FLAG_BOOL(enable_chinese_tag_path, "Enable Chinese __tag__.__path__", true); #endif DEFINE_FLAG_INT32(default_flush_merged_buffer_interval, "default flush merged buffer, seconds", 1); +DEFINE_FLAG_INT32(default_processor_runner_exit_wait_time_secs, "", 60); namespace logtail { @@ -62,7 +63,8 @@ void ProcessorRunner::Stop() { mIsFlush = true; ProcessQueueManager::GetInstance()->Trigger(); for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) { - future_status s = mThreadRes[threadNo].wait_for(chrono::seconds(1)); + future_status s + = mThreadRes[threadNo].wait_for(chrono::seconds(INT32_FLAG(default_processor_runner_exit_wait_time_secs))); if (s == future_status::ready) { LOG_INFO(sLogger, ("processor runner", "stopped successfully")("threadNo", threadNo)); } else { diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index 032ee441a2..3457bc8d36 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -15,6 +15,7 @@ #include "runner/sink/http/HttpSink.h" #include "app_config/AppConfig.h" +#include "common/Flags.h" #include "common/StringTools.h" #include "common/http/Curl.h" #include "logger/Logger.h" @@ -24,6 +25,8 @@ #include "pipeline/queue/SenderQueueItem.h" #include "runner/FlusherRunner.h" +DEFINE_FLAG_INT32(default_http_sink_exit_wait_time_secs, "", 5); + using namespace std; namespace logtail { @@ -43,8 +46,10 @@ bool HttpSink::Init() { mLastRunTime = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_LAST_RUN_TIME); mOutSuccessfulItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_OUT_SUCCESSFUL_ITEMS_TOTAL); mOutFailedItemsTotal = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_OUT_FAILED_ITEMS_TOTAL); - mSuccessfulItemTotalResponseTimeMs = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_SUCCESSFUL_ITEM_TOTAL_RESPONSE_TIME_MS); - mFailedItemTotalResponseTimeMs = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_FAILED_ITEM_TOTAL_RESPONSE_TIME_MS); + mSuccessfulItemTotalResponseTimeMs + = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_SUCCESSFUL_ITEM_TOTAL_RESPONSE_TIME_MS); + mFailedItemTotalResponseTimeMs + = mMetricsRecordRef.CreateCounter(METRIC_RUNNER_SINK_FAILED_ITEM_TOTAL_RESPONSE_TIME_MS); mSendingItemsTotal = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_SINK_SENDING_ITEMS_TOTAL); mSendConcurrency = mMetricsRecordRef.CreateIntGauge(METRIC_RUNNER_SINK_SEND_CONCURRENCY); @@ -57,7 +62,7 @@ bool HttpSink::Init() { void HttpSink::Stop() { mIsFlush = true; - future_status s = mThreadRes.wait_for(chrono::seconds(1)); + future_status s = mThreadRes.wait_for(chrono::seconds(INT32_FLAG(default_http_sink_exit_wait_time_secs))); if (s == future_status::ready) { LOG_INFO(sLogger, ("http sink", "stopped successfully")); } else { @@ -117,7 +122,8 @@ bool HttpSink::AddRequestToClient(unique_ptr&& request) { LOG_ERROR(sLogger, ("failed to send request", "failed to init curl handler")( "action", "put sender queue item back to sender queue")("item address", request->mItem)( - "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))); + "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); return false; } @@ -135,7 +141,8 @@ bool HttpSink::AddRequestToClient(unique_ptr&& request) { ("failed to send request", "failed to add the easy curl handle to multi_handle")("errMsg", curl_multi_strerror(res))( "action", "put sender queue item back to sender queue")("item address", request->mItem)( - "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))); + "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); return false; } // let sink destruct the request @@ -222,10 +229,6 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { auto responseTime = chrono::duration_cast(chrono::system_clock::now() - request->mLastSendTime) .count(); - LOG_DEBUG(sLogger, - ("send http request completed, item address", request->mItem)( - "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( - "response time", ToString(responseTime) + "ms")("try cnt", ToString(request->mTryCnt))); switch (msg->data.result) { case CURLE_OK: { long statusCode = 0; @@ -236,6 +239,12 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { mOutSuccessfulItemsTotal->Add(1); mSuccessfulItemTotalResponseTimeMs->Add(responseTime); mSendingItemsTotal->Sub(1); + LOG_DEBUG( + sLogger, + ("send http request completed, item address", request->mItem)( + "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "response time", ToString(responseTime) + "ms")("try cnt", ToString(request->mTryCnt))( + "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); break; } default: @@ -244,9 +253,9 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { LOG_WARNING( sLogger, ("failed to send request", "retry immediately")("item address", request->mItem)( - "try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(msg->data.result))( "config-flusher-dst", - QueueKeyManager::GetInstance()->GetName(request->mItem->mFlusher->GetQueueKey()))); + QueueKeyManager::GetInstance()->GetName(request->mItem->mFlusher->GetQueueKey()))( + "try cnt", request->mTryCnt - 1)("errMsg", curl_easy_strerror(msg->data.result))); // free first,becase mPrivateData will be reset in AddRequestToClient if (request->mPrivateData) { curl_slist_free_all((curl_slist*)request->mPrivateData); @@ -259,6 +268,13 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { static_cast(request->mItem->mFlusher) ->OnSendDone(request->mResponse, request->mItem); FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); + LOG_DEBUG(sLogger, + ("send http request completed, item address", + request->mItem)("config-flusher-dst", + QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "response time", ToString(responseTime) + "ms")("try cnt", + ToString(request->mTryCnt - 1))( + "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); } mOutFailedItemsTotal->Add(1); mFailedItemTotalResponseTimeMs->Add(responseTime); From 46c884191c84ef8fa6f9e95471fb1b32b77e6375 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Thu, 24 Oct 2024 05:38:56 +0000 Subject: [PATCH 02/15] polish --- core/monitor/metric_constants/MetricConstants.h | 5 +++++ core/monitor/metric_constants/PipelineMetrics.cpp | 4 ++++ core/monitor/metric_constants/PluginMetrics.cpp | 1 + core/pipeline/Pipeline.cpp | 12 ++++++++++++ core/pipeline/Pipeline.h | 4 ++++ core/pipeline/plugin/instance/FlusherInstance.cpp | 12 +++++++++++- core/pipeline/plugin/instance/FlusherInstance.h | 2 ++ 7 files changed, 39 insertions(+), 1 deletion(-) diff --git a/core/monitor/metric_constants/MetricConstants.h b/core/monitor/metric_constants/MetricConstants.h index b3650a3359..46da86ceba 100644 --- a/core/monitor/metric_constants/MetricConstants.h +++ b/core/monitor/metric_constants/MetricConstants.h @@ -66,6 +66,10 @@ extern const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENTS_TOTAL; extern const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_TOTAL; extern const std::string METRIC_PIPELINE_PROCESSORS_IN_SIZE_BYTES; extern const std::string METRIC_PIPELINE_PROCESSORS_TOTAL_PROCESS_TIME_MS; +extern const std::string METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL; +extern const std::string METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL; +extern const std::string METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES; +extern const std::string METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS; extern const std::string METRIC_PIPELINE_START_TIME; ////////////////////////////////////////////////////////////////////////// @@ -125,6 +129,7 @@ extern const std::string METRIC_PLUGIN_OUT_SUCCESSFUL_EVENTS_TOTAL; /********************************************************** * all flusher (所有发送插件通用指标) **********************************************************/ +extern const std::string METRIC_PLUGIN_FLUSHER_TOTAL_PACKAGE_TIME_MS; extern const std::string METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL; extern const std::string METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL; extern const std::string METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL; diff --git a/core/monitor/metric_constants/PipelineMetrics.cpp b/core/monitor/metric_constants/PipelineMetrics.cpp index 7f1bf75e3a..85fd7f3e8f 100644 --- a/core/monitor/metric_constants/PipelineMetrics.cpp +++ b/core/monitor/metric_constants/PipelineMetrics.cpp @@ -31,6 +31,10 @@ const string METRIC_PIPELINE_PROCESSORS_IN_EVENTS_TOTAL = "pipeline_processors_i const string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_TOTAL = "pipeline_processors_in_event_groups_total"; const string METRIC_PIPELINE_PROCESSORS_IN_SIZE_BYTES = "pipeline_processors_in_size_bytes"; const string METRIC_PIPELINE_PROCESSORS_TOTAL_PROCESS_TIME_MS = "pipeline_processors_total_process_time_ms"; +const string METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL = "pipeline_flushers_in_events_total"; +const string METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL = "pipeline_flushers_in_event_groups_total"; +const string METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES = "pipeline_flushers_in_size_bytes"; +const string METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS = "pipeline_flushers_total_package_time_ms"; const string METRIC_PIPELINE_START_TIME = "pipeline_start_time"; } // namespace logtail diff --git a/core/monitor/metric_constants/PluginMetrics.cpp b/core/monitor/metric_constants/PluginMetrics.cpp index 47499ca957..1a7923531d 100644 --- a/core/monitor/metric_constants/PluginMetrics.cpp +++ b/core/monitor/metric_constants/PluginMetrics.cpp @@ -100,6 +100,7 @@ const string METRIC_PLUGIN_PARSE_STDOUT_TOTAL = "plugin_parse_stdout_total"; /********************************************************** * all flusher (所有发送插件通用指标) **********************************************************/ +const string METRIC_PLUGIN_FLUSHER_TOTAL_PACKAGE_TIME_MS = "plugin_flusher_total_package_time_ms"; const string METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL = "plugin_flusher_send_total"; const string METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL = "plugin_flusher_send_done_total"; const string METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL = "plugin_flusher_success_total"; diff --git a/core/pipeline/Pipeline.cpp b/core/pipeline/Pipeline.cpp index 96d2b4746c..5bdc918047 100644 --- a/core/pipeline/Pipeline.cpp +++ b/core/pipeline/Pipeline.cpp @@ -327,6 +327,10 @@ bool Pipeline::Init(PipelineConfig&& config) { mProcessorsInSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_PROCESSORS_IN_SIZE_BYTES); mProcessorsTotalProcessTimeMs = mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_PROCESSORS_TOTAL_PROCESS_TIME_MS); + mFlushersInGroupsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL); + mFlushersInEventsTotal = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL); + mFlushersInSizeBytes = mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES); + mFlushersTotalPackageTimeMs = mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS); return true; } @@ -375,6 +379,13 @@ void Pipeline::Process(vector& logGroupList, size_t inputInd } bool Pipeline::Send(vector&& groupList) { + for (const auto& group : groupList) { + mFlushersInEventsTotal->Add(group.GetEvents().size()); + mFlushersInSizeBytes->Add(group.DataSize()); + } + mFlushersInGroupsTotal->Add(groupList.size()); + + auto before = chrono::system_clock::now(); bool allSucceeded = true; for (auto& group : groupList) { auto flusherIdx = mRouter.Route(group); @@ -393,6 +404,7 @@ bool Pipeline::Send(vector&& groupList) { } } } + mFlushersTotalPackageTimeMs->Add(chrono::system_clock::now() - before); return allSucceeded; } diff --git a/core/pipeline/Pipeline.h b/core/pipeline/Pipeline.h index 8ccecfef7b..970b15d022 100644 --- a/core/pipeline/Pipeline.h +++ b/core/pipeline/Pipeline.h @@ -109,6 +109,10 @@ class Pipeline { CounterPtr mProcessorsInGroupsTotal; CounterPtr mProcessorsInSizeBytes; TimeCounterPtr mProcessorsTotalProcessTimeMs; + CounterPtr mFlushersInGroupsTotal; + CounterPtr mFlushersInEventsTotal; + CounterPtr mFlushersInSizeBytes; + TimeCounterPtr mFlushersTotalPackageTimeMs; #ifdef APSARA_UNIT_TEST_MAIN friend class PipelineMock; diff --git a/core/pipeline/plugin/instance/FlusherInstance.cpp b/core/pipeline/plugin/instance/FlusherInstance.cpp index 04ddd12b06..38cb9dd3b8 100644 --- a/core/pipeline/plugin/instance/FlusherInstance.cpp +++ b/core/pipeline/plugin/instance/FlusherInstance.cpp @@ -16,7 +16,10 @@ #include "monitor/metric_constants/MetricConstants.h" +using namespace std; + namespace logtail { + bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, Json::Value& optionalGoPipeline) { mPlugin->SetContext(context); mPlugin->SetPluginID(PluginID()); @@ -25,15 +28,22 @@ bool FlusherInstance::Init(const Json::Value& config, PipelineContext& context, return false; } + mInGroupsTotal = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_IN_EVENT_GROUPS_TOTAL); mInEventsTotal = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_IN_EVENTS_TOTAL); mInSizeBytes = mPlugin->GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_IN_SIZE_BYTES); + mTotalPackageTimeMs = mPlugin->GetMetricsRecordRef().CreateTimeCounter(METRIC_PLUGIN_FLUSHER_TOTAL_PACKAGE_TIME_MS); return true; } bool FlusherInstance::Send(PipelineEventGroup&& g) { + mInGroupsTotal->Add(1); mInEventsTotal->Add(g.GetEvents().size()); mInSizeBytes->Add(g.DataSize()); - return mPlugin->Send(std::move(g)); + + auto before = chrono::system_clock::now(); + auto res = mPlugin->Send(std::move(g)); + mTotalPackageTimeMs->Add(chrono::system_clock::now() - before); + return res; } } // namespace logtail diff --git a/core/pipeline/plugin/instance/FlusherInstance.h b/core/pipeline/plugin/instance/FlusherInstance.h index 6ce367051d..69bbba3db2 100644 --- a/core/pipeline/plugin/instance/FlusherInstance.h +++ b/core/pipeline/plugin/instance/FlusherInstance.h @@ -46,8 +46,10 @@ class FlusherInstance : public PluginInstance { private: std::unique_ptr mPlugin; + CounterPtr mInGroupsTotal; CounterPtr mInEventsTotal; CounterPtr mInSizeBytes; + TimeCounterPtr mTotalPackageTimeMs; }; } // namespace logtail From af2cb0b3bbb10c65ff3e88324137e3503d6c9a8d Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Thu, 24 Oct 2024 05:40:43 +0000 Subject: [PATCH 03/15] polish --- core/config/common_provider/CommonConfigProvider.cpp | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/core/config/common_provider/CommonConfigProvider.cpp b/core/config/common_provider/CommonConfigProvider.cpp index cc62335208..52186d411d 100644 --- a/core/config/common_provider/CommonConfigProvider.cpp +++ b/core/config/common_provider/CommonConfigProvider.cpp @@ -45,7 +45,7 @@ namespace logtail { std::string CommonConfigProvider::configVersion = "version"; void CommonConfigProvider::Init(const string& dir) { - sName = "CommonConfigProvider"; + sName = "common config provider"; ConfigProvider::Init(dir); LoadConfigFile(); From ab5ec41903004432ab0538407a146e04dfb2e84f Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Thu, 24 Oct 2024 05:54:19 +0000 Subject: [PATCH 04/15] polish --- core/CMakeLists.txt | 1 - .../InstanceConfigManager.cpp | 0 .../InstanceConfigManager.h | 0 core/deps/README.md | 17 -- core/deps/build.sh | 157 ------------------ 5 files changed, 175 deletions(-) rename core/{instance_config => config}/InstanceConfigManager.cpp (100%) rename core/{instance_config => config}/InstanceConfigManager.h (100%) delete mode 100644 core/deps/README.md delete mode 100755 core/deps/build.sh diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index fbe3e45714..bc4b500036 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -115,7 +115,6 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake) set(SUB_DIRECTORIES_LIST application app_config checkpoint container_manager logger go_pipeline monitor monitor/metric_constants profile_sender models config config/watcher - instance_config metadata pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer runner runner/sink/http protobuf/sls protobuf/models diff --git a/core/instance_config/InstanceConfigManager.cpp b/core/config/InstanceConfigManager.cpp similarity index 100% rename from core/instance_config/InstanceConfigManager.cpp rename to core/config/InstanceConfigManager.cpp diff --git a/core/instance_config/InstanceConfigManager.h b/core/config/InstanceConfigManager.h similarity index 100% rename from core/instance_config/InstanceConfigManager.h rename to core/config/InstanceConfigManager.h diff --git a/core/deps/README.md b/core/deps/README.md deleted file mode 100644 index 72da636884..0000000000 --- a/core/deps/README.md +++ /dev/null @@ -1,17 +0,0 @@ -# Dependencies - -- [gflags](https://github.com/gflags/gflags): v2.2.1 -- [spdlog](https://github.com/gabime/spdlog/): v1.1.0 -- [jsoncpp](https://github.com/open-source-parsers/jsoncpp): v1.8.4 -- [protobuf](https://github.com/protocolbuffers/protobuf): v2.5.0 -- [libunwind](https://www.nongnu.org/libunwind/): v1.2.1 -- [re2](https://github.com/google/re2): release/2018-09-01 -- [boost](https://www.boost.org/): v1.6.8 -- [rapidjson](https://github.com/Tencent/rapidjson/commit/c0ca05f6ddb690c943a7fc59efab9ca9cfc39736): latest -- [cityhash](https://github.com/google/cityhash/commit/8af9b8c2b889d80c22d6bc26ba0df1afb79a30db): latest -- [liblz4](https://github.com/lz4/lz4): v1.8.3 -- [zlib](https://zlib.net/): v1.2.11 -- libuuid - - get_dmi_uuid -- libpthread -- libdl diff --git a/core/deps/build.sh b/core/deps/build.sh deleted file mode 100755 index 76878ea89b..0000000000 --- a/core/deps/build.sh +++ /dev/null @@ -1,157 +0,0 @@ -#!/bin/bash -# Copyright 2022 iLogtail Authors -# -# Licensed under the Apache License, Version 2.0 (the "License"); -# you may not use this file except in compliance with the License. -# You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. - - -DESTINATION=/opt/logtail/deps -LIB_DIR=$DESTINATION/lib -LIB64_DIR=$DESTINATION/lib64 - -enter_cmake_build_dir() -{ - # rm -rf build-static - mkdir build-static - cd build-static -} - -remove_old_lib() -{ - sudo rm -f $LIB_DIR/$1* - sudo rm -f $LIB64_DIR/$1* -} - -# gtest -remove_old_lib libgtest -remove_old_lib libgmock -rm -rf googletest-release-1.8.1 -tar xf gtest-1.8.1.tar.gz -cd googletest-release-1.8.1 -enter_cmake_build_dir -cmake -DCMAKE_INSTALL_PREFIX=$DESTINATION -DCMAKE_BUILD_TYPE=Release .. -make -j24 -sudo make install -sudo ln -s $LIB64_DIR/libgtest.a $LIB_DIR/libgtest.a -cd ../.. - -# protobuf -remove_old_lib libprotobuf -rm -rf protobuf-3.6.1 -tar xf protobuf-cpp-3.6.1.tar.gz -cd protobuf-3.6.1 -ln -s `pwd`/../googletest-release-1.8.1 gtest -./configure --prefix=$DESTINATION --disable-shared --enable-static -make -j24 -sudo make install -cd .. - -# re2 -remove_old_lib libre2 -rm -rf re2-2018-09-01 -tar xf re2-2018-09-01.tar.gz -cd re2-2018-09-01 -enter_cmake_build_dir -cmake -DCMAKE_INSTALL_PREFIX=$DESTINATION -DCMAKE_BUILD_TYPE=Release .. -make -j24 -sudo make install -cd ../.. - -# tcmalloc -remove_old_lib libtcmalloc -rm -rf gperftools-gperftools-2.7 -tar xf gperftools-2.7.tar.gz -cd gperftools-gperftools-2.7 -./autogen.sh -./configure --prefix=$DESTINATION --disable-shared --enable-static -make -j24 -sudo make install -cd .. - -# cityhash -remove_old_lib libcityhash -cd cityhash -sudo make clean -./configure --prefix=$DESTINATION --disable-shared --enable-static -make -j24 -sudo make install -cd .. - -# gflags -remove_old_lib libgflags -rm -rf gflags-2.2.1 -tar xf gflags-2.2.1.tar.gz -cd gflags-2.2.1 -enter_cmake_build_dir -cmake -DCMAKE_INSTALL_PREFIX=$DESTINATION -DCMAKE_BUILD_TYPE=Release .. -make -j24 -sudo make install -cd ../.. - -# jsoncpp -remove_old_lib libjsoncpp -rm -rf jsoncpp-1.8.4 -tar xf jsoncpp-1.8.4.tar.gz -cd jsoncpp-1.8.4 -enter_cmake_build_dir -cmake -DCMAKE_INSTALL_PREFIX=$DESTINATION -DCMAKE_BUILD_TYPE=Release .. -make -j24 -sudo make install -sudo ln -s $LIB64_DIR/libjsoncpp.a $LIB_DIR/libjsoncpp.a -cd ../../ - -# boost -remove_old_lib libboost -rm -rf boost_1_68_0 -tar xf boost_1_68_0.tar.gz -cd boost_1_68_0 -./bootstrap.sh --prefix=$DESTINATION -sudo env PATH=$PATH ./b2 install -j24 -cd .. - -# lz4 -remove_old_lib liblz4 -rm -rf lz4-1.8.3 -tar xf lz4-1.8.3.tar.gz -cd lz4-1.8.3 -make -j24 -sudo make prefix=$DESTINATION install -cd .. - -# zlib -remove_old_lib libz -rm -rf zlib-1.2.11 -tar xf zlib-1.2.11.tar.gz -cd zlib-1.2.11 -./configure --prefix=$DESTINATION --static -make -j24 -sudo make install -cd .. - -# curl -remove_old_lib libcurl -rm -rf curl-7.61.1 -tar xf curl-7.61.1.tar.gz -cd curl-7.61.1 -./configure --prefix=$DESTINATION --disable-shared --enable-static CFLAGS="-std=c90 -D_POSIX_C_SOURCE -Werror=implicit-function-declaration" -make -j24 -sudo make install -cd .. - -# unwind -remove_old_lib libunwind -cd libunwind-1.2.1 -sudo make clean -./autogen.sh -./configure --prefix=$DESTINATION --disable-shared --enable-static -make -j24 -sudo make install From 8fd8bcc88a74b66daf8901071446172efacf0f0d Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Thu, 24 Oct 2024 05:56:05 +0000 Subject: [PATCH 05/15] polish --- core/CMakeLists.txt | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/CMakeLists.txt b/core/CMakeLists.txt index bc4b500036..4e806fde7b 100644 --- a/core/CMakeLists.txt +++ b/core/CMakeLists.txt @@ -113,9 +113,9 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake) # Subdirectories (modules). except for common, input, processor, flusher, observer, helper, spl, and provider. set(SUB_DIRECTORIES_LIST - application app_config checkpoint container_manager logger go_pipeline monitor monitor/metric_constants profile_sender models + application app_config checkpoint container_manager metadata logger go_pipeline monitor monitor/metric_constants profile_sender models config config/watcher - metadata pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer + pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer runner runner/sink/http protobuf/sls protobuf/models file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling From 8db98ceae9d69367502936f76f18601bf5adfb65 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Thu, 24 Oct 2024 05:59:50 +0000 Subject: [PATCH 06/15] polish --- core/runner/FlusherRunner.cpp | 4 ++-- core/runner/ProcessorRunner.cpp | 4 ++-- core/runner/sink/http/HttpSink.cpp | 4 ++-- 3 files changed, 6 insertions(+), 6 deletions(-) diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index 06d9dcccf5..4a8a3d42ea 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -36,7 +36,7 @@ using namespace std; DEFINE_FLAG_INT32(check_send_client_timeout_interval, "", 600); DEFINE_FLAG_BOOL(enable_flow_control, "if enable flow control", true); DEFINE_FLAG_BOOL(enable_send_tps_smoothing, "avoid web server load burst", true); -DEFINE_FLAG_INT32(default_flusher_runner_exit_wait_time_secs, "", 60); +DEFINE_FLAG_INT32(flusher_runner_exit_timeout_secs, "", 60); static const int SEND_BLOCK_COST_TIME_ALARM_INTERVAL_SECOND = 3; @@ -104,7 +104,7 @@ void FlusherRunner::UpdateSendFlowControl() { void FlusherRunner::Stop() { mIsFlush = true; SenderQueueManager::GetInstance()->Trigger(); - future_status s = mThreadRes.wait_for(chrono::seconds(INT32_FLAG(default_flusher_runner_exit_wait_time_secs))); + future_status s = mThreadRes.wait_for(chrono::seconds(INT32_FLAG(flusher_runner_exit_timeout_secs))); if (s == future_status::ready) { LOG_INFO(sLogger, ("flusher runner", "stopped successfully")); } else { diff --git a/core/runner/ProcessorRunner.cpp b/core/runner/ProcessorRunner.cpp index 50b3d85392..d7086a0fff 100644 --- a/core/runner/ProcessorRunner.cpp +++ b/core/runner/ProcessorRunner.cpp @@ -39,7 +39,7 @@ using namespace std; DEFINE_FLAG_BOOL(enable_chinese_tag_path, "Enable Chinese __tag__.__path__", true); #endif DEFINE_FLAG_INT32(default_flush_merged_buffer_interval, "default flush merged buffer, seconds", 1); -DEFINE_FLAG_INT32(default_processor_runner_exit_wait_time_secs, "", 60); +DEFINE_FLAG_INT32(processor_runner_exit_timeout_secs, "", 60); namespace logtail { @@ -64,7 +64,7 @@ void ProcessorRunner::Stop() { ProcessQueueManager::GetInstance()->Trigger(); for (uint32_t threadNo = 0; threadNo < mThreadCount; ++threadNo) { future_status s - = mThreadRes[threadNo].wait_for(chrono::seconds(INT32_FLAG(default_processor_runner_exit_wait_time_secs))); + = mThreadRes[threadNo].wait_for(chrono::seconds(INT32_FLAG(processor_runner_exit_timeout_secs))); if (s == future_status::ready) { LOG_INFO(sLogger, ("processor runner", "stopped successfully")("threadNo", threadNo)); } else { diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index 3457bc8d36..cc6ab64f0d 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -25,7 +25,7 @@ #include "pipeline/queue/SenderQueueItem.h" #include "runner/FlusherRunner.h" -DEFINE_FLAG_INT32(default_http_sink_exit_wait_time_secs, "", 5); +DEFINE_FLAG_INT32(http_sink_exit_timeout_secs, "", 5); using namespace std; @@ -62,7 +62,7 @@ bool HttpSink::Init() { void HttpSink::Stop() { mIsFlush = true; - future_status s = mThreadRes.wait_for(chrono::seconds(INT32_FLAG(default_http_sink_exit_wait_time_secs))); + future_status s = mThreadRes.wait_for(chrono::seconds(INT32_FLAG(http_sink_exit_timeout_secs))); if (s == future_status::ready) { LOG_INFO(sLogger, ("http sink", "stopped successfully")); } else { From bccc857d7be0976ecef71078669781c401eb7371 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 25 Oct 2024 01:43:55 +0000 Subject: [PATCH 07/15] polish --- core/config/PipelineConfig.cpp | 23 +++++++++++++++++++---- 1 file changed, 19 insertions(+), 4 deletions(-) diff --git a/core/config/PipelineConfig.cpp b/core/config/PipelineConfig.cpp index 534cc73bee..b44d05df87 100644 --- a/core/config/PipelineConfig.cpp +++ b/core/config/PipelineConfig.cpp @@ -138,6 +138,7 @@ bool PipelineConfig::Parse() { // inputs, processors and flushers module must be parsed first and parsed by order, since aggregators and // extensions module parsing will rely on their results. + bool hasObserverInput = false; bool hasFileInput = false; key = "inputs"; itr = mDetail->find(key.c_str(), key.c_str() + key.size()); @@ -236,12 +237,15 @@ bool PipelineConfig::Parse() { } mInputs.push_back(&plugin); // TODO: remove these special restrictions - if (pluginType == "input_file" || pluginType == "input_container_stdio") { + if (pluginType == "input_observer_network") { + hasObserverInput = true; + } else if (pluginType == "input_file" || pluginType == "input_container_stdio") { hasFileInput = true; } } // TODO: remove these special restrictions - if (hasFileInput && (*mDetail)["inputs"].size() > 1) { + bool hasSpecialInput = hasObserverInput || hasFileInput; + if (hasSpecialInput && (*mDetail)["inputs"].size() > 1) { PARAM_ERROR_RETURN(sLogger, alarm, "more than 1 input_file or input_container_stdio plugin is given", @@ -327,7 +331,7 @@ bool PipelineConfig::Parse() { if (isCurrentPluginNative) { if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { // TODO: remove these special restrictions - if (!hasFileInput) { + if (!hasObserverInput && !hasFileInput) { PARAM_ERROR_RETURN(sLogger, alarm, "extended processor plugins coexist with native input plugins other " @@ -361,6 +365,17 @@ bool PipelineConfig::Parse() { mRegion); } } else { + // TODO: remove these special restrictions + if (hasObserverInput) { + PARAM_ERROR_RETURN(sLogger, + alarm, + "native processor plugins coexist with input_observer_network", + noModule, + mName, + mProject, + mLogstore, + mRegion); + } mHasNativeProcessor = true; } } else { @@ -455,7 +470,7 @@ bool PipelineConfig::Parse() { const string pluginType = it->asString(); if (PluginRegistry::GetInstance()->IsValidGoPlugin(pluginType)) { // TODO: remove these special restrictions - if (mHasNativeInput && !hasFileInput) { + if (mHasNativeInput && !hasFileInput && !hasObserverInput) { PARAM_ERROR_RETURN(sLogger, alarm, "extended flusher plugins coexist with native input plugins other than " From e29303c39cb94d12245cbfe16f867d62afb8e138 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 25 Oct 2024 02:14:18 +0000 Subject: [PATCH 08/15] polish --- core/monitor/Monitor.cpp | 1 - core/plugin/flusher/sls/SLSClientManager.cpp | 10 ++++++---- 2 files changed, 6 insertions(+), 5 deletions(-) diff --git a/core/monitor/Monitor.cpp b/core/monitor/Monitor.cpp index 774ba97352..aab591a171 100644 --- a/core/monitor/Monitor.cpp +++ b/core/monitor/Monitor.cpp @@ -724,7 +724,6 @@ void LoongCollectorMonitor::Init() { mAgentGoRoutinesTotal = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_GO_ROUTINES_TOTAL); mAgentOpenFdTotal = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_OPEN_FD_TOTAL); mAgentConfigTotal = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_PIPELINE_CONFIG_TOTAL); - LOG_INFO(sLogger, ("LoongCollectorMonitor", "started")); } void LoongCollectorMonitor::Stop() { diff --git a/core/plugin/flusher/sls/SLSClientManager.cpp b/core/plugin/flusher/sls/SLSClientManager.cpp index 5f04a51947..059ca52d55 100644 --- a/core/plugin/flusher/sls/SLSClientManager.cpp +++ b/core/plugin/flusher/sls/SLSClientManager.cpp @@ -145,17 +145,17 @@ void SLSClientManager::Stop() { if (mDataServerSwitchPolicy == EndpointSwitchPolicy::DESIGNATED_FIRST) { future_status s = mProbeNetworkThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { - LOG_INFO(sLogger, ("test endpoint thread", "stopped successfully")); + LOG_INFO(sLogger, ("sls endpoint probe", "stopped successfully")); } else { - LOG_WARNING(sLogger, ("test endpoint thread", "forced to stopped")); + LOG_WARNING(sLogger, ("sls endpoint probe", "forced to stopped")); } } if (BOOL_FLAG(send_prefer_real_ip)) { future_status s = mUpdateRealIpThreadRes.wait_for(chrono::seconds(1)); if (s == future_status::ready) { - LOG_INFO(sLogger, ("update real ip thread", "stopped successfully")); + LOG_INFO(sLogger, ("sls real ip update", "stopped successfully")); } else { - LOG_WARNING(sLogger, ("update real ip thread", "forced to stopped")); + LOG_WARNING(sLogger, ("sls real ip update", "forced to stopped")); } } } @@ -373,6 +373,7 @@ bool SLSClientManager::HasNetworkAvailable() { } void SLSClientManager::ProbeNetworkThread() { + LOG_INFO(sLogger, ("sls endpoint probe", "started")); // pair represents the weight of each endpoint map>> unavaliableEndpoints; set unavaliableRegions; @@ -527,6 +528,7 @@ void SLSClientManager::UpdateSendClientRealIp(sdk::Client* client, const string& } void SLSClientManager::UpdateRealIpThread() { + LOG_INFO(sLogger, ("sls real ip update", "started")); int32_t lastUpdateRealIpTime = 0; vector regionEndpointArray; vector regionArray; From aece05ee3191be8a0cc85f50a71a1bb5a29685df Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 25 Oct 2024 02:28:16 +0000 Subject: [PATCH 09/15] polish --- core/application/Application.cpp | 4 ++-- core/config/InstanceConfigManager.cpp | 2 +- core/unittest/config/CommonConfigProviderUnittest.cpp | 2 +- .../instance_config/InstanceConfigManagerUnittest.cpp | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/application/Application.cpp b/core/application/Application.cpp index 1f62c281e8..13ccd584e2 100644 --- a/core/application/Application.cpp +++ b/core/application/Application.cpp @@ -31,6 +31,7 @@ #include "common/UUIDUtil.h" #include "common/version.h" #include "config/ConfigDiff.h" +#include "config/InstanceConfigManager.h" #include "config/watcher/ConfigWatcher.h" #include "config/watcher/InstanceConfigWatcher.h" #include "file_server/ConfigManager.h" @@ -38,7 +39,6 @@ #include "file_server/FileServer.h" #include "file_server/event_handler/LogInput.h" #include "go_pipeline/LogtailPlugin.h" -#include "instance_config/InstanceConfigManager.h" #include "logger/Logger.h" #include "monitor/LogFileProfiler.h" #include "monitor/MetricExportor.h" @@ -192,7 +192,7 @@ void Application::Init() { } void Application::Start() { // GCOVR_EXCL_START - LogFileProfiler::mStartTime = GetTimeStamp(time(NULL), "%Y-%m-%d %H:%M:%S"); + LogFileProfiler::mStartTime = GetTimeStamp(time(NULL), "%Y-%m-%d %H:%M:%S"); LogtailMonitor::GetInstance()->UpdateConstMetric("start_time", LogFileProfiler::mStartTime); #if defined(__ENTERPRISE__) && defined(_MSC_VER) diff --git a/core/config/InstanceConfigManager.cpp b/core/config/InstanceConfigManager.cpp index 986cc5ba5f..24673cea85 100644 --- a/core/config/InstanceConfigManager.cpp +++ b/core/config/InstanceConfigManager.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include "instance_config/InstanceConfigManager.h" +#include "config/InstanceConfigManager.h" #include "app_config/AppConfig.h" #include "config/feedbacker/ConfigFeedbackReceiver.h" diff --git a/core/unittest/config/CommonConfigProviderUnittest.cpp b/core/unittest/config/CommonConfigProviderUnittest.cpp index 66b1b17298..84ea7bfa73 100644 --- a/core/unittest/config/CommonConfigProviderUnittest.cpp +++ b/core/unittest/config/CommonConfigProviderUnittest.cpp @@ -19,11 +19,11 @@ #include "common/FileSystemUtil.h" #include "common/version.h" #include "config/ConfigDiff.h" +#include "config/InstanceConfigManager.h" #include "config/common_provider/CommonConfigProvider.h" #include "config/watcher/ConfigWatcher.h" #include "config/watcher/InstanceConfigWatcher.h" #include "gmock/gmock.h" -#include "instance_config/InstanceConfigManager.h" #include "pipeline/PipelineManager.h" #include "unittest/Unittest.h" diff --git a/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp b/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp index df4e479674..5adb906538 100644 --- a/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp +++ b/core/unittest/instance_config/InstanceConfigManagerUnittest.cpp @@ -16,7 +16,7 @@ #include "app_config/AppConfig.h" #include "common/JsonUtil.h" #include "config/InstanceConfig.h" -#include "instance_config/InstanceConfigManager.h" +#include "config/InstanceConfigManager.h" #include "runner/FlusherRunner.h" #include "unittest/Unittest.h" From 1450d435688511ef96e5467d14b4e67a3a43ab85 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 25 Oct 2024 02:34:52 +0000 Subject: [PATCH 10/15] polish --- core/models/PipelineEventGroup.h | 2 ++ 1 file changed, 2 insertions(+) diff --git a/core/models/PipelineEventGroup.h b/core/models/PipelineEventGroup.h index a83be6e61d..cecba0f315 100644 --- a/core/models/PipelineEventGroup.h +++ b/core/models/PipelineEventGroup.h @@ -86,6 +86,8 @@ class PipelineEventGroup { MetricEvent* AddMetricEvent(); SpanEvent* AddSpanEvent(); void SwapEvents(EventsContainer& other) { mEvents.swap(other); } + void ReserveEvents(size_t size) { mEvents.reserve(size); } + std::shared_ptr& GetSourceBuffer() { return mSourceBuffer; } void SetMetadata(EventGroupMetaKey key, StringView val); From 68f23f436a23c2f4b417bd20c023075ff9b3d0c9 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 25 Oct 2024 02:36:18 +0000 Subject: [PATCH 11/15] polish --- core/unittest/models/PipelineEventGroupUnittest.cpp | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/core/unittest/models/PipelineEventGroupUnittest.cpp b/core/unittest/models/PipelineEventGroupUnittest.cpp index b0ffd53a45..4a96d31024 100644 --- a/core/unittest/models/PipelineEventGroupUnittest.cpp +++ b/core/unittest/models/PipelineEventGroupUnittest.cpp @@ -23,6 +23,7 @@ namespace logtail { class PipelineEventGroupUnittest : public ::testing::Test { public: void TestSwapEvents(); + void TestReserveEvents(); void TestCopy(); void TestSetMetadata(); void TestDelMetadata(); @@ -49,6 +50,11 @@ void PipelineEventGroupUnittest::TestSwapEvents() { APSARA_TEST_EQUAL_FATAL(0U, mEventGroup->GetEvents().size()); } +void PipelineEventGroupUnittest::TestReserveEvents() { + mEventGroup->ReserveEvents(10); + APSARA_TEST_EQUAL(10U, mEventGroup->GetEvents().capacity()); +} + void PipelineEventGroupUnittest::TestCopy() { mEventGroup->AddLogEvent(); auto res = mEventGroup->Copy(); @@ -136,6 +142,7 @@ void PipelineEventGroupUnittest::TestFromJsonToJson() { } UNIT_TEST_CASE(PipelineEventGroupUnittest, TestSwapEvents) +UNIT_TEST_CASE(PipelineEventGroupUnittest, TestReserveEvents) UNIT_TEST_CASE(PipelineEventGroupUnittest, TestCopy) UNIT_TEST_CASE(PipelineEventGroupUnittest, TestSetMetadata) UNIT_TEST_CASE(PipelineEventGroupUnittest, TestDelMetadata) From a1a2b0053e7d3d2d24d96cd760b65be3ad54d2c1 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 25 Oct 2024 02:59:26 +0000 Subject: [PATCH 12/15] polish --- core/common/http/AsynCurlRunner.cpp | 22 +++++++--- core/common/http/Curl.cpp | 42 ++++++++++-------- core/common/http/HttpRequest.h | 2 +- core/pipeline/queue/SenderQueueItem.h | 2 +- core/plugin/flusher/sls/FlusherSLS.cpp | 59 ++++++++++++++------------ core/runner/FlusherRunner.cpp | 3 +- core/runner/sink/http/HttpSink.cpp | 47 +++++++++++--------- 7 files changed, 101 insertions(+), 76 deletions(-) diff --git a/core/common/http/AsynCurlRunner.cpp b/core/common/http/AsynCurlRunner.cpp index f9aa2b5440..9be67bb5cb 100644 --- a/core/common/http/AsynCurlRunner.cpp +++ b/core/common/http/AsynCurlRunner.cpp @@ -182,34 +182,42 @@ void AsynCurlRunner::HandleCompletedRequests(int& runningHandlers) { CURL* handler = msg->easy_handle; AsynHttpRequest* request = nullptr; curl_easy_getinfo(handler, CURLINFO_PRIVATE, &request); - LOG_DEBUG(sLogger, - ("send http request completed, request address", - request)("response time",ToString(chrono::duration_cast(chrono::system_clock::now()- request->mLastSendTime).count()) + "ms") - ("try cnt", ToString(request->mTryCnt))); + auto responseTime + = chrono::duration_cast(chrono::system_clock::now() - request->mLastSendTime) + .count(); switch (msg->data.result) { case CURLE_OK: { long statusCode = 0; curl_easy_getinfo(handler, CURLINFO_RESPONSE_CODE, &statusCode); request->mResponse.mStatusCode = (int32_t)statusCode; request->OnSendDone(request->mResponse); + LOG_DEBUG(sLogger, + ("send http request succeeded, request address", + request)("response time", ToString(responseTime) + "ms")("try cnt", + ToString(request->mTryCnt))); break; } default: // considered as network error - if (request->mTryCnt <= request->mMaxTryCnt) { + if (request->mTryCnt < request->mMaxTryCnt) { LOG_WARNING(sLogger, - ("failed to send request", "retry immediately")("retryCnt", request->mTryCnt++)( - "errMsg", curl_easy_strerror(msg->data.result))); + ("failed to send http request", "retry immediately")("request address", request)( + "try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(msg->data.result))); // free first,becase mPrivateData will be reset in AddRequestToClient if (request->mPrivateData) { curl_slist_free_all((curl_slist*)request->mPrivateData); request->mPrivateData = nullptr; } + ++request->mTryCnt; AddRequestToClient(unique_ptr(request)); ++runningHandlers; requestReused = true; } else { request->OnSendDone(request->mResponse); + LOG_DEBUG( + sLogger, + ("failed to send http request", "abort")("request address", request)( + "response time", ToString(responseTime) + "ms")("try cnt", ToString(request->mTryCnt))); } break; } diff --git a/core/common/http/Curl.cpp b/core/common/http/Curl.cpp index af02793437..25f19b630d 100644 --- a/core/common/http/Curl.cpp +++ b/core/common/http/Curl.cpp @@ -18,10 +18,10 @@ #include #include -#include "common/DNSCache.h" #include "app_config/AppConfig.h" -#include "logger/Logger.h" +#include "common/DNSCache.h" #include "common/http/HttpResponse.h" +#include "logger/Logger.h" using namespace std; @@ -139,24 +139,25 @@ CURL* CreateCurlHandler(const std::string& method, bool SendHttpRequest(std::unique_ptr&& request, HttpResponse& response) { curl_slist* headers = NULL; CURL* curl = CreateCurlHandler(request->mMethod, - request->mHTTPSFlag, - request->mHost, - request->mPort, - request->mUrl, - request->mQueryString, - request->mHeader, - request->mBody, - response, - headers, - request->mTimeout, - AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(), - AppConfig::GetInstance()->GetBindInterface()); + request->mHTTPSFlag, + request->mHost, + request->mPort, + request->mUrl, + request->mQueryString, + request->mHeader, + request->mBody, + response, + headers, + request->mTimeout, + AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(), + AppConfig::GetInstance()->GetBindInterface()); if (curl == NULL) { - LOG_ERROR(sLogger, ("failed to init curl handler", "failed to init curl client")("request address", request.get())); + LOG_ERROR(sLogger, + ("failed to init curl handler", "failed to init curl client")("request address", request.get())); return false; } bool success = false; - while (request->mTryCnt <= request->mMaxTryCnt) { + while (true) { CURLcode res = curl_easy_perform(curl); if (res == CURLE_OK) { long http_code = 0; @@ -164,10 +165,15 @@ bool SendHttpRequest(std::unique_ptr&& request, HttpResponse& respo response.mStatusCode = (int32_t)http_code; success = true; break; + } else if (request->mTryCnt < request->mMaxTryCnt) { + LOG_WARNING(sLogger, + ("failed to send request", "retry immediately")("retryCnt", request->mTryCnt)( + "errMsg", curl_easy_strerror(res))("request address", request.get())); + ++request->mTryCnt; } else { - LOG_WARNING(sLogger,("failed to send request", "retry immediately")("retryCnt", request->mTryCnt++)("errMsg", curl_easy_strerror(res))("request address", request.get())); + break; } - } + } if (headers != NULL) { curl_slist_free_all(headers); } diff --git a/core/common/http/HttpRequest.h b/core/common/http/HttpRequest.h index bd12b9f971..13fc9ec12f 100644 --- a/core/common/http/HttpRequest.h +++ b/core/common/http/HttpRequest.h @@ -72,7 +72,7 @@ struct HttpRequest { struct AsynHttpRequest : public HttpRequest { HttpResponse mResponse; void* mPrivateData = nullptr; - time_t mEnqueTime = 0; + std::chrono::system_clock::time_point mEnqueTime; AsynHttpRequest(const std::string& method, bool httpsFlag, diff --git a/core/pipeline/queue/SenderQueueItem.h b/core/pipeline/queue/SenderQueueItem.h index 151abad0db..e791479f19 100644 --- a/core/pipeline/queue/SenderQueueItem.h +++ b/core/pipeline/queue/SenderQueueItem.h @@ -43,7 +43,7 @@ struct SenderQueueItem { std::atomic mStatus; std::chrono::system_clock::time_point mFirstEnqueTime; - time_t mLastSendTime = 0; + std::chrono::system_clock::time_point mLastSendTime; uint32_t mTryCnt = 1; SenderQueueItem(std::string&& data, diff --git a/core/plugin/flusher/sls/FlusherSLS.cpp b/core/plugin/flusher/sls/FlusherSLS.cpp index 92dbc76342..9023c3a325 100644 --- a/core/plugin/flusher/sls/FlusherSLS.cpp +++ b/core/plugin/flusher/sls/FlusherSLS.cpp @@ -120,7 +120,8 @@ shared_ptr GetConcurrencyLimiter() { return make_shared(AppConfig::GetInstance()->GetSendRequestConcurrency()); } -shared_ptr FlusherSLS::GetLogstoreConcurrencyLimiter(const std::string& project, const std::string& logstore) { +shared_ptr FlusherSLS::GetLogstoreConcurrencyLimiter(const std::string& project, + const std::string& logstore) { lock_guard lock(sMux); std::string key = project + "-" + logstore; @@ -508,11 +509,9 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mQueueKey, mPluginID, *mContext, - { - {"region", GetRegionConcurrencyLimiter(mRegion)}, - {"project", GetProjectConcurrencyLimiter(mProject)}, - {"logstore", GetLogstoreConcurrencyLimiter(mProject, mLogstore)} - }, + {{"region", GetRegionConcurrencyLimiter(mRegion)}, + {"project", GetProjectConcurrencyLimiter(mProject)}, + {"logstore", GetLogstoreConcurrencyLimiter(mProject, mLogstore)}}, mMaxSendRate); } @@ -536,12 +535,14 @@ bool FlusherSLS::Init(const Json::Value& config, Json::Value& optionalGoPipeline mSuccessCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL); mNetworkErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_NETWORK_ERROR_TOTAL); mServerErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SERVER_ERROR_TOTAL); - mShardWriteQuotaErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_SHARD_WRITE_QUOTA_ERROR_TOTAL); + mShardWriteQuotaErrorCnt + = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_SHARD_WRITE_QUOTA_ERROR_TOTAL); mProjectQuotaErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_PROJECT_QUOTA_ERROR_TOTAL); mUnauthErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_UNAUTH_ERROR_TOTAL); mParamsErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_PARAMS_ERROR_TOTAL); mSequenceIDErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_SEQUENCE_ID_ERROR_TOTAL); - mRequestExpiredErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_REQUEST_EXPRIRED_ERROR_TOTAL); + mRequestExpiredErrorCnt + = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_SLS_REQUEST_EXPRIRED_ERROR_TOTAL); mOtherErrorCnt = GetMetricsRecordRef().CreateCounter(METRIC_PLUGIN_FLUSHER_OTHER_ERROR_TOTAL); return true; @@ -685,17 +686,20 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) cpt->Commit(); cpt->IncreaseSequenceID(); } - LOG_DEBUG(sLogger, - ("send data to sls succeeded, item address", - item)("request id", slsResponse.mRequestId)("config", configName)("region", mRegion)( - "project", mProject)("logstore", data->mLogstore)("response time", curTime - data->mLastSendTime)( - "total send time", - ToString(chrono::duration_cast(curSystemTime - item->mFirstEnqueTime).count()) - + "ms")("try cnt", data->mTryCnt)("endpoint", data->mCurrentEndpoint)("is profile data", - isProfileData)); + LOG_DEBUG( + sLogger, + ("send data to sls succeeded, item address", item)("request id", slsResponse.mRequestId)( + "config", configName)("region", mRegion)("project", mProject)("logstore", data->mLogstore)( + "response time", + ToString(chrono::duration_cast(curSystemTime - item->mLastSendTime).count()) + + "ms")( + "total send time", + ToString(chrono::duration_cast(curSystemTime - item->mFirstEnqueTime).count()) + + "ms")("try cnt", data->mTryCnt)("endpoint", data->mCurrentEndpoint)("is profile data", + isProfileData)); GetRegionConcurrencyLimiter(mRegion)->OnSuccess(); GetProjectConcurrencyLimiter(mProject)->OnSuccess(); - GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(); + GetLogstoreConcurrencyLimiter(mProject, mLogstore)->OnSuccess(); SenderQueueManager::GetInstance()->DecreaseConcurrencyLimiterInSendingCnt(item->mQueueKey); DealSenderQueueItemAfterSend(item, false); if (mSuccessCnt) { @@ -870,14 +874,16 @@ void FlusherSLS::OnSendDone(const HttpResponse& response, SenderQueueItem* item) } #define LOG_PATTERN \ - ("failed to send request", failDetail.str())("operation", GetOperationString(operation))( \ - "suggestion", suggestion.str())("item address", item)("request id", slsResponse.mRequestId)( \ - "status code", slsResponse.mStatusCode)("error code", slsResponse.mErrorCode)( \ - "errMsg", slsResponse.mErrorMsg)("config", configName)("region", mRegion)("project", mProject)( \ - "logstore", data->mLogstore)("try cnt", data->mTryCnt)("response time", curTime - data->mLastSendTime)( \ - "total send time", \ - ToString(chrono::duration_cast(curSystemTime - data->mFirstEnqueTime).count()) \ - + "ms")("endpoint", data->mCurrentEndpoint)("is profile data", isProfileData) + ("failed to send request", failDetail.str())("operation", GetOperationString(operation))("suggestion", \ + suggestion.str())( \ + "item address", item)("request id", slsResponse.mRequestId)("status code", slsResponse.mStatusCode)( \ + "error code", slsResponse.mErrorCode)("errMsg", slsResponse.mErrorMsg)("config", configName)( \ + "region", mRegion)("project", mProject)("logstore", data->mLogstore)("try cnt", data->mTryCnt)( \ + "response time", \ + ToString(chrono::duration_cast(curSystemTime - data->mLastSendTime).count()) \ + + "ms")("total send time", \ + ToString(chrono::duration_cast(curSystemTime - data->mFirstEnqueTime).count()) \ + + "ms")("endpoint", data->mCurrentEndpoint)("is profile data", isProfileData) switch (operation) { case OperationOnFail::RETRY_IMMEDIATELY: @@ -940,7 +946,8 @@ bool FlusherSLS::Send(string&& data, const string& shardHashKey, const string& l key = QueueKeyManager::GetInstance()->GetKey(mProject + "-" + mLogstore); if (SenderQueueManager::GetInstance()->GetQueue(key) == nullptr) { PipelineContext ctx; - SenderQueueManager::GetInstance()->CreateQueue(key, "", ctx, std::unordered_map>()); + SenderQueueManager::GetInstance()->CreateQueue( + key, "", ctx, std::unordered_map>()); } } return Flusher::PushToQueue(make_unique(std::move(compressedData), diff --git a/core/runner/FlusherRunner.cpp b/core/runner/FlusherRunner.cpp index 4a8a3d42ea..ae1d55dc1a 100644 --- a/core/runner/FlusherRunner.cpp +++ b/core/runner/FlusherRunner.cpp @@ -144,8 +144,7 @@ void FlusherRunner::PushToHttpSink(SenderQueueItem* item, bool withLimit) { } auto req = static_cast(item->mFlusher)->BuildRequest(item); - item->mLastSendTime = time(nullptr); - req->mEnqueTime = item->mLastSendTime; + req->mEnqueTime = item->mLastSendTime = chrono::system_clock::now(); HttpSink::GetInstance()->AddRequest(std::move(req)); ++mHttpSendingCnt; LOG_DEBUG(sLogger, diff --git a/core/runner/sink/http/HttpSink.cpp b/core/runner/sink/http/HttpSink.cpp index cc6ab64f0d..06372e36e2 100644 --- a/core/runner/sink/http/HttpSink.cpp +++ b/core/runner/sink/http/HttpSink.cpp @@ -78,11 +78,13 @@ void HttpSink::Run() { unique_ptr request; if (mQueue.WaitAndPop(request, 500)) { mInItemsTotal->Add(1); - LOG_DEBUG( - sLogger, - ("got item from flusher runner, item address", request->mItem)( - "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( - "wait time", ToString(time(nullptr) - request->mEnqueTime))("try cnt", ToString(request->mTryCnt))); + LOG_DEBUG(sLogger, + ("got item from flusher runner, item address", request->mItem)( + "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "wait time", + ToString(chrono::duration_cast(chrono::system_clock::now() + - request->mEnqueTime) + .count()))("try cnt", ToString(request->mTryCnt))); if (!AddRequestToClient(std::move(request))) { continue; } @@ -168,11 +170,13 @@ void HttpSink::DoRun() { unique_ptr request; if (mQueue.TryPop(request)) { mInItemsTotal->Add(1); - LOG_DEBUG( - sLogger, - ("got item from flusher runner, item address", request->mItem)( - "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( - "wait time", ToString(time(nullptr) - request->mEnqueTime))("try cnt", ToString(request->mTryCnt))); + LOG_DEBUG(sLogger, + ("got item from flusher runner, item address", request->mItem)( + "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "wait time", + ToString(chrono::duration_cast(chrono::system_clock::now() + - request->mEnqueTime) + .count()))("try cnt", ToString(request->mTryCnt))); if (AddRequestToClient(std::move(request))) { ++runningHandlers; mSendingItemsTotal->Add(1); @@ -241,7 +245,7 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { mSendingItemsTotal->Sub(1); LOG_DEBUG( sLogger, - ("send http request completed, item address", request->mItem)( + ("send http request succeeded, item address", request->mItem)( "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( "response time", ToString(responseTime) + "ms")("try cnt", ToString(request->mTryCnt))( "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); @@ -249,18 +253,19 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { } default: // considered as network error - if (++request->mTryCnt <= request->mMaxTryCnt) { + if (request->mTryCnt <= request->mMaxTryCnt) { LOG_WARNING( sLogger, - ("failed to send request", "retry immediately")("item address", request->mItem)( + ("failed to send http request", "retry immediately")("item address", request->mItem)( "config-flusher-dst", QueueKeyManager::GetInstance()->GetName(request->mItem->mFlusher->GetQueueKey()))( - "try cnt", request->mTryCnt - 1)("errMsg", curl_easy_strerror(msg->data.result))); + "try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(msg->data.result))); // free first,becase mPrivateData will be reset in AddRequestToClient if (request->mPrivateData) { curl_slist_free_all((curl_slist*)request->mPrivateData); request->mPrivateData = nullptr; } + ++request->mTryCnt; AddRequestToClient(unique_ptr(request)); ++runningHandlers; requestReused = true; @@ -268,13 +273,13 @@ void HttpSink::HandleCompletedRequests(int& runningHandlers) { static_cast(request->mItem->mFlusher) ->OnSendDone(request->mResponse, request->mItem); FlusherRunner::GetInstance()->DecreaseHttpSendingCnt(); - LOG_DEBUG(sLogger, - ("send http request completed, item address", - request->mItem)("config-flusher-dst", - QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( - "response time", ToString(responseTime) + "ms")("try cnt", - ToString(request->mTryCnt - 1))( - "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); + LOG_DEBUG( + sLogger, + ("failed to send http request", "abort")("item address", request->mItem)( + "config-flusher-dst", + QueueKeyManager::GetInstance()->GetName(request->mItem->mQueueKey))( + "response time", ToString(responseTime) + "ms")("try cnt", ToString(request->mTryCnt))( + "sending cnt", ToString(FlusherRunner::GetInstance()->GetSendingBufferCount()))); } mOutFailedItemsTotal->Add(1); mFailedItemTotalResponseTimeMs->Add(responseTime); From 4c6bf513929a0dfeaed40c5e262a5e77e8c0f0cc Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 25 Oct 2024 03:01:42 +0000 Subject: [PATCH 13/15] polish --- core/common/http/Curl.cpp | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/core/common/http/Curl.cpp b/core/common/http/Curl.cpp index 25f19b630d..7c0d8c6f49 100644 --- a/core/common/http/Curl.cpp +++ b/core/common/http/Curl.cpp @@ -167,8 +167,8 @@ bool SendHttpRequest(std::unique_ptr&& request, HttpResponse& respo break; } else if (request->mTryCnt < request->mMaxTryCnt) { LOG_WARNING(sLogger, - ("failed to send request", "retry immediately")("retryCnt", request->mTryCnt)( - "errMsg", curl_easy_strerror(res))("request address", request.get())); + ("failed to send http request", "retry immediately")("request address", request.get())( + "try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(res))); ++request->mTryCnt; } else { break; From 80c5be74ba2b5c3889530c5e796e662a878d33e4 Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 25 Oct 2024 04:00:12 +0000 Subject: [PATCH 14/15] polish --- core/unittest/config/CommonConfigProviderUnittest.cpp | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/core/unittest/config/CommonConfigProviderUnittest.cpp b/core/unittest/config/CommonConfigProviderUnittest.cpp index 84ea7bfa73..fe41260944 100644 --- a/core/unittest/config/CommonConfigProviderUnittest.cpp +++ b/core/unittest/config/CommonConfigProviderUnittest.cpp @@ -132,7 +132,7 @@ void CommonConfigProviderUnittest::TestInit() { MockCommonConfigProvider provider; provider.Init("common_v2"); APSARA_TEST_EQUAL(provider.mSequenceNum, 0); - APSARA_TEST_EQUAL(provider.sName, "CommonConfigProvider"); + APSARA_TEST_EQUAL(provider.sName, "common config provider"); APSARA_TEST_EQUAL(provider.mConfigServerAvailable, true); APSARA_TEST_EQUAL(provider.mConfigServerAddresses.size(), 1); APSARA_TEST_EQUAL(provider.mConfigServerAddresses[0].host, "test.config.com"); @@ -170,7 +170,7 @@ void CommonConfigProviderUnittest::TestInit() { MockCommonConfigProvider provider; provider.Init("common_v2"); APSARA_TEST_EQUAL(provider.mSequenceNum, 0); - APSARA_TEST_EQUAL(provider.sName, "CommonConfigProvider"); + APSARA_TEST_EQUAL(provider.sName, "common config provider"); APSARA_TEST_EQUAL(provider.mConfigServerAvailable, true); APSARA_TEST_EQUAL(provider.mConfigServerAddresses.size(), 2); APSARA_TEST_EQUAL(provider.mConfigServerAddresses[0].host, "test.config.com"); @@ -235,7 +235,7 @@ void CommonConfigProviderUnittest::TestInit() { MockCommonConfigProvider provider; provider.Init("common_v2"); APSARA_TEST_EQUAL(provider.mSequenceNum, 0); - APSARA_TEST_EQUAL(provider.sName, "CommonConfigProvider"); + APSARA_TEST_EQUAL(provider.sName, "common config provider"); APSARA_TEST_EQUAL(provider.mConfigServerAvailable, false); APSARA_TEST_EQUAL(provider.mConfigServerAddresses.size(), 0); APSARA_TEST_EQUAL(provider.mConfigServerTags.size(), 0); @@ -407,7 +407,7 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { provider.Init("common_v2"); - APSARA_TEST_EQUAL(provider.sName, "CommonConfigProvider"); + APSARA_TEST_EQUAL(provider.sName, "common config provider"); APSARA_TEST_EQUAL(provider.mConfigServerAvailable, true); APSARA_TEST_EQUAL(provider.mConfigServerAddresses.size(), 1); APSARA_TEST_EQUAL(provider.mConfigServerAddresses[0].host, "test.config.com"); @@ -625,7 +625,7 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { provider.Init("common_v2"); - APSARA_TEST_EQUAL(provider.sName, "CommonConfigProvider"); + APSARA_TEST_EQUAL(provider.sName, "common config provider"); APSARA_TEST_EQUAL(provider.mConfigServerAvailable, true); APSARA_TEST_EQUAL(provider.mConfigServerAddresses.size(), 1); APSARA_TEST_EQUAL(provider.mConfigServerAddresses[0].host, "test.config.com"); From fa0c561d7ede76c966e259d27faebc44874df2bc Mon Sep 17 00:00:00 2001 From: henryzhx8 Date: Fri, 25 Oct 2024 08:38:17 +0000 Subject: [PATCH 15/15] polish --- core/unittest/pipeline/PipelineUnittest.cpp | 20 ++++++++++++++++++++ 1 file changed, 20 insertions(+) diff --git a/core/unittest/pipeline/PipelineUnittest.cpp b/core/unittest/pipeline/PipelineUnittest.cpp index 270acf1b33..743bfdf7e1 100644 --- a/core/unittest/pipeline/PipelineUnittest.cpp +++ b/core/unittest/pipeline/PipelineUnittest.cpp @@ -2749,6 +2749,16 @@ void PipelineUnittest::TestSend() const { configs.emplace_back(0, nullptr); configs.emplace_back(1, nullptr); pipeline.mRouter.Init(configs, ctx); + + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(pipeline.mMetricsRecordRef, {}); + pipeline.mFlushersInGroupsTotal + = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL); + pipeline.mFlushersInEventsTotal + = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL); + pipeline.mFlushersInSizeBytes + = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES); + pipeline.mFlushersTotalPackageTimeMs + = pipeline.mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS); { // all valid vector group; @@ -2804,6 +2814,16 @@ void PipelineUnittest::TestSend() const { configs.emplace_back(configJson.size(), nullptr); pipeline.mRouter.Init(configs, ctx); + WriteMetrics::GetInstance()->PrepareMetricsRecordRef(pipeline.mMetricsRecordRef, {}); + pipeline.mFlushersInGroupsTotal + = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL); + pipeline.mFlushersInEventsTotal + = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL); + pipeline.mFlushersInSizeBytes + = pipeline.mMetricsRecordRef.CreateCounter(METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES); + pipeline.mFlushersTotalPackageTimeMs + = pipeline.mMetricsRecordRef.CreateTimeCounter(METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS); + { vector group; group.emplace_back(make_shared());