Skip to content

Commit

Permalink
Merge remote-tracking branch 'github/main' into feat/host-id
Browse files Browse the repository at this point in the history
  • Loading branch information
quzard committed Dec 31, 2024
2 parents b8ffcdf + bd3ad05 commit 66c6b9e
Show file tree
Hide file tree
Showing 163 changed files with 8,379 additions and 5,565 deletions.
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,17 @@ The core advantages of **LoongCollector**:

## Quick Start

For the complexity of C++ dependencies, the compilation of LoongCollector requires you have docker installed. If you aim to build LoongCollector from sources, you can go ahead and start with the following commands.

1. Start with local
For the complexity of C++ dependencies, the compilation of LoongCollector requires you have docker and golang installed. If you aim to build LoongCollector from sources, you can go ahead and start with the following commands.

```bash
make
cp -r example_config/quick_start/* output
cd output
./loongcollector
# Now, LoongCollector is collecting data from output/simple.log and outputing the result to stdout
# Now, LoongCollector is collecting data from output/simple.log,
# and outputing the result to stdout.
```

HEAD

## Documentation

Expand Down
4 changes: 2 additions & 2 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,9 +127,9 @@ set(SUB_DIRECTORIES_LIST
runner runner/sink/http
protobuf/sls protobuf/models
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
prometheus prometheus/labels prometheus/schedulers prometheus/async
prometheus prometheus/labels prometheus/schedulers prometheus/async prometheus/component
ebpf ebpf/observer ebpf/security ebpf/handler
parser sls_control sdk
parser
)
if (LINUX)
if (ENABLE_ENTERPRISE)
Expand Down
98 changes: 36 additions & 62 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@

using namespace std;

#define ILOGTAIL_PREFIX "ilogtail_"
#define ILOGTAIL_PIDFILE_SUFFIX ".pid"
#define LOONGCOLLECTOR_PREFIX "loongcollector_"

#ifdef __ENTERPRISE__
DEFINE_FLAG_BOOL(logtail_mode, "logtail mode", true);
#else
DEFINE_FLAG_BOOL(logtail_mode, "logtail mode", false);
#endif
DEFINE_FLAG_INT32(max_buffer_num, "max size", 40);
DEFINE_FLAG_INT32(pub_max_buffer_num, "max size", 8);
DEFINE_FLAG_INT32(pub_max_send_byte_per_sec, "the max send speed per sec, realtime thread", 20 * 1024 * 1024);
Expand Down Expand Up @@ -122,8 +124,6 @@ DECLARE_FLAG_INT32(reader_close_unused_file_time);
DECLARE_FLAG_INT32(batch_send_interval);
DECLARE_FLAG_INT32(batch_send_metric_size);

DECLARE_FLAG_BOOL(send_prefer_real_ip);
DECLARE_FLAG_INT32(send_switch_real_ip_interval);
DECLARE_FLAG_INT32(truncate_pos_skip_bytes);
DECLARE_FLAG_INT32(default_tail_limit_kb);

