Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main' into merge-opensource-ma…
Browse files Browse the repository at this point in the history
…in-20241229 merge

Link: https://code.alibaba-inc.com/sls/ilogtail/codereview/19919644
* set the logtail_mode flag to true for enterprise builds (#1976)

* fix: add check future object valid before wait on it (#1986)

* init (#1993)

* Optimize the limiter code to meet better isolation and recovery scenarios (#1985)

* fix: flusher_otlp stop nil log/metric/trace client (#1994)

* fix nil log/metric/trace otlp client stop issue

Change-Id: I8948322d1e54a61c7b17968a5bcaf5590d9a437c

* add ut

Change-Id: Ib0003a387cb9d23592f28fd8097bc49ff3ae2351

* Polish README (#1992)

* Update README.md

Polish quick start.

* test: pipeline update unittest (#1991)

* test: pipeline update unittest

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* fix

* feat: prom stream scrape and stream process (#1925)

* add ut (#1997)

* feat: add metrics for logtail mode (#2001)

* init

* fix ut

* Merge remote-tracking branch 'upstream/main' into merge-opensource-main-20241229
merge
  • Loading branch information
Takuka0311 authored and yemo.xkj committed Dec 31, 2024
1 parent c75af25 commit ddcb3ee
Show file tree
Hide file tree
Showing 94 changed files with 4,345 additions and 537 deletions.
8 changes: 3 additions & 5 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -45,19 +45,17 @@ The core advantages of **LoongCollector**:

## Quick Start

For the complexity of C++ dependencies, the compilation of LoongCollector requires you have docker installed. If you aim to build LoongCollector from sources, you can go ahead and start with the following commands.

1. Start with local
For the complexity of C++ dependencies, the compilation of LoongCollector requires you have docker and golang installed. If you aim to build LoongCollector from sources, you can go ahead and start with the following commands.

```bash
make
cp -r example_config/quick_start/* output
cd output
./loongcollector
# Now, LoongCollector is collecting data from output/simple.log and outputing the result to stdout
# Now, LoongCollector is collecting data from output/simple.log,
# and outputing the result to stdout.
```

HEAD

## Documentation

Expand Down
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ set(SUB_DIRECTORIES_LIST
runner runner/sink/http
protobuf/sls protobuf/models
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
prometheus prometheus/labels prometheus/schedulers prometheus/async
prometheus prometheus/labels prometheus/schedulers prometheus/async prometheus/component
ebpf ebpf/observer ebpf/security ebpf/handler
parser sls_control sdk
)
Expand Down
81 changes: 36 additions & 45 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,11 +40,13 @@

using namespace std;

#define ILOGTAIL_PREFIX "ilogtail_"
#define ILOGTAIL_PIDFILE_SUFFIX ".pid"
#define LOONGCOLLECTOR_PREFIX "loongcollector_"

#ifdef __ENTERPRISE__
DEFINE_FLAG_BOOL(logtail_mode, "logtail mode", true);
#else
DEFINE_FLAG_BOOL(logtail_mode, "logtail mode", false);
#endif
DEFINE_FLAG_INT32(max_buffer_num, "max size", 40);
DEFINE_FLAG_INT32(pub_max_buffer_num, "max size", 8);
DEFINE_FLAG_INT32(pub_max_send_byte_per_sec, "the max send speed per sec, realtime thread", 20 * 1024 * 1024);
Expand Down Expand Up @@ -173,6 +175,7 @@ DEFINE_FLAG_STRING(logtail_snapshot_dir, "snapshot dir on local disk", "snapshot
DEFINE_FLAG_STRING(logtail_profile_snapshot, "reader profile on local disk", "logtail_profile_snapshot");
DEFINE_FLAG_STRING(ilogtail_config_env_name, "config file path", "ALIYUN_LOGTAIL_CONFIG");


#if defined(__linux__)
DEFINE_FLAG_STRING(adhoc_check_point_file_dir, "", "/tmp/logtail_adhoc_checkpoint");
#elif defined(_MSC_VER)
Expand All @@ -192,6 +195,21 @@ DEFINE_FLAG_STRING(sls_observer_ebpf_host_path,
namespace logtail {
constexpr int32_t kDefaultMaxSendBytePerSec = 25 * 1024 * 1024; // the max send speed per sec, realtime thread


// 全局并发度保留余量百分比
const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION = 0.5;
// 单地域并发度最小值
const int32_t MIN_SEND_REQUEST_CONCURRENCY = 15;
// 单地域并发度最大值
const int32_t MAX_SEND_REQUEST_CONCURRENCY = 80;
// 并发度统计数量&&时间间隔
const uint32_t CONCURRENCY_STATISTIC_THRESHOLD = 10;
const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS = 3;
// 并发度不回退百分比阈值
const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE = 10;
// 并发度慢回退百分比阈值
const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE = 40;

std::string AppConfig::sLocalConfigDir = "local";
void CreateAgentDir() {
try {
Expand Down Expand Up @@ -423,11 +441,7 @@ string GetAgentLoggersPrefix() {
}

string GetAgentLogName() {
if (BOOL_FLAG(logtail_mode)) {
return "ilogtail.LOG";
} else {
return "loongcollector.LOG";
}
return "loongcollector.LOG";
}

string GetObserverEbpfHostPath() {
Expand Down Expand Up @@ -481,19 +495,11 @@ string GetContinuousPipelineConfigDir() {
}

string GetPluginLogName() {
if (BOOL_FLAG(logtail_mode)) {
return "logtail_plugin.LOG";
} else {
return "go_plugin.LOG";
}
return "go_plugin.LOG";
}

std::string GetVersionTag() {
if (BOOL_FLAG(logtail_mode)) {
return "logtail_version";
} else {
return "loongcollector_version";
}
return "loongcollector_version";
}

std::string GetGoPluginCheckpoint() {
Expand All @@ -505,43 +511,19 @@ std::string GetGoPluginCheckpoint() {
}

std::string GetAgentName() {
if (BOOL_FLAG(logtail_mode)) {
return "ilogtail";
} else {
return "loongcollector";
}
return "loongcollector";
}

std::string GetMonitorInfoFileName() {
if (BOOL_FLAG(logtail_mode)) {
return "logtail_monitor_info";
} else {
return "loongcollector_monitor_info";
}
return "loongcollector_monitor_info";
}

std::string GetSymLinkName() {
if (BOOL_FLAG(logtail_mode)) {
return GetProcessExecutionDir() + "ilogtail";
} else {
return GetProcessExecutionDir() + "loongcollector";
}
}

std::string GetPidFileName() {
if (BOOL_FLAG(logtail_mode)) {
return GetProcessExecutionDir() + ILOGTAIL_PREFIX + ILOGTAIL_VERSION + ILOGTAIL_PIDFILE_SUFFIX;
} else {
return GetAgentRunDir() + "loongcollector.pid";
}
return GetProcessExecutionDir() + "loongcollector";
}

std::string GetAgentPrefix() {
if (BOOL_FLAG(logtail_mode)) {
return ILOGTAIL_PREFIX;
} else {
return LOONGCOLLECTOR_PREFIX;
}
return LOONGCOLLECTOR_PREFIX;
}

AppConfig::AppConfig() {
Expand Down Expand Up @@ -1195,6 +1177,15 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) {
mBindInterface.clear();
LOG_INFO(sLogger, ("bind_interface", mBindInterface));
}

// mSendRequestConcurrency was limited
if (mSendRequestConcurrency < MIN_SEND_REQUEST_CONCURRENCY) {
mSendRequestConcurrency = MIN_SEND_REQUEST_CONCURRENCY;
}
if (mSendRequestConcurrency > MAX_SEND_REQUEST_CONCURRENCY) {
mSendRequestConcurrency = MAX_SEND_REQUEST_CONCURRENCY;
}
mSendRequestGlobalConcurrency = mSendRequestConcurrency * (1 + GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION);
}

bool AppConfig::CheckAndResetProxyEnv() {
Expand Down
19 changes: 17 additions & 2 deletions core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,14 @@
namespace logtail {
extern const int32_t kDefaultMaxSendBytePerSec;

extern const double GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION;
extern const int32_t MIN_SEND_REQUEST_CONCURRENCY;
extern const int32_t MAX_SEND_REQUEST_CONCURRENCY;
extern const uint32_t CONCURRENCY_STATISTIC_THRESHOLD;
extern const uint32_t CONCURRENCY_STATISTIC_INTERVAL_THRESHOLD_SECONDS;
extern const uint32_t NO_FALL_BACK_FAIL_PERCENTAGE;
extern const uint32_t SLOW_FALL_BACK_FAIL_PERCENTAGE;

void CreateAgentDir();

std::string GetAgentLogDir();
Expand Down Expand Up @@ -60,7 +68,6 @@ std::string GetGoPluginCheckpoint();
std::string GetAgentName();
std::string GetMonitorInfoFileName();
std::string GetSymLinkName();
std::string GetPidFileName();
std::string GetAgentPrefix();

template <class T>
Expand Down Expand Up @@ -132,6 +139,7 @@ class AppConfig {
int32_t mNumOfBufferFile;
int32_t mLocalFileSize;
int32_t mSendRequestConcurrency;
int32_t mSendRequestGlobalConcurrency;
std::string mBufferFilePath;

// checkpoint
Expand Down Expand Up @@ -208,6 +216,8 @@ class AppConfig {

std::string mBindInterface;



// /**
// * @brief Load ConfigServer, DataServer and network interface
// *
Expand Down Expand Up @@ -435,8 +445,12 @@ class AppConfig {
int32_t GetLocalFileSize() const { return mLocalFileSize; }

const std::string& GetBufferFilePath() const { return mBufferFilePath; }

// 单地域并发度
int32_t GetSendRequestConcurrency() const { return mSendRequestConcurrency; }
// 全局并发度
int32_t GetSendRequestGlobalConcurrency() const { return mSendRequestGlobalConcurrency; }

double GetGlobalConcurrencyFreePercentageForOneRegion() const { return GLOBAL_CONCURRENCY_FREE_PERCENTAGE_FOR_ONE_REGION; }

int32_t GetProcessThreadCount() const { return mProcessThreadCount; }

Expand Down Expand Up @@ -519,6 +533,7 @@ class AppConfig {
friend class InputPrometheusUnittest;
friend class InputContainerStdioUnittest;
friend class BatcherUnittest;
friend class PipelineUpdateUnittest;
#endif
};

Expand Down
3 changes: 3 additions & 0 deletions core/common/http/AsynCurlRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,9 @@ bool AsynCurlRunner::Init() {
void AsynCurlRunner::Stop() {
mIsFlush = true;
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (!mThreadRes.valid()) {
return;
}
if (s == future_status::ready) {
LOG_INFO(sLogger, ("async curl runner", "stopped successfully"));
} else {
Expand Down
6 changes: 5 additions & 1 deletion core/common/http/HttpResponse.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ class HttpResponse {
HttpResponse()
: mHeader(compareHeader),
mBody(new std::string(), [](void* p) { delete static_cast<std::string*>(p); }),
mWriteCallback(DefaultWriteCallback) {};
mWriteCallback(DefaultWriteCallback){};
HttpResponse(void* body,
const std::function<void(void*)>& bodyDeleter,
size_t (*callback)(char*, size_t, size_t, void*))
Expand Down Expand Up @@ -155,6 +155,10 @@ class HttpResponse {
std::map<std::string, std::string, decltype(compareHeader)*> mHeader;
std::unique_ptr<void, std::function<void(void*)>> mBody;
size_t (*mWriteCallback)(char*, size_t, size_t, void*) = nullptr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class HttpSinkMock;
#endif
};

} // namespace logtail
3 changes: 3 additions & 0 deletions core/common/timer/Timer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,9 @@ void Timer::Stop() {
mIsThreadRunning = false;
}
mCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("timer", "stopped successfully"));
Expand Down
11 changes: 10 additions & 1 deletion core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -246,10 +246,17 @@ bool PipelineConfig::Parse() {
}
}
mInputs.push_back(&plugin);
#ifndef APSARA_UNIT_TEST_MAIN
// TODO: remove these special restrictions
if (pluginType == "input_file" || pluginType == "input_container_stdio") {
hasFileInput = true;
}
#else
// TODO: remove these special restrictions after all C++ inputs support Go processors
if (pluginType.find("input_file") != string::npos || pluginType.find("input_container_stdio") != string::npos) {
hasFileInput = true;
}
#endif
}
// TODO: remove these special restrictions
if (hasFileInput && (*mDetail)["inputs"].size() > 1) {
Expand Down Expand Up @@ -530,7 +537,9 @@ bool PipelineConfig::Parse() {
}
mRouter.emplace_back(i, itr);
} else {
mRouter.emplace_back(i, nullptr);
if (!IsFlushingThroughGoPipelineExisted()) {
mRouter.emplace_back(i, nullptr);
}
}
}

Expand Down
4 changes: 3 additions & 1 deletion core/file_server/event_handler/LogInput.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -350,7 +350,8 @@ void LogInput::ProcessEvent(EventDispatcher* dispatcher, Event* ev) {

void LogInput::UpdateCriticalMetric(int32_t curTime) {
mLastRunTime->Set(mLastReadEventTime.load());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
LoongCollectorMonitor::GetInstance()->SetAgentOpenFdTotal(
GloablFileDescriptorManager::GetInstance()->GetOpenedFilePtrSize());
mRegisterdHandlersTotal->Set(EventDispatcher::GetInstance()->GetHandlerCount());
mActiveReadersTotal->Set(CheckPointManager::Instance()->GetReaderCount());
mEventProcessCount = 0;
Expand Down Expand Up @@ -529,6 +530,7 @@ Event* LogInput::PopEventQueue() {
#ifdef APSARA_UNIT_TEST_MAIN
void LogInput::CleanEnviroments() {
mIdleFlag = true;
mInteruptFlag = true;
usleep(100 * 1000);
while (true) {
Event* ev = PopEventQueue();
Expand Down
Loading

0 comments on commit ddcb3ee

Please sign in to comment.