diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 0ccfd4912ec..f56aa977b65 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -19,15 +19,16 @@ package org.apache.kyuubi.spark.connector.hive
import java.lang.{Boolean => JBoolean, Long => JLong}
+import scala.util.Try
+
import org.apache.hadoop.fs.{FileStatus, Path}
import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
-import org.apache.spark.SPARK_VERSION
import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
import org.apache.spark.sql.connector.catalog.TableChange
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange._
import org.apache.spark.sql.execution.command.CommandUtils
import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
@@ -35,20 +36,18 @@ import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
-import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
object HiveConnectorUtils extends Logging {
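+ // The Spark internal APIs used below changed across 3.3, 3.4, 3.5 and 4.0. Instead of
+ // gating on version strings, each helper probes the newest reflective signature first
+ // and falls back to older ones through a Try/recover chain.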
- // SPARK-43186
- def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat = {
- if (SPARK_RUNTIME_VERSION >= "3.5") {
+ def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat =
+ Try { // SPARK-43186: 3.5.0
DynConstructors.builder()
.impl(classOf[HiveFileFormat], classOf[FileSinkDesc])
.build[HiveFileFormat]()
.newInstance(fileSinkConf)
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
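+ // Pre-3.5: wrap the FileSinkDesc into HiveShim's ShimFileSinkDesc before
+ // constructing HiveFileFormat.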
val shimFileSinkDescClz = DynClasses.builder()
.impl("org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc")
.build()
@@ -67,34 +66,26 @@ object HiveConnectorUtils extends Logging {
.impl(classOf[HiveFileFormat], shimFileSinkDescClz)
.build[HiveFileFormat]()
.newInstance(shimFileSinkDesc)
- } else {
- throw unsupportedSparkVersion()
- }
- }
+ }.get
- // SPARK-41970
- def partitionedFilePath(file: PartitionedFile): String = {
- if (SPARK_RUNTIME_VERSION >= "3.4") {
+ def partitionedFilePath(file: PartitionedFile): String =
+ Try { // SPARK-41970: 3.4.0
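+ // SPARK-41970 changed PartitionedFile's path field in 3.4; read the url-encoded
+ // form there, falling back to the plain filePath accessor on 3.3.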
invokeAs[String](file, "urlEncodedPath")
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
invokeAs[String](file, "filePath")
- } else {
- throw unsupportedSparkVersion()
- }
- }
+ }.get
def splitFiles(
sparkSession: SparkSession,
file: AnyRef,
filePath: Path,
- isSplitable: Boolean,
- maxSplitBytes: Long,
- partitionValues: InternalRow): Seq[PartitionedFile] = {
-
- if (SPARK_RUNTIME_VERSION >= "4.0") { // SPARK-42821
+ isSplitable: JBoolean,
+ maxSplitBytes: JLong,
+ partitionValues: InternalRow): Seq[PartitionedFile] =
+ Try { // SPARK-42821: 4.0.0-preview2
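+ // Spark 4.0 dropped the sparkSession and filePath parameters from
+ // PartitionedFileUtil.splitFiles.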
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
- .build()
+ .buildChecked()
DynMethods
.builder("splitFiles")
.impl(
@@ -103,35 +94,58 @@ object HiveConnectorUtils extends Logging {
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
- .build()
- .invoke[Seq[PartitionedFile]](
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
null,
file,
- isSplitable.asInstanceOf[JBoolean],
- maxSplitBytes.asInstanceOf[JLong],
+ isSplitable,
+ maxSplitBytes,
partitionValues)
- } else if (SPARK_RUNTIME_VERSION >= "3.5") { // SPARK-43039
+ }.recover { case _: Exception => // SPARK-51185: 3.5.5
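+ // SPARK-51185 restored the filePath parameter that 3.5.0 had dropped, so Spark 3.5.5
+ // needs its own reflective signature.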
val fileStatusWithMetadataClz = DynClasses.builder()
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
- .build()
+ .buildChecked()
DynMethods
.builder("splitFiles")
.impl(
"org.apache.spark.sql.execution.PartitionedFileUtil",
classOf[SparkSession],
fileStatusWithMetadataClz,
+ classOf[Path],
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
- .build()
- .invoke[Seq[PartitionedFile]](
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
+ null,
+ sparkSession,
+ file,
+ filePath,
+ isSplitable,
+ maxSplitBytes,
+ partitionValues)
+ }.recover { case _: Exception => // SPARK-43039: 3.5.0
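+ // Spark 3.5.0 - 3.5.4: FileStatusWithMetadata replaced FileStatus and the
+ // filePath parameter was dropped.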
+ val fileStatusWithMetadataClz = DynClasses.builder()
+ .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+ .buildChecked()
+ DynMethods
+ .builder("splitFiles")
+ .impl(
+ "org.apache.spark.sql.execution.PartitionedFileUtil",
+ classOf[SparkSession],
+ fileStatusWithMetadataClz,
+ classOf[Boolean],
+ classOf[Long],
+ classOf[InternalRow])
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
null,
sparkSession,
file,
- isSplitable.asInstanceOf[JBoolean],
- maxSplitBytes.asInstanceOf[JLong],
+ isSplitable,
+ maxSplitBytes,
partitionValues)
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
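+ // Spark 3.3/3.4: the original signature taking a plain FileStatus plus an
+ // explicit filePath.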
DynMethods
.builder("splitFiles")
.impl(
@@ -142,55 +156,41 @@ object HiveConnectorUtils extends Logging {
classOf[Boolean],
classOf[Long],
classOf[InternalRow])
- .build()
- .invoke[Seq[PartitionedFile]](
+ .buildChecked()
+ .invokeChecked[Seq[PartitionedFile]](
null,
sparkSession,
file,
filePath,
- isSplitable.asInstanceOf[JBoolean],
- maxSplitBytes.asInstanceOf[JLong],
+ isSplitable,
+ maxSplitBytes,
partitionValues)
- } else {
- throw unsupportedSparkVersion()
- }
- }
+ }.get
- def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory = {
- if (SPARK_RUNTIME_VERSION >= "3.5") {
+ def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory =
+ Try { // SPARK-43039: 3.5.0
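+ // Spark 3.5 changed PartitionDirectory.apply to take Array[FileStatus];
+ // earlier versions take Seq[FileStatus].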
new DynMethods.Builder("apply")
.impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Array[FileStatus]])
.buildChecked()
.asStatic()
.invoke[PartitionDirectory](values, files.toArray)
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
new DynMethods.Builder("apply")
.impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Seq[FileStatus]])
.buildChecked()
.asStatic()
.invoke[PartitionDirectory](values, files)
- } else {
- throw unsupportedSparkVersion()
- }
- }
+ }.get
- def getPartitionFilePath(file: AnyRef): Path = {
- if (SPARK_RUNTIME_VERSION >= "3.5") {
+ def getPartitionFilePath(file: AnyRef): Path =
+ Try { // SPARK-43039: 3.5.0
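+ // Spark 3.5 hands back FileStatusWithMetadata instances here; earlier versions
+ // use a plain FileStatus.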
new DynMethods.Builder("getPath")
.impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
.build()
.invoke[Path](file)
- } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+ }.recover { case _: Exception =>
file.asInstanceOf[FileStatus].getPath
- } else {
- throw unsupportedSparkVersion()
- }
- }
-
- private def unsupportedSparkVersion(): KyuubiHiveConnectorException = {
- KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
- "is not supported by Kyuubi spark hive connector.")
- }
+ }.get
def calculateTotalSize(
spark: SparkSession,
diff --git a/pom.xml b/pom.xml
index 2bd3e4d93bc..45784f15d12 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,7 +200,7 @@
DO NOT forget to change the following properties when change the minor version of Spark:
`delta.version`, `delta.artifact`, `maven.plugin.scalatest.exclude.tags`
-->
- <spark.version>3.5.4</spark.version>
+ <spark.version>3.5.5</spark.version>
<spark.binary.version>3.5</spark.binary.version>
<spark.archive.name>spark-${spark.version}-bin-hadoop3${spark.archive.scala.suffix}.tgz</spark.archive.name>
@@ -1287,6 +1287,18 @@
<name>Maven Repository</name>
<url>https://repo.maven.apache.org/maven2</url>
</repository>
+ <repository>
+ <releases>
+ <enabled>true</enabled>
+ </releases>
+ <snapshots>
+ <enabled>false</enabled>
+ </snapshots>
+ <id>staging</id>
+ <name>Spark 3.5.5 Staging Repository</name>
+ <url>https://repository.apache.org/content/repositories/orgapachespark-1476/</url>
+ </repository>
@@ -2044,11 +2056,12 @@
<module>extensions/spark/kyuubi-spark-connector-hive</module>
</modules>
<properties>
- <spark.version>3.5.4</spark.version>
+ <spark.version>3.5.5</spark.version>
<spark.binary.version>3.5</spark.binary.version>
<delta.version>3.3.0</delta.version>
<delta.artifact>delta-spark_${scala.binary.version}</delta.artifact>
<maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow</maven.plugin.scalatest.exclude.tags>
+ <spark.archive.mirror>https://dist.apache.org/repos/dist/dev/spark/v3.5.5-rc1-bin</spark.archive.mirror>