Support from_utc_timestamp on the GPU for non-UTC timezones (non-DST) (#9810)

* Updates to test suite to start using GpuTimeZoneDB

Signed-off-by: Navin Kumar <[email protected]>

* Initialize the GPU timezone database in the plugin initialization for executor, use this in tests

Signed-off-by: Navin Kumar <[email protected]>

* Remove GpuTimeZoneDB init from the plugin for now; add it back behind a config flag once the first expression using it is complete.

Signed-off-by: Navin Kumar <[email protected]>

* Add plugin initialization code and change the config flag for non-UTC timezone support

Signed-off-by: Navin Kumar <[email protected]>

* Use GpuTimeZoneDB to run from_utc_timestamp on the GPU for non-UTC non-DST timezones

Signed-off-by: Navin Kumar <[email protected]>

* Fix formatting of includes here

Signed-off-by: Navin Kumar <[email protected]>

* Add some Olson timezones to the fallback test and a config option to test proper fallback

Signed-off-by: Navin Kumar <[email protected]>

* Add fallback test for when config option is not enabled

Signed-off-by: Navin Kumar <[email protected]>

---------

Signed-off-by: Navin Kumar <[email protected]>
NVnavkumar authored Nov 28, 2023
1 parent 26c9e37 commit 9af612f
Showing 7 changed files with 157 additions and 74 deletions.
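A minimal end-to-end sketch of how the new path is exercised, assuming the RAPIDS Accelerator is on the classpath; the session setup, data, and column names are illustrative and not part of this commit, while the internal flag is the one added below:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.{col, from_utc_timestamp, to_timestamp}

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.plugins", "com.nvidia.spark.SQLPlugin")
  .config("spark.rapids.sql.nonUTC.enabled", "true") // internal flag, off by default
  .getOrCreate()
import spark.implicits._

val df = Seq("2023-11-28 12:00:00").toDF("ts_str")
  .select(to_timestamp(col("ts_str")).as("ts"))

// Asia/Shanghai currently has no DST transition rules, so this can run on the GPU;
// America/Los_Angeles observes DST, so it still falls back to the CPU.
df.select(from_utc_timestamp(col("ts"), "Asia/Shanghai")).show()

With the flag at its default of false, even the non-DST zones fall back, which is exactly what the first new test below asserts.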
integration_tests/src/main/python/date_time_test.py (19 changes: 14 additions & 5 deletions)
@@ -286,22 +286,31 @@ def test_from_utc_timestamp(data_gen, time_zone):
         lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)))
 
 @allow_non_gpu('ProjectExec')
-@pytest.mark.parametrize('time_zone', ["PST", "NST", "AST"], ids=idfn)
+@pytest.mark.parametrize('time_zone', ["Asia/Shanghai", "EST", "MST", "VST", "PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"], ids=idfn)
 @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
-def test_from_utc_timestamp_unsupported_timezone_fallback(data_gen, time_zone):
+def test_from_utc_timestamp_non_utc_fallback(data_gen, time_zone):
     assert_gpu_fallback_collect(
         lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
         'FromUTCTimestamp')
 
+@allow_non_gpu('ProjectExec')
+@pytest.mark.parametrize('time_zone', ["PST", "NST", "AST", "America/Los_Angeles", "America/New_York", "America/Chicago"], ids=idfn)
+@pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
+def test_from_utc_timestamp_unsupported_timezone_fallback(data_gen, time_zone):
+    assert_gpu_fallback_collect(
+        lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
+        'FromUTCTimestamp',
+        conf = {"spark.rapids.sql.nonUTC.enabled": "true"})
+
 
 @pytest.mark.parametrize('time_zone', ["UTC", "Asia/Shanghai", "EST", "MST", "VST"], ids=idfn)
 @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
 @pytest.mark.xfail(condition = is_not_utc(), reason = 'xfail non-UTC time zone tests because of https://github.com/NVIDIA/spark-rapids/issues/9653')
 def test_from_utc_timestamp_supported_timezones(data_gen, time_zone):
-    # Remove spark.rapids.test.CPU.timezone configuration when GPU kernel is ready to really test on GPU
+    # TODO: Remove spark.rapids.sql.nonUTC.enabled configuration
     assert_gpu_and_cpu_are_equal_collect(
-        lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)), conf = {"spark.rapids.test.CPU.timezone": "true"})
-
+        lambda spark: unary_op_df(spark, data_gen).select(f.from_utc_timestamp(f.col('a'), time_zone)),
+        conf = {"spark.rapids.sql.nonUTC.enabled": "true"})
 
 @allow_non_gpu('ProjectExec')
 @pytest.mark.parametrize('data_gen', [timestamp_gen], ids=idfn)
GpuOverrides.scala
@@ -621,6 +621,10 @@ object GpuOverrides extends Logging {
     }
   }
 
+  def isUTCTimezone(timezoneId: ZoneId): Boolean = {
+    timezoneId.normalized() == UTC_TIMEZONE_ID
+  }
+
   def areAllSupportedTypes(types: DataType*): Boolean = types.forall(isSupportedType(_))
 
   /**
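The normalized() comparison treats every spelling of UTC alike. A standalone java.time sketch, assuming UTC_TIMEZONE_ID is ZoneId.of("UTC").normalized() (i.e. ZoneOffset.UTC) as in GpuOverrides:

import java.time.ZoneId

val utc = ZoneId.of("UTC").normalized() // ZoneOffset.UTC
for (id <- Seq("UTC", "Etc/UTC", "Z", "+00:00")) {
  // Every alias normalizes to the same fixed offset, so each one takes
  // the no-op path in GpuFromUTCTimestamp.doColumnar below.
  assert(ZoneId.of(id).normalized() == utc, s"$id should normalize to UTC")
}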
Plugin.scala
@@ -26,6 +26,7 @@ import scala.util.Try
 
 import ai.rapids.cudf.{Cuda, CudaException, CudaFatalException, CudfException, MemoryCleaner}
 import com.nvidia.spark.rapids.filecache.{FileCache, FileCacheLocalityManager, FileCacheLocalityMsg}
+import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
 import com.nvidia.spark.rapids.python.PythonWorkerSemaphore
 import org.apache.commons.lang3.exception.ExceptionUtils
 
@@ -504,6 +505,7 @@ class RapidsExecutorPlugin extends ExecutorPlugin with Logging {
   }
 
   override def shutdown(): Unit = {
+    GpuTimeZoneDB.shutdown()
    GpuSemaphore.shutdown()
    PythonWorkerSemaphore.shutdown()
    GpuDeviceManager.shutdown()
RapidsConf.scala
@@ -2044,6 +2044,13 @@
       "The gpu to disk spill bounce buffer must have a positive size")
     .createWithDefault(128L * 1024 * 1024)
 
+  val NON_UTC_TIME_ZONE_ENABLED =
+    conf("spark.rapids.sql.nonUTC.enabled")
+      .doc("An option to enable/disable non-UTC time zone support.")
+      .internal()
+      .booleanConf
+      .createWithDefault(false)
+
   val SPLIT_UNTIL_SIZE_OVERRIDE = conf("spark.rapids.sql.test.overrides.splitUntilSize")
     .doc("Only for tests: override the value of GpuDeviceManager.splitUntilSize")
     .internal()
@@ -2056,12 +2063,6 @@
     .booleanConf
     .createOptional
 
-  val TEST_USE_TIMEZONE_CPU_BACKEND = conf("spark.rapids.test.CPU.timezone")
-    .doc("Only for tests: verify for timezone related functions")
-    .internal()
-    .booleanConf
-    .createOptional
-
   private def printSectionHeader(category: String): Unit =
     println(s"\n### $category")
 
@@ -2754,6 +2755,8 @@
 
   lazy val splitUntilSizeOverride: Option[Long] = get(SPLIT_UNTIL_SIZE_OVERRIDE)
 
+  lazy val nonUTCTimeZoneEnabled: Boolean = get(NON_UTC_TIME_ZONE_ENABLED)
+
   private val optimizerDefaults = Map(
     // this is not accurate because CPU projections do have a cost due to appending values
     // to each row that is produced, but this needs to be a really small number because
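A small sketch of how the new flag is read back out (the Map-based constructor mirrors RapidsConf's primary constructor; building the map by hand is illustrative — in the plugin it is derived from the active Spark conf):

import com.nvidia.spark.rapids.RapidsConf

val rapidsConf = new RapidsConf(Map("spark.rapids.sql.nonUTC.enabled" -> "true"))
assert(rapidsConf.nonUTCTimeZoneEnabled) // read via the new lazy val above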
TimeZoneDB.scala
@@ -20,15 +20,10 @@ import java.time.ZoneId
 
 import ai.rapids.cudf.{ColumnVector, DType, HostColumnVector}
 import com.nvidia.spark.rapids.Arm.withResource
-import com.nvidia.spark.rapids.GpuOverrides
 
 import org.apache.spark.sql.catalyst.util.DateTimeUtils
 
 object TimeZoneDB {
-  def isUTCTimezone(timezoneId: ZoneId): Boolean = {
-    timezoneId.normalized() == GpuOverrides.UTC_TIMEZONE_ID
-  }
-
   // Copied from Spark. Used to format time zone ID string with (+|-)h:mm and (+|-)hh:m
   def getZoneId(timezoneId: String): ZoneId = {
     val formattedZoneId = timezoneId
@@ -40,7 +35,7 @@
   }
 
   // Support fixed offset or no transition rule case
-  def isSupportedTimezone(timezoneId: String): Boolean = {
+  def isSupportedTimeZone(timezoneId: String): Boolean = {
     val rules = getZoneId(timezoneId).getRules
     rules.isFixedOffset || rules.getTransitionRules.isEmpty
   }
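This predicate is what draws the supported/unsupported line in the new tests. A standalone sketch of the same rule using java.time directly (plain ZoneId.of here, so Spark's short IDs like "PST" — which maps to America/Los_Angeles — are spelled out):

import java.time.ZoneId

// Supported iff the offset is fixed or the zone has no recurring DST rules.
def noDstTimeZone(timezoneId: String): Boolean = {
  val rules = ZoneId.of(timezoneId).getRules
  rules.isFixedOffset || rules.getTransitionRules.isEmpty
}

assert(noDstTimeZone("+08:00"))               // fixed offset
assert(noDstTimeZone("Asia/Shanghai"))        // historical transitions only, no recurring rules
assert(!noDstTimeZone("America/Los_Angeles")) // recurring DST rules -> CPU fallback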
@@ -153,7 +148,7 @@
     assert(inputVector.getType == DType.TIMESTAMP_DAYS)
     val rowCount = inputVector.getRowCount.toInt
     withResource(inputVector.copyToHost()) { input =>
-      withResource(HostColumnVector.builder(DType.INT64, rowCount)) { builder =>
+      withResource(HostColumnVector.builder(DType.TIMESTAMP_MICROSECONDS, rowCount)) { builder =>
         var currRow = 0
         while (currRow < rowCount) {
           if (input.isNull(currRow)) {
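On the INT64 to TIMESTAMP_MICROSECONDS fix above: the builder's DType is the column's logical type, so building with INT64 would yield a plain long column even though the stored bits are identical. A sketch of the corrected pattern (the values are illustrative):

import ai.rapids.cudf.{DType, HostColumnVector}
import com.nvidia.spark.rapids.Arm.withResource

val micros = Seq(0L, 1700000000000000L) // microseconds since the epoch
withResource(HostColumnVector.builder(DType.TIMESTAMP_MICROSECONDS, micros.size)) { builder =>
  micros.foreach(m => builder.append(m))
  withResource(builder.build()) { col =>
    // The column now carries timestamp semantics, not just 64-bit integers.
    assert(col.getType == DType.TIMESTAMP_MICROSECONDS)
  }
}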
datetimeExpressions.scala
@@ -20,11 +20,11 @@ import java.time.ZoneId
 import java.util.concurrent.TimeUnit
 
 import ai.rapids.cudf.{BinaryOp, CaptureGroups, ColumnVector, ColumnView, DType, RegexProgram, Scalar}
-import com.nvidia.spark.rapids.{BinaryExprMeta, BoolUtils, DataFromReplacementRule, DateUtils, GpuBinaryExpression, GpuBinaryExpressionArgsAnyScalar, GpuCast, GpuColumnVector, GpuExpression, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta}
+import com.nvidia.spark.rapids.{BinaryExprMeta, BoolUtils, DataFromReplacementRule, DateUtils, GpuBinaryExpression, GpuBinaryExpressionArgsAnyScalar, GpuCast, GpuColumnVector, GpuExpression, GpuOverrides, GpuScalar, GpuUnaryExpression, RapidsConf, RapidsMeta}
 import com.nvidia.spark.rapids.Arm._
 import com.nvidia.spark.rapids.GpuOverrides.{extractStringLit, getTimeParserPolicy}
-import com.nvidia.spark.rapids.RapidsConf.TEST_USE_TIMEZONE_CPU_BACKEND
 import com.nvidia.spark.rapids.RapidsPluginImplicits._
+import com.nvidia.spark.rapids.jni.GpuTimeZoneDB
 import com.nvidia.spark.rapids.shims.ShimBinaryExpression
 
 import org.apache.spark.sql.catalyst.expressions.{BinaryExpression, ExpectsInputTypes, Expression, FromUTCTimestamp, ImplicitCastInputTypes, NullIntolerant, TimeZoneAwareExpression}
@@ -1046,20 +1046,19 @@ class FromUTCTimestampExprMeta(
   extends BinaryExprMeta[FromUTCTimestamp](expr, conf, parent, rule) {
 
   private[this] var timezoneId: ZoneId = null
-  private[this] val isOnCPU: Boolean = conf.get(TEST_USE_TIMEZONE_CPU_BACKEND).getOrElse(false)
+  private[this] val nonUTCEnabled: Boolean = conf.nonUTCTimeZoneEnabled
 
   override def tagExprForGpu(): Unit = {
     extractStringLit(expr.right) match {
       case None =>
         willNotWorkOnGpu("timezone input must be a literal string")
       case Some(timezoneShortID) =>
         if (timezoneShortID != null) {
-          timezoneId = TimeZoneDB.getZoneId(timezoneShortID)
+          timezoneId = GpuTimeZoneDB.getZoneId(timezoneShortID)
           // Always pass for UTC timezone since it's no-op.
-          if (!TimeZoneDB.isUTCTimezone(timezoneId)) {
-            // Check CPU path, mostly for test purpose
-            if (isOnCPU) {
-              if(!TimeZoneDB.isSupportedTimezone(timezoneShortID)) {
+          if (!GpuOverrides.isUTCTimezone(timezoneId)) {
+            if (nonUTCEnabled) {
+              if(!GpuTimeZoneDB.isSupportedTimeZone(timezoneShortID)) {
                 willNotWorkOnGpu(s"Not supported timezone type $timezoneShortID.")
               }
             } else {
@@ -1072,11 +1071,11 @@
   }
 
   override def convertToGpu(timestamp: Expression, timezone: Expression): GpuExpression =
-    GpuFromUTCTimestamp(timestamp, timezone, timezoneId, isOnCPU)
+    GpuFromUTCTimestamp(timestamp, timezone, timezoneId, nonUTCEnabled)
 }
 
 case class GpuFromUTCTimestamp(
-    timestamp: Expression, timezone: Expression, zoneId: ZoneId, isOnCPU: Boolean)
+    timestamp: Expression, timezone: Expression, zoneId: ZoneId, nonUTCEnabled: Boolean)
   extends GpuBinaryExpressionArgsAnyScalar
   with ImplicitCastInputTypes
   with NullIntolerant {
@@ -1088,12 +1087,12 @@ case class GpuFromUTCTimestamp(
 
   override def doColumnar(lhs: GpuColumnVector, rhs: GpuScalar): ColumnVector = {
     if (rhs.getBase.isValid) {
-      if (TimeZoneDB.isUTCTimezone(zoneId)) {
+      if (GpuOverrides.isUTCTimezone(zoneId)) {
         // For UTC timezone, just a no-op bypassing GPU computation.
         lhs.getBase.incRefCount()
       } else {
-        if (isOnCPU){
-          TimeZoneDB.fromUtcTimestampToTimestamp(lhs.getBase, zoneId)
+        if (nonUTCEnabled){
+          GpuTimeZoneDB.fromUtcTimestampToTimestamp(lhs.getBase, zoneId)
         } else {
           // TODO: remove this until GPU backend supported.
           throw new UnsupportedOperationException(
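For intuition on what GpuTimeZoneDB.fromUtcTimestampToTimestamp must compute per row, here is a scalar, host-side sketch (the real implementation is columnar and lives in spark-rapids-jni; this only shows the arithmetic): from_utc_timestamp shifts each UTC instant by the zone's offset at that instant, and for the zones accepted by isSupportedTimeZone that offset never follows a DST rule.

import java.time.{Instant, ZoneId}

def fromUtcMicros(utcMicros: Long, zone: ZoneId): Long = {
  val instant = Instant.ofEpochSecond(
    Math.floorDiv(utcMicros, 1000000L),
    Math.floorMod(utcMicros, 1000000L) * 1000L)
  // Shift by the zone offset in effect at this instant.
  utcMicros + zone.getRules.getOffset(instant).getTotalSeconds * 1000000L
}

// 1970-01-01 00:00:00 UTC rendered in Asia/Shanghai wall-clock time: +8 hours.
assert(fromUtcMicros(0L, ZoneId.of("Asia/Shanghai")) == 8L * 3600 * 1000000)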