Remove derby upsert support
Derby does not support indices on temp tables:
https://db.apache.org/derby/docs/10.2/ref/rrefdeclaretemptable.html
EnricoMi committed Jan 23, 2024
1 parent 3518a98 commit 60e41ca
Showing 2 changed files with 2 additions and 51 deletions.
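
For context on the commit message: Derby accepts DECLARE GLOBAL TEMPORARY TABLE but, per the message and the linked documentation, does not support indexes on the resulting table, and the temp-table upsert path removed below needs such an index. A minimal, hypothetical sketch of that limitation — the database, table, and index names are made up, and only the DECLARE statement shape is taken from the removed DerbyDialect code:

import java.sql.{DriverManager, SQLException}

object DerbyTempTableIndexCheck {
  def main(args: Array[String]): Unit = {
    // Illustrative in-memory Derby database, created on the fly.
    val conn = DriverManager.getConnection("jdbc:derby:memory:indexcheck;create=true")
    val stmt = conn.createStatement()

    // Declaring a session-scoped temp table works; this mirrors the statement
    // that the removed DerbyDialect.createTempTable issued.
    stmt.executeUpdate(
      "DECLARE GLOBAL TEMPORARY TABLE SESSION.stage (id BIGINT, val BIGINT) " +
        "ON COMMIT DELETE ROWS NOT LOGGED ON ROLLBACK DELETE ROWS")

    // Creating an index on that table is expected to fail with a SQLException,
    // because Derby does not allow indexes on declared global temporary tables.
    try {
      stmt.executeUpdate("CREATE INDEX stage_idx ON SESSION.stage (id)")
    } catch {
      case e: SQLException => println(s"Derby rejected the index: ${e.getMessage}")
    }

    conn.close()
  }
}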
ClientE2ETestSuite.scala
@@ -26,7 +26,6 @@ import scala.jdk.CollectionConverters._
 
 import org.apache.commons.io.FileUtils
 import org.apache.commons.io.output.TeeOutputStream
-import org.apache.commons.lang3.{JavaVersion, SystemUtils}
 import org.scalactic.TolerantNumerics
 import org.scalatest.PrivateMethodTester
 
@@ -453,42 +452,6 @@ class ClientE2ETestSuite extends RemoteSparkSession with SQLHelper with PrivateM
     }
   }
 
test("write jdbc upsert") {
assume(IntegrationTestUtils.isSparkHiveJarAvailable)
if (SystemUtils.isJavaVersionAtLeast(JavaVersion.JAVA_9)) {
val url = "jdbc:derby:memory:1234"
val table = "upsert"
try {
spark
.range(10)
.select(col("id"), col("id").as("val"))
.write
.jdbc(url = s"$url;create=true", table, new Properties())
spark
.range(5, 15, 1, 10)
.withColumn("val", lit(-1))
.write
.options(Map("upsert" -> "true", "upsertKeyColumns" -> "id"))
.mode(SaveMode.Append)
.jdbc(url = url, table, new Properties())
val result = spark.read
.jdbc(url = url, table, new Properties())
.select((col("val") === -1).as("updated"))
.groupBy(col("updated"))
.count()
.sort(col("updated"))
.collect()
// we expect 5 unchanged rows (ids 0..4) and 10 updated rows (ids 5..14)
assert(result === Seq(Row(false, 5), Row(true, 10)))
} finally {
// clean up
assertThrows[SparkException] {
spark.read.jdbc(url = s"$url;drop=true", table, new Properties()).collect()
}
}
}
}

test("writeTo with create") {
withTable("testcat.myTableV2") {

Expand Down
DerbyDialect.scala
@@ -17,16 +17,15 @@
 
 package org.apache.spark.sql.jdbc
 
-import java.sql.{Statement, Types}
+import java.sql.Types
 import java.util.Locale
 
 import org.apache.spark.sql.connector.catalog.Identifier
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
-import org.apache.spark.sql.execution.datasources.jdbc.JdbcOptionsInWrite
 import org.apache.spark.sql.types._
 
 
-private object DerbyDialect extends JdbcDialect with MergeByTempTable {
+private object DerbyDialect extends JdbcDialect {
 
   override def canHandle(url: String): Boolean =
     url.toLowerCase(Locale.ROOT).startsWith("jdbc:derby")
@@ -57,17 +56,6 @@ private object DerbyDialect extends JdbcDialect with MergeByTempTable {
 
   override def isCascadingTruncateTable(): Option[Boolean] = Some(false)
 
-  override def createTempTableName(): String = "SESSION." + super.createTempTableName()
-
-  override def createTempTable(
-      statement: Statement,
-      tableName: String,
-      strSchema: String,
-      options: JdbcOptionsInWrite): Unit = {
-    statement.executeUpdate(s"DECLARE GLOBAL TEMPORARY TABLE $tableName ($strSchema) " +
-      s"ON COMMIT DELETE ROWS NOT LOGGED ON ROLLBACK DELETE ROWS")
-  }
-
   // See https://db.apache.org/derby/docs/10.15/ref/rrefsqljrenametablestatement.html
   override def renameTable(oldTable: Identifier, newTable: Identifier): String = {
     if (!oldTable.namespace().sameElements(newTable.namespace())) {
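
The MergeByTempTable mix-in that DerbyDialect drops here is defined outside this diff, but the general pattern it names — stage the incoming batch in a temp table, then merge it into the target — looks roughly like the sketch below. Everything except the DECLARE statement (taken from the removed createTempTable) is an assumed illustration: table, column, and method names are made up, and the actual trait may differ.

import java.sql.Connection

object TempTableUpsertSketch {
  // Hypothetical outline of a temp-table based upsert; not the MergeByTempTable implementation.
  def upsertViaTempTable(conn: Connection): Unit = {
    val stmt = conn.createStatement()

    // 1. Stage incoming rows in a session-scoped temp table
    //    (this DECLARE is the statement the removed createTempTable issued).
    stmt.executeUpdate(
      "DECLARE GLOBAL TEMPORARY TABLE SESSION.stage (id BIGINT, val BIGINT) " +
        "ON COMMIT DELETE ROWS NOT LOGGED ON ROLLBACK DELETE ROWS")

    // 2. Batch-insert the incoming rows into SESSION.stage (omitted here).

    // 3. Merge the staged rows into the target table on the upsert key columns.
    stmt.executeUpdate(
      """MERGE INTO upsert AS t USING SESSION.stage AS s ON t.id = s.id
        |WHEN MATCHED THEN UPDATE SET t.val = s.val
        |WHEN NOT MATCHED THEN INSERT (id, val) VALUES (s.id, s.val)""".stripMargin)

    stmt.close()
  }
}

Such a path typically indexes the staging table on the upsert key columns before the merge; since Derby does not allow indexes on declared temp tables, the dialect can no longer opt into it — which is the rationale the commit message gives.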
