Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add support fot task pipeline #1883

Merged
merged 6 commits into from
Nov 19, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ set(SUB_DIRECTORIES_LIST
application app_config checkpoint container_manager metadata logger go_pipeline monitor monitor/metric_constants monitor/profile_sender models
config config/watcher constants
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
task_pipeline
runner runner/sink/http
protobuf/sls protobuf/models
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
Expand Down
1 change: 1 addition & 0 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "common/FileSystemUtil.h"
#include "common/JsonUtil.h"
#include "common/LogtailCommonFlags.h"
#include "config/InstanceConfigManager.h"
#include "config/watcher/InstanceConfigWatcher.h"
#include "file_server/ConfigManager.h"
#include "file_server/reader/LogFileReader.h"
Expand Down
14 changes: 9 additions & 5 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#include "common/version.h"
#include "config/ConfigDiff.h"
#include "config/InstanceConfigManager.h"
#include "config/watcher/ConfigWatcher.h"
#include "config/watcher/PipelineConfigWatcher.h"
#include "config/watcher/InstanceConfigWatcher.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
Expand All @@ -52,6 +52,7 @@
#include "runner/FlusherRunner.h"
#include "runner/ProcessorRunner.h"
#include "runner/sink/http/HttpSink.h"
#include "task_pipeline/TaskPipelineManager.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#include "config/provider/LegacyConfigProvider.h"
Expand Down Expand Up @@ -219,7 +220,7 @@ void Application::Start() { // GCOVR_EXCL_START
("failed to create dir for local pipeline_config",
"manual creation may be required")("error code", ec.value())("error msg", ec.message()));
}
ConfigWatcher::GetInstance()->AddSource(localConfigPath.string());
PipelineConfigWatcher::GetInstance()->AddSource(localConfigPath.string());
}

#ifdef __ENTERPRISE__
Expand Down Expand Up @@ -276,9 +277,12 @@ void Application::Start() { // GCOVR_EXCL_START
lastCheckTagsTime = curTime;
}
if (curTime - lastConfigCheckTime >= INT32_FLAG(config_scan_interval)) {
PipelineConfigDiff pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckConfigDiff();
if (!pipelineConfigDiff.IsEmpty()) {
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff);
auto configDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
if (!configDiff.first.IsEmpty()) {
PipelineManager::GetInstance()->UpdatePipelines(configDiff.first);
}
if (!configDiff.second.IsEmpty()) {
TaskPipelineManager::GetInstance()->UpdatePipelines(configDiff.second);
}
InstanceConfigDiff instanceConfigDiff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff();
if (!instanceConfigDiff.IsEmpty()) {
Expand Down
39 changes: 39 additions & 0 deletions core/common/ParamExtractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,45 @@
region); \
}

#define TASK_PARAM_ERROR_RETURN(logger, alarm, msg, module, config) \
if (module.empty()) { \
LOG_ERROR(logger, ("failed to parse config", msg)("config", config)); \
alarm.SendAlarm(CATEGORY_CONFIG_ALARM, std::string(msg) + ": abort, config: " + config); \
} else { \
LOG_ERROR(logger, ("failed to parse config", msg)("module", module)("config", config)); \
alarm.SendAlarm(CATEGORY_CONFIG_ALARM, \
std::string(msg) + ": abort, module: " + module + ", config: " + config); \
} \
return false
#define TASK_PARAM_WARNING_IGNORE(logger, alarm, msg, module, config) \
if (module.empty()) { \
LOG_WARNING(logger, \
("problem encountered in config parsing", msg)("action", "ignore param")("config", config)); \
alarm.SendAlarm(CATEGORY_CONFIG_ALARM, std::string(msg) + ": ignore param, config: " + config); \
} else { \
LOG_WARNING(logger, \
("problem encountered in config parsing", \
msg)("action", "ignore param")("module", module)("config", config)); \
alarm.SendAlarm(CATEGORY_CONFIG_ALARM, \
std::string(msg) + ": ignore param, module: " + module + ", config: " + config); \
}
#define TASK_PARAM_WARNING_DEFAULT(logger, alarm, msg, val, module, config) \
if (module.empty()) { \
LOG_WARNING(logger, \
("problem encountered in config parsing", \
msg)("action", "use default value instead")("default value", ToString(val))("config", config)); \
alarm.SendAlarm(CATEGORY_CONFIG_ALARM, \
std::string(msg) + ": use default value instead, default value: " + ToString(val) \
+ ", config: " + config); \
} else { \
LOG_WARNING(logger, \
("problem encountered in config parsing", msg)("action", "use default value instead")( \
"default value", ToString(val))("module", module)("config", config)); \
alarm.SendAlarm(CATEGORY_CONFIG_ALARM, \
std::string(msg) + ": use default value instead, default value: " + ToString(val) \
+ ", module: " + module + ", config: " + config); \
}

