Skip to content

Commit

Permalink
build: Drop Spark 3.2 support (#581)
Browse files Browse the repository at this point in the history
* build: Drop Spark 3.2 support

* remove un-used import

* fix BloomFilterMightContain

* revert the changes for TimestampNTZType and PartitionIdPassthrough

* address comments and remove more 3.2 related code

* remove un-used import

* put back newDataSourceRDD

* remove un-used import and put back lazy val partitions

* address comments

* Trigger Build

* remove the missed 3.2 pipeline

* address comments
  • Loading branch information
huaxingao authored Jun 18, 2024
1 parent d808d36 commit d584229
Show file tree
Hide file tree
Showing 42 changed files with 87 additions and 468 deletions.
15 changes: 2 additions & 13 deletions .github/workflows/pr_build.yml
Original file line number Diff line number Diff line change
Expand Up @@ -109,15 +109,8 @@ jobs:
os: [ubuntu-latest]
java_version: [8, 11, 17]
test-target: [java]
spark-version: ['3.2', '3.3']
spark-version: ['3.3']
scala-version: ['2.12', '2.13']
exclude:
- java_version: 17
spark-version: '3.2'
- java_version: 11
spark-version: '3.2'
- spark-version: '3.2'
scala-version: '2.13'
fail-fast: false
name: ${{ matrix.os }}/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }}
runs-on: ${{ matrix.os }}
Expand Down Expand Up @@ -254,15 +247,11 @@ jobs:
matrix:
java_version: [8, 17]
test-target: [java]
spark-version: ['3.2', '3.3']
spark-version: ['3.3']
scala-version: ['2.12', '2.13']
exclude:
- java_version: 17
spark-version: '3.2'
- java_version: 8
spark-version: '3.3'
- spark-version: '3.2'
scala-version: '2.13'
fail-fast: false
name: macos-14(Silicon)/java ${{ matrix.java_version }}-spark-${{matrix.spark-version}}-scala-${{matrix.scala-version}}/${{ matrix.test-target }}
runs-on: macos-14
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,9 @@
package org.apache.comet.parquet

import org.apache.hadoop.conf.Configuration
import org.apache.spark.sql.comet.shims.ShimCometParquetUtils
import org.apache.spark.sql.internal.SQLConf

