Skip to content

Commit

Permalink
Revert [SPARK-50230][SQL] Added logic to support reading unknown coll…
Browse files Browse the repository at this point in the history
…ation name as utf8_binary

### What changes were proposed in this pull request?

I propose reverting changes for  new `SQLConf` entry which enables spark to read an invalid collation name as `UTF8_BINARY`.

### Why are the changes needed?

Since the original changes may bring unwanted data corruption when a user writes in a table that has unknown collation and modifies its properties, the original PR must be reverted.

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

Not applicable.

### Was this patch authored or co-authored using generative AI tooling?

No.

Closes apache#48876 from vladanvasi-db/vladanvasi-db/allow-reading-unknown-collations-as-utf8-binary-revert.

Authored-by: Vladan Vasić <[email protected]>
Signed-off-by: Max Gekk <[email protected]>
  • Loading branch information
vladanvasi-db authored and MaxGekk committed Nov 22, 2024
1 parent ad82505 commit d5da49d
Show file tree
Hide file tree
Showing 5 changed files with 5 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,6 @@ private[sql] trait SqlApiConf {
def stackTracesInDataFrameContext: Int
def dataFrameQueryContextEnabled: Boolean
def legacyAllowUntypedScalaUDFs: Boolean
def allowReadingUnknownCollations: Boolean
}

