Skip to content

Commit

Permalink
Pass sql as job info args
Browse files — browse the repository at this point in the history
  • Loading branch information
tobegit3hub committed Nov 29, 2023
1 parent 71cf61e commit 0c9f9e8
Show file tree
Hide file tree
Showing 4 changed files with 28 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -174,6 +174,8 @@ class OpenmldbSession {
* @return
*/
def openmldbSql(sqlText: String): OpenmldbDataframe = {
logger.info("Try to execute OpenMLDB SQL: " + sqlText)

if (config.enableSparksql) {
return OpenmldbDataframe(this, sparksql(sqlText))
}
Expand Down Expand Up @@ -323,7 +325,7 @@ class OpenmldbSession {
}
} catch {
case e: Exception => {
logger.warn(s"Fail to register table $dbName.$tableName " + ExceptionUtils.getStackTrace(e))
logger.warn(s"Fail to register table $dbName.$tableName, exception: " + e.getMessage)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,10 +57,10 @@ object OpenmldbBatchjobManager {

if (TaskManagerConfig.isK8s) {
val args = List(sql)
K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
defaultDb)
} else {
SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath,
SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath,
sparkConf.asScala.toMap, defaultDb, blocking = true)
}
}
Expand All @@ -73,11 +73,11 @@ object OpenmldbBatchjobManager {

if (TaskManagerConfig.isK8s) {
val args = List(sql)
K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
defaultDb)
} else {
val args = List(tempSqlFile.getAbsolutePath)
SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
defaultDb)
}
}
Expand All @@ -90,11 +90,11 @@ object OpenmldbBatchjobManager {

if (TaskManagerConfig.isK8s) {
val args = List(sql)
K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
defaultDb)
} else {
val args = List(tempSqlFile.getAbsolutePath)
SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
defaultDb)
}
}
Expand All @@ -107,11 +107,11 @@ object OpenmldbBatchjobManager {

if (TaskManagerConfig.isK8s) {
val args = List(sql)
K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
defaultDb)
} else {
val args = List(tempSqlFile.getAbsolutePath)
SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
defaultDb)
}
}
Expand All @@ -124,11 +124,11 @@ object OpenmldbBatchjobManager {

if (TaskManagerConfig.isK8s) {
val args = List(sql)
K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
defaultDb)
} else {
val args = List(tempSqlFile.getAbsolutePath)
SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap,
defaultDb)
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,18 @@ object K8sJobManager {

def submitSparkJob(jobType: String, mainClass: String,
args: List[String] = List(),
sql: String = "",
localSqlFile: String = "",
sparkConf: Map[String, String] = Map(),
defaultDb: String = "",
blocking: Boolean = false): JobInfo = {

val jobInfo = JobInfoManager.createJobInfo(jobType, args, sparkConf)
val jobInfoArgs = if (sql.nonEmpty) {
List(sql)
} else {
args
}
val jobInfo = JobInfoManager.createJobInfo(jobType, jobInfoArgs, sparkConf)

val jobName = getK8sJobName(jobInfo.getId)
jobInfo.setApplicationId(jobName)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -77,11 +77,18 @@ object SparkJobManager {

def submitSparkJob(jobType: String, mainClass: String,
args: List[String] = List(),
sql: String = "",
localSqlFile: String = "",
sparkConf: Map[String, String] = Map(),
defaultDb: String = "",
blocking: Boolean = false): JobInfo = {
val jobInfo = JobInfoManager.createJobInfo(jobType, args, sparkConf)

val jobInfoArgs = if (sql.nonEmpty) {
List(sql)
} else {
args
}
val jobInfo = JobInfoManager.createJobInfo(jobType, jobInfoArgs, sparkConf)

// Submit Spark application with SparkLauncher
val launcher = createSparkLauncher(mainClass)
Expand Down

0 comments on commit 0c9f9e8

Please sign in to comment.