Skip to content

Commit

Permalink
fix
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Dec 27, 2024
1 parent 6f8d8d4 commit f7b71ab
Show file tree
Hide file tree
Showing 20 changed files with 114 additions and 333 deletions.
89 changes: 0 additions & 89 deletions core/common/MachineInfoUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,6 @@

#include <string.h>

#include <boost/filesystem.hpp>

#include "AppConfig.h"
#include "common/UUIDUtil.h"
#if defined(__linux__)
#include <arpa/inet.h>
#include <ifaddrs.h>
Expand All @@ -43,12 +39,10 @@

#include <thread>

#include "FileSystemUtil.h"
#include "StringTools.h"
#include "common/FileSystemUtil.h"
#include "logger/Logger.h"

DEFINE_FLAG_STRING(agent_host_id, "", "");

#if defined(_MSC_VER)
typedef LONG NTSTATUS, *PNTSTATUS;
Expand Down Expand Up @@ -503,89 +497,6 @@ size_t FetchECSMetaCallback(char* buffer, size_t size, size_t nmemb, std::string
return sizes;
}

std::string GetSerialNumberFromEcsAssist(const std::string& machineIdFile) {
std::string sn;
if (CheckExistance(machineIdFile)) {
if (!ReadFileContent(machineIdFile, sn)) {
return "";
}
}
return sn;
}

static std::string GetEcsAssistMachineIdFile() {
#if defined(WIN32)
return "C:\\ProgramData\\aliyun\\assist\\hybrid\\machine-id";
#else
return "/usr/local/share/aliyun-assist/hybrid/machine-id";
#endif
}

std::string GetSerialNumberFromEcsAssist() {
return GetSerialNumberFromEcsAssist(GetEcsAssistMachineIdFile());
}

std::string RandomHostid() {
static std::string hostId = CalculateRandomUUID();
return hostId;
}

const std::string GetLocalHostId() {
static std::string fileName = AppConfig::GetInstance()->GetLoongcollectorConfDir() + PATH_SEPARATOR + "host_id";
std::string hostId;
if (CheckExistance(fileName)) {
if (!ReadFileContent(fileName, hostId)) {
hostId = "";
}
}
if (hostId.empty()) {
hostId = RandomHostid();

LOG_INFO(sLogger, ("save hostId file to local file system, hostId", hostId));
int fd = open(fileName.c_str(), O_CREAT | O_EXCL | O_WRONLY, 0755);
if (fd == -1) {
int savedErrno = errno;
if (savedErrno != EEXIST) {
LOG_ERROR(sLogger, ("save hostId file fail", fileName)("errno", strerror(savedErrno)));
}
} else {
// 文件成功创建,现在写入hostId
ssize_t written = write(fd, hostId.c_str(), hostId.length());
if (written == static_cast<ssize_t>(hostId.length())) {
LOG_INFO(sLogger, ("hostId saved successfully to", fileName));
} else {
int writeErrno = errno;
LOG_ERROR(sLogger, ("Failed to write hostId to file", fileName)("errno", strerror(writeErrno)));
}
close(fd);
}
}
return hostId;
}

std::string FetchHostId() {
static std::string hostId;
if (!hostId.empty()) {
return hostId;
}
ECSMeta meta = FetchECSMeta();
hostId = meta.instanceID;
if (!hostId.empty()) {
return hostId;
}
hostId = GetSerialNumberFromEcsAssist();
if (!hostId.empty()) {
return hostId;
}
hostId = STRING_FLAG(agent_host_id);
if (!hostId.empty()) {
return hostId;
}
hostId = GetLocalHostId();

return hostId;
}

