Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[SPARK-49872] Spark History UI -- StreamConstraintsException: String length (20054016) exceeds the maximum length (20000000) #6

Closed
wants to merge 9 commits into from
Closed
Original file line number Diff line number Diff line change
Expand Up @@ -154,16 +154,24 @@ public static class Collation {
*/
public final boolean supportsLowercaseEquality;

/**
* Support for Space Trimming implies that, based on the specifier (for now only right trim),
* leading, trailing or both spaces are removed from the input string before comparison.
*/
public final boolean supportsSpaceTrimming;

public Collation(
String collationName,
String provider,
Collator collator,
Comparator<UTF8String> comparator,
String version,
ToLongFunction<UTF8String> hashFunction,
BiFunction<UTF8String, UTF8String, Boolean> equalsFunction,
boolean supportsBinaryEquality,
boolean supportsBinaryOrdering,
boolean supportsLowercaseEquality) {
boolean supportsLowercaseEquality,
boolean supportsSpaceTrimming) {
this.collationName = collationName;
this.provider = provider;
this.collator = collator;
Expand All @@ -173,19 +181,15 @@ public Collation(
this.supportsBinaryEquality = supportsBinaryEquality;
this.supportsBinaryOrdering = supportsBinaryOrdering;
this.supportsLowercaseEquality = supportsLowercaseEquality;
this.equalsFunction = equalsFunction;
this.supportsSpaceTrimming = supportsSpaceTrimming;

// De Morgan's Law to check supportsBinaryOrdering => supportsBinaryEquality
assert(!supportsBinaryOrdering || supportsBinaryEquality);
// No Collation can simultaneously support binary equality and lowercase equality
assert(!supportsBinaryEquality || !supportsLowercaseEquality);

assert(SUPPORTED_PROVIDERS.contains(provider));

if (supportsBinaryEquality) {
this.equalsFunction = UTF8String::equals;
} else {
this.equalsFunction = (s1, s2) -> this.comparator.compare(s1, s2) == 0;
}
}

/**
Expand Down Expand Up @@ -538,27 +542,63 @@ private static boolean isValidCollationId(int collationId) {
@Override
protected Collation buildCollation() {
if (caseSensitivity == CaseSensitivity.UNSPECIFIED) {
Comparator<UTF8String> comparator;
ToLongFunction<UTF8String> hashFunction;
BiFunction<UTF8String, UTF8String, Boolean> equalsFunction;
boolean supportsSpaceTrimming = spaceTrimming != SpaceTrimming.NONE;

if (spaceTrimming == SpaceTrimming.NONE) {
comparator = UTF8String::binaryCompare;
hashFunction = s -> (long) s.hashCode();
equalsFunction = UTF8String::equals;
} else {
comparator = (s1, s2) -> applyTrimmingPolicy(s1, spaceTrimming).binaryCompare(
applyTrimmingPolicy(s2, spaceTrimming));
hashFunction = s -> (long) applyTrimmingPolicy(s, spaceTrimming).hashCode();
equalsFunction = (s1, s2) -> applyTrimmingPolicy(s1, spaceTrimming).equals(
applyTrimmingPolicy(s2, spaceTrimming));
}

return new Collation(
normalizedCollationName(),
PROVIDER_SPARK,
null,
UTF8String::binaryCompare,
comparator,
"1.0",
s -> (long) s.hashCode(),
hashFunction,
equalsFunction,
/* supportsBinaryEquality = */ true,
/* supportsBinaryOrdering = */ true,
/* supportsLowercaseEquality = */ false);
/* supportsLowercaseEquality = */ false,
spaceTrimming != SpaceTrimming.NONE);
} else {
Comparator<UTF8String> comparator;
ToLongFunction<UTF8String> hashFunction;

if (spaceTrimming == SpaceTrimming.NONE) {
comparator = CollationAwareUTF8String::compareLowerCase;
hashFunction = s ->
(long) CollationAwareUTF8String.lowerCaseCodePoints(s).hashCode();
} else {
comparator = (s1, s2) -> CollationAwareUTF8String.compareLowerCase(
applyTrimmingPolicy(s1, spaceTrimming),
applyTrimmingPolicy(s2, spaceTrimming));
hashFunction = s -> (long) CollationAwareUTF8String.lowerCaseCodePoints(
applyTrimmingPolicy(s, spaceTrimming)).hashCode();
}

return new Collation(
normalizedCollationName(),
PROVIDER_SPARK,
null,
CollationAwareUTF8String::compareLowerCase,
comparator,
"1.0",
s -> (long) CollationAwareUTF8String.lowerCaseCodePoints(s).hashCode(),
hashFunction,
(s1, s2) -> comparator.compare(s1, s2) == 0,
/* supportsBinaryEquality = */ false,
/* supportsBinaryOrdering = */ false,
/* supportsLowercaseEquality = */ true);
/* supportsLowercaseEquality = */ true,
spaceTrimming != SpaceTrimming.NONE);
}
}