namespace logtail {

const std::string noModule = "";
Expand Down
20 changes: 9 additions & 11 deletions core/config/ConfigDiff.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,21 @@

#include "config/InstanceConfig.h"
#include "config/PipelineConfig.h"
#include "config/TaskConfig.h"

namespace logtail {

class PipelineConfigDiff {
public:
std::vector<PipelineConfig> mAdded;
std::vector<PipelineConfig> mModified;
template <class T>
struct ConfigDiff {
std::vector<T> mAdded;
std::vector<T> mModified;
std::vector<std::string> mRemoved;
bool IsEmpty() { return mRemoved.empty() && mAdded.empty() && mModified.empty(); }
};

class InstanceConfigDiff {
public:
std::vector<InstanceConfig> mAdded;
std::vector<InstanceConfig> mModified;
std::vector<std::string> mRemoved;
bool IsEmpty() { return mRemoved.empty() && mAdded.empty() && mModified.empty(); }
};

using PipelineConfigDiff = ConfigDiff<PipelineConfig>;
using TaskConfigDiff = ConfigDiff<TaskConfig>;
using InstanceConfigDiff = ConfigDiff<InstanceConfig>;

} // namespace logtail
87 changes: 87 additions & 0 deletions core/config/ConfigUtil.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "config/ConfigUtil.h"

#include "common/FileSystemUtil.h"
#include "common/JsonUtil.h"
#include "common/YamlUtil.h"
#include "logger/Logger.h"

using namespace std;

namespace logtail {

bool LoadConfigDetailFromFile(const filesystem::path& filepath, Json::Value& detail) {
const string& ext = filepath.extension().string();
const string& configName = filepath.stem().string();
if (configName == "region_config") {
return false;
}
if (ext != ".yaml" && ext != ".yml" && ext != ".json") {
LOG_WARNING(sLogger, ("unsupported config file format", "skip current object")("filepath", filepath));
return false;
}
string content;
if (!ReadFile(filepath.string(), content)) {
LOG_WARNING(sLogger, ("failed to open config file", "skip current object")("filepath", filepath));
return false;
}
if (content.empty()) {
LOG_WARNING(sLogger, ("empty config file", "skip current object")("filepath", filepath));
return false;
}
string errorMsg;
if (!ParseConfigDetail(content, ext, detail, errorMsg)) {
LOG_WARNING(sLogger,
("config file format error", "skip current object")("error msg", errorMsg)("filepath", filepath));
return false;
}
return true;
}

bool ParseConfigDetail(const string& content, const string& extension, Json::Value& detail, string& errorMsg) {
if (extension == ".json") {
return ParseJsonTable(content, detail, errorMsg);
} else if (extension == ".yaml" || extension == ".yml") {
YAML::Node yamlRoot;
if (!ParseYamlTable(content, yamlRoot, errorMsg)) {
return false;
}
detail = ConvertYamlToJson(yamlRoot);
return true;
}
return false;
}

bool IsConfigEnabled(const string& name, const Json::Value& detail) {
const char* key = "enable";
const Json::Value* itr = detail.find(key, key + strlen(key));
if (itr != nullptr) {
if (!itr->isBool()) {
LOG_WARNING(sLogger,
("problem encountered in config parsing",
"param enable is not of type bool")("action", "ignore the config")("config", name));
return false;
}
return itr->asBool();
}
return true;
}

ConfigType GetConfigType(const Json::Value& detail) {
return detail.isMember("task") ? ConfigType::Task : ConfigType::Pipeline;
}

} // namespace logtail
36 changes: 36 additions & 0 deletions core/config/ConfigUtil.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <json/json.h>

#include <filesystem>
#include <string>

namespace logtail {

enum class ConfigType { Pipeline, Task };

bool LoadConfigDetailFromFile(const std::filesystem::path& filepath, Json::Value& detail);
bool ParseConfigDetail(const std::string& content,
const std::string& extenstion,
Json::Value& detail,
std::string& errorMsg);
bool IsConfigEnabled(const std::string& name, const Json::Value& detail);
ConfigType GetConfigType(const Json::Value& detail);

} // namespace logtail
30 changes: 0 additions & 30 deletions core/config/InstanceConfig.cpp

This file was deleted.

60 changes: 0 additions & 60 deletions core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@
#include <string>

#include "app_config/AppConfig.h"
#include "common/FileSystemUtil.h"
#include "common/Flags.h"
#include "common/JsonUtil.h"
#include "common/ParamExtractor.h"
#include "common/YamlUtil.h"
#include "pipeline/plugin/PluginRegistry.h"

DEFINE_FLAG_BOOL(enable_env_ref_in_config, "enable environment variable reference replacement in configuration", false);
Expand Down Expand Up @@ -667,61 +664,4 @@ bool PipelineConfig::ReplaceEnvVar() {
return res;
}

bool LoadConfigDetailFromFile(const filesystem::path& filepath, Json::Value& detail) {
const string& ext = filepath.extension().string();
const string& configName = filepath.stem().string();
if (configName == "region_config") {
return false;
}
if (ext != ".yaml" && ext != ".yml" && ext != ".json") {
LOG_WARNING(sLogger, ("unsupported config file format", "skip current object")("filepath", filepath));
return false;
}
string content;
if (!ReadFile(filepath.string(), content)) {
LOG_WARNING(sLogger, ("failed to open config file", "skip current object")("filepath", filepath));
return false;
}
if (content.empty()) {
LOG_WARNING(sLogger, ("empty config file", "skip current object")("filepath", filepath));
return false;
}
string errorMsg;
if (!ParseConfigDetail(content, ext, detail, errorMsg)) {
LOG_WARNING(sLogger,
("config file format error", "skip current object")("error msg", errorMsg)("filepath", filepath));
return false;
}
return true;
}

bool ParseConfigDetail(const string& content, const string& extension, Json::Value& detail, string& errorMsg) {
if (extension == ".json") {
return ParseJsonTable(content, detail, errorMsg);
} else if (extension == ".yaml" || extension == ".yml") {
YAML::Node yamlRoot;
if (!ParseYamlTable(content, yamlRoot, errorMsg)) {
return false;
}
detail = ConvertYamlToJson(yamlRoot);
return true;
}
return false;
}

bool IsConfigEnabled(const string& name, const Json::Value& detail) {
const char* key = "enable";
const Json::Value* itr = detail.find(key, key + strlen(key));
if (itr != nullptr) {
if (!itr->isBool()) {
LOG_WARNING(sLogger,
("problem encountered in config parsing",
"param enable is not of type bool")("action", "ignore the config")("config", name));
return false;
}
return itr->asBool();
}
return true;
}

} // namespace logtail
8 changes: 0 additions & 8 deletions core/config/PipelineConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#pragma once

#include <json/json.h>
#include <re2/re2.h>

#include <cstdint>
#include <filesystem>
Expand Down Expand Up @@ -84,11 +83,4 @@ inline bool operator!=(const PipelineConfig& lhs, const PipelineConfig& rhs) {
return !(lhs == rhs);
}

bool LoadConfigDetailFromFile(const std::filesystem::path& filepath, Json::Value& detail);
bool ParseConfigDetail(const std::string& content,
const std::string& extenstion,
Json::Value& detail,
std::string& errorMsg);
bool IsConfigEnabled(const std::string& name, const Json::Value& detail);

} // namespace logtail
Loading
Loading