diff --git a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
index 19391593d38ce..f2f1571452c0a 100644
--- a/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
+++ b/connector/connect/client/jvm/src/test/scala/org/apache/spark/sql/ClientE2ETestSuite.scala
@@ -26,7 +26,6 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.commons.io.FileUtils
 import org.apache.commons.io.output.TeeOutputStream
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.scalactic.TolerantNumerics
 import org.scalatest.PrivateMethodTester
 
@@ -453,42 +452,6 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
     }
   }
 
-  test("write jdbc upsert") {
-    assume(IntegrationTestUtils.isSparkHiveJarAvailable)
-    if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
-      val url = "jdbc:derby:memory:1234"
-      val table = "upsert"
-      try {
-        spark
-          .range(10)
-          .select(col("id"), col("id").as("val"))
-          .write
-          .jdbc(url = s"$url;create=true", table, new Properties())
-        spark
-          .range(5, 15, 1, 10)
-          .withColumn("val", lit(-1))
-          .write
-          .options(Map("upsert" -> "true", "upsertKeyColumns" -> "id"))
-          .mode(SaveMode.Append)
-          .jdbc(url = url, table, new Properties())
-        val result = spark.read
-          .jdbc(url = url, table, new Properties())
-          .select((col("val") === -1).as("updated"))
-          .groupBy(col("updated"))
-          .count()
-          .sort(col("updated"))
-          .collect()
-        // we expect 5 unchanged rows (ids 0..4) and 10 updated rows (ids 5..14)
-        assert(result === Seq(Row(false, 5), Row(true, 10)))
-      } finally {
-        // clean up
-        assertThrows[SparkException] {
-          spark.read.jdbc(url = s"$url;drop=true", table, new Properties()).collect()
-        }
-      }
-    }
-  }
-
   test("writeTo with create") {
     withTable("testcat.myTableV2") {
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
index 22d967f77bb37..545cbf265bb09 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/DerbyDialect.scala
@@ -17,16 +17,15 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{Statement, Types}
+import java.sql.Types
 import java.util.Locale
 
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite
 import org.apache.spark.sql.types._
 
-private object DerbyDialect extends JdbcDialect with MergeByTempTable {
+private object DerbyDialect extends JdbcDialect {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:derby")
@@ -57,17 +56,6 @@
 
   override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
 
-  override def createTempTableName(): String = "SESSION." + super.createTempTableName()
-
-  override def createTempTable(
-      statement: Statement,
-      tableName: String,
-      strSchema: String,
-      options: JdbcOptionsInWrite): Unit = {
-    statement.executeUpdate(s"DECLARE GLOBAL TEMPORARY TABLE $tableName ($strSchema) " +
-      s"ON COMMIT DELETE ROWS NOT LOGGED ON ROLLBACK DELETE ROWS")
-  }
-
   // See https://db.apache.org/derby/docs/10.15/ref/rrefsqljrenametablestatement.html
   override def renameTable(oldTable: Identifier, newTable: Identifier): String = {
    if (!oldTable.namespace().sameElements(newTable.namespace())) {
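
Note (illustrative, not part of the patch): the deleted DerbyDialect overrides staged rows in a Derby session-scoped temporary table before merging, which is why createTempTableName() prepended "SESSION." (Derby requires declared temporary tables to live in the SESSION schema). A minimal standalone sketch of the same DDL over plain JDBC, assuming Derby's embedded driver is on the classpath; the database name, table name, and columns below are hypothetical:

    // Sketch only: reproduces the statement shape issued by the removed
    // DerbyDialect.createTempTable override, with a concrete example schema.
    import java.sql.DriverManager

    object DerbyTempTableSketch {
      def main(args: Array[String]): Unit = {
        // In-memory Derby database; the name "demo" is arbitrary.
        val conn = DriverManager.getConnection("jdbc:derby:memory:demo;create=true")
        try {
          // ON COMMIT DELETE ROWS empties the table at commit, so keep the
          // staging work inside one transaction.
          conn.setAutoCommit(false)
          val stmt = conn.createStatement()
          stmt.executeUpdate(
            "DECLARE GLOBAL TEMPORARY TABLE SESSION.upsert_stage (id BIGINT, val BIGINT) " +
              "ON COMMIT DELETE ROWS NOT LOGGED ON ROLLBACK DELETE ROWS")
          stmt.executeUpdate("INSERT INTO SESSION.upsert_stage VALUES (1, -1)")
          val rs = stmt.executeQuery("SELECT COUNT(*) FROM SESSION.upsert_stage")
          rs.next()
          println(s"staged rows: ${rs.getInt(1)}") // prints: staged rows: 1
          conn.commit()
        } finally {
          conn.close()
        }
      }
    }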