feat: Add config to enable native upper and lower string conversion (#…
andygrove authored Aug 3, 2024
1 parent 5b5142b commit b6d2a6f
Showing 5 changed files with 60 additions and 13 deletions.
8 changes: 8 additions & 0 deletions common/src/main/scala/org/apache/comet/CometConf.scala
@@ -401,6 +401,14 @@ object CometConf extends ShimCometConf {
.booleanConf
.createWithDefault(COMET_ANSI_MODE_ENABLED_DEFAULT)

+val COMET_CASE_CONVERSION_ENABLED: ConfigEntry[Boolean] =
+  conf("spark.comet.caseConversion.enabled")
+    .doc(
+      "Java uses locale-specific rules when converting strings to upper or lower case and " +
+        "Rust does not, so we disable upper and lower by default.")
+    .booleanConf
+    .createWithDefault(false)
+
val COMET_CAST_ALLOW_INCOMPATIBLE: ConfigEntry[Boolean] =
conf("spark.comet.cast.allowIncompatible")
.doc(
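For context on the locale issue described in the new doc string: Java's toUpperCase/toLowerCase apply locale-specific mappings, while Rust's case conversion is locale-independent. A minimal sketch of the divergence (not part of this commit), using the Turkish dotted capital I:

    import java.util.Locale

    object LocaleCaseDemo {
      def main(args: Array[String]): Unit = {
        // Root locale: "i" -> "I", matching Rust's locale-independent mapping
        println("i".toUpperCase(Locale.ROOT))
        // Turkish locale: "i" -> "İ" (U+0130), which a locale-unaware
        // native implementation would not produce
        println("i".toUpperCase(Locale.forLanguageTag("tr")))
      }
    }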
1 change: 1 addition & 0 deletions docs/source/user-guide/configs.md
@@ -24,6 +24,7 @@ Comet provides the following configuration settings.
| Config | Description | Default Value |
|--------|-------------|---------------|
| spark.comet.batchSize | The columnar batch size, i.e., the maximum number of rows that a batch can contain. | 8192 |
+| spark.comet.caseConversion.enabled | Java uses locale-specific rules when converting strings to upper or lower case and Rust does not, so we disable upper and lower by default. | false |
| spark.comet.cast.allowIncompatible | Comet is not currently fully compatible with Spark for all cast operations. Set this config to true to allow them anyway. See compatibility guide for more information. | false |
| spark.comet.columnar.shuffle.async.enabled | Whether to enable asynchronous shuffle for Arrow-based shuffle. By default, this config is false. | false |
| spark.comet.columnar.shuffle.async.max.thread.num | Maximum number of threads on an executor used for Comet async columnar shuffle. By default, this config is 100. This is the upper bound of total number of shuffle threads per executor. In other words, if the number of cores * the number of shuffle threads per task `spark.comet.columnar.shuffle.async.thread.num` is larger than this config. Comet will use this config as the number of shuffle threads per executor instead. | 100 |
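A sketch of opting in, using the config key documented above; the spark session variable is assumed:

    // Accept locale-unaware native case conversion for this session
    spark.conf.set("spark.comet.caseConversion.enabled", "true")

    // Equivalent SQL form
    spark.sql("SET spark.comet.caseConversion.enabled=true")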
38 changes: 28 additions & 10 deletions spark/src/main/scala/org/apache/comet/serde/QueryPlanSerde.scala
@@ -1898,12 +1898,6 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
val optExpr = scalarExprToProto("length", childExpr)
optExprWithInfo(optExpr, expr, castExpr)

-case Lower(child) =>
-  val castExpr = Cast(child, StringType)
-  val childExpr = exprToProtoInternal(castExpr, inputs)
-  val optExpr = scalarExprToProto("lower", childExpr)
-  optExprWithInfo(optExpr, expr, castExpr)
-
case Md5(child) =>
val childExpr = exprToProtoInternal(child, inputs)
val optExpr = scalarExprToProto("md5", childExpr)
@@ -1970,10 +1964,34 @@ object QueryPlanSerde extends Logging with ShimQueryPlanSerde with CometExprShim
trim(expr, srcStr, trimStr, inputs, "btrim")

case Upper(child) =>
-  val castExpr = Cast(child, StringType)
-  val childExpr = exprToProtoInternal(castExpr, inputs)
-  val optExpr = scalarExprToProto("upper", childExpr)
-  optExprWithInfo(optExpr, expr, castExpr)
+  if (CometConf.COMET_CASE_CONVERSION_ENABLED.get()) {
+    val castExpr = Cast(child, StringType)
+    val childExpr = exprToProtoInternal(castExpr, inputs)
+    val optExpr = scalarExprToProto("upper", childExpr)
+    optExprWithInfo(optExpr, expr, castExpr)
+  } else {
+    withInfo(
+      expr,
+      "Comet is not compatible with Spark for case conversion in " +
+        s"locale-specific cases. Set ${CometConf.COMET_CASE_CONVERSION_ENABLED.key}=true " +
+        "to enable it anyway.")
+    None
+  }
+
+case Lower(child) =>
+  if (CometConf.COMET_CASE_CONVERSION_ENABLED.get()) {
+    val castExpr = Cast(child, StringType)
+    val childExpr = exprToProtoInternal(castExpr, inputs)
+    val optExpr = scalarExprToProto("lower", childExpr)
+    optExprWithInfo(optExpr, expr, castExpr)
+  } else {
+    withInfo(
+      expr,
+      "Comet is not compatible with Spark for case conversion in " +
+        s"locale-specific cases. Set ${CometConf.COMET_CASE_CONVERSION_ENABLED.key}=true " +
+        "to enable it anyway.")
+    None
+  }

case BitwiseAnd(left, right) =>
val leftExpr = exprToProtoInternal(left, inputs)
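With the flag at its default of false, both cases above return None, so the expression falls back to Spark's JVM implementation and the withInfo message explains why. A sketch of observing the two paths (the names table and spark session are assumed):

    spark.conf.set("spark.comet.caseConversion.enabled", "false")
    // upper() stays on the JVM; Comet records the fallback reason
    spark.sql("SELECT upper(name) FROM names").explain("extended")

    spark.conf.set("spark.comet.caseConversion.enabled", "true")
    // upper() is now eligible to be serialized and run natively in Comet
    spark.sql("SELECT upper(name) FROM names").explain("extended")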
23 changes: 21 additions & 2 deletions spark/src/test/scala/org/apache/comet/CometExpressionSuite.scala
@@ -1128,6 +1128,23 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
}
}

+test("Upper and Lower") {
+  Seq(false, true).foreach { dictionary =>
+    withSQLConf(
+      "parquet.enable.dictionary" -> dictionary.toString,
+      CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true") {
+      val table = "names"
+      withTable(table) {
+        sql(s"create table $table(id int, name varchar(20)) using parquet")
+        sql(
+          s"insert into $table values(1, 'James Smith'), (2, 'Michael Rose')," +
+            " (3, 'Robert Williams'), (4, 'Rames Rose'), (5, 'James Smith')")
+        checkSparkAnswerAndOperator(s"SELECT name, upper(name), lower(name) FROM $table")
+      }
+    }
+  }
+}
+
test("Various String scalar functions") {
Seq(false, true).foreach { dictionary =>
withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
@@ -1138,7 +1155,7 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {
s"insert into $table values(1, 'James Smith'), (2, 'Michael Rose')," +
" (3, 'Robert Williams'), (4, 'Rames Rose'), (5, 'James Smith')")
checkSparkAnswerAndOperator(
s"SELECT ascii(name), bit_length(name), octet_length(name), upper(name), lower(name) FROM $table")
s"SELECT ascii(name), bit_length(name), octet_length(name) FROM $table")
}
}
}
@@ -1218,7 +1235,9 @@ class CometExpressionSuite extends CometTestBase with AdaptiveSparkPlanHelper {

test("trim") {
Seq(false, true).foreach { dictionary =>
-withSQLConf("parquet.enable.dictionary" -> dictionary.toString) {
+withSQLConf(
+  "parquet.enable.dictionary" -> dictionary.toString,
+  CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true") {
val table = "test"
withTable(table) {
sql(s"create table $table(col varchar(20)) using parquet")
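The new test only exercises ASCII names, for which Java and Rust case mappings agree. A hypothetical extension (not in this commit) that would probe non-ASCII behavior, reusing the suite's helpers:

    test("Upper and Lower with non-ASCII input (hypothetical)") {
      withSQLConf(CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true") {
        val table = "unicode_names"
        withTable(table) {
          sql(s"create table $table(name varchar(20)) using parquet")
          // 'ß' uppercases to 'SS' under both Java and Rust full case mapping,
          // but Turkish 'i' -> 'İ' only under a Turkish JVM default locale
          sql(s"insert into $table values('straße'), ('istanbul')")
          checkSparkAnswerAndOperator(s"SELECT upper(name), lower(name) FROM $table")
        }
      }
    }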
3 changes: 2 additions & 1 deletion spark/src/test/scala/org/apache/spark/sql/benchmark/CometStringExpressionBenchmark.scala
@@ -207,7 +207,8 @@ object CometStringExpressionBenchmark extends CometBenchmarkBase {
withSQLConf(
CometConf.COMET_ENABLED.key -> "true",
CometConf.COMET_EXEC_ENABLED.key -> "true",
-  CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true") {
+  CometConf.COMET_EXEC_ALL_OPERATOR_ENABLED.key -> "true",
+  CometConf.COMET_CASE_CONVERSION_ENABLED.key -> "true") {
spark.sql("select upper(c1) from parquetV1Table").noop()
}
}
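A note on the benchmark change: without the new flag, the upper(c1) query would fall back to Spark's JVM implementation (per the serde gating above), so the benchmark would no longer measure Comet's native path; enabling it here keeps the measurement meaningful.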
