Skip to content

Commit

Permalink
add support fot task pipeline (#1883)
Browse files Browse the repository at this point in the history
  • Loading branch information
henryzhx8 authored Nov 19, 2024
1 parent 18cc292 commit ac991b8
Show file tree
Hide file tree
Showing 38 changed files with 1,868 additions and 488 deletions.
1 change: 1 addition & 0 deletions core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ set(SUB_DIRECTORIES_LIST
application app_config checkpoint container_manager metadata logger go_pipeline monitor monitor/metric_constants monitor/profile_sender models
config config/watcher constants
pipeline pipeline/batch pipeline/limiter pipeline/plugin pipeline/plugin/creator pipeline/plugin/instance pipeline/plugin/interface pipeline/queue pipeline/route pipeline/serializer
task_pipeline
runner runner/sink/http
protobuf/sls protobuf/models
file_server file_server/event file_server/event_handler file_server/event_listener file_server/reader file_server/polling
Expand Down
1 change: 1 addition & 0 deletions core/app_config/AppConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
#include "common/FileSystemUtil.h"
#include "common/JsonUtil.h"
#include "common/LogtailCommonFlags.h"
#include "config/InstanceConfigManager.h"
#include "config/watcher/InstanceConfigWatcher.h"
#include "file_server/ConfigManager.h"
#include "file_server/reader/LogFileReader.h"
Expand Down
14 changes: 9 additions & 5 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
#include "common/version.h"
#include "config/ConfigDiff.h"
#include "config/InstanceConfigManager.h"
#include "config/watcher/ConfigWatcher.h"
#include "config/watcher/PipelineConfigWatcher.h"
#include "config/watcher/InstanceConfigWatcher.h"
#include "file_server/ConfigManager.h"
#include "file_server/EventDispatcher.h"
Expand All @@ -51,6 +51,7 @@
#include "runner/FlusherRunner.h"
#include "runner/ProcessorRunner.h"
#include "runner/sink/http/HttpSink.h"
#include "task_pipeline/TaskPipelineManager.h"
#ifdef __ENTERPRISE__
#include "config/provider/EnterpriseConfigProvider.h"
#include "config/provider/LegacyConfigProvider.h"
Expand Down Expand Up @@ -218,7 +219,7 @@ void Application::Start() { // GCOVR_EXCL_START
("failed to create dir for local pipeline_config",
"manual creation may be required")("error code", ec.value())("error msg", ec.message()));
}
ConfigWatcher::GetInstance()->AddSource(localConfigPath.string());
PipelineConfigWatcher::GetInstance()->AddSource(localConfigPath.string());
}

#ifdef __ENTERPRISE__
Expand Down Expand Up @@ -275,9 +276,12 @@ void Application::Start() { // GCOVR_EXCL_START
lastCheckTagsTime = curTime;
}
if (curTime - lastConfigCheckTime >= INT32_FLAG(config_scan_interval)) {
PipelineConfigDiff pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckConfigDiff();
if (!pipelineConfigDiff.IsEmpty()) {
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff);
auto configDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff();
if (!configDiff.first.IsEmpty()) {
PipelineManager::GetInstance()->UpdatePipelines(configDiff.first);
}
if (!configDiff.second.IsEmpty()) {
TaskPipelineManager::GetInstance()->UpdatePipelines(configDiff.second);
}
InstanceConfigDiff instanceConfigDiff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff();
if (!instanceConfigDiff.IsEmpty()) {
Expand Down
39 changes: 39 additions & 0 deletions core/common/ParamExtractor.h
Original file line number Diff line number Diff line change
Expand Up @@ -81,6 +81,45 @@
region); \
}

