Skip to content

Commit

Permalink
improve send log and adjust runner exit timeout (#1832)
Browse files Browse the repository at this point in the history
  • Loading branch information
henryzhx8 authored Oct 25, 2024
1 parent be5aa4f commit 19c681f
Show file tree
Hide file tree
Showing 39 changed files with 235 additions and 338 deletions.
5 changes: 2 additions & 3 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,9 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake)

# Subdirectories (modules). except for common, input, processor, flusher, observer, helper, spl, and provider.
set(SUB_DIRECTORIES_LIST
application app_config checkpoint container_manager logger go_pipeline monitor monitor/metric_constants profile_sender models
application app_config checkpoint container_manager metadata logger go_pipeline monitor monitor/metric_constants profile_sender models
config config/watcher
instance_config
metadata pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
runner runner/sink/http
protobuf/sls protobuf/models
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
Expand Down
2 changes: 1 addition & 1 deletion core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
#include "common/UUIDUtil.h"
#include "common/version.h"
#include "config/ConfigDiff.h"
#include "config/InstanceConfigManager.h"
#include "config/watcher/ConfigWatcher.h"
#include "config/watcher/InstanceConfigWatcher.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/FileServer.h"
#include "file_server/event_handler/LogInput.h"
#include "go_pipeline/LogtailPlugin.h"
#include "instance_config/InstanceConfigManager.h"
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/MetricExportor.h"
Expand Down
2 changes: 0 additions & 2 deletions core/checkpoint/RangeCheckpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ class RangeCheckpoint {
std::string key;
QueueKey fbKey;
RangeCheckpointPB data;
std::vector<std::pair<uint64_t, size_t>> positions;

inline void Prepare() {
positions.clear();
data.set_committed(false);
save();
}
Expand Down
22 changes: 15 additions & 7 deletions core/common/http/AsynCurlRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,34 +182,42 @@ void AsynCurlRunner::HandleCompletedRequests(int& runningHandlers) {
CURL* handler = msg->easy_handle;
AsynHttpRequest* request = nullptr;
curl_easy_getinfo(handler, CURLINFO_PRIVATE, &request);
LOG_DEBUG(sLogger,
("send http request completed, request address",
request)("response time",ToString(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()- request->mLastSendTime).count()) + "ms")
("try cnt", ToString(request->mTryCnt)));
auto responseTime
= chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - request->mLastSendTime)
.count();
switch (msg->data.result) {
case CURLE_OK: {
long statusCode = 0;
curl_easy_getinfo(handler, CURLINFO_RESPONSE_CODE, &statusCode);
request->mResponse.mStatusCode = (int32_t)statusCode;
request->OnSendDone(request->mResponse);
LOG_DEBUG(sLogger,
("send http request succeeded, request address",
request)("response time", ToString(responseTime) + "ms")("try cnt",
ToString(request->mTryCnt)));
break;
}
default:
// considered as network error
if (request->mTryCnt <= request->mMaxTryCnt) {
if (request->mTryCnt < request->mMaxTryCnt) {
LOG_WARNING(sLogger,
("failed to send request", "retry immediately")("retryCnt", request->mTryCnt++)(
"errMsg", curl_easy_strerror(msg->data.result)));
("failed to send http request", "retry immediately")("request address", request)(
"try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(msg->data.result)));
// free first,becase mPrivateData will be reset in AddRequestToClient
if (request->mPrivateData) {
curl_slist_free_all((curl_slist*)request->mPrivateData);
request->mPrivateData = nullptr;
}
++request->mTryCnt;
AddRequestToClient(unique_ptr<AsynHttpRequest>(request));
++runningHandlers;
requestReused = true;
} else {
request->OnSendDone(request->mResponse);
LOG_DEBUG(
sLogger,
("failed to send http request", "abort")("request address", request)(
"response time", ToString(responseTime) + "ms")("try cnt", ToString(request->mTryCnt)));
}
break;
}
Expand Down
42 changes: 24 additions & 18 deletions core/common/http/Curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
#include <map>
#include <string>

#include "common/DNSCache.h"
#include "app_config/AppConfig.h"
#include "logger/Logger.h"
#include "common/DNSCache.h"
#include "common/http/HttpResponse.h"
#include "logger/Logger.h"

using namespace std;

Expand Down Expand Up @@ -139,35 +139,41 @@ CURL* CreateCurlHandler(const std::string& method,
bool SendHttpRequest(std::unique_ptr<HttpRequest>&& request, HttpResponse& response) {
curl_slist* headers = NULL;
CURL* curl = CreateCurlHandler(request->mMethod,
request->mHTTPSFlag,
request->mHost,
request->mPort,
request->mUrl,
request->mQueryString,
request->mHeader,
request->mBody,
response,
headers,
request->mTimeout,
AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(),
AppConfig::GetInstance()->GetBindInterface());
request->mHTTPSFlag,
request->mHost,
request->mPort,
request->mUrl,
request->mQueryString,
request->mHeader,
request->mBody,
response,
headers,
request->mTimeout,
AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(),
AppConfig::GetInstance()->GetBindInterface());
if (curl == NULL) {
LOG_ERROR(sLogger, ("failed to init curl handler", "failed to init curl client")("request address", request.get()));
LOG_ERROR(sLogger,
("failed to init curl handler", "failed to init curl client")("request address", request.get()));
return false;
}
bool success = false;
while (request->mTryCnt <= request->mMaxTryCnt) {
while (true) {
CURLcode res = curl_easy_perform(curl);
if (res == CURLE_OK) {
long http_code = 0;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
response.mStatusCode = (int32_t)http_code;
success = true;
break;
} else if (request->mTryCnt < request->mMaxTryCnt) {
LOG_WARNING(sLogger,
("failed to send http request", "retry immediately")("request address", request.get())(
"try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(res)));
++request->mTryCnt;
} else {
LOG_WARNING(sLogger,("failed to send request", "retry immediately")("retryCnt", request->mTryCnt++)("errMsg", curl_easy_strerror(res))("request address", request.get()));
break;
}
}
}
if (headers != NULL) {
curl_slist_free_all(headers);
}
Expand Down
2 changes: 1 addition & 1 deletion core/common/http/HttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ struct HttpRequest {
struct AsynHttpRequest : public HttpRequest {
HttpResponse mResponse;
void* mPrivateData = nullptr;
time_t mEnqueTime = 0;
std::chrono::system_clock::time_point mEnqueTime;

AsynHttpRequest(const std::string& method,
bool httpsFlag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

#include "instance_config/InstanceConfigManager.h"
#include "config/InstanceConfigManager.h"

#include "app_config/AppConfig.h"
#include "config/feedbacker/ConfigFeedbackReceiver.h"
Expand Down
File renamed without changes.
2 changes: 1 addition & 1 deletion core/config/common_provider/CommonConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace logtail {
std::string CommonConfigProvider::configVersion = "version";

void CommonConfigProvider::Init(const string& dir) {
sName = "CommonConfigProvider";
sName = "common config provider";

ConfigProvider::Init(dir);
LoadConfigFile();
Expand Down
17 changes: 0 additions & 17 deletions core/deps/README.md

This file was deleted.

157 changes: 0 additions & 157 deletions core/deps/build.sh

This file was deleted.

2 changes: 2 additions & 0 deletions core/models/PipelineEventGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class PipelineEventGroup {
MetricEvent* AddMetricEvent();
SpanEvent* AddSpanEvent();
void SwapEvents(EventsContainer& other) { mEvents.swap(other); }
void ReserveEvents(size_t size) { mEvents.reserve(size); }

std::shared_ptr<SourceBuffer>& GetSourceBuffer() { return mSourceBuffer; }

void SetMetadata(EventGroupMetaKey key, StringView val);
Expand Down
1 change: 0 additions & 1 deletion core/monitor/Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,6 @@ void LoongCollectorMonitor::Init() {
mAgentGoRoutinesTotal = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_GO_ROUTINES_TOTAL);
mAgentOpenFdTotal = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_OPEN_FD_TOTAL);
mAgentConfigTotal = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_PIPELINE_CONFIG_TOTAL);
LOG_INFO(sLogger, ("LoongCollectorMonitor", "started"));
}

void LoongCollectorMonitor::Stop() {
Expand Down
5 changes: 5 additions & 0 deletions core/monitor/metric_constants/MetricConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,10 @@ extern const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENTS_TOTAL;
extern const std::string METRIC_PIPELINE_PROCESSORS_IN_EVENT_GROUPS_TOTAL;
extern const std::string METRIC_PIPELINE_PROCESSORS_IN_SIZE_BYTES;
extern const std::string METRIC_PIPELINE_PROCESSORS_TOTAL_PROCESS_TIME_MS;
extern const std::string METRIC_PIPELINE_FLUSHERS_IN_EVENTS_TOTAL;
extern const std::string METRIC_PIPELINE_FLUSHERS_IN_EVENT_GROUPS_TOTAL;
extern const std::string METRIC_PIPELINE_FLUSHERS_IN_SIZE_BYTES;
extern const std::string METRIC_PIPELINE_FLUSHERS_TOTAL_PACKAGE_TIME_MS;
extern const std::string METRIC_PIPELINE_START_TIME;

//////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -125,6 +129,7 @@ extern const std::string METRIC_PLUGIN_OUT_SUCCESSFUL_EVENTS_TOTAL;
/**********************************************************
* all flusher (所有发送插件通用指标)
**********************************************************/
extern const std::string METRIC_PLUGIN_FLUSHER_TOTAL_PACKAGE_TIME_MS;
extern const std::string METRIC_PLUGIN_FLUSHER_OUT_EVENT_GROUPS_TOTAL;
extern const std::string METRIC_PLUGIN_FLUSHER_SEND_DONE_TOTAL;
extern const std::string METRIC_PLUGIN_FLUSHER_SUCCESS_TOTAL;
Expand Down
Loading

0 comments on commit 19c681f

Please sign in to comment.