ECSMeta FetchECSMeta() {
CURL* curl;
for (size_t retryTimes = 1; retryTimes <= 5; retryTimes++) {
Expand Down
1 change: 0 additions & 1 deletion core/common/MachineInfoUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,6 @@ bool GetKernelInfo(std::string& kernelRelease, int64_t& kernelVersion);
bool GetRedHatReleaseInfo(std::string& os, int64_t& osVersion, std::string bashPath = "");
bool IsDigitsDotsHostname(const char* hostname);
ECSMeta FetchECSMeta();
std::string FetchHostId();

// GetAnyAvailableIP walks through all interfaces (AF_INET) to find an available IP.
// Priority:
Expand Down
5 changes: 2 additions & 3 deletions core/constants/EntityConstants.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@

namespace logtail {

const std::string DEFAULT_ENV_KEY_HOST_TYPE = "HOST_TYPE";
const std::string DEFAULT_ENV_VALUE_ECS = "ecs";
const std::string DEFAULT_ENV_VALUE_HOST = "host";
const std::string DEFAULT_HOST_TYPE_ECS = "ecs";
const std::string DEFAULT_HOST_TYPE_HOST = "host";
const std::string DEFAULT_CONTENT_KEY_ENTITY_TYPE = "__entity_type__";
const std::string DEFAULT_CONTENT_KEY_ENTITY_ID = "__entity_id__";
const std::string DEFAULT_CONTENT_KEY_DOMAIN = "__domain__";
Expand Down
4 changes: 2 additions & 2 deletions core/constants/EntityConstants.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
namespace logtail {

extern const std::string DEFAULT_ENV_KEY_HOST_TYPE;
extern const std::string DEFAULT_ENV_VALUE_ECS;
extern const std::string DEFAULT_ENV_VALUE_HOST;
extern const std::string DEFAULT_HOST_TYPE_ECS;
extern const std::string DEFAULT_HOST_TYPE_HOST;
extern const std::string DEFAULT_CONTENT_KEY_ENTITY_TYPE;
extern const std::string DEFAULT_CONTENT_KEY_ENTITY_ID;
extern const std::string DEFAULT_CONTENT_KEY_DOMAIN;
Expand Down
99 changes: 45 additions & 54 deletions core/host_monitor/HostMonitorInputRunner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@

#include "HostMonitorInputRunner.h"

#include <future>
#include <algorithm>
#include <memory>
#include <mutex>
#include <shared_mutex>
Expand All @@ -26,7 +26,6 @@

#include "HostMonitorTimerEvent.h"
#include "ProcessEntityCollector.h"
#include "common/Lock.h"
#include "common/timer/Timer.h"
#include "host_monitor/collector/ProcessEntityCollector.h"
#include "logger/Logger.h"
Expand All @@ -39,33 +38,33 @@ HostMonitorInputRunner::HostMonitorInputRunner() : mThreadPool(ThreadPool(3)) {
RegisterCollector<ProcessEntityCollector>();
}

void HostMonitorInputRunner::UpdateCollector(const std::string& configName,
const std::vector<std::string>& newCollectors,
void HostMonitorInputRunner::UpdateCollector(const std::vector<std::string>& newCollectors,
QueueKey processQueueKey,
int inputIndex) {
std::vector<std::string> oldCollectors;
{
std::unique_lock lock(mCollectorRegisterMapMutex);
auto it = mCollectorRegisterMap.find(configName);
if (it != mCollectorRegisterMap.end()) {
oldCollectors = it->second;
}
mCollectorRegisterMap[configName] = newCollectors;
}
for (const auto& collectorName : newCollectors) {
LOG_INFO(sLogger, ("add new host monitor collector", configName)("collector", collectorName));
HostMonitorTimerEvent::CollectConfig collectConfig(
configName, collectorName, processQueueKey, inputIndex, std::chrono::seconds(DEFAULT_SCHEDULE_INTERVAL));
// only push event when the collector is new added
if (std::find(oldCollectors.begin(), oldCollectors.end(), collectorName) == oldCollectors.end()) {
Timer::GetInstance()->PushEvent(BuildTimerEvent(collectConfig));
std::unique_lock lock(mRegisteredCollectorMapMutex);
for (auto& collector : newCollectors) {
auto oldCollector = mRegisteredCollectorMap.find(collector);
if (oldCollector == mRegisteredCollectorMap.end()) {
mRegisteredCollectorMap[collector] = true;
HostMonitorTimerEvent::CollectConfig collectConfig(
collector, processQueueKey, inputIndex, std::chrono::seconds(DEFAULT_SCHEDULE_INTERVAL));
auto now = std::chrono::steady_clock::now();
auto event = std::make_unique<HostMonitorTimerEvent>(now + collectConfig.mInterval, collectConfig);
Timer::GetInstance()->PushEvent(std::move(event));
} else {
// config removed and added again, timer event is still in the queue
if (!oldCollector->second) {
oldCollector->second = true;
}
}
}
}

void HostMonitorInputRunner::RemoveCollector(const std::string& configName) {
std::unique_lock lock(mCollectorRegisterMapMutex);
mCollectorRegisterMap.erase(configName);
void HostMonitorInputRunner::RemoveCollector() {
std::unique_lock lock(mRegisteredCollectorMapMutex);
for (auto& collector : mRegisteredCollectorMap) {
collector.second = false;
}
}

void HostMonitorInputRunner::Init() {
Expand All @@ -92,59 +91,51 @@ void HostMonitorInputRunner::Stop() {
}

bool HostMonitorInputRunner::HasRegisteredPlugins() const {
std::shared_lock lock(mCollectorRegisterMapMutex);
return !mCollectorRegisterMap.empty();
}

bool HostMonitorInputRunner::IsCollectTaskValid(const std::string& configName, const std::string& collectorName) const {
std::shared_lock lock(mCollectorRegisterMapMutex);
auto collectors = mCollectorRegisterMap.find(configName);
if (collectors == mCollectorRegisterMap.end()) {
return false;
}
for (const auto& collectorName : collectors->second) {
if (collectorName == collectorName) {
std::shared_lock lock(mRegisteredCollectorMapMutex);
for (auto& collector : mRegisteredCollectorMap) {
if (collector.second) {
return true;
}
}
return false;
}

bool HostMonitorInputRunner::IsCollectTaskValid(const std::string& configName, const std::string& collectorName) {
std::unique_lock lock(mRegisteredCollectorMapMutex);
auto it = mRegisteredCollectorMap.find(collectorName);
if (it != mRegisteredCollectorMap.end() && it->second) {
return true;
}
return false;
}

void HostMonitorInputRunner::ScheduleOnce(HostMonitorTimerEvent::CollectConfig& config) {
auto collectFn = [this, config]() mutable {
PipelineEventGroup group(std::make_shared<SourceBuffer>());
auto collector = GetCollector(config.mCollectorName);
if (!collector) {
if (collector) {
collector->Collect(group);
} else {
LOG_ERROR(sLogger,
("collector not found, will not collect",
("collector not found, will not collect again",
"discard data")("config", config.mConfigName)("collector", config.mCollectorName));
return;
}

bool result = ProcessorRunner::GetInstance()->PushQueue(
config.mProcessQueueKey, config.mInputIndex, std::move(group), 3);
if (!result) {
LOG_WARNING(sLogger,
("push queue failed", "discard data")("config", config.mConfigName)("collector",
config.mCollectorName));
if (group.GetEvents().size() > 0) {
bool result = ProcessorRunner::GetInstance()->PushQueue(
config.mProcessQueueKey, config.mInputIndex, std::move(group));
if (!result) {
// there is no process for host monitor, so should not block in process queue
LOG_ERROR(sLogger,
("host monitor push process queue failed",
"discard data")("config", config.mConfigName)("collector", config.mCollectorName));
}
}
LOG_DEBUG(sLogger,
("schedule host monitor collector again", config.mConfigName)("collector", config.mCollectorName));
auto event = BuildTimerEvent(config);
event->ResetForNextExec();
Timer::GetInstance()->PushEvent(std::move(event));
};
mThreadPool.Add(collectFn);
}

std::unique_ptr<HostMonitorTimerEvent>
HostMonitorInputRunner::BuildTimerEvent(HostMonitorTimerEvent::CollectConfig& collectConfig) {
auto now = std::chrono::steady_clock::now();
auto event = std::make_unique<HostMonitorTimerEvent>(now, collectConfig);
return event;
}

std::shared_ptr<BaseCollector> HostMonitorInputRunner::GetCollector(const std::string& collectorName) {
auto it = mCollectorInstanceMap.find(collectorName);
Expand Down
17 changes: 8 additions & 9 deletions core/host_monitor/HostMonitorInputRunner.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
#include <shared_mutex>
#include <string>
#include <unordered_map>
#include <unordered_set>
#include <utility>
#include <vector>

#include "common/ThreadPool.h"
Expand All @@ -46,22 +48,19 @@ class HostMonitorInputRunner : public InputRunner {
return &sInstance;
}

void UpdateCollector(const std::string& configName,
const std::vector<std::string>& collectorNames,
QueueKey processQueueKey,
int inputIndex);
void RemoveCollector(const std::string& configName);
// Only support singleton mode
void UpdateCollector(const std::vector<std::string>& collectorNames, QueueKey processQueueKey, int inputIndex);
void RemoveCollector();

void Init() override;
void Stop() override;
bool HasRegisteredPlugins() const override;

bool IsCollectTaskValid(const std::string& configName, const std::string& collectorName) const;
bool IsCollectTaskValid(const std::string& configName, const std::string& collectorName);
void ScheduleOnce(HostMonitorTimerEvent::CollectConfig& collectConfig);

private:
HostMonitorInputRunner();
std::unique_ptr<HostMonitorTimerEvent> BuildTimerEvent(HostMonitorTimerEvent::CollectConfig& collectConfig);

template <typename T>
void RegisterCollector();
Expand All @@ -71,8 +70,8 @@ class HostMonitorInputRunner : public InputRunner {

ThreadPool mThreadPool;

mutable std::shared_mutex mCollectorRegisterMapMutex;
std::unordered_map<std::string, std::vector<std::string>> mCollectorRegisterMap;
mutable std::shared_mutex mRegisteredCollectorMapMutex;
std::unordered_map<std::string, bool> mRegisteredCollectorMap;
std::unordered_map<std::string, std::shared_ptr<BaseCollector>> mCollectorInstanceMap;

#ifdef APSARA_UNIT_TEST_MAIN
Expand Down
8 changes: 5 additions & 3 deletions core/host_monitor/HostMonitorTimerEvent.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,8 @@

#include "HostMonitorTimerEvent.h"

#include <utility>

#include "HostMonitorInputRunner.h"
#include "common/timer/Timer.h"
#include "host_monitor/HostMonitorInputRunner.h"

namespace logtail {

Expand All @@ -28,7 +27,10 @@ bool HostMonitorTimerEvent::IsValid() const {
}

bool HostMonitorTimerEvent::Execute() {
LOG_DEBUG(sLogger, ("schedule host monitor collector", mCollectConfig.mConfigName));
HostMonitorInputRunner::GetInstance()->ScheduleOnce(mCollectConfig);
auto event = std::make_unique<HostMonitorTimerEvent>(GetExecTime() + mCollectConfig.mInterval, mCollectConfig);
Timer::GetInstance()->PushEvent(std::move(event));
return true;
}

Expand Down
Loading

0 comments on commit f7b71ab

Please sign in to comment.