Expand Down Expand Up @@ -917,16 +957,35 @@ protected Collation buildCollation() {
Collator collator = Collator.getInstance(resultLocale);
// Freeze ICU collator to ensure thread safety.
collator.freeze();

Comparator<UTF8String> comparator;
ToLongFunction<UTF8String> hashFunction;

if (spaceTrimming == SpaceTrimming.NONE) {
hashFunction = s -> (long) collator.getCollationKey(
s.toValidString()).hashCode();
comparator = (s1, s2) ->
collator.compare(s1.toValidString(), s2.toValidString());
} else {
comparator = (s1, s2) -> collator.compare(
applyTrimmingPolicy(s1, spaceTrimming).toValidString(),
applyTrimmingPolicy(s2, spaceTrimming).toValidString());
hashFunction = s -> (long) collator.getCollationKey(
applyTrimmingPolicy(s, spaceTrimming).toValidString()).hashCode();
}

return new Collation(
normalizedCollationName(),
PROVIDER_ICU,
collator,
(s1, s2) -> collator.compare(s1.toValidString(), s2.toValidString()),
comparator,
ICU_COLLATOR_VERSION,
s -> (long) collator.getCollationKey(s.toValidString()).hashCode(),
hashFunction,
(s1, s2) -> comparator.compare(s1, s2) == 0,
/* supportsBinaryEquality = */ false,
/* supportsBinaryOrdering = */ false,
/* supportsLowercaseEquality = */ false);
/* supportsLowercaseEquality = */ false,
spaceTrimming != SpaceTrimming.NONE);
}

@Override
Expand Down Expand Up @@ -1103,14 +1162,6 @@ public static boolean isCaseSensitiveAndAccentInsensitive(int collationId) {
Collation.CollationSpecICU.AccentSensitivity.AI;
}

/**
* Returns whether the collation uses trim collation for the given collation id.
*/
public static boolean usesTrimCollation(int collationId) {
return Collation.CollationSpec.getSpaceTrimming(collationId) !=
Collation.CollationSpec.SpaceTrimming.NONE;
}

