From 76da34fc6ae3bea31f0992889e6fcafba7cc034c Mon Sep 17 00:00:00 2001 From: quzard <1191890118@qq.com> Date: Tue, 19 Nov 2024 10:43:20 +0000 Subject: [PATCH 01/10] fix --- config_server/protocol/v2/agentV2.proto | 15 +++------------ core/app_config/AppConfig.cpp | 4 ++-- core/app_config/AppConfig.h | 2 +- core/application/Application.cpp | 2 +- .../common_provider/CommonConfigProvider.cpp | 4 ++-- .../LegacyCommonConfigProvider.cpp | 11 +++++------ core/config/provider/ConfigProvider.cpp | 10 +++++----- core/config/provider/ConfigProvider.h | 2 +- .../config/CommonConfigProviderUnittest.cpp | 6 +++--- 9 files changed, 23 insertions(+), 33 deletions(-) diff --git a/config_server/protocol/v2/agentV2.proto b/config_server/protocol/v2/agentV2.proto index 649cd433da..8baeac334f 100644 --- a/config_server/protocol/v2/agentV2.proto +++ b/config_server/protocol/v2/agentV2.proto @@ -28,15 +28,6 @@ message ConfigInfo { map extra = 5; // Optional extra info } -// Define the Command information carried in the request -message CommandInfo { - string type = 1; // Command's type - string name = 2; // Required, Command's unique identification - ConfigStatus status = 3; // Command's status - string message = 4; // Optional error message - map extra = 5; // Optional extra info -} - // Define Agent's basic attributes message AgentAttributes { bytes version = 1; // Agent's version @@ -85,7 +76,7 @@ message HeartbeatRequest { int64 startup_time = 9; // Required, Agent's startup time repeated ConfigInfo pipeline_configs = 10; // Information about the current PIPELINE_CONFIG held by the Agent repeated ConfigInfo instance_configs = 11; // Information about the current AGENT_CONFIG held by the Agent - repeated CommandInfo custom_commands = 12; // Information about command history + repeated ConfigInfo custom_commands = 12; // Information about command history uint64 flags = 13; // Predefined command flag bytes opaque = 14; // Opaque data for extension // before 100 (inclusive) are reserved for future official fields @@ -160,7 +151,7 @@ message FetchConfigRequest { bytes instance_id = 2; // Agent's unique identification repeated ConfigInfo pipeline_configs = 3; // Information about the current PIPELINE_CONFIG held by the Agent repeated ConfigInfo instance_configs = 4; // Information about the current AGENT_CONFIG held by the Agent - repeated CommandInfo custom_commands = 5; // Information about command history + repeated ConfigInfo custom_commands = 5; // Information about command history } // ConfigServer response to Agent's config fetching request @@ -180,7 +171,7 @@ message ReportStatusRequest { bytes instance_id = 2; // Agent's unique identification repeated ConfigInfo pipeline_configs = 3; // status about the current PIPELINE_CONFIG held by the Agent repeated ConfigInfo instance_configs = 4; // status about the current AGENT_CONFIG held by the Agent - repeated CommandInfo custom_commands = 5; // status about command history + repeated ConfigInfo custom_commands = 5; // status about command history } // ConfigServer response to Agent's report status request diff --git a/core/app_config/AppConfig.cpp b/core/app_config/AppConfig.cpp index 976a7b3339..cbec74448c 100644 --- a/core/app_config/AppConfig.cpp +++ b/core/app_config/AppConfig.cpp @@ -483,11 +483,11 @@ string GetFileTagsDir() { } } -string GetPipelineConfigDir() { +string GetContinuousPipelineConfigDir() { if (BOOL_FLAG(logtail_mode)) { return "config"; } else { - return "pipeline_config"; + return "continuous_pipeline_config"; } } diff --git a/core/app_config/AppConfig.h b/core/app_config/AppConfig.h index 4a28cf7c1a..12f23a25ee 100644 --- a/core/app_config/AppConfig.h +++ b/core/app_config/AppConfig.h @@ -54,7 +54,7 @@ std::string GetObserverEbpfHostPath(); std::string GetSendBufferFileNamePrefix(); std::string GetLegacyUserLocalConfigFilePath(); std::string GetExactlyOnceCheckpoint(); -std::string GetPipelineConfigDir(); +std::string GetContinuousPipelineConfigDir(); template class DoubleBuffer { diff --git a/core/application/Application.cpp b/core/application/Application.cpp index 08df561cd2..2a60ef20af 100644 --- a/core/application/Application.cpp +++ b/core/application/Application.cpp @@ -211,7 +211,7 @@ void Application::Start() { // GCOVR_EXCL_START { // add local config dir filesystem::path localConfigPath - = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir()) / GetPipelineConfigDir() / "local"; + = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir()) / GetContinuousPipelineConfigDir() / "local"; error_code ec; filesystem::create_directories(localConfigPath, ec); if (ec) { diff --git a/core/config/common_provider/CommonConfigProvider.cpp b/core/config/common_provider/CommonConfigProvider.cpp index a843521915..45588d16fb 100644 --- a/core/config/common_provider/CommonConfigProvider.cpp +++ b/core/config/common_provider/CommonConfigProvider.cpp @@ -133,7 +133,7 @@ void CommonConfigProvider::LoadConfigFile() { error_code ec; lock_guard pipelineInfomaplock(mPipelineInfoMapMux); lock_guard lockPipeline(mPipelineMux); - for (auto const& entry : filesystem::directory_iterator(mPipelineSourceDir, ec)) { + for (auto const& entry : filesystem::directory_iterator(mContinuousPipelineConfigDir, ec)) { Json::Value detail; if (LoadConfigDetailFromFile(entry, detail)) { ConfigInfo info; @@ -422,7 +422,7 @@ bool CommonConfigProvider::DumpConfigFile(const configserver::proto::v2::ConfigD void CommonConfigProvider::UpdateRemotePipelineConfig( const google::protobuf::RepeatedPtrField& configs) { error_code ec; - const std::filesystem::path& sourceDir = mPipelineSourceDir; + const std::filesystem::path& sourceDir = mContinuousPipelineConfigDir; filesystem::create_directories(sourceDir, ec); if (ec) { StopUsingConfigServer(); diff --git a/core/config/common_provider/LegacyCommonConfigProvider.cpp b/core/config/common_provider/LegacyCommonConfigProvider.cpp index b673bbeda5..96f2132c5a 100644 --- a/core/config/common_provider/LegacyCommonConfigProvider.cpp +++ b/core/config/common_provider/LegacyCommonConfigProvider.cpp @@ -286,20 +286,19 @@ void LegacyCommonConfigProvider::UpdateRemoteConfig( const google::protobuf::RepeatedPtrField& checkResults, const google::protobuf::RepeatedPtrField& configDetails) { error_code ec; - filesystem::create_directories(mPipelineSourceDir, ec); + filesystem::create_directories(mContinuousPipelineConfigDir, ec); if (ec) { StopUsingConfigServer(); LOG_ERROR(sLogger, - ("failed to create dir for legacy common configs", - "stop receiving config from legacy common config server")("dir", mPipelineSourceDir.string())( - "error code", ec.value())("error msg", ec.message())); + ("failed to create dir for legacy common configs", "stop receiving config from legacy common config server")( + "dir", mContinuousPipelineConfigDir.string())("error code", ec.value())("error msg", ec.message())); return; } lock_guard lock(mPipelineMux); for (const auto& checkResult : checkResults) { - filesystem::path filePath = mPipelineSourceDir / (checkResult.name() + ".yaml"); - filesystem::path tmpFilePath = mPipelineSourceDir / (checkResult.name() + ".yaml.new"); + filesystem::path filePath = mContinuousPipelineConfigDir / (checkResult.name() + ".yaml"); + filesystem::path tmpFilePath = mContinuousPipelineConfigDir / (checkResult.name() + ".yaml.new"); switch (checkResult.check_status()) { case configserver::proto::DELETED: mConfigNameVersionMap.erase(checkResult.name()); diff --git a/core/config/provider/ConfigProvider.cpp b/core/config/provider/ConfigProvider.cpp index 1c6b68b31b..28ccd1a72c 100644 --- a/core/config/provider/ConfigProvider.cpp +++ b/core/config/provider/ConfigProvider.cpp @@ -24,17 +24,17 @@ namespace logtail { void ConfigProvider::Init(const string& dir) { // default path: /etc/ilogtail/config/${dir} - mPipelineSourceDir.assign(AppConfig::GetInstance()->GetLoongcollectorConfDir()); - mPipelineSourceDir /= GetPipelineConfigDir(); - mPipelineSourceDir /= dir; + mContinuousPipelineConfigDir.assign(AppConfig::GetInstance()->GetLoongcollectorConfDir()); + mContinuousPipelineConfigDir /= GetContinuousPipelineConfigDir(); + mContinuousPipelineConfigDir /= dir; mInstanceSourceDir.assign(AppConfig::GetInstance()->GetLoongcollectorConfDir()); mInstanceSourceDir /= "instance_config"; mInstanceSourceDir /= dir; error_code ec; - filesystem::create_directories(mPipelineSourceDir, ec); - PipelineConfigWatcher::GetInstance()->AddSource(mPipelineSourceDir, &mPipelineMux); + filesystem::create_directories(mContinuousPipelineConfigDir, ec); + ConfigWatcher::GetInstance()->AddSource(mContinuousPipelineConfigDir, &mPipelineMux); ec.clear(); filesystem::create_directories(mInstanceSourceDir, ec); diff --git a/core/config/provider/ConfigProvider.h b/core/config/provider/ConfigProvider.h index ed58ac3c11..c381fee9a6 100644 --- a/core/config/provider/ConfigProvider.h +++ b/core/config/provider/ConfigProvider.h @@ -34,7 +34,7 @@ class ConfigProvider { ConfigProvider() = default; virtual ~ConfigProvider() = default; - std::filesystem::path mPipelineSourceDir; + std::filesystem::path mContinuousPipelineConfigDir; std::filesystem::path mInstanceSourceDir; mutable std::mutex mPipelineMux; mutable std::mutex mInstanceMux; diff --git a/core/unittest/config/CommonConfigProviderUnittest.cpp b/core/unittest/config/CommonConfigProviderUnittest.cpp index 1fd43d0652..34fce288be 100644 --- a/core/unittest/config/CommonConfigProviderUnittest.cpp +++ b/core/unittest/config/CommonConfigProviderUnittest.cpp @@ -80,7 +80,7 @@ class CommonConfigProviderUnittest : public ::testing::Test { MockCommonConfigProvider provider; provider.Init("common_v2"); provider.Stop(); - bfs::remove_all(provider.mPipelineSourceDir.string()); + bfs::remove_all(provider.mContinuousPipelineConfigDir.string()); bfs::remove_all(provider.mInstanceSourceDir.string()); } else { CreateAgentDir(); @@ -91,7 +91,7 @@ class CommonConfigProviderUnittest : public ::testing::Test { MockCommonConfigProvider provider; provider.Init("common_v2"); provider.Stop(); - bfs::remove_all(provider.mPipelineSourceDir.string()); + bfs::remove_all(provider.mContinuousPipelineConfigDir.string()); bfs::remove_all(provider.mInstanceSourceDir.string()); } } @@ -101,7 +101,7 @@ class CommonConfigProviderUnittest : public ::testing::Test { MockCommonConfigProvider provider; provider.Init("common_v2"); provider.Stop(); - bfs::remove_all(provider.mPipelineSourceDir.string()); + bfs::remove_all(provider.mContinuousPipelineConfigDir.string()); bfs::remove_all(provider.mInstanceSourceDir.string()); } From a7b91f83d0048427e43c2fd939ad1ea4bef1532e Mon Sep 17 00:00:00 2001 From: quzard <1191890118@qq.com> Date: Tue, 19 Nov 2024 11:34:18 +0000 Subject: [PATCH 02/10] mOnetimePipelineConfigDir --- core/config/provider/ConfigProvider.cpp | 8 ++++++++ core/config/provider/ConfigProvider.h | 2 ++ 2 files changed, 10 insertions(+) diff --git a/core/config/provider/ConfigProvider.cpp b/core/config/provider/ConfigProvider.cpp index 28ccd1a72c..b85c30538b 100644 --- a/core/config/provider/ConfigProvider.cpp +++ b/core/config/provider/ConfigProvider.cpp @@ -32,6 +32,10 @@ void ConfigProvider::Init(const string& dir) { mInstanceSourceDir /= "instance_config"; mInstanceSourceDir /= dir; + mOnetimePipelineConfigDir.assign(AppConfig::GetInstance()->GetLoongcollectorConfDir()); + mOnetimePipelineConfigDir /= "onetime_pipeline_config"; + mOnetimePipelineConfigDir /= dir; + error_code ec; filesystem::create_directories(mContinuousPipelineConfigDir, ec); ConfigWatcher::GetInstance()->AddSource(mContinuousPipelineConfigDir, &mPipelineMux); @@ -39,6 +43,10 @@ void ConfigProvider::Init(const string& dir) { ec.clear(); filesystem::create_directories(mInstanceSourceDir, ec); InstanceConfigWatcher::GetInstance()->AddSource(mInstanceSourceDir, &mInstanceMux); + + ec.clear(); + filesystem::create_directories(mOnetimePipelineConfigDir, ec); + PipelineConfigWatcher::GetInstance()->AddSource(mOnetimePipelineConfigDir, &mOnetimePipelineMux); } } // namespace logtail diff --git a/core/config/provider/ConfigProvider.h b/core/config/provider/ConfigProvider.h index c381fee9a6..8c5a965343 100644 --- a/core/config/provider/ConfigProvider.h +++ b/core/config/provider/ConfigProvider.h @@ -36,8 +36,10 @@ class ConfigProvider { std::filesystem::path mContinuousPipelineConfigDir; std::filesystem::path mInstanceSourceDir; + std::filesystem::path mOnetimePipelineConfigDir; mutable std::mutex mPipelineMux; mutable std::mutex mInstanceMux; + mutable std::mutex mOnetimePipelineMux; }; } // namespace logtail From 0ee4653a6c43dd89442ed873db73040dacbab980 Mon Sep 17 00:00:00 2001 From: quzard <1191890118@qq.com> Date: Tue, 19 Nov 2024 11:35:46 +0000 Subject: [PATCH 03/10] mContinuousPipelineMux --- core/config/common_provider/CommonConfigProvider.cpp | 4 ++-- core/config/common_provider/LegacyCommonConfigProvider.cpp | 2 +- core/config/provider/ConfigProvider.cpp | 2 +- core/config/provider/ConfigProvider.h | 2 +- 4 files changed, 5 insertions(+), 5 deletions(-) diff --git a/core/config/common_provider/CommonConfigProvider.cpp b/core/config/common_provider/CommonConfigProvider.cpp index 45588d16fb..424617422a 100644 --- a/core/config/common_provider/CommonConfigProvider.cpp +++ b/core/config/common_provider/CommonConfigProvider.cpp @@ -132,7 +132,7 @@ void CommonConfigProvider::Stop() { void CommonConfigProvider::LoadConfigFile() { error_code ec; lock_guard pipelineInfomaplock(mPipelineInfoMapMux); - lock_guard lockPipeline(mPipelineMux); + lock_guard lockPipeline(mContinuousPipelineMux); for (auto const& entry : filesystem::directory_iterator(mContinuousPipelineConfigDir, ec)) { Json::Value detail; if (LoadConfigDetailFromFile(entry, detail)) { @@ -432,7 +432,7 @@ void CommonConfigProvider::UpdateRemotePipelineConfig( return; } - lock_guard lock(mPipelineMux); + lock_guard lock(mContinuousPipelineMux); lock_guard infomaplock(mPipelineInfoMapMux); for (const auto& config : configs) { filesystem::path filePath = sourceDir / (config.name() + ".json"); diff --git a/core/config/common_provider/LegacyCommonConfigProvider.cpp b/core/config/common_provider/LegacyCommonConfigProvider.cpp index 96f2132c5a..5b29defbcc 100644 --- a/core/config/common_provider/LegacyCommonConfigProvider.cpp +++ b/core/config/common_provider/LegacyCommonConfigProvider.cpp @@ -295,7 +295,7 @@ void LegacyCommonConfigProvider::UpdateRemoteConfig( return; } - lock_guard lock(mPipelineMux); + lock_guard lock(mContinuousPipelineMux); for (const auto& checkResult : checkResults) { filesystem::path filePath = mContinuousPipelineConfigDir / (checkResult.name() + ".yaml"); filesystem::path tmpFilePath = mContinuousPipelineConfigDir / (checkResult.name() + ".yaml.new"); diff --git a/core/config/provider/ConfigProvider.cpp b/core/config/provider/ConfigProvider.cpp index b85c30538b..277d8d49d4 100644 --- a/core/config/provider/ConfigProvider.cpp +++ b/core/config/provider/ConfigProvider.cpp @@ -38,7 +38,7 @@ void ConfigProvider::Init(const string& dir) { error_code ec; filesystem::create_directories(mContinuousPipelineConfigDir, ec); - ConfigWatcher::GetInstance()->AddSource(mContinuousPipelineConfigDir, &mPipelineMux); + ConfigWatcher::GetInstance()->AddSource(mContinuousPipelineConfigDir, &mContinuousPipelineMux); ec.clear(); filesystem::create_directories(mInstanceSourceDir, ec); diff --git a/core/config/provider/ConfigProvider.h b/core/config/provider/ConfigProvider.h index 8c5a965343..42ee33112f 100644 --- a/core/config/provider/ConfigProvider.h +++ b/core/config/provider/ConfigProvider.h @@ -37,7 +37,7 @@ class ConfigProvider { std::filesystem::path mContinuousPipelineConfigDir; std::filesystem::path mInstanceSourceDir; std::filesystem::path mOnetimePipelineConfigDir; - mutable std::mutex mPipelineMux; + mutable std::mutex mContinuousPipelineMux; mutable std::mutex mInstanceMux; mutable std::mutex mOnetimePipelineMux; }; From dd8b9d9fb567a44f271314a1666fd644dd3478cc Mon Sep 17 00:00:00 2001 From: quzard <1191890118@qq.com> Date: Wed, 20 Nov 2024 06:40:05 +0000 Subject: [PATCH 04/10] fix --- config_server/protocol/v2/agentV2.proto | 140 ++++++++++++------------ 1 file changed, 69 insertions(+), 71 deletions(-) diff --git a/config_server/protocol/v2/agentV2.proto b/config_server/protocol/v2/agentV2.proto index 8baeac334f..87ab400912 100644 --- a/config_server/protocol/v2/agentV2.proto +++ b/config_server/protocol/v2/agentV2.proto @@ -21,32 +21,32 @@ enum ConfigStatus { // Define the Config information carried in the request message ConfigInfo { - string name = 1; // Required, Config's unique identification - int64 version = 2; // Required, Config's version number or hash code - ConfigStatus status = 3; // Config's status - string message = 4; // Optional error message + string name = 1; // Required, Config's unique identification + int64 version = 2; // Required, Config's version number or hash code + ConfigStatus status = 3; // Config's status + string message = 4; // Optional error message map extra = 5; // Optional extra info } // Define Agent's basic attributes message AgentAttributes { - bytes version = 1; // Agent's version - bytes ip = 2; // Agent's ip - bytes hostname = 3; // Agent's hostname - bytes hostid = 4; // Agent's hostid https://opentelemetry.io/docs/specs/semconv/attributes-registry/host/ - map extras = 100; // Agent's other attributes + bytes version = 1; // Agent's version + bytes ip = 2; // Agent's ip + bytes hostname = 3; // Agent's hostname + bytes hostid = 4; // Agent's hostid https://opentelemetry.io/docs/specs/semconv/attributes-registry/host/ + map extras = 100; // Agent's other attributes // before 100 (inclusive) are reserved for future official fields } enum AgentCapabilities { // The capabilities field is unspecified. - UnspecifiedAgentCapability = 0; - // The Agent can accept pipeline configuration from the Server. - AcceptsPipelineConfig = 0x00000001; + UnspecifiedAgentCapability = 0; + // The Agent can accept continuous pipeline configuration from the Server. + AcceptsContinuousPipelineConfig = 0x00000001; // The Agent can accept instance configuration from the Server. - AcceptsInstanceConfig = 0x00000002; - // The Agent can accept custom command from the Server. - AcceptsCustomCommand = 0x00000004; + AcceptsInstanceConfig = 0x00000002; + // The Agent can accept onetime pipeline configuration from the Server. + AcceptsOnetimePipelineConfig = 0x00000004; // Add new capabilities here, continuing with the least significant unused bit. } @@ -57,7 +57,7 @@ enum RequestFlags { // Flags is a bit mask. Values below define individual bits. // Must be set if this request contains full state - FullState = 0x00000001; + FullState = 0x00000001; // bits before 2^16 (inclusive) are reserved for future official fields } @@ -66,49 +66,48 @@ enum RequestFlags { // Agent sends requests to the ConfigServer to get config updates and receive commands. message HeartbeatRequest { bytes request_id = 1; - uint64 sequence_num = 2; // Increment every request, for server to check sync status - uint64 capabilities = 3; // Bitmask of flags defined by AgentCapabilities enum - bytes instance_id = 4; // Required, Agent's unique identification, consistent throughout the process lifecycle - string agent_type = 5; // Required, Agent's type(ilogtail, ..) - AgentAttributes attributes = 6; // Agent's basic attributes - repeated AgentGroupTag tags = 7; // Agent's tags - string running_status = 8; // Human readable running status - int64 startup_time = 9; // Required, Agent's startup time - repeated ConfigInfo pipeline_configs = 10; // Information about the current PIPELINE_CONFIG held by the Agent - repeated ConfigInfo instance_configs = 11; // Information about the current AGENT_CONFIG held by the Agent - repeated ConfigInfo custom_commands = 12; // Information about command history - uint64 flags = 13; // Predefined command flag - bytes opaque = 14; // Opaque data for extension + uint64 sequence_num = 2; // Increment every request, for server to check sync status + uint64 capabilities = 3; // Bitmask of flags defined by AgentCapabilities enum + bytes instance_id = 4; // Required, Agent's unique identification, consistent throughout the process lifecycle + string agent_type = 5; // Required, Agent's type(ilogtail, ..) + AgentAttributes attributes = 6; // Agent's basic attributes + repeated AgentGroupTag tags = 7; // Agent's tags + string running_status = 8; // Human readable running status + int64 startup_time = 9; // Required, Agent's startup time + repeated ConfigInfo continuous_pipeline_configs = 10; // Information about the current continuous pipeline configs held by the Agent + repeated ConfigInfo instance_configs = 11; // Information about the current instance configs held by the Agent + repeated ConfigInfo onetime_pipeline_configs = 12; // Information about onetime pipeline configs history + uint64 flags = 13; // Predefined command flag + bytes opaque = 14; // Opaque data for extension // before 100 (inclusive) are reserved for future official fields } // Define Config's detail message ConfigDetail { - string name = 1; // Required, Config's unique identification - int64 version = 2; // Required, Config's version number or hash code - bytes detail = 3; // Required, Config's detail + string name = 1; // Required, Config's unique identification + int64 version = 2; // Required, Config's version number or hash code + bytes detail = 3; // Required, Config's detail map extra = 4; // Optional extra info } message CommandDetail { - string type = 1; // Required, Command type - string name = 2; // Required, Command name - bytes detail = 3; // Required, Command's detail - int64 expire_time = 4; // After which the command can be safely removed from history - map extra = 5; // Optional extra info + string name = 1; // Required, Command name + bytes detail = 2; // Required, Command's detail + int64 expire_time = 3; // After which the command can be safely removed from history + map extra = 4; // Optional extra info } enum ServerCapabilities { // The capabilities field is unspecified. - UnspecifiedServerCapability = 0; + UnspecifiedServerCapability = 0; // The Server can remember agent attributes. - RembersAttribute = 0x00000001; - // The Server can remember pipeline config status. - RembersPipelineConfigStatus = 0x00000002; + RembersAttribute = 0x00000001; + // The Server can remember continuous pipeline config status. + RembersContinuousPipelineConfigStatus = 0x00000002; // The Server can remember instance config status. - RembersInstanceConfigStatus = 0x00000004; - // The Server can remember custom command status. - RembersCustomCommandStatus = 0x00000008; + RembersInstanceConfigStatus = 0x00000004; + // The Server can remember onetime pipeline config status. + RembersOnetimePipelineConfigStatus = 0x00000008; // bits before 2^16 (inclusive) are reserved for future official fields } @@ -122,45 +121,45 @@ enum ResponseFlags { // some sub-message in the last AgentToServer message (which is an allowed // optimization) but the Server detects that it does not have it (e.g. was // restarted and lost state). - ReportFullState = 0x00000001; - // FetchPipelineConfigDetail can be used by the Server to tell Agent to fetch config details by FetchConfig api, + ReportFullState = 0x00000001; + // FetchContinuousPipelineConfigDetail can be used by the Server to tell Agent to fetch continuous pipeline config details by FetchConfig api, // HB response ConfigDetail will not contains details. - FetchPipelineConfigDetail = 0x00000002; - // like FetchPipelineConfigDetail, but for instance config. + FetchContinuousPipelineConfigDetail = 0x00000002; + // like FetchContinuousPipelineConfigDetail, but for instance config. FetchInstanceConfigDetail = 0x00000004; // bits before 2^16 (inclusive) are reserved for future official fields } // ConfigServer's response to Agent's request message HeartbeatResponse { - bytes request_id = 1; - CommonResponse commonResponse = 2; // Set common response - uint64 capabilities = 3; // Bitmask of flags defined by ServerCapabilities enum + bytes request_id = 1; + CommonResponse commonResponse = 2; // Set common response + uint64 capabilities = 3; // Bitmask of flags defined by ServerCapabilities enum - repeated ConfigDetail pipeline_config_updates = 4; // Agent's pipeline config update status - repeated ConfigDetail instance_config_updates = 5; // Agent's instance config update status - repeated CommandDetail custom_command_updates = 6; // Agent's commands updates - uint64 flags = 7; // Predefined command flag - bytes opaque = 8; // Opaque data for extension + repeated ConfigDetail continuous_pipeline_config_updates = 4; // Agent's continuous pipeline config update status + repeated ConfigDetail instance_config_updates = 5; // Agent's instance config update status + repeated CommandDetail onetime_pipeline_config_updates = 6; // Agent's onetime pipeline config updates + uint64 flags = 7; // Predefined command flag + bytes opaque = 8; // Opaque data for extension } // API: /Agent/FetchConfig // optional api for fetching configs details, but not by heartbeat response with config details, see README. message FetchConfigRequest { - bytes request_id = 1; - bytes instance_id = 2; // Agent's unique identification - repeated ConfigInfo pipeline_configs = 3; // Information about the current PIPELINE_CONFIG held by the Agent - repeated ConfigInfo instance_configs = 4; // Information about the current AGENT_CONFIG held by the Agent - repeated ConfigInfo custom_commands = 5; // Information about command history + bytes request_id = 1; + bytes instance_id = 2; // Agent's unique identification + repeated ConfigInfo continuous_pipeline_configs = 3; // Information about the current continuous pipeline configs held by the Agent + repeated ConfigInfo instance_configs = 4; // Information about the current instance configs held by the Agent + repeated ConfigInfo onetime_pipeline_configs = 5; // Information about onetime pipeline configs history } // ConfigServer response to Agent's config fetching request message FetchConfigResponse { - bytes request_id = 1; + bytes request_id = 1; CommonResponse commonResponse = 2; - repeated ConfigDetail pipeline_config_updates = 3; // Agent's pipeline config with details - repeated ConfigDetail instance_config_updates = 4; // Agent's instance config with details - repeated CommandDetail custom_command_updates = 5; // Agent's commands details + repeated ConfigDetail continuous_pipeline_config_updates = 3; // Agent's continuous pipeline config with details + repeated ConfigDetail instance_config_updates = 4; // Agent's instance config with details + repeated CommandDetail onetime_pipeline_config_updates = 5; // Agent's onetime pipeline config details } // API: /Agent/ReportStatus @@ -168,10 +167,10 @@ message FetchConfigResponse { // if HB server and Status server are different service, this api may be help. message ReportStatusRequest { bytes request_id = 1; - bytes instance_id = 2; // Agent's unique identification - repeated ConfigInfo pipeline_configs = 3; // status about the current PIPELINE_CONFIG held by the Agent - repeated ConfigInfo instance_configs = 4; // status about the current AGENT_CONFIG held by the Agent - repeated ConfigInfo custom_commands = 5; // status about command history + bytes instance_id = 2; // Agent's unique identification + repeated ConfigInfo continuous_pipeline_configs = 3; // status about the current continuous pipeline configs held by the Agent + repeated ConfigInfo instance_configs = 4; // status about the current instance configs held by the Agent + repeated ConfigInfo onetime_pipeline_configs = 5; // status about onetime pipeline configs history } // ConfigServer response to Agent's report status request @@ -180,8 +179,7 @@ message ReportStatusResponse { CommonResponse commonResponse = 2; } -message CommonResponse -{ +message CommonResponse { int32 status = 1; bytes errorMessage = 2; } From 7cfbfa835930adf486c6dae690f7ba9fa19b3e92 Mon Sep 17 00:00:00 2001 From: quzard <1191890118@qq.com> Date: Wed, 20 Nov 2024 06:54:24 +0000 Subject: [PATCH 05/10] rename --- core/app_config/AppConfig.h | 2 +- core/application/Application.cpp | 6 +- .../common_provider/CommonConfigProvider.cpp | 101 ++++++++---------- .../common_provider/CommonConfigProvider.h | 22 ++-- .../LegacyCommonConfigProvider.cpp | 8 +- .../feedbacker/ConfigFeedbackReceiver.cpp | 44 ++++---- .../feedbacker/ConfigFeedbackReceiver.h | 20 ++-- core/config/feedbacker/ConfigFeedbackable.h | 4 +- core/config/provider/ConfigProvider.cpp | 10 +- core/config/provider/ConfigProvider.h | 2 - core/pipeline/PipelineManager.cpp | 19 ++-- core/task_pipeline/TaskPipelineManager.cpp | 17 +-- .../config/CommonConfigProviderUnittest.cpp | 17 +-- 13 files changed, 126 insertions(+), 146 deletions(-) diff --git a/core/app_config/AppConfig.h b/core/app_config/AppConfig.h index 12f23a25ee..05ed011b1a 100644 --- a/core/app_config/AppConfig.h +++ b/core/app_config/AppConfig.h @@ -299,7 +299,7 @@ class AppConfig { public: AppConfig(); - ~AppConfig() {}; + ~AppConfig(){}; void LoadInstanceConfig(const std::map>&); diff --git a/core/application/Application.cpp b/core/application/Application.cpp index 2a60ef20af..9971498d94 100644 --- a/core/application/Application.cpp +++ b/core/application/Application.cpp @@ -32,8 +32,8 @@ #include "common/version.h" #include "config/ConfigDiff.h" #include "config/InstanceConfigManager.h" -#include "config/watcher/PipelineConfigWatcher.h" #include "config/watcher/InstanceConfigWatcher.h" +#include "config/watcher/PipelineConfigWatcher.h" #include "file_server/ConfigManager.h" #include "file_server/EventDispatcher.h" #include "file_server/FileServer.h" @@ -210,8 +210,8 @@ void Application::Start() { // GCOVR_EXCL_START { // add local config dir - filesystem::path localConfigPath - = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir()) / GetContinuousPipelineConfigDir() / "local"; + filesystem::path localConfigPath = filesystem::path(AppConfig::GetInstance()->GetLoongcollectorConfDir()) + / GetContinuousPipelineConfigDir() / "local"; error_code ec; filesystem::create_directories(localConfigPath, ec); if (ec) { diff --git a/core/config/common_provider/CommonConfigProvider.cpp b/core/config/common_provider/CommonConfigProvider.cpp index 424617422a..d8e168c6c9 100644 --- a/core/config/common_provider/CommonConfigProvider.cpp +++ b/core/config/common_provider/CommonConfigProvider.cpp @@ -131,7 +131,7 @@ void CommonConfigProvider::Stop() { void CommonConfigProvider::LoadConfigFile() { error_code ec; - lock_guard pipelineInfomaplock(mPipelineInfoMapMux); + lock_guard pipelineInfomaplock(mContinuousPipelineInfoMapMux); lock_guard lockPipeline(mContinuousPipelineMux); for (auto const& entry : filesystem::directory_iterator(mContinuousPipelineConfigDir, ec)) { Json::Value detail; @@ -144,8 +144,8 @@ void CommonConfigProvider::LoadConfigFile() { } info.status = ConfigFeedbackStatus::APPLYING; info.detail = detail.toStyledString(); - mPipelineConfigInfoMap[info.name] = info; - ConfigFeedbackReceiver::GetInstance().RegisterPipelineConfig(info.name, this); + mContinuousPipelineConfigInfoMap[info.name] = info; + ConfigFeedbackReceiver::GetInstance().RegisterContinuousPipelineConfig(info.name, this); } } lock_guard instanceInfomaplock(mInstanceInfoMapMux); @@ -266,7 +266,7 @@ configserver::proto::v2::HeartbeatRequest CommonConfigProvider::PrepareHeartbeat heartbeatReq.set_request_id(requestID); heartbeatReq.set_sequence_num(mSequenceNum); heartbeatReq.set_capabilities(configserver::proto::v2::AcceptsInstanceConfig - | configserver::proto::v2::AcceptsPipelineConfig); + | configserver::proto::v2::AcceptsContinuousPipelineConfig); heartbeatReq.set_instance_id(GetInstanceId()); heartbeatReq.set_agent_type("LoongCollector"); FillAttributes(*heartbeatReq.mutable_attributes()); @@ -279,37 +279,18 @@ configserver::proto::v2::HeartbeatRequest CommonConfigProvider::PrepareHeartbeat heartbeatReq.set_running_status("running"); heartbeatReq.set_startup_time(mStartTime); - lock_guard pipelineinfomaplock(mPipelineInfoMapMux); - for (const auto& configInfo : mPipelineConfigInfoMap) { - addConfigInfoToRequest(configInfo, heartbeatReq.add_pipeline_configs()); + lock_guard pipelineinfomaplock(mContinuousPipelineInfoMapMux); + for (const auto& configInfo : mContinuousPipelineConfigInfoMap) { + addConfigInfoToRequest(configInfo, heartbeatReq.add_continuous_pipeline_configs()); } lock_guard instanceinfomaplock(mInstanceInfoMapMux); for (const auto& configInfo : mInstanceConfigInfoMap) { addConfigInfoToRequest(configInfo, heartbeatReq.add_instance_configs()); } - for (auto& configInfo : mCommandInfoMap) { - configserver::proto::v2::CommandInfo* command = heartbeatReq.add_custom_commands(); - command->set_type(configInfo.second.type); - command->set_name(configInfo.second.name); - command->set_message(configInfo.second.message); - switch (configInfo.second.status) { - case ConfigFeedbackStatus::UNSET: - command->set_status(configserver::proto::v2::ConfigStatus::UNSET); - break; - case ConfigFeedbackStatus::APPLYING: - command->set_status(configserver::proto::v2::ConfigStatus::APPLYING); - break; - case ConfigFeedbackStatus::APPLIED: - command->set_status(configserver::proto::v2::ConfigStatus::APPLIED); - break; - case ConfigFeedbackStatus::FAILED: - command->set_status(configserver::proto::v2::ConfigStatus::FAILED); - break; - case ConfigFeedbackStatus::DELETED: - break; - } - command->set_message(configInfo.second.message); + lock_guard onetimeinfomaplock(mOnetimePipelineInfoMapMux); + for (const auto& configInfo : mOnetimePipelineConfigInfoMap) { + addConfigInfoToRequest(configInfo, heartbeatReq.add_onetime_pipeline_configs()); } return heartbeatReq; } @@ -370,10 +351,10 @@ bool CommonConfigProvider::SendHttpRequest(const string& operation, bool CommonConfigProvider::FetchPipelineConfig( configserver::proto::v2::HeartbeatResponse& heartbeatResponse, ::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail>& result) { - if (heartbeatResponse.flags() & ::configserver::proto::v2::FetchPipelineConfigDetail) { + if (heartbeatResponse.flags() & ::configserver::proto::v2::FetchContinuousPipelineConfigDetail) { return FetchPipelineConfigFromServer(heartbeatResponse, result); } else { - result.Swap(heartbeatResponse.mutable_pipeline_config_updates()); + result.Swap(heartbeatResponse.mutable_continuous_pipeline_config_updates()); return true; } } @@ -381,7 +362,7 @@ bool CommonConfigProvider::FetchPipelineConfig( bool CommonConfigProvider::FetchInstanceConfig( configserver::proto::v2::HeartbeatResponse& heartbeatResponse, ::google::protobuf::RepeatedPtrField< ::configserver::proto::v2::ConfigDetail>& result) { - if (heartbeatResponse.flags() & ::configserver::proto::v2::FetchPipelineConfigDetail) { + if (heartbeatResponse.flags() & ::configserver::proto::v2::FetchContinuousPipelineConfigDetail) { return FetchInstanceConfigFromServer(heartbeatResponse, result); } else { result.Swap(heartbeatResponse.mutable_instance_config_updates()); @@ -433,26 +414,26 @@ void CommonConfigProvider::UpdateRemotePipelineConfig( } lock_guard lock(mContinuousPipelineMux); - lock_guard infomaplock(mPipelineInfoMapMux); + lock_guard infomaplock(mContinuousPipelineInfoMapMux); for (const auto& config : configs) { filesystem::path filePath = sourceDir / (config.name() + ".json"); if (config.version() == -1) { - mPipelineConfigInfoMap.erase(config.name()); + mContinuousPipelineConfigInfoMap.erase(config.name()); filesystem::remove(filePath, ec); - ConfigFeedbackReceiver::GetInstance().UnregisterPipelineConfig(config.name()); + ConfigFeedbackReceiver::GetInstance().UnregisterContinuousPipelineConfig(config.name()); } else { if (!DumpConfigFile(config, sourceDir)) { - mPipelineConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(), - .version = config.version(), - .status = ConfigFeedbackStatus::FAILED, - .detail = config.detail()}; + mContinuousPipelineConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(), + .version = config.version(), + .status = ConfigFeedbackStatus::FAILED, + .detail = config.detail()}; continue; } - mPipelineConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(), - .version = config.version(), - .status = ConfigFeedbackStatus::APPLYING, - .detail = config.detail()}; - ConfigFeedbackReceiver::GetInstance().RegisterPipelineConfig(config.name(), this); + mContinuousPipelineConfigInfoMap[config.name()] = ConfigInfo{.name = config.name(), + .version = config.version(), + .status = ConfigFeedbackStatus::APPLYING, + .detail = config.detail()}; + ConfigFeedbackReceiver::GetInstance().RegisterContinuousPipelineConfig(config.name(), this); } } } @@ -536,8 +517,8 @@ bool CommonConfigProvider::FetchPipelineConfigFromServer( string requestID = CalculateRandomUUID(); fetchConfigRequest.set_request_id(requestID); fetchConfigRequest.set_instance_id(GetInstanceId()); - for (const auto& config : heartbeatResponse.pipeline_config_updates()) { - auto reqConfig = fetchConfigRequest.add_pipeline_configs(); + for (const auto& config : heartbeatResponse.continuous_pipeline_config_updates()) { + auto reqConfig = fetchConfigRequest.add_continuous_pipeline_configs(); reqConfig->set_name(config.name()); reqConfig->set_version(config.version()); } @@ -550,20 +531,22 @@ bool CommonConfigProvider::FetchPipelineConfigFromServer( operation, reqBody, "FetchPipelineConfig", fetchConfigRequest.request_id(), fetchConfigResponse)) { configserver::proto::v2::FetchConfigResponse fetchConfigResponsePb; fetchConfigResponsePb.ParseFromString(fetchConfigResponse); - res.Swap(fetchConfigResponsePb.mutable_pipeline_config_updates()); + res.Swap(fetchConfigResponsePb.mutable_continuous_pipeline_config_updates()); return true; } return false; } -void CommonConfigProvider::FeedbackPipelineConfigStatus(const std::string& name, ConfigFeedbackStatus status) { - lock_guard infomaplock(mPipelineInfoMapMux); - auto info = mPipelineConfigInfoMap.find(name); - if (info != mPipelineConfigInfoMap.end()) { +void CommonConfigProvider::FeedbackContinuousPipelineConfigStatus(const std::string& name, + ConfigFeedbackStatus status) { + lock_guard infomaplock(mContinuousPipelineInfoMapMux); + auto info = mContinuousPipelineConfigInfoMap.find(name); + if (info != mContinuousPipelineConfigInfoMap.end()) { info->second.status = status; } LOG_DEBUG(sLogger, - ("CommonConfigProvider", "FeedbackPipelineConfigStatus")("name", name)("status", ToStringView(status))); + ("CommonConfigProvider", "FeedbackContinuousPipelineConfigStatus")("name", name)("status", + ToStringView(status))); } void CommonConfigProvider::FeedbackInstanceConfigStatus(const std::string& name, ConfigFeedbackStatus status) { lock_guard infomaplock(mInstanceInfoMapMux); @@ -574,17 +557,17 @@ void CommonConfigProvider::FeedbackInstanceConfigStatus(const std::string& name, LOG_DEBUG(sLogger, ("CommonConfigProvider", "FeedbackInstanceConfigStatus")("name", name)("status", ToStringView(status))); } -void CommonConfigProvider::FeedbackCommandConfigStatus(const std::string& type, - const std::string& name, - ConfigFeedbackStatus status) { - lock_guard infomaplock(mCommondInfoMapMux); - auto info = mCommandInfoMap.find(GenerateCommandFeedBackKey(type, name)); - if (info != mCommandInfoMap.end()) { +void CommonConfigProvider::FeedbackOnetimePipelineConfigStatus(const std::string& type, + const std::string& name, + ConfigFeedbackStatus status) { + lock_guard infomaplock(mOnetimePipelineInfoMapMux); + auto info = mOnetimePipelineConfigInfoMap.find(GenerateOnetimePipelineConfigFeedBackKey(type, name)); + if (info != mOnetimePipelineConfigInfoMap.end()) { info->second.status = status; } LOG_DEBUG(sLogger, ("CommonConfigProvider", - "FeedbackCommandConfigStatus")("type", type)("name", name)("status", ToStringView(status))); + "FeedbackOnetimePipelineConfigStatus")("type", type)("name", name)("status", ToStringView(status))); } } // namespace logtail diff --git a/core/config/common_provider/CommonConfigProvider.h b/core/config/common_provider/CommonConfigProvider.h index d79d22a95e..f7a4e918a6 100644 --- a/core/config/common_provider/CommonConfigProvider.h +++ b/core/config/common_provider/CommonConfigProvider.h @@ -38,13 +38,6 @@ struct ConfigInfo { std::string detail; }; -struct CommandInfo { - std::string type; - std::string name; - ConfigFeedbackStatus status; - std::string message; -}; - class CommonConfigProvider : public ConfigProvider, ConfigFeedbackable { public: std::string sName; @@ -61,10 +54,11 @@ class CommonConfigProvider : public ConfigProvider, ConfigFeedbackable { void Init(const std::string& dir) override; void Stop() override; - void FeedbackPipelineConfigStatus(const std::string& name, ConfigFeedbackStatus status) override; + void FeedbackContinuousPipelineConfigStatus(const std::string& name, ConfigFeedbackStatus status) override; void FeedbackInstanceConfigStatus(const std::string& name, ConfigFeedbackStatus status) override; - void - FeedbackCommandConfigStatus(const std::string& type, const std::string& name, ConfigFeedbackStatus status) override; + void FeedbackOnetimePipelineConfigStatus(const std::string& type, + const std::string& name, + ConfigFeedbackStatus status) override; CommonConfigProvider() = default; ~CommonConfigProvider() = default; @@ -105,12 +99,12 @@ class CommonConfigProvider : public ConfigProvider, ConfigFeedbackable { bool mConfigServerAvailable = false; mutable std::mutex mInstanceInfoMapMux; - mutable std::mutex mPipelineInfoMapMux; - mutable std::mutex mCommondInfoMapMux; + mutable std::mutex mContinuousPipelineInfoMapMux; + mutable std::mutex mOnetimePipelineInfoMapMux; - std::unordered_map mPipelineConfigInfoMap; + std::unordered_map mContinuousPipelineConfigInfoMap; std::unordered_map mInstanceConfigInfoMap; - std::unordered_map mCommandInfoMap; + std::unordered_map mOnetimePipelineConfigInfoMap; private: static std::string configVersion; diff --git a/core/config/common_provider/LegacyCommonConfigProvider.cpp b/core/config/common_provider/LegacyCommonConfigProvider.cpp index 5b29defbcc..4c1b70ea53 100644 --- a/core/config/common_provider/LegacyCommonConfigProvider.cpp +++ b/core/config/common_provider/LegacyCommonConfigProvider.cpp @@ -289,9 +289,11 @@ void LegacyCommonConfigProvider::UpdateRemoteConfig( filesystem::create_directories(mContinuousPipelineConfigDir, ec); if (ec) { StopUsingConfigServer(); - LOG_ERROR(sLogger, - ("failed to create dir for legacy common configs", "stop receiving config from legacy common config server")( - "dir", mContinuousPipelineConfigDir.string())("error code", ec.value())("error msg", ec.message())); + LOG_ERROR( + sLogger, + ("failed to create dir for legacy common configs", + "stop receiving config from legacy common config server")("dir", mContinuousPipelineConfigDir.string())( + "error code", ec.value())("error msg", ec.message())); return; } diff --git a/core/config/feedbacker/ConfigFeedbackReceiver.cpp b/core/config/feedbacker/ConfigFeedbackReceiver.cpp index b9807ef9e4..4bad1dabed 100644 --- a/core/config/feedbacker/ConfigFeedbackReceiver.cpp +++ b/core/config/feedbacker/ConfigFeedbackReceiver.cpp @@ -26,9 +26,10 @@ ConfigFeedbackReceiver& ConfigFeedbackReceiver::GetInstance() { return instance; } -void ConfigFeedbackReceiver::RegisterPipelineConfig(const std::string& name, ConfigFeedbackable* feedbackable) { +void ConfigFeedbackReceiver::RegisterContinuousPipelineConfig(const std::string& name, + ConfigFeedbackable* feedbackable) { std::lock_guard lock(mMutex); - mPipelineConfigFeedbackableMap[name] = feedbackable; + mContinuousPipelineConfigFeedbackableMap[name] = feedbackable; } void ConfigFeedbackReceiver::RegisterInstanceConfig(const std::string& name, ConfigFeedbackable* feedbackable) { @@ -36,16 +37,16 @@ void ConfigFeedbackReceiver::RegisterInstanceConfig(const std::string& name, Con mInstanceConfigFeedbackableMap[name] = feedbackable; } -void ConfigFeedbackReceiver::RegisterCommand(const std::string& type, - const std::string& name, - ConfigFeedbackable* feedbackable) { +void ConfigFeedbackReceiver::RegisterOnetimePipelineConfig(const std::string& type, + const std::string& name, + ConfigFeedbackable* feedbackable) { std::lock_guard lock(mMutex); - mCommandFeedbackableMap[GenerateCommandFeedBackKey(type, name)] = feedbackable; + mOnetimePipelineConfigFeedbackableMap[GenerateOnetimePipelineConfigFeedBackKey(type, name)] = feedbackable; } -void ConfigFeedbackReceiver::UnregisterPipelineConfig(const std::string& name) { +void ConfigFeedbackReceiver::UnregisterContinuousPipelineConfig(const std::string& name) { std::lock_guard lock(mMutex); - mPipelineConfigFeedbackableMap.erase(name); + mContinuousPipelineConfigFeedbackableMap.erase(name); } void ConfigFeedbackReceiver::UnregisterInstanceConfig(const std::string& name) { @@ -53,16 +54,17 @@ void ConfigFeedbackReceiver::UnregisterInstanceConfig(const std::string& name) { mInstanceConfigFeedbackableMap.erase(name); } -void ConfigFeedbackReceiver::UnregisterCommand(const std::string& type, const std::string& name) { +void ConfigFeedbackReceiver::UnregisterOnetimePipelineConfig(const std::string& type, const std::string& name) { std::lock_guard lock(mMutex); - mCommandFeedbackableMap.erase(GenerateCommandFeedBackKey(type, name)); + mOnetimePipelineConfigFeedbackableMap.erase(GenerateOnetimePipelineConfigFeedBackKey(type, name)); } -void ConfigFeedbackReceiver::FeedbackPipelineConfigStatus(const std::string& name, ConfigFeedbackStatus status) { +void ConfigFeedbackReceiver::FeedbackContinuousPipelineConfigStatus(const std::string& name, + ConfigFeedbackStatus status) { std::lock_guard lock(mMutex); - auto iter = mPipelineConfigFeedbackableMap.find(name); - if (iter != mPipelineConfigFeedbackableMap.end()) { - iter->second->FeedbackPipelineConfigStatus(name, status); + auto iter = mContinuousPipelineConfigFeedbackableMap.find(name); + if (iter != mContinuousPipelineConfigFeedbackableMap.end()) { + iter->second->FeedbackContinuousPipelineConfigStatus(name, status); } } @@ -74,17 +76,17 @@ void ConfigFeedbackReceiver::FeedbackInstanceConfigStatus(const std::string& nam } } -void ConfigFeedbackReceiver::FeedbackCommandConfigStatus(const std::string& type, - const std::string& name, - ConfigFeedbackStatus status) { +void ConfigFeedbackReceiver::FeedbackOnetimePipelineConfigStatus(const std::string& type, + const std::string& name, + ConfigFeedbackStatus status) { std::lock_guard lock(mMutex); - auto iter = mCommandFeedbackableMap.find(GenerateCommandFeedBackKey(type, name)); - if (iter != mCommandFeedbackableMap.end()) { - iter->second->FeedbackCommandConfigStatus(type, name, status); + auto iter = mOnetimePipelineConfigFeedbackableMap.find(GenerateOnetimePipelineConfigFeedBackKey(type, name)); + if (iter != mOnetimePipelineConfigFeedbackableMap.end()) { + iter->second->FeedbackOnetimePipelineConfigStatus(type, name, status); } } -std::string GenerateCommandFeedBackKey(const std::string& type, const std::string& name) { +std::string GenerateOnetimePipelineConfigFeedBackKey(const std::string& type, const std::string& name) { return type + '\1' + name; } diff --git a/core/config/feedbacker/ConfigFeedbackReceiver.h b/core/config/feedbacker/ConfigFeedbackReceiver.h index 86796afbcd..f20bcdb73c 100644 --- a/core/config/feedbacker/ConfigFeedbackReceiver.h +++ b/core/config/feedbacker/ConfigFeedbackReceiver.h @@ -23,27 +23,29 @@ namespace logtail { -std::string GenerateCommandFeedBackKey(const std::string& type, const std::string& name); +std::string GenerateOnetimePipelineConfigFeedBackKey(const std::string& type, const std::string& name); class ConfigFeedbackReceiver { public: static ConfigFeedbackReceiver& GetInstance(); - void RegisterPipelineConfig(const std::string& name, ConfigFeedbackable* feedbackable); + void RegisterContinuousPipelineConfig(const std::string& name, ConfigFeedbackable* feedbackable); void RegisterInstanceConfig(const std::string& name, ConfigFeedbackable* feedbackable); - void RegisterCommand(const std::string& type, const std::string& name, ConfigFeedbackable* feedbackable); - void UnregisterPipelineConfig(const std::string& name); + void + RegisterOnetimePipelineConfig(const std::string& type, const std::string& name, ConfigFeedbackable* feedbackable); + void UnregisterContinuousPipelineConfig(const std::string& name); void UnregisterInstanceConfig(const std::string& name); - void UnregisterCommand(const std::string& type, const std::string& name); - void FeedbackPipelineConfigStatus(const std::string& name, ConfigFeedbackStatus status); + void UnregisterOnetimePipelineConfig(const std::string& type, const std::string& name); + void FeedbackContinuousPipelineConfigStatus(const std::string& name, ConfigFeedbackStatus status); void FeedbackInstanceConfigStatus(const std::string& name, ConfigFeedbackStatus status); - void FeedbackCommandConfigStatus(const std::string& type, const std::string& name, ConfigFeedbackStatus status); + void + FeedbackOnetimePipelineConfigStatus(const std::string& type, const std::string& name, ConfigFeedbackStatus status); private: ConfigFeedbackReceiver() {} std::mutex mMutex; - std::unordered_map mPipelineConfigFeedbackableMap; + std::unordered_map mContinuousPipelineConfigFeedbackableMap; std::unordered_map mInstanceConfigFeedbackableMap; - std::unordered_map mCommandFeedbackableMap; + std::unordered_map mOnetimePipelineConfigFeedbackableMap; }; } // namespace logtail diff --git a/core/config/feedbacker/ConfigFeedbackable.h b/core/config/feedbacker/ConfigFeedbackable.h index f027e2e758..29126c9d33 100644 --- a/core/config/feedbacker/ConfigFeedbackable.h +++ b/core/config/feedbacker/ConfigFeedbackable.h @@ -28,10 +28,10 @@ std::string_view ToStringView(ConfigFeedbackStatus status); class ConfigFeedbackable { public: virtual ~ConfigFeedbackable() = default; // LCOV_EXCL_LINE - virtual void FeedbackPipelineConfigStatus(const std::string& name, ConfigFeedbackStatus status) = 0; + virtual void FeedbackContinuousPipelineConfigStatus(const std::string& name, ConfigFeedbackStatus status) = 0; virtual void FeedbackInstanceConfigStatus(const std::string& name, ConfigFeedbackStatus status) = 0; virtual void - FeedbackCommandConfigStatus(const std::string& type, const std::string& name, ConfigFeedbackStatus status) + FeedbackOnetimePipelineConfigStatus(const std::string& type, const std::string& name, ConfigFeedbackStatus status) = 0; }; diff --git a/core/config/provider/ConfigProvider.cpp b/core/config/provider/ConfigProvider.cpp index 277d8d49d4..207f52a39a 100644 --- a/core/config/provider/ConfigProvider.cpp +++ b/core/config/provider/ConfigProvider.cpp @@ -32,21 +32,13 @@ void ConfigProvider::Init(const string& dir) { mInstanceSourceDir /= "instance_config"; mInstanceSourceDir /= dir; - mOnetimePipelineConfigDir.assign(AppConfig::GetInstance()->GetLoongcollectorConfDir()); - mOnetimePipelineConfigDir /= "onetime_pipeline_config"; - mOnetimePipelineConfigDir /= dir; - error_code ec; filesystem::create_directories(mContinuousPipelineConfigDir, ec); - ConfigWatcher::GetInstance()->AddSource(mContinuousPipelineConfigDir, &mContinuousPipelineMux); + PipelineConfigWatcher::GetInstance()->AddSource(mContinuousPipelineConfigDir, &mContinuousPipelineMux); ec.clear(); filesystem::create_directories(mInstanceSourceDir, ec); InstanceConfigWatcher::GetInstance()->AddSource(mInstanceSourceDir, &mInstanceMux); - - ec.clear(); - filesystem::create_directories(mOnetimePipelineConfigDir, ec); - PipelineConfigWatcher::GetInstance()->AddSource(mOnetimePipelineConfigDir, &mOnetimePipelineMux); } } // namespace logtail diff --git a/core/config/provider/ConfigProvider.h b/core/config/provider/ConfigProvider.h index 42ee33112f..b41f663a8a 100644 --- a/core/config/provider/ConfigProvider.h +++ b/core/config/provider/ConfigProvider.h @@ -36,10 +36,8 @@ class ConfigProvider { std::filesystem::path mContinuousPipelineConfigDir; std::filesystem::path mInstanceSourceDir; - std::filesystem::path mOnetimePipelineConfigDir; mutable std::mutex mContinuousPipelineMux; mutable std::mutex mInstanceMux; - mutable std::mutex mOnetimePipelineMux; }; } // namespace logtail diff --git a/core/pipeline/PipelineManager.cpp b/core/pipeline/PipelineManager.cpp index 1fe4709564..57abcde874 100644 --- a/core/pipeline/PipelineManager.cpp +++ b/core/pipeline/PipelineManager.cpp @@ -40,7 +40,7 @@ PipelineManager::PipelineManager() : mInputRunners({ PrometheusInputRunner::GetInstance(), #if defined(__linux__) && !defined(__ANDROID__) - ebpf::eBPFServer::GetInstance(), + ebpf::eBPFServer::GetInstance(), #endif }) { } @@ -78,7 +78,8 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { DecreasePluginUsageCnt(iter->second->GetPluginStatistics()); iter->second->RemoveProcessQueue(); mPipelineNameEntityMap.erase(iter); - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(name, ConfigFeedbackStatus::DELETED); + ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(name, + ConfigFeedbackStatus::DELETED); } for (auto& config : diff.mModified) { auto p = BuildPipeline(std::move(config)); // auto reuse old pipeline's process queue and sender queue @@ -92,8 +93,8 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { config.mProject, config.mLogstore, config.mRegion); - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, - ConfigFeedbackStatus::FAILED); + ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName, + ConfigFeedbackStatus::FAILED); continue; } LOG_INFO(sLogger, @@ -106,7 +107,8 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { mPipelineNameEntityMap[config.mName] = p; IncreasePluginUsageCnt(p->GetPluginStatistics()); p->Start(); - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); + ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName, + ConfigFeedbackStatus::APPLIED); } for (auto& config : diff.mAdded) { auto p = BuildPipeline(std::move(config)); @@ -119,8 +121,8 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { config.mProject, config.mLogstore, config.mRegion); - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, - ConfigFeedbackStatus::FAILED); + ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName, + ConfigFeedbackStatus::FAILED); continue; } LOG_INFO(sLogger, @@ -128,7 +130,8 @@ void logtail::PipelineManager::UpdatePipelines(PipelineConfigDiff& diff) { mPipelineNameEntityMap[config.mName] = p; IncreasePluginUsageCnt(p->GetPluginStatistics()); p->Start(); - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); + ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName, + ConfigFeedbackStatus::APPLIED); } #ifndef APSARA_UNIT_TEST_MAIN diff --git a/core/task_pipeline/TaskPipelineManager.cpp b/core/task_pipeline/TaskPipelineManager.cpp index a846f1dd4d..44d9b50195 100644 --- a/core/task_pipeline/TaskPipelineManager.cpp +++ b/core/task_pipeline/TaskPipelineManager.cpp @@ -29,7 +29,8 @@ void TaskPipelineManager::UpdatePipelines(TaskConfigDiff& diff) { auto iter = mPipelineNameEntityMap.find(name); iter->second->Stop(true); mPipelineNameEntityMap.erase(iter); - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(name, ConfigFeedbackStatus::DELETED); + ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(name, + ConfigFeedbackStatus::DELETED); } for (auto& config : diff.mModified) { auto p = BuildPipeline(std::move(config)); @@ -40,8 +41,8 @@ void TaskPipelineManager::UpdatePipelines(TaskConfigDiff& diff) { AlarmManager::GetInstance()->SendAlarm( CATEGORY_CONFIG_ALARM, "failed to build task for existing config: keep current task running, config: " + config.mName); - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, - ConfigFeedbackStatus::FAILED); + ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName, + ConfigFeedbackStatus::FAILED); continue; } LOG_INFO(sLogger, @@ -51,7 +52,8 @@ void TaskPipelineManager::UpdatePipelines(TaskConfigDiff& diff) { iter->second->Stop(false); mPipelineNameEntityMap[config.mName] = std::move(p); mPipelineNameEntityMap[config.mName]->Start(); - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); + ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName, + ConfigFeedbackStatus::APPLIED); } for (auto& config : diff.mAdded) { auto p = BuildPipeline(std::move(config)); @@ -61,14 +63,15 @@ void TaskPipelineManager::UpdatePipelines(TaskConfigDiff& diff) { AlarmManager::GetInstance()->SendAlarm(CATEGORY_CONFIG_ALARM, "failed to build task for new config: skip current object, config: " + config.mName); - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, - ConfigFeedbackStatus::FAILED); + ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName, + ConfigFeedbackStatus::FAILED); continue; } LOG_INFO(sLogger, ("task building for new config succeeded", "begin to start task")("config", config.mName)); mPipelineNameEntityMap[config.mName] = std::move(p); mPipelineNameEntityMap[config.mName]->Start(); - ConfigFeedbackReceiver::GetInstance().FeedbackPipelineConfigStatus(config.mName, ConfigFeedbackStatus::APPLIED); + ConfigFeedbackReceiver::GetInstance().FeedbackContinuousPipelineConfigStatus(config.mName, + ConfigFeedbackStatus::APPLIED); } } diff --git a/core/unittest/config/CommonConfigProviderUnittest.cpp b/core/unittest/config/CommonConfigProviderUnittest.cpp index 34fce288be..40b6338e30 100644 --- a/core/unittest/config/CommonConfigProviderUnittest.cpp +++ b/core/unittest/config/CommonConfigProviderUnittest.cpp @@ -21,8 +21,8 @@ #include "config/ConfigDiff.h" #include "config/InstanceConfigManager.h" #include "config/common_provider/CommonConfigProvider.h" -#include "config/watcher/PipelineConfigWatcher.h" #include "config/watcher/InstanceConfigWatcher.h" +#include "config/watcher/PipelineConfigWatcher.h" #include "gmock/gmock.h" #include "monitor/Monitor.h" #include "pipeline/PipelineManager.h" @@ -294,7 +294,8 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { APSARA_TEST_EQUAL(heartbeatReq.sequence_num(), sequence_num); sequence_num++; APSARA_TEST_TRUE(heartbeatReq.capabilities() & configserver::proto::v2::AcceptsInstanceConfig); - APSARA_TEST_TRUE(heartbeatReq.capabilities() & configserver::proto::v2::AcceptsPipelineConfig); + APSARA_TEST_TRUE(heartbeatReq.capabilities() + & configserver::proto::v2::AcceptsContinuousPipelineConfig); APSARA_TEST_EQUAL(heartbeatReq.instance_id(), provider.GetInstanceId()); APSARA_TEST_EQUAL(heartbeatReq.agent_type(), "LoongCollector"); APSARA_TEST_EQUAL(heartbeatReq.attributes().ip(), LoongCollectorMonitor::mIpAddr); @@ -427,9 +428,9 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { configserver::proto::v2::HeartbeatResponse heartbeatResponse; provider.GetConfigUpdate(); - APSARA_TEST_EQUAL(provider.mPipelineConfigInfoMap.size(), 2); - APSARA_TEST_EQUAL(provider.mPipelineConfigInfoMap["config1"].status, ConfigFeedbackStatus::APPLYING); - APSARA_TEST_EQUAL(provider.mPipelineConfigInfoMap["config2"].status, ConfigFeedbackStatus::FAILED); + APSARA_TEST_EQUAL(provider.mContinuousPipelineConfigInfoMap.size(), 2); + APSARA_TEST_EQUAL(provider.mContinuousPipelineConfigInfoMap["config1"].status, ConfigFeedbackStatus::APPLYING); + APSARA_TEST_EQUAL(provider.mContinuousPipelineConfigInfoMap["config2"].status, ConfigFeedbackStatus::FAILED); // 处理 pipelineconfig auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); @@ -474,8 +475,8 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { { MockCommonConfigProvider provider; provider.Init("common_v2"); - APSARA_TEST_EQUAL(provider.mPipelineConfigInfoMap.size(), 1); - APSARA_TEST_EQUAL(provider.mPipelineConfigInfoMap["config1"].status, ConfigFeedbackStatus::APPLYING); + APSARA_TEST_EQUAL(provider.mContinuousPipelineConfigInfoMap.size(), 1); + APSARA_TEST_EQUAL(provider.mContinuousPipelineConfigInfoMap["config1"].status, ConfigFeedbackStatus::APPLYING); APSARA_TEST_EQUAL(provider.mInstanceConfigInfoMap.size(), 1); APSARA_TEST_EQUAL(provider.mInstanceConfigInfoMap["instanceconfig1"].status, ConfigFeedbackStatus::APPLYING); provider.Stop(); @@ -645,7 +646,7 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { configserver::proto::v2::HeartbeatResponse heartbeatResponse; provider.GetConfigUpdate(); - APSARA_TEST_TRUE(provider.mPipelineConfigInfoMap.empty()); + APSARA_TEST_TRUE(provider.mContinuousPipelineConfigInfoMap.empty()); // 处理pipelineConfigDiff auto pipelineConfigDiff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); From 91f4236efb26b10a34ead6157c3614127f494398 Mon Sep 17 00:00:00 2001 From: quzard <1191890118@qq.com> Date: Wed, 20 Nov 2024 07:11:34 +0000 Subject: [PATCH 06/10] fix --- config_server/protocol/v2/README.md | 146 +++++++++--------- .../config/CommonConfigProviderUnittest.cpp | 7 +- 2 files changed, 75 insertions(+), 78 deletions(-) diff --git a/config_server/protocol/v2/README.md b/config_server/protocol/v2/README.md index 53e096f2bd..a3297c4faa 100644 --- a/config_server/protocol/v2/README.md +++ b/config_server/protocol/v2/README.md @@ -13,19 +13,19 @@ message HeartbeatRequest { bytes request_id = 1; - uint64 sequence_num = 2; // Increment every request, for server to check sync status - uint64 capabilities = 3; // Bitmask of flags defined by AgentCapabilities enum - bytes instance_id = 4; // Required, Agent's unique identification, consistent throughout the process lifecycle - string agent_type = 5; // Required, Agent's type(ilogtail, ..) - AgentAttributes attributes = 6; // Agent's basic attributes - repeated AgentGroupTag tags = 7; // Agent's tags - string running_status = 8; // Human readable running status - int64 startup_time = 9; // Required, Agent's startup time - repeated ConfigInfo pipeline_configs = 10; // Information about the current PIPELINE_CONFIG held by the Agent - repeated ConfigInfo instance_configs = 11; // Information about the current AGENT_CONFIG held by the Agent - repeated CommandInfo custom_commands = 12; // Information about command history - uint64 flags = 13; // Predefined command flag - bytes opaque = 14; // Opaque data for extension + uint64 sequence_num = 2; // Increment every request, for server to check sync status + uint64 capabilities = 3; // Bitmask of flags defined by AgentCapabilities enum + bytes instance_id = 4; // Required, Agent's unique identification, consistent throughout the process lifecycle + string agent_type = 5; // Required, Agent's type(ilogtail, ..) + AgentAttributes attributes = 6; // Agent's basic attributes + repeated AgentGroupTag tags = 7; // Agent's tags + string running_status = 8; // Human readable running status + int64 startup_time = 9; // Required, Agent's startup time + repeated ConfigInfo continuous_pipeline_configs = 10; // Information about the current continuous pipeline configs held by the Agent + repeated ConfigInfo instance_configs = 11; // Information about the current instance configs held by the Agent + repeated ConfigInfo onetime_pipeline_configs = 12; // Information about onetime pipeline configs history + uint64 flags = 13; // Predefined command flag + bytes opaque = 14; // Opaque data for extension // before 100 (inclusive) are reserved for future official fields } @@ -55,15 +55,6 @@ map extra = 5; // Optional extra info } - // Define the Command information carried in the request - message CommandInfo { - string type = 1; // Command's type - string name = 2; // Required, Command's unique identification - ConfigStatus status = 3; // Command's status - string message = 4; // Optional error message - map extra = 5; // Optional extra info - } - // Define Agent's basic attributes message AgentAttributes { bytes version = 1; // Agent's version @@ -76,14 +67,14 @@ enum AgentCapabilities { // The capabilities field is unspecified. UnspecifiedAgentCapability = 0; - // The Agent can accept pipeline configuration from the Server. - AcceptsPipelineConfig = 0x00000001; + // The Agent can accept continuous pipeline configuration from the Server. + AcceptsContinuousPipelineConfig = 0x00000001; // The Agent can accept instance configuration from the Server. - AcceptsInstanceConfig = 0x00000002; - // The Agent can accept custom command from the Server. - AcceptsCustomCommand = 0x00000004; + AcceptsInstanceConfig = 0x00000002; + // The Agent can accept onetime pipeline configuration from the Server. + AcceptsOnetimePipelineConfig = 0x00000004; - // bits before 2^16 (inclusive) are reserved for future official fields + // Add new capabilities here, continuing with the least significant unused bit. } enum RequestFlags { @@ -99,15 +90,15 @@ ### HeartbeatResponse 消息 message HeartbeatResponse { - bytes request_id = 1; - ServerErrorResponse error_response = 2; // Set value indicates error - uint64 capabilities = 3; // Bitmask of flags defined by ServerCapabilities enum - - repeated ConfigDetail pipeline_config_updates = 4; // Agent's pipeline config update status - repeated ConfigDetail instance_config_updates = 5; // Agent's instance config update status - repeated CommandDetail custom_command_updates = 6; // Agent's commands updates - uint64 flags = 7; // Predefined command flag - bytes opaque = 8; // Opaque data for extension + bytes request_id = 1; + CommonResponse commonResponse = 2; // Set common response + uint64 capabilities = 3; // Bitmask of flags defined by ServerCapabilities enum + + repeated ConfigDetail continuous_pipeline_config_updates = 4; // Agent's continuous pipeline config update status + repeated ConfigDetail instance_config_updates = 5; // Agent's instance config update status + repeated CommandDetail onetime_pipeline_config_updates = 6; // Agent's onetime pipeline config updates + uint64 flags = 7; // Predefined command flag + bytes opaque = 8; // Opaque data for extension } message ConfigDetail { @@ -118,24 +109,23 @@ } message CommandDetail { - string type = 1; // Required, Command type - string name = 2; // Required, Command name - bytes detail = 3; // Required, Command's detail - int64 expire_time = 4; // After which the command can be safely removed from history - map extra = 5; // Optional extra info + string name = 1; // Required, Command name + bytes detail = 2; // Required, Command's detail + int64 expire_time = 3; // After which the command can be safely removed from history + map extra = 4; // Optional extra info } enum ServerCapabilities { // The capabilities field is unspecified. - UnspecifiedServerCapability = 0; + UnspecifiedServerCapability = 0; // The Server can remember agent attributes. - RembersAttribute = 0x00000001; - // The Server can remember pipeline config status. - RembersPipelineConfigStatus = 0x00000002; + RembersAttribute = 0x00000001; + // The Server can remember continuous pipeline config status. + RembersContinuousPipelineConfigStatus = 0x00000002; // The Server can remember instance config status. - RembersInstanceConfigStatus = 0x00000004; - // The Server can remember custom command status. - RembersCustomCommandStatus = 0x00000008; + RembersInstanceConfigStatus = 0x00000004; + // The Server can remember onetime pipeline config status. + RembersOnetimePipelineConfigStatus = 0x00000008; // bits before 2^16 (inclusive) are reserved for future official fields } @@ -154,11 +144,11 @@ // some sub-message in the last AgentToServer message (which is an allowed // optimization) but the Server detects that it does not have it (e.g. was // restarted and lost state). - ReportFullState = 0x00000001; - // FetchPipelineConfigDetail can be used by the Server to tell Agent to fetch config details by FetchConfig api, + ReportFullState = 0x00000001; + // FetchContinuousPipelineConfigDetail can be used by the Server to tell Agent to fetch continuous pipeline config details by FetchConfig api, // HB response ConfigDetail will not contains details. - FetchPipelineConfigDetail = 0x00000002; - // like FetchPipelineConfigDetail, but for instance config. + FetchContinuousPipelineConfigDetail = 0x00000002; + // like FetchContinuousPipelineConfigDetail, but for instance config. FetchInstanceConfigDetail = 0x00000004; // bits before 2^16 (inclusive) are reserved for future official fields } @@ -168,21 +158,21 @@ 额外的 config 拉取接口,不通过心跳返回 config 详情。 message FetchConfigRequest { - bytes request_id = 1; - bytes instance_id = 2; // Agent's unique identification - repeated ConfigInfo pipeline_configs = 3; // Information about the current PIPELINE_CONFIG held by the Agent - repeated ConfigInfo instance_configs = 4; // Information about the current AGENT_CONFIG held by the Agent - repeated CommandInfo custom_commands = 5; // Information about command history + bytes request_id = 1; + bytes instance_id = 2; // Agent's unique identification + repeated ConfigInfo continuous_pipeline_configs = 3; // Information about the current continuous pipeline configs held by the Agent + repeated ConfigInfo instance_configs = 4; // Information about the current instance configs held by the Agent + repeated ConfigInfo onetime_pipeline_configs = 5; // Information about onetime pipeline configs history } ### [Optional] FetchConfigResponse 消息 message FetchConfigResponse { - bytes request_id = 1; + bytes request_id = 1; CommonResponse commonResponse = 2; - repeated ConfigDetail pipeline_config_updates = 3; // Agent's pipeline config with details - repeated ConfigDetail instance_config_updates = 4; // Agent's instance config with details - repeated CommandDetail custom_command_updates = 5; // Agent's commands details + repeated ConfigDetail continuous_pipeline_config_updates = 3; // Agent's continuous pipeline config with details + repeated ConfigDetail instance_config_updates = 4; // Agent's instance config with details + repeated CommandDetail onetime_pipeline_config_updates = 5; // Agent's onetime pipeline config details } ### [Optional] ReportStatusRequest 消息 @@ -191,10 +181,10 @@ message ReportStatusRequest { bytes request_id = 1; - bytes instance_id = 2; // Agent's unique identification - repeated ConfigInfo pipeline_configs = 3; // status about the current PIPELINE_CONFIG held by the Agent - repeated ConfigInfo instance_configs = 4; // status about the current AGENT_CONFIG held by the Agent - repeated CommandInfo custom_commands = 5; // status about command history + bytes instance_id = 2; // Agent's unique identification + repeated ConfigInfo continuous_pipeline_configs = 3; // status about the current continuous pipeline configs held by the Agent + repeated ConfigInfo instance_configs = 4; // status about the current instance configs held by the Agent + repeated ConfigInfo onetime_pipeline_configs = 5; // status about onetime pipeline configs history } ### [Optional] ReportStatusResponse 消息 @@ -218,7 +208,7 @@ Server:应当通过capbilitiies上报Server自身的能力,这样如果新 Client:Agent启动后第一次向Server汇报全量信息,request字段应填尽填。request\_id、sequence\_num、capabilities、instance\_id、agent\_type、startup\_time为必填字段。 -Server:Server根据上报的信息返回响应。pipeline\_config\_updates、instance\_config\_updates中包含agent需要同步的配置,updates中必然包含name和version,是否包含detail取决于server端实现, 如果不包含则需要通过 FetchConfig 拉取。custom\_command_updates包含要求agent执行的命令command中必然包含type、name和expire\_time。 +Server:Server根据上报的信息返回响应。continuous\_pipeline\_config\_updates、instance\_config\_updates中包含agent需要同步的配置,updates中必然包含name和version,是否包含detail取决于server端实现, 如果不包含则需要通过 FetchConfig 拉取。onetime\_pipeline\_config\_updates包含要求agent执行的命令中必然包含name和expire\_time。 Server是否保存Client信息取决于Server实现,如果服务端找不到或保存的sequence\_num + 1 ≠ 心跳的sequence\_num,那么就立刻返回并且flags中必须设置ReportFullStatus标识位。 @@ -243,10 +233,11 @@ Server:同注册 ### 进程配置 可选两种实现: + 1. 在心跳中完成进程配置的状态上报与同步。 Server的注册/心跳响应中有instance\_config\_updates.detail,client 直接从response中获得detail,应用成功后下次心跳需要上报完整状态。 - + 2. 在心跳中完成进程配置的基础信息同步,通过额外的接口完成进程配置的拉取。 Server的响应不包含detail, 只包含要更新的进程配置 name 和 version。client 比较本地的配置和 version 判断需要更新后,根据 instance_config_updates 的信息构造 FetchConfigRequest 后进行一次额外拉取。FetchConfigRequest 至少需要包括 name 和 version。 @@ -258,23 +249,24 @@ Client获取到多个进程配置时,自动合并,若产生冲突默认行 ### 采集配置 可选两种实现: + 1. 在心跳中完成采集配置的状态上报与同步。 - Server的注册/心跳响应中有pipeline\_config\_updates.detail, Client 直接从response中获得detail,应用成功后下次心跳需要上报完整状态。 + Server的注册/心跳响应中有continuous\_pipeline\_config\_updates.detail, Client 直接从response中获得detail,应用成功后下次心跳需要上报完整状态。 2. 在心跳中完成采集配置的基础信息同步,通过额外的接口完成进程配置的拉取。 - Server的响应不包含detail, 只包含要更新的采集配置 name 和 version。client 比较本地的配置和 version 判断需要更新后,根据 pipeline_config_updates 的信息构造 FetchConfigRequest 后进行一次额外拉取。FetchConfigRequest 至少需要包括 name 和 version。 + Server的响应不包含detail, 只包含要更新的采集配置 name 和 version。client 比较本地的配置和 version 判断需要更新后,根据 continuous_pipeline_config_updates 的信息构造 FetchConfigRequest 后进行一次额外拉取。FetchConfigRequest 至少需要包括 name 和 version。 - 心跳 response flag 需要设置 FetchPipelineConfigDetail. + 心跳 response flag 需要设置 FetchContinuousPipelineConfigDetail. 客户端以下2种实现 -实现1:直接将Detail返回在心跳响应中(FetchPipelineConfigDetail flag is unset) +实现1:直接将Detail返回在心跳响应中(FetchContinuousPipelineConfigDetail flag is unset) ![image](https://github.com/alibaba/ilogtail/assets/1827594/be645615-dd99-42dd-9deb-681e9a4069bb) -实现2:仅返回配置名和版本,Detail使用单独请求获取(FetchPipelineConfigDetail flag is set) +实现2:仅返回配置名和版本,Detail使用单独请求获取(FetchContinuousPipelineConfigDetail flag is set) ![image](https://github.com/alibaba/ilogtail/assets/1827594/c409c35c-2a81-4927-bfd2-7fb321ef1ca8) @@ -287,6 +279,7 @@ Client获取到多个进程配置时,自动合并,若产生冲突默认行 对于 Server:这些信息是Agent状态的一部分,可选保存。与通过Event上报可观测信息不同的是,作为状态信息没有时间属性,用户可通过接口可获取即刻状态,而不需要选择时间窗口合并事件。 同进程配置和采集配置,上报配置状态也有两种可选实现: + 1. 在心跳 request 中将配置最新状态带上。 在心跳中将进程配置和采集配置的最新版本和状态一起上报。另外按照心跳协议的定义,配置状态变更后,要求在心跳一定要上报配置最新状态,如果相较于上一次心跳配置状态无变化,则不要求。 @@ -296,7 +289,9 @@ Client获取到多个进程配置时,自动合并,若产生冲突默认行 通过 ReportStatus 额外接口去上报,能够在一定程度上减少心跳服务的复杂度,有利于状态服务和心跳服务的拆分。ReportStatus 接口不用等到下一次心跳,在配置状态发生变化即可上报。 ### 心跳配置拉取/上报与额外接口拉取/上报选择 + 配置状态上报的方式应该和配置拉取方式配套使用: + 1. 如果进程配置和采集配置都通过心跳下发,状态配置也仅应该通过心跳上报。 2. 如果进程配置和采集配置都通过 FetchConfig 接口拉取,状态上报也应该通过 ReportStatus 上报。 @@ -315,7 +310,7 @@ Server: 通过response的flag传递,定义了ReportFullStatus,表明要求C Client: 为了防止服务端重复下发命令以及感知命令执行结果,在command expire前,Client始终应具备向服务端上报command执行状态的能力,实际是否上报取决于心跳压缩机制。在expire\_time超过后,client不应该再上报超时的command状态。 -Server: 如果上报+已知的Agent状态中,缺少应下发的custom\_command\_updates(通过name识别),那么server应该在响应中下发缺少的custom\_command\_updates。 +Server: 如果上报+已知的Agent状态中,缺少应下发的onetime\_pipeline\_config\_updates(通过name识别),那么server应该在响应中下发缺少的onetime\_pipeline\_config\_updates。 ### 异常处理 @@ -324,6 +319,7 @@ Server: 服务端正常返回时HeartbeatResponse中的code应始终设置为0 Client: 当HeartbeatResponse中的code为0时,Agent应该正常处理下发的配置。当HeartbeatResponse中的code不为0时,Agent必须忽略除code和message外的其他字段,并择机重试。 ### 辅助信息 -在command\_info, command\_detail, config\_info, config\_detail中,都预留了extra字段,可以用于传递一些额外的用户自定义的辅助信息。\ + +在command\_detail, config\_info, config\_detail中,都预留了extra字段,可以用于传递一些额外的用户自定义的辅助信息。\ 注意:extra字段仅作传递辅助信息使用,不会对管控行为造成任何影响。 diff --git a/core/unittest/config/CommonConfigProviderUnittest.cpp b/core/unittest/config/CommonConfigProviderUnittest.cpp index 40b6338e30..6d557dedde 100644 --- a/core/unittest/config/CommonConfigProviderUnittest.cpp +++ b/core/unittest/config/CommonConfigProviderUnittest.cpp @@ -315,7 +315,7 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { configserver::proto::v2::HeartbeatResponse heartbeatRespPb; heartbeatRespPb.set_capabilities(configserver::proto::v2::ResponseFlags::ReportFullState); { - auto pipeline = heartbeatRespPb.mutable_pipeline_config_updates(); + auto pipeline = heartbeatRespPb.mutable_continuous_pipeline_config_updates(); auto configDetail = pipeline->Add(); configDetail->set_name("config1"); configDetail->set_version(1); @@ -510,7 +510,8 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { APSARA_TEST_EQUAL(heartbeatReq.sequence_num(), sequence_num); sequence_num++; APSARA_TEST_TRUE(heartbeatReq.capabilities() & configserver::proto::v2::AcceptsInstanceConfig); - APSARA_TEST_TRUE(heartbeatReq.capabilities() & configserver::proto::v2::AcceptsPipelineConfig); + APSARA_TEST_TRUE(heartbeatReq.capabilities() + & configserver::proto::v2::AcceptsContinuousPipelineConfig); APSARA_TEST_EQUAL(heartbeatReq.instance_id(), provider.GetInstanceId()); APSARA_TEST_EQUAL(heartbeatReq.agent_type(), "LoongCollector"); APSARA_TEST_EQUAL(heartbeatReq.attributes().ip(), LoongCollectorMonitor::mIpAddr); @@ -531,7 +532,7 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { heartbeatRespPb.set_capabilities(configserver::proto::v2::ResponseFlags::ReportFullState); // pipeline { - auto pipeline = heartbeatRespPb.mutable_pipeline_config_updates(); + auto pipeline = heartbeatRespPb.mutable_continuous_pipeline_config_updates(); auto configDetail = pipeline->Add(); configDetail->set_name("config1"); configDetail->set_version(-1); From 6249ce218af5794af07ffcf02c443912e95baff7 Mon Sep 17 00:00:00 2001 From: quzard <1191890118@qq.com> Date: Wed, 20 Nov 2024 07:16:48 +0000 Subject: [PATCH 07/10] fix --- config_server/protocol/v2/README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/config_server/protocol/v2/README.md b/config_server/protocol/v2/README.md index a3297c4faa..e48ab1a8d2 100644 --- a/config_server/protocol/v2/README.md +++ b/config_server/protocol/v2/README.md @@ -74,7 +74,7 @@ // The Agent can accept onetime pipeline configuration from the Server. AcceptsOnetimePipelineConfig = 0x00000004; - // Add new capabilities here, continuing with the least significant unused bit. + // bits before 2^16 (inclusive) are reserved for future official fields } enum RequestFlags { From 9e9a1a743c13833f9d8d6850184df10a2ca2ff63 Mon Sep 17 00:00:00 2001 From: quzard <1191890118@qq.com> Date: Wed, 20 Nov 2024 07:19:35 +0000 Subject: [PATCH 08/10] fix --- core/unittest/config/CommonConfigProviderUnittest.cpp | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/core/unittest/config/CommonConfigProviderUnittest.cpp b/core/unittest/config/CommonConfigProviderUnittest.cpp index 6d557dedde..0621cf0f26 100644 --- a/core/unittest/config/CommonConfigProviderUnittest.cpp +++ b/core/unittest/config/CommonConfigProviderUnittest.cpp @@ -383,10 +383,9 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { )"); } { - auto commandconfig = heartbeatRespPb.mutable_custom_command_updates(); + auto commandconfig = heartbeatRespPb.mutable_onetime_pipeline_config_updates(); auto configDetail = commandconfig->Add(); configDetail->set_name("commandconfig1"); - configDetail->set_type("history"); configDetail->set_detail(R"( { "enable": true, @@ -602,10 +601,9 @@ void CommonConfigProviderUnittest::TestGetConfigUpdateAndConfigWatcher() { } // commandconfig { - auto commandconfig = heartbeatRespPb.mutable_custom_command_updates(); + auto commandconfig = heartbeatRespPb.mutable_onetime_pipeline_config_updates(); auto configDetail = commandconfig->Add(); configDetail->set_name("commandconfig1"); - configDetail->set_type("history"); configDetail->set_detail(R"( { "enable": true, From 2b91298a57f8452305f31a7fd18365f4fa3de175 Mon Sep 17 00:00:00 2001 From: quzard <1191890118@qq.com> Date: Wed, 20 Nov 2024 08:35:44 +0000 Subject: [PATCH 09/10] fix e2e --- core/application/Application.cpp | 2 +- core/unittest/config/ConfigUpdateUnittest.cpp | 2 +- core/unittest/config/ConfigWatcherUnittest.cpp | 8 ++++---- scripts/dist.sh | 2 +- scripts/gen_build_scripts.sh | 2 +- .../docker-compose.yaml | 2 +- .../docker-compose.yaml | 2 +- test/engine/setup/dockercompose/compose.go | 2 +- 8 files changed, 11 insertions(+), 11 deletions(-) diff --git a/core/application/Application.cpp b/core/application/Application.cpp index 8dbfa6c077..42c0fbc3f3 100644 --- a/core/application/Application.cpp +++ b/core/application/Application.cpp @@ -218,7 +218,7 @@ void Application::Start() { // GCOVR_EXCL_START filesystem::create_directories(localConfigPath, ec); if (ec) { LOG_WARNING(sLogger, - ("failed to create dir for local pipeline_config", + ("failed to create dir for local continuous_pipeline_config", "manual creation may be required")("error code", ec.value())("error msg", ec.message())); } PipelineConfigWatcher::GetInstance()->AddSource(localConfigPath.string()); diff --git a/core/unittest/config/ConfigUpdateUnittest.cpp b/core/unittest/config/ConfigUpdateUnittest.cpp index c72f645e1a..86c7c7ae84 100644 --- a/core/unittest/config/ConfigUpdateUnittest.cpp +++ b/core/unittest/config/ConfigUpdateUnittest.cpp @@ -101,7 +101,7 @@ class ConfigUpdateUnittest : public testing::Test { void PrepareInitialSettings() const; void GenerateInitialConfigs() const; - filesystem::path configDir = "./pipeline_config"; + filesystem::path configDir = "./continuous_pipeline_config"; vector pipelineConfigPaths = {configDir / "pipeline_invalid_format.json", configDir / "pipeline_invalid_detail.json", configDir / "pipeline_enabled_valid.json", diff --git a/core/unittest/config/ConfigWatcherUnittest.cpp b/core/unittest/config/ConfigWatcherUnittest.cpp index 4edec0c4f2..7ad1cc1fe0 100644 --- a/core/unittest/config/ConfigWatcherUnittest.cpp +++ b/core/unittest/config/ConfigWatcherUnittest.cpp @@ -16,8 +16,8 @@ #include #include "config/ConfigDiff.h" -#include "config/watcher/PipelineConfigWatcher.h" #include "config/watcher/InstanceConfigWatcher.h" +#include "config/watcher/PipelineConfigWatcher.h" #include "pipeline/plugin/PluginRegistry.h" #include "unittest/Unittest.h" @@ -44,7 +44,7 @@ class ConfigWatcherUnittest : public testing::Test { static const filesystem::path instanceConfigDir; }; -const filesystem::path ConfigWatcherUnittest::configDir = "./pipeline_config"; +const filesystem::path ConfigWatcherUnittest::configDir = "./continuous_pipeline_config"; const filesystem::path ConfigWatcherUnittest::instanceConfigDir = "./instance_config"; void ConfigWatcherUnittest::InvalidConfigDirFound() const { @@ -53,11 +53,11 @@ void ConfigWatcherUnittest::InvalidConfigDirFound() const { APSARA_TEST_TRUE(diff.first.IsEmpty()); APSARA_TEST_TRUE(diff.second.IsEmpty()); - { ofstream fout("pipeline_config"); } + { ofstream fout("continuous_pipeline_config"); } diff = PipelineConfigWatcher::GetInstance()->CheckConfigDiff(); APSARA_TEST_TRUE(diff.first.IsEmpty()); APSARA_TEST_TRUE(diff.second.IsEmpty()); - filesystem::remove_all("pipeline_config"); + filesystem::remove_all("continuous_pipeline_config"); } { InstanceConfigDiff diff = InstanceConfigWatcher::GetInstance()->CheckConfigDiff(); diff --git a/scripts/dist.sh b/scripts/dist.sh index 74fbc777f3..946d6c328f 100755 --- a/scripts/dist.sh +++ b/scripts/dist.sh @@ -42,7 +42,7 @@ cp "${ROOTDIR}/${OUT_DIR}/libGoPluginAdapter.so" "${ROOTDIR}/${DIST_DIR}/${PACKA cp "${ROOTDIR}/${OUT_DIR}/libGoPluginBase.so" "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}" mkdir -p "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}/conf/instance_config/local/" cp "${ROOTDIR}/${OUT_DIR}/conf/instance_config/local/loongcollector_config.json" "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}/conf/instance_config/local/" -cp -a "${ROOTDIR}/${OUT_DIR}/conf/pipeline_config/local" "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}/conf" +cp -a "${ROOTDIR}/${OUT_DIR}/conf/continuous_pipeline_config/local" "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}/conf" if file "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}/loongcollector" | grep x86-64; then ./scripts/download_ebpflib.sh "${ROOTDIR}/${DIST_DIR}/${PACKAGE_DIR}"; fi # Splitting debug info at build time with -gsplit-dwarf does not work with current gcc version diff --git a/scripts/gen_build_scripts.sh b/scripts/gen_build_scripts.sh index f1116b894b..9ad32874d0 100755 --- a/scripts/gen_build_scripts.sh +++ b/scripts/gen_build_scripts.sh @@ -122,7 +122,7 @@ function generateCopyScript() { fi echo 'mkdir -p $BINDIR/conf/instance_config/local/' >>$COPY_SCRIPT_FILE echo 'echo -e "{\n}" > $BINDIR/conf/instance_config/local/loongcollector_config.json' >>$COPY_SCRIPT_FILE - echo 'mkdir -p $BINDIR/conf/pipeline_config/local' >>$COPY_SCRIPT_FILE + echo 'mkdir -p $BINDIR/conf/continuous_pipeline_config/local' >>$COPY_SCRIPT_FILE echo 'docker rm -v "$id"' >>$COPY_SCRIPT_FILE } diff --git a/test/benchmark/test_cases/performance_file_to_blackhole_ilogtail/docker-compose.yaml b/test/benchmark/test_cases/performance_file_to_blackhole_ilogtail/docker-compose.yaml index f5f31279dc..47545cfb41 100644 --- a/test/benchmark/test_cases/performance_file_to_blackhole_ilogtail/docker-compose.yaml +++ b/test/benchmark/test_cases/performance_file_to_blackhole_ilogtail/docker-compose.yaml @@ -18,7 +18,7 @@ services: ilogtailC: image: aliyun/loongcollector:0.0.1 volumes: - - ./ilogtail.yaml:/loongcollector/conf/pipeline_config/local/ilogtail.yaml + - ./ilogtail.yaml:/loongcollector/conf/continuous_pipeline_config/local/ilogtail.yaml - .:/home/ilogtail healthcheck: test: "cat /loongcollector/log/loongcollector.LOG" diff --git a/test/benchmark/test_cases/performance_file_to_blackhole_ilogtailspl/docker-compose.yaml b/test/benchmark/test_cases/performance_file_to_blackhole_ilogtailspl/docker-compose.yaml index f5f31279dc..47545cfb41 100644 --- a/test/benchmark/test_cases/performance_file_to_blackhole_ilogtailspl/docker-compose.yaml +++ b/test/benchmark/test_cases/performance_file_to_blackhole_ilogtailspl/docker-compose.yaml @@ -18,7 +18,7 @@ services: ilogtailC: image: aliyun/loongcollector:0.0.1 volumes: - - ./ilogtail.yaml:/loongcollector/conf/pipeline_config/local/ilogtail.yaml + - ./ilogtail.yaml:/loongcollector/conf/continuous_pipeline_config/local/ilogtail.yaml - .:/home/ilogtail healthcheck: test: "cat /loongcollector/log/loongcollector.LOG" diff --git a/test/engine/setup/dockercompose/compose.go b/test/engine/setup/dockercompose/compose.go index 4163354568..780df589e5 100644 --- a/test/engine/setup/dockercompose/compose.go +++ b/test/engine/setup/dockercompose/compose.go @@ -62,7 +62,7 @@ services: pid: host volumes: - %s:/loongcollector/conf/default_flusher.json - - %s:/loongcollector/conf/pipeline_config/local + - %s:/loongcollector/conf/continuous_pipeline_config/local - /:/logtail_host - /var/run/docker.sock:/var/run/docker.sock - /sys/:/sys/ From 67dad0672f159d65861ebd8e3893a7a2fa7fffd1 Mon Sep 17 00:00:00 2001 From: quzard <1191890118@qq.com> Date: Wed, 20 Nov 2024 08:37:04 +0000 Subject: [PATCH 10/10] common_response --- config_server/protocol/v2/README.md | 6 +++--- config_server/protocol/v2/agentV2.proto | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/config_server/protocol/v2/README.md b/config_server/protocol/v2/README.md index e48ab1a8d2..324f2f8523 100644 --- a/config_server/protocol/v2/README.md +++ b/config_server/protocol/v2/README.md @@ -91,7 +91,7 @@ message HeartbeatResponse { bytes request_id = 1; - CommonResponse commonResponse = 2; // Set common response + CommonResponse common_response = 2; // Set common response uint64 capabilities = 3; // Bitmask of flags defined by ServerCapabilities enum repeated ConfigDetail continuous_pipeline_config_updates = 4; // Agent's continuous pipeline config update status @@ -169,7 +169,7 @@ message FetchConfigResponse { bytes request_id = 1; - CommonResponse commonResponse = 2; + CommonResponse common_response = 2; repeated ConfigDetail continuous_pipeline_config_updates = 3; // Agent's continuous pipeline config with details repeated ConfigDetail instance_config_updates = 4; // Agent's instance config with details repeated CommandDetail onetime_pipeline_config_updates = 5; // Agent's onetime pipeline config details @@ -191,7 +191,7 @@ message ReportStatusResponse { bytes request_id = 1; - CommonResponse commonResponse = 2; + CommonResponse common_response = 2; } ## 行为规范 diff --git a/config_server/protocol/v2/agentV2.proto b/config_server/protocol/v2/agentV2.proto index 87ab400912..965e553772 100644 --- a/config_server/protocol/v2/agentV2.proto +++ b/config_server/protocol/v2/agentV2.proto @@ -133,7 +133,7 @@ enum ResponseFlags { // ConfigServer's response to Agent's request message HeartbeatResponse { bytes request_id = 1; - CommonResponse commonResponse = 2; // Set common response + CommonResponse common_response = 2; // Set common response uint64 capabilities = 3; // Bitmask of flags defined by ServerCapabilities enum repeated ConfigDetail continuous_pipeline_config_updates = 4; // Agent's continuous pipeline config update status @@ -156,7 +156,7 @@ message FetchConfigRequest { // ConfigServer response to Agent's config fetching request message FetchConfigResponse { bytes request_id = 1; - CommonResponse commonResponse = 2; + CommonResponse common_response = 2; repeated ConfigDetail continuous_pipeline_config_updates = 3; // Agent's continuous pipeline config with details repeated ConfigDetail instance_config_updates = 4; // Agent's instance config with details repeated CommandDetail onetime_pipeline_config_updates = 5; // Agent's onetime pipeline config details @@ -176,7 +176,7 @@ message ReportStatusRequest { // ConfigServer response to Agent's report status request message ReportStatusResponse { bytes request_id = 1; - CommonResponse commonResponse = 2; + CommonResponse common_response = 2; } message CommonResponse {