Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

improve send log and adjust runner exit timeout #1832

Merged
merged 15 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -113,10 +113,9 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/plugin/flusher/links.cmake)

# Subdirectories (modules). except for common, input, processor, flusher, observer, helper, spl, and provider.
set(SUB_DIRECTORIES_LIST
application app_config checkpoint container_manager logger go_pipeline monitor monitor/metric_constants profile_sender models
application app_config checkpoint container_manager metadata logger go_pipeline monitor monitor/metric_constants profile_sender models
config config/watcher
instance_config
metadata pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
runner runner/sink/http
protobuf/sls protobuf/models
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
Expand Down
4 changes: 2 additions & 2 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -31,14 +31,14 @@
#include "common/UUIDUtil.h"
#include "common/version.h"
#include "config/ConfigDiff.h"
#include "config/InstanceConfigManager.h"
#include "config/watcher/ConfigWatcher.h"
#include "config/watcher/InstanceConfigWatcher.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
#include "file_server/FileServer.h"
#include "file_server/event_handler/LogInput.h"
#include "go_pipeline/LogtailPlugin.h"
#include "instance_config/InstanceConfigManager.h"
#include "logger/Logger.h"
#include "monitor/LogFileProfiler.h"
#include "monitor/MetricExportor.h"
Expand Down Expand Up @@ -192,7 +192,7 @@ void Application::Init() {
}

void Application::Start() { // GCOVR_EXCL_START
LogFileProfiler::mStartTime = GetTimeStamp(time(NULL), "%Y-%m-%d %H:%M:%S");
LogFileProfiler::mStartTime = GetTimeStamp(time(NULL), "%Y-%m-%d %H:%M:%S");
LogtailMonitor::GetInstance()->UpdateConstMetric("start_time", LogFileProfiler::mStartTime);

#if defined(__ENTERPRISE__) && defined(_MSC_VER)
Expand Down
2 changes: 0 additions & 2 deletions core/checkpoint/RangeCheckpoint.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,8 @@ class RangeCheckpoint {
std::string key;
QueueKey fbKey;
RangeCheckpointPB data;
std::vector<std::pair<uint64_t, size_t>> positions;

inline void Prepare() {
positions.clear();
data.set_committed(false);
save();
}
Expand Down
22 changes: 15 additions & 7 deletions core/common/http/AsynCurlRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,34 +182,42 @@ void AsynCurlRunner::HandleCompletedRequests(int& runningHandlers) {
CURL* handler = msg->easy_handle;
AsynHttpRequest* request = nullptr;
curl_easy_getinfo(handler, CURLINFO_PRIVATE, &request);
LOG_DEBUG(sLogger,
("send http request completed, request address",
request)("response time",ToString(chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now()- request->mLastSendTime).count()) + "ms")
("try cnt", ToString(request->mTryCnt)));
auto responseTime
= chrono::duration_cast<chrono::milliseconds>(chrono::system_clock::now() - request->mLastSendTime)
.count();
switch (msg->data.result) {
case CURLE_OK: {
long statusCode = 0;
curl_easy_getinfo(handler, CURLINFO_RESPONSE_CODE, &statusCode);
request->mResponse.mStatusCode = (int32_t)statusCode;
request->OnSendDone(request->mResponse);
LOG_DEBUG(sLogger,
("send http request succeeded, request address",
request)("response time", ToString(responseTime) + "ms")("try cnt",
ToString(request->mTryCnt)));
break;
}
default:
// considered as network error
if (request->mTryCnt <= request->mMaxTryCnt) {
if (request->mTryCnt < request->mMaxTryCnt) {
LOG_WARNING(sLogger,
("failed to send request", "retry immediately")("retryCnt", request->mTryCnt++)(
"errMsg", curl_easy_strerror(msg->data.result)));
("failed to send http request", "retry immediately")("request address", request)(
"try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(msg->data.result)));
// free first,becase mPrivateData will be reset in AddRequestToClient
if (request->mPrivateData) {
curl_slist_free_all((curl_slist*)request->mPrivateData);
request->mPrivateData = nullptr;
}
++request->mTryCnt;
AddRequestToClient(unique_ptr<AsynHttpRequest>(request));
++runningHandlers;
requestReused = true;
} else {
request->OnSendDone(request->mResponse);
LOG_DEBUG(
sLogger,
("failed to send http request", "abort")("request address", request)(
"response time", ToString(responseTime) + "ms")("try cnt", ToString(request->mTryCnt)));
}
break;
}
Expand Down
42 changes: 24 additions & 18 deletions core/common/http/Curl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,10 @@
#include <map>
#include <string>

#include "common/DNSCache.h"
#include "app_config/AppConfig.h"
#include "logger/Logger.h"
#include "common/DNSCache.h"
#include "common/http/HttpResponse.h"
#include "logger/Logger.h"

using namespace std;

Expand Down Expand Up @@ -139,35 +139,41 @@ CURL* CreateCurlHandler(const std::string& method,
bool SendHttpRequest(std::unique_ptr<HttpRequest>&& request, HttpResponse& response) {
curl_slist* headers = NULL;
CURL* curl = CreateCurlHandler(request->mMethod,
request->mHTTPSFlag,
request->mHost,
request->mPort,
request->mUrl,
request->mQueryString,
request->mHeader,
request->mBody,
response,
headers,
request->mTimeout,
AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(),
AppConfig::GetInstance()->GetBindInterface());
request->mHTTPSFlag,
request->mHost,
request->mPort,
request->mUrl,
request->mQueryString,
request->mHeader,
request->mBody,
response,
headers,
request->mTimeout,
AppConfig::GetInstance()->IsHostIPReplacePolicyEnabled(),
AppConfig::GetInstance()->GetBindInterface());
if (curl == NULL) {
LOG_ERROR(sLogger, ("failed to init curl handler", "failed to init curl client")("request address", request.get()));
LOG_ERROR(sLogger,
("failed to init curl handler", "failed to init curl client")("request address", request.get()));
return false;
}
bool success = false;
while (request->mTryCnt <= request->mMaxTryCnt) {
while (true) {
CURLcode res = curl_easy_perform(curl);
if (res == CURLE_OK) {
long http_code = 0;
curl_easy_getinfo(curl, CURLINFO_RESPONSE_CODE, &http_code);
response.mStatusCode = (int32_t)http_code;
success = true;
break;
} else if (request->mTryCnt < request->mMaxTryCnt) {
LOG_WARNING(sLogger,
("failed to send http request", "retry immediately")("request address", request.get())(
"try cnt", request->mTryCnt)("errMsg", curl_easy_strerror(res)));
++request->mTryCnt;
} else {
LOG_WARNING(sLogger,("failed to send request", "retry immediately")("retryCnt", request->mTryCnt++)("errMsg", curl_easy_strerror(res))("request address", request.get()));
break;
}
}
}
if (headers != NULL) {
curl_slist_free_all(headers);
}
Expand Down
2 changes: 1 addition & 1 deletion core/common/http/HttpRequest.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ struct HttpRequest {
struct AsynHttpRequest : public HttpRequest {
HttpResponse mResponse;
void* mPrivateData = nullptr;
time_t mEnqueTime = 0;
std::chrono::system_clock::time_point mEnqueTime;

AsynHttpRequest(const std::string& method,
bool httpsFlag,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
* limitations under the License.
*/

#include "instance_config/InstanceConfigManager.h"
#include "config/InstanceConfigManager.h"

#include "app_config/AppConfig.h"
#include "config/feedbacker/ConfigFeedbackReceiver.h"
Expand Down
2 changes: 1 addition & 1 deletion core/config/common_provider/CommonConfigProvider.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ namespace logtail {
std::string CommonConfigProvider::configVersion = "version";

void CommonConfigProvider::Init(const string& dir) {
sName = "CommonConfigProvider";
sName = "common config provider";

ConfigProvider::Init(dir);
LoadConfigFile();
Expand Down
17 changes: 0 additions & 17 deletions core/deps/README.md

This file was deleted.

157 changes: 0 additions & 157 deletions core/deps/build.sh

This file was deleted.

2 changes: 2 additions & 0 deletions core/models/PipelineEventGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ class PipelineEventGroup {
MetricEvent* AddMetricEvent();
SpanEvent* AddSpanEvent();
void SwapEvents(EventsContainer& other) { mEvents.swap(other); }
void ReserveEvents(size_t size) { mEvents.reserve(size); }

std::shared_ptr<SourceBuffer>& GetSourceBuffer() { return mSourceBuffer; }

void SetMetadata(EventGroupMetaKey key, StringView val);
Expand Down
1 change: 0 additions & 1 deletion core/monitor/Monitor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -724,7 +724,6 @@ void LoongCollectorMonitor::Init() {
mAgentGoRoutinesTotal = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_GO_ROUTINES_TOTAL);
mAgentOpenFdTotal = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_OPEN_FD_TOTAL);
mAgentConfigTotal = mMetricsRecordRef.CreateIntGauge(METRIC_AGENT_PIPELINE_CONFIG_TOTAL);
LOG_INFO(sLogger, ("LoongCollectorMonitor", "started"));
}

void LoongCollectorMonitor::Stop() {
Expand Down
Loading
Loading