Skip to content

Commit

Permalink
rework grpc monitoring (#11191)
Browse files Browse the repository at this point in the history
  • Loading branch information
uzhastik authored Nov 1, 2024
1 parent f2cdb34 commit ef036a2
Show file tree
Hide file tree
Showing 5 changed files with 54 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
#include <ydb/library/security/ydb_credentials_provider_factory.h>

#include <ydb/public/lib/fq/scope.h>
#include <ydb/public/sdk/cpp/client/resources/ydb_resources.h>
#include <ydb/public/sdk/cpp/client/ydb_query/client.h>
#include <ydb/public/sdk/cpp/client/ydb_operation/operation.h>

Expand Down Expand Up @@ -308,6 +309,18 @@ class TCreateDatabaseRequestActor : public NActors::TActorBootstrapped<TCreateDa
};
};

bool IsValidLoadControlConfig(const NConfig::TLoadControlConfig& config) {
if (!config.GetEnable()) {
return true;
}

const auto& databaseConnection = config.GetDatabaseConnection();
if (!databaseConnection.GetDatabase()) {
return false;
}
return databaseConnection.GetEndpoint() || config.GetMonitoringEndpoint();
}

}

class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrapped<TComputeDatabaseControlPlaneServiceActor> {
Expand Down Expand Up @@ -342,7 +355,7 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
switch (controlPlane.type_case()) {
case NConfig::TYdbComputeControlPlane::TYPE_NOT_SET:
case NConfig::TYdbComputeControlPlane::kSingle:
CreateSingleClientActors(controlPlane.GetSingle());
CreateSingleClientActors();
break;
case NConfig::TYdbComputeControlPlane::kCms:
CreateCmsClientActors(controlPlane.GetCms(), controlPlane.GetDatabasesCacheReloadPeriod());
Expand All @@ -361,11 +374,13 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
if (connection.GetCertificateFile()) {
settings.CertificateRootCA = StripString(TFileInput(connection.GetCertificateFile()).ReadAll());
}
settings.Headers[NYdb::YDB_DATABASE_HEADER] = connection.GetDatabase();
return settings;
}

static NGrpcActorClient::TGrpcClientSettings CreateGrpcClientSettings(const auto& connection) {
static NGrpcActorClient::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TComputeDatabaseConfig& config) {
NGrpcActorClient::TGrpcClientSettings settings;
const auto& connection = config.GetExecutionConnection();
settings.Endpoint = connection.GetEndpoint();
settings.EnableSsl = connection.GetUseSsl();
if (connection.GetCertificateFile()) {
Expand All @@ -375,23 +390,21 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
return settings;
}

static NGrpcActorClient::TGrpcClientSettings CreateGrpcClientSettings(const NConfig::TComputeDatabaseConfig& config) {
return CreateGrpcClientSettings(config.GetControlPlaneConnection());
}

void CreateSingleClientActors(const NConfig::TYdbComputeControlPlane::TSingle& singleConfig) {
void CreateSingleClientActors() {
auto globalLoadConfig = Config.GetYdb().GetLoadControlConfig();
if (globalLoadConfig.GetEnable()) {
TActorId clientActor;
auto monitoringEndpoint = globalLoadConfig.GetMonitoringEndpoint();
auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(singleConfig.GetConnection()))->CreateProvider();
if (monitoringEndpoint) {
clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, singleConfig.GetConnection().GetDatabase(), credentialsProvider).release());
} else {
clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(singleConfig.GetConnection()), credentialsProvider).release());
}
MonitoringActorId = Register(CreateDatabaseMonitoringActor(clientActor, globalLoadConfig, Counters).release());
if (!globalLoadConfig.GetEnable() || !IsValidLoadControlConfig(globalLoadConfig)) {
return;
}
TActorId clientActor;
auto monitoringEndpoint = globalLoadConfig.GetMonitoringEndpoint();
const auto& databaseConnection = globalLoadConfig.GetDatabaseConnection();
auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(databaseConnection))->CreateProvider();
if (monitoringEndpoint) {
clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, databaseConnection.GetDatabase(), credentialsProvider).release());
} else {
clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(databaseConnection), credentialsProvider).release());
}
MonitoringActorId = Register(CreateDatabaseMonitoringActor(clientActor, globalLoadConfig, Counters).release());
}

