From 0c9f9e8ec6ef7664603b2f5722b07ca173d15ef7 Mon Sep 17 00:00:00 2001 From: tobe Date: Wed, 29 Nov 2023 17:40:14 +0800 Subject: [PATCH] Pass sql as job info args --- .../openmldb/batch/api/OpenmldbSession.scala | 4 +++- .../taskmanager/OpenmldbBatchjobManager.scala | 20 +++++++++---------- .../taskmanager/k8s/K8sJobManager.scala | 8 +++++++- .../taskmanager/spark/SparkJobManager.scala | 9 ++++++++- 4 files changed, 28 insertions(+), 13 deletions(-) diff --git a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala index 9a3113b4c09..5aba17b7305 100755 --- a/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala +++ b/java/openmldb-batch/src/main/scala/com/_4paradigm/openmldb/batch/api/OpenmldbSession.scala @@ -174,6 +174,8 @@ class OpenmldbSession { * @return */ def openmldbSql(sqlText: String): OpenmldbDataframe = { + logger.info("Try to execute OpenMLDB SQL: " + sqlText) + if (config.enableSparksql) { return OpenmldbDataframe(this, sparksql(sqlText)) } @@ -323,7 +325,7 @@ class OpenmldbSession { } } catch { case e: Exception => { - logger.warn(s"Fail to register table $dbName.$tableName " + ExceptionUtils.getStackTrace(e)) + logger.warn(s"Fail to register table $dbName.$tableName, exception: " + e.getMessage) } } } diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala index 62e442f65a6..6d942b1eb9e 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/OpenmldbBatchjobManager.scala @@ -57,10 +57,10 @@ object OpenmldbBatchjobManager { if (TaskManagerConfig.isK8s) { val args = List(sql) - K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, + K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) } else { - SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, + SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb, blocking = true) } } @@ -73,11 +73,11 @@ object OpenmldbBatchjobManager { if (TaskManagerConfig.isK8s) { val args = List(sql) - K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, + K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) } else { val args = List(tempSqlFile.getAbsolutePath) - SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, + SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) } } @@ -90,11 +90,11 @@ object OpenmldbBatchjobManager { if (TaskManagerConfig.isK8s) { val args = List(sql) - K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, + K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) } else { val args = List(tempSqlFile.getAbsolutePath) - SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, + SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) } } @@ -107,11 +107,11 @@ object OpenmldbBatchjobManager { if (TaskManagerConfig.isK8s) { val args = List(sql) - K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, + K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) } else { val args = List(tempSqlFile.getAbsolutePath) - SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, + SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) } } @@ -124,11 +124,11 @@ object OpenmldbBatchjobManager { if (TaskManagerConfig.isK8s) { val args = List(sql) - K8sJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, + K8sJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) } else { val args = List(tempSqlFile.getAbsolutePath) - SparkJobManager.submitSparkJob(jobType, mainClass, args, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, + SparkJobManager.submitSparkJob(jobType, mainClass, args, sql, tempSqlFile.getAbsolutePath, sparkConf.asScala.toMap, defaultDb) } } diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala index b9985a263b0..0c459969003 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/k8s/K8sJobManager.scala @@ -44,12 +44,18 @@ object K8sJobManager { def submitSparkJob(jobType: String, mainClass: String, args: List[String] = List(), + sql: String = "", localSqlFile: String = "", sparkConf: Map[String, String] = Map(), defaultDb: String = "", blocking: Boolean = false): JobInfo = { - val jobInfo = JobInfoManager.createJobInfo(jobType, args, sparkConf) + val jobInfoArgs = if (sql.nonEmpty) { + List(sql) + } else { + args + } + val jobInfo = JobInfoManager.createJobInfo(jobType, jobInfoArgs, sparkConf) val jobName = getK8sJobName(jobInfo.getId) jobInfo.setApplicationId(jobName) diff --git a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala index bc8c5dfebbe..44fc619e536 100644 --- a/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala +++ b/java/openmldb-taskmanager/src/main/scala/com/_4paradigm/openmldb/taskmanager/spark/SparkJobManager.scala @@ -77,11 +77,18 @@ object SparkJobManager { def submitSparkJob(jobType: String, mainClass: String, args: List[String] = List(), + sql: String = "", localSqlFile: String = "", sparkConf: Map[String, String] = Map(), defaultDb: String = "", blocking: Boolean = false): JobInfo = { - val jobInfo = JobInfoManager.createJobInfo(jobType, args, sparkConf) + + val jobInfoArgs = if (sql.nonEmpty) { + List(sql) + } else { + args + } + val jobInfo = JobInfoManager.createJobInfo(jobType, jobInfoArgs, sparkConf) // Submit Spark application with SparkLauncher val launcher = createSparkLauncher(mainClass)