Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add InputInternalAlarms #2061

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,6 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

// TODO: this should be refactored to internal pipeline
AlarmManager::GetInstance()->Init();

time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0;
#ifndef LOGTAIL_NO_TC_MALLOC
time_t lastTcmallocReleaseMemTime = 0;
Expand Down Expand Up @@ -373,7 +370,6 @@ void Application::Exit() {

LogtailMonitor::GetInstance()->Stop();
LoongCollectorMonitor::GetInstance()->Stop();
AlarmManager::GetInstance()->Stop();
LogtailPlugin::GetInstance()->StopBuiltInModules();
// from now on, alarm should not be used.

Expand Down
2 changes: 2 additions & 0 deletions core/collection_pipeline/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "plugin/input/InputPrometheus.h"
#if defined(__linux__) && !defined(__ANDROID__)
#include "plugin/input/InputFileSecurity.h"
#include "plugin/input/InputInternalAlarms.h"
#include "plugin/input/InputInternalMetrics.h"
#include "plugin/input/InputNetworkObserver.h"
#include "plugin/input/InputNetworkSecurity.h"
Expand Down Expand Up @@ -129,6 +130,7 @@ bool PluginRegistry::IsValidNativeFlusherPlugin(const string& name) const {
void PluginRegistry::LoadStaticPlugins() {
RegisterInputCreator(new StaticInputCreator<InputFile>());
RegisterInputCreator(new StaticInputCreator<InputPrometheus>());
RegisterInputCreator(new StaticInputCreator<InputInternalAlarms>(), true);
RegisterInputCreator(new StaticInputCreator<InputInternalMetrics>(), true);
#if defined(__linux__) && !defined(__ANDROID__)
RegisterInputCreator(new StaticInputCreator<InputContainerStdio>());
Expand Down
127 changes: 19 additions & 108 deletions core/monitor/AlarmManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,49 +107,12 @@ AlarmManager::AlarmManager() {
mMessageType[REGISTER_HANDLERS_TOO_SLOW_ALARM] = "REGISTER_HANDLERS_TOO_SLOW_ALARM";
}

void AlarmManager::Init() {
mThreadRes = async(launch::async, &AlarmManager::SendAlarmLoop, this);
}

void AlarmManager::Stop() {
ForceToSend();
{
lock_guard<mutex> lock(mThreadRunningMux);
mIsThreadRunning = false;
}
mStopCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("alarm gathering", "stopped successfully"));
} else {
LOG_WARNING(sLogger, ("alarm gathering", "forced to stopped"));
}
}

bool AlarmManager::SendAlarmLoop() {
LOG_INFO(sLogger, ("alarm gathering", "started"));
{
unique_lock<mutex> lock(mThreadRunningMux);
while (mIsThreadRunning) {
SendAllRegionAlarm();
if (mStopCV.wait_for(lock, std::chrono::seconds(3), [this]() { return !mIsThreadRunning; })) {
break;
}
}
}
SendAllRegionAlarm();
return true;
}

void AlarmManager::SendAllRegionAlarm() {
void AlarmManager::FlushAllRegionAlarm(vector<PipelineEventGroup> &pipelineEventGroupList) {
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved
int32_t currentTime = time(nullptr);
size_t sendRegionIndex = 0;
size_t sendAlarmTypeIndex = 0;
do {
LogGroup logGroup;
PipelineEventGroup pipelineEventGroup(std::make_shared<SourceBuffer>());
string region;
{
PTScopedLock lock(mAlarmBufferMutex);
Expand All @@ -163,7 +126,8 @@ void AlarmManager::SendAllRegionAlarm() {
++allAlarmIter;
}
region = allAlarmIter->first;
// LOG_DEBUG(sLogger, ("1Send Alarm", region)("region", sendRegionIndex));
pipelineEventGroup.SetTag("__region__", region);
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved

AlarmVector& alarmBufferVec = *(allAlarmIter->second.first);
std::vector<int32_t>& lastUpdateTimeVec = allAlarmIter->second.second;
// check this region end
Expand All @@ -173,8 +137,7 @@ void AlarmManager::SendAllRegionAlarm() {
sendAlarmTypeIndex = 0;
continue;
}
// LOG_DEBUG(sLogger, ("2Send Alarm", region)("region", sendRegionIndex)("alarm index",
// mMessageType[sendAlarmTypeIndex]));

// check valid
if (alarmBufferVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM
|| lastUpdateTimeVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM) {
Expand All @@ -187,98 +150,46 @@ void AlarmManager::SendAllRegionAlarm() {
continue;
}

// LOG_DEBUG(sLogger, ("3Send Alarm", region)("region", sendRegionIndex)("alarm index",
// mMessageType[sendAlarmTypeIndex]));
map<string, unique_ptr<AlarmMessage>>& alarmMap = alarmBufferVec[sendAlarmTypeIndex];
if (alarmMap.size() == 0
|| currentTime - lastUpdateTimeVec[sendAlarmTypeIndex] < INT32_FLAG(logtail_alarm_interval)) {
// go next alarm type
++sendAlarmTypeIndex;
continue;
}
// check sender queue status, if invalid jump this region

string project = GetProfileSender()->GetProfileProjectName(region);
QueueKey alarmPrjLogstoreKey
= QueueKeyManager::GetInstance()->GetKey("-flusher_sls-" + project + "#" + ALARM_SLS_LOGSTORE_NAME);
if (SenderQueueManager::GetInstance()->GetQueue(alarmPrjLogstoreKey) == nullptr) {
CollectionPipelineContext ctx;
SenderQueueManager::GetInstance()->CreateQueue(
alarmPrjLogstoreKey,
"self_monitor",
ctx,
{{"region", FlusherSLS::GetRegionConcurrencyLimiter(region)},
{"project", FlusherSLS::GetProjectConcurrencyLimiter(project)},
{"logstore", FlusherSLS::GetLogstoreConcurrencyLimiter(project, ALARM_SLS_LOGSTORE_NAME)}});
}
if (!SenderQueueManager::GetInstance()->IsValidToPush(alarmPrjLogstoreKey)) {
// jump this region
++sendRegionIndex;
sendAlarmTypeIndex = 0;
continue;
}

// LOG_DEBUG(sLogger, ("4Send Alarm", region)("region", sendRegionIndex)("alarm index",
// mMessageType[sendAlarmTypeIndex]));
logGroup.set_source(LoongCollectorMonitor::mIpAddr);
logGroup.set_category(ALARM_SLS_LOGSTORE_NAME);
pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_SOURCE, LoongCollectorMonitor::mIpAddr);
pipelineEventGroup.SetTag(LOG_RESERVED_KEY_TOPIC, "__alarm__");
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved
auto now = GetCurrentLogtailTime();
for (map<string, unique_ptr<AlarmMessage>>::iterator mapIter = alarmMap.begin(); mapIter != alarmMap.end();
++mapIter) {
auto& messagePtr = mapIter->second;

// LOG_DEBUG(sLogger, ("5Send Alarm", region)("region", sendRegionIndex)("alarm index",
// sendAlarmTypeIndex)("msg", messagePtr->mMessage));

Log* logPtr = logGroup.add_logs();
SetLogTime(logPtr,
AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta()
LogEvent* logEvent = pipelineEventGroup.AddLogEvent();
logEvent->SetTimestamp(AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta()
: now.tv_sec);
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key("alarm_type");
contentPtr->set_value(messagePtr->mMessageType);

contentPtr = logPtr->add_contents();
contentPtr->set_key("alarm_message");
contentPtr->set_value(messagePtr->mMessage);

contentPtr = logPtr->add_contents();
contentPtr->set_key("alarm_count");
contentPtr->set_value(ToString(messagePtr->mCount));

contentPtr = logPtr->add_contents();
contentPtr->set_key("ip");
contentPtr->set_value(LoongCollectorMonitor::mIpAddr);

contentPtr = logPtr->add_contents();
contentPtr->set_key("os");
contentPtr->set_value(OS_NAME);

contentPtr = logPtr->add_contents();
contentPtr->set_key("ver");
contentPtr->set_value(ILOGTAIL_VERSION);

logEvent->SetContent("alarm_type", messagePtr->mMessageType);
logEvent->SetContent("alarm_message", messagePtr->mMessage);
logEvent->SetContent("alarm_count", ToString(messagePtr->mCount));
logEvent->SetContent("ip", LoongCollectorMonitor::mIpAddr);
logEvent->SetContent("os", OS_NAME);
logEvent->SetContent("ver", string(ILOGTAIL_VERSION));
if (!messagePtr->mProjectName.empty()) {
contentPtr = logPtr->add_contents();
contentPtr->set_key("project_name");
contentPtr->set_value(messagePtr->mProjectName);
logEvent->SetContent("project_name", messagePtr->mProjectName);
}

if (!messagePtr->mCategory.empty()) {
contentPtr = logPtr->add_contents();
contentPtr->set_key("category");
contentPtr->set_value(messagePtr->mCategory);
logEvent->SetContent("category", messagePtr->mCategory);
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved
}
}
lastUpdateTimeVec[sendAlarmTypeIndex] = currentTime;
alarmMap.clear();
++sendAlarmTypeIndex;
}
if (logGroup.logs_size() <= 0) {
if (pipelineEventGroup.GetEvents().size() <= 0) {
continue;
}
// this is an anonymous send and non lock send
GetProfileSender()->SendToProfileProject(region, logGroup);
pipelineEventGroupList.emplace_back(std::move(pipelineEventGroup));
} while (true);
}

Expand Down
8 changes: 3 additions & 5 deletions core/monitor/AlarmManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <vector>

#include "common/Lock.h"
#include "models/PipelineEventGroup.h"

namespace logtail {

Expand Down Expand Up @@ -122,9 +123,6 @@ class AlarmManager {
return &instance;
}

void Init();
void Stop();

void SendAlarm(const AlarmType alarmType,
const std::string& message,
const std::string& projectName = "",
Expand All @@ -134,16 +132,16 @@ class AlarmManager {
void ForceToSend();
bool IsLowLevelAlarmValid();

void FlushAllRegionAlarm(std::vector<PipelineEventGroup> &pipelineEventGroupList);

private:
using AlarmVector = std::vector<std::map<std::string, std::unique_ptr<AlarmMessage>>>;

AlarmManager();
~AlarmManager() = default;

bool SendAlarmLoop();
// without lock
AlarmVector* MakesureLogtailAlarmMapVecUnlocked(const std::string& region);
void SendAllRegionAlarm();

std::future<bool> mThreadRes;
std::mutex mThreadRunningMux;
Expand Down
38 changes: 34 additions & 4 deletions core/monitor/SelfMonitorServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,20 @@ void SelfMonitorServer::Init() {
void SelfMonitorServer::Monitor() {
LOG_INFO(sLogger, ("self-monitor", "started"));
int32_t lastMonitorTime = time(NULL);
int32_t lastAlarmTime = time(NULL);
{
unique_lock<mutex> lock(mThreadRunningMux);
while (mIsThreadRunning) {
if (mStopCV.wait_for(lock, std::chrono::seconds(1), [this]() { return !mIsThreadRunning; })) {
break;
}
int32_t monitorTime = time(NULL);
if ((monitorTime - lastMonitorTime) >= 60) { // 60s
lastMonitorTime = monitorTime;
int32_t nowTime = time(NULL);
if ((nowTime - lastMonitorTime) >= 60) { // 60s
lastMonitorTime = nowTime;
SendMetrics();
}
if ((nowTime - lastAlarmTime) >= 3) { // 3s
lastAlarmTime = nowTime;
SendAlarms();
}
}
Expand All @@ -59,6 +63,7 @@ void SelfMonitorServer::Monitor() {
}
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved

void SelfMonitorServer::Stop() {
AlarmManager::GetInstance()->ForceToSend();
{
lock_guard<mutex> lock(mThreadRunningMux);
mIsThreadRunning = false;
Expand Down Expand Up @@ -170,11 +175,36 @@ void SelfMonitorServer::ReadAsPipelineEventGroup(PipelineEventGroup& pipelineEve
}

void SelfMonitorServer::UpdateAlarmPipeline(CollectionPipelineContext* ctx) {
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved
lock_guard<mutex> lock(mAlarmPipelineMux);
WriteLock lock(mAlarmPipelineMux);
mAlarmPipelineCtx = ctx;
LOG_INFO(sLogger, ("self-monitor alarms pipeline", "updated"));
}

void SelfMonitorServer::RemoveAlarmPipeline() {
WriteLock lock(mAlarmPipelineMux);
mAlarmPipelineCtx = nullptr;
LOG_INFO(sLogger, ("self-monitor alarms pipeline", "removed"));
}

void SelfMonitorServer::SendAlarms() {
ReadLock lock(mAlarmPipelineMux);
if (mAlarmPipelineCtx == nullptr) {
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved
return;
}
// tags: __topic__:__alarm__, __region__:${region}
vector<PipelineEventGroup> pipelineEventGroupList;
AlarmManager::GetInstance()->FlushAllRegionAlarm(pipelineEventGroupList);

shared_ptr<CollectionPipeline> pipeline
= CollectionPipelineManager::GetInstance()->FindConfigByName(mAlarmPipelineCtx->GetConfigName());
if (pipeline.get() != nullptr) {
for (auto& pipelineEventGroup : pipelineEventGroupList) {
if (pipelineEventGroup.GetEvents().size() > 0) {
ProcessorRunner::GetInstance()->PushQueue(
pipeline->GetContext().GetProcessQueueKey(), 0, std::move(pipelineEventGroup));
}
}
}
}

} // namespace logtail
9 changes: 6 additions & 3 deletions core/monitor/SelfMonitorServer.h
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@ class SelfMonitorServer {

void UpdateMetricPipeline(CollectionPipelineContext* ctx, SelfMonitorMetricRules* rules);
void RemoveMetricPipeline();
void UpdateAlarmPipeline(CollectionPipelineContext* ctx); // Todo
void UpdateAlarmPipeline(CollectionPipelineContext* ctx);
void RemoveAlarmPipeline();
private:
SelfMonitorServer();
~SelfMonitorServer() = default;
Expand All @@ -44,20 +45,22 @@ class SelfMonitorServer {
bool mIsThreadRunning = true;
std::condition_variable mStopCV;

// metrics
void SendMetrics();
bool ProcessSelfMonitorMetricEvent(SelfMonitorMetricEvent& event, const SelfMonitorMetricRule& rule);
void PushSelfMonitorMetricEvents(std::vector<SelfMonitorMetricEvent>& events);
void ReadAsPipelineEventGroup(PipelineEventGroup& pipelineEventGroup);

mutable ReadWriteLock mMetricPipelineLock;
CollectionPipelineContext* mMetricPipelineCtx = nullptr;
SelfMonitorMetricRules* mSelfMonitorMetricRules = nullptr;
SelfMonitorMetricEventMap mSelfMonitorMetricEventMap;
mutable ReadWriteLock mMetricPipelineLock;

// alarms
void SendAlarms();

mutable ReadWriteLock mAlarmPipelineMux;
CollectionPipelineContext* mAlarmPipelineCtx;
std::mutex mAlarmPipelineMux;
#ifdef APSARA_UNIT_TEST_MAIN
friend class InputInternalMetricsUnittest;
#endif
Expand Down
Loading
Loading