Expand Down Expand Up @@ -173,6 +173,7 @@ DEFINE_FLAG_STRING(logtail_snapshot_dir, "snapshot dir on local disk", "snapshot
DEFINE_FLAG_STRING(logtail_profile_snapshot, "reader profile on local disk", "logtail_profile_snapshot");
DEFINE_FLAG_STRING(ilogtail_config_env_name, "config file path", "ALIYUN_LOGTAIL_CONFIG");


#if defined(__linux__)
DEFINE_FLAG_STRING(adhoc_check_point_file_dir, "", "/tmp/logtail_adhoc_checkpoint");
#elif defined(_MSC_VER)
Expand All @@ -192,6 +193,21 @@ DEFINE_FLAG_STRING(sls_observer_ebpf_host_path,
namespace logtail {
constexpr int32_t kDefaultMaxSendBytePerSec = 25 * 1024 * 1024; // the max send speed per sec, realtime thread


// 全局并发度保留余量百分比
const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION = 0.5;
// 单地域并发度最小值
const int32_t MIN_SEND_REQUEST_CONCURRENCY = 15;
// 单地域并发度最大值
const int32_t MAX_SEND_REQUEST_CONCURRENCY = 80;
// 并发度统计数量&&时间间隔
const uint32_t CONCURRENCY_STATISTIC_THRESHOLD = 10;
const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS = 3;
// 并发度不回退百分比阈值
const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE = 10;
// 并发度慢回退百分比阈值
const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE = 40;

std::string AppConfig::sLocalConfigDir = "local";
void CreateAgentDir() {
try {
Expand Down Expand Up @@ -457,11 +473,7 @@ string GetAgentLoggersPrefix() {
}

string GetAgentLogName() {
if (BOOL_FLAG(logtail_mode)) {
return "ilogtail.LOG";
} else {
return "loongcollector.LOG";
}
return "loongcollector.LOG";
}

string GetObserverEbpfHostPath() {
Expand Down Expand Up @@ -515,19 +527,11 @@ string GetContinuousPipelineConfigDir() {
}

string GetPluginLogName() {
if (BOOL_FLAG(logtail_mode)) {
return "logtail_plugin.LOG";
} else {
return "go_plugin.LOG";
}
return "go_plugin.LOG";
}

std::string GetVersionTag() {
if (BOOL_FLAG(logtail_mode)) {
return "logtail_version";
} else {
return "loongcollector_version";
}
return "loongcollector_version";
}

std::string GetGoPluginCheckpoint() {
Expand All @@ -539,43 +543,19 @@ std::string GetGoPluginCheckpoint() {
}

std::string GetAgentName() {
if (BOOL_FLAG(logtail_mode)) {
return "ilogtail";
} else {
return "loongcollector";
}
return "loongcollector";
}

std::string GetMonitorInfoFileName() {
if (BOOL_FLAG(logtail_mode)) {
return "logtail_monitor_info";
} else {
return "loongcollector_monitor_info";
}
return "loongcollector_monitor_info";
}

std::string GetSymLinkName() {
if (BOOL_FLAG(logtail_mode)) {
return GetProcessExecutionDir() + "ilogtail";
} else {
return GetProcessExecutionDir() + "loongcollector";
}
}

std::string GetPidFileName() {
if (BOOL_FLAG(logtail_mode)) {
return GetProcessExecutionDir() + ILOGTAIL_PREFIX + ILOGTAIL_VERSION + ILOGTAIL_PIDFILE_SUFFIX;
} else {
return GetAgentRunDir() + "loongcollector.pid";
}
return GetProcessExecutionDir() + "loongcollector";
}

std::string GetAgentPrefix() {
if (BOOL_FLAG(logtail_mode)) {
return ILOGTAIL_PREFIX;
} else {
return LOONGCOLLECTOR_PREFIX;
}
return LOONGCOLLECTOR_PREFIX;
}

AppConfig::AppConfig() {
Expand Down Expand Up @@ -1041,26 +1021,11 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) {
mCheckPointFilePath = AbsolutePath(mCheckPointFilePath, mProcessExecutionDir);
LOG_INFO(sLogger, ("logtail checkpoint path", mCheckPointFilePath));

if (confJson.isMember("send_prefer_real_ip") && confJson["send_prefer_real_ip"].isBool()) {
BOOL_FLAG(send_prefer_real_ip) = confJson["send_prefer_real_ip"].asBool();
}

if (confJson.isMember("send_switch_real_ip_interval") && confJson["send_switch_real_ip_interval"].isInt()) {
INT32_FLAG(send_switch_real_ip_interval) = confJson["send_switch_real_ip_interval"].asInt();
}

LoadInt32Parameter(INT32_FLAG(truncate_pos_skip_bytes),
confJson,
"truncate_pos_skip_bytes",
"ALIYUN_LOGTAIL_TRUNCATE_POS_SKIP_BYTES");

if (BOOL_FLAG(send_prefer_real_ip)) {
LOG_INFO(sLogger,
("change send policy, prefer use real ip, switch interval seconds",
INT32_FLAG(send_switch_real_ip_interval))("truncate skip read offset",
INT32_FLAG(truncate_pos_skip_bytes)));
}

if (confJson.isMember("ignore_dir_inode_changed") && confJson["ignore_dir_inode_changed"].isBool()) {
mIgnoreDirInodeChanged = confJson["ignore_dir_inode_changed"].asBool();
}
Expand Down Expand Up @@ -1229,6 +1194,15 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) {
mBindInterface.clear();
LOG_INFO(sLogger, ("bind_interface", mBindInterface));
}

// mSendRequestConcurrency was limited
if (mSendRequestConcurrency < MIN_SEND_REQUEST_CONCURRENCY) {
mSendRequestConcurrency = MIN_SEND_REQUEST_CONCURRENCY;
}
if (mSendRequestConcurrency > MAX_SEND_REQUEST_CONCURRENCY) {
mSendRequestConcurrency = MAX_SEND_REQUEST_CONCURRENCY;
}
mSendRequestGlobalConcurrency = mSendRequestConcurrency * (1 + GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION);
}

bool AppConfig::CheckAndResetProxyEnv() {
Expand Down
23 changes: 20 additions & 3 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
namespace logtail {
extern const int32_t kDefaultMaxSendBytePerSec;

extern const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION;
extern const int32_t MIN_SEND_REQUEST_CONCURRENCY;
extern const int32_t MAX_SEND_REQUEST_CONCURRENCY;
extern const uint32_t CONCURRENCY_STATISTIC_THRESHOLD;
extern const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS;
extern const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE;
extern const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE;

void CreateAgentDir();

std::string GetAgentLogDir();
Expand Down Expand Up @@ -62,7 +70,6 @@ std::string GetGoPluginCheckpoint();
std::string GetAgentName();
std::string GetMonitorInfoFileName();
std::string GetSymLinkName();
std::string GetPidFileName();
std::string GetAgentPrefix();

template <class T>
Expand Down Expand Up @@ -134,6 +141,7 @@ class AppConfig {
int32_t mNumOfBufferFile;
int32_t mLocalFileSize;
int32_t mSendRequestConcurrency;
int32_t mSendRequestGlobalConcurrency;
std::string mBufferFilePath;

// checkpoint
Expand Down Expand Up @@ -210,6 +218,8 @@ class AppConfig {

std::string mBindInterface;



// /**
// * @brief Load ConfigServer, DataServer and network interface
// *
Expand Down Expand Up @@ -308,7 +318,7 @@ class AppConfig {

public:
AppConfig();
~AppConfig(){};
~AppConfig() {};

void LoadInstanceConfig(const std::map<std::string, std::shared_ptr<InstanceConfig>>&);

Expand Down Expand Up @@ -437,8 +447,12 @@ class AppConfig {
int32_t GetLocalFileSize() const { return mLocalFileSize; }

const std::string& GetBufferFilePath() const { return mBufferFilePath; }

// 单地域并发度
int32_t GetSendRequestConcurrency() const { return mSendRequestConcurrency; }
// 全局并发度
int32_t GetSendRequestGlobalConcurrency() const { return mSendRequestGlobalConcurrency; }

double GetGlobalConcurrencyFreePercentageForOneRegion() const { return GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION; }

int32_t GetProcessThreadCount() const { return mProcessThreadCount; }

Expand Down Expand Up @@ -521,6 +535,9 @@ class AppConfig {
friend class InputPrometheusUnittest;
friend class InputContainerStdioUnittest;
friend class BatcherUnittest;
friend class EnterpriseSLSClientManagerUnittest;
friend class FlusherRunnerUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
38 changes: 17 additions & 21 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "pipeline/queue/ExactlyOnceQueueManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "prometheus/PrometheusInputRunner.h"
#include "runner/FlusherRunner.h"
Expand Down Expand Up @@ -73,9 +74,6 @@ DEFINE_FLAG_INT32(queue_check_gc_interval_sec, "30s", 30);
DEFINE_FLAG_BOOL(enable_cgroup, "", false);
#endif

DECLARE_FLAG_BOOL(send_prefer_real_ip);
DECLARE_FLAG_BOOL(global_network_success);

using namespace std;

namespace logtail {
Expand Down Expand Up @@ -199,11 +197,13 @@ void Application::Start() { // GCOVR_EXCL_START
#if defined(__ENTERPRISE__) && defined(_MSC_VER)
InitWindowsSignalObject();
#endif
BoundedSenderQueueInterface::SetFeedback(ProcessQueueManager::GetInstance());

HttpSink::GetInstance()->Init();
FlusherRunner::GetInstance()->Init();
// resource monitor
// TODO: move metric related initialization to input Init
LoongCollectorMonitor::GetInstance()->Init();
LogtailMonitor::GetInstance()->Init();

// config provider
{
// add local config dir
filesystem::path localConfigPath = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir())
Expand All @@ -217,18 +217,23 @@ void Application::Start() { // GCOVR_EXCL_START
}
PipelineConfigWatcher::GetInstance()->AddSource(localConfigPath.string());
}

#ifdef __ENTERPRISE__
EnterpriseConfigProvider::GetInstance()->Start();
LegacyConfigProvider::GetInstance()->Init("legacy");
#else
InitRemoteConfigProviders();
#endif

AlarmManager::GetInstance()->Init();
LoongCollectorMonitor::GetInstance()->Init();
LogtailMonitor::GetInstance()->Init();
// runner
BoundedSenderQueueInterface::SetFeedback(ProcessQueueManager::GetInstance());
HttpSink::GetInstance()->Init();
FlusherRunner::GetInstance()->Init();
ProcessorRunner::GetInstance()->Init();

// flusher_sls resource should be explicitly initialized to allow internal metrics and alarms to be sent
FlusherSLS::InitResource();

// plugin registration
PluginRegistry::GetInstance()->LoadPlugins();
InputFeedbackInterfaceRegistry::GetInstance()->LoadFeedbackInterfaces();

Expand Down Expand Up @@ -258,7 +263,8 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

ProcessorRunner::GetInstance()->Init();
// TODO: this should be refactored to internal pipeline
AlarmManager::GetInstance()->Init();

time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0;
#ifndef LOGTAIL_NO_TC_MALLOC
Expand Down Expand Up @@ -392,16 +398,6 @@ void Application::CheckCriticalCondition(int32_t curTime) {
_exit(1);
}
#endif
// if network is fail in 2 hours, force exit (for ant only)
// work around for no network when docker start
if (BOOL_FLAG(send_prefer_real_ip) && !BOOL_FLAG(global_network_success) && curTime - mStartTime > 7200) {
LOG_ERROR(sLogger, ("network is fail", "prepare force exit"));
AlarmManager::GetInstance()->SendAlarm(LOGTAIL_CRASH_ALARM,
"network is fail since " + ToString(mStartTime) + " force exit");
AlarmManager::GetInstance()->ForceToSend();
sleep(10);
_exit(1);
}
}

bool Application::GetUUIDThread() {
Expand Down
Loading

0 comments on commit 66c6b9e

Please sign in to comment.