Add time zone config to set non-UTC [databricks] (#9652)
* Add time zone config to set non-UTC

Signed-off-by: Chong Gao <[email protected]>

* Remove the skip logic when time zone is not UTC

* Add default value

* Remove the unused is_tz_utc helper

* Fix bug

* Fix bug: remove skip code for non-UTC time zone

* Use TZ instead of TEST_TZ to set time zone

* Remove the TZ overwrite logic from date_gen.py

---------

Signed-off-by: Chong Gao <[email protected]>
Co-authored-by: Chong Gao <[email protected]>
res-life and Chong Gao authored Nov 17, 2023
1 parent 244ceab commit 3ef145d
Showing 4 changed files with 26 additions and 35 deletions.
9 changes: 6 additions & 3 deletions integration_tests/run_pyspark_from_build.sh
@@ -223,11 +223,14 @@ else
     export PYSP_TEST_spark_jars="${ALL_JARS//:/,}"
 fi
 
+# time zone will be tested; use `export TZ=time_zone_name` before running this script
+TZ=${TZ:-UTC}

@jlowe (Contributor) commented on Nov 17, 2023:

This probably needs to be exported. For example, fastparquet tests fail when run on a machine that is not in the UTC time zone, because we're telling the JVM to use UTC while the Python side still sees the machine's zone. Alternatively, don't force a JVM time zone when one isn't specified (the user won't specify one, for example).
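A small Python sketch (not part of the commit) of the mismatch described above, assuming the script was launched with `TZ=...` rather than `export TZ=...`: the variable never reaches the test process's environment, so Python keeps the machine's zone while the JVM is pinned to `$TZ`.

```python
import os
import time
from datetime import datetime

# TZ assigned in the shell without `export` is invisible to child processes:
print(os.environ.get('TZ'))         # -> None in that case
time.tzset()                        # POSIX-only: re-reads TZ from the environment
print(datetime.now().astimezone())  # still the machine's zone, not $TZ
```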


 # Set the Delta log cache size to prevent the driver from caching every Delta log indefinitely
-export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=UTC -Ddelta.log.cacheSize=10 $COVERAGE_SUBMIT_FLAGS"
-export PYSP_TEST_spark_executor_extraJavaOptions='-ea -Duser.timezone=UTC'
+export PYSP_TEST_spark_driver_extraJavaOptions="-ea -Duser.timezone=$TZ -Ddelta.log.cacheSize=10 $COVERAGE_SUBMIT_FLAGS"
+export PYSP_TEST_spark_executor_extraJavaOptions="-ea -Duser.timezone=$TZ"
 export PYSP_TEST_spark_ui_showConsoleProgress='false'
-export PYSP_TEST_spark_sql_session_timeZone='UTC'
+export PYSP_TEST_spark_sql_session_timeZone=$TZ
 export PYSP_TEST_spark_sql_shuffle_partitions='4'
 # prevent cluster shape to change
 export PYSP_TEST_spark_dynamicAllocation_enabled='false'
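For context, the `PYSP_TEST_` variables above are presumably turned into Spark confs by the test harness by stripping the prefix and mapping underscores to dots (so `PYSP_TEST_spark_sql_session_timeZone` becomes `spark.sql.session.timeZone`). A hedged sketch of that convention, with a hypothetical helper name:

```python
import os

def pysp_test_confs(environ=os.environ):
    """Recover Spark conf keys from PYSP_TEST_-prefixed environment variables."""
    prefix = 'PYSP_TEST_'
    return {key[len(prefix):].replace('_', '.'): value
            for key, value in environ.items() if key.startswith(prefix)}

# With the exports above this would yield, e.g.:
#   {'spark.sql.session.timeZone': 'UTC', 'spark.sql.shuffle.partitions': '4', ...}
```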
9 changes: 9 additions & 0 deletions integration_tests/src/main/python/conftest.py
@@ -77,6 +77,15 @@ def is_emr_runtime():
 def is_dataproc_runtime():
     return runtime_env() == "dataproc"
 
+def get_test_tz():
+    return os.environ.get('TZ', 'UTC')
+
+def is_utc():
+    return get_test_tz() == "UTC"
+
+def is_not_utc():
+    return not is_utc()
+
 _is_nightly_run = False
 _is_precommit_run = False
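A hypothetical usage sketch (not part of the commit) of how a test that still requires UTC might consult these helpers now that the blanket skip is removed, assuming a standard pytest marker and a made-up test name:

```python
import pytest
from conftest import is_not_utc  # helper added above

# Replaces the removed global skip with an explicit, per-test opt-out.
@pytest.mark.skipif(is_not_utc(), reason='this test assumes a UTC test time zone')
def test_that_assumes_utc():
    ...
```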

33 changes: 11 additions & 22 deletions integration_tests/src/main/python/data_gen.py
@@ -21,7 +21,7 @@
 from pyspark.sql.types import *
 import pyspark.sql.functions as f
 import random
-from spark_session import is_tz_utc, is_before_spark_340, with_cpu_session
+from spark_session import is_before_spark_340, with_cpu_session
 import sre_yield
 import struct
 from conftest import skip_unless_precommit_tests,get_datagen_seed
@@ -30,11 +30,6 @@
 from functools import lru_cache
 import hashlib
 
-# set time zone to UTC for timestamp test cases to avoid `datetime` out-of-range error:
-# refer to: https://github.com/NVIDIA/spark-rapids/issues/7535
-os.environ['TZ'] = 'UTC'
-time.tzset()
-
 class DataGen:
     """Base class for data generation"""

@@ -584,12 +579,18 @@ def __init__(self, start=None, end=None, nullable=True, tzinfo=timezone.utc):
         elif not isinstance(start, datetime):
             raise RuntimeError('Unsupported type passed in for start {}'.format(start))
 
+        # Spark supports time through "9999-12-31 23:59:59.999999", but in order to avoid
+        # out-of-range errors in non-UTC time zones, use 9999-12-30 instead of 12-31 as the max end;
+        # for details, refer to https://github.com/NVIDIA/spark-rapids/issues/7535
+        max_end = datetime(9999, 12, 30, 23, 59, 59, 999999, tzinfo=tzinfo)
         if end is None:
-            # Spark supports time through
-            # "9999-12-31 23:59:59.999999"
-            end = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=tzinfo)
+            end = max_end
         elif isinstance(end, timedelta):
-            end = start + end
+            max_timedelta = max_end - start
+            if end >= max_timedelta:
+                end = max_end
+            else:
+                end = start + end
         elif not isinstance(end, date):
             raise RuntimeError('Unsupported type passed in for end {}'.format(end))
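A minimal sketch (not part of the diff) of the out-of-range failure that the 9999-12-30 clamp avoids, per https://github.com/NVIDIA/spark-rapids/issues/7535: shifting Spark's maximum timestamp into a zone east of UTC lands past Python's `datetime.max`.

```python
from datetime import datetime, timedelta, timezone

max_ts = datetime(9999, 12, 31, 23, 59, 59, 999999, tzinfo=timezone.utc)
try:
    # In UTC+8 this would be 10000-01-01 07:59:59.999999, beyond datetime.max
    max_ts.astimezone(timezone(timedelta(hours=8)))
except OverflowError as err:
    print('out of range:', err)  # "date value out of range"
```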

@@ -749,10 +750,6 @@ def gen_bytes():
             return bytes([ rand.randint(0, 255) for _ in range(length) ])
         self._start(rand, gen_bytes)
 
-def skip_if_not_utc():
-    if not is_tz_utc():
-        skip_unless_precommit_tests('The java system time zone is not set to UTC')
-
 # Note: Current (2023/06/06) maximum IT data size is 7282688 bytes, so an LRU cache with maxsize 128
 # will lead to 7282688 * 128 = 932 MB additional memory usage in the edge case, which is acceptable.
 @lru_cache(maxsize=128, typed=True)
@@ -776,10 +773,6 @@ def gen_df(spark, data_gen, length=2048, seed=None, num_slices=None):
     # we cannot create a data frame from a nullable struct
     assert not data_gen.nullable
 
-    # Before we get too far we need to verify that we can run with timestamps
-    if src.contains_ts():
-        skip_if_not_utc()
-
     data = gen_df_help(src, length, seed_value)
 
     # We use `numSlices` to create an RDD with the specific number of partitions,
@@ -832,10 +825,6 @@ def _gen_scalars_common(data_gen, count, seed=None):
     else:
         seed_value = seed
 
-    # Before we get too far we need to verify that we can run with timestamps
-    if src.contains_ts():
-        skip_if_not_utc()
-
     rand = random.Random(seed_value)
     src.start(rand)
     return src
10 changes: 0 additions & 10 deletions integration_tests/src/main/python/spark_session.py
@@ -55,16 +55,6 @@ def _from_scala_map(scala_map):
     'spark.sql.legacy.allowNegativeScaleOfDecimal': 'true',
 }
 
-def is_tz_utc(spark=_spark):
-    """
-    true if the tz is UTC else false
-    """
-    # Now we have to do some kind of ugly internal java stuff
-    jvm = spark.sparkContext._jvm
-    utc = jvm.java.time.ZoneId.of('UTC').normalized()
-    sys_tz = jvm.java.time.ZoneId.systemDefault().normalized()
-    return utc == sys_tz
-
 def _set_all_confs(conf):
     newconf = _default_conf.copy()
     if (should_inject_oom()):
