Skip to content

Commit

Permalink
feat: support singleton input
Browse files Browse the repository at this point in the history
  • Loading branch information
Abingcbc committed Nov 28, 2024
1 parent 4993457 commit 5c0d9c8
Show file tree
Hide file tree
Showing 18 changed files with 308 additions and 39 deletions.
20 changes: 20 additions & 0 deletions core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
LOG_DEBUG(sLogger, ("config files scan done", "no task update"));
}

SortPipelineConfigDiff(pDiff);
return make_pair(std::move(pDiff), std::move(tDiff));
}

Expand Down Expand Up @@ -405,4 +406,23 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
return true;
}

void PipelineConfigWatcher::SortPipelineConfigDiff(PipelineConfigDiff& pDiff) {
// sort rule
// 1. sort by create time first, if create time is 0 (include local config), put it back
// 2. if create time is the same, sort by name
auto cmp = [](const PipelineConfig& a, const PipelineConfig& b) {
if (a.mCreateTime == b.mCreateTime) {
return a.mName < b.mName;
} else if (a.mCreateTime == 0 && b.mCreateTime != 0) {
return false;
} else if (a.mCreateTime != 0 && b.mCreateTime == 0) {
return true;
} else {
return a.mCreateTime < b.mCreateTime;
}
};
sort(pDiff.mAdded.begin(), pDiff.mAdded.end(), cmp);
sort(pDiff.mModified.begin(), pDiff.mModified.end(), cmp);
}

} // namespace logtail
8 changes: 7 additions & 1 deletion core/config/watcher/PipelineConfigWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class PipelineConfigWatcher : public ConfigWatcher {
PipelineConfigWatcher();
~PipelineConfigWatcher() = default;

void InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
void
InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
void InsertPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
bool CheckAddedConfig(const std::string& configName,
std::unique_ptr<Json::Value>&& configDetail,
Expand All @@ -56,9 +57,14 @@ class PipelineConfigWatcher : public ConfigWatcher {
std::unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff);
void SortPipelineConfigDiff(PipelineConfigDiff& pDiff);

const PipelineManager* mPipelineManager = nullptr;
const TaskPipelineManager* mTaskPipelineManager = nullptr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PipelineConfigWatcherUnittest;
#endif
};

} // namespace logtail
43 changes: 40 additions & 3 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
#include "shennong/ShennongManager.h"
#endif
#include "config/feedbacker/ConfigFeedbackReceiver.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKeyManager.h"
#include "plugin/PluginRegistry.h"

using namespace std;

Expand All @@ -40,7 +39,7 @@ PipelineManager::PipelineManager()
: mInputRunners({
PrometheusInputRunner::GetInstance(),
#if defined(__linux__) && !defined(__ANDROID__)
ebpf::eBPFServer::GetInstance(),
ebpf::eBPFServer::GetInstance(),
#endif
}) {
}
Expand Down Expand Up @@ -82,6 +81,10 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
ConfigFeedbackStatus::DELETED);
}
for (auto& config : diff.mModified) {
if (!PreCheckPipelineConfig(config)) {
continue;
}

auto p = BuildPipeline(std::move(config)); // auto reuse old pipeline's process queue and sender queue
if (!p) {
LOG_WARNING(sLogger,
Expand All @@ -97,6 +100,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
ConfigFeedbackStatus::FAILED);
continue;
}

LOG_INFO(sLogger,
("pipeline building for existing config succeeded",
"stop the old pipeline and start the new one")("config", config.mName));
Expand All @@ -111,6 +115,10 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
ConfigFeedbackStatus::APPLIED);
}
for (auto& config : diff.mAdded) {
if (!PreCheckPipelineConfig(config)) {
continue;
}

auto p = BuildPipeline(std::move(config));
if (!p) {
LOG_WARNING(sLogger,
Expand Down Expand Up @@ -238,6 +246,35 @@ void PipelineManager::DecreasePluginUsageCnt(const unordered_map<string, unorder
}
}


bool PipelineManager::PreCheckPipelineConfig(PipelineConfig& config) {
if (CheckIfGlobalSingletonInputLoaded(config.mInputs)) {
LOG_WARNING(sLogger,
("global singleton input plugin is already loaded", "skip current object")("config", config.mName));
AlarmManager::GetInstance()->SendAlarm(
CATEGORY_CONFIG_ALARM,
"global singleton input plugin is already loaded: skip current object, config: " + config.mName,
config.mProject,
config.mLogstore,
config.mRegion);
ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName,
ConfigFeedbackStatus::FAILED);
return false;
}
return true;
}

bool PipelineManager::CheckIfGlobalSingletonInputLoaded(std::vector<const Json::Value*>& inputConfig) {
for (const auto& input : inputConfig) {
auto inputType = (*input)["Type"].asString();
if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(inputType)
&& mPluginCntMap["inputs"][inputType] > 0) {
return true;
}
}
return false;
}

