Skip to content

Commit

Permalink
native entrance of insert plan
Browse files Browse the repository at this point in the history
  • Loading branch information
Matagits committed Apr 8, 2024
1 parent 67c7796 commit 589712e
Show file tree
Hide file tree
Showing 5 changed files with 71 additions and 7 deletions.
29 changes: 29 additions & 0 deletions src/client/taskmanager_client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -228,6 +228,35 @@ ::openmldb::base::Status TaskManagerClient::ExportOfflineData(const std::string&
return st;
}

::openmldb::base::Status TaskManagerClient::InsertOfflineData(const std::string& sql,
const std::map<std::string, std::string>& config,
const std::string& default_db, bool sync_job,
int job_timeout,
::openmldb::taskmanager::JobInfo* job_info) {
::openmldb::taskmanager::InsertOfflineDataRequest request;
::openmldb::taskmanager::ShowJobResponse response;

request.set_sql(sql);
request.set_default_db(default_db);
request.set_sync_job(sync_job);
for (const auto& it : config) {
(*request.mutable_conf())[it.first] = it.second;
}

auto st = client_.SendRequestSt(&::openmldb::taskmanager::TaskManagerServer_Stub::InsertOfflineData, &request,
&response, job_timeout, 1);

if (st.OK()) {
if (response.code() == 0) {
if (response.has_job()) {
job_info->CopyFrom(response.job());
}
}
return {response.code(), response.msg()};
}
return st;
}

::openmldb::base::Status TaskManagerClient::DropOfflineTable(const std::string& db, const std::string& table,
int job_timeout) {
::openmldb::taskmanager::DropOfflineTableRequest request;
Expand Down
4 changes: 4 additions & 0 deletions src/client/taskmanager_client.h
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ class TaskManagerClient : public Client {
const std::string& default_db, bool sync_job, int job_timeout,
::openmldb::taskmanager::JobInfo* job_info);

::openmldb::base::Status InsertOfflineData(const std::string& sql, const std::map<std::string, std::string>& config,
const std::string& default_db, bool sync_job, int job_timeout,
::openmldb::taskmanager::JobInfo* job_info);

::openmldb::base::Status DropOfflineTable(const std::string& db, const std::string& table, int job_timeout);

::openmldb::base::Status CreateFunction(const std::shared_ptr<::openmldb::common::ExternalFun>& fun,
Expand Down
8 changes: 8 additions & 0 deletions src/proto/taskmanager.proto
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,13 @@ message ExportOfflineDataRequest {
optional bool sync_job = 4 [default = false];
}

message InsertOfflineDataRequest {
required string sql = 1;
map<string, string> conf = 2;
optional string default_db = 3 [default = ""];
optional bool sync_job = 4 [default = false];
}

message DropOfflineTableRequest {
required string db = 1;
required string table = 2;
Expand Down Expand Up @@ -174,6 +181,7 @@ service TaskManagerServer {
rpc ImportOnlineData(ImportOnlineDataRequest) returns (ShowJobResponse);
rpc ImportOfflineData(ImportOfflineDataRequest) returns (ShowJobResponse);
rpc ExportOfflineData(ExportOfflineDataRequest) returns (ShowJobResponse);
rpc InsertOfflineData(InsertOfflineDataRequest) returns (ShowJobResponse);
rpc DropOfflineTable(DropOfflineTableRequest) returns (DropOfflineTableResponse);
rpc GetJobLog(GetJobLogRequest) returns (GetJobLogResponse);
rpc CreateFunction(CreateFunctionRequest) returns (CreateFunctionResponse);
Expand Down
33 changes: 26 additions & 7 deletions src/sdk/sql_cluster_router.cc
Original file line number Diff line number Diff line change
Expand Up @@ -2555,6 +2555,18 @@ ::openmldb::base::Status SQLClusterRouter::ExportOfflineData(const std::string&
return taskmanager_client_ptr->ExportOfflineData(sql, config, default_db, sync_job, job_timeout, job_info);
}

::openmldb::base::Status SQLClusterRouter::InsertOfflineData(const std::string& sql,
const std::map<std::string, std::string>& config,
const std::string& default_db, bool sync_job,
int job_timeout,
::openmldb::taskmanager::JobInfo* job_info) {
auto taskmanager_client_ptr = cluster_sdk_->GetTaskManagerClient();
if (!taskmanager_client_ptr) {
return {base::ReturnCode::kServerConnError, "Fail to get TaskManager client"};
}
return taskmanager_client_ptr->InsertOfflineData(sql, config, default_db, sync_job, job_timeout, job_info);
}

::openmldb::base::Status SQLClusterRouter::CreatePreAggrTable(const std::string& aggr_db, const std::string& aggr_table,
const ::openmldb::base::LongWindowInfo& window_info,
const ::openmldb::nameserver::TableInfo& base_table_info,
Expand Down Expand Up @@ -2801,14 +2813,21 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteSQL(
return {};
}
case hybridse::node::kPlanTypeInsert: {
if (cluster_sdk_->IsClusterMode() && !is_online_mode) {
// Not support for inserting into offline storage
*status = {StatusCode::kCmdError,
"Can not insert in offline mode, please set @@SESSION.execute_mode='online'"};
return {};
if (!cluster_sdk_->IsClusterMode() || is_online_mode) {
ExecuteInsert(db, sql, status);
} else {
::openmldb::taskmanager::JobInfo job_info;
std::map<std::string, std::string> config = ParseSparkConfigString(GetSparkConfig());
ReadSparkConfFromFile(std::dynamic_pointer_cast<SQLRouterOptions>(options_)->spark_conf_path, &config);
AddUserToConfig(&config);

auto base_status = InsertOfflineData(sql, config, db, is_sync_job, offline_job_timeout, &job_info);
if (base_status.OK()) {
return this->GetJobResultSet(job_info.id(), status);
} else {
*status = {StatusCode::kCmdError, base_status.msg};
}
}
// if db name has been specified in sql, db parameter will be ignored
ExecuteInsert(db, sql, status);
return {};
}
case hybridse::node::kPlanTypeDeploy: {
Expand Down
4 changes: 4 additions & 0 deletions src/sdk/sql_cluster_router.h
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ class SQLClusterRouter : public SQLRouter {
const std::string& default_db, bool sync_job, int job_timeout,
::openmldb::taskmanager::JobInfo* job_info);

::openmldb::base::Status InsertOfflineData(const std::string& sql, const std::map<std::string, std::string>& config,
const std::string& default_db, bool sync_job, int job_timeout,
::openmldb::taskmanager::JobInfo* job_info);

::openmldb::base::Status CreatePreAggrTable(const std::string& aggr_db, const std::string& aggr_table,
const ::openmldb::base::LongWindowInfo& window_info,
const ::openmldb::nameserver::TableInfo& base_table_info,
Expand Down

0 comments on commit 589712e

Please sign in to comment.