public static void assertValidProvider(String provider) throws SparkException {
if (!SUPPORTED_PROVIDERS.contains(provider.toLowerCase())) {
Map<String, String> params = Map.of(
Expand All @@ -1137,7 +1188,7 @@ public static String[] getICULocaleNames() {

public static UTF8String getCollationKey(UTF8String input, int collationId) {
Collation collation = fetchCollation(collationId);
if (usesTrimCollation(collationId)) {
if (collation.supportsSpaceTrimming) {
input = Collation.CollationSpec.applyTrimmingPolicy(input, collationId);
}
if (collation.supportsBinaryEquality) {
Expand All @@ -1153,7 +1204,7 @@ public static UTF8String getCollationKey(UTF8String input, int collationId) {

public static byte[] getCollationKeyBytes(UTF8String input, int collationId) {
Collation collation = fetchCollation(collationId);
if (usesTrimCollation(collationId)) {
if (collation.supportsSpaceTrimming) {
input = Collation.CollationSpec.applyTrimmingPolicy(input, collationId);
}
if (collation.supportsBinaryEquality) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -127,22 +127,42 @@ class CollationFactorySuite extends AnyFunSuite with Matchers { // scalastyle:ig
CollationTestCase("UTF8_BINARY", "aaa", "AAA", false),
CollationTestCase("UTF8_BINARY", "aaa", "bbb", false),
CollationTestCase("UTF8_BINARY", "å", "a\u030A", false),
CollationTestCase("UTF8_BINARY_RTRIM", "aaa", "aaa", true),
CollationTestCase("UTF8_BINARY_RTRIM", "aaa", "aaa ", true),
CollationTestCase("UTF8_BINARY_RTRIM", "aaa ", "aaa ", true),
CollationTestCase("UTF8_BINARY_RTRIM", "aaa", " aaa ", false),
CollationTestCase("UTF8_BINARY_RTRIM", " ", " ", true),
CollationTestCase("UTF8_LCASE", "aaa", "aaa", true),
CollationTestCase("UTF8_LCASE", "aaa", "AAA", true),
CollationTestCase("UTF8_LCASE", "aaa", "AaA", true),
CollationTestCase("UTF8_LCASE", "aaa", "AaA", true),
CollationTestCase("UTF8_LCASE", "aaa", "aa", false),
CollationTestCase("UTF8_LCASE", "aaa", "bbb", false),
CollationTestCase("UTF8_LCASE", "å", "a\u030A", false),
CollationTestCase("UTF8_LCASE_RTRIM", "aaa", "AaA", true),
CollationTestCase("UTF8_LCASE_RTRIM", "aaa", "AaA ", true),
CollationTestCase("UTF8_LCASE_RTRIM", "aaa ", "AaA ", true),
CollationTestCase("UTF8_LCASE_RTRIM", "aaa", " AaA ", false),
CollationTestCase("UTF8_LCASE_RTRIM", " ", " ", true),
CollationTestCase("UNICODE", "aaa", "aaa", true),
CollationTestCase("UNICODE", "aaa", "AAA", false),
CollationTestCase("UNICODE", "aaa", "bbb", false),
CollationTestCase("UNICODE", "å", "a\u030A", true),
CollationTestCase("UNICODE_RTRIM", "aaa", "aaa", true),
CollationTestCase("UNICODE_RTRIM", "aaa", "aaa ", true),
CollationTestCase("UNICODE_RTRIM", "aaa ", "aaa ", true),
CollationTestCase("UNICODE_RTRIM", "aaa", " aaa ", false),
CollationTestCase("UNICODE_RTRIM", " ", " ", true),
CollationTestCase("UNICODE_CI", "aaa", "aaa", true),
CollationTestCase("UNICODE_CI", "aaa", "AAA", true),
CollationTestCase("UNICODE_CI", "aaa", "bbb", false),
CollationTestCase("UNICODE_CI", "å", "a\u030A", true),
CollationTestCase("UNICODE_CI", "Å", "a\u030A", true)
CollationTestCase("UNICODE_CI", "Å", "a\u030A", true),
CollationTestCase("UNICODE_CI_RTRIM", "aaa", "AaA", true),
CollationTestCase("UNICODE_CI_RTRIM", "aaa", "AaA ", true),
CollationTestCase("UNICODE_CI_RTRIM", "aaa ", "AaA ", true),
CollationTestCase("UNICODE_CI_RTRIM", "aaa", " AaA ", false),
CollationTestCase("UNICODE_RTRIM", " ", " ", true)
)

checks.foreach(testCase => {
Expand All @@ -162,19 +182,48 @@ class CollationFactorySuite extends AnyFunSuite with Matchers { // scalastyle:ig
CollationTestCase("UTF8_BINARY", "aaa", "AAA", 1),
CollationTestCase("UTF8_BINARY", "aaa", "bbb", -1),
CollationTestCase("UTF8_BINARY", "aaa", "BBB", 1),
CollationTestCase("UTF8_BINARY_RTRIM", "aaa ", "aaa", 0),
CollationTestCase("UTF8_BINARY_RTRIM", "aaa ", "aaa ", 0),
CollationTestCase("UTF8_BINARY_RTRIM", "aaa ", "bbb", -1),
CollationTestCase("UTF8_BINARY_RTRIM", "aaa ", "bbb ", -1),
CollationTestCase("UTF8_BINARY_RTRIM", "aaa", "BBB" , 1),
CollationTestCase("UTF8_BINARY_RTRIM", "aaa ", "BBB " , 1),
CollationTestCase("UTF8_BINARY_RTRIM", " ", " " , 0),
CollationTestCase("UTF8_LCASE", "aaa", "aaa", 0),
CollationTestCase("UTF8_LCASE", "aaa", "AAA", 0),
CollationTestCase("UTF8_LCASE", "aaa", "AaA", 0),
CollationTestCase("UTF8_LCASE", "aaa", "AaA", 0),
CollationTestCase("UTF8_LCASE", "aaa", "aa", 1),
CollationTestCase("UTF8_LCASE", "aaa", "bbb", -1),
CollationTestCase("UTF8_LCASE_RTRIM", "aaa ", "AAA", 0),
CollationTestCase("UTF8_LCASE_RTRIM", "aaa ", "AAA ", 0),
CollationTestCase("UTF8_LCASE_RTRIM", "aaa", "bbb ", -1),
CollationTestCase("UTF8_LCASE_RTRIM", "aaa ", "bbb ", -1),
CollationTestCase("UTF8_LCASE_RTRIM", "aaa ", "aa", 1),
CollationTestCase("UTF8_LCASE_RTRIM", "aaa ", "aa ", 1),
CollationTestCase("UTF8_LCASE_RTRIM", " ", " ", 0),
CollationTestCase("UNICODE", "aaa", "aaa", 0),
CollationTestCase("UNICODE", "aaa", "AAA", -1),
CollationTestCase("UNICODE", "aaa", "bbb", -1),
CollationTestCase("UNICODE", "aaa", "BBB", -1),
CollationTestCase("UNICODE_RTRIM", "aaa ", "aaa", 0),
CollationTestCase("UNICODE_RTRIM", "aaa ", "aaa ", 0),
CollationTestCase("UNICODE_RTRIM", "aaa ", "bbb", -1),
CollationTestCase("UNICODE_RTRIM", "aaa ", "bbb ", -1),
CollationTestCase("UNICODE_RTRIM", "aaa", "BBB" , -1),
CollationTestCase("UNICODE_RTRIM", "aaa ", "BBB " , -1),
CollationTestCase("UNICODE_RTRIM", " ", " ", 0),
CollationTestCase("UNICODE_CI", "aaa", "aaa", 0),
CollationTestCase("UNICODE_CI", "aaa", "AAA", 0),
CollationTestCase("UNICODE_CI", "aaa", "bbb", -1))
CollationTestCase("UNICODE_CI", "aaa", "bbb", -1),
CollationTestCase("UNICODE_CI_RTRIM", "aaa ", "AAA", 0),
CollationTestCase("UNICODE_CI_RTRIM", "aaa ", "AAA ", 0),
CollationTestCase("UNICODE_CI_RTRIM", "aaa", "bbb ", -1),
CollationTestCase("UNICODE_CI_RTRIM", "aaa ", "bbb ", -1),
CollationTestCase("UNICODE_CI_RTRIM", "aaa ", "aa", 1),
CollationTestCase("UNICODE_CI_RTRIM", "aaa ", "aa ", 1),
CollationTestCase("UNICODE_CI_RTRIM", " ", " ", 0)
)

checks.foreach(testCase => {
val collation = fetchCollation(testCase.collationName)
Expand Down
11 changes: 6 additions & 5 deletions common/utils/src/main/resources/error/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1212,6 +1212,12 @@
],
"sqlState" : "42604"
},
"EMPTY_SCHEMA_NOT_SUPPORTED_FOR_DATASOURCE" : {
"message" : [
"The <format> datasource does not support writing empty or nested empty schemas. Please make sure the data schema has at least one or more column(s)."
],
"sqlState" : "0A000"
},
"ENCODER_NOT_FOUND" : {
"message" : [
"Not found an encoder of the type <typeName> to Spark SQL internal representation.",
Expand Down Expand Up @@ -6162,11 +6168,6 @@
"Multiple sources found for <provider> (<sourceNames>), please specify the fully qualified class name."
]
},
"_LEGACY_ERROR_TEMP_1142" : {
"message" : [
"Datasource does not support writing empty or nested empty schemas. Please make sure the data schema has at least one or more column(s)."
]
},
"_LEGACY_ERROR_TEMP_1143" : {
"message" : [
"The data to be inserted needs to have the same number of columns as the target table: target table has <targetSize> column(s) but the inserted data has <actualSize> column(s), which contain <staticPartitionsSize> partition column(s) having assigned constant values."
Expand Down
14 changes: 12 additions & 2 deletions common/utils/src/main/scala/org/apache/spark/util/JsonUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,25 @@ package org.apache.spark.util
import java.io.ByteArrayOutputStream
import java.nio.charset.StandardCharsets

import com.fasterxml.jackson.core.{JsonEncoding, JsonGenerator}
import com.fasterxml.jackson.core.{JsonEncoding, JsonFactory, JsonGenerator, StreamReadConstraints}
import com.fasterxml.jackson.databind.{DeserializationFeature, ObjectMapper}
import com.fasterxml.jackson.module.scala.DefaultScalaModule

import org.apache.spark.util.SparkErrorUtils.tryWithResource

private[spark] trait JsonUtils {

protected val mapper: ObjectMapper = new ObjectMapper().registerModule(DefaultScalaModule)
private val streamReadConstraints: StreamReadConstraints = StreamReadConstraints
.builder()
.maxStringLength(200)
.maxNestingDepth(200)
.maxNumberLength(200)
.build()

private val jsonFactory = new JsonFactory().setStreamReadConstraints(streamReadConstraints)

protected val mapper: ObjectMapper = new ObjectMapper(jsonFactory)
.registerModule(DefaultScalaModule)
.configure(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES, false)

def toJsonString(block: JsonGenerator => Unit): String = {
Expand Down
2 changes: 1 addition & 1 deletion dev/deps/spark-deps-hadoop-3-hive-2.3
Original file line number Diff line number Diff line change
Expand Up @@ -274,7 +274,7 @@ tink/1.15.0//tink-1.15.0.jar
transaction-api/1.1//transaction-api-1.1.jar
univocity-parsers/2.9.1//univocity-parsers-2.9.1.jar
wildfly-openssl/1.1.3.Final//wildfly-openssl-1.1.3.Final.jar
xbean-asm9-shaded/4.25//xbean-asm9-shaded-4.25.jar
xbean-asm9-shaded/4.26//xbean-asm9-shaded-4.26.jar
xmlschema-core/2.3.1//xmlschema-core-2.3.1.jar
xz/1.10//xz-1.10.jar
zjsonpatch/0.3.0//zjsonpatch-0.3.0.jar
Expand Down
4 changes: 2 additions & 2 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@
<maven.version>3.9.9</maven.version>
<exec-maven-plugin.version>3.2.0</exec-maven-plugin.version>
<sbt.project.name>spark</sbt.project.name>
<asm.version>9.7</asm.version>
<asm.version>9.7.1</asm.version>
<slf4j.version>2.0.16</slf4j.version>
<log4j.version>2.24.1</log4j.version>
<!-- make sure to update IsolatedClientLoader whenever this version is changed -->
Expand Down Expand Up @@ -491,7 +491,7 @@
<dependency>
<groupId>org.apache.xbean</groupId>
<artifactId>xbean-asm9-shaded</artifactId>
<version>4.25</version>
<version>4.26</version>
</dependency>

<!-- Shaded deps marked as provided. These are promoted to compile scope
Expand Down
4 changes: 2 additions & 2 deletions project/plugins.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -36,9 +36,9 @@ addSbtPlugin("com.github.sbt" % "sbt-unidoc" % "0.5.0")

addSbtPlugin("io.spray" % "sbt-revolver" % "0.10.0")

libraryDependencies += "org.ow2.asm" % "asm" % "9.7"
libraryDependencies += "org.ow2.asm" % "asm" % "9.7.1"

libraryDependencies += "org.ow2.asm" % "asm-commons" % "9.7"
libraryDependencies += "org.ow2.asm" % "asm-commons" % "9.7.1"

addSbtPlugin("com.simplytyped" % "sbt-antlr4" % "0.8.3")

Expand Down
5 changes: 5 additions & 0 deletions python/pyspark/errors/error-conditions.json
Original file line number Diff line number Diff line change
Expand Up @@ -1103,6 +1103,11 @@
"`<backend>` is not supported, it should be one of the values from <supported_backends>"
]
},
"UNSUPPORTED_PLOT_BACKEND_PARAM": {
"message": [
"`<backend>` does not support `<param>` set to <value>, it should be one of the values from <supported_values>"
]
},
"UNSUPPORTED_SIGNATURE": {
"message": [
"Unsupported signature: <signature>."
Expand Down
Loading
Loading