Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support singleton input #1933

Merged
merged 16 commits into from
Dec 11, 2024
20 changes: 20 additions & 0 deletions core/config/watcher/PipelineConfigWatcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ pair<PipelineConfigDiff, TaskConfigDiff> PipelineConfigWatcher::CheckConfigDiff(
LOG_DEBUG(sLogger, ("config files scan done", "no task update"));
}

SortPipelineConfigDiff(pDiff);
return make_pair(std::move(pDiff), std::move(tDiff));
}

Expand Down Expand Up @@ -405,4 +406,23 @@ bool PipelineConfigWatcher::CheckModifiedConfig(const string& configName,
return true;
}

void PipelineConfigWatcher::SortPipelineConfigDiff(PipelineConfigDiff& pDiff) {
// sort rule
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
// 1. sort by create time first, if create time is 0 (include local config), put it back
// 2. if create time is the same, sort by name
auto cmp = [](const PipelineConfig& a, const PipelineConfig& b) {
if (a.mCreateTime == b.mCreateTime) {
return a.mName < b.mName;
} else if (a.mCreateTime == 0 && b.mCreateTime != 0) {
return false;
} else if (a.mCreateTime != 0 && b.mCreateTime == 0) {
return true;
} else {
return a.mCreateTime < b.mCreateTime;
}
};
sort(pDiff.mAdded.begin(), pDiff.mAdded.end(), cmp);
sort(pDiff.mModified.begin(), pDiff.mModified.end(), cmp);
}

} // namespace logtail
8 changes: 7 additions & 1 deletion core/config/watcher/PipelineConfigWatcher.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,8 @@ class PipelineConfigWatcher : public ConfigWatcher {
PipelineConfigWatcher();
~PipelineConfigWatcher() = default;

void InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
void
InsertInnerPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
void InsertPipelines(PipelineConfigDiff& pDiff, TaskConfigDiff& tDiff, std::unordered_set<std::string>& configSet);
bool CheckAddedConfig(const std::string& configName,
std::unique_ptr<Json::Value>&& configDetail,
Expand All @@ -56,9 +57,14 @@ class PipelineConfigWatcher : public ConfigWatcher {
std::unique_ptr<Json::Value>&& configDetail,
PipelineConfigDiff& pDiff,
TaskConfigDiff& tDiff);
void SortPipelineConfigDiff(PipelineConfigDiff& pDiff);

const PipelineManager* mPipelineManager = nullptr;
const TaskPipelineManager* mTaskPipelineManager = nullptr;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PipelineConfigWatcherUnittest;
#endif
};

} // namespace logtail
43 changes: 40 additions & 3 deletions core/pipeline/PipelineManager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,7 @@
#include "shennong/ShennongManager.h"
#endif
#include "config/feedbacker/ConfigFeedbackReceiver.h"
#include "pipeline/queue/ProcessQueueManager.h"
#include "pipeline/queue/QueueKeyManager.h"
#include "plugin/PluginRegistry.h"

using namespace std;