private[sql] object SqlApiConf {
Expand All @@ -60,7 +59,6 @@ private[sql] object SqlApiConf {
SqlApiConfHelper.LOCAL_RELATION_CACHE_THRESHOLD_KEY
}
val DEFAULT_COLLATION: String = SqlApiConfHelper.DEFAULT_COLLATION
val ALLOW_READING_UNKNOWN_COLLATIONS: String = SqlApiConfHelper.ALLOW_READING_UNKNOWN_COLLATIONS

def get: SqlApiConf = SqlApiConfHelper.getConfGetter.get()()

Expand Down Expand Up @@ -89,5 +87,4 @@ private[sql] object DefaultSqlApiConf extends SqlApiConf {
override def stackTracesInDataFrameContext: Int = 1
override def dataFrameQueryContextEnabled: Boolean = true
override def legacyAllowUntypedScalaUDFs: Boolean = false
override def allowReadingUnknownCollations: Boolean = false
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,6 @@ private[sql] object SqlApiConfHelper {
val SESSION_LOCAL_TIMEZONE_KEY: String = "spark.sql.session.timeZone"
val LOCAL_RELATION_CACHE_THRESHOLD_KEY: String = "spark.sql.session.localRelationCacheThreshold"
val DEFAULT_COLLATION: String = "spark.sql.session.collation.default"
val ALLOW_READING_UNKNOWN_COLLATIONS: String =
"spark.sql.collation.allowReadingUnknownCollations"

val confGetter: AtomicReference[() => SqlApiConf] = {
new AtomicReference[() => SqlApiConf](() => DefaultSqlApiConf)
Expand Down
26 changes: 4 additions & 22 deletions sql/api/src/main/scala/org/apache/spark/sql/types/DataType.scala
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ import org.json4s.JsonAST.JValue
import org.json4s.JsonDSL._
import org.json4s.jackson.JsonMethods._

import org.apache.spark.{SparkException, SparkIllegalArgumentException, SparkThrowable}
import org.apache.spark.{SparkIllegalArgumentException, SparkThrowable}
import org.apache.spark.annotation.Stable
import org.apache.spark.sql.catalyst.analysis.SqlApiAnalysis
import org.apache.spark.sql.catalyst.parser.DataTypeParser
Expand Down Expand Up @@ -340,17 +340,8 @@ object DataType {
fields.collect { case (fieldPath, JString(collation)) =>
collation.split("\\.", 2) match {
case Array(provider: String, collationName: String) =>
try {
CollationFactory.assertValidProvider(provider)
fieldPath -> collationName
} catch {
case e: SparkException
if e.getCondition == "COLLATION_INVALID_PROVIDER" &&
SqlApiConf.get.allowReadingUnknownCollations =>
// If the collation provider is unknown and the config for reading such
// collations is enabled, return the UTF8_BINARY collation.
fieldPath -> "UTF8_BINARY"
}
CollationFactory.assertValidProvider(provider)
fieldPath -> collationName
}
}.toMap

Expand All @@ -359,16 +350,7 @@ object DataType {
}

private def stringTypeWithCollation(collationName: String): StringType = {
try {
StringType(CollationFactory.collationNameToId(collationName))
} catch {
case e: SparkException
if e.getCondition == "COLLATION_INVALID_NAME" &&
SqlApiConf.get.allowReadingUnknownCollations =>
// If the collation name is unknown and the config for reading such collations is enabled,
// return the UTF8_BINARY collation.
StringType(CollationFactory.UTF8_BINARY_COLLATION_ID)
}
StringType(CollationFactory.collationNameToId(collationName))
}

protected[types] def buildFormattedString(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -778,15 +778,6 @@ object SQLConf {
.booleanConf
.createWithDefault(Utils.isTesting)

val ALLOW_READING_UNKNOWN_COLLATIONS =
buildConf(SqlApiConfHelper.ALLOW_READING_UNKNOWN_COLLATIONS)
.internal()
.doc("Enables spark to read unknown collation name as UTF8_BINARY. If the config is " +
"not enabled, when spark encounters an unknown collation name, it will throw an error.")
.version("4.0.0")
.booleanConf
.createWithDefault(false)

val DEFAULT_COLLATION =
buildConf(SqlApiConfHelper.DEFAULT_COLLATION)
.doc("Sets default collation to use for string literals, parameter markers or the string" +
Expand Down Expand Up @@ -5582,8 +5573,6 @@ class SQLConf extends Serializable with Logging with SqlApiConf {
}
}

override def allowReadingUnknownCollations: Boolean = getConf(ALLOW_READING_UNKNOWN_COLLATIONS)

def adaptiveExecutionEnabled: Boolean = getConf(ADAPTIVE_EXECUTION_ENABLED)

def adaptiveExecutionLogLevel: String = getConf(ADAPTIVE_EXECUTION_LOG_LEVEL)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,11 @@ import org.json4s.jackson.JsonMethods
import org.apache.spark.{SparkException, SparkFunSuite, SparkIllegalArgumentException}
import org.apache.spark.sql.catalyst.analysis.{caseInsensitiveResolution, caseSensitiveResolution}
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.plans.SQLHelper
import org.apache.spark.sql.catalyst.types.DataTypeUtils
import org.apache.spark.sql.catalyst.util.{CollationFactory, StringConcat}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.DataTypeTestUtils.{dayTimeIntervalTypes, yearMonthIntervalTypes}

class DataTypeSuite extends SparkFunSuite with SQLHelper {
class DataTypeSuite extends SparkFunSuite {

private val UNICODE_COLLATION_ID = CollationFactory.collationNameToId("UNICODE")

Expand Down Expand Up @@ -878,90 +876,6 @@ class DataTypeSuite extends SparkFunSuite with SQLHelper {
}
}

test("string field with invalid collation name") {
val collationProviders = Seq("spark", "icu")
collationProviders.foreach { provider =>
val json =
s"""
|{
| "type": "struct",
| "fields": [
| {
| "name": "c1",
| "type": "string",
| "nullable": false,
| "metadata": {
| "${DataType.COLLATIONS_METADATA_KEY}": {
| "c1": "$provider.INVALID"
| }
| }
| }
| ]
|}
|""".stripMargin

// Check that the exception will be thrown in case of invalid collation name and
// UNKNOWN_COLLATION_NAME config not enabled.
checkError(
exception = intercept[SparkException] {
DataType.fromJson(json)
},
condition = "COLLATION_INVALID_NAME",
parameters = Map(
"proposals" -> "id",
"collationName" -> "INVALID"))

// Check that the exception will not be thrown in case of invalid collation name and
// UNKNOWN_COLLATION_NAME enabled, but UTF8_BINARY collation will be returned.
withSQLConf(SQLConf.ALLOW_READING_UNKNOWN_COLLATIONS.key -> "true") {
val dataType = DataType.fromJson(json)
assert(dataType === StructType(
StructField("c1", StringType(CollationFactory.UTF8_BINARY_COLLATION_ID), false) :: Nil))
}
}
}

test("string field with invalid collation provider") {
val json =
s"""
|{
| "type": "struct",
| "fields": [
| {
| "name": "c1",
| "type": "string",
| "nullable": false,
| "metadata": {
| "${DataType.COLLATIONS_METADATA_KEY}": {
| "c1": "INVALID.INVALID"
| }
| }
| }
| ]
|}
|""".stripMargin


// Check that the exception will be thrown in case of invalid collation name and
// UNKNOWN_COLLATION_NAME config not enabled.
checkError(
exception = intercept[SparkException] {
DataType.fromJson(json)
},
condition = "COLLATION_INVALID_PROVIDER",
parameters = Map(
"supportedProviders" -> "spark, icu",
"provider" -> "INVALID"))

// Check that the exception will not be thrown in case of invalid collation name and
// UNKNOWN_COLLATION_NAME enabled, but UTF8_BINARY collation will be returned.
withSQLConf(SQLConf.ALLOW_READING_UNKNOWN_COLLATIONS.key -> "true") {
val dataType = DataType.fromJson(json)
assert(dataType === StructType(
StructField("c1", StringType(CollationFactory.UTF8_BINARY_COLLATION_ID), false) :: Nil))
}
}

test("non string field has collation metadata") {
val json =
s"""
Expand Down Expand Up @@ -1109,42 +1023,6 @@ class DataTypeSuite extends SparkFunSuite with SQLHelper {
assert(parsedWithCollations === ArrayType(StringType(unicodeCollationId)))
}

test("parse array type with invalid collation metadata") {
val utf8BinaryCollationId = CollationFactory.UTF8_BINARY_COLLATION_ID
val arrayJson =
s"""
|{
| "type": "array",
| "elementType": "string",
| "containsNull": true
|}
|""".stripMargin

val collationsMap = Map("element" -> "INVALID")

// Parse without collations map
assert(DataType.parseDataType(JsonMethods.parse(arrayJson)) === ArrayType(StringType))

// Check that the exception will be thrown in case of invalid collation name and
// UNKNOWN_COLLATION_NAME config not enabled.
checkError(
exception = intercept[SparkException] {
DataType.parseDataType(JsonMethods.parse(arrayJson), collationsMap = collationsMap)
},
condition = "COLLATION_INVALID_NAME",
parameters = Map(
"proposals" -> "id",
"collationName" -> "INVALID"))

// Check that the exception will not be thrown in case of invalid collation name and
// UNKNOWN_COLLATION_NAME enabled, but UTF8_BINARY collation will be returned.
withSQLConf(SQLConf.ALLOW_READING_UNKNOWN_COLLATIONS.key -> "true") {
val dataType = DataType.parseDataType(
JsonMethods.parse(arrayJson), collationsMap = collationsMap)
assert(dataType === ArrayType(StringType(utf8BinaryCollationId)))
}
}

test("parse map type with collation metadata") {
val unicodeCollationId = CollationFactory.collationNameToId("UNICODE")
val mapJson =
Expand All @@ -1168,44 +1046,6 @@ class DataTypeSuite extends SparkFunSuite with SQLHelper {
MapType(StringType(unicodeCollationId), StringType(unicodeCollationId)))
}

test("parse map type with invalid collation metadata") {
val utf8BinaryCollationId = CollationFactory.UTF8_BINARY_COLLATION_ID
val mapJson =
s"""
|{
| "type": "map",
| "keyType": "string",
| "valueType": "string",
| "valueContainsNull": true
|}
|""".stripMargin

val collationsMap = Map("key" -> "INVALID", "value" -> "INVALID")

// Parse without collations map
assert(DataType.parseDataType(JsonMethods.parse(mapJson)) === MapType(StringType, StringType))

// Check that the exception will be thrown in case of invalid collation name and
// UNKNOWN_COLLATION_NAME config not enabled.
checkError(
exception = intercept[SparkException] {
DataType.parseDataType(JsonMethods.parse(mapJson), collationsMap = collationsMap)
},
condition = "COLLATION_INVALID_NAME",
parameters = Map(
"proposals" -> "id",
"collationName" -> "INVALID"))

// Check that the exception will not be thrown in case of invalid collation name and
// UNKNOWN_COLLATION_NAME enabled, but UTF8_BINARY collation will be returned.
withSQLConf(SQLConf.ALLOW_READING_UNKNOWN_COLLATIONS.key -> "true") {
val dataType = DataType.parseDataType(
JsonMethods.parse(mapJson), collationsMap = collationsMap)
assert(dataType === MapType(
StringType(utf8BinaryCollationId), StringType(utf8BinaryCollationId)))
}
}

test("SPARK-48680: Add CharType and VarcharType to DataTypes JAVA API") {
assert(DataTypes.createCharType(1) === CharType(1))
assert(DataTypes.createVarcharType(100) === VarcharType(100))
Expand Down

0 comments on commit d5da49d

Please sign in to comment.