From 5092c8970246eb828a31154796c3b16f0b61bddd Mon Sep 17 00:00:00 2001 From: Michael Zhang Date: Tue, 24 Oct 2023 14:51:45 +0500 Subject: [PATCH] [SPARK-45561][SQL] Add proper conversions for TINYINT in MySQLDialect ### What changes were proposed in this pull request? Change MySql Dialect to convert catalyst TINYINT into MySQL TINYINT rather than BYTE and INTEGER. BYTE does not exist in MySQL. The same applies to MsSqlServerDialect. ### Why are the changes needed? Since BYTE type does not exist in MySQL, any casts that could be pushed down involving BYTE type would fail. ### Does this PR introduce _any_ user-facing change? No ### How was this patch tested? UT pass. ### Was this patch authored or co-authored using generative AI tooling? No Closes #43390 from michaelzhan-db/SPARK-45561. Lead-authored-by: Michael Zhang Co-authored-by: Wenchen Fan Signed-off-by: Max Gekk --- .../org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala | 8 +++++--- .../scala/org/apache/spark/sql/jdbc/MySQLDialect.scala | 5 ++++- .../test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala | 2 ++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala index 429404168d1e0..7116bcc7de3e9 100644 --- a/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala +++ b/connector/docker-integration-tests/src/test/scala/org/apache/spark/sql/jdbc/MySQLIntegrationSuite.scala @@ -56,10 +56,10 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { conn.prepareStatement("CREATE TABLE numbers (onebit BIT(1), tenbits BIT(10), " + "small SMALLINT, med MEDIUMINT, nor INT, big BIGINT, deci DECIMAL(40,20), flt FLOAT, " - + "dbl DOUBLE)").executeUpdate() + + "dbl DOUBLE, tiny TINYINT)").executeUpdate() conn.prepareStatement("INSERT INTO numbers VALUES (b'0', b'1000100101', " + "17, 77777, 123456789, 123456789012345, 123456789012345.123456789012345, " - + "42.75, 1.0000000000000002)").executeUpdate() + + "42.75, 1.0000000000000002, -128)").executeUpdate() conn.prepareStatement("CREATE TABLE dates (d DATE, t TIME, dt DATETIME, ts TIMESTAMP, " + "yr YEAR)").executeUpdate() @@ -89,7 +89,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { val rows = df.collect() assert(rows.length == 1) val types = rows(0).toSeq.map(x => x.getClass.toString) - assert(types.length == 9) + assert(types.length == 10) assert(types(0).equals("class java.lang.Boolean")) assert(types(1).equals("class java.lang.Long")) assert(types(2).equals("class java.lang.Integer")) @@ -99,6 +99,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { assert(types(6).equals("class java.math.BigDecimal")) assert(types(7).equals("class java.lang.Double")) assert(types(8).equals("class java.lang.Double")) + assert(types(9).equals("class java.lang.Byte")) assert(rows(0).getBoolean(0) == false) assert(rows(0).getLong(1) == 0x225) assert(rows(0).getInt(2) == 17) @@ -109,6 +110,7 @@ class MySQLIntegrationSuite extends DockerJDBCIntegrationSuite { assert(rows(0).getAs[BigDecimal](6).equals(bd)) assert(rows(0).getDouble(7) == 42.75) assert(rows(0).getDouble(8) == 1.0000000000000002) + assert(rows(0).getByte(9) == 0x80.toByte) } test("Date types") { diff --git a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala index 3c6d02d86412f..dd74c93bc2e19 100644 --- a/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala +++ b/sql/core/src/main/scala/org/apache/spark/sql/jdbc/MySQLDialect.scala @@ -32,7 +32,7 @@ import org.apache.spark.sql.connector.catalog.index.TableIndex import org.apache.spark.sql.connector.expressions.{Expression, FieldReference, NamedReference, NullOrdering, SortDirection} import org.apache.spark.sql.errors.QueryExecutionErrors import org.apache.spark.sql.execution.datasources.jdbc.{JDBCOptions, JdbcUtils} -import org.apache.spark.sql.types.{BooleanType, DataType, FloatType, LongType, MetadataBuilder, StringType} +import org.apache.spark.sql.types.{BooleanType, ByteType, DataType, FloatType, LongType, MetadataBuilder, StringType} private case object MySQLDialect extends JdbcDialect with SQLConfHelper { @@ -102,6 +102,8 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { // Some MySQL JDBC drivers converts JSON type into Types.VARCHAR with a precision of -1. // Explicitly converts it into StringType here. Some(StringType) + } else if (sqlType == Types.TINYINT && typeName.equals("TINYINT")) { + Some(ByteType) } else None } @@ -184,6 +186,7 @@ private case object MySQLDialect extends JdbcDialect with SQLConfHelper { // We override getJDBCType so that FloatType is mapped to FLOAT instead case FloatType => Option(JdbcType("FLOAT", java.sql.Types.FLOAT)) case StringType => Option(JdbcType("LONGTEXT", java.sql.Types.LONGVARCHAR)) + case ByteType => Option(JdbcType("TINYINT", java.sql.Types.TINYINT)) case _ => JdbcUtils.getCommonJDBCType(dt) } diff --git a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala index 5f28164e8f6a9..e759ef01e2c73 100644 --- a/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala +++ b/sql/core/src/test/scala/org/apache/spark/sql/jdbc/JDBCSuite.scala @@ -914,6 +914,8 @@ class JDBCSuite extends QueryTest with SharedSparkSession { assert(mySqlDialect.getCatalystType(java.sql.Types.VARBINARY, "BIT", 1, metadata) == None) assert(mySqlDialect.getCatalystType(java.sql.Types.BIT, "TINYINT", 1, metadata) == Some(BooleanType)) + assert(mySqlDialect.getCatalystType(java.sql.Types.TINYINT, "TINYINT", 1, metadata) == + Some(ByteType)) } test("SPARK-35446: MySQLDialect type mapping of float") {