diff --git a/core/application/Application.cpp b/core/application/Application.cpp index a88f9a4e1b..f104eb12a4 100644 --- a/core/application/Application.cpp +++ b/core/application/Application.cpp @@ -268,9 +268,6 @@ void Application::Start() { // GCOVR_EXCL_START LogtailPlugin::GetInstance()->LoadPluginBase(); } - // TODO: this should be refactored to internal pipeline - AlarmManager::GetInstance()->Init(); - time_t curTime = 0, lastConfigCheckTime = 0, lastUpdateMetricTime = 0, lastCheckTagsTime = 0, lastQueueGCTime = 0; #ifndef LOGTAIL_NO_TC_MALLOC time_t lastTcmallocReleaseMemTime = 0; @@ -373,7 +370,6 @@ void Application::Exit() { LogtailMonitor::GetInstance()->Stop(); LoongCollectorMonitor::GetInstance()->Stop(); - AlarmManager::GetInstance()->Stop(); LogtailPlugin::GetInstance()->StopBuiltInModules(); // from now on, alarm should not be used. diff --git a/core/collection_pipeline/CollectionPipeline.h b/core/collection_pipeline/CollectionPipeline.h index 657fe95447..24ab795d61 100644 --- a/core/collection_pipeline/CollectionPipeline.h +++ b/core/collection_pipeline/CollectionPipeline.h @@ -123,6 +123,7 @@ class CollectionPipeline { friend class PipelineUnittest; friend class InputContainerStdioUnittest; friend class InputFileUnittest; + friend class InputInternalAlarmsUnittest; friend class InputInternalMetricsUnittest; friend class InputPrometheusUnittest; friend class ProcessorTagNativeUnittest; diff --git a/core/collection_pipeline/plugin/PluginRegistry.cpp b/core/collection_pipeline/plugin/PluginRegistry.cpp index ae67384fa2..85c652eff1 100644 --- a/core/collection_pipeline/plugin/PluginRegistry.cpp +++ b/core/collection_pipeline/plugin/PluginRegistry.cpp @@ -33,6 +33,7 @@ #include "plugin/input/InputPrometheus.h" #if defined(__linux__) && !defined(__ANDROID__) #include "plugin/input/InputFileSecurity.h" +#include "plugin/input/InputInternalAlarms.h" #include "plugin/input/InputInternalMetrics.h" #include "plugin/input/InputNetworkObserver.h" #include "plugin/input/InputNetworkSecurity.h" @@ -129,6 +130,7 @@ bool PluginRegistry::IsValidNativeFlusherPlugin(const string& name) const { void PluginRegistry::LoadStaticPlugins() { RegisterInputCreator(new StaticInputCreator()); RegisterInputCreator(new StaticInputCreator()); + RegisterInputCreator(new StaticInputCreator(), true); RegisterInputCreator(new StaticInputCreator(), true); #if defined(__linux__) && !defined(__ANDROID__) RegisterInputCreator(new StaticInputCreator()); diff --git a/core/models/PipelineEventGroup.h b/core/models/PipelineEventGroup.h index 929c6e1ebc..9cf37f7df8 100644 --- a/core/models/PipelineEventGroup.h +++ b/core/models/PipelineEventGroup.h @@ -60,6 +60,8 @@ enum class EventGroupMetaKey { PROMETHEUS_STREAM_ID, PROMETHEUS_STREAM_TOTAL, + TARGET_REGION, + SOURCE_ID }; diff --git a/core/monitor/AlarmManager.cpp b/core/monitor/AlarmManager.cpp index 58dfdcc7c5..06e9e09554 100644 --- a/core/monitor/AlarmManager.cpp +++ b/core/monitor/AlarmManager.cpp @@ -107,49 +107,12 @@ AlarmManager::AlarmManager() { mMessageType[REGISTER_HANDLERS_TOO_SLOW_ALARM] = "REGISTER_HANDLERS_TOO_SLOW_ALARM"; } -void AlarmManager::Init() { - mThreadRes = async(launch::async, &AlarmManager::SendAlarmLoop, this); -} - -void AlarmManager::Stop() { - ForceToSend(); - { - lock_guard lock(mThreadRunningMux); - mIsThreadRunning = false; - } - mStopCV.notify_one(); - if (!mThreadRes.valid()) { - return; - } - future_status s = mThreadRes.wait_for(chrono::seconds(1)); - if (s == future_status::ready) { - LOG_INFO(sLogger, ("alarm gathering", "stopped successfully")); - } else { - LOG_WARNING(sLogger, ("alarm gathering", "forced to stopped")); - } -} - -bool AlarmManager::SendAlarmLoop() { - LOG_INFO(sLogger, ("alarm gathering", "started")); - { - unique_lock lock(mThreadRunningMux); - while (mIsThreadRunning) { - SendAllRegionAlarm(); - if (mStopCV.wait_for(lock, std::chrono::seconds(3), [this]() { return !mIsThreadRunning; })) { - break; - } - } - } - SendAllRegionAlarm(); - return true; -} - -void AlarmManager::SendAllRegionAlarm() { +void AlarmManager::FlushAllRegionAlarm(vector& pipelineEventGroupList) { int32_t currentTime = time(nullptr); size_t sendRegionIndex = 0; size_t sendAlarmTypeIndex = 0; do { - LogGroup logGroup; + PipelineEventGroup pipelineEventGroup(std::make_shared()); string region; { PTScopedLock lock(mAlarmBufferMutex); @@ -163,7 +126,8 @@ void AlarmManager::SendAllRegionAlarm() { ++allAlarmIter; } region = allAlarmIter->first; - // LOG_DEBUG(sLogger, ("1Send Alarm", region)("region", sendRegionIndex)); + pipelineEventGroup.SetMetadata(EventGroupMetaKey::TARGET_REGION, region); + AlarmVector& alarmBufferVec = *(allAlarmIter->second.first); std::vector& lastUpdateTimeVec = allAlarmIter->second.second; // check this region end @@ -173,8 +137,7 @@ void AlarmManager::SendAllRegionAlarm() { sendAlarmTypeIndex = 0; continue; } - // LOG_DEBUG(sLogger, ("2Send Alarm", region)("region", sendRegionIndex)("alarm index", - // mMessageType[sendAlarmTypeIndex])); + // check valid if (alarmBufferVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM || lastUpdateTimeVec.size() != (size_t)ALL_LOGTAIL_ALARM_NUM) { @@ -187,8 +150,6 @@ void AlarmManager::SendAllRegionAlarm() { continue; } - // LOG_DEBUG(sLogger, ("3Send Alarm", region)("region", sendRegionIndex)("alarm index", - // mMessageType[sendAlarmTypeIndex])); map>& alarmMap = alarmBufferVec[sendAlarmTypeIndex]; if (alarmMap.size() == 0 || currentTime - lastUpdateTimeVec[sendAlarmTypeIndex] < INT32_FLAG(logtail_alarm_interval)) { @@ -196,89 +157,39 @@ void AlarmManager::SendAllRegionAlarm() { ++sendAlarmTypeIndex; continue; } - // check sender queue status, if invalid jump this region - - string project = GetProfileSender()->GetProfileProjectName(region); - QueueKey alarmPrjLogstoreKey - = QueueKeyManager::GetInstance()->GetKey("-flusher_sls-" + project + "#" + ALARM_SLS_LOGSTORE_NAME); - if (SenderQueueManager::GetInstance()->GetQueue(alarmPrjLogstoreKey) == nullptr) { - CollectionPipelineContext ctx; - SenderQueueManager::GetInstance()->CreateQueue( - alarmPrjLogstoreKey, - "self_monitor", - ctx, - {{"region", FlusherSLS::GetRegionConcurrencyLimiter(region)}, - {"project", FlusherSLS::GetProjectConcurrencyLimiter(project)}, - {"logstore", FlusherSLS::GetLogstoreConcurrencyLimiter(project, ALARM_SLS_LOGSTORE_NAME)}}); - } - if (!SenderQueueManager::GetInstance()->IsValidToPush(alarmPrjLogstoreKey)) { - // jump this region - ++sendRegionIndex; - sendAlarmTypeIndex = 0; - continue; - } - // LOG_DEBUG(sLogger, ("4Send Alarm", region)("region", sendRegionIndex)("alarm index", - // mMessageType[sendAlarmTypeIndex])); - logGroup.set_source(LoongCollectorMonitor::mIpAddr); - logGroup.set_category(ALARM_SLS_LOGSTORE_NAME); + pipelineEventGroup.SetTagNoCopy(LOG_RESERVED_KEY_SOURCE, LoongCollectorMonitor::mIpAddr); + pipelineEventGroup.SetTag(LOG_RESERVED_KEY_TOPIC, "__alarm__"); auto now = GetCurrentLogtailTime(); for (map>::iterator mapIter = alarmMap.begin(); mapIter != alarmMap.end(); ++mapIter) { auto& messagePtr = mapIter->second; - // LOG_DEBUG(sLogger, ("5Send Alarm", region)("region", sendRegionIndex)("alarm index", - // sendAlarmTypeIndex)("msg", messagePtr->mMessage)); - - Log* logPtr = logGroup.add_logs(); - SetLogTime(logPtr, - AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() - : now.tv_sec); - Log_Content* contentPtr = logPtr->add_contents(); - contentPtr->set_key("alarm_type"); - contentPtr->set_value(messagePtr->mMessageType); - - contentPtr = logPtr->add_contents(); - contentPtr->set_key("alarm_message"); - contentPtr->set_value(messagePtr->mMessage); - - contentPtr = logPtr->add_contents(); - contentPtr->set_key("alarm_count"); - contentPtr->set_value(ToString(messagePtr->mCount)); - - contentPtr = logPtr->add_contents(); - contentPtr->set_key("ip"); - contentPtr->set_value(LoongCollectorMonitor::mIpAddr); - - contentPtr = logPtr->add_contents(); - contentPtr->set_key("os"); - contentPtr->set_value(OS_NAME); - - contentPtr = logPtr->add_contents(); - contentPtr->set_key("ver"); - contentPtr->set_value(ILOGTAIL_VERSION); - + LogEvent* logEvent = pipelineEventGroup.AddLogEvent(); + logEvent->SetTimestamp(AppConfig::GetInstance()->EnableLogTimeAutoAdjust() ? now.tv_sec + GetTimeDelta() + : now.tv_sec); + logEvent->SetContent("alarm_type", messagePtr->mMessageType); + logEvent->SetContent("alarm_message", messagePtr->mMessage); + logEvent->SetContent("alarm_count", ToString(messagePtr->mCount)); + logEvent->SetContent("ip", LoongCollectorMonitor::mIpAddr); + logEvent->SetContent("os", OS_NAME); + logEvent->SetContent("ver", string(ILOGTAIL_VERSION)); if (!messagePtr->mProjectName.empty()) { - contentPtr = logPtr->add_contents(); - contentPtr->set_key("project_name"); - contentPtr->set_value(messagePtr->mProjectName); + logEvent->SetContent("project_name", messagePtr->mProjectName); } - if (!messagePtr->mCategory.empty()) { - contentPtr = logPtr->add_contents(); - contentPtr->set_key("category"); - contentPtr->set_value(messagePtr->mCategory); + logEvent->SetContent("category", messagePtr->mCategory); } } lastUpdateTimeVec[sendAlarmTypeIndex] = currentTime; alarmMap.clear(); ++sendAlarmTypeIndex; } - if (logGroup.logs_size() <= 0) { + if (pipelineEventGroup.GetEvents().size() <= 0) { continue; } // this is an anonymous send and non lock send - GetProfileSender()->SendToProfileProject(region, logGroup); + pipelineEventGroupList.emplace_back(std::move(pipelineEventGroup)); } while (true); } diff --git a/core/monitor/AlarmManager.h b/core/monitor/AlarmManager.h index 6357097262..b45f4727a6 100644 --- a/core/monitor/AlarmManager.h +++ b/core/monitor/AlarmManager.h @@ -27,6 +27,7 @@ #include #include "common/Lock.h" +#include "models/PipelineEventGroup.h" namespace logtail { @@ -122,9 +123,6 @@ class AlarmManager { return &instance; } - void Init(); - void Stop(); - void SendAlarm(const AlarmType alarmType, const std::string& message, const std::string& projectName = "", @@ -134,22 +132,16 @@ class AlarmManager { void ForceToSend(); bool IsLowLevelAlarmValid(); + void FlushAllRegionAlarm(std::vector& pipelineEventGroupList); + private: using AlarmVector = std::vector>>; AlarmManager(); ~AlarmManager() = default; - bool SendAlarmLoop(); // without lock AlarmVector* MakesureLogtailAlarmMapVecUnlocked(const std::string& region); - void SendAllRegionAlarm(); - - std::future mThreadRes; - std::mutex mThreadRunningMux; - bool mIsThreadRunning = true; - std::condition_variable mStopCV; - std::vector mMessageType; std::map, std::vector>> mAllAlarmMap; @@ -157,6 +149,10 @@ class AlarmManager { std::atomic_int mLastLowLevelTime{0}; std::atomic_int mLastLowLevelCount{0}; + +#ifdef APSARA_UNIT_TEST_MAIN + friend class AlarmManagerUnittest; +#endif }; } // namespace logtail diff --git a/core/monitor/SelfMonitorServer.cpp b/core/monitor/SelfMonitorServer.cpp index 5e56a955a3..5349c5539a 100644 --- a/core/monitor/SelfMonitorServer.cpp +++ b/core/monitor/SelfMonitorServer.cpp @@ -40,16 +40,20 @@ void SelfMonitorServer::Init() { void SelfMonitorServer::Monitor() { LOG_INFO(sLogger, ("self-monitor", "started")); int32_t lastMonitorTime = time(NULL); + int32_t lastAlarmTime = time(NULL); { unique_lock lock(mThreadRunningMux); while (mIsThreadRunning) { if (mStopCV.wait_for(lock, std::chrono::seconds(1), [this]() { return !mIsThreadRunning; })) { break; } - int32_t monitorTime = time(NULL); - if ((monitorTime - lastMonitorTime) >= 60) { // 60s - lastMonitorTime = monitorTime; + int32_t nowTime = time(NULL); + if ((nowTime - lastMonitorTime) >= 60) { // 60s + lastMonitorTime = nowTime; SendMetrics(); + } + if ((nowTime - lastAlarmTime) >= 3) { // 3s + lastAlarmTime = nowTime; SendAlarms(); } } @@ -59,6 +63,7 @@ void SelfMonitorServer::Monitor() { } void SelfMonitorServer::Stop() { + AlarmManager::GetInstance()->ForceToSend(); { lock_guard lock(mThreadRunningMux); mIsThreadRunning = false; @@ -170,11 +175,37 @@ void SelfMonitorServer::ReadAsPipelineEventGroup(PipelineEventGroup& pipelineEve } void SelfMonitorServer::UpdateAlarmPipeline(CollectionPipelineContext* ctx) { - lock_guard lock(mAlarmPipelineMux); + WriteLock lock(mAlarmPipelineMux); mAlarmPipelineCtx = ctx; + LOG_INFO(sLogger, ("self-monitor alarms pipeline", "updated")); +} + +void SelfMonitorServer::RemoveAlarmPipeline() { + WriteLock lock(mAlarmPipelineMux); + mAlarmPipelineCtx = nullptr; + LOG_INFO(sLogger, ("self-monitor alarms pipeline", "removed")); } void SelfMonitorServer::SendAlarms() { + ReadLock lock(mAlarmPipelineMux); + if (mAlarmPipelineCtx == nullptr) { + return; + } + // tags: __topic__:__alarm__ + // metadata: __region__:${region} + vector pipelineEventGroupList; + AlarmManager::GetInstance()->FlushAllRegionAlarm(pipelineEventGroupList); + + shared_ptr pipeline + = CollectionPipelineManager::GetInstance()->FindConfigByName(mAlarmPipelineCtx->GetConfigName()); + if (pipeline.get() != nullptr) { + for (auto& pipelineEventGroup : pipelineEventGroupList) { + if (pipelineEventGroup.GetEvents().size() > 0) { + ProcessorRunner::GetInstance()->PushQueue( + pipeline->GetContext().GetProcessQueueKey(), 0, std::move(pipelineEventGroup)); + } + } + } } } // namespace logtail diff --git a/core/monitor/SelfMonitorServer.h b/core/monitor/SelfMonitorServer.h index 6bb9dd64a1..0e3362c891 100644 --- a/core/monitor/SelfMonitorServer.h +++ b/core/monitor/SelfMonitorServer.h @@ -34,7 +34,9 @@ class SelfMonitorServer { void UpdateMetricPipeline(CollectionPipelineContext* ctx, SelfMonitorMetricRules* rules); void RemoveMetricPipeline(); - void UpdateAlarmPipeline(CollectionPipelineContext* ctx); // Todo + void UpdateAlarmPipeline(CollectionPipelineContext* ctx); + void RemoveAlarmPipeline(); + private: SelfMonitorServer(); ~SelfMonitorServer() = default; @@ -44,21 +46,24 @@ class SelfMonitorServer { bool mIsThreadRunning = true; std::condition_variable mStopCV; + // metrics void SendMetrics(); bool ProcessSelfMonitorMetricEvent(SelfMonitorMetricEvent& event, const SelfMonitorMetricRule& rule); void PushSelfMonitorMetricEvents(std::vector& events); void ReadAsPipelineEventGroup(PipelineEventGroup& pipelineEventGroup); + mutable ReadWriteLock mMetricPipelineLock; CollectionPipelineContext* mMetricPipelineCtx = nullptr; SelfMonitorMetricRules* mSelfMonitorMetricRules = nullptr; SelfMonitorMetricEventMap mSelfMonitorMetricEventMap; - mutable ReadWriteLock mMetricPipelineLock; + // alarms void SendAlarms(); - CollectionPipelineContext* mAlarmPipelineCtx; - std::mutex mAlarmPipelineMux; + mutable ReadWriteLock mAlarmPipelineMux; + CollectionPipelineContext* mAlarmPipelineCtx = nullptr; #ifdef APSARA_UNIT_TEST_MAIN + friend class InputInternalAlarmsUnittest; friend class InputInternalMetricsUnittest; #endif }; diff --git a/core/plugin/input/InputInternalAlarms.cpp b/core/plugin/input/InputInternalAlarms.cpp new file mode 100644 index 0000000000..59cb91c6a5 --- /dev/null +++ b/core/plugin/input/InputInternalAlarms.cpp @@ -0,0 +1,39 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "plugin/input/InputInternalAlarms.h" + +namespace logtail { + +const std::string InputInternalAlarms::sName = "input_internal_alarms"; + +bool InputInternalAlarms::Init(const Json::Value& config, Json::Value& optionalGoPipeline) { + return true; +} + +bool InputInternalAlarms::Start() { + SelfMonitorServer::GetInstance()->UpdateAlarmPipeline(mContext); + return true; +} + +bool InputInternalAlarms::Stop(bool isPipelineRemoving) { + if (isPipelineRemoving) { + SelfMonitorServer::GetInstance()->RemoveAlarmPipeline(); + } + return true; +} + +} // namespace logtail diff --git a/core/plugin/input/InputInternalAlarms.h b/core/plugin/input/InputInternalAlarms.h new file mode 100644 index 0000000000..d78fc2598c --- /dev/null +++ b/core/plugin/input/InputInternalAlarms.h @@ -0,0 +1,37 @@ +/* + * Copyright 2024 iLogtail Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include "collection_pipeline/plugin/interface/Input.h" +#include "monitor/SelfMonitorServer.h" + +namespace logtail { + +class InputInternalAlarms : public Input { +public: + static const std::string sName; + + const std::string& Name() const override { return sName; } + bool Init(const Json::Value& config, Json::Value& optionalGoPipeline) override; + bool Start() override; + bool Stop(bool isPipelineRemoving) override; + bool SupportAck() const override { return true; } + +private: +}; + +} // namespace logtail diff --git a/core/unittest/input/CMakeLists.txt b/core/unittest/input/CMakeLists.txt index 4544e875bf..42650e58c7 100644 --- a/core/unittest/input/CMakeLists.txt +++ b/core/unittest/input/CMakeLists.txt @@ -36,6 +36,9 @@ target_link_libraries(input_ebpf_network_security_unittest unittest_base) add_executable(input_ebpf_network_observer_unittest InputNetworkObserverUnittest.cpp) target_link_libraries(input_ebpf_network_observer_unittest unittest_base) +add_executable(input_internal_alarms_unittest InputInternalMetricsUnittest.cpp) +target_link_libraries(input_internal_alarms_unittest ${UT_BASE_TARGET}) + add_executable(input_internal_metrics_unittest InputInternalMetricsUnittest.cpp) target_link_libraries(input_internal_metrics_unittest ${UT_BASE_TARGET}) @@ -47,4 +50,5 @@ gtest_discover_tests(input_ebpf_file_security_unittest) gtest_discover_tests(input_ebpf_process_security_unittest) gtest_discover_tests(input_ebpf_network_security_unittest) gtest_discover_tests(input_ebpf_network_observer_unittest) +gtest_discover_tests(input_internal_alarms_unittest) gtest_discover_tests(input_internal_metrics_unittest) diff --git a/core/unittest/input/InputInternalAlarmsUnittest.cpp b/core/unittest/input/InputInternalAlarmsUnittest.cpp new file mode 100644 index 0000000000..c26c0bffba --- /dev/null +++ b/core/unittest/input/InputInternalAlarmsUnittest.cpp @@ -0,0 +1,90 @@ +// Copyright 2023 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include + +#include "json/json.h" + +#include "app_config/AppConfig.h" +#include "collection_pipeline/CollectionPipeline.h" +#include "collection_pipeline/CollectionPipelineContext.h" +#include "collection_pipeline/plugin/PluginRegistry.h" +#include "common/JsonUtil.h" +#include "monitor/Monitor.h" +#include "plugin/input/InputInternalAlarms.h" +#include "unittest/Unittest.h" + +DECLARE_FLAG_INT32(default_plugin_log_queue_size); + +using namespace std; + +namespace logtail { + +class InputInternalAlarmsUnittest : public testing::Test { +public: + void OnPipelineUpdate(); + +protected: + static void SetUpTestCase() { + LoongCollectorMonitor::GetInstance()->Init(); + PluginRegistry::GetInstance()->LoadPlugins(); + } + + static void TearDownTestCase() { + PluginRegistry::GetInstance()->UnloadPlugins(); + LoongCollectorMonitor::GetInstance()->Stop(); + } + + void SetUp() override { + p.mName = "test_config"; + ctx.SetConfigName("test_config"); + p.mPluginID.store(0); + ctx.SetPipeline(p); + } + +private: + CollectionPipeline p; + CollectionPipelineContext ctx; +}; + +void InputInternalAlarmsUnittest::OnPipelineUpdate() { + Json::Value configJson, optionalGoPipeline; + InputInternalAlarms input; + input.SetContext(ctx); + string configStr, errorMsg; + + configStr = R"( + { + "Type": "input_internal_alarms" + } + )"; + APSARA_TEST_TRUE(ParseJsonTable(configStr, configJson, errorMsg)); + input.SetContext(ctx); + input.SetMetricsRecordRef(InputInternalAlarms::sName, "1"); + APSARA_TEST_TRUE(input.Init(configJson, optionalGoPipeline)); + + APSARA_TEST_EQUAL(nullptr, SelfMonitorServer::GetInstance()->mAlarmPipelineCtx); + APSARA_TEST_TRUE(input.Start()); + APSARA_TEST_NOT_EQUAL(nullptr, SelfMonitorServer::GetInstance()->mAlarmPipelineCtx); + APSARA_TEST_TRUE(input.Stop(true)); + APSARA_TEST_EQUAL(nullptr, SelfMonitorServer::GetInstance()->mAlarmPipelineCtx); +} + +UNIT_TEST_CASE(InputInternalAlarmsUnittest, OnPipelineUpdate) + +} // namespace logtail + +UNIT_TEST_MAIN diff --git a/core/unittest/monitor/AlarmManagerUnittest.cpp b/core/unittest/monitor/AlarmManagerUnittest.cpp new file mode 100644 index 0000000000..69f83549fa --- /dev/null +++ b/core/unittest/monitor/AlarmManagerUnittest.cpp @@ -0,0 +1,86 @@ +// Copyright 2025 iLogtail Authors +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include + +#include "json/json.h" + +#include "AlarmManager.h" +#include "unittest/Unittest.h" + +namespace logtail { + +static std::atomic_bool running(true); + +class AlarmManagerUnittest : public ::testing::Test { +public: + void SetUp() {} + + void TearDown() { + + } + + void TestSendAlarm(); + void TestFlushAllRegionAlarm(); +}; + +APSARA_UNIT_TEST_CASE(AlarmManagerUnittest, TestSendAlarm, 0); +APSARA_UNIT_TEST_CASE(AlarmManagerUnittest, TestFlushAllRegionAlarm, 1); + +void AlarmManagerUnittest::TestSendAlarm() { + { + std::string message = "Test Alarm Message"; + std::string projectName = "TestProject"; + std::string category = "TestCategory"; + std::string region = "TestRegion"; + AlarmType alarmType = USER_CONFIG_ALARM; // Assuming USER_CONFIG_ALARM is valid + + AlarmManager::GetInstance()->SendAlarm(alarmType, message, projectName, category, region); + // Assuming we have a method to retrieve alarms for testing + AlarmManager::AlarmVector& alarmBufferVec = *AlarmManager::GetInstance()->MakesureLogtailAlarmMapVecUnlocked(region); + + std::string key = projectName + "_" + category; + APSARA_TEST_EQUAL(1U, alarmBufferVec[alarmType].size()); + APSARA_TEST_EQUAL(true, alarmBufferVec[alarmType].find(key) != alarmBufferVec[alarmType].end()); + APSARA_TEST_EQUAL(category, alarmBufferVec[alarmType][key]->mCategory); + APSARA_TEST_EQUAL(1, alarmBufferVec[alarmType][key]->mCount); + APSARA_TEST_EQUAL(message, alarmBufferVec[alarmType][key]->mMessage); + APSARA_TEST_EQUAL(AlarmManager::GetInstance()->mMessageType[alarmType], alarmBufferVec[alarmType][key]->mMessageType); + APSARA_TEST_EQUAL(projectName, alarmBufferVec[alarmType][key]->mProjectName); + } +} + +void AlarmManagerUnittest::TestFlushAllRegionAlarm() { + AlarmManager::GetInstance()->mAllAlarmMap.clear(); + // Simulate adding some alarms + AlarmManager::GetInstance()->SendAlarm(USER_CONFIG_ALARM, "Test1", "Project1", "Cat1", "Region1"); + AlarmManager::GetInstance()->SendAlarm(GLOBAL_CONFIG_ALARM, "Test2", "Project2", "Cat2", "Region2"); + + std::vector pipelineEventGroupList; + AlarmManager::GetInstance()->FlushAllRegionAlarm(pipelineEventGroupList); + + // Assuming each alarm results in a PipelineEventGroup + APSARA_TEST_EQUAL(2U, pipelineEventGroupList.size()); +} + +} // namespace logtail + +int main(int argc, char** argv) { + logtail::Logger::Instance().InitGlobalLoggers(); + ::testing::InitGoogleTest(&argc, argv); + return RUN_ALL_TESTS(); +} diff --git a/core/unittest/monitor/CMakeLists.txt b/core/unittest/monitor/CMakeLists.txt index d3e7c760f2..c24003b6f9 100644 --- a/core/unittest/monitor/CMakeLists.txt +++ b/core/unittest/monitor/CMakeLists.txt @@ -13,7 +13,10 @@ # limitations under the License. cmake_minimum_required(VERSION 3.22) -project(metric_manager_unittest) +project(self_monitor_unittest) + +add_executable(alarm_manager_unittest AlarmManagerUnittest.cpp) +target_link_libraries(alarm_manager_unittest ${UT_BASE_TARGET}) add_executable(metric_manager_unittest MetricManagerUnittest.cpp) target_link_libraries(metric_manager_unittest ${UT_BASE_TARGET}) @@ -25,6 +28,7 @@ add_executable(self_monitor_metric_event_unittest SelfMonitorMetricEventUnittest target_link_libraries(self_monitor_metric_event_unittest ${UT_BASE_TARGET}) include(GoogleTest) +gtest_discover_tests(alarm_manager_unittest) gtest_discover_tests(metric_manager_unittest) gtest_discover_tests(plugin_metric_manager_unittest) gtest_discover_tests(self_monitor_metric_event_unittest)