Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

External Data Sources: RPC implementation for the describe methods + ydb scheme describe support #14509

Merged
merged 4 commits into from
Feb 13, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
91 changes: 91 additions & 0 deletions ydb/core/grpc_services/rpc_describe_external_data_source.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#include "rpc_scheme_base.h"
#include "service_table.h"

#include <ydb/core/grpc_services/base/base.h>
#include <ydb/public/api/protos/ydb_table.pb.h>

namespace NKikimr::NGRpcService {

using namespace NActors;
using namespace NYql;

using TEvDescribeExternalDataSourceRequest = TGrpcRequestOperationCall<
Ydb::Table::DescribeExternalDataSourceRequest,
Ydb::Table::DescribeExternalDataSourceResponse
>;

class TDescribeExternalDataSourceRPC : public TRpcSchemeRequestActor<TDescribeExternalDataSourceRPC, TEvDescribeExternalDataSourceRequest> {
using TBase = TRpcSchemeRequestActor<TDescribeExternalDataSourceRPC, TEvDescribeExternalDataSourceRequest>;

public:

using TBase::TBase;

void Bootstrap() {
DescribeScheme();
}

private:

void DescribeScheme() {
auto ev = std::make_unique<TEvTxUserProxy::TEvNavigate>();
SetAuthToken(ev, *Request_);
SetDatabase(ev.get(), *Request_);
ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path());

Send(MakeTxProxyID(), ev.release());
Become(&TDescribeExternalDataSourceRPC::StateDescribeScheme);
}

STATEFN(StateDescribeScheme) {
switch (ev->GetTypeRewrite()) {
HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
default:
return TBase::StateWork(ev);
}
}

void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->GetRecord();
const auto& pathDescription = record.GetPathDescription();

if (record.HasReason()) {
Request_->RaiseIssue(TIssue(record.GetReason()));
}

switch (record.GetStatus()) {
case NKikimrScheme::StatusSuccess: {
if (pathDescription.GetSelf().GetPathType() != NKikimrSchemeOp::EPathTypeExternalDataSource) {
Request_->RaiseIssue(TIssue(
TStringBuilder() << "Unexpected path type: " << pathDescription.GetSelf().GetPathType()
));
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

return ReplyWithResult(
Ydb::StatusIds::SUCCESS,
Ydb::Table::DescribeExternalDataSourceResult(), // to do: convert private protobuf to public
ctx
);
}
case NKikimrScheme::StatusPathDoesNotExist:
case NKikimrScheme::StatusSchemeError:
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);

case NKikimrScheme::StatusAccessDenied:
return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx);

case NKikimrScheme::StatusNotAvailable:
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);

default:
return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
}
}
};

void DoDescribeExternalDataSourceRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
f.RegisterActor(new TDescribeExternalDataSourceRPC(p.release()));
}

}
91 changes: 91 additions & 0 deletions ydb/core/grpc_services/rpc_describe_external_table.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
#include "rpc_scheme_base.h"
#include "service_table.h"

#include <ydb/core/grpc_services/base/base.h>
#include <ydb/public/api/protos/ydb_table.pb.h>

namespace NKikimr::NGRpcService {

using namespace NActors;
using namespace NYql;

using TEvDescribeExternalTableRequest = TGrpcRequestOperationCall<
Ydb::Table::DescribeExternalTableRequest,
Ydb::Table::DescribeExternalTableResponse
>;

class TDescribeExternalTableRPC : public TRpcSchemeRequestActor<TDescribeExternalTableRPC, TEvDescribeExternalTableRequest> {
using TBase = TRpcSchemeRequestActor<TDescribeExternalTableRPC, TEvDescribeExternalTableRequest>;

public:

using TBase::TBase;

void Bootstrap() {
DescribeScheme();
}

private:

void DescribeScheme() {
auto ev = std::make_unique<TEvTxUserProxy::TEvNavigate>();
SetAuthToken(ev, *Request_);
SetDatabase(ev.get(), *Request_);
ev->Record.MutableDescribePath()->SetPath(GetProtoRequest()->path());

Send(MakeTxProxyID(), ev.release());
Become(&TDescribeExternalTableRPC::StateDescribeScheme);
}

STATEFN(StateDescribeScheme) {
switch (ev->GetTypeRewrite()) {
HFunc(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult, Handle);
default:
return TBase::StateWork(ev);
}
}

void Handle(NSchemeShard::TEvSchemeShard::TEvDescribeSchemeResult::TPtr& ev, const TActorContext& ctx) {
const auto& record = ev->Get()->GetRecord();
const auto& pathDescription = record.GetPathDescription();

if (record.HasReason()) {
Request_->RaiseIssue(TIssue(record.GetReason()));
}

switch (record.GetStatus()) {
case NKikimrScheme::StatusSuccess: {
if (pathDescription.GetSelf().GetPathType() != NKikimrSchemeOp::EPathTypeExternalTable) {
Request_->RaiseIssue(TIssue(
TStringBuilder() << "Unexpected path type: " << pathDescription.GetSelf().GetPathType()
));
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);
}

return ReplyWithResult(
Ydb::StatusIds::SUCCESS,
Ydb::Table::DescribeExternalTableResult(), // to do: convert private proto to public
ctx
);
}
case NKikimrScheme::StatusPathDoesNotExist:
case NKikimrScheme::StatusSchemeError:
return Reply(Ydb::StatusIds::SCHEME_ERROR, ctx);

case NKikimrScheme::StatusAccessDenied:
return Reply(Ydb::StatusIds::UNAUTHORIZED, ctx);

case NKikimrScheme::StatusNotAvailable:
return Reply(Ydb::StatusIds::UNAVAILABLE, ctx);

default:
return Reply(Ydb::StatusIds::GENERIC_ERROR, ctx);
}
}
};

void DoDescribeExternalTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f) {
f.RegisterActor(new TDescribeExternalTableRPC(p.release()));
}

}
2 changes: 2 additions & 0 deletions ydb/core/grpc_services/service_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@ void DoCopyTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvide
void DoCopyTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoRenameTablesRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoDescribeTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoDescribeExternalDataSourceRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoDescribeExternalTableRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoCreateSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoDeleteSessionRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
void DoKeepAliveRequest(std::unique_ptr<IRequestOpCtx> p, const IFacilityProvider& f);
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/grpc_services/ya.make
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ SRCS(
rpc_describe_path.cpp
rpc_describe_table.cpp
rpc_describe_table_options.cpp
rpc_describe_external_data_source.cpp
rpc_describe_external_table.cpp
rpc_drop_coordination_node.cpp
rpc_drop_table.cpp
rpc_discovery.cpp
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/jaeger_tracing/request_discriminator.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,8 @@ extern const THashMap<TStringBuf, ERequestType> NameToRequestType = {
{"Table.StreamExecuteScanQuery", ERequestType::TABLE_STREAMEXECUTESCANQUERY},
{"Table.StreamReadTable", ERequestType::TABLE_STREAMREADTABLE},
{"Table.ReadRows", ERequestType::TABLE_READROWS},
{"Table.DescribeExternalDataSource", ERequestType::TABLE_DESCRIBEEXTERNALDATASOURCE},
{"Table.DescribeExternalTable", ERequestType::TABLE_DESCRIBEEXTERNALTABLE},

{"Query.ExecuteQuery", ERequestType::QUERY_EXECUTEQUERY},
{"Query.ExecuteScript", ERequestType::QUERY_EXECUTESCRIPT},
Expand Down
2 changes: 2 additions & 0 deletions ydb/core/jaeger_tracing/request_discriminator.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,8 @@ enum class ERequestType: size_t {
TABLE_STREAMEXECUTESCANQUERY,
TABLE_STREAMREADTABLE,
TABLE_READROWS,
TABLE_DESCRIBEEXTERNALDATASOURCE,
TABLE_DESCRIBEEXTERNALTABLE,

QUERY_EXECUTEQUERY,
QUERY_EXECUTESCRIPT,
Expand Down
34 changes: 34 additions & 0 deletions ydb/public/lib/ydb_cli/commands/ydb_service_scheme.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -283,6 +283,10 @@ int TCommandDescribe::PrintPathResponse(TDriver& driver, const NScheme::TDescrib
return DescribeReplication(driver);
case NScheme::ESchemeEntryType::View:
return DescribeView(driver);
case NScheme::ESchemeEntryType::ExternalDataSource:
return DescribeExternalDataSource(driver);
case NScheme::ESchemeEntryType::ExternalTable:
return DescribeExternalTable(driver);
default:
return DescribeEntryDefault(entry);
}
Expand Down Expand Up @@ -615,6 +619,36 @@ int TCommandDescribe::DescribeView(const TDriver& driver) {
return PrintDescription(this, OutputFormat, result, &TCommandDescribe::PrintViewResponsePretty);
}

int TCommandDescribe::PrintExternalDataSourceResponsePretty(const NYdb::NTable::TExternalDataSourceDescription& description) const {
// to do
jepett0 marked this conversation as resolved.
Show resolved Hide resolved
return EXIT_SUCCESS;
}

int TCommandDescribe::DescribeExternalDataSource(const TDriver& driver) {
NTable::TTableClient client(driver);
const auto sessionResult = client.CreateSession().ExtractValueSync();
NStatusHelpers::ThrowOnErrorOrPrintIssues(sessionResult);
const auto description = sessionResult.GetSession().DescribeExternalDataSource(Path).ExtractValueSync();
NStatusHelpers::ThrowOnErrorOrPrintIssues(description);

return PrintDescription(this, OutputFormat, description.GetExternalDataSourceDescription(), &TCommandDescribe::PrintExternalDataSourceResponsePretty);
}

int TCommandDescribe::PrintExternalTableResponsePretty(const NYdb::NTable::TExternalTableDescription& description) const {
// to do
return EXIT_SUCCESS;
}

int TCommandDescribe::DescribeExternalTable(const TDriver& driver) {
NTable::TTableClient client(driver);
const auto sessionResult = client.CreateSession().ExtractValueSync();
NStatusHelpers::ThrowOnErrorOrPrintIssues(sessionResult);
const auto result = sessionResult.GetSession().DescribeExternalTable(Path).ExtractValueSync();
NStatusHelpers::ThrowOnErrorOrPrintIssues(result);

return PrintDescription(this, OutputFormat, result.GetExternalTableDescription(), &TCommandDescribe::PrintExternalTableResponsePretty);
}

namespace {
void PrintColumns(const NTable::TTableDescription& tableDescription) {
if (!tableDescription.GetTableColumns().size()) {
Expand Down
6 changes: 6 additions & 0 deletions ydb/public/lib/ydb_cli/commands/ydb_service_scheme.h
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,12 @@ class TCommandDescribe : public TYdbOperationCommand, public TCommandWithPath, p
int DescribeView(const TDriver& driver);
int PrintViewResponsePretty(const NYdb::NView::TDescribeViewResult& result) const;

int DescribeExternalDataSource(const TDriver& driver);
int PrintExternalDataSourceResponsePretty(const NYdb::NTable::TExternalDataSourceDescription& result) const;

int DescribeExternalTable(const TDriver& driver);
int PrintExternalTableResponsePretty(const NYdb::NTable::TExternalTableDescription& result) const;

int TryTopicConsumerDescribeOrFail(NYdb::TDriver& driver, const NScheme::TDescribePathResult& result);
std::pair<TString, TString> ParseTopicConsumer() const;
int PrintConsumerResponsePretty(const NYdb::NTopic::TConsumerDescription& description) const;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,8 @@ class TProtoAccessor {
static ::google::protobuf::Map<TStringType, Ydb::TypedValue>* GetProtoMapPtr(TParams& params);
static const Ydb::TableStats::QueryStats& GetProto(const NTable::TQueryStats& queryStats);
static const Ydb::Table::DescribeTableResult& GetProto(const NTable::TTableDescription& tableDescription);
static const Ydb::Table::DescribeExternalDataSourceResult& GetProto(const NTable::TExternalDataSourceDescription&);
static const Ydb::Table::DescribeExternalTableResult& GetProto(const NTable::TExternalTableDescription&);
static const Ydb::Topic::DescribeTopicResult& GetProto(const NYdb::NTopic::TTopicDescription& topicDescription);
static const Ydb::Topic::DescribeConsumerResult& GetProto(const NYdb::NTopic::TConsumerDescription& consumerDescription);
static const Ydb::Monitoring::SelfCheckResult& GetProto(const NYdb::NMonitoring::TSelfCheckResult& selfCheckResult);
Expand Down
8 changes: 8 additions & 0 deletions ydb/public/sdk/cpp/src/client/table/proto_accessor.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,14 @@ const Ydb::Table::DescribeTableResult& TProtoAccessor::GetProto(const NTable::TT
return tableDescription.GetProto();
}

const Ydb::Table::DescribeExternalDataSourceResult& TProtoAccessor::GetProto(const NTable::TExternalDataSourceDescription& description) {
return description.GetProto();
}

const Ydb::Table::DescribeExternalTableResult& TProtoAccessor::GetProto(const NTable::TExternalTableDescription& description) {
return description.GetProto();
}

NTable::TQueryStats TProtoAccessor::FromProto(const Ydb::TableStats::QueryStats& queryStats) {
return NTable::TQueryStats(queryStats);
}
Expand Down
2 changes: 2 additions & 0 deletions ydb/services/ydb/ydb_table.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -85,6 +85,8 @@ void TGRpcYdbTableService::SetupIncomingRequests(NYdbGrpc::TLoggerPtr logger) {
ADD_REQUEST_LIMIT(CreateTable, DoCreateTableRequest, Rps, CREATETABLE)
ADD_REQUEST_LIMIT(DropTable, DoDropTableRequest, Rps, DROPTABLE)
ADD_REQUEST_LIMIT(DescribeTable, DoDescribeTableRequest, Rps, DESCRIBETABLE)
ADD_REQUEST_LIMIT(DescribeExternalDataSource, DoDescribeExternalDataSourceRequest, Rps, DESCRIBEEXTERNALDATASOURCE)
ADD_REQUEST_LIMIT(DescribeExternalTable, DoDescribeExternalTableRequest, Rps, DESCRIBEEXTERNALTABLE)
ADD_REQUEST_LIMIT(CopyTable, DoCopyTableRequest, Rps, COPYTABLE)
ADD_REQUEST_LIMIT(CopyTables, DoCopyTablesRequest, Rps, COPYTABLES)
ADD_REQUEST_LIMIT(RenameTables, DoRenameTablesRequest, Rps, RENAMETABLES)
Expand Down
Loading