Skip to content

Commit

Permalink
[WIP] support configServer V2 (#1592)
Browse files Browse the repository at this point in the history
* modify pb

* fix

* rm mAttributes

* rm FetchConfig

* rm SendHeartBeat

* add ProcessSourceDir

* support CommonConfigProvider

* add FetchProcessConfigFromServer and FetchPipelineConfigFromServer

* fix test

* add config feedbacker

* fix loadconfigfile

* heartbeat

* rm once from cpp

* add mProcessFileInfoMap

* add CommandSource

* modify ConfigWatcher

* modify mRegion

* modify operator=

* modify

* format

* modify

* fix Update

* add test

* add RegisterCallback

* fix

* format

* add test

* fix test

* add FeedbackProcessConfigStatus

* rm callbacks

* fix test

* add ConfigFeedbackableUnittest

* fix

* add GenerateCommandFeedBackKey

* LCOV_EXCL_STOP

* fix

* rm provider

---------

Co-authored-by: Tom Yu <[email protected]>
  • Loading branch information
quzard and yyuuttaaoo authored Jul 16, 2024
1 parent 0a477c0 commit c3252b1
Show file tree
Hide file tree
Showing 62 changed files with 10,630 additions and 537 deletions.
4 changes: 2 additions & 2 deletions config_server/protocol/v2/agent.proto
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ enum RequestFlags {
// API: /Agent/Heartbeat

// Agent sends requests to the ConfigServer to get config updates and receive commands.
message HearbeatRequest {
message HeartbeatRequest {
bytes request_id = 1;
uint64 sequence_num = 2; // Increment every request, for server to check sync status
uint64 capabilities = 3; // Bitmask of flags defined by AgentCapabilities enum
Expand All @@ -85,7 +85,7 @@ message HearbeatRequest {
repeated CommandInfo custom_commands = 12; // Information about command history
uint64 flags = 13; // Predefined command flag
bytes opaque = 14; // Opaque data for extension
// before 100 (inclusive) are reserved for future official fields}
// before 100 (inclusive) are reserved for future official fields
}

// Define Config's detail
Expand Down
2 changes: 1 addition & 1 deletion core/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ include(${CMAKE_CURRENT_SOURCE_DIR}/flusher/links.cmake)

# Subdirectories (modules). except for common, input, processor, flusher, observer, helper and spl.
set(SUB_DIRECTORIES_LIST
batch application app_config checkpoint compression config config/provider config/watcher config_manager config_server_pb
batch application app_config checkpoint compression config config/feedbacker config/provider config/watcher config_manager config_server_pb/v1 config_server_pb/v2
container_manager controller event event_handler event_listener file_server go_pipeline log_pb logger
models monitor parser pipeline plugin plugin/creator plugin/instance plugin/interface polling
profile_sender queue reader sdk sender serializer sls_control fuse
Expand Down
48 changes: 34 additions & 14 deletions core/application/Application.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@
#include "monitor/LogFileProfiler.h"
#include "monitor/MetricExportor.h"
#include "monitor/Monitor.h"
#include "pipeline/PipelineManager.h"
#include "pipeline/PipelineConfigManager.h"
#include "pipeline/ProcessConfigManager.h"
#include "plugin/PluginRegistry.h"
#include "processor/daemon/LogProcess.h"
#include "sender/Sender.h"
Expand Down Expand Up @@ -193,17 +194,32 @@ void Application::Start() { // GCOVR_EXCL_START
// flusher_sls should always be loaded, since profiling will rely on this.
Sender::Instance()->Init();

// add local config dir
filesystem::path localConfigPath
= filesystem::path(AppConfig::GetInstance()->GetLogtailSysConfDir()) / "config" / "local";
error_code ec;
filesystem::create_directories(localConfigPath, ec);
if (ec) {
LOG_WARNING(sLogger,
("failed to create dir for local config",
"manual creation may be required")("error code", ec.value())("error msg", ec.message()));
{
// add local config dir
filesystem::path localConfigPath
= filesystem::path(AppConfig::GetInstance()->GetLogtailSysConfDir()) / "config" / "local";
error_code ec;
filesystem::create_directories(localConfigPath, ec);
if (ec) {
LOG_WARNING(sLogger,
("failed to create dir for local pipelineconfig",
"manual creation may be required")("error code", ec.value())("error msg", ec.message()));
}
ConfigWatcher::GetInstance()->AddPipelineSource(localConfigPath.string());
}
{
// add local config dir
filesystem::path localConfigPath
= filesystem::path(AppConfig::GetInstance()->GetLogtailSysConfDir()) / "processconfig" / "local";
error_code ec;
filesystem::create_directories(localConfigPath, ec);
if (ec) {
LOG_WARNING(sLogger,
("failed to create dir for local processconfig",
"manual creation may be required")("error code", ec.value())("error msg", ec.message()));
}
ConfigWatcher::GetInstance()->AddProcessSource(localConfigPath.string());
}
ConfigWatcher::GetInstance()->AddSource(localConfigPath.string());

#ifdef __ENTERPRISE__
EnterpriseConfigProvider::GetInstance()->Init("enterprise");
Expand Down Expand Up @@ -251,9 +267,13 @@ void Application::Start() { // GCOVR_EXCL_START
lastCheckTagsTime = curTime;
}
if (curTime - lastConfigCheckTime >= INT32_FLAG(config_scan_interval)) {
ConfigDiff diff = ConfigWatcher::GetInstance()->CheckConfigDiff();
if (!diff.IsEmpty()) {
PipelineManager::GetInstance()->UpdatePipelines(diff);
PipelineConfigDiff pipelineConfigDiff = ConfigWatcher::GetInstance()->CheckPipelineConfigDiff();
if (!pipelineConfigDiff.IsEmpty()) {
PipelineManager::GetInstance()->UpdatePipelines(pipelineConfigDiff);
}
ProcessConfigDiff processConfigDiff = ConfigWatcher::GetInstance()->CheckProcessConfigDiff();
if (!processConfigDiff.IsEmpty()) {
ProcessConfigManager::GetInstance()->UpdateProcessConfigs(processConfigDiff);
}
lastConfigCheckTime = curTime;
}
Expand Down
18 changes: 14 additions & 4 deletions core/config/ConfigDiff.h
Original file line number Diff line number Diff line change
Expand Up @@ -19,16 +19,26 @@
#include <string>
#include <vector>

#include "config/Config.h"
#include "config/PipelineConfig.h"
#include "config/ProcessConfig.h"

namespace logtail {

struct ConfigDiff {
std::vector<Config> mAdded;
std::vector<Config> mModified;
class PipelineConfigDiff {
public:
std::vector<PipelineConfig> mAdded;
std::vector<PipelineConfig> mModified;
std::vector<std::string> mRemoved;
std::vector<std::string> mUnchanged; // 过渡使用,仅供插件系统用
bool IsEmpty() { return mRemoved.empty() && mAdded.empty() && mModified.empty(); }
};

class ProcessConfigDiff {
public:
std::vector<ProcessConfig> mAdded;
std::vector<ProcessConfig> mModified;
std::vector<std::string> mRemoved;
std::vector<std::string> mUnchanged; // 过渡使用,仅供插件系统用
bool IsEmpty() { return mRemoved.empty() && mAdded.empty() && mModified.empty(); }
};

Expand Down
6 changes: 3 additions & 3 deletions core/config/Config.cpp → core/config/PipelineConfig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

#include "config/Config.h"
#include "config/PipelineConfig.h"

#include <string>

Expand Down Expand Up @@ -94,7 +94,7 @@ static void ReplaceEnvVarRef(Json::Value& value, bool& res) {
}
}

bool Config::Parse() {
bool PipelineConfig::Parse() {
if (BOOL_FLAG(enable_env_ref_in_config)) {
if (ReplaceEnvVar()) {
LOG_INFO(sLogger, ("env vars in config are replaced, config", mDetail->toStyledString())("config", mName));
Expand Down Expand Up @@ -698,7 +698,7 @@ bool Config::Parse() {
return true;
}

bool Config::ReplaceEnvVar() {
bool PipelineConfig::ReplaceEnvVar() {
bool res = false;
ReplaceEnvVarRef(*mDetail, res);
return res;
Expand Down
8 changes: 4 additions & 4 deletions core/config/Config.h → core/config/PipelineConfig.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@

namespace logtail {

struct Config {
struct PipelineConfig {
std::string mName;
std::unique_ptr<Json::Value> mDetail;
uint32_t mCreateTime = 0;
Expand All @@ -49,7 +49,7 @@ struct Config {
std::string mLogstore;
std::string mRegion;

Config(const std::string& name, std::unique_ptr<Json::Value>&& detail) : mName(name), mDetail(std::move(detail)) {}
PipelineConfig(const std::string& name, std::unique_ptr<Json::Value>&& detail) : mName(name), mDetail(std::move(detail)) {}

bool Parse();

Expand All @@ -75,11 +75,11 @@ struct Config {
bool ReplaceEnvVar();
};

inline bool operator==(const Config& lhs, const Config& rhs) {
inline bool operator==(const PipelineConfig& lhs, const PipelineConfig& rhs) {
return (lhs.mName == rhs.mName) && (*lhs.mDetail == *rhs.mDetail);
}

inline bool operator!=(const Config& lhs, const Config& rhs) {
inline bool operator!=(const PipelineConfig& lhs, const PipelineConfig& rhs) {
return !(lhs == rhs);
}

Expand Down
30 changes: 30 additions & 0 deletions core/config/ProcessConfig.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
// Copyright 2023 iLogtail Authors
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "config/ProcessConfig.h"

#include <string>

#include "app_config/AppConfig.h"
#include "common/FileSystemUtil.h"
#include "common/Flags.h"
#include "common/JsonUtil.h"
#include "common/ParamExtractor.h"
#include "common/YamlUtil.h"
#include "plugin/PluginRegistry.h"


using namespace std;

namespace logtail {} // namespace logtail
88 changes: 88 additions & 0 deletions core/config/ProcessConfig.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
/*
* Copyright 2023 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

#pragma once

#include <json/json.h>
#include <re2/re2.h>

#include <cstdint>
#include <filesystem>
#include <memory>
#include <string>
#include <vector>

namespace logtail {

struct ProcessConfig {
std::string mName;
std::unique_ptr<Json::Value> mDetail;

// for alarm only
std::string mProject;
std::string mLogstore;
std::string mRegion;

ProcessConfig(const std::string& name, std::unique_ptr<Json::Value>&& detail)
: mName(name), mDetail(std::move(detail)) {
mProject = "";
mLogstore = "";
mRegion = "";
}
ProcessConfig(const logtail::ProcessConfig& config) {
mName = config.mName;
mDetail = std::make_unique<Json::Value>(*config.mDetail);
mProject = "";
mLogstore = "";
mRegion = "";
}

ProcessConfig& operator=(ProcessConfig&& other) {
if (this != &other) {
mName = std::move(other.mName);
mDetail = std::move(other.mDetail);
mProject = "";
mLogstore = "";
mRegion = "";
}
return *this;
}

ProcessConfig& operator=(const ProcessConfig& other) {
if (this != &other) {
mName = other.mName;
mDetail = std::make_unique<Json::Value>(*other.mDetail);
mProject = "";
mLogstore = "";
mRegion = "";
}
return *this;
}

bool Parse() { return true; }

const Json::Value& GetConfig() const { return *mDetail; }
};

inline bool operator==(const ProcessConfig& lhs, const ProcessConfig& rhs) {
return (lhs.mName == rhs.mName) && (*lhs.mDetail == *rhs.mDetail);
}

inline bool operator!=(const ProcessConfig& lhs, const ProcessConfig& rhs) {
return !(lhs == rhs);
}

} // namespace logtail
91 changes: 91 additions & 0 deletions core/config/feedbacker/ConfigFeedbackReceiver.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Copyright 2024 iLogtail Authors
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/


#include "config/feedbacker/ConfigFeedbackReceiver.h"

#include <unordered_map>

namespace logtail {

ConfigFeedbackReceiver& ConfigFeedbackReceiver::GetInstance() {
static ConfigFeedbackReceiver instance;
return instance;
}

void ConfigFeedbackReceiver::RegisterPipelineConfig(const std::string& name, ConfigFeedbackable* feedbackable) {
std::lock_guard<std::mutex> lock(mMutex);
mPipelineConfigFeedbackableMap[name] = feedbackable;
}

void ConfigFeedbackReceiver::RegisterProcessConfig(const std::string& name, ConfigFeedbackable* feedbackable) {
std::lock_guard<std::mutex> lock(mMutex);
mProcessConfigFeedbackableMap[name] = feedbackable;
}

void ConfigFeedbackReceiver::RegisterCommand(const std::string& type,
const std::string& name,
ConfigFeedbackable* feedbackable) {
std::lock_guard<std::mutex> lock(mMutex);
mCommandFeedbackableMap[GenerateCommandFeedBackKey(type, name)] = feedbackable;
}

void ConfigFeedbackReceiver::UnregisterPipelineConfig(const std::string& name) {
std::lock_guard<std::mutex> lock(mMutex);
mPipelineConfigFeedbackableMap.erase(name);
}

void ConfigFeedbackReceiver::UnregisterProcessConfig(const std::string& name) {
std::lock_guard<std::mutex> lock(mMutex);
mProcessConfigFeedbackableMap.erase(name);
}

void ConfigFeedbackReceiver::UnregisterCommand(const std::string& type, const std::string& name) {
std::lock_guard<std::mutex> lock(mMutex);
mCommandFeedbackableMap.erase(GenerateCommandFeedBackKey(type, name));
}

void ConfigFeedbackReceiver::FeedbackPipelineConfigStatus(const std::string& name, ConfigFeedbackStatus status) {
std::lock_guard<std::mutex> lock(mMutex);
auto iter = mPipelineConfigFeedbackableMap.find(name);
if (iter != mPipelineConfigFeedbackableMap.end()) {
iter->second->FeedbackPipelineConfigStatus(name, status);
}
}

void ConfigFeedbackReceiver::FeedbackProcessConfigStatus(const std::string& name, ConfigFeedbackStatus status) {
std::lock_guard<std::mutex> lock(mMutex);
auto iter = mProcessConfigFeedbackableMap.find(name);
if (iter != mProcessConfigFeedbackableMap.end()) {
iter->second->FeedbackProcessConfigStatus(name, status);
}
}

void ConfigFeedbackReceiver::FeedbackCommandConfigStatus(const std::string& type,
const std::string& name,
ConfigFeedbackStatus status) {
std::lock_guard<std::mutex> lock(mMutex);
auto iter = mCommandFeedbackableMap.find(GenerateCommandFeedBackKey(type, name));
if (iter != mCommandFeedbackableMap.end()) {
iter->second->FeedbackCommandConfigStatus(type, name, status);
}
}

std::string GenerateCommandFeedBackKey(const std::string& type, const std::string& name) {
return type + '\1' + name;
}

} // namespace logtail
Loading

0 comments on commit c3252b1

Please sign in to comment.