#define TASK_PARAM_ERROR_RETURN(logger, alarm, msg, module, config) \
if (module.empty()) { \
LOG_ERROR(logger, ("failed to parse config", msg)("config", config)); \
alarm.SendAlarm(CATEGORY_CONFIG_ALARM, std::string(msg) + ": abort, config: " + config); \
} else { \
LOG_ERROR(logger, ("failed to parse config", msg)("module", module)("config", config)); \
alarm.SendAlarm(CATEGORY_CONFIG_ALARM, \
std::string(msg) + ": abort, module: " + module + ", config: " + config); \
} \
return false
#define TASK_PARAM_WARNING_IGNORE(logger, alarm, msg, module, config) \
if (module.empty()) { \
LOG_WARNING(logger, \
("problem encountered in config parsing", msg)("action", "ignore param")("config", config)); \
alarm.SendAlarm(CATEGORY_CONFIG_ALARM, std::string(msg) + ": ignore param, config: " + config); \
} else { \
LOG_WARNING(logger, \
("problem encountered in config parsing", \
msg)("action", "ignore param")("module", module)("config", config)); \
alarm.SendAlarm(CATEGORY_CONFIG_ALARM, \
std::string(msg) + ": ignore param, module: " + module + ", config: " + config); \
}
#define TASK_PARAM_WARNING_DEFAULT(logger, alarm, msg, val, module, config) \
if (module.empty()) { \
LOG_WARNING(logger, \
("problem encountered in config parsing", \
msg)("action", "use default value instead")("default value", ToString(val))("config", config)); \
alarm.SendAlarm(CATEGORY_CONFIG_ALARM, \
std::string(msg) + ": use default value instead, default value: " + ToString(val) \
+ ", config: " + config); \
} else { \
LOG_WARNING(logger, \
("problem encountered in config parsing", msg)("action", "use default value instead")( \
"default value", ToString(val))("module", module)("config", config)); \
alarm.SendAlarm(CATEGORY_CONFIG_ALARM, \
std::string(msg) + ": use default value instead, default value: " + ToString(val) \
+ ", module: " + module + ", config: " + config); \
}