Expand All @@ -40,7 +39,7 @@ PipelineManager::PipelineManager()
: mInputRunners({
PrometheusInputRunner::GetInstance(),
#if defined(__linux__) && !defined(__ANDROID__)
ebpf::eBPFServer::GetInstance(),
ebpf::eBPFServer::GetInstance(),
#endif
}) {
}
Expand Down Expand Up @@ -82,6 +81,10 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
ConfigFeedbackStatus::DELETED);
}
for (auto& config : diff.mModified) {
if (!PreCheckPipelineConfig(config)) {
continue;
}

auto p = BuildPipeline(std::move(config)); // auto reuse old pipeline's process queue and sender queue
if (!p) {
LOG_WARNING(sLogger,
Expand All @@ -97,6 +100,7 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
ConfigFeedbackStatus::FAILED);
continue;
}

LOG_INFO(sLogger,
("pipeline building for existing config succeeded",
"stop the old pipeline and start the new one")("config", config.mName));
Expand All @@ -111,6 +115,10 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) {
ConfigFeedbackStatus::APPLIED);
}
for (auto& config : diff.mAdded) {
if (!PreCheckPipelineConfig(config)) {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
continue;
}

auto p = BuildPipeline(std::move(config));
if (!p) {
LOG_WARNING(sLogger,
Expand Down Expand Up @@ -238,6 +246,35 @@ void PipelineManager::DecreasePluginUsageCnt(const unordered_map<string, unorder
}
}


bool PipelineManager::PreCheckPipelineConfig(PipelineConfig& config) {
if (CheckIfGlobalSingletonInputLoaded(config.mInputs)) {
LOG_WARNING(sLogger,
("global singleton input plugin is already loaded", "skip current object")("config", config.mName));
AlarmManager::GetInstance()->SendAlarm(
CATEGORY_CONFIG_ALARM,
"global singleton input plugin is already loaded: skip current object, config: " + config.mName,
config.mProject,
config.mLogstore,
config.mRegion);
ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName,
ConfigFeedbackStatus::FAILED);
return false;
}
return true;
}

bool PipelineManager::CheckIfGlobalSingletonInputLoaded(std::vector<const Json::Value*>& inputConfig) {
for (const auto& input : inputConfig) {
auto inputType = (*input)["Type"].asString();
if (PluginRegistry::GetInstance()->IsGlobalSingletonInputPlugin(inputType)
&& mPluginCntMap["inputs"][inputType] > 0) {
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
return true;
}
}
return false;
}

bool PipelineManager::CheckIfFileServerUpdated(const Json::Value& config) {
string inputType = config["Type"].asString();
return inputType == "input_file" || inputType == "input_container_stdio";
Expand Down
2 changes: 2 additions & 0 deletions core/pipeline/PipelineManager.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,8 @@ class PipelineManager {
void DecreasePluginUsageCnt(
const std::unordered_map<std::string, std::unordered_map<std::string, uint32_t>>& statistics);
void FlushAllBatch();
bool PreCheckPipelineConfig(PipelineConfig& config);
bool CheckIfGlobalSingletonInputLoaded(std::vector<const Json::Value*>& inputConfig);
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
// TODO: 长期过渡使用
bool CheckIfFileServerUpdated(const Json::Value& config);

Expand Down
49 changes: 35 additions & 14 deletions core/pipeline/plugin/PluginRegistry.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,13 +127,13 @@ bool PluginRegistry::IsValidNativeFlusherPlugin(const string& name) const {
void PluginRegistry::LoadStaticPlugins() {
RegisterInputCreator(new StaticInputCreator<InputFile>());
RegisterInputCreator(new StaticInputCreator<InputPrometheus>());
RegisterInputCreator(new StaticInputCreator<InputInternalMetrics>());
RegisterInputCreator(new StaticInputCreator<InputInternalMetrics>(), true);
#if defined(__linux__) && !defined(__ANDROID__)
RegisterInputCreator(new StaticInputCreator<InputContainerStdio>());
RegisterInputCreator(new StaticInputCreator<InputFileSecurity>());
RegisterInputCreator(new StaticInputCreator<InputNetworkObserver>());
RegisterInputCreator(new StaticInputCreator<InputNetworkSecurity>());
RegisterInputCreator(new StaticInputCreator<InputProcessSecurity>());
RegisterInputCreator(new StaticInputCreator<InputFileSecurity>(), true);
RegisterInputCreator(new StaticInputCreator<InputNetworkObserver>(), true);
RegisterInputCreator(new StaticInputCreator<InputNetworkSecurity>(), true);
RegisterInputCreator(new StaticInputCreator<InputProcessSecurity>(), true);
#endif

RegisterProcessorCreator(new StaticProcessorCreator<ProcessorSplitLogStringNative>());
Expand Down Expand Up @@ -183,16 +183,16 @@ void PluginRegistry::LoadDynamicPlugins(const set<string>& plugins) {
}
}

void PluginRegistry::RegisterInputCreator(PluginCreator* creator) {
RegisterCreator(INPUT_PLUGIN, creator);
void PluginRegistry::RegisterInputCreator(PluginCreator* creator, bool isSingleton) {
RegisterCreator(INPUT_PLUGIN, creator, isSingleton);
}

void PluginRegistry::RegisterProcessorCreator(PluginCreator* creator) {
RegisterCreator(PROCESSOR_PLUGIN, creator);
void PluginRegistry::RegisterProcessorCreator(PluginCreator* creator, bool isSingleton) {
RegisterCreator(PROCESSOR_PLUGIN, creator, isSingleton);
}

void PluginRegistry::RegisterFlusherCreator(PluginCreator* creator) {
RegisterCreator(FLUSHER_PLUGIN, creator);
void PluginRegistry::RegisterFlusherCreator(PluginCreator* creator, bool isSingleton) {
RegisterCreator(FLUSHER_PLUGIN, creator, isSingleton);
}

PluginCreator* PluginRegistry::LoadProcessorPlugin(DynamicLibLoader& loader, const string pluginType) {
Expand All @@ -217,21 +217,42 @@ PluginCreator* PluginRegistry::LoadProcessorPlugin(DynamicLibLoader& loader, con
return new DynamicCProcessorCreator(plugin, loader.Release());
}

void PluginRegistry::RegisterCreator(PluginCat cat, PluginCreator* creator) {
void PluginRegistry::RegisterCreator(PluginCat cat, PluginCreator* creator, bool isSingleton) {
if (!creator) {
return;
}
mPluginDict.emplace(PluginKey(cat, creator->Name()), shared_ptr<PluginCreator>(creator));
mPluginDict.emplace(PluginKey(cat, creator->Name()),
PluginCreatorWithInfo(shared_ptr<PluginCreator>(creator), isSingleton));
}

unique_ptr<PluginInstance>
PluginRegistry::Create(PluginCat cat, const string& name, const PluginInstance::PluginMeta& pluginMeta) {
unique_ptr<PluginInstance> ins;
auto creatorEntry = mPluginDict.find(PluginKey(cat, name));
if (creatorEntry != mPluginDict.end()) {
ins = creatorEntry->second->Create(pluginMeta);
ins = creatorEntry->second.first->Create(pluginMeta);
}
return ins;
}

bool PluginRegistry::IsGlobalSingletonInputPlugin(const string& name) const {
return IsGlobalSingleton(INPUT_PLUGIN, name);
}

bool PluginRegistry::IsGlobalSingletonProcessorPlugin(const string& name) const {
return IsGlobalSingleton(PROCESSOR_PLUGIN, name);
}

bool PluginRegistry::IsGlobalSingletonFlusherPlugin(const string& name) const {
return IsGlobalSingleton(FLUSHER_PLUGIN, name);
}

bool PluginRegistry::IsGlobalSingleton(PluginCat cat, const string& name) const {
auto creatorEntry = mPluginDict.find(PluginKey(cat, name));
if (creatorEntry != mPluginDict.end()) {
return creatorEntry->second.second;
}
return false;
}

Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
} // namespace logtail
25 changes: 17 additions & 8 deletions core/pipeline/plugin/PluginRegistry.h
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,17 @@ class PluginRegistry {
void LoadPlugins();
void UnloadPlugins();
std::unique_ptr<InputInstance> CreateInput(const std::string& name, const PluginInstance::PluginMeta& pluginMeta);
std::unique_ptr<ProcessorInstance> CreateProcessor(const std::string& name, const PluginInstance::PluginMeta& pluginMeta);
std::unique_ptr<FlusherInstance> CreateFlusher(const std::string& name, const PluginInstance::PluginMeta& pluginMeta);
std::unique_ptr<ProcessorInstance> CreateProcessor(const std::string& name,
const PluginInstance::PluginMeta& pluginMeta);
std::unique_ptr<FlusherInstance> CreateFlusher(const std::string& name,
const PluginInstance::PluginMeta& pluginMeta);
bool IsValidGoPlugin(const std::string& name) const;
bool IsValidNativeInputPlugin(const std::string& name) const;
bool IsValidNativeProcessorPlugin(const std::string& name) const;
bool IsValidNativeFlusherPlugin(const std::string& name) const;
bool IsGlobalSingletonInputPlugin(const std::string& name) const;
bool IsGlobalSingletonProcessorPlugin(const std::string& name) const;
bool IsGlobalSingletonFlusherPlugin(const std::string& name) const;
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved

private:
enum PluginCat { INPUT_PLUGIN, PROCESSOR_PLUGIN, FLUSHER_PLUGIN };
Expand All @@ -69,19 +74,23 @@ class PluginRegistry {
}
};

using PluginCreatorWithInfo = std::pair<std::shared_ptr<PluginCreator>, bool>;

PluginRegistry() {}
~PluginRegistry() = default;

void LoadStaticPlugins();
void LoadDynamicPlugins(const std::set<std::string>& plugins);
void RegisterInputCreator(PluginCreator* creator);
void RegisterProcessorCreator(PluginCreator* creator);
void RegisterFlusherCreator(PluginCreator* creator);
void RegisterInputCreator(PluginCreator* creator, bool isSingleton = false);
void RegisterProcessorCreator(PluginCreator* creator, bool isSingleton = false);
void RegisterFlusherCreator(PluginCreator* creator, bool isSingleton = false);
Abingcbc marked this conversation as resolved.
Show resolved Hide resolved
PluginCreator* LoadProcessorPlugin(DynamicLibLoader& loader, const std::string pluginType);
void RegisterCreator(PluginCat cat, PluginCreator* creator);
std::unique_ptr<PluginInstance> Create(PluginCat cat, const std::string& name, const PluginInstance::PluginMeta& pluginMeta);
void RegisterCreator(PluginCat cat, PluginCreator* creator, bool isSingleton);
std::unique_ptr<PluginInstance>
Create(PluginCat cat, const std::string& name, const PluginInstance::PluginMeta& pluginMeta);
bool IsGlobalSingleton(PluginCat cat, const std::string& name) const;

std::unordered_map<PluginKey, std::shared_ptr<PluginCreator>, PluginKeyHash> mPluginDict;
std::unordered_map<PluginKey, PluginCreatorWithInfo, PluginKeyHash> mPluginDict;

#ifdef APSARA_UNIT_TEST_MAIN
friend class PluginRegistryUnittest;
Expand Down
2 changes: 1 addition & 1 deletion core/pipeline/plugin/creator/PluginCreator.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@

#pragma once

#include <string>
#include <memory>
#include <string>

#include "pipeline/plugin/instance/PluginInstance.h"

Expand Down
8 changes: 3 additions & 5 deletions core/pipeline/plugin/instance/InputInstance.h
Original file line number Diff line number Diff line change
Expand Up @@ -28,14 +28,12 @@ namespace logtail {

class InputInstance : public PluginInstance {
public:
InputInstance(Input* plugin, const PluginInstance::PluginMeta& pluginMeta) : PluginInstance(pluginMeta), mPlugin(plugin) {}
InputInstance(Input* plugin, const PluginInstance::PluginMeta& pluginMeta)
: PluginInstance(pluginMeta), mPlugin(plugin) {}

const std::string& Name() const override { return mPlugin->Name(); }

bool Init(const Json::Value& config,
PipelineContext& context,
size_t inputIdx,
Json::Value& optionalGoPipeline);
bool Init(const Json::Value& config, PipelineContext& context, size_t inputIdx, Json::Value& optionalGoPipeline);
bool Start() { return mPlugin->Start(); }
bool Stop(bool isPipelineRemoving) { return mPlugin->Stop(isPipelineRemoving); }
bool SupportAck() const { return mPlugin->SupportAck(); }
Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputContainerStdio.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
#include "container_manager/ContainerDiscoveryOptions.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/MultilineOptions.h"
#include "file_server/reader/FileReaderOptions.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/plugin/interface/Input.h"
#include "file_server/reader/FileReaderOptions.h"

namespace logtail {

Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
#include "container_manager/ContainerDiscoveryOptions.h"
#include "file_server/FileDiscoveryOptions.h"
#include "file_server/MultilineOptions.h"
#include "file_server/reader/FileReaderOptions.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/plugin/interface/Input.h"
#include "file_server/reader/FileReaderOptions.h"

namespace logtail {

Expand Down
4 changes: 2 additions & 2 deletions core/plugin/input/InputFileSecurity.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include <string>

#include "ebpf/config.h"
#include "pipeline/plugin/interface/Input.h"
#include "ebpf/eBPFServer.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/plugin/interface/Input.h"

namespace logtail {

Expand All @@ -34,7 +34,7 @@ class InputFileSecurity : public Input {
bool Start() override;
bool Stop(bool isPipelineRemoving) override;
bool SupportAck() const override { return false; }

ebpf::SecurityOptions mSecurityOptions;
PluginMetricManagerPtr mPluginMgr;
};
Expand Down
1 change: 1 addition & 0 deletions core/plugin/input/InputInternalMetrics.h
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ class InputInternalMetrics : public Input {
bool Start() override;
bool Stop(bool isPipelineRemoving) override;
bool SupportAck() const override { return true; }

private:
SelfMonitorMetricRules mSelfMonitorMetricRules;
};
Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputNetworkObserver.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
#include <string>

#include "ebpf/config.h"
#include "pipeline/plugin/interface/Input.h"
#include "ebpf/include/export.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/plugin/interface/Input.h"

namespace logtail {

Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputNetworkSecurity.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include <string>

#include "ebpf/config.h"
#include "pipeline/plugin/interface/Input.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/plugin/interface/Input.h"

namespace logtail {

Expand Down
2 changes: 1 addition & 1 deletion core/plugin/input/InputProcessSecurity.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,8 @@
#include <string>

#include "ebpf/config.h"
#include "pipeline/plugin/interface/Input.h"
#include "monitor/PluginMetricManager.h"
#include "pipeline/plugin/interface/Input.h"

namespace logtail {

Expand Down
Loading
Loading