feat: support set spark_config and use to request taskmanager #3613

Merged
merged 5 commits into from
Nov 29, 2023
1 change: 1 addition & 0 deletions docs/en/reference/sql/ddl/SET_STATEMENT.md
@@ -34,6 +34,7 @@ The following format is also equivalent.
| @@session.enable_trace|@@enable_trace | When the value is `true`, an error message stack will be printed when the SQL statement has a syntax error or an error occurs during the plan generation process. <br />When the value is `false`, only the basic error message will be printed if there is a SQL syntax error or an error occurs during the plan generation process. | `true`, <br /> `false` | `false` |
| @@session.sync_job|@@sync_job | When the value is `true`, the offline command will be executed synchronously, waiting for the final result of the execution.<br />When the value is `false`, the offline command returns immediately. If you need to check the execution, please use `SHOW JOB` command. | `true`, <br /> `false` | `false` |
| @@session.sync_timeout|@@sync_timeout | When `sync_job=true`, configures the wait time for synchronous commands; the command returns immediately once the timeout elapses. After it returns, you can still check the command's execution status with `SHOW JOB`. | Int | 20000 |
| @@session.spark_config|@@spark_config | Set the Spark configuration for offline jobs, e.g. 'spark.executor.memory=2g;spark.executor.cores=2'. Note that this Spark configuration takes priority over the TaskManager Spark configuration but is overridden by the CLI Spark configuration file. | String | "" |

## Example

2 changes: 1 addition & 1 deletion docs/zh/openmldb_sql/ddl/SET_STATEMENT.md
@@ -35,7 +35,7 @@ sessionVariableName ::= '@@'Identifier | '@@session.'Identifier | '@@global.'Ide
| @@session.enable_trace|@@enable_trace | When the value is `true`, an error message stack is printed when a SQL statement has a syntax error or an error occurs during plan generation.<br />When the value is `false`, only the basic error message is printed in those cases. | "true" \| "false" | "false" |
| @@session.sync_job|@@sync_job | When the value is `true`, offline commands run synchronously and wait for the final result.<br />When the value is `false`, offline commands return immediately; use `SHOW JOB` to check their execution status. | "true" \| "false" | "false" |
| @@session.job_timeout|@@job_timeout | Configures the wait time (in *milliseconds*) for offline asynchronous commands or offline management commands, which return once it elapses. After an offline asynchronous command returns, you can still check its execution status with `SHOW JOB`. | Int | "20000" |
| @@session.spark_config|@@spark_config | Set the Spark configuration for offline jobs, e.g. 'spark.executor.memory=2g;spark.executor.cores=2'. Note that this Spark configuration takes priority over the TaskManager default Spark configuration but is overridden by the CLI Spark configuration file. | String | "" |

## Example

### Setting and displaying session system variables
64 changes: 63 additions & 1 deletion src/sdk/sql_cluster_router.cc
@@ -19,8 +19,10 @@
#include <algorithm>
#include <fstream>
#include <future>
#include <iostream>
#include <memory>
#include <sstream>
#include <string>
#include <unordered_map>
#include <utility>

@@ -319,6 +321,7 @@ bool SQLClusterRouter::Init() {
session_variables_.emplace("enable_trace", "false");
session_variables_.emplace("sync_job", "false");
session_variables_.emplace("job_timeout", "60000"); // rpc request timeout for taskmanager
session_variables_.emplace("spark_config", "");
}
return true;
}
@@ -2980,7 +2983,7 @@ std::shared_ptr<hybridse::sdk::ResultSet> SQLClusterRouter::ExecuteOfflineQuery(
bool is_sync_job, int job_timeout,
::hybridse::sdk::Status* status) {
RET_IF_NULL_AND_WARN(status, "output status is nullptr");
std::map<std::string, std::string> config;
std::map<std::string, std::string> config = ParseSparkConfigString(GetSparkConfig());
ReadSparkConfFromFile(std::dynamic_pointer_cast<SQLRouterOptions>(options_)->spark_conf_path, &config);

if (is_sync_job) {
@@ -3049,6 +3052,16 @@ int SQLClusterRouter::GetJobTimeout() {
return 60000;
}

std::string SQLClusterRouter::GetSparkConfig() {
std::lock_guard<::openmldb::base::SpinMutex> lock(mu_);
auto it = session_variables_.find("spark_config");
if (it != session_variables_.end()) {
return it->second;
}

return "";
}

::hybridse::sdk::Status SQLClusterRouter::SetVariable(hybridse::node::SetPlanNode* node) {
std::string key = node->Key();
std::transform(key.begin(), key.end(), key.begin(), ::tolower);
@@ -3083,13 +3096,34 @@ ::hybridse::sdk::Status SQLClusterRouter::SetVariable(hybridse::node::SetPlanNod
if (!absl::SimpleAtoi(value, &new_timeout)) {
return {StatusCode::kCmdError, "Fail to parse value, can't set the request timeout"};
}
} else if (key == "spark_config") {
if (!CheckSparkConfigString(value)) {
return {
StatusCode::kCmdError,
"Fail to parse spark config, set like 'spark.executor.memory=2g;spark.executor.cores=2'"
};
}
} else {
return {};
}
session_variables_[key] = value;
return {};
}

bool SQLClusterRouter::CheckSparkConfigString(const std::string& input) {
std::istringstream iss(input);
std::string keyValue;

while (std::getline(iss, keyValue, ';')) {
// Check if the substring starts with "spark."
if (keyValue.find("spark.") != 0) {
return false;
}
}

return true;
}

::hybridse::sdk::Status SQLClusterRouter::ParseNamesFromArgs(const std::string& db,
const std::vector<std::string>& args, std::string* db_name, std::string* name) {
if (args.size() == 1) {
@@ -4523,6 +4557,34 @@ bool SQLClusterRouter::CheckTableStatus(const std::string& db, const std::string
return check_succeed;
}

std::map<std::string, std::string> SQLClusterRouter::ParseSparkConfigString(const std::string& input) {
std::map<std::string, std::string> configMap;

std::istringstream iss(input);
std::string keyValue;

while (std::getline(iss, keyValue, ';')) {
// Split the key-value pair
size_t equalPos = keyValue.find('=');
if (equalPos != std::string::npos) {
std::string key = keyValue.substr(0, equalPos);
std::string value = keyValue.substr(equalPos + 1);

// Check if the key starts with "spark."
if (key.find("spark.") == 0) {
// Add to the map
configMap[key] = value;
} else {
std::cerr << "Error: Key does not start with 'spark.' - " << key << std::endl;
}
} else {
std::cerr << "Error: Invalid key-value pair - " << keyValue << std::endl;
}
}

return configMap;
}

void SQLClusterRouter::ReadSparkConfFromFile(std::string conf_file_path, std::map<std::string, std::string>* config) {
if (!conf_file_path.empty()) {
boost::property_tree::ptree pt;
6 changes: 6 additions & 0 deletions src/sdk/sql_cluster_router.h
@@ -283,6 +283,12 @@ class SQLClusterRouter : public SQLRouter {
// get job timeout from the session variables, we will use the timeout when sending requests to the taskmanager
int GetJobTimeout();

std::string GetSparkConfig();

std::map<std::string, std::string> ParseSparkConfigString(const std::string& input);

bool CheckSparkConfigString(const std::string& input);

::openmldb::base::Status ExecuteOfflineQueryAsync(const std::string& sql,
const std::map<std::string, std::string>& config,
const std::string& default_db, int job_timeout,