namespace logtail {

const std::string noModule = "";
Expand Down
20 changes: 9 additions & 11 deletions core/config/ConfigDiff.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,23 +21,21 @@

#include "config/InstanceConfig.h"
#include "config/PipelineConfig.h"
#include "config/TaskConfig.h"

namespace logtail {

class PipelineConfigDiff {
public:
std::vector<PipelineConfig> mAdded;
std::vector<PipelineConfig> mModified;
template <class T>
struct ConfigDiff {
std::vector<T> mAdded;
std::vector<T> mModified;
std::vector<std::string> mRemoved;
bool IsEmpty() { return mRemoved.empty() && mAdded.empty() && mModified.empty(); }
};

class InstanceConfigDiff {
public:
std::vector<InstanceConfig> mAdded;
std::vector<InstanceConfig> mModified;
std::vector<std::string> mRemoved;
bool IsEmpty() { return mRemoved.empty() && mAdded.empty() && mModified.empty(); }
};

using PipelineConfigDiff = ConfigDiff<PipelineConfig>;
using TaskConfigDiff = ConfigDiff<TaskConfig>;
using InstanceConfigDiff = ConfigDiff<InstanceConfig>;

} // namespace logtail
87 changes: 87 additions & 0 deletions core/config/ConfigUtil.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,87 @@
// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "config/ConfigUtil.h"

#include "common/FileSystemUtil.h"
#include "common/JsonUtil.h"
#include "common/YamlUtil.h"
#include "logger/Logger.h"

using namespace std;

namespace logtail {

bool LoadConfigDetailFromFile(const filesystem::path& filepath, Json::Value& detail) {
const string& ext = filepath.extension().string();
const string& configName = filepath.stem().string();
if (configName == "region_config") {
return false;
}
if (ext != ".yaml" && ext != ".yml" && ext != ".json") {
LOG_WARNING(sLogger, ("unsupported config file format", "skip current object")("filepath", filepath));
return false;
}
string content;
if (!ReadFile(filepath.string(), content)) {
LOG_WARNING(sLogger, ("failed to open config file", "skip current object")("filepath", filepath));
return false;
}
if (content.empty()) {
LOG_WARNING(sLogger, ("empty config file", "skip current object")("filepath", filepath));
return false;
}
string errorMsg;
if (!ParseConfigDetail(content, ext, detail, errorMsg)) {
LOG_WARNING(sLogger,
("config file format error", "skip current object")("error msg", errorMsg)("filepath", filepath));
return false;
}
return true;
}

bool ParseConfigDetail(const string& content, const string& extension, Json::Value& detail, string& errorMsg) {
if (extension == ".json") {
return ParseJsonTable(content, detail, errorMsg);
} else if (extension == ".yaml" || extension == ".yml") {
YAML::Node yamlRoot;
if (!ParseYamlTable(content, yamlRoot, errorMsg)) {
return false;
}
detail = ConvertYamlToJson(yamlRoot);
return true;
}
return false;
}

bool IsConfigEnabled(const string& name, const Json::Value& detail) {
const char* key = "enable";
const Json::Value* itr = detail.find(key, key + strlen(key));
if (itr != nullptr) {
if (!itr->isBool()) {
LOG_WARNING(sLogger,
("problem encountered in config parsing",
"param enable is not of type bool")("action", "ignore the config")("config", name));
return false;
}
return itr->asBool();
}
return true;
}

ConfigType GetConfigType(const Json::Value& detail) {
return detail.isMember("task") ? ConfigType::Task : ConfigType::Pipeline;
}

} // namespace logtail
36 changes: 36 additions & 0 deletions core/config/ConfigUtil.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
/*
* Copyright 2024 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <json/json.h>

#include <filesystem>
#include <string>

namespace logtail {

enum class ConfigType { Pipeline, Task };

bool LoadConfigDetailFromFile(const std::filesystem::path& filepath, Json::Value& detail);
bool ParseConfigDetail(const std::string& content,
const std::string& extenstion,
Json::Value& detail,
std::string& errorMsg);
bool IsConfigEnabled(const std::string& name, const Json::Value& detail);
ConfigType GetConfigType(const Json::Value& detail);

} // namespace logtail
30 changes: 0 additions & 30 deletions core/config/InstanceConfig.cpp

This file was deleted.

60 changes: 0 additions & 60 deletions core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,8 @@
#include <string>

#include "app_config/AppConfig.h"
#include "common/FileSystemUtil.h"
#include "common/Flags.h"
#include "common/JsonUtil.h"
#include "common/ParamExtractor.h"
#include "common/YamlUtil.h"
#include "pipeline/plugin/PluginRegistry.h"

DEFINE_FLAG_BOOL(enable_env_ref_in_config, "enable environment variable reference replacement in configuration", false);
Expand Down Expand Up @@ -667,61 +664,4 @@ bool PipelineConfig::ReplaceEnvVar() {
return res;
}

bool LoadConfigDetailFromFile(const filesystem::path& filepath, Json::Value& detail) {
const string& ext = filepath.extension().string();
const string& configName = filepath.stem().string();
if (configName == "region_config") {
return false;
}
if (ext != ".yaml" && ext != ".yml" && ext != ".json") {
LOG_WARNING(sLogger, ("unsupported config file format", "skip current object")("filepath", filepath));
return false;
}
string content;
if (!ReadFile(filepath.string(), content)) {
LOG_WARNING(sLogger, ("failed to open config file", "skip current object")("filepath", filepath));
return false;
}
if (content.empty()) {
LOG_WARNING(sLogger, ("empty config file", "skip current object")("filepath", filepath));
return false;
}
string errorMsg;
if (!ParseConfigDetail(content, ext, detail, errorMsg)) {
LOG_WARNING(sLogger,
("config file format error", "skip current object")("error msg", errorMsg)("filepath", filepath));
return false;
}
return true;
}

bool ParseConfigDetail(const string& content, const string& extension, Json::Value& detail, string& errorMsg) {
if (extension == ".json") {
return ParseJsonTable(content, detail, errorMsg);
} else if (extension == ".yaml" || extension == ".yml") {
YAML::Node yamlRoot;
if (!ParseYamlTable(content, yamlRoot, errorMsg)) {
return false;
}
detail = ConvertYamlToJson(yamlRoot);
return true;
}
return false;
}

bool IsConfigEnabled(const string& name, const Json::Value& detail) {
const char* key = "enable";
const Json::Value* itr = detail.find(key, key + strlen(key));
if (itr != nullptr) {
if (!itr->isBool()) {
LOG_WARNING(sLogger,
("problem encountered in config parsing",
"param enable is not of type bool")("action", "ignore the config")("config", name));
return false;
}
return itr->asBool();
}
return true;
}

} // namespace logtail
8 changes: 0 additions & 8 deletions core/config/PipelineConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@
#pragma once

#include <json/json.h>
#include <re2/re2.h>

#include <cstdint>
#include <filesystem>
Expand Down Expand Up @@ -84,11 +83,4 @@ inline bool operator!=(const PipelineConfig& lhs, const PipelineConfig& rhs) {
return !(lhs == rhs);
}

bool LoadConfigDetailFromFile(const std::filesystem::path& filepath, Json::Value& detail);
bool ParseConfigDetail(const std::string& content,
const std::string& extenstion,
Json::Value& detail,
std::string& errorMsg);
bool IsConfigEnabled(const std::string& name, const Json::Value& detail);

} // namespace logtail
Loading

0 comments on commit ac991b8

Please sign in to comment.