Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

#2175: Add ability to configure how spark handles dates in parquet files #2184

Open
wants to merge 4 commits into
base: develop
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions scripts/bash/_print_help.sh
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ echo " --executor-memory MEM Memory per executor (e.g. 1000M
echo " --dra-num-executors NUM Same as '--num-executors' but used when DRA is enabled. Use with care! DRA won't scale below this NUM."
echo " --dra-executor-cores NUM Same as '--executor-memory' but used when DRA is enabled."
echo " --dra-executor-memory MEM Same as '--executor-cores' but used when DRA is enabled."
echo " --parquet-datetime-read-mode Spark_submit datetime read mode for parquet files with the default value of 'corrected'."
echo " --parquet-datetime-write-mode Spark_submit datetime write mode for parquet files with the default value of 'corrected'."
echo " --master MASTER_URL spark://host:port, mesos://host:port, yarn, k8s://https://host:port, or local"
echo " --deploy-mode DEPLOY_MODE Whether to launch the driver program locally (\"client\") or on one of the worker machines inside the cluster (\"cluster\")."
echo " --driver-cores NUM Number of cores used by the driver, only in cluster mode."
Expand Down
3 changes: 3 additions & 0 deletions scripts/bash/enceladus_env.template.sh
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,9 @@ CONF_DEFAULT_DRA_MIN_EXECUTORS=0
CONF_DEFAULT_DRA_ALLOCATION_RATIO=0.5
CONF_DEFAULT_ADAPTIVE_TARGET_POSTSHUFFLE_INPUT_SIZE=134217728

DEFAULT_PARQUET_DATETIME_READ_MODE="CORRECTED"
DEFAULT_PARQUET_DATETIME_WRITE_MODE="CORRECTED"

DEFAULT_DEPLOY_MODE="client"

LOG_DIR="/tmp"
Expand Down
15 changes: 15 additions & 0 deletions scripts/bash/run_enceladus.sh
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,8 @@ DRA_MIN_EXECUTORS="$DEFAULT_DRA_MIN_EXECUTORS"
DRA_MAX_EXECUTORS="$DEFAULT_DRA_MAX_EXECUTORS"
DRA_ALLOCATION_RATIO="$DEFAULT_DRA_ALLOCATION_RATIO"
ADAPTIVE_TARGET_POSTSHUFFLE_INPUT_SIZE="$DEFAULT_ADAPTIVE_TARGET_POSTSHUFFLE_INPUT_SIZE"
PARQUET_DATETIME_WRITE_MODE="$DEFAULT_PARQUET_DATETIME_WRITE_MODE"
PARQUET_DATETIME_READ_MODE="$DEFAULT_PARQUET_DATETIME_READ_MODE"

# Command like default for the job
JAR=${SPARK_JOBS_JAR_OVERRIDE:-$SPARK_JOBS_JAR}
Expand Down Expand Up @@ -118,6 +120,14 @@ case $key in
DRA_EXECUTOR_MEMORY="$2"
shift 2 # past argument and value
;;
--parquet-datetime-read-mode)
PARQUET_DATETIME_READ_MODE="$2"
shift 2 # past argument and value
;;
--parquet-datetime-write-mode)
PARQUET_DATETIME_WRITE_MODE="$2"
shift 2 # past argument and value
;;
--master)
MASTER="$2"
shift 2 # past argument and value
Expand Down Expand Up @@ -477,6 +487,11 @@ else
add_to_cmd_line "--executor-cores" "${EXECUTOR_CORES}"
fi

add_spark_conf_cmd "spark.sql.parquet.datetimeRebaseModeInRead" "${PARQUET_DATETIME_READ_MODE}"
add_spark_conf_cmd "spark.sql.parquet.datetimeRebaseModeInWrite" "${PARQUET_DATETIME_WRITE_MODE}"
add_spark_conf_cmd "spark.sql.parquet.int96RebaseModeInRead" "${PARQUET_DATETIME_READ_MODE}"
add_spark_conf_cmd "spark.sql.parquet.int96RebaseModeInWrite" "${PARQUET_DATETIME_WRITE_MODE}"

JVM_CONF="spark.driver.extraJavaOptions=-Dstandardized.hdfs.path=$STD_HDFS_PATH \
-Dspline.mongodb.url=$SPLINE_MONGODB_URL -Dspline.mongodb.name=$SPLINE_MONGODB_NAME -Dhdp.version=$HDP_VERSION \
$MT_PATTERN $MIN_PARTITION_SIZE $MAX_PARTITION_SIZE"
Expand Down
19 changes: 19 additions & 0 deletions scripts/cmd/_run_enceladus.cmd
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ SET EXECUTOR_CORES=%DEFAULT_EXECUTOR_CORES%
SET EXECUTOR_MEMORY=%EFAULT_EXECUTOR_MEMORY%
SET DRA_EXECUTOR_CORES=%DEFAULT_DRA_EXECUTOR_CORES%
SET DRA_EXECUTOR_MEMORY=%DEFAULT_DRA_EXECUTOR_MEMORY%
SET PARQUET_DATETIME_READ_MODE=%DEFAULT_PARQUET_DATETIME_READ_MODE%
SET PARQUET_DATETIME_WRITE_MODE=%DEFAULT_PARQUET_DATETIME_WRITE_MODE%
SET NUM_EXECUTORS=%DEFAULT_NUM_EXECUTORS%
SET DRA_NUM_EXECUTORS=
SET FILES=%ENCELADUS_FILES%
Expand Down Expand Up @@ -131,6 +133,18 @@ IF "%1"=="--dra-executor-memory" (
SHIFT
GOTO CmdParse
)
IF "%1"=="--parquet-datetime-read-mode" (
SET PARQUET_DATETIME_READ_MODE=%2
SHIFT
SHIFT
GOTO CmdParse
)
IF "%1"=="--parquet-datetime-write-mode" (
SET PARQUET_DATETIME_WRITE_MODE=%2
SHIFT
SHIFT
GOTO CmdParse
)
IF "%1"=="--master" (
SET MASTER=%2
SHIFT
Expand Down Expand Up @@ -561,6 +575,11 @@ IF %DRA_ENABLED%==true (
IF DEFINED EXECUTOR_CORES SET CMD_LINE=%CMD_LINE% --executor-cores %EXECUTOR_CORES%
)

SET SPARK_CONF=%SPARK_CONF% --conf spark.sql.parquet.datetimeRebaseModeInRead=%PARQUET_DATETIME_READ_MODE%
SET SPARK_CONF=%SPARK_CONF% --conf spark.sql.parquet.datetimeRebaseModeInWrite=%PARQUET_DATETIME_WRITE_MODE%
SET SPARK_CONF=%SPARK_CONF% --conf spark.sql.parquet.int96RebaseModeInRead=%PARQUET_DATETIME_READ_MODE%
SET SPARK_CONF=%SPARK_CONF% --conf spark.sql.parquet.int96RebaseModeInWrite=%PARQUET_DATETIME_WRITE_MODE%

SET JVM_CONF=spark.driver.extraJavaOptions=-Dstandardized.hdfs.path=%STD_HDFS_PATH% -Dspline.mongodb.url=%SPLINE_MONGODB_URL% -Dspline.mongodb.name=%SPLINE_MONGODB_NAME% -Dhdp.version=%HDP_VERSION% %MT_PATTERN% %MIN_BLOCK_SIZE% %MAX_BLOCK_SIZE%

SET CMD_LINE=%SPARK_SUBMIT%
Expand Down