Spark: 4.0 snapshot support #11583

Draft: wants to merge 3 commits into main (showing changes from 1 commit)
4 changes: 2 additions & 2 deletions .github/workflows/java-ci.yml
@@ -95,7 +95,7 @@ jobs:
     runs-on: ubuntu-22.04
     strategy:
       matrix:
-        jvm: [11, 17, 21]
+        jvm: [17, 21]
     steps:
       - uses: actions/checkout@v4
       - uses: actions/setup-java@v4
@@ -108,7 +108,7 @@ jobs:
     runs-on: ubuntu-22.04
     strategy:
       matrix:
-        jvm: [11, 17, 21]
+        jvm: [17, 21]
     steps:
       - uses: actions/checkout@v4
      - uses: actions/setup-java@v4
4 changes: 2 additions & 2 deletions .github/workflows/jmh-benchmarks.yml
@@ -28,8 +28,8 @@ on:
         description: 'The branch name'
         required: true
       spark_version:
-        description: 'The spark project version to use, such as iceberg-spark-3.5'
-        default: 'iceberg-spark-3.5'
+        description: 'The spark project version to use, such as iceberg-spark-4.0'
+        default: 'iceberg-spark-4.0'
         required: true
       benchmarks:
         description: 'A list of comma-separated double-quoted Benchmark names, such as "IcebergSourceFlatParquetDataReadBenchmark", "IcebergSourceFlatParquetDataFilterBenchmark"'
2 changes: 1 addition & 1 deletion .github/workflows/publish-snapshot.yml
@@ -41,4 +41,4 @@ jobs:
       - run: |
           ./gradlew printVersion
           ./gradlew -DallModules publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
-          ./gradlew -DflinkVersions= -DsparkVersions=3.3,3.4,3.5 -DscalaVersion=2.13 -DkafkaVersions=3 -DhiveVersions= publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
+          ./gradlew -DflinkVersions= -DsparkVersions=3.3,3.4,3.5,4.0 -DscalaVersion=2.13 -DkafkaVersions=3 -DhiveVersions= publishApachePublicationToMavenRepository -PmavenUser=${{ secrets.NEXUS_USER }} -PmavenPassword=${{ secrets.NEXUS_PW }}
2 changes: 1 addition & 1 deletion .github/workflows/recurring-jmh-benchmarks.yml
@@ -41,7 +41,7 @@ jobs:
                    "IcebergSourceNestedParquetDataReadBenchmark", "IcebergSourceNestedParquetDataWriteBenchmark",
                    "IcebergSourceParquetEqDeleteBenchmark", "IcebergSourceParquetMultiDeleteFileBenchmark",
                    "IcebergSourceParquetPosDeleteBenchmark", "IcebergSourceParquetWithUnrelatedDeleteBenchmark"]
-        spark_version: ['iceberg-spark-3.5']
+        spark_version: ['iceberg-spark-4.0']
     env:
       SPARK_LOCAL_IP: localhost
     steps:
8 changes: 6 additions & 2 deletions .github/workflows/spark-ci.yml
@@ -73,15 +73,19 @@ jobs:
     strategy:
       matrix:
         jvm: [11, 17, 21]
-        spark: ['3.3', '3.4', '3.5']
-        scala: ['2.12', '2.13']
+        spark: [ '3.3', '3.4', '3.5', '4.0' ]
+        scala: [ '2.12', '2.13' ]
         exclude:
           # Spark 3.5 is the first version not failing on Java 21 (https://issues.apache.org/jira/browse/SPARK-42369)
           # Full Java 21 support is coming in Spark 4 (https://issues.apache.org/jira/browse/SPARK-43831)
+          - jvm: 11
+            spark: '4.0'
           - jvm: 21
             spark: '3.3'
           - jvm: 21
             spark: '3.4'
+          - spark: '4.0'
+            scala: '2.12'
     env:
       SPARK_LOCAL_IP: localhost
     steps:
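Net effect of the matrix (derived from the excludes above, not stated in the PR): Spark 3.3 and 3.4 run on JVM 11 and 17 only, Spark 3.5 runs on all three JVMs, and Spark 4.0 runs only on JVM 17 and 21 with Scala 2.13.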
2 changes: 2 additions & 0 deletions .gitignore
@@ -37,6 +37,8 @@ spark/v3.4/spark/benchmark/*
 spark/v3.4/spark-extensions/benchmark/*
 spark/v3.5/spark/benchmark/*
 spark/v3.5/spark-extensions/benchmark/*
+spark/v4.0/spark/benchmark/*
+spark/v4.0/spark-extensions/benchmark/*
 */benchmark/*

 __pycache__/
3 changes: 3 additions & 0 deletions build.gradle
@@ -119,6 +119,9 @@ allprojects {
   repositories {
     mavenCentral()
     mavenLocal()
+    maven {
+      url "https://repository.apache.org/content/repositories/snapshots/"
+    }
  }
 }
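This snapshot repository is what lets Gradle resolve the spark-hive 4.0.0-SNAPSHOT artifact pinned in gradle/libs.versions.toml below; Maven Central hosts only released Spark versions. One way to confirm resolution locally (a hypothetical invocation using Gradle's built-in dependencies task):

    ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-4.0_2.13:dependencies --configuration compileClasspath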
4 changes: 2 additions & 2 deletions gradle.properties
@@ -20,8 +20,8 @@ systemProp.defaultFlinkVersions=1.20
 systemProp.knownFlinkVersions=1.18,1.19,1.20
 systemProp.defaultHiveVersions=2
 systemProp.knownHiveVersions=2,3
-systemProp.defaultSparkVersions=3.5
-systemProp.knownSparkVersions=3.3,3.4,3.5
+systemProp.defaultSparkVersions=4.0
+systemProp.knownSparkVersions=3.3,3.4,3.5,4.0
 systemProp.defaultKafkaVersions=3
 systemProp.knownKafkaVersions=3
 systemProp.defaultScalaVersion=2.12
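These defaults apply only when no flags are passed; any invocation can still pin an older line explicitly, following the same pattern the CI workflows use, e.g. (hypothetical command):

    ./gradlew -DsparkVersions=3.5 -DscalaVersion=2.13 :iceberg-spark:iceberg-spark-3.5_2.13:test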
6 changes: 6 additions & 0 deletions gradle/libs.versions.toml
@@ -23,6 +23,7 @@
 activation = "1.1.1"
 aliyun-sdk-oss = "3.10.2"
 antlr = "4.9.3"
+antlr413 = "4.13.1" # For Spark 4.0 support
 aircompressor = "0.27"
 apiguardian = "1.1.2"
 arrow = "15.0.2"
@@ -48,6 +49,7 @@ google-libraries-bom = "26.50.0"
 guava = "33.3.1-jre"
 hadoop2 = "2.7.3"
 hadoop3 = "3.4.1"
+hadoop34 = "3.4.0" # For Spark 4.0 support
 httpcomponents-httpclient5 = "5.4.1"
 hive2 = { strictly = "2.3.9"} # see rich version usage explanation above
 hive3 = "3.1.3"
@@ -83,6 +85,7 @@ snowflake-jdbc = "3.20.0"
 spark-hive33 = "3.3.4"
 spark-hive34 = "3.4.4"
 spark-hive35 = "3.5.2"
+spark-hive40 = "4.0.0-SNAPSHOT"
 sqlite-jdbc = "3.47.0.0"
 testcontainers = "1.20.3"
 tez010 = "0.10.4"
@@ -94,6 +97,8 @@ aircompressor = { module = "io.airlift:aircompressor", version.ref = "aircompressor" }
 aliyun-sdk-oss = { module = "com.aliyun.oss:aliyun-sdk-oss", version.ref = "aliyun-sdk-oss" }
 antlr-antlr4 = { module = "org.antlr:antlr4", version.ref = "antlr" }
 antlr-runtime = { module = "org.antlr:antlr4-runtime", version.ref = "antlr" }
+antlr-antlr413 = { module = "org.antlr:antlr4", version.ref = "antlr413" }
+antlr-runtime413 = { module = "org.antlr:antlr4-runtime", version.ref = "antlr413" }
 arrow-memory-netty = { module = "org.apache.arrow:arrow-memory-netty", version.ref = "arrow" }
 arrow-vector = { module = "org.apache.arrow:arrow-vector", version.ref = "arrow" }
 avro-avro = { module = "org.apache.avro:avro", version.ref = "avro" }
@@ -135,6 +140,7 @@ hadoop2-mapreduce-client-core = { module = "org.apache.hadoop:hadoop-mapreduce-client-core", version.ref = "hadoop2" }
 hadoop2-minicluster = { module = "org.apache.hadoop:hadoop-minicluster", version.ref = "hadoop2" }
 hadoop3-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop3" }
 hadoop3-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "hadoop3" }
+hadoop34-minicluster = { module = "org.apache.hadoop:hadoop-minicluster", version.ref = "hadoop34" }
 hive2-exec = { module = "org.apache.hive:hive-exec", version.ref = "hive2" }
 hive2-metastore = { module = "org.apache.hive:hive-metastore", version.ref = "hive2" }
 hive2-serde = { module = "org.apache.hive:hive-serde", version.ref = "hive2" }
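Catalog entries become type-safe accessors in build scripts, with dashes mapped to dots: antlr-runtime413 above is what spark/v4.0/build.gradle references as libs.antlr.runtime413, and spark-hive40 surfaces as libs.versions.spark.hive40.get().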
5 changes: 5 additions & 0 deletions jmh.gradle
@@ -53,6 +53,11 @@ if (sparkVersions.contains("3.5")) {
   jmhProjects.add(project(":iceberg-spark:iceberg-spark-extensions-3.5_${scalaVersion}"))
 }

+if (sparkVersions.contains("4.0")) {
+  jmhProjects.add(project(":iceberg-spark:iceberg-spark-4.0_2.13"))
+  jmhProjects.add(project(":iceberg-spark:iceberg-spark-extensions-4.0_2.13"))
+}
+
 configure(jmhProjects) {
   apply plugin: 'me.champeau.jmh'
   apply plugin: 'io.morethan.jmhreport'
12 changes: 12 additions & 0 deletions settings.gradle
@@ -180,6 +180,18 @@ if (sparkVersions.contains("3.5")) {
   project(":iceberg-spark:spark-runtime-3.5_${scalaVersion}").name = "iceberg-spark-runtime-3.5_${scalaVersion}"
 }

+if (sparkVersions.contains("4.0")) {
+  include ":iceberg-spark:spark-4.0_2.13"
+  include ":iceberg-spark:spark-extensions-4.0_2.13"
+  include ":iceberg-spark:spark-runtime-4.0_2.13"
+  project(":iceberg-spark:spark-4.0_2.13").projectDir = file('spark/v4.0/spark')
+  project(":iceberg-spark:spark-4.0_2.13").name = "iceberg-spark-4.0_2.13"
+  project(":iceberg-spark:spark-extensions-4.0_2.13").projectDir = file('spark/v4.0/spark-extensions')
+  project(":iceberg-spark:spark-extensions-4.0_2.13").name = "iceberg-spark-extensions-4.0_2.13"
+  project(":iceberg-spark:spark-runtime-4.0_2.13").projectDir = file('spark/v4.0/spark-runtime')
+  project(":iceberg-spark:spark-runtime-4.0_2.13").name = "iceberg-spark-runtime-4.0_2.13"
+}
+
 // hive 3 depends on hive 2, so always add hive 2 if hive3 is enabled
 if (hiveVersions.contains("2") || hiveVersions.contains("3")) {
   include 'mr'
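Unlike the 3.x blocks, these module names hardcode the _2.13 suffix, since Spark 4.0 drops Scala 2.12 support. Once registered, the runtime jar should be buildable with something like (hypothetical command following the -DsparkVersions pattern used elsewhere in this PR):

    ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-runtime-4.0_2.13:build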
5 changes: 5 additions & 0 deletions spark/build.gradle
@@ -31,3 +31,8 @@ if (sparkVersions.contains("3.4")) {
 if (sparkVersions.contains("3.5")) {
   apply from: file("$projectDir/v3.5/build.gradle")
 }
+
+
+if (sparkVersions.contains("4.0")) {
+  apply from: file("$projectDir/v4.0/build.gradle")
+}
21 changes: 13 additions & 8 deletions spark/v4.0/build.gradle
@@ -17,8 +17,8 @@
  * under the License.
  */

-String sparkMajorVersion = '3.5'
-String scalaVersion = System.getProperty("scalaVersion") != null ? System.getProperty("scalaVersion") : System.getProperty("defaultScalaVersion")
+String sparkMajorVersion = '4.0'
+String scalaVersion = '2.13'

 def sparkProjects = [
   project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}"),
@@ -67,7 +67,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {

   compileOnly libs.errorprone.annotations
   compileOnly libs.avro.avro
-  compileOnly("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive35.get()}") {
+  compileOnly("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive40.get()}") {
     exclude group: 'org.apache.avro', module: 'avro'
     exclude group: 'org.apache.arrow'
     exclude group: 'org.apache.parquet'
@@ -96,7 +96,10 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {

   implementation libs.caffeine

-  testImplementation(libs.hadoop2.minicluster) {
+  // Add BoneCP dependency for test configurations
+  testImplementation 'com.jolbox:bonecp:0.8.0.RELEASE'
+
+  testImplementation(libs.hadoop34.minicluster) {
     exclude group: 'org.apache.avro', module: 'avro'
     // to make sure netty libs only come from project(':iceberg-arrow')
     exclude group: 'io.netty', module: 'netty-buffer'
@@ -154,7 +157,7 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
   compileOnly project(':iceberg-core')
   compileOnly project(':iceberg-common')
   compileOnly project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}")
-  compileOnly("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive35.get()}") {
+  compileOnly("org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive40.get()}") {
     exclude group: 'org.apache.avro', module: 'avro'
     exclude group: 'org.apache.arrow'
     exclude group: 'org.apache.parquet'
@@ -172,14 +175,16 @@ project(":iceberg-spark:iceberg-spark-extensions-${sparkMajorVersion}_${scalaVersion}") {
   testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')
   testImplementation project(path: ':iceberg-hive-metastore', configuration: 'testArtifacts')
   testImplementation project(path: ":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}", configuration: 'testArtifacts')
+  // Add BoneCP dependency for test configurations
+  testImplementation 'com.jolbox:bonecp:0.8.0.RELEASE'

   testImplementation libs.avro.avro
   testImplementation libs.parquet.hadoop
   testImplementation libs.awaitility

   // Required because we remove antlr plugin dependencies from the compile configuration, see note above
-  runtimeOnly libs.antlr.runtime
-  antlr libs.antlr.antlr4
+  runtimeOnly libs.antlr.runtime413
+  antlr libs.antlr.antlr413
 }

 test {
@@ -246,7 +251,7 @@ project(":iceberg-spark:iceberg-spark-runtime-${sparkMajorVersion}_${scalaVersion}") {
  }

   integrationImplementation "org.scala-lang.modules:scala-collection-compat_${scalaVersion}:${libs.versions.scala.collection.compat.get()}"
-  integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive35.get()}"
+  integrationImplementation "org.apache.spark:spark-hive_${scalaVersion}:${libs.versions.spark.hive40.get()}"
   integrationImplementation libs.junit.vintage.engine
   integrationImplementation libs.junit.jupiter
   integrationImplementation libs.slf4j.simple
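A few readings of this diff, none stated explicitly in the PR: scalaVersion is pinned to '2.13' because Spark 4.0 publishes no Scala 2.12 artifacts; the grammar build moves to ANTLR 4.13, presumably to match the ANTLR runtime Spark 4.0 itself ships; and BoneCP is presumably added because the Hive metastore spun up in tests needs a JDBC connection pool that no longer arrives transitively from spark-hive 4.0. Assuming the snapshot dependency resolves, the new modules can be exercised with (hypothetical invocation):

    ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-extensions-4.0_2.13:test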
DeleteFileIndexBenchmark.java (file header not captured in the page dump):
@@ -54,8 +54,8 @@
 /**
  * A benchmark that evaluates the delete file index build and lookup performance.
  *
- * <p>To run this benchmark for spark-3.5: <code>
- * ./gradlew -DsparkVersions=3.5 :iceberg-spark:iceberg-spark-extensions-3.5_2.12:jmh
+ * <p>To run this benchmark for spark-4.0: <code>
+ * ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-extensions-4.0_2.13:jmh
  *   -PjmhIncludeRegex=DeleteFileIndexBenchmark
  *   -PjmhOutputPath=benchmark/iceberg-delete-file-index-benchmark.txt
  * </code>
MergeCardinalityCheckBenchmark.java (file header not captured in the page dump):
@@ -53,8 +53,8 @@
 /**
  * A benchmark that evaluates the performance of the cardinality check in MERGE operations.
  *
- * <p>To run this benchmark for spark-3.5: <code>
- * ./gradlew -DsparkVersions=3.5 :iceberg-spark:iceberg-spark-extensions-3.5_2.12:jmh
+ * <p>To run this benchmark for spark-4.0: <code>
+ * ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-extensions-4.0_2.13:jmh
  *   -PjmhIncludeRegex=MergeCardinalityCheckBenchmark
  *   -PjmhOutputPath=benchmark/iceberg-merge-cardinality-check-benchmark.txt
  * </code>
PlanningBenchmark.java (file header not captured in the page dump):
@@ -73,8 +73,8 @@
 /**
  * A benchmark that evaluates the job planning performance.
  *
- * <p>To run this benchmark for spark-3.5: <code>
- * ./gradlew -DsparkVersions=3.5 :iceberg-spark:iceberg-spark-extensions-3.5_2.12:jmh
+ * <p>To run this benchmark for spark-4.0: <code>
+ * ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-extensions-4.0_2.13:jmh
  *   -PjmhIncludeRegex=PlanningBenchmark
  *   -PjmhOutputPath=benchmark/iceberg-planning-benchmark.txt
  * </code>
TaskGroupPlanningBenchmark.java (file header not captured in the page dump):
@@ -63,8 +63,8 @@
 /**
  * A benchmark that evaluates the task group planning performance.
  *
- * <p>To run this benchmark for spark-3.5: <code>
- * ./gradlew -DsparkVersions=3.5 :iceberg-spark:iceberg-spark-extensions-3.5_2.12:jmh
+ * <p>To run this benchmark for spark-4.0: <code>
+ * ./gradlew -DsparkVersions=4.0 :iceberg-spark:iceberg-spark-extensions-4.0_2.13:jmh
  *   -PjmhIncludeRegex=TaskGroupPlanningBenchmark
  *   -PjmhOutputPath=benchmark/iceberg-task-group-planning-benchmark.txt
  * </code>
CheckViews.scala (file header not captured in the page dump):
@@ -48,8 +48,8 @@ object CheckViews extends (LogicalPlan => Unit) {
       }

     case AlterViewAs(ResolvedV2View(_, _), _, _) =>
-      throw new AnalysisException("ALTER VIEW <viewName> AS is not supported. Use CREATE OR REPLACE VIEW instead")
-
+      throw new IcebergAnalysisException(
+        "ALTER VIEW <viewName> AS is not supported. Use CREATE OR REPLACE VIEW instead")
     case _ => // OK
   }
 }
@@ -105,7 +105,7 @@ object CheckViews extends (LogicalPlan => Unit) {
   ): Unit = {
     val newCyclePath = cyclePath :+ currentViewIdent
     if (currentViewIdent == viewIdent) {
-      throw new AnalysisException(String.format("Recursive cycle in view detected: %s (cycle: %s)",
+      throw new IcebergAnalysisException(String.format("Recursive cycle in view detected: %s (cycle: %s)",
         viewIdent.asIdentifier, newCyclePath.map(p => p.mkString(".")).mkString(" -> ")))
     } else {
       children.foreach { c =>
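The AnalysisException to IcebergAnalysisException swap recurs across the extensions. A likely motivation, not stated in the diff, is that Spark 4.0 restricts AnalysisException construction to error-class based factories, so Iceberg routes plain-message errors through its own subtype. A hypothetical sketch of such a shim, purely illustrative (the real class is defined elsewhere in the PR and may differ):

    // Hypothetical shim: adapts a bare message to Spark's error-class API.
    // "INTERNAL_ERROR" is an assumed placeholder error class.
    class IcebergAnalysisException(message: String)
      extends AnalysisException("INTERNAL_ERROR", Map("message" -> message))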
ProcedureArgumentCoercion.scala:
@@ -19,15 +19,14 @@

 package org.apache.spark.sql.catalyst.analysis

-import org.apache.spark.sql.AnalysisException
 import org.apache.spark.sql.catalyst.expressions.Cast
-import org.apache.spark.sql.catalyst.plans.logical.Call
+import org.apache.spark.sql.catalyst.plans.logical.IcebergCall
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.catalyst.rules.Rule

 object ProcedureArgumentCoercion extends Rule[LogicalPlan] {
   override def apply(plan: LogicalPlan): LogicalPlan = plan resolveOperators {
-    case c @ Call(procedure, args) if c.resolved =>
+    case c @ IcebergCall(procedure, args) if c.resolved =>
       val params = procedure.parameters

       val newArgs = args.zipWithIndex.map { case (arg, index) =>
@@ -36,7 +35,7 @@ object ProcedureArgumentCoercion extends Rule[LogicalPlan] {
         val argType = arg.dataType

         if (paramType != argType && !Cast.canUpCast(argType, paramType)) {
-          throw new AnalysisException(
+          throw new IcebergAnalysisException(
             s"Wrong arg type for ${param.name}: cannot cast $argType to $paramType")
         }

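The Call to IcebergCall rename follows the same defensive pattern: Spark 4.0 introduces stored-procedure support with its own Call plan node in catalyst, so keeping Iceberg's node under the old name would presumably clash in analyzer rules like this one. Renaming the node keeps the pattern match above unambiguous without touching the coercion logic.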