object CometParquetUtils extends ShimCometParquetUtils {
object CometParquetUtils {
private val PARQUET_FIELD_ID_WRITE_ENABLED = "spark.sql.parquet.fieldId.write.enabled"
private val PARQUET_FIELD_ID_READ_ENABLED = "spark.sql.parquet.fieldId.read.enabled"
private val IGNORE_MISSING_PARQUET_FIELD_ID = "spark.sql.parquet.fieldId.read.ignoreMissing"
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,9 @@ import org.apache.parquet.schema._
import org.apache.parquet.schema.LogicalTypeAnnotation.ListLogicalTypeAnnotation
import org.apache.parquet.schema.Type.Repetition
import org.apache.spark.sql.errors.QueryExecutionErrors
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.types._

import org.apache.comet.parquet.CometParquetUtils

/**
* This class is copied & slightly modified from [[ParquetReadSupport]] in Spark. Changes:
* - This doesn't extend from Parquet's `ReadSupport` class since that is used for row-based
Expand All @@ -53,7 +52,7 @@ object CometParquetReadSupport {
ignoreMissingIds: Boolean): MessageType = {
if (!ignoreMissingIds &&
!containsFieldIds(parquetSchema) &&
CometParquetUtils.hasFieldIds(catalystSchema)) {
ParquetUtils.hasFieldIds(catalystSchema)) {
throw new RuntimeException(
"Spark read schema expects field Ids, " +
"but Parquet file schema doesn't contain any field Ids.\n" +
Expand Down Expand Up @@ -334,14 +333,14 @@ object CometParquetReadSupport {
}

def matchIdField(f: StructField): Type = {
val fieldId = CometParquetUtils.getFieldId(f)
val fieldId = ParquetUtils.getFieldId(f)
idToParquetFieldMap
.get(fieldId)
.map { parquetTypes =>
if (parquetTypes.size > 1) {
// Need to fail if there is ambiguity, i.e. more than one field is matched
val parquetTypesString = parquetTypes.map(_.getName).mkString("[", ", ", "]")
throw CometParquetUtils.foundDuplicateFieldInFieldIdLookupModeError(
throw QueryExecutionErrors.foundDuplicateFieldInFieldIdLookupModeError(
fieldId,
parquetTypesString)
} else {
Expand All @@ -355,9 +354,9 @@ object CometParquetReadSupport {
}
}

val shouldMatchById = useFieldId && CometParquetUtils.hasFieldIds(structType)
val shouldMatchById = useFieldId && ParquetUtils.hasFieldIds(structType)
structType.map { f =>
if (shouldMatchById && CometParquetUtils.hasFieldId(f)) {
if (shouldMatchById && ParquetUtils.hasFieldId(f)) {
matchIdField(f)
} else if (caseSensitive) {
matchCaseSensitiveField(f)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName._
import org.apache.parquet.schema.Type.Repetition._
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.execution.datasources.parquet.ParquetSchemaConverter
import org.apache.spark.sql.execution.datasources.parquet.ParquetUtils
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._

Expand Down Expand Up @@ -66,8 +67,8 @@ class CometSparkToParquetSchemaConverter(
*/
def convertField(field: StructField): Type = {
val converted = convertField(field, if (field.nullable) OPTIONAL else REQUIRED)
if (useFieldId && CometParquetUtils.hasFieldId(field)) {
converted.withId(CometParquetUtils.getFieldId(field))
if (useFieldId && ParquetUtils.hasFieldId(field)) {
converted.withId(ParquetUtils.getFieldId(field))
} else {
converted
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import org.apache.spark.sql.execution.datasources.PartitionedFile

object ShimBatchReader {

// TODO: remove after dropping Spark 3.2 & 3.3 support and directly call PartitionedFile
// TODO: remove after dropping Spark 3.3 support and directly call PartitionedFile
def newPartitionedFile(partitionValues: InternalRow, file: String): PartitionedFile =
classOf[PartitionedFile].getDeclaredConstructors
.map(c =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,15 +21,12 @@ package org.apache.comet.shims

object ShimFileFormat {

// TODO: remove after dropping Spark 3.2 & 3.3 support and directly use FileFormat.ROW_INDEX
// TODO: remove after dropping Spark 3.3 support and directly use FileFormat.ROW_INDEX
val ROW_INDEX = "row_index"

// A name for a temporary column that holds row indexes computed by the file format reader
// until they can be placed in the _metadata struct.
// TODO: remove after dropping Spark 3.2 & 3.3 support and directly use
// TODO: remove after dropping Spark 3.3 support and directly use
// FileFormat.ROW_INDEX_TEMPORARY_COLUMN_NAME
val ROW_INDEX_TEMPORARY_COLUMN_NAME: String = s"_tmp_metadata_$ROW_INDEX"

// TODO: remove after dropping Spark 3.2 support and use FileFormat.OPTION_RETURNING_BATCH
val OPTION_RETURNING_BATCH = "returning_batch"
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ import scala.util.Try
import org.apache.spark.sql.types.{StructField, StructType}

object ShimResolveDefaultColumns {
// TODO: remove after dropping Spark 3.2 & 3.3 support and directly use ResolveDefaultColumns
// TODO: remove after dropping Spark 3.3 support and directly use ResolveDefaultColumns
def getExistenceDefaultValue(field: StructField): Any =
Try {
// scalastyle:off classforname
Expand Down

This file was deleted.

This file was deleted.

2 changes: 1 addition & 1 deletion core/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ pub enum CometError {
Internal(String),

// Note that this message format is based on Spark 3.4 and is more detailed than the message
// returned by Spark 3.2 or 3.3
// returned by Spark 3.3
#[error("[CAST_INVALID_INPUT] The value '{value}' of the type \"{from_type}\" cannot be cast to \"{to_type}\" \
because it is malformed. Correct the value as per the syntax, or change its target type. \
Use `try_cast` to tolerate malformed input and return NULL instead. If necessary \
Expand Down
9 changes: 3 additions & 6 deletions docs/source/contributor-guide/adding_a_new_expression.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ The `QueryPlanSerde` object has a method `exprToProto`, which is responsible for
For example, the `unhex` function looks like this:

```scala
case e: Unhex if !isSpark32 =>
case e: Unhex =>
val unHex = unhexSerde(e)

val childExpr = exprToProtoInternal(unHex._1, inputs)
Expand All @@ -59,7 +59,6 @@ case e: Unhex if !isSpark32 =>

A few things to note here:

* The `isSpark32` check is used to fall back to Spark's implementation of `unhex` in Spark 3.2. This is somewhat context specific, because in this case, due to a bug in Spark 3.2 for `unhex`, we want to use the Spark implementation and not a Comet implementation that would behave differently if correct.
* The function is recursively called on child expressions, so you'll need to make sure that the child expressions are also converted to protobuf.
* `scalarExprToProtoWithReturnType` is for scalar functions that need return type information. Your expression may use a different method depending on the type of expression.

Expand All @@ -71,8 +70,6 @@ For example, this is the test case for the `unhex` expression:

```scala
test("unhex") {
assume(!isSpark32, "unhex function has incorrect behavior in 3.2") // used to skip the test in Spark 3.2

val table = "unhex_table"
withTable(table) {
sql(s"create table $table(col string) using parquet")
Expand Down Expand Up @@ -172,11 +169,11 @@ pub(super) fn spark_unhex(args: &[ColumnarValue]) -> Result<ColumnarValue, DataF
If the expression you're adding has different behavior across different Spark versions, you'll need to account for that in your implementation. There are two tools at your disposal to help with this:

1. Shims that exist in `spark/src/main/spark-$SPARK_VERSION/org/apache/comet/shims/CometExprShim.scala` for each Spark version. These shims are used to provide compatibility between different Spark versions.
2. Variables that correspond to the Spark version, such as `isSpark32`, which can be used to conditionally execute code based on the Spark version.
2. Variables that correspond to the Spark version, such as `isSpark33Plus`, which can be used to conditionally execute code based on the Spark version.

## Shimming to Support Different Spark Versions

By adding shims for each Spark version, you can provide a consistent interface for the expression across different Spark versions. For example, `unhex` added a new optional parameter is Spark 3.4, for if it should `failOnError` or not. So for version 3.2 and 3.3, the shim is:
By adding shims for each Spark version, you can provide a consistent interface for the expression across different Spark versions. For example, `unhex` added a new optional parameter is Spark 3.4, for if it should `failOnError` or not. So for version 3.3, the shim is:

```scala
trait CometExprShim {
Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/installation.md
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Make sure the following requirements are met and software installed on your mach

## Requirements

- Apache Spark 3.2, 3.3, or 3.4
- Apache Spark 3.3, or 3.4
- JDK 8 and up
- GLIBC 2.17 (Centos 7) and up

Expand Down
2 changes: 1 addition & 1 deletion docs/source/user-guide/overview.md
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ The following diagram illustrates the architecture of Comet:

## Current Status

The project is currently integrated into Apache Spark 3.2, 3.3, and 3.4.
The project is currently integrated into Apache Spark 3.3, and 3.4.

## Feature Parity with Apache Spark

Expand Down
14 changes: 0 additions & 14 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -517,20 +517,6 @@ under the License.
</properties>
</profile>

<profile>
<id>spark-3.2</id>
<properties>
<scala.version>2.12.15</scala.version>
<spark.version>3.2.2</spark.version>
<spark.version.short>3.2</spark.version.short>
<parquet.version>1.12.0</parquet.version>
<!-- we don't add special test suits for spark-3.2, so a not existed dir is specified-->
<additional.3_3.test.source>not-needed-yet</additional.3_3.test.source>
<additional.3_4.test.source>not-needed-yet</additional.3_4.test.source>
<shims.minorVerSrc>spark-3.2</shims.minorVerSrc>
</properties>
</profile>

<profile>
<id>spark-3.3</id>
<properties>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ class CometSparkSessionExtensions
isSchemaSupported(scanExec.scan.asInstanceOf[ParquetScan].readDataSchema) &&
isSchemaSupported(scanExec.scan.asInstanceOf[ParquetScan].readPartitionSchema) &&
// Comet does not support pushedAggregate
getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isEmpty =>
scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isEmpty =>
val cometScan = CometParquetScan(scanExec.scan.asInstanceOf[ParquetScan])
logInfo("Comet extension enabled for Scan")
CometBatchScanExec(
Expand All @@ -116,7 +116,7 @@ class CometSparkSessionExtensions
s"Partition schema $readPartitionSchema is not supported")
// Comet does not support pushedAggregate
val info3 = createMessage(
getPushedAggregate(scanExec.scan.asInstanceOf[ParquetScan]).isDefined,
scanExec.scan.asInstanceOf[ParquetScan].pushedAggregate.isDefined,
"Comet does not support pushed aggregate")
withInfos(scanExec, Seq(info1, info2, info3).flatten.toSet)
scanExec
Expand Down Expand Up @@ -992,8 +992,7 @@ object CometSparkSessionExtensions extends Logging {
case BooleanType | ByteType | ShortType | IntegerType | LongType | FloatType | DoubleType |
BinaryType | StringType | _: DecimalType | DateType | TimestampType =>
true
// `TimestampNTZType` is private in Spark 3.2.
case t: DataType if t.typeName == "timestamp_ntz" && !isSpark32 => true
case t: DataType if t.typeName == "timestamp_ntz" => true
case dt =>
logInfo(s"Comet extension is disabled because data type $dt is not supported")
false
Expand All @@ -1015,11 +1014,6 @@ object CometSparkSessionExtensions extends Logging {
}
}

/** Used for operations that weren't available in Spark 3.2 */
def isSpark32: Boolean = {
org.apache.spark.SPARK_VERSION.matches("3\\.2\\..*")
}

def isSpark33Plus: Boolean = {
org.apache.spark.SPARK_VERSION >= "3.3"
}
Expand Down
Loading

0 comments on commit d584229

Please sign in to comment.