Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add InputInternalAlarms #2061

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 0 additions & 4 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -268,9 +268,6 @@ void Application::Start() { // GCOVR_EXCL_START
LogtailPlugin::GetInstance()->LoadPluginBase();
}

// TODO: this should be refactored to internal pipeline
AlarmManager::GetInstance()->Init();

time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0;
#ifndef LOGTAIL_NO_TC_MALLOC
time_t lastTcmallocReleaseMemTime = 0;
Expand Down Expand Up @@ -373,7 +370,6 @@ void Application::Exit() {

LogtailMonitor::GetInstance()->Stop();
LoongCollectorMonitor::GetInstance()->Stop();
AlarmManager::GetInstance()->Stop();
LogtailPlugin::GetInstance()->StopBuiltInModules();
// from now on, alarm should not be used.

Expand Down
1 change: 1 addition & 0 deletions core/collection_pipeline/CollectionPipeline.h
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ class CollectionPipeline {
friend class PipelineUnittest;
friend class InputContainerStdioUnittest;
friend class InputFileUnittest;
friend class InputInternalAlarmsUnittest;
friend class InputInternalMetricsUnittest;
friend class InputPrometheusUnittest;
friend class ProcessorTagNativeUnittest;
Expand Down
2 changes: 2 additions & 0 deletions core/collection_pipeline/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
#include "plugin/input/InputPrometheus.h"
#if defined(__linux__) && !defined(__ANDROID__)
#include "plugin/input/InputFileSecurity.h"
#include "plugin/input/InputInternalAlarms.h"
#include "plugin/input/InputInternalMetrics.h"
#include "plugin/input/InputNetworkObserver.h"
#include "plugin/input/InputNetworkSecurity.h"
Expand Down Expand Up @@ -129,6 +130,7 @@ bool PluginRegistry::IsValidNativeFlusherPlugin(const string& name) const {
void PluginRegistry::LoadStaticPlugins() {
RegisterInputCreator(new StaticInputCreator<InputFile>());
RegisterInputCreator(new StaticInputCreator<InputPrometheus>());
RegisterInputCreator(new StaticInputCreator<InputInternalAlarms>(), true);
RegisterInputCreator(new StaticInputCreator<InputInternalMetrics>(), true);
#if defined(__linux__) && !defined(__ANDROID__)
RegisterInputCreator(new StaticInputCreator<InputContainerStdio>());
Expand Down
2 changes: 2 additions & 0 deletions core/models/PipelineEventGroup.h
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,8 @@ enum class EventGroupMetaKey {
PROMETHEUS_STREAM_ID,
PROMETHEUS_STREAM_TOTAL,

TARGET_REGION,

SOURCE_ID
};

Expand Down
129 changes: 20 additions & 109 deletions core/monitor/AlarmManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -107,49 +107,12 @@ AlarmManager::AlarmManager() {
mMessageType[REGISTER_HANDLERS_TOO_SLOW_ALARM] = "REGISTER_HANDLERS_TOO_SLOW_ALARM";
}

void AlarmManager::Init() {
mThreadRes = async(launch::async, &AlarmManager::SendAlarmLoop, this);
}

void AlarmManager::Stop() {
ForceToSend();
{
lock_guard<mutex> lock(mThreadRunningMux);
mIsThreadRunning = false;
}
mStopCV.notify_one();
if (!mThreadRes.valid()) {
return;
}
future_status s = mThreadRes.wait_for(chrono::seconds(1));
if (s == future_status::ready) {
LOG_INFO(sLogger, ("alarm gathering", "stopped successfully"));
} else {
LOG_WARNING(sLogger, ("alarm gathering", "forced to stopped"));
}
}

bool AlarmManager::SendAlarmLoop() {
LOG_INFO(sLogger, ("alarm gathering", "started"));
{
unique_lock<mutex> lock(mThreadRunningMux);
while (mIsThreadRunning) {
SendAllRegionAlarm();
if (mStopCV.wait_for(lock, std::chrono::seconds(3), [this]() { return !mIsThreadRunning; })) {
break;
}
}
}
SendAllRegionAlarm();
return true;
}

void AlarmManager::SendAllRegionAlarm() {
void AlarmManager::FlushAllRegionAlarm(vector<PipelineEventGroup>& pipelineEventGroupList) {
int32_t currentTime = time(nullptr);
size_t sendRegionIndex = 0;
size_t sendAlarmTypeIndex = 0;
do {
LogGroup logGroup;
PipelineEventGroup pipelineEventGroup(std::make_shared<SourceBuffer>());
string region;
{
PTScopedLock lock(mAlarmBufferMutex);
Expand All @@ -163,7 +126,8 @@ void AlarmManager::SendAllRegionAlarm() {
++allAlarmIter;
}
region = allAlarmIter->first;
// LOG_DEBUG(sLogger, ("1Send Alarm", region)("region", sendRegionIndex));
pipelineEventGroup.SetMetadata(EventGroupMetaKey::TARGET_REGION, region);

AlarmVector& alarmBufferVec = *(allAlarmIter->second.first);
std::vector<int32_t>& lastUpdateTimeVec = allAlarmIter->second.second;
// check this region end
Expand All @@ -173,8 +137,7 @@ void AlarmManager::SendAllRegionAlarm() {
sendAlarmTypeIndex = 0;
continue;
}
// LOG_DEBUG(sLogger, ("2Send Alarm", region)("region", sendRegionIndex)("alarm index",
// mMessageType[sendAlarmTypeIndex]));

// check valid
if (alarmBufferVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM
|| lastUpdateTimeVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM) {
Expand All @@ -187,98 +150,46 @@ void AlarmManager::SendAllRegionAlarm() {
continue;
}

// LOG_DEBUG(sLogger, ("3Send Alarm", region)("region", sendRegionIndex)("alarm index",
// mMessageType[sendAlarmTypeIndex]));
map<string, unique_ptr<AlarmMessage>>& alarmMap = alarmBufferVec[sendAlarmTypeIndex];
if (alarmMap.size() == 0
|| currentTime - lastUpdateTimeVec[sendAlarmTypeIndex] < INT32_FLAG(logtail_alarm_interval)) {
// go next alarm type
++sendAlarmTypeIndex;
continue;
}
// check sender queue status, if invalid jump this region

string project = GetProfileSender()->GetProfileProjectName(region);
QueueKey alarmPrjLogstoreKey
= QueueKeyManager::GetInstance()->GetKey("-flusher_sls-" + project + "#" + ALARM_SLS_LOGSTORE_NAME);
if (SenderQueueManager::GetInstance()->GetQueue(alarmPrjLogstoreKey) == nullptr) {
CollectionPipelineContext ctx;
SenderQueueManager::GetInstance()->CreateQueue(
alarmPrjLogstoreKey,
"self_monitor",
ctx,
{{"region", FlusherSLS::GetRegionConcurrencyLimiter(region)},
{"project", FlusherSLS::GetProjectConcurrencyLimiter(project)},
{"logstore", FlusherSLS::GetLogstoreConcurrencyLimiter(project, ALARM_SLS_LOGSTORE_NAME)}});
}
if (!SenderQueueManager::GetInstance()->IsValidToPush(alarmPrjLogstoreKey)) {
// jump this region
++sendRegionIndex;
sendAlarmTypeIndex = 0;
continue;
}

// LOG_DEBUG(sLogger, ("4Send Alarm", region)("region", sendRegionIndex)("alarm index",
// mMessageType[sendAlarmTypeIndex]));
logGroup.set_source(LoongCollectorMonitor::mIpAddr);
logGroup.set_category(ALARM_SLS_LOGSTORE_NAME);
pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_SOURCE, LoongCollectorMonitor::mIpAddr);
pipelineEventGroup.SetTag(LOG_RESERVED_KEY_TOPIC, "__alarm__");
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

alarm

auto now = GetCurrentLogtailTime();
for (map<string, unique_ptr<AlarmMessage>>::iterator mapIter = alarmMap.begin(); mapIter != alarmMap.end();
++mapIter) {
auto& messagePtr = mapIter->second;

// LOG_DEBUG(sLogger, ("5Send Alarm", region)("region", sendRegionIndex)("alarm index",
// sendAlarmTypeIndex)("msg", messagePtr->mMessage));

Log* logPtr = logGroup.add_logs();
SetLogTime(logPtr,
AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta()
: now.tv_sec);
Log_Content* contentPtr = logPtr->add_contents();
contentPtr->set_key("alarm_type");
contentPtr->set_value(messagePtr->mMessageType);

contentPtr = logPtr->add_contents();
contentPtr->set_key("alarm_message");
contentPtr->set_value(messagePtr->mMessage);

contentPtr = logPtr->add_contents();
contentPtr->set_key("alarm_count");
contentPtr->set_value(ToString(messagePtr->mCount));

contentPtr = logPtr->add_contents();
contentPtr->set_key("ip");
contentPtr->set_value(LoongCollectorMonitor::mIpAddr);

contentPtr = logPtr->add_contents();
contentPtr->set_key("os");
contentPtr->set_value(OS_NAME);

contentPtr = logPtr->add_contents();
contentPtr->set_key("ver");
contentPtr->set_value(ILOGTAIL_VERSION);

LogEvent* logEvent = pipelineEventGroup.AddLogEvent();
logEvent->SetTimestamp(AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta()
: now.tv_sec);
logEvent->SetContent("alarm_type", messagePtr->mMessageType);
logEvent->SetContent("alarm_message", messagePtr->mMessage);
logEvent->SetContent("alarm_count", ToString(messagePtr->mCount));
logEvent->SetContent("ip", LoongCollectorMonitor::mIpAddr);
logEvent->SetContent("os", OS_NAME);
logEvent->SetContent("ver", string(ILOGTAIL_VERSION));
if (!messagePtr->mProjectName.empty()) {
contentPtr = logPtr->add_contents();
contentPtr->set_key("project_name");
contentPtr->set_value(messagePtr->mProjectName);
logEvent->SetContent("project_name", messagePtr->mProjectName);
}

if (!messagePtr->mCategory.empty()) {
contentPtr = logPtr->add_contents();
contentPtr->set_key("category");
contentPtr->set_value(messagePtr->mCategory);
logEvent->SetContent("category", messagePtr->mCategory);
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

设计上改造后能带上config吗

}
}
lastUpdateTimeVec[sendAlarmTypeIndex] = currentTime;
alarmMap.clear();
++sendAlarmTypeIndex;
}
if (logGroup.logs_size() <= 0) {
if (pipelineEventGroup.GetEvents().size() <= 0) {
continue;
}
// this is an anonymous send and non lock send
GetProfileSender()->SendToProfileProject(region, logGroup);
pipelineEventGroupList.emplace_back(std::move(pipelineEventGroup));
} while (true);
}

Expand Down
18 changes: 7 additions & 11 deletions core/monitor/AlarmManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#include <vector>

#include "common/Lock.h"
#include "models/PipelineEventGroup.h"

namespace logtail {

Expand Down Expand Up @@ -122,9 +123,6 @@ class AlarmManager {
return &instance;
}

void Init();
void Stop();

void SendAlarm(const AlarmType alarmType,
const std::string& message,
const std::string& projectName = "",
Expand All @@ -134,29 +132,27 @@ class AlarmManager {
void ForceToSend();
bool IsLowLevelAlarmValid();

void FlushAllRegionAlarm(std::vector<PipelineEventGroup>& pipelineEventGroupList);

private:
using AlarmVector = std::vector<std::map<std::string, std::unique_ptr<AlarmMessage>>>;

AlarmManager();
~AlarmManager() = default;

bool SendAlarmLoop();
// without lock
AlarmVector* MakesureLogtailAlarmMapVecUnlocked(const std::string& region);
void SendAllRegionAlarm();

std::future<bool> mThreadRes;
std::mutex mThreadRunningMux;
bool mIsThreadRunning = true;
std::condition_variable mStopCV;


std::vector<std::string> mMessageType;
std::map<std::string, std::pair<std::shared_ptr<AlarmVector>, std::vector<int32_t>>> mAllAlarmMap;
PTMutex mAlarmBufferMutex;

std::atomic_int mLastLowLevelTime{0};
std::atomic_int mLastLowLevelCount{0};

#ifdef APSARA_UNIT_TEST_MAIN
friend class AlarmManagerUnittest;
#endif
};

} // namespace logtail
39 changes: 35 additions & 4 deletions core/monitor/SelfMonitorServer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,20 @@ void SelfMonitorServer::Init() {
void SelfMonitorServer::Monitor() {
LOG_INFO(sLogger, ("self-monitor", "started"));
int32_t lastMonitorTime = time(NULL);
int32_t lastAlarmTime = time(NULL);
{
unique_lock<mutex> lock(mThreadRunningMux);
while (mIsThreadRunning) {
if (mStopCV.wait_for(lock, std::chrono::seconds(1), [this]() { return !mIsThreadRunning; })) {
break;
}
int32_t monitorTime = time(NULL);
if ((monitorTime - lastMonitorTime) >= 60) { // 60s
lastMonitorTime = monitorTime;
int32_t nowTime = time(NULL);
if ((nowTime - lastMonitorTime) >= 60) { // 60s
lastMonitorTime = nowTime;
SendMetrics();
}
if ((nowTime - lastAlarmTime) >= 3) { // 3s
lastAlarmTime = nowTime;
SendAlarms();
}
}
Expand All @@ -59,6 +63,7 @@ void SelfMonitorServer::Monitor() {
}
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved

void SelfMonitorServer::Stop() {
AlarmManager::GetInstance()->ForceToSend();
{
lock_guard<mutex> lock(mThreadRunningMux);
mIsThreadRunning = false;
Expand Down Expand Up @@ -170,11 +175,37 @@ void SelfMonitorServer::ReadAsPipelineEventGroup(PipelineEventGroup& pipelineEve
}

void SelfMonitorServer::UpdateAlarmPipeline(CollectionPipelineContext* ctx) {
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved
lock_guard<mutex> lock(mAlarmPipelineMux);
WriteLock lock(mAlarmPipelineMux);
mAlarmPipelineCtx = ctx;
LOG_INFO(sLogger, ("self-monitor alarms pipeline", "updated"));
}

void SelfMonitorServer::RemoveAlarmPipeline() {
WriteLock lock(mAlarmPipelineMux);
mAlarmPipelineCtx = nullptr;
LOG_INFO(sLogger, ("self-monitor alarms pipeline", "removed"));
}

void SelfMonitorServer::SendAlarms() {
ReadLock lock(mAlarmPipelineMux);
if (mAlarmPipelineCtx == nullptr) {
Takuka0311 marked this conversation as resolved.
Show resolved Hide resolved
return;
}
// tags: __topic__:__alarm__
// metadata: __region__:${region}
vector<PipelineEventGroup> pipelineEventGroupList;
AlarmManager::GetInstance()->FlushAllRegionAlarm(pipelineEventGroupList);

shared_ptr<CollectionPipeline> pipeline
= CollectionPipelineManager::GetInstance()->FindConfigByName(mAlarmPipelineCtx->GetConfigName());
if (pipeline.get() != nullptr) {
for (auto& pipelineEventGroup : pipelineEventGroupList) {
if (pipelineEventGroup.GetEvents().size() > 0) {
ProcessorRunner::GetInstance()->PushQueue(
pipeline->GetContext().GetProcessQueueKey(), 0, std::move(pipelineEventGroup));
}
}
}
}

} // namespace logtail
Loading
Loading