Skip to content

Commit

Permalink
refactor sls client manager (#1954)
Browse files Browse the repository at this point in the history
  • Loading branch information
henryzhx8 authored Dec 31, 2024
1 parent dc71cb8 commit 62c55e4
Show file tree
Hide file tree
Showing 89 changed files with 2,999 additions and 4,923 deletions.
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ set(SUB_DIRECTORIES_LIST
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
prometheus prometheus/labels prometheus/schedulers prometheus/async prometheus/component
ebpf ebpf/observer ebpf/security ebpf/handler
parser sls_control sdk
parser
)
if (LINUX)
if (ENABLE_ENTERPRISE)
Expand Down
17 changes: 0 additions & 17 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -124,8 +124,6 @@ DECLARE_FLAG_INT32(reader_close_unused_file_time);
DECLARE_FLAG_INT32(batch_send_interval);
DECLARE_FLAG_INT32(batch_send_metric_size);

DECLARE_FLAG_BOOL(send_prefer_real_ip);
DECLARE_FLAG_INT32(send_switch_real_ip_interval);
DECLARE_FLAG_INT32(truncate_pos_skip_bytes);
DECLARE_FLAG_INT32(default_tail_limit_kb);

Expand Down Expand Up @@ -989,26 +987,11 @@ void AppConfig::LoadResourceConf(const Json::Value& confJson) {
mCheckPointFilePath = AbsolutePath(mCheckPointFilePath, mProcessExecutionDir);
LOG_INFO(sLogger, ("logtail checkpoint path", mCheckPointFilePath));

if (confJson.isMember("send_prefer_real_ip") && confJson["send_prefer_real_ip"].isBool()) {
BOOL_FLAG(send_prefer_real_ip) = confJson["send_prefer_real_ip"].asBool();
}

if (confJson.isMember("send_switch_real_ip_interval") && confJson["send_switch_real_ip_interval"].isInt()) {
INT32_FLAG(send_switch_real_ip_interval) = confJson["send_switch_real_ip_interval"].asInt();
}

LoadInt32Parameter(INT32_FLAG(truncate_pos_skip_bytes),
confJson,
"truncate_pos_skip_bytes",
"ALIYUN_LOGTAIL_TRUNCATE_POS_SKIP_BYTES");

if (BOOL_FLAG(send_prefer_real_ip)) {
LOG_INFO(sLogger,
("change send policy, prefer use real ip, switch interval seconds",
INT32_FLAG(send_switch_real_ip_interval))("truncate skip read offset",
INT32_FLAG(truncate_pos_skip_bytes)));
}

if (confJson.isMember("ignore_dir_inode_changed") && confJson["ignore_dir_inode_changed"].isBool()) {
mIgnoreDirInodeChanged = confJson["ignore_dir_inode_changed"].asBool();
}
Expand Down
4 changes: 3 additions & 1 deletion core/app_config/AppConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -316,7 +316,7 @@ class AppConfig {

public:
AppConfig();
~AppConfig(){};
~AppConfig() {};

void LoadInstanceConfig(const std::map<std::string, std::shared_ptr<InstanceConfig>>&);

Expand Down Expand Up @@ -533,6 +533,8 @@ class AppConfig {
friend class InputPrometheusUnittest;
friend class InputContainerStdioUnittest;
friend class BatcherUnittest;
friend class EnterpriseSLSClientManagerUnittest;
friend class FlusherRunnerUnittest;
friend class PipelineUpdateUnittest;
#endif
};
Expand Down
41 changes: 18 additions & 23 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
#include "pipeline/queue/ExactlyOnceQueueManager.h"
#include "pipeline/queue/SenderQueueManager.h"
#include "plugin/flusher/sls/DiskBufferWriter.h"
#include "plugin/flusher/sls/FlusherSLS.h"
#include "plugin/input/InputFeedbackInterfaceRegistry.h"
#include "prometheus/PrometheusInputRunner.h"
#include "runner/FlusherRunner.h"
Expand Down Expand Up @@ -73,9 +74,6 @@ DEFINE_FLAG_INT32(queue_check_gc_interval_sec, "30s", 30);
DEFINE_FLAG_BOOL(enable_cgroup, "", false);
#endif

DECLARE_FLAG_BOOL(send_prefer_real_ip);
DECLARE_FLAG_BOOL(global_network_success);

using namespace std;

namespace logtail {
Expand Down Expand Up @@ -199,11 +197,13 @@ void Application::Start() { // GCOVR_EXCL_START
#if defined(__ENTERPRISE__) && defined(_MSC_VER)
InitWindowsSignalObject();
#endif
BoundedSenderQueueInterface::SetFeedback(ProcessQueueManager::GetInstance());

HttpSink::GetInstance()->Init();
FlusherRunner::GetInstance()->Init();
// resource monitor
// TODO: move metric related initialization to input Init
LoongCollectorMonitor::GetInstance()->Init();
LogtailMonitor::GetInstance()->Init();

// config provider
{
// add local config dir
filesystem::path localConfigPath = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir())
Expand All @@ -217,18 +217,23 @@ void Application::Start() { // GCOVR_EXCL_START
}
PipelineConfigWatcher::GetInstance()->AddSource(localConfigPath.string());
}

#ifdef __ENTERPRISE__
EnterpriseConfigProvider::GetInstance()->Start();
LegacyConfigProvider::GetInstance()->Init("legacy");
#else
InitRemoteConfigProviders();
#endif

AlarmManager::GetInstance()->Init();
LoongCollectorMonitor::GetInstance()->Init();
LogtailMonitor::GetInstance()->Init();
// runner
BoundedSenderQueueInterface::SetFeedback(ProcessQueueManager::GetInstance());
HttpSink::GetInstance()->Init();
FlusherRunner::GetInstance()->Init();
ProcessorRunner::GetInstance()->Init();

// flusher_sls resource should be explicitly initialized to allow internal metrics and alarms to be sent
FlusherSLS::InitResource();

// plugin registration
PluginRegistry::GetInstance()->LoadPlugins();
InputFeedbackInterfaceRegistry::GetInstance()->LoadFeedbackInterfaces();

Expand Down Expand Up @@ -258,10 +263,10 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

ProcessorRunner::GetInstance()->Init();
// TODO: this should be refactored to internal pipeline
AlarmManager::GetInstance()->Init();

time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0,
lastCheckTagsTime = 0, lastQueueGCTime = 0;
time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0;
#ifndef LOGTAIL_NO_TC_MALLOC
time_t lastTcmallocReleaseMemTime = 0;
#endif
Expand Down Expand Up @@ -393,16 +398,6 @@ void Application::CheckCriticalCondition(int32_t curTime) {
_exit(1);
}
#endif
// if network is fail in 2 hours, force exit (for ant only)
// work around for no network when docker start
if (BOOL_FLAG(send_prefer_real_ip) && !BOOL_FLAG(global_network_success) && curTime - mStartTime > 7200) {
LOG_ERROR(sLogger, ("network is fail", "prepare force exit"));
AlarmManager::GetInstance()->SendAlarm(LOGTAIL_CRASH_ALARM,
"network is fail since " + ToString(mStartTime) + " force exit");
AlarmManager::GetInstance()->ForceToSend();
sleep(10);
_exit(1);
}
}

bool Application::GetUUIDThread() {
Expand Down
171 changes: 0 additions & 171 deletions core/common/CompressTools.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -15,141 +15,9 @@
#include "CompressTools.h"

#include <lz4/lz4.h>
#ifdef __ANDROID__
#include <zlib.h>
#else
#include <zlib/zlib.h>
#endif
#include <zstd/zstd.h>

#include <cstring>

#include "protobuf/sls/sls_logs.pb.h"

namespace logtail {

const int32_t ZSTD_DEFAULT_LEVEL = 1;

bool UncompressData(sls_logs::SlsCompressType compressType,
const std::string& src,
uint32_t rawSize,
std::string& dst) {
switch (compressType) {
case sls_logs::SLS_CMP_NONE:
dst = src;
return true;
case sls_logs::SLS_CMP_LZ4:
return UncompressLz4(src, rawSize, dst);
case sls_logs::SLS_CMP_DEFLATE:
return UncompressDeflate(src, rawSize, dst);
case sls_logs::SLS_CMP_ZSTD:
return UncompressZstd(src, rawSize, dst);
default:
return false;
}
}

bool CompressData(sls_logs::SlsCompressType compressType, const std::string& src, std::string& dst) {
switch (compressType) {
case sls_logs::SLS_CMP_NONE:
dst = src;
return true;
case sls_logs::SLS_CMP_LZ4:
return CompressLz4(src, dst);
case sls_logs::SLS_CMP_DEFLATE:
return CompressDeflate(src, dst);
case sls_logs::SLS_CMP_ZSTD:
return CompressZstd(src, dst, ZSTD_DEFAULT_LEVEL);
default:
return false;
}
}

bool CompressData(sls_logs::SlsCompressType compressType, const char* src, uint32_t size, std::string& dst) {
switch (compressType) {
case sls_logs::SLS_CMP_NONE: {
dst.assign(src, size);
return true;
}
case sls_logs::SLS_CMP_LZ4:
return CompressLz4(src, size, dst);
case sls_logs::SLS_CMP_DEFLATE:
return CompressDeflate(src, size, dst);
case sls_logs::SLS_CMP_ZSTD:
return CompressZstd(src, size, dst, ZSTD_DEFAULT_LEVEL);
default:
return false;
}
}

bool UncompressLz4(const std::string& src, const uint32_t rawSize, char* dst) {
uint32_t length = 0;
try {
length = LZ4_decompress_safe(src.c_str(), dst, src.length(), rawSize);
} catch (...) {
return false;
}
if (length != rawSize) {
return false;
}
return true;
}

bool UncompressLz4(const char* srcPtr, const uint32_t srcSize, const uint32_t rawSize, std::string& dst) {
dst.resize(rawSize);
char* unCompressed = const_cast<char*>(dst.c_str());
uint32_t length = 0;
try {
length = LZ4_decompress_safe(srcPtr, unCompressed, srcSize, rawSize);
} catch (...) {
return false;
}
if (length != rawSize) {
return false;
}
return true;
}
bool CompressDeflate(const char* srcPtr, const uint32_t srcSize, std::string& dst) {
int64_t dstLen = compressBound(srcSize);
dst.resize(dstLen);
if (compress((Bytef*)(dst.c_str()), (uLongf*)&dstLen, (const Bytef*)srcPtr, srcSize) == Z_OK) {
dst.resize(dstLen);
return true;
}
return false;
}

bool CompressDeflate(const std::string& src, std::string& dst) {
int64_t dstLen = compressBound(src.size());
dst.resize(dstLen);
if (compress((Bytef*)(dst.c_str()), (uLongf*)&dstLen, (const Bytef*)(src.c_str()), src.size()) == Z_OK) {
dst.resize(dstLen);
return true;
}
return false;
}

bool UncompressDeflate(const char* srcPtr, const uint32_t srcSize, const int64_t rawSize, std::string& dst) {
static const int64_t MAX_UMCOMPRESS_SIZE = 128 * 1024 * 1024;
if (rawSize > MAX_UMCOMPRESS_SIZE) {
return false;
}
dst.resize(rawSize);
if (uncompress((Bytef*)(dst.c_str()), (uLongf*)&rawSize, (const Bytef*)(srcPtr), srcSize) != Z_OK) {
return false;
}
return true;
}


bool UncompressDeflate(const std::string& src, const int64_t rawSize, std::string& dst) {
return UncompressDeflate(src.c_str(), src.size(), rawSize, dst);
}


bool UncompressLz4(const std::string& src, const uint32_t rawSize, std::string& dst) {
return UncompressLz4(src.c_str(), src.length(), rawSize, dst);
}
bool CompressLz4(const char* srcPtr, const uint32_t srcSize, std::string& dst) {
uint32_t encodingSize = LZ4_compressBound(srcSize);
dst.resize(encodingSize);
Expand All @@ -169,43 +37,4 @@ bool CompressLz4(const std::string& src, std::string& dst) {
return CompressLz4(src.c_str(), src.length(), dst);
}

bool UncompressZstd(const std::string& src, const uint32_t rawSize, std::string& dst) {
return UncompressZstd(src.c_str(), src.length(), rawSize, dst);
}

bool UncompressZstd(const char* srcPtr, const uint32_t srcSize, const uint32_t rawSize, std::string& dst) {
dst.resize(rawSize);
char* unCompressed = const_cast<char*>(dst.c_str());
uint32_t length = 0;
try {
length = ZSTD_decompress(unCompressed, rawSize, srcPtr, srcSize);
} catch (...) {
return false;
}
if (length != rawSize) {
return false;
}
return true;
}

bool CompressZstd(const char* srcPtr, const uint32_t srcSize, std::string& dst, int32_t level) {
uint32_t encodingSize = ZSTD_compressBound(srcSize);
dst.resize(encodingSize);
char* compressed = const_cast<char*>(dst.c_str());
try {
size_t const cmp_size = ZSTD_compress(compressed, encodingSize, srcPtr, srcSize, level);
if (ZSTD_isError(cmp_size)) {
return false;
}
dst.resize(cmp_size);
return true;
} catch (...) {
}
return false;
}

bool CompressZstd(const std::string& src, std::string& dst, int32_t level) {
return CompressZstd(src.c_str(), src.length(), dst, level);
}

} // namespace logtail
28 changes: 3 additions & 25 deletions core/common/CompressTools.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,36 +15,14 @@
*/

#pragma once
#include <string>
#include <cstdint>
#include "protobuf/sls/sls_logs.pb.h"

namespace logtail {

extern const int32_t ZSTD_DEFAULT_LEVEL;

bool UncompressData(sls_logs::SlsCompressType compressType, const std::string& src, uint32_t rawSize, std::string& dst);

bool CompressData(sls_logs::SlsCompressType compressType, const std::string& src, std::string& dst);
bool CompressData(sls_logs::SlsCompressType compressType, const char* src, uint32_t size, std::string& dst);

bool UncompressDeflate(const std::string& src, const int64_t rawSize, std::string& dst);
bool UncompressDeflate(const char* srcPtr, const uint32_t srcSize, const int64_t rawSize, std::string& dst);
#include <cstdint>

bool CompressDeflate(const std::string& src, std::string& dst);
bool CompressDeflate(const char* srcPtr, const uint32_t srcSize, std::string& dst);
#include <string>

bool UncompressLz4(const std::string& src, const uint32_t rawSize, std::string& dst);
bool UncompressLz4(const std::string& src, const uint32_t rawSize, char* dst);
bool UncompressLz4(const char* srcPtr, const uint32_t srcSize, const uint32_t rawSize, std::string& dst);
namespace logtail {

bool CompressLz4(const std::string& src, std::string& dst);
bool CompressLz4(const char* srcPtr, const uint32_t srcSize, std::string& dest);

bool UncompressZstd(const std::string& src, const uint32_t rawSize, std::string& dst);
bool UncompressZstd(const char* srcPtr, const uint32_t srcSize, const uint32_t rawSize, std::string& dst);

bool CompressZstd(const char* srcPtr, const uint32_t srcSize, std::string& dst, int32_t level);
bool CompressZstd(const std::string& src, std::string& dst, int32_t level);

} // namespace logtail
Loading

0 comments on commit 62c55e4

Please sign in to comment.