diff --git a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
index 0ccfd4912ec..f56aa977b65 100644
--- a/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
+++ b/extensions/spark/kyuubi-spark-connector-hive/src/main/scala/org/apache/kyuubi/spark/connector/hive/HiveConnectorUtils.scala
@@ -19,15 +19,16 @@ package org.apache.kyuubi.spark.connector.hive
 
 import java.lang.{Boolean => JBoolean, Long => JLong}
 
+import scala.util.Try
+
 import org.apache.hadoop.fs.{FileStatus, Path}
 import org.apache.hadoop.hive.ql.plan.{FileSinkDesc, TableDesc}
-import org.apache.spark.SPARK_VERSION
 import org.apache.spark.internal.Logging
 import org.apache.spark.sql.SparkSession
 import org.apache.spark.sql.catalyst.InternalRow
 import org.apache.spark.sql.catalyst.catalog.{CatalogTable, CatalogTablePartition}
 import org.apache.spark.sql.connector.catalog.TableChange
-import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, After, ColumnPosition, DeleteColumn, First, RenameColumn, UpdateColumnComment, UpdateColumnNullability, UpdateColumnPosition, UpdateColumnType}
+import org.apache.spark.sql.connector.catalog.TableChange._
 import org.apache.spark.sql.execution.command.CommandUtils
 import org.apache.spark.sql.execution.command.CommandUtils.{calculateMultipleLocationSizes, calculateSingleLocationSize}
 import org.apache.spark.sql.execution.datasources.{PartitionDirectory, PartitionedFile}
@@ -35,20 +36,18 @@ import org.apache.spark.sql.hive.execution.HiveFileFormat
 import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.{ArrayType, MapType, StructField, StructType}
 
-import org.apache.kyuubi.spark.connector.common.SparkUtils.SPARK_RUNTIME_VERSION
 import org.apache.kyuubi.util.reflect.{DynClasses, DynConstructors, DynMethods}
 import org.apache.kyuubi.util.reflect.ReflectUtils.invokeAs
 
 object HiveConnectorUtils extends Logging {
 
-  // SPARK-43186
-  def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat = {
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+  def getHiveFileFormat(fileSinkConf: FileSinkDesc): HiveFileFormat =
+    Try { // SPARK-43186: 3.5.0
       DynConstructors.builder()
         .impl(classOf[HiveFileFormat], classOf[FileSinkDesc])
         .build[HiveFileFormat]()
         .newInstance(fileSinkConf)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       val shimFileSinkDescClz = DynClasses.builder()
         .impl("org.apache.spark.sql.hive.HiveShim$ShimFileSinkDesc")
         .build()
@@ -67,34 +66,26 @@ object HiveConnectorUtils extends Logging {
         .impl(classOf[HiveFileFormat], shimFileSinkDescClz)
         .build[HiveFileFormat]()
         .newInstance(shimFileSinkDesc)
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get
 
-  // SPARK-41970
-  def partitionedFilePath(file: PartitionedFile): String = {
-    if (SPARK_RUNTIME_VERSION >= "3.4") {
+  def partitionedFilePath(file: PartitionedFile): String =
+    Try { // SPARK-41970: 3.4.0
       invokeAs[String](file, "urlEncodedPath")
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       invokeAs[String](file, "filePath")
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get
 
   def splitFiles(
       sparkSession: SparkSession,
       file: AnyRef,
       filePath: Path,
-      isSplitable: Boolean,
-      maxSplitBytes: Long,
-      partitionValues: InternalRow): Seq[PartitionedFile] = {
-
-    if (SPARK_RUNTIME_VERSION >= "4.0") { // SPARK-42821
+      isSplitable: JBoolean,
+      maxSplitBytes: JLong,
+      partitionValues: InternalRow): Seq[PartitionedFile] =
+    Try { // SPARK-42821: 4.0.0-preview2
       val fileStatusWithMetadataClz = DynClasses.builder()
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
-        .build()
+        .buildChecked()
       DynMethods
         .builder("splitFiles")
         .impl(
@@ -103,35 +94,58 @@ object HiveConnectorUtils extends Logging {
           classOf[Boolean],
           classOf[Long],
           classOf[InternalRow])
-        .build()
-        .invoke[Seq[PartitionedFile]](
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
           null,
           file,
-          isSplitable.asInstanceOf[JBoolean],
-          maxSplitBytes.asInstanceOf[JLong],
+          isSplitable,
+          maxSplitBytes,
           partitionValues)
-    } else if (SPARK_RUNTIME_VERSION >= "3.5") { // SPARK-43039
+    }.recover { case _: Exception => // SPARK-51185: Spark 3.5.5
       val fileStatusWithMetadataClz = DynClasses.builder()
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
-        .build()
+        .buildChecked()
       DynMethods
         .builder("splitFiles")
         .impl(
           "org.apache.spark.sql.execution.PartitionedFileUtil",
           classOf[SparkSession],
           fileStatusWithMetadataClz,
+          classOf[Path],
           classOf[Boolean],
           classOf[Long],
           classOf[InternalRow])
-        .build()
-        .invoke[Seq[PartitionedFile]](
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
+          null,
+          sparkSession,
+          file,
+          filePath,
+          isSplitable,
+          maxSplitBytes,
+          partitionValues)
+    }.recover { case _: Exception => // SPARK-43039: 3.5.0
+      val fileStatusWithMetadataClz = DynClasses.builder()
+        .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
+        .buildChecked()
+      DynMethods
+        .builder("splitFiles")
+        .impl(
+          "org.apache.spark.sql.execution.PartitionedFileUtil",
+          classOf[SparkSession],
+          fileStatusWithMetadataClz,
+          classOf[Boolean],
+          classOf[Long],
+          classOf[InternalRow])
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
           null,
           sparkSession,
           file,
-          isSplitable.asInstanceOf[JBoolean],
-          maxSplitBytes.asInstanceOf[JLong],
+          isSplitable,
+          maxSplitBytes,
           partitionValues)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       DynMethods
         .builder("splitFiles")
         .impl(
@@ -142,55 +156,41 @@ object HiveConnectorUtils extends Logging {
           classOf[Boolean],
           classOf[Long],
           classOf[InternalRow])
-        .build()
-        .invoke[Seq[PartitionedFile]](
+        .buildChecked()
+        .invokeChecked[Seq[PartitionedFile]](
           null,
           sparkSession,
           file,
           filePath,
-          isSplitable.asInstanceOf[JBoolean],
-          maxSplitBytes.asInstanceOf[JLong],
+          isSplitable,
+          maxSplitBytes,
           partitionValues)
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get
 
-  def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory = {
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+  def createPartitionDirectory(values: InternalRow, files: Seq[FileStatus]): PartitionDirectory =
+    Try { // SPARK-43039: 3.5.0
       new DynMethods.Builder("apply")
         .impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Array[FileStatus]])
         .buildChecked()
         .asStatic()
         .invoke[PartitionDirectory](values, files.toArray)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       new DynMethods.Builder("apply")
         .impl(classOf[PartitionDirectory], classOf[InternalRow], classOf[Seq[FileStatus]])
         .buildChecked()
         .asStatic()
         .invoke[PartitionDirectory](values, files)
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
+    }.get
 
-  def getPartitionFilePath(file: AnyRef): Path = {
-    if (SPARK_RUNTIME_VERSION >= "3.5") {
+  def getPartitionFilePath(file: AnyRef): Path =
+    Try { // SPARK-43039: 3.5.0
       new DynMethods.Builder("getPath")
         .impl("org.apache.spark.sql.execution.datasources.FileStatusWithMetadata")
         .build()
         .invoke[Path](file)
-    } else if (SPARK_RUNTIME_VERSION >= "3.3") {
+    }.recover { case _: Exception =>
       file.asInstanceOf[FileStatus].getPath
-    } else {
-      throw unsupportedSparkVersion()
-    }
-  }
-
-  private def unsupportedSparkVersion(): KyuubiHiveConnectorException = {
-    KyuubiHiveConnectorException(s"Spark version $SPARK_VERSION " +
-      "is not supported by Kyuubi spark hive connector.")
-  }
+    }.get
 
   def calculateTotalSize(
       spark: SparkSession,
diff --git a/pom.xml b/pom.xml
index 2bd3e4d93bc..45784f15d12 100644
--- a/pom.xml
+++ b/pom.xml
@@ -200,7 +200,7 @@
         <!--
             DO NOT forget to change the following properties when change the minor version of Spark:
             `delta.version`, `delta.artifact`, `maven.plugin.scalatest.exclude.tags`
        -->
-        <spark.version>3.5.4</spark.version>
+        <spark.version>3.5.5</spark.version>
         <spark.binary.version>3.5</spark.binary.version>
         <spark.archive.name>spark-${spark.version}-bin-hadoop3${spark.archive.scala.suffix}.tgz</spark.archive.name>
@@ -1287,6 +1287,18 @@
            <name>Maven Repository</name>
            <url>https://repo.maven.apache.org/maven2</url>
        </repository>
+        <repository>
+            <releases>
+                <enabled>true</enabled>
+            </releases>
+            <snapshots>
+                <enabled>false</enabled>
+            </snapshots>
+            <id>staging</id>
+            <name>Spark 3.5.5 Staging Repository</name>
+            <url>https://repository.apache.org/content/repositories/orgapachespark-1476/</url>
+        </repository>
    </repositories>
 
    <pluginRepositories>
@@ -2044,11 +2056,12 @@
                <module>extensions/spark/kyuubi-spark-connector-hive</module>
            </modules>
            <properties>
-                <spark.version>3.5.4</spark.version>
+                <spark.version>3.5.5</spark.version>
                <spark.binary.version>3.5</spark.binary.version>
                <delta.version>3.3.0</delta.version>
                <delta.artifact>delta-spark_${scala.binary.version}</delta.artifact>
                <maven.plugin.scalatest.exclude.tags>org.scalatest.tags.Slow</maven.plugin.scalatest.exclude.tags>
+                <spark.archive.mirror>https://dist.apache.org/repos/dist/dev/spark/v3.5.5-rc1-bin</spark.archive.mirror>
            </properties>
        </profile>
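
Note (added for this rewrite, not part of the patch): the Scala change above drops the explicit `SPARK_RUNTIME_VERSION` comparisons in favor of `Try { ... }.recover { ... }.get` chains, so each reflective lookup is attempted newest-API-first and the next candidate runs only when the previous one throws; the trailing `.get` rethrows the last failure if no candidate matches. This keeps the connector working on patch releases that change signatures without a minor-version bump, which is exactly the case the new `SPARK-51185: Spark 3.5.5` branch of `splitFiles` handles. A minimal, self-contained sketch of the idiom, where `pathOf` reuses the real method names from `partitionedFilePath` but `LegacyFile` is a hypothetical stand-in for `PartitionedFile`:

```scala
import scala.util.Try

object FallbackChainSketch {
  // Reflectively invoke a no-arg method and render its result as a String.
  private def call(target: AnyRef, method: String): String =
    target.getClass.getMethod(method).invoke(target).toString

  // Newest API first ("urlEncodedPath", Spark 3.4+), then the older one
  // ("filePath", Spark 3.3), mirroring partitionedFilePath in the diff.
  def pathOf(file: AnyRef): String =
    Try {
      call(file, "urlEncodedPath") // try the current API first
    }.recover { case _: Exception =>
      call(file, "filePath") // fall back only when the lookup above throws
    }.get // no candidate matched: surface the last failure to the caller

  def main(args: Array[String]): Unit = {
    class LegacyFile { def filePath: String = "/tmp/part-00000" } // hypothetical
    // "urlEncodedPath" does not exist on LegacyFile, so the chain falls back.
    println(pathOf(new LegacyFile)) // prints /tmp/part-00000
  }
}
```

The trade-off versus the removed version checks: behavior no longer depends on parsing version strings (which could not distinguish 3.5.4 from 3.5.5), at the cost of probing each candidate API and discarding the reflection failures along the way.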