void CreateCmsClientActors(const NConfig::TYdbComputeControlPlane::TCms& cmsConfig, const TString& databasesCacheReloadPeriod) {
Expand All @@ -405,14 +418,15 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
const NConfig::TLoadControlConfig& loadConfig = config.GetLoadControlConfig().GetEnable()
? config.GetLoadControlConfig()
: globalLoadConfig;
if (loadConfig.GetEnable()) {
if (loadConfig.GetEnable() && IsValidLoadControlConfig(loadConfig)) {
TActorId clientActor;
auto monitoringEndpoint = loadConfig.GetMonitoringEndpoint();
auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider();
const auto& databaseConnection = loadConfig.GetDatabaseConnection();
auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(databaseConnection))->CreateProvider();
if (monitoringEndpoint) {
clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, config.GetControlPlaneConnection().GetDatabase(), credentialsProvider).release());
clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, databaseConnection.GetDatabase(), credentialsProvider).release());
} else {
clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), credentialsProvider).release());
clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(databaseConnection), credentialsProvider).release());
}
databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, databaseCounters).release());
}
Expand All @@ -429,14 +443,15 @@ class TComputeDatabaseControlPlaneServiceActor : public NActors::TActorBootstrap
const NConfig::TLoadControlConfig& loadConfig = config.GetLoadControlConfig().GetEnable()
? config.GetLoadControlConfig()
: globalLoadConfig;
if (loadConfig.GetEnable()) {
if (loadConfig.GetEnable() && IsValidLoadControlConfig(loadConfig)) {
TActorId clientActor;
auto monitoringEndpoint = loadConfig.GetMonitoringEndpoint();
auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(config.GetControlPlaneConnection()))->CreateProvider();
const auto& databaseConnection = loadConfig.GetDatabaseConnection();
auto credentialsProvider = CredentialsProviderFactory(GetYdbCredentialSettings(databaseConnection))->CreateProvider();
if (monitoringEndpoint) {
clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, config.GetControlPlaneConnection().GetDatabase(), credentialsProvider).release());
clientActor = Register(CreateMonitoringRestClientActor(monitoringEndpoint, databaseConnection.GetDatabase(), credentialsProvider).release());
} else {
clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(config), credentialsProvider).release());
clientActor = Register(CreateMonitoringGrpcClientActor(CreateGrpcClientSettings(databaseConnection), credentialsProvider).release());
}
databaseMonitoringActor = Register(CreateDatabaseMonitoringActor(clientActor, loadConfig, databaseCounters).release());
}
Expand Down
9 changes: 6 additions & 3 deletions ydb/core/fq/libs/compute/ydb/control_plane/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -12,21 +12,24 @@ SRCS(

PEERDIR(
library/cpp/json
ydb/library/actors/core
ydb/library/actors/protos
ydb/library/grpc/actor_client
ydb/core/fq/libs/compute/ydb/synchronization_service
ydb/core/fq/libs/control_plane_storage/proto
ydb/core/fq/libs/quota_manager/proto
ydb/core/kqp/workload_service/common
ydb/core/protos
ydb/library/actors/core
ydb/library/actors/protos
ydb/library/db_pool/protos
ydb/library/grpc/actor_client
ydb/library/yql/public/issue
ydb/library/yql/utils
ydb/library/yql/utils/actors
ydb/public/api/grpc
ydb/public/api/grpc/draft
ydb/public/lib/operation_id/protos
ydb/public/sdk/cpp/client/resources
ydb/public/sdk/cpp/client/ydb_operation
ydb/public/sdk/cpp/client/ydb_query
)

YQL_LAST_ABI_VERSION()
Expand Down
1 change: 1 addition & 0 deletions ydb/core/fq/libs/config/protos/compute.proto
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ message TLoadControlConfig {
bool Strict = 7; // default false, whether to deny execution in load level unavailable
uint32 CpuNumber = 8;
string MonitoringEndpoint = 9; // if defined, will be used as REST API instead of default GRPC
TYdbStorageConfig DatabaseConnection = 10;
}

message TWorkloadManagerConfig {
Expand Down
5 changes: 5 additions & 0 deletions ydb/library/grpc/actor_client/grpc_service_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ class TGrpcServiceClient {
using TServiceConnection = NYdbGrpc::TServiceConnection<TGrpcService>;

NYdbGrpc::TGRpcClientConfig Config;
std::unordered_map<TString, TString> Headers;
NYdbGrpc::TGRpcClientLow Client;
std::unique_ptr<TServiceConnection> Connection;

Expand Down Expand Up @@ -93,6 +94,9 @@ class TGrpcServiceClient {
if (requestId) {
meta.Aux.push_back({"x-request-id", requestId});
}
for (auto [k ,v]: Headers) {
meta.Aux.push_back({k, v});
}

NYdbGrpc::TResponseCallback<TResponseType> callback =
[actorSystem = NActors::TActivationContext::ActorSystem(), prefix = Prefix(requestId), request = ev](NYdbGrpc::TGrpcStatus&& status, TResponseType&& response) -> void {
Expand Down Expand Up @@ -129,6 +133,7 @@ class TGrpcServiceClient {

TGrpcServiceClient(const NGrpcActorClient::TGrpcClientSettings& settings)
: Config(InitGrpcConfig(settings))
, Headers(settings.Headers)
{}
};

Expand Down
2 changes: 2 additions & 0 deletions ydb/library/grpc/actor_client/grpc_service_settings.h
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#pragma once
#include <util/generic/string.h>
#include <util/system/types.h>
#include <unordered_map>

namespace NGrpcActorClient {

Expand All @@ -12,6 +13,7 @@ struct TGrpcClientSettings {
ui32 GrpcKeepAlivePingInterval = 5000;
bool EnableSsl = false;
ui64 RequestTimeoutMs = 10000; // 10 seconds
std::unordered_map<TString, TString> Headers;
};

} // namespace NGrpcActorClient

0 comments on commit ef036a2

Please sign in to comment.