bool PipelineManager::CheckIfFileServerUpdated(const Json::Value& config) {
string inputType = config["Type"].asString();
return inputType == "input_file" || inputType == "input_container_stdio";
Expand Down
2 changes: 2 additions & 0 deletions core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class PipelineManager {
void DecreasePluginUsageCnt(
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& statistics);
void FlushAllBatch();
bool PreCheckPipelineConfig(PipelineConfig& config);
bool CheckIfGlobalSingletonInputLoaded(std::vector<const Json::Value*>& inputConfig);
// TODO: 长期过渡使用
bool CheckIfFileServerUpdated(const Json::Value& config);

Expand Down
49 changes: 35 additions & 14 deletions core/pipeline/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ bool PluginRegistry::IsValidNativeFlusherPlugin(const string& name) const {
void PluginRegistry::LoadStaticPlugins() {
RegisterInputCreator(new StaticInputCreator<InputFile>());
RegisterInputCreator(new StaticInputCreator<InputPrometheus>());
RegisterInputCreator(new StaticInputCreator<InputInternalMetrics>());
RegisterInputCreator(new StaticInputCreator<InputInternalMetrics>(), true);
#if defined(__linux__) && !defined(__ANDROID__)
RegisterInputCreator(new StaticInputCreator<InputContainerStdio>());
RegisterInputCreator(new StaticInputCreator<InputFileSecurity>());
RegisterInputCreator(new StaticInputCreator<InputNetworkObserver>());
RegisterInputCreator(new StaticInputCreator<InputNetworkSecurity>());
RegisterInputCreator(new StaticInputCreator<InputProcessSecurity>());
RegisterInputCreator(new StaticInputCreator<InputFileSecurity>(), true);
RegisterInputCreator(new StaticInputCreator<InputNetworkObserver>(), true);
RegisterInputCreator(new StaticInputCreator<InputNetworkSecurity>(), true);
RegisterInputCreator(new StaticInputCreator<InputProcessSecurity>(), true);
#endif

RegisterProcessorCreator(new StaticProcessorCreator<ProcessorSplitLogStringNative>());
Expand Down Expand Up @@ -183,16 +183,16 @@ void PluginRegistry::LoadDynamicPlugins(const set<string>& plugins) {
}
}

void PluginRegistry::RegisterInputCreator(PluginCreator* creator) {
RegisterCreator(INPUT_PLUGIN, creator);
void PluginRegistry::RegisterInputCreator(PluginCreator* creator, bool isSingleton) {
RegisterCreator(INPUT_PLUGIN, creator, isSingleton);
}

void PluginRegistry::RegisterProcessorCreator(PluginCreator* creator) {
RegisterCreator(PROCESSOR_PLUGIN, creator);
void PluginRegistry::RegisterProcessorCreator(PluginCreator* creator, bool isSingleton) {
RegisterCreator(PROCESSOR_PLUGIN, creator, isSingleton);
}

void PluginRegistry::RegisterFlusherCreator(PluginCreator* creator) {
RegisterCreator(FLUSHER_PLUGIN, creator);
void PluginRegistry::RegisterFlusherCreator(PluginCreator* creator, bool isSingleton) {
RegisterCreator(FLUSHER_PLUGIN, creator, isSingleton);
}

PluginCreator* PluginRegistry::LoadProcessorPlugin(DynamicLibLoader& loader, const string pluginType) {
Expand All @@ -217,21 +217,42 @@ PluginCreator* PluginRegistry::LoadProcessorPlugin(DynamicLibLoader& loader, con
return new DynamicCProcessorCreator(plugin, loader.Release());
}

void PluginRegistry::RegisterCreator(PluginCat cat, PluginCreator* creator) {
void PluginRegistry::RegisterCreator(PluginCat cat, PluginCreator* creator, bool isSingleton) {
if (!creator) {
return;
}
mPluginDict.emplace(PluginKey(cat, creator->Name()), shared_ptr<PluginCreator>(creator));
mPluginDict.emplace(PluginKey(cat, creator->Name()),
PluginCreatorWithInfo(shared_ptr<PluginCreator>(creator), isSingleton));
}

unique_ptr<PluginInstance>
PluginRegistry::Create(PluginCat cat, const string& name, const PluginInstance::PluginMeta& pluginMeta) {
unique_ptr<PluginInstance> ins;
auto creatorEntry = mPluginDict.find(PluginKey(cat, name));
if (creatorEntry != mPluginDict.end()) {
ins = creatorEntry->second->Create(pluginMeta);
ins = creatorEntry->second.first->Create(pluginMeta);
}
return ins;
}

bool PluginRegistry::IsGlobalSingletonInputPlugin(const string& name) const {
return IsGlobalSingleton(INPUT_PLUGIN, name);
}

bool PluginRegistry::IsGlobalSingletonProcessorPlugin(const string& name) const {
return IsGlobalSingleton(PROCESSOR_PLUGIN, name);
}

bool PluginRegistry::IsGlobalSingletonFlusherPlugin(const string& name) const {
return IsGlobalSingleton(FLUSHER_PLUGIN, name);
}

bool PluginRegistry::IsGlobalSingleton(PluginCat cat, const string& name) const {
auto creatorEntry = mPluginDict.find(PluginKey(cat, name));
if (creatorEntry != mPluginDict.end()) {
return creatorEntry->second.second;
}
return false;
}

} // namespace logtail
25 changes: 17 additions & 8 deletions core/pipeline/plugin/PluginRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ class PluginRegistry {
void LoadPlugins();
void UnloadPlugins();
std::unique_ptr<InputInstance> CreateInput(const std::string& name, const PluginInstance::PluginMeta& pluginMeta);
std::unique_ptr<ProcessorInstance> CreateProcessor(const std::string& name, const PluginInstance::PluginMeta& pluginMeta);
std::unique_ptr<FlusherInstance> CreateFlusher(const std::string& name, const PluginInstance::PluginMeta& pluginMeta);
std::unique_ptr<ProcessorInstance> CreateProcessor(const std::string& name,
const PluginInstance::PluginMeta& pluginMeta);
std::unique_ptr<FlusherInstance> CreateFlusher(const std::string& name,
const PluginInstance::PluginMeta& pluginMeta);
bool IsValidGoPlugin(const std::string& name) const;
bool IsValidNativeInputPlugin(const std::string& name) const;
bool IsValidNativeProcessorPlugin(const std::string& name) const;
bool IsValidNativeFlusherPlugin(const std::string& name) const;
bool IsGlobalSingletonInputPlugin(const std::string& name) const;
bool IsGlobalSingletonProcessorPlugin(const std::string& name) const;
bool IsGlobalSingletonFlusherPlugin(const std::string& name) const;

private:
enum PluginCat { INPUT_PLUGIN, PROCESSOR_PLUGIN, FLUSHER_PLUGIN };
Expand All @@ -69,19 +74,23 @@ class PluginRegistry {
}
};

using PluginCreatorWithInfo = std::pair<std::shared_ptr<PluginCreator>, bool>;

PluginRegistry() {}
~PluginRegistry() = default;

void LoadStaticPlugins();
void LoadDynamicPlugins(const std::set<std::string>& plugins);
void RegisterInputCreator(PluginCreator* creator);
void RegisterProcessorCreator(PluginCreator* creator);
void RegisterFlusherCreator(PluginCreator* creator);
void RegisterInputCreator(PluginCreator* creator, bool isSingleton = false);
void RegisterProcessorCreator(PluginCreator* creator, bool isSingleton = false);
void RegisterFlusherCreator(PluginCreator* creator, bool isSingleton = false);
PluginCreator* LoadProcessorPlugin(DynamicLibLoader& loader, const std::string pluginType);
void RegisterCreator(PluginCat cat, PluginCreator* creator);
std::unique_ptr<PluginInstance> Create(PluginCat cat, const std::string& name, const PluginInstance::PluginMeta& pluginMeta);
void RegisterCreator(PluginCat cat, PluginCreator* creator, bool isSingleton);
std::unique_ptr<PluginInstance>
Create(PluginCat cat, const std::string& name, const PluginInstance::PluginMeta& pluginMeta);
bool IsGlobalSingleton(PluginCat cat, const std::string& name) const;

std::unordered_map<PluginKey, std::shared_ptr<PluginCreator>, PluginKeyHash> mPluginDict;
std::unordered_map<PluginKey, PluginCreatorWithInfo, PluginKeyHash> mPluginDict;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PluginRegistryUnittest;
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/plugin/creator/PluginCreator.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

#pragma once

#include <string>
#include <memory>
#include <string>

#include "pipeline/plugin/instance/PluginInstance.h"

Expand Down
8 changes: 3 additions & 5 deletions core/pipeline/plugin/instance/InputInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ namespace logtail {

class InputInstance : public PluginInstance {
public:
InputInstance(Input* plugin, const PluginInstance::PluginMeta& pluginMeta) : PluginInstance(pluginMeta), mPlugin(plugin) {}
InputInstance(Input* plugin, const PluginInstance::PluginMeta& pluginMeta)
: PluginInstance(pluginMeta), mPlugin(plugin) {}

const std::string& Name() const override { return mPlugin->Name(); }

bool Init(const Json::Value& config,
PipelineContext& context,
size_t inputIdx,
Json::Value& optionalGoPipeline);
bool Init(const Json::Value& config, PipelineContext& context, size_t inputIdx, Json::Value& optionalGoPipeline);
bool Start() { return mPlugin->Start(); }
bool Stop(bool isPipelineRemoving) { return mPlugin->Stop(isPipelineRemoving); }
bool SupportAck() const { return mPlugin->SupportAck(); }
Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputContainerStdio.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
#include "container_manager/ContainerDiscoveryOptions.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/MultilineOptions.h"
#include "file_server/reader/FileReaderOptions.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/plugin/interface/Input.h"
#include "file_server/reader/FileReaderOptions.h"

namespace logtail {

Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
#include "container_manager/ContainerDiscoveryOptions.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/MultilineOptions.h"
#include "file_server/reader/FileReaderOptions.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/plugin/interface/Input.h"
#include "file_server/reader/FileReaderOptions.h"

namespace logtail {

Expand Down
4 changes: 2 additions & 2 deletions core/plugin/input/InputFileSecurity.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include <string>

#include "ebpf/config.h"
#include "pipeline/plugin/interface/Input.h"
#include "ebpf/eBPFServer.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/plugin/interface/Input.h"

namespace logtail {

Expand All @@ -34,7 +34,7 @@ class InputFileSecurity : public Input {
bool Start() override;
bool Stop(bool isPipelineRemoving) override;
bool SupportAck() const override { return false; }

ebpf::SecurityOptions mSecurityOptions;
PluginMetricManagerPtr mPluginMgr;
};
Expand Down
1 change: 1 addition & 0 deletions core/plugin/input/InputInternalMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class InputInternalMetrics : public Input {
bool Start() override;
bool Stop(bool isPipelineRemoving) override;
bool SupportAck() const override { return true; }

private:
SelfMonitorMetricRules mSelfMonitorMetricRules;
};
Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputNetworkObserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include <string>

#include "ebpf/config.h"
#include "pipeline/plugin/interface/Input.h"
#include "ebpf/include/export.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/plugin/interface/Input.h"

namespace logtail {

Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputNetworkSecurity.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include <string>

#include "ebpf/config.h"
#include "pipeline/plugin/interface/Input.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/plugin/interface/Input.h"

namespace logtail {

Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputProcessSecurity.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include <string>

#include "ebpf/config.h"
#include "pipeline/plugin/interface/Input.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/plugin/interface/Input.h"

namespace logtail {

Expand Down
Loading

0 comments on commit 5c0d9c8

Please sign in to comment.