diff --git a/CONTRIBUTING.md b/CONTRIBUTING.md index 1bb9fdf6b0..baea783808 100644 --- a/CONTRIBUTING.md +++ b/CONTRIBUTING.md @@ -235,12 +235,12 @@ and build inside WSL2, e.g. ### Testing Java tests are in the `src/test` directory and c++ tests are in the `src/main/cpp/tests` directory. The c++ tests are built with the `-DBUILD_TESTS` command line option and will build into the -`target/cmake-build/gtests/` directory. Due to building inside the docker container, it is possible +`target/jni/cmake-build/gtests/` directory. Due to building inside the docker container, it is possible that the host environment does not match the container well enough to run these executables, resulting in errors finding libraries. The script `build/run-in-docker` was created to help with this situation. A test can be run directly using this script or the script can be run without any arguments to get into an interactive shell inside the container. -```build/run-in-docker target/cmake-build/gtests/ROW_CONVERSION``` +```build/run-in-docker target/jni/cmake-build/gtests/ROW_CONVERSION``` #### Testing with Compute Sanitizer [Compute Sanitizer](https://docs.nvidia.com/compute-sanitizer/ComputeSanitizer/index.html) is a @@ -311,12 +311,12 @@ in the cuDF [CONTRIBUTING](thirdparty/cudf/CONTRIBUTING.md) guide. ### Benchmarks Benchmarks exist for c++ benchmarks using NVBench and are in the `src/main/cpp/benchmarks` directory. To build these benchmarks requires the `-DBUILD_BENCHMARKS` build option. Once built, the benchmarks -can be found in the `target/cmake-build/benchmarks/` directory. Due to building inside the docker +can be found in the `target/jni/cmake-build/benchmarks/` directory. Due to building inside the docker container, it is possible that the host environment does not match the container well enough to run these executables, resulting in errors finding libraries. The script `build/run-in-docker` was created to help with this situation. A benchmark can be run directly using this script or the script can be run without any arguments to get into an interactive shell inside the container. -```build/run-in-docker target/cmake-build/benchmarks/ROW_CONVERSION_BENCH``` +```build/run-in-docker target/jni/cmake-build/benchmarks/ROW_CONVERSION_BENCH``` ## Code contributions ### Your first issue diff --git a/build/buildcpp.sh b/build/buildcpp.sh new file mode 100755 index 0000000000..0789a800d5 --- /dev/null +++ b/build/buildcpp.sh @@ -0,0 +1,144 @@ +#!/bin/bash +# +# Copyright (c) 2024, NVIDIA CORPORATION. All rights reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# + +# +# Script to build native code in cudf and spark-rapids-jni +# + +set -e + +if [[ $FROM_MAVEN == "true" ]]; then + echo "Building native libraries. To rerun outside Maven enter the build environment via + +$ ./build/run-in-docker + +then run + +$ REUSE_ENV=true $0 +" +fi + +# Disable items on arm64 due to missing dependencies in the CUDA toolkit +if [ "$(uname -m)" == "aarch64" ]; then + USE_GDS="OFF" # cuFile RDMA libraries are missing + BUILD_FAULTINJ="OFF" # libcupti_static.a is missing +fi + +# Environment variables to control the build +PROJECT_BASE_DIR=${PROJECT_BASE_DIR:-$(realpath $(dirname $0)/..)} +PROJECT_BUILD_DIR=${PROJECT_BUILD_DIR:-$PROJECT_BASE_DIR/target} +if [[ "$REUSE_ENV" != "true" ]]; then + echo " +BUILD_BENCHMARKS=${BUILD_BENCHMARKS:-ON} +BUILD_CUDF_BENCHMARKS=${BUILD_CUDF_BENCHMARKS:-OFF} +BUILD_CUDF_TESTS=${BUILD_CUDF_TESTS:-OFF} +BUILD_FAULTINJ=${BUILD_FAULTINJ:-ON} +BUILD_PROFILER=${BUILD_PROFILER:-ON} +BUILD_TESTS=${BUILD_TESTS:-ON} +export CMAKE_GENERATOR=${CMAKE_GENERATOR:-Ninja} +CPP_PARALLEL_LEVEL=${CPP_PARALLEL_LEVEL:-10} +CUDF_BUILD_TYPE=${CUDF_BUILD_TYPE:-Release} +CUDF_PATH=${CUDF_PATH:-$PROJECT_BASE_DIR/thirdparty/cudf} +CUDF_PIN_PATH=${CUDF_PIN_PATH:-$PROJECT_BASE_DIR/thirdparty/cudf-pins} +CUDF_USE_PER_THREAD_DEFAULT_STREAM=${CUDF_USE_PER_THREAD_DEFAULT_STREAM:-ON} +GPU_ARCHS=${GPU_ARCHS:-RAPIDS} +LIBCUDF_BUILD_CONFIGURE=${LIBCUDF_BUILD_CONFIGURE:-false} +LIBCUDF_BUILD_PATH=${LIBCUDF_BUILD_PATH:-$PROJECT_BUILD_DIR/libcudf/cmake-build} +LIBCUDF_DEPENDENCY_MODE=${LIBCUDF_DEPENDENCY_MODE:-pinned} +LIBCUDF_INSTALL_PATH=${LIBCUDF_INSTALL_PATH:-$PROJECT_BUILD_DIR/libcudf-install} +LIBCUDFJNI_BUILD_PATH=${LIBCUDFJNI_BUILD_PATH:-$PROJECT_BUILD_DIR/libcudfjni} +SPARK_JNI_BUILD_PATH=${SPARK_JNI_BUILD_PATH:-$PROJECT_BUILD_DIR/jni/cmake-build} +RMM_LOGGING_LEVEL=${RMM_LOGGING_LEVEL:-OFF} +USE_GDS=${USE_GDS:-OFF}" > "$PROJECT_BUILD_DIR/buildcpp-env.sh" +fi + +source "$PROJECT_BUILD_DIR/buildcpp-env.sh" + +# +# libcudf build +# +mkdir -p "$LIBCUDF_INSTALL_PATH" "$LIBCUDF_BUILD_PATH" +cd "$LIBCUDF_BUILD_PATH" + +# Skip explicit cudf cmake configuration if it appears it has already configured +if [[ $LIBCUDF_BUILD_CONFIGURE == true || ! -f $LIBCUDF_BUILD_PATH/CMakeCache.txt ]]; then + echo "Configuring cudf native libs" + cmake "$CUDF_PATH/cpp" \ + -DBUILD_BENCHMARKS="$BUILD_CUDF_BENCHMARKS" \ + -DBUILD_SHARED_LIBS=OFF \ + -DBUILD_TESTS="$BUILD_CUDF_TESTS" \ + -DCMAKE_BUILD_TYPE="$CUDF_BUILD_TYPE" \ + -DCMAKE_CUDA_ARCHITECTURES="$GPU_ARCHS" \ + -DCMAKE_INSTALL_PREFIX="$LIBCUDF_INSTALL_PATH" \ + -DCUDF_DEPENDENCY_PIN_MODE="$LIBCUDF_DEPENDENCY_MODE" \ + -DCUDA_STATIC_CUFILE=ON \ + -DCUDA_STATIC_RUNTIME=ON \ + -DCUDF_USE_PER_THREAD_DEFAULT_STREAM="$CUDF_USE_PER_THREAD_DEFAULT_STREAM" \ + -DCUDF_KVIKIO_REMOTE_IO=OFF \ + -DCUDF_LARGE_STRINGS_DISABLED=ON \ + -DLIBCUDF_LOGGING_LEVEL="$RMM_LOGGING_LEVEL" \ + -DRMM_LOGGING_LEVEL="$RMM_LOGGING_LEVEL" \ + -C="$CUDF_PIN_PATH/setup.cmake" +fi +echo "Building cudf native libs" +cmake --build "$LIBCUDF_BUILD_PATH" --target install "-j$CPP_PARALLEL_LEVEL" + +# +# libcudfjni build +# +mkdir -p "$LIBCUDFJNI_BUILD_PATH" +cd "$LIBCUDFJNI_BUILD_PATH" +echo "Configuring cudfjni native libs" +CUDF_CPP_BUILD_DIR="$LIBCUDF_BUILD_PATH" CUDF_ROOT="$CUDF_PATH" cmake \ + "$CUDF_PATH/java/src/main/native" \ + -DBUILD_SHARED_LIBS=OFF \ + -DCUDA_STATIC_CUFILE=ON \ + -DCUDA_STATIC_RUNTIME=ON \ + -DCUDF_DEPENDENCY_PIN_MODE=pinned \ + -DCUDF_JNI_LIBCUDF_STATIC=ON \ + -DCUDF_USE_PER_THREAD_DEFAULT_STREAM="$CUDF_USE_PER_THREAD_DEFAULT_STREAM" \ + -DGPU_ARCHS="$GPU_ARCHS" \ + -DRMM_LOGGING_LEVEL="$RMM_LOGGING_LEVEL" \ + -DUSE_GDS="$USE_GDS" \ + -C="$CUDF_PIN_PATH/setup.cmake" +echo "Building cudfjni native libs" +cmake --build "$LIBCUDFJNI_BUILD_PATH" "-j$CPP_PARALLEL_LEVEL" + +# +# sparkjni build +# +mkdir -p "$SPARK_JNI_BUILD_PATH" +cd "$SPARK_JNI_BUILD_PATH" +echo "Configuring spark-rapids-jni native libs" +CUDF_CPP_BUILD_DIR="$LIBCUDF_BUILD_PATH" \ + CUDF_ROOT="$CUDF_PATH" \ + CUDF_INSTALL_DIR="$LIBCUDF_INSTALL_PATH" \ + CUDFJNI_BUILD_DIR="$LIBCUDFJNI_BUILD_PATH" \ + cmake \ + "$PROJECT_BASE_DIR/src/main/cpp" \ + -DBUILD_BENCHMARKS="$BUILD_BENCHMARKS" \ + -DBUILD_FAULTINJ="$BUILD_FAULTINJ" \ + -DBUILD_PROFILER="$BUILD_PROFILER" \ + -DBUILD_TESTS="$BUILD_TESTS" \ + -DCUDF_DEPENDENCY_PIN_MODE=pinned \ + -DCUDF_USE_PER_THREAD_DEFAULT_STREAM="$CUDF_USE_PER_THREAD_DEFAULT_STREAM" \ + -DGPU_ARCHS="$GPU_ARCHS" \ + -DRMM_LOGGING_LEVEL="$RMM_LOGGING_LEVEL" \ + -DUSE_GDS="$USE_GDS" \ + -C="$CUDF_PIN_PATH/setup.cmake" +echo "Building spark-rapids-jni native libs" +cmake --build "$SPARK_JNI_BUILD_PATH" "-j$CPP_PARALLEL_LEVEL" diff --git a/ci/submodule-sync.sh b/ci/submodule-sync.sh index 8d4089a6c7..4efb917382 100755 --- a/ci/submodule-sync.sh +++ b/ci/submodule-sync.sh @@ -75,7 +75,7 @@ set +e # Don't do a full build. Just try to update/build CUDF with no patches on top of it. # calling the antrun directly skips applying patches and also only builds # libcudf -${MVN} antrun:run@build-libcudf ${MVN_MIRROR} \ +${MVN} antrun:run@buildcpp ${MVN_MIRROR} \ -DCPP_PARALLEL_LEVEL=${PARALLEL_LEVEL} \ -Dlibcudf.build.configure=true \ -Dlibcudf.dependency.mode=latest \ diff --git a/pom.xml b/pom.xml index 72de8435db..4e8844d10a 100644 --- a/pom.xml +++ b/pom.xml @@ -430,123 +430,36 @@ - build-libcudf - validate - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - run - - - - build-libcudfjni - validate - - - - - - - - - - - - - - - - - - - - - - - - - - - run - - - - build-sparkrapidsjni + buildcpp validate - - - - - - - - - - - - - - - - - - - - - - + executable="build/buildcpp.sh"> + + + + + + + + + + + + + + + + + + + + + + + diff --git a/src/main/cpp/CMakeLists.txt b/src/main/cpp/CMakeLists.txt index 70c9cd2a59..bb6f9d0ac6 100644 --- a/src/main/cpp/CMakeLists.txt +++ b/src/main/cpp/CMakeLists.txt @@ -132,6 +132,9 @@ include(${CUDF_DIR}/cpp/cmake/thirdparty/get_nvtx.cmake) # find CCCL include(${CUDF_DIR}/cpp/cmake/thirdparty/get_cccl.cmake) +# find spdlog +include(${CMAKE_SOURCE_DIR}/cmake/get_spdlog.cmake) + # JNI find_package(JNI REQUIRED) if(JNI_FOUND) @@ -190,7 +193,7 @@ add_library( src/BloomFilterJni.cpp src/CaseWhenJni.cpp src/CastStringJni.cpp - src/DateTimeRebaseJni.cpp + src/DateTimeUtilsJni.cpp src/DecimalUtilsJni.cpp src/GpuTimeZoneDBJni.cpp src/HashJni.cpp @@ -212,6 +215,7 @@ add_library( src/cast_string.cu src/cast_string_to_float.cu src/datetime_rebase.cu + src/datetime_truncate.cu src/decimal_utils.cu src/format_float.cu src/from_json_to_raw_map.cu @@ -280,12 +284,11 @@ target_link_libraries( ${CUDFJNI_LIB} cudf::cudf nvtx3::nvtx3-cpp + spdlog::spdlog_header_only -Wl,--no-whole-archive ${ARROW_LIB} ${PARQUET_LIB} ${THRIFT_LIB} - PUBLIC rmm::rmm - PRIVATE $ ) rapids_cuda_set_runtime(spark_rapids_jni USE_STATIC ON) set_target_properties(spark_rapids_jni PROPERTIES LINK_LANGUAGE "CXX") diff --git a/src/main/cpp/cmake/get_spdlog.cmake b/src/main/cpp/cmake/get_spdlog.cmake new file mode 100644 index 0000000000..2b9e492c8b --- /dev/null +++ b/src/main/cpp/cmake/get_spdlog.cmake @@ -0,0 +1,25 @@ +# ============================================================================= +# Copyright (c) 2024, NVIDIA CORPORATION. +# +# Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except +# in compliance with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software distributed under the License +# is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express +# or implied. See the License for the specific language governing permissions and limitations under +# the License. +# ============================================================================= + +# Use CPM to find or clone speedlog +function(find_and_configure_spdlog) + + include(${rapids-cmake-dir}/cpm/spdlog.cmake) + rapids_cpm_spdlog( + FMT_OPTION "EXTERNAL_FMT_HO" + ) + +endfunction() + +find_and_configure_spdlog() diff --git a/src/main/cpp/faultinj/README.md b/src/main/cpp/faultinj/README.md index 08ac1e0285..a22748eac8 100644 --- a/src/main/cpp/faultinj/README.md +++ b/src/main/cpp/faultinj/README.md @@ -33,7 +33,7 @@ Spark local mode is a single CUDA process. We can test is as any standalone single-process application. ```bash -CUDA_INJECTION64_PATH=$PWD/target/cmake-build/faultinj/libcufaultinj.so \ +CUDA_INJECTION64_PATH=$PWD/target/jni/cmake-build/faultinj/libcufaultinj.so \ FAULT_INJECTOR_CONFIG_PATH=src/test/cpp/faultinj/test_faultinj.json \ $SPARK_HOME/bin/pyspark \ --jars $SPARK_RAPIDS_REPO/dist/target/rapids-4-spark_2.12-22.08.0-SNAPSHOT-cuda11.jar \ @@ -44,7 +44,7 @@ $SPARK_HOME/bin/pyspark \ $SPARK_HOME/bin/spark-shell \ --jars $SPARK_RAPIDS_REPO/dist/target/rapids-4-spark_2.12-22.08.0-SNAPSHOT-cuda11.jar \ --conf spark.plugins=com.nvidia.spark.SQLPlugin \ - --files ./target/cmake-build/faultinj/libcufaultinj.so,./src/test/cpp/faultinj/test_faultinj.json \ + --files ./target/jni/cmake-build/faultinj/libcufaultinj.so,./src/test/cpp/faultinj/test_faultinj.json \ --conf spark.executorEnv.CUDA_INJECTION64_PATH=./libcufaultinj.so \ --conf spark.executorEnv.FAULT_INJECTOR_CONFIG_PATH=test_faultinj.json \ --conf spark.rapids.memory.gpu.minAllocFraction=0 \ diff --git a/src/main/cpp/src/DateTimeRebaseJni.cpp b/src/main/cpp/src/DateTimeUtilsJni.cpp similarity index 52% rename from src/main/cpp/src/DateTimeRebaseJni.cpp rename to src/main/cpp/src/DateTimeUtilsJni.cpp index f72e9d7e7b..551ace74d5 100644 --- a/src/main/cpp/src/DateTimeRebaseJni.cpp +++ b/src/main/cpp/src/DateTimeUtilsJni.cpp @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -15,11 +15,11 @@ */ #include "cudf_jni_apis.hpp" -#include "datetime_rebase.hpp" +#include "datetime_utils.hpp" extern "C" { -JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_DateTimeRebase_rebaseGregorianToJulian( +JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_DateTimeUtils_rebaseGregorianToJulian( JNIEnv* env, jclass, jlong input) { JNI_NULL_CHECK(env, input, "input column is null", 0); @@ -33,7 +33,7 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_DateTimeRebase_rebaseGr CATCH_STD(env, 0); } -JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_DateTimeRebase_rebaseJulianToGregorian( +JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_DateTimeUtils_rebaseJulianToGregorian( JNIEnv* env, jclass, jlong input) { JNI_NULL_CHECK(env, input, "input column is null", 0); @@ -47,4 +47,36 @@ JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_DateTimeRebase_rebaseJu CATCH_STD(env, 0); } +JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_DateTimeUtils_truncateWithColumnFormat( + JNIEnv* env, jclass, jlong datetime, jlong format) +{ + JNI_NULL_CHECK(env, datetime, "input datetime is null", 0); + JNI_NULL_CHECK(env, format, "input format is null", 0); + + try { + cudf::jni::auto_set_device(env); + + auto const datetime_cv = reinterpret_cast(datetime); + auto const format_cv = reinterpret_cast(format); + return reinterpret_cast(spark_rapids_jni::truncate(*datetime_cv, *format_cv).release()); + } + CATCH_STD(env, 0); +} + +JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_DateTimeUtils_truncateWithScalarFormat( + JNIEnv* env, jclass, jlong datetime, jstring format) +{ + JNI_NULL_CHECK(env, datetime, "input datetime is null", 0); + + try { + cudf::jni::auto_set_device(env); + + auto const datetime_cv = reinterpret_cast(datetime); + auto const format_jstr = cudf::jni::native_jstring(env, format); + auto const format = std::string(format_jstr.get(), format_jstr.size_bytes()); + return reinterpret_cast(spark_rapids_jni::truncate(*datetime_cv, format).release()); + } + CATCH_STD(env, 0); +} + } // extern "C" diff --git a/src/main/cpp/src/HashJni.cpp b/src/main/cpp/src/HashJni.cpp index c0adf38686..520b6f24c0 100644 --- a/src/main/cpp/src/HashJni.cpp +++ b/src/main/cpp/src/HashJni.cpp @@ -21,6 +21,11 @@ extern "C" { +JNIEXPORT jint JNICALL Java_com_nvidia_spark_rapids_jni_Hash_getMaxStackDepth(JNIEnv* env, jclass) +{ + return spark_rapids_jni::MAX_STACK_DEPTH; +} + JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_Hash_murmurHash32( JNIEnv* env, jclass, jint seed, jlongArray column_handles) { diff --git a/src/main/cpp/src/datetime_rebase.cu b/src/main/cpp/src/datetime_rebase.cu index 976c9b1530..d850634848 100644 --- a/src/main/cpp/src/datetime_rebase.cu +++ b/src/main/cpp/src/datetime_rebase.cu @@ -14,20 +14,16 @@ * limitations under the License. */ -#include "datetime_rebase.hpp" +#include "datetime_utils.hpp" -// #include #include -#include #include -#include +#include -// #include #include -// #include #include #include @@ -343,32 +339,34 @@ std::unique_ptr julian_to_gregorian_micros(cudf::column_view const namespace spark_rapids_jni { -std::unique_ptr rebase_gregorian_to_julian(cudf::column_view const& input) +std::unique_ptr rebase_gregorian_to_julian(cudf::column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + CUDF_FUNC_RANGE(); + auto const type = input.type().id(); CUDF_EXPECTS( type == cudf::type_id::TIMESTAMP_DAYS || type == cudf::type_id::TIMESTAMP_MICROSECONDS, "The input must be either day or microsecond timestamps to rebase."); - if (input.size() == 0) { return cudf::empty_like(input); } - - auto const stream = cudf::get_default_stream(); - auto const mr = rmm::mr::get_current_device_resource(); + if (input.size() == 0) { return cudf::make_empty_column(input.type()); } return type == cudf::type_id::TIMESTAMP_DAYS ? gregorian_to_julian_days(input, stream, mr) : gregorian_to_julian_micros(input, stream, mr); } -std::unique_ptr rebase_julian_to_gregorian(cudf::column_view const& input) +std::unique_ptr rebase_julian_to_gregorian(cudf::column_view const& input, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) { + CUDF_FUNC_RANGE(); + auto const type = input.type().id(); CUDF_EXPECTS( type == cudf::type_id::TIMESTAMP_DAYS || type == cudf::type_id::TIMESTAMP_MICROSECONDS, "The input must be either day or microsecond timestamps to rebase."); - if (input.size() == 0) { return cudf::empty_like(input); } - - auto const stream = cudf::get_default_stream(); - auto const mr = rmm::mr::get_current_device_resource(); + if (input.size() == 0) { return cudf::make_empty_column(input.type()); } return type == cudf::type_id::TIMESTAMP_DAYS ? julian_to_gregorian_days(input, stream, mr) : julian_to_gregorian_micros(input, stream, mr); } diff --git a/src/main/cpp/src/datetime_rebase.hpp b/src/main/cpp/src/datetime_rebase.hpp deleted file mode 100644 index 14451dfbb1..0000000000 --- a/src/main/cpp/src/datetime_rebase.hpp +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -#include - -namespace spark_rapids_jni { - -std::unique_ptr rebase_gregorian_to_julian(cudf::column_view const& input); - -std::unique_ptr rebase_julian_to_gregorian(cudf::column_view const& input); - -} // namespace spark_rapids_jni diff --git a/src/main/cpp/src/datetime_truncate.cu b/src/main/cpp/src/datetime_truncate.cu new file mode 100644 index 0000000000..41dc4404a3 --- /dev/null +++ b/src/main/cpp/src/datetime_truncate.cu @@ -0,0 +1,396 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "datetime_utils.hpp" + +#include +#include +#include +#include +#include +#include +#include + +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +#include + +namespace spark_rapids_jni { + +namespace detail { + +namespace { +/** + * @brief The date/time format to perform truncation. + * + * The format must match the descriptions in the Apache Spark documentation: + * - https://spark.apache.org/docs/latest/api/sql/index.html#trunc + * - https://spark.apache.org/docs/latest/api/sql/index.html#date_trunc + */ +enum class truncation_format : uint8_t { + YEAR, + YYYY, + YY, + QUARTER, + MONTH, + MM, + MON, + WEEK, + DAY, + DD, + HOUR, + MINUTE, + SECOND, + MILLISECOND, + MICROSECOND, + INVALID +}; + +// Convert an ASCII character into uppercase. +__host__ __device__ char to_upper(char const c) +{ + return ('a' <= c && c <= 'z') ? static_cast(static_cast(c ^ 0x20)) : c; +} + +// Parse the truncation format from a string. +__host__ __device__ truncation_format parse_format(char const* fmt_data, cudf::size_type fmt_size) +{ + // This must be kept in sync with the `truncation_format` enum. + char const* components[] = {"YEAR", + "YYYY", + "YY", + "QUARTER", + "MONTH", + "MM", + "MON", + "WEEK", + "DAY", + "DD", + "HOUR", + "MINUTE", + "SECOND", + "MILLISECOND", + "MICROSECOND"}; + // Manually calculate sizes of the strings since `strlen` is not available in device code. + cudf::size_type constexpr comp_sizes[] = {4, 4, 2, 7, 5, 2, 3, 4, 3, 2, 4, 6, 6, 11, 11}; + auto constexpr num_components = std::size(components); + + for (std::size_t comp_idx = 0; comp_idx < num_components; ++comp_idx) { + if (fmt_size != comp_sizes[comp_idx]) { continue; } + auto const* ptr = components[comp_idx]; + bool equal = true; + for (cudf::size_type idx = 0; idx < fmt_size; ++idx) { + if (to_upper(fmt_data[idx]) != ptr[idx]) { + equal = false; + break; + } + } + if (equal) { return static_cast(comp_idx); } + } + return truncation_format::INVALID; +} + +// Truncate the given month to the first month of the quarter. +__device__ inline uint32_t trunc_quarter_month(uint32_t month) +{ + auto const zero_based_month = month - 1u; + return (zero_based_month / 3u) * 3u + 1u; +} + +// Truncate the given day to the previous Monday. +__device__ inline cuda::std::chrono::sys_days trunc_to_monday( + cuda::std::chrono::sys_days const days_since_epoch) +{ + // Define our `MONDAY` constant as `cuda::std::chrono::Monday` is not available in device code. + // [0, 6] => [Sun, Sat] + auto constexpr MONDAY = cuda::std::chrono::weekday{1}; + auto const weekday = cuda::std::chrono::weekday{days_since_epoch}; + auto const days_to_subtract = weekday - MONDAY; // [-1, 5] + + if (days_to_subtract.count() == 0) { return days_since_epoch; } + + // If the input is a Sunday (weekday == 0), we have `days_to_subtract` negative thus + // we need to subtract 6 days to get the previous Monday. + return days_to_subtract.count() > 0 ? days_since_epoch - days_to_subtract + : days_since_epoch - cuda::std::chrono::days{6}; +} + +template +__device__ inline cuda::std::optional trunc_date( + cuda::std::chrono::sys_days const days_since_epoch, + cuda::std::chrono::year_month_day const ymd, + truncation_format const fmt) +{ + using namespace cuda::std::chrono; + switch (fmt) { + case truncation_format::YEAR: + case truncation_format::YYYY: + case truncation_format::YY: + return Timestamp{sys_days{year_month_day{ymd.year(), month{1}, day{1}}}}; + case truncation_format::QUARTER: + return Timestamp{sys_days{year_month_day{ + ymd.year(), month{trunc_quarter_month(static_cast(ymd.month()))}, day{1}}}}; + case truncation_format::MONTH: + case truncation_format::MM: + case truncation_format::MON: + return Timestamp{sys_days{year_month_day{ymd.year(), ymd.month(), day{1}}}}; + case truncation_format::WEEK: return Timestamp{trunc_to_monday(days_since_epoch)}; + default: return cuda::std::nullopt; + } +} + +// FormatDeviceT is either a `column_device_view` or `truncation_format`. +template +struct truncate_date_fn { + using Timestamp = cudf::timestamp_D; + static_assert(std::is_same_v || + std::is_same_v, + "FormatDeviceT must be either 'cudf::column_device_view' or 'truncation_format'."); + + cudf::column_device_view datetime; + FormatDeviceT format; + + __device__ inline thrust::pair operator()(cudf::size_type const idx) const + { + auto const datetime_idx = datetime.size() > 1 ? idx : 0; + if (datetime.is_null(datetime_idx)) { return {Timestamp{}, false}; } + if constexpr (cuda::std::is_same_v) { + if (format.is_null(idx)) { return {Timestamp{}, false}; } + } + + truncation_format fmt{}; + if constexpr (cuda::std::is_same_v) { + auto const fmt_str = format.template element(idx); + fmt = parse_format(fmt_str.data(), fmt_str.size_bytes()); + } else { + fmt = format; + } + if (fmt == truncation_format::INVALID) { return {Timestamp{}, false}; } + + using namespace cuda::std::chrono; + auto const ts = datetime.template element(datetime_idx); + auto const days_since_epoch = floor(ts); + auto const ymd = year_month_day(days_since_epoch); + + auto const result = trunc_date(days_since_epoch, ymd, fmt); + return {result.value_or(Timestamp{}), result.has_value()}; + } +}; + +// FormatDeviceT is either a `column_device_view` or `truncation_format`. +template +struct truncate_timestamp_fn { + using Timestamp = cudf::timestamp_us; + static_assert(std::is_same_v || + std::is_same_v, + "FormatDeviceT must be either 'cudf::column_device_view' or 'truncation_format'."); + + cudf::column_device_view datetime; + FormatDeviceT format; + + __device__ inline thrust::pair operator()(cudf::size_type const idx) const + { + auto const datetime_idx = datetime.size() > 1 ? idx : 0; + if (datetime.is_null(datetime_idx)) { return {Timestamp{}, false}; } + if constexpr (cuda::std::is_same_v) { + if (format.is_null(idx)) { return {Timestamp{}, false}; } + } + + truncation_format fmt{}; + if constexpr (cuda::std::is_same_v) { + auto const fmt_str = format.template element(idx); + fmt = parse_format(fmt_str.data(), fmt_str.size_bytes()); + } else { + fmt = format; + } + if (fmt == truncation_format::INVALID) { return {Timestamp{}, false}; } + + auto const ts = datetime.template element(datetime_idx); + + // No truncation needed for microsecond timestamps. + if (fmt == truncation_format::MICROSECOND) { return {ts, true}; } + + // The components that are common for both date and timestamp: year, quarter, month, week. + using namespace cuda::std::chrono; + auto const days_since_epoch = floor(ts); + auto const ymd = year_month_day(days_since_epoch); + if (auto const try_trunc_date = trunc_date(days_since_epoch, ymd, fmt); + try_trunc_date.has_value()) { + return {try_trunc_date.value(), true}; + } + + auto time_since_midnight = ts - days_since_epoch; + if (time_since_midnight.count() < 0) { time_since_midnight += days(1); } + + switch (fmt) { + case truncation_format::DAY: + case truncation_format::DD: return {Timestamp{sys_days{ymd}}, true}; + case truncation_format::HOUR: + return {Timestamp{sys_days{ymd} + floor(time_since_midnight)}, true}; + case truncation_format::MINUTE: + return {Timestamp{sys_days{ymd} + floor(time_since_midnight)}, true}; + case truncation_format::SECOND: + return {Timestamp{sys_days{ymd} + floor(time_since_midnight)}, true}; + case truncation_format::MILLISECOND: + return {Timestamp{sys_days{ymd} + floor(time_since_midnight)}, true}; + default: CUDF_UNREACHABLE("Unhandled truncation format."); + } + } +}; + +template +std::unique_ptr truncate_datetime(cudf::column_view const& datetime, + FormatT const& format, + cudf::size_type output_size, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + auto output = cudf::make_fixed_width_column( + datetime.type(), output_size, cudf::mask_state::UNALLOCATED, stream, mr); + auto validity = rmm::device_uvector(output_size, stream); + + auto const input_it = thrust::make_counting_iterator(0); + auto const output_it = + thrust::make_zip_iterator(output->mutable_view().template begin(), validity.begin()); + auto const do_transform = [&](auto trunc_fn) { + thrust::transform(rmm::exec_policy_nosync(stream), + input_it, + input_it + output_size, + output_it, + std::move(trunc_fn)); + }; + + using FormatDeviceT = std::conditional_t, + cudf::column_device_view, + truncation_format>; + using TransformFunc = std::conditional_t, + truncate_date_fn, + truncate_timestamp_fn>; + + auto const d_datetime_ptr = cudf::column_device_view::create(datetime, stream); + if constexpr (std::is_same_v) { + auto const d_format_ptr = cudf::column_device_view::create(format, stream); + do_transform(TransformFunc{*d_datetime_ptr, *d_format_ptr}); + } else { + auto const fmt = parse_format(format.data(), static_cast(format.size())); + if (fmt == truncation_format::INVALID) { + return cudf::make_fixed_width_column( + datetime.type(), output_size, cudf::mask_state::ALL_NULL, stream, mr); + } + do_transform(TransformFunc{*d_datetime_ptr, fmt}); + } + + auto [null_mask, null_count] = + cudf::detail::valid_if(validity.begin(), validity.end(), thrust::identity{}, stream, mr); + output->set_null_mask(null_count > 0 ? std::move(null_mask) : rmm::device_buffer{0, stream, mr}, + null_count); + return output; +} + +template +std::unique_ptr truncate_dispatcher(cudf::column_view const& datetime, + FormatT const& format, + cudf::size_type output_size, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + if (datetime.type().id() == cudf::type_id::TIMESTAMP_DAYS) { + return truncate_datetime(datetime, format, output_size, stream, mr); + } + return truncate_datetime(datetime, format, output_size, stream, mr); +} + +void check_type(cudf::column_view const& datetime) +{ + CUDF_EXPECTS(datetime.type().id() == cudf::type_id::TIMESTAMP_DAYS || + datetime.type().id() == cudf::type_id::TIMESTAMP_MICROSECONDS, + "The date/time input must be either day or microsecond timestamps."); +} + +void check_types(cudf::column_view const& datetime, cudf::column_view const& format) +{ + check_type(datetime); + CUDF_EXPECTS(format.type().id() == cudf::type_id::STRING, + "The format input must be of string type."); +} + +} // namespace + +std::unique_ptr truncate(cudf::column_view const& datetime, + cudf::column_view const& format, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + check_types(datetime, format); + CUDF_EXPECTS(datetime.size() == 1 || datetime.size() == format.size(), + "The input date/time column must have exactly one row or the same number of rows as " + "the format column."); + auto const size = format.size(); + if (size == 0 || datetime.size() == datetime.null_count() || + format.size() == format.null_count()) { + return cudf::make_fixed_width_column( + datetime.type(), size, cudf::mask_state::ALL_NULL, stream, mr); + } + + return truncate_dispatcher(datetime, format, size, stream, mr); +} + +std::unique_ptr truncate(cudf::column_view const& datetime, + std::string const& format, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + check_type(datetime); + + auto const size = datetime.size(); + if (datetime.size() == 0 || datetime.size() == datetime.null_count()) { + return cudf::make_fixed_width_column( + datetime.type(), size, cudf::mask_state::ALL_NULL, stream, mr); + } + + return truncate_dispatcher(datetime, format, size, stream, mr); +} + +} // namespace detail + +std::unique_ptr truncate(cudf::column_view const& datetime, + cudf::column_view const& format, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::truncate(datetime, format, stream, mr); +} + +std::unique_ptr truncate(cudf::column_view const& datetime, + std::string const& format, + rmm::cuda_stream_view stream, + rmm::device_async_resource_ref mr) +{ + CUDF_FUNC_RANGE(); + return detail::truncate(datetime, format, stream, mr); +} + +} // namespace spark_rapids_jni diff --git a/src/main/cpp/src/datetime_utils.hpp b/src/main/cpp/src/datetime_utils.hpp new file mode 100644 index 0000000000..b531317475 --- /dev/null +++ b/src/main/cpp/src/datetime_utils.hpp @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include + +namespace spark_rapids_jni { +std::unique_ptr rebase_gregorian_to_julian( + cudf::column_view const& input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); + +std::unique_ptr rebase_julian_to_gregorian( + cudf::column_view const& input, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); + +std::unique_ptr truncate( + cudf::column_view const& datetime, + cudf::column_view const& format, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); + +std::unique_ptr truncate( + cudf::column_view const& datetime, + std::string const& format, + rmm::cuda_stream_view stream = cudf::get_default_stream(), + rmm::device_async_resource_ref mr = cudf::get_current_device_resource_ref()); + +} // namespace spark_rapids_jni diff --git a/src/main/cpp/src/from_json_to_raw_map_debug.cuh b/src/main/cpp/src/from_json_to_raw_map_debug.cuh index e62953cbb2..481002c369 100644 --- a/src/main/cpp/src/from_json_to_raw_map_debug.cuh +++ b/src/main/cpp/src/from_json_to_raw_map_debug.cuh @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/main/cpp/src/from_json_to_structs.cu b/src/main/cpp/src/from_json_to_structs.cu index 3d7ddbb1c3..aa92929557 100644 --- a/src/main/cpp/src/from_json_to_structs.cu +++ b/src/main/cpp/src/from_json_to_structs.cu @@ -29,7 +29,6 @@ #include #include #include -#include #include #include @@ -598,131 +597,6 @@ std::pair, bool> try_remove_quotes( true}; } -// Copied and modified from `cudf/cpp/src/io/json/parser_features.cpp`. -struct empty_column_functor { - rmm::cuda_stream_view stream; - rmm::device_async_resource_ref mr; - - template ())> - std::unique_ptr operator()(schema_element_with_precision const& schema) const - { - return cudf::make_empty_column(schema.type); - } - - template )> - std::unique_ptr operator()(schema_element_with_precision const& schema) const - { - CUDF_EXPECTS(schema.child_types.size() == 1, "Lists column should have only one child"); - auto offsets = cudf::make_empty_column(cudf::data_type(cudf::type_to_id())); - auto child = cudf::type_dispatcher( - schema.child_types.front().second.type, *this, schema.child_types.front().second); - return cudf::make_lists_column(0, std::move(offsets), std::move(child), 0, {}, stream, mr); - } - - template )> - std::unique_ptr operator()(schema_element_with_precision const& schema) const - { - std::vector> children; - for (auto const& [child_name, child_schema] : schema.child_types) { - children.emplace_back(cudf::type_dispatcher(child_schema.type, *this, child_schema)); - } - return cudf::make_structs_column(0, std::move(children), 0, {}, stream, mr); - } -}; - -// Copied and modified from `cudf/cpp/src/io/json/parser_features.cpp`. -struct allnull_column_functor { - rmm::cuda_stream_view stream; - rmm::device_async_resource_ref mr; - - private: - auto make_zeroed_offsets(cudf::size_type size) const - { - auto offsets_buff = - cudf::detail::make_zeroed_device_uvector_async(size + 1, stream, mr); - return std::make_unique(std::move(offsets_buff), rmm::device_buffer{}, 0); - } - - public: - template () && !std::is_same_v && - !std::is_same_v && - !std::is_same_v)> - std::unique_ptr operator()(Args...) const - { - CUDF_FAIL("Invalid type."); - } - - template ())> - std::unique_ptr operator()(schema_element_with_precision const& schema, - cudf::size_type size) const - { - return cudf::make_fixed_width_column(schema.type, size, cudf::mask_state::ALL_NULL, stream, mr); - } - - template )> - std::unique_ptr operator()(schema_element_with_precision const&, - cudf::size_type size) const - { - auto offsets = make_zeroed_offsets(size); - auto null_mask = cudf::detail::create_null_mask(size, cudf::mask_state::ALL_NULL, stream, mr); - return cudf::make_strings_column( - size, std::move(offsets), rmm::device_buffer{}, size, std::move(null_mask)); - } - - template )> - std::unique_ptr operator()(schema_element_with_precision const& schema, - cudf::size_type size) const - { - CUDF_EXPECTS(schema.child_types.size() == 1, "Lists column should have only one child"); - std::vector> children; - children.emplace_back(make_zeroed_offsets(size)); - children.emplace_back(cudf::type_dispatcher(schema.child_types.front().second.type, - empty_column_functor{stream, mr}, - schema.child_types.front().second)); - auto null_mask = cudf::detail::create_null_mask(size, cudf::mask_state::ALL_NULL, stream, mr); - // Do not use `cudf::make_lists_column` since we do not need to call `purge_nonempty_nulls` - // on the child column as it does not have non-empty nulls. - return std::make_unique(cudf::data_type{cudf::type_id::LIST}, - size, - rmm::device_buffer{}, - std::move(null_mask), - size, - std::move(children)); - } - - template )> - std::unique_ptr operator()(schema_element_with_precision const& schema, - cudf::size_type size) const - { - std::vector> children; - children.reserve(schema.child_types.size()); - for (auto const& [child_name, child_schema] : schema.child_types) { - children.emplace_back(cudf::type_dispatcher(child_schema.type, *this, child_schema, size)); - } - auto null_mask = cudf::detail::create_null_mask(size, cudf::mask_state::ALL_NULL, stream, mr); - // Do not use `cudf::make_structs_column` since we do not need to call `superimpose_nulls` - // on the children columns. - return std::make_unique(cudf::data_type{cudf::type_id::STRUCT}, - size, - rmm::device_buffer{}, - std::move(null_mask), - size, - std::move(children)); - } -}; - -// This is a workaround for https://github.com/rapidsai/cudf/issues/17167. -// When the issue is fixed, we should remove this utility and adopt it. -std::unique_ptr make_all_nulls_column(schema_element_with_precision const& schema, - cudf::size_type num_rows, - rmm::cuda_stream_view stream, - rmm::device_async_resource_ref mr) -{ - return cudf::type_dispatcher(schema.type, allnull_column_functor{stream, mr}, schema, num_rows); -} - template std::unique_ptr convert_data_type(InputType&& input, schema_element_with_precision const& schema, @@ -813,8 +687,7 @@ std::unique_ptr convert_data_type(InputType&& input, // From here, the input column should have type either LIST or STRUCT. - // Handle mismatched schema. - if (schema.type.id() != d_type) { return make_all_nulls_column(schema, num_rows, stream, mr); } + CUDF_EXPECTS(schema.type.id() == d_type, "Mismatched data type for nested columns."); if constexpr (input_is_column_ptr) { auto const null_count = input->null_count(); @@ -825,9 +698,9 @@ std::unique_ptr convert_data_type(InputType&& input, auto const& child_schema = schema.child_types.front().second; auto& child = input_content.children[cudf::lists_column_view::child_column_index]; - // Handle mismatched child schema. - if (cudf::is_nested(child_schema.type) && (child_schema.type.id() != child->type().id())) { - return make_all_nulls_column(schema, num_rows, stream, mr); + if (cudf::is_nested(child_schema.type)) { + CUDF_EXPECTS(child_schema.type.id() == child->type().id(), + "Mismatched data type for nested child column of a lists column."); } std::vector> new_children; @@ -875,9 +748,9 @@ std::unique_ptr convert_data_type(InputType&& input, auto const& child_schema = schema.child_types.front().second; auto const child = input.child(cudf::lists_column_view::child_column_index); - // Handle mismatched child schema. - if (cudf::is_nested(child_schema.type) && (child_schema.type.id() != child.type().id())) { - return make_all_nulls_column(schema, num_rows, stream, mr); + if (cudf::is_nested(child_schema.type)) { + CUDF_EXPECTS(child_schema.type.id() == child.type().id(), + "Mismatched data type for nested child column of a lists column."); } std::vector> new_children; diff --git a/src/main/cpp/src/hash.hpp b/src/main/cpp/src/hash.hpp index 4021b9e75c..9ec7496031 100644 --- a/src/main/cpp/src/hash.hpp +++ b/src/main/cpp/src/hash.hpp @@ -25,6 +25,7 @@ namespace spark_rapids_jni { constexpr int64_t DEFAULT_XXHASH64_SEED = 42; +constexpr int MAX_STACK_DEPTH = 8; /** * @brief Computes the murmur32 hash value of each row in the input set of columns. diff --git a/src/main/cpp/src/hive_hash.cu b/src/main/cpp/src/hive_hash.cu index c10c424b8b..89d08425fd 100644 --- a/src/main/cpp/src/hive_hash.cu +++ b/src/main/cpp/src/hive_hash.cu @@ -15,6 +15,7 @@ */ #include "hash.cuh" +#include "hash.hpp" #include #include @@ -37,8 +38,6 @@ using hive_hash_value_t = int32_t; constexpr hive_hash_value_t HIVE_HASH_FACTOR = 31; constexpr hive_hash_value_t HIVE_INIT_HASH = 0; -constexpr int MAX_NESTED_DEPTH = 8; - hive_hash_value_t __device__ inline compute_int(int32_t key) { return key; } hive_hash_value_t __device__ inline compute_long(int64_t key) @@ -368,7 +367,7 @@ class hive_device_row_hasher { // The default constructor of `col_stack_frame` is deleted, so it can not allocate an array // of `col_stack_frame` directly. // Instead leverage the byte array to create the col_stack_frame array. - alignas(col_stack_frame) char stack_wrapper[sizeof(col_stack_frame) * MAX_NESTED_DEPTH]; + alignas(col_stack_frame) char stack_wrapper[sizeof(col_stack_frame) * MAX_STACK_DEPTH]; auto col_stack = reinterpret_cast(stack_wrapper); int stack_size = 0; @@ -462,11 +461,11 @@ void check_nested_depth(cudf::table_view const& input) for (auto i = 0; i < input.num_columns(); i++) { cudf::column_view const& col = input.column(i); - CUDF_EXPECTS(get_nested_depth(col) <= MAX_NESTED_DEPTH, + CUDF_EXPECTS(get_nested_depth(col) <= MAX_STACK_DEPTH, "The " + std::to_string(i) + "-th column exceeds the maximum allowed nested depth. " + "Current depth: " + std::to_string(get_nested_depth(col)) + ", " + - "Maximum allowed depth: " + std::to_string(MAX_NESTED_DEPTH)); + "Maximum allowed depth: " + std::to_string(MAX_STACK_DEPTH)); } } diff --git a/src/main/cpp/src/xxhash64.cu b/src/main/cpp/src/xxhash64.cu index daed7590c3..f7a0d7cb35 100644 --- a/src/main/cpp/src/xxhash64.cu +++ b/src/main/cpp/src/xxhash64.cu @@ -15,9 +15,11 @@ */ #include "hash.cuh" +#include "hash.hpp" #include #include +#include #include #include @@ -271,6 +273,28 @@ hash_value_type __device__ inline XXHash_64::operator()( /** * @brief Computes the hash value of a row in the given table. * + * This functor uses Spark conventions for xxhash64 hashing, which differs from + * the xxhash64 implementation used in the rest of libcudf. These differences + * include: + * - Serially using the output hash as an input seed for the next item + * - Ignorance of null values + * + * The serial use of hashes as seeds means that data of different nested types + * can exhibit hash collisions. For example, a row of an integer column + * containing a 1 will have the same hash as a lists column of integers + * containing a list of [1] and a struct column of a single integer column + * containing a struct of {1}. + * + * As a consequence of ignoring null values, inputs like [1], [1, null], and + * [null, 1] have the same hash (an expected hash collision). This kind of + * collision can also occur across a table of nullable columns and with nulls + * in structs ({1, null} and {null, 1} have the same hash). The seed value (the + * previous element's hash value) is returned as the hash if an element is + * null. + * + * For additional differences such as special tail processing and decimal type + * handling, refer to the SparkXXHash64 functor. + * * @tparam Nullate A cudf::nullate type describing whether to check for nulls. */ template @@ -296,27 +320,186 @@ class device_row_hasher { /** * @brief Computes the hash value of an element in the given column. + * + * When the column is non-nested, this is a simple wrapper around the element_hasher. + * When the column is nested, this uses a seed value to serially compute each + * nested element, with the output hash becoming the seed for the next value. + * This requires constructing a new hash functor for each nested element, + * using the new seed from the previous element's hash. The hash of a null + * element is the input seed (the previous element's hash). */ class element_hasher_adapter { public: - template ())> + class element_hasher { + private: + Nullate _check_nulls; + hash_value_type _seed; + + public: + __device__ element_hasher(Nullate check_nulls, hash_value_type seed) + : _check_nulls(check_nulls), _seed(seed) + { + } + + template ())> + __device__ hash_value_type operator()(cudf::column_device_view const& col, + cudf::size_type row_index) const noexcept + { + if (_check_nulls && col.is_null(row_index)) { return _seed; } + return XXHash_64{_seed}(col.element(row_index)); + } + + template ())> + __device__ hash_value_type operator()(cudf::column_device_view const&, + cudf::size_type) const noexcept + { + CUDF_UNREACHABLE("Unsupported type for xxhash64"); + } + }; + + template ())> __device__ hash_value_type operator()(cudf::column_device_view const& col, cudf::size_type row_index, Nullate const _check_nulls, hash_value_type const _seed) const noexcept { - if (_check_nulls && col.is_null(row_index)) { return _seed; } - auto const hasher = XXHash_64{_seed}; - return hasher(col.element(row_index)); + auto const hasher = element_hasher{_check_nulls, _seed}; + return hasher.template operator()(col, row_index); } - template ())> - __device__ hash_value_type operator()(cudf::column_device_view const&, - cudf::size_type, - Nullate const, - hash_value_type const) const noexcept + struct col_stack_frame { + private: + cudf::column_device_view _column; // the column to process + int _idx_to_process; // the index of child or element to process next + + public: + __device__ col_stack_frame() = + delete; // Because the default constructor of `cudf::column_device_view` is deleted + + __device__ col_stack_frame(cudf::column_device_view col) + : _column(std::move(col)), _idx_to_process(0) + { + } + + __device__ int get_and_inc_idx_to_process() { return _idx_to_process++; } + + __device__ int get_idx_to_process() { return _idx_to_process; } + + __device__ cudf::column_device_view get_column() { return _column; } + }; + + /** + * @brief Functor to compute hash value for nested columns. + * + * This functor uses a stack to process nested columns. It iterates through the nested columns + * in a depth-first manner. The stack is used to keep track of the nested columns that need to + * be processed. + * + * - If the current column is a list column, it replaces the list column with its most inner + * non-list child since null values can be ignored in the xxhash64 computation. + * - If the current column is a struct column, there are two cases: + * a. If the struct column has only one row, it would be treated as a struct element. The + * children of the struct element would be pushed into the stack. + * b. If the struct column has multiple rows, it would be treated as a struct column. The + * next struct element would be pushed into the stack. + * - If the current column is a primitive column, it computes the hash value. + * + * For example, consider that the input column is of type `List>`. + * Assume that the element at `row_index` is: [(1, 2.0), (3, 4.0)]. + * The sliced column is noted as L1 here. + * + * L1 List> + * | + * S1 Struct ----> `struct_column` with multiple rows + * / \ + * S1[0] S1[1] Struct ----> `struct_element` with single row + * / \ / \ + * i1 f1 i2 f2 Primitive columns + * + * List level L1: + * |Index|List> | + * |-----|-------------------------| + * |0 | [(1, 2.0), (3, 4.0)] | + * length: 1 + * Offsets: 0, 2 + * + * Struct level S1: + * |Index|Struct| + * |-----|------------------| + * |0 | (1, 2.0) | + * |1 | (3, 4.0) | + * length: 2 + * + * @tparam T Type of the column. + * @param col The column to hash. + * @param row_index The index of the row to hash. + * @param _check_nulls A flag to indicate whether to check for null values. + * @param _seed The initial seed value for the hash computation. + * @return The computed hash value. + * + * @note This function is only enabled for nested columns. + */ + template ())> + __device__ hash_value_type operator()(cudf::column_device_view const& col, + cudf::size_type row_index, + Nullate const _check_nulls, + hash_value_type const _seed) const noexcept { - CUDF_UNREACHABLE("Unsupported type for xxhash64"); + hash_value_type ret = _seed; + cudf::column_device_view curr_col = col.slice(row_index, 1); + // The default constructor of `col_stack_frame` is deleted, so it can not allocate an array + // of `col_stack_frame` directly. + // Instead leverage the byte array to create the col_stack_frame array. + alignas(col_stack_frame) char stack_wrapper[sizeof(col_stack_frame) * MAX_STACK_DEPTH]; + auto col_stack = reinterpret_cast(stack_wrapper); + int stack_size = 0; + + col_stack[stack_size++] = col_stack_frame(curr_col); + + while (stack_size > 0) { + col_stack_frame& top = col_stack[stack_size - 1]; + curr_col = top.get_column(); + // Replace list column with its most inner non-list child + if (curr_col.type().id() == cudf::type_id::LIST) { + do { + curr_col = cudf::detail::lists_column_device_view(curr_col).get_sliced_child(); + } while (curr_col.type().id() == cudf::type_id::LIST); + col_stack[stack_size - 1] = col_stack_frame(curr_col); + continue; + } + + if (curr_col.type().id() == cudf::type_id::STRUCT) { + if (curr_col.size() <= 1) { // struct element + // All child columns processed, pop the element + if (top.get_idx_to_process() == curr_col.num_child_columns()) { + --stack_size; + } else { + // Push the next child column into the stack + col_stack[stack_size++] = + col_stack_frame(cudf::detail::structs_column_device_view(curr_col).get_sliced_child( + top.get_and_inc_idx_to_process())); + } + } else { // struct column + if (top.get_idx_to_process() == curr_col.size()) { + --stack_size; + } else { + col_stack[stack_size++] = + col_stack_frame(curr_col.slice(top.get_and_inc_idx_to_process(), 1)); + } + } + } else { // Primitive column + ret = cudf::detail::accumulate( + thrust::counting_iterator(0), + thrust::counting_iterator(curr_col.size()), + ret, + [curr_col, _check_nulls] __device__(auto hash, auto element_index) { + return cudf::type_dispatcher( + curr_col.type(), element_hasher{_check_nulls, hash}, curr_col, element_index); + }); + --stack_size; + } + } + return ret; } }; @@ -325,6 +508,40 @@ class device_row_hasher { hash_value_type const _seed; }; +void check_nested_depth(cudf::table_view const& input) +{ + using column_checker_fn_t = std::function; + + column_checker_fn_t get_nested_depth = [&](cudf::column_view const& col) { + if (col.type().id() == cudf::type_id::LIST) { + auto const child_col = cudf::lists_column_view(col).child(); + // When encountering a List of Struct column, we need to account for an extra depth, + // as both the struct column and its elements will be pushed into the stack. + if (child_col.type().id() == cudf::type_id::STRUCT) { + return 1 + get_nested_depth(child_col); + } + return get_nested_depth(child_col); + } else if (col.type().id() == cudf::type_id::STRUCT) { + int max_child_depth = 0; + for (auto child = col.child_begin(); child != col.child_end(); ++child) { + max_child_depth = std::max(max_child_depth, get_nested_depth(*child)); + } + return 1 + max_child_depth; + } else { // Primitive type + return 1; + } + }; + + for (auto i = 0; i < input.num_columns(); i++) { + cudf::column_view const& col = input.column(i); + CUDF_EXPECTS(get_nested_depth(col) <= MAX_STACK_DEPTH, + "The " + std::to_string(i) + + "-th column exceeds the maximum allowed nested depth. " + + "Current depth: " + std::to_string(get_nested_depth(col)) + ", " + + "Maximum allowed depth: " + std::to_string(MAX_STACK_DEPTH)); + } +} + } // namespace std::unique_ptr xxhash64(cudf::table_view const& input, @@ -343,7 +560,9 @@ std::unique_ptr xxhash64(cudf::table_view const& input, // Return early if there's nothing to hash if (input.num_columns() == 0 || input.num_rows() == 0) { return output; } - bool const nullable = has_nulls(input); + check_nested_depth(input); + + bool const nullable = has_nested_nulls(input); auto const input_view = cudf::table_device_view::create(input, stream); auto output_view = output->mutable_view(); diff --git a/src/main/cpp/tests/datetime_rebase.cpp b/src/main/cpp/tests/datetime_rebase.cpp index 8ac1ed45f9..a86308ccf1 100644 --- a/src/main/cpp/tests/datetime_rebase.cpp +++ b/src/main/cpp/tests/datetime_rebase.cpp @@ -14,7 +14,7 @@ * limitations under the License. */ -#include +#include // #include diff --git a/src/main/java/ai/rapids/cudf/CudfAccessor.java b/src/main/java/ai/rapids/cudf/CudfAccessor.java deleted file mode 100644 index 3352ee1ae9..0000000000 --- a/src/main/java/ai/rapids/cudf/CudfAccessor.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package ai.rapids.cudf; - -// TODO: properly expose these functions in the actual Scalar API and remove this layer. -// https://github.com/NVIDIA/spark-rapids-jni/issues/1307 -public class CudfAccessor { - public static long getScalarHandle(Scalar s) { - return s.getScalarHandle(); - } - - public static Scalar scalarFromHandle(DType type, long scalarHandle) { - return new Scalar(type, scalarHandle); - } -} diff --git a/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java b/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java index 46bf9a7f08..6a676a54bb 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/BloomFilter.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -16,12 +16,8 @@ package com.nvidia.spark.rapids.jni; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - import ai.rapids.cudf.BaseDeviceMemoryBuffer; import ai.rapids.cudf.ColumnVector; -import ai.rapids.cudf.CudfAccessor; import ai.rapids.cudf.CudfException; import ai.rapids.cudf.DType; import ai.rapids.cudf.Scalar; @@ -46,7 +42,7 @@ public static Scalar create(int numHashes, long bloomFilterBits){ if(bloomFilterBits <= 0){ throw new IllegalArgumentException("Bloom filters must have a positive number of bits"); } - return CudfAccessor.scalarFromHandle(DType.LIST, creategpu(numHashes, bloomFilterBits)); + return new Scalar(DType.LIST, creategpu(numHashes, bloomFilterBits)); } /** @@ -55,39 +51,39 @@ public static Scalar create(int numHashes, long bloomFilterBits){ * @param cv The column containing the values to add. */ public static void put(Scalar bloomFilter, ColumnVector cv){ - put(CudfAccessor.getScalarHandle(bloomFilter), cv.getNativeView()); + put(bloomFilter.getScalarHandle(), cv.getNativeView()); } /** * Merge one or more bloom filters into a new bloom filter. - * @param bloomFilters A ColumnVector containing a bloom filter per row. + * @param bloomFilters A ColumnVector containing a bloom filter per row. * @return A new bloom filter containing the merged inputs. */ public static Scalar merge(ColumnVector bloomFilters){ - return CudfAccessor.scalarFromHandle(DType.LIST, merge(bloomFilters.getNativeView())); + return new Scalar(DType.LIST, merge(bloomFilters.getNativeView())); } /** - * Probe a bloom filter with a column of longs. Returns a column of booleans. For + * Probe a bloom filter with a column of longs. Returns a column of booleans. For * each row in the output; a value of true indicates that the corresponding input value * -may- be in the set of values used to build the bloom filter; a value of false indicates * that the corresponding input value is conclusively not in the set of values used to build - * the bloom filter. + * the bloom filter. * @param bloomFilter The bloom filter to be probed. * @param cv The column containing the values to check. * @return A boolean column indicating the results of the probe. */ public static ColumnVector probe(Scalar bloomFilter, ColumnVector cv){ - return new ColumnVector(probe(CudfAccessor.getScalarHandle(bloomFilter), cv.getNativeView())); + return new ColumnVector(probe(bloomFilter.getScalarHandle(), cv.getNativeView())); } /** - * Probe a bloom filter with a column of longs. Returns a column of booleans. For + * Probe a bloom filter with a column of longs. Returns a column of booleans. For * each row in the output; a value of true indicates that the corresponding input value * -may- be in the set of values used to build the bloom filter; a value of false indicates * that the corresponding input value is conclusively not in the set of values used to build - * the bloom filter. - * @param bloomFilter The bloom filter to be probed. This buffer is expected to be the + * the bloom filter. + * @param bloomFilter The bloom filter to be probed. This buffer is expected to be the * fully packed Spark bloom filter, including header. * @param cv The column containing the values to check. * @return A boolean column indicating the results of the probe. @@ -95,10 +91,10 @@ public static ColumnVector probe(Scalar bloomFilter, ColumnVector cv){ public static ColumnVector probe(BaseDeviceMemoryBuffer bloomFilter, ColumnVector cv){ return new ColumnVector(probebuffer(bloomFilter.getAddress(), bloomFilter.getLength(), cv.getNativeView())); } - + private static native long creategpu(int numHashes, long bloomFilterBits) throws CudfException; private static native int put(long bloomFilter, long cv) throws CudfException; private static native long merge(long bloomFilters) throws CudfException; - private static native long probe(long bloomFilter, long cv) throws CudfException; - private static native long probebuffer(long bloomFilter, long bloomFilterSize, long cv) throws CudfException; + private static native long probe(long bloomFilter, long cv) throws CudfException; + private static native long probebuffer(long bloomFilter, long bloomFilterSize, long cv) throws CudfException; } diff --git a/src/main/java/com/nvidia/spark/rapids/jni/DateTimeRebase.java b/src/main/java/com/nvidia/spark/rapids/jni/DateTimeRebase.java index d73ee038d6..7ec98aa930 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/DateTimeRebase.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/DateTimeRebase.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2023, NVIDIA CORPORATION. + * Copyright (c) 2023-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. @@ -19,41 +19,18 @@ import ai.rapids.cudf.*; /** - * Utility class for converting between column major and row major data + * This will be removed after the plugin picks up DateTimeUtils class. */ public class DateTimeRebase { static { NativeDepsLoader.loadNativeDeps(); } - /** - * Convert the given timestamps as a number of days or microseconds since the epoch instant - * 1970-01-01T00:00:00Z to a local date-time in Proleptic Gregorian calendar, reinterpreting - * the result as in Julian calendar, then compute the number of days or microseconds since the - * epoch from that Julian local date-time. - *

- * This is to match with Apache Spark's `localRebaseGregorianToJulianDays` and - * `rebaseGregorianToJulianMicros` functions with timezone fixed to UTC. - */ public static ColumnVector rebaseGregorianToJulian(ColumnView input) { - return new ColumnVector(rebaseGregorianToJulian(input.getNativeView())); + return DateTimeUtils.rebaseGregorianToJulian(input); } - /** - * Convert the given timestamps as a number of days or microseconds since the epoch instant - * 1970-01-01T00:00:00Z to a local date-time in Julian calendar, reinterpreting the result - * as in Proleptic Gregorian calendar, then compute the number of days or microseconds since the - * epoch from that Gregorian local date-time. - *

- * This is to match with Apache Spark's `localRebaseJulianToGregorianDays` and - * `rebaseJulianToGregorianMicros` functions with timezone fixed to UTC. - */ public static ColumnVector rebaseJulianToGregorian(ColumnView input) { - return new ColumnVector(rebaseJulianToGregorian(input.getNativeView())); + return DateTimeUtils.rebaseJulianToGregorian(input); } - - - private static native long rebaseGregorianToJulian(long nativeHandle); - - private static native long rebaseJulianToGregorian(long nativeHandle); } diff --git a/src/main/java/com/nvidia/spark/rapids/jni/DateTimeUtils.java b/src/main/java/com/nvidia/spark/rapids/jni/DateTimeUtils.java new file mode 100644 index 0000000000..b02aa1eca6 --- /dev/null +++ b/src/main/java/com/nvidia/spark/rapids/jni/DateTimeUtils.java @@ -0,0 +1,116 @@ +/* + * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.jni; + +import ai.rapids.cudf.*; + +/** + * Utility class for converting between column major and row major data + */ +public class DateTimeUtils { + static { + NativeDepsLoader.loadNativeDeps(); + } + + /** + * Convert the given timestamps as a number of days or microseconds since the epoch instant + * 1970-01-01T00:00:00Z to a local date-time in Proleptic Gregorian calendar, reinterpreting + * the result as in Julian calendar, then compute the number of days or microseconds since the + * epoch from that Julian local date-time. + *

+ * This is to match with Apache Spark's `localRebaseGregorianToJulianDays` and + * `rebaseGregorianToJulianMicros` functions with timezone fixed to UTC. + * + * @param input The input column + * @return A new column with the rebase applied + */ + public static ColumnVector rebaseGregorianToJulian(ColumnView input) { + return new ColumnVector(rebaseGregorianToJulian(input.getNativeView())); + } + + /** + * Convert the given timestamps as a number of days or microseconds since the epoch instant + * 1970-01-01T00:00:00Z to a local date-time in Julian calendar, reinterpreting the result + * as in Proleptic Gregorian calendar, then compute the number of days or microseconds since the + * epoch from that Gregorian local date-time. + *

+ * This is to match with Apache Spark's `localRebaseJulianToGregorianDays` and + * `rebaseJulianToGregorianMicros` functions with timezone fixed to UTC. + * + * @param input The input column + * @return A new column with the rebase applied + */ + public static ColumnVector rebaseJulianToGregorian(ColumnView input) { + return new ColumnVector(rebaseJulianToGregorian(input.getNativeView())); + } + + /** + * Truncate the given date or timestamp to the unit specified by the format string. + *

+ * The input date/time must be of type TIMESTAMP_DAYS or TIMESTAMP_MICROSECONDS, and the format + * be of type STRING. In addition, the format strings are case-insensitive. + *

+ * For TIMESTAMP_DAYS, the valid format are:
+ * - {@code "YEAR", "YYYY", "YY"}: truncate to the first date of the year.
+ * - {@code "QUARTER"}: truncate to the first date of the quarter.
+ * - {@code "MONTH", "MM", "MON"}: truncate to the first date of the month.
+ * - {@code "WEEK"}: truncate to the Monday of the week.
+ *
+ * For TIMESTAMP_MICROSECONDS, the valid format are:
+ * - {@code "YEAR", "YYYY", "YY"}: truncate to the first date of the year.
+ * - {@code "QUARTER"}: truncate to the first date of the quarter.
+ * - {@code "MONTH", "MM", "MON"}: truncate to the first date of the month.
+ * - {@code "WEEK"}: truncate to the Monday of the week.
+ * - {@code "DAY", "DD"}: zero out the time part.
+ * - {@code "HOUR"}: zero out the minute and second with fraction part.
+ * - {@code "MINUTE"}: zero out the second with fraction part.
+ * - {@code "SECOND"}: zero out the second fraction part.
+ * - {@code "MILLISECOND"}: zero out the microseconds.
+ * - {@code "MICROSECOND"}: keep everything.
+ * + * @param datetime The input date/time + * @param format The time component to truncate to + * @return The truncated date/time + */ + public static ColumnVector truncate(ColumnView datetime, ColumnView format) { + return new ColumnVector(truncateWithColumnFormat(datetime.getNativeView(), + format.getNativeView())); + } + + /** + * Truncate the given date or timestamp to the unit specified by the format string. + *

+ * This function is similar to {@link #truncate(ColumnView, ColumnView)} but the input format + * is a string literal instead of a column. + * + * @param datetime The input date/time + * @param format The time component to truncate to + * @return The truncated date/time + */ + public static ColumnVector truncate(ColumnView datetime, String format) { + return new ColumnVector(truncateWithScalarFormat(datetime.getNativeView(), format)); + } + + + private static native long rebaseGregorianToJulian(long nativeHandle); + + private static native long rebaseJulianToGregorian(long nativeHandle); + + private static native long truncateWithColumnFormat(long datetimeHandle, long formatHandle); + + private static native long truncateWithScalarFormat(long datetimeHandle, String format); +} diff --git a/src/main/java/com/nvidia/spark/rapids/jni/GpuSubstringIndexUtils.java b/src/main/java/com/nvidia/spark/rapids/jni/GpuSubstringIndexUtils.java index a8750919c9..81d2da56bf 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/GpuSubstringIndexUtils.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/GpuSubstringIndexUtils.java @@ -24,7 +24,7 @@ public class GpuSubstringIndexUtils { } public static ColumnVector substringIndex(ColumnView cv, Scalar delimiter, int count){ - return new ColumnVector(substringIndex(cv.getNativeView(), CudfAccessor.getScalarHandle(delimiter), count)); + return new ColumnVector(substringIndex(cv.getNativeView(), delimiter.getScalarHandle(), count)); } private static native long substringIndex(long columnView, long delimiter, int count) throws CudfException; diff --git a/src/main/java/com/nvidia/spark/rapids/jni/Hash.java b/src/main/java/com/nvidia/spark/rapids/jni/Hash.java index 16971c5bdb..2d23ae5256 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/Hash.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/Hash.java @@ -22,13 +22,15 @@ import ai.rapids.cudf.NativeDepsLoader; public class Hash { - // there doesn't appear to be a useful constant in spark to reference. this could break. - static final long DEFAULT_XXHASH64_SEED = 42; - static { NativeDepsLoader.loadNativeDeps(); } + // there doesn't appear to be a useful constant in spark to reference. this could break. + static final long DEFAULT_XXHASH64_SEED = 42; + + public static final int MAX_STACK_DEPTH = getMaxStackDepth(); + /** * Create a new vector containing spark's 32-bit murmur3 hash of each row in the table. * Spark's murmur3 hash uses a different tail processing algorithm. @@ -75,7 +77,6 @@ public static ColumnVector xxhash64(long seed, ColumnView columns[]) { assert columns[i] != null : "Column vectors passed may not be null"; assert columns[i].getRowCount() == size : "Row count mismatch, all columns must be the same size"; assert !columns[i].getType().isDurationType() : "Unsupported column type Duration"; - assert !columns[i].getType().isNestedType() : "Unsupported column type Nested"; columnViews[i] = columns[i].getNativeView(); } return new ColumnVector(xxhash64(seed, columnViews)); @@ -101,6 +102,8 @@ public static ColumnVector hiveHash(ColumnView columns[]) { return new ColumnVector(hiveHash(columnViews)); } + private static native int getMaxStackDepth(); + private static native long murmurHash32(int seed, long[] viewHandles) throws CudfException; private static native long xxhash64(long seed, long[] viewHandles) throws CudfException; diff --git a/src/main/java/com/nvidia/spark/rapids/jni/RegexRewriteUtils.java b/src/main/java/com/nvidia/spark/rapids/jni/RegexRewriteUtils.java index 9277c3e0f9..d509227cdf 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/RegexRewriteUtils.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/RegexRewriteUtils.java @@ -28,7 +28,7 @@ public class RegexRewriteUtils { * a literal string followed by a range of characters in the range of start to end, with at least * len characters. * - * @param strings Column of strings to check for literal. + * @param input Column of strings to check for literal. * @param literal UTF-8 encoded string to check in strings column. * @param len Minimum number of characters to check after the literal. * @param start Minimum UTF-8 codepoint value to check for in the range. @@ -37,7 +37,7 @@ public class RegexRewriteUtils { */ public static ColumnVector literalRangePattern(ColumnVector input, Scalar literal, int len, int start, int end) { assert(input.getType().equals(DType.STRING)) : "column must be a String"; - return new ColumnVector(literalRangePattern(input.getNativeView(), CudfAccessor.getScalarHandle(literal), len, start, end)); + return new ColumnVector(literalRangePattern(input.getNativeView(), literal.getScalarHandle(), len, start, end)); } private static native long literalRangePattern(long input, long literal, int len, int start, int end); diff --git a/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java b/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java index 6370531428..7ae784e639 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializer.java @@ -16,20 +16,29 @@ package com.nvidia.spark.rapids.jni.kudo; -import ai.rapids.cudf.*; +import static com.nvidia.spark.rapids.jni.Preconditions.ensure; +import static java.util.Objects.requireNonNull; + +import ai.rapids.cudf.BufferType; +import ai.rapids.cudf.Cuda; +import ai.rapids.cudf.HostColumnVector; +import ai.rapids.cudf.JCudfSerialization; +import ai.rapids.cudf.Schema; +import ai.rapids.cudf.Table; import com.nvidia.spark.rapids.jni.Pair; import com.nvidia.spark.rapids.jni.schema.Visitors; - -import java.io.*; +import java.io.BufferedOutputStream; +import java.io.DataInputStream; +import java.io.DataOutputStream; +import java.io.IOException; +import java.io.InputStream; +import java.io.OutputStream; import java.util.Arrays; import java.util.List; import java.util.function.LongConsumer; import java.util.function.Supplier; import java.util.stream.IntStream; -import static com.nvidia.spark.rapids.jni.Preconditions.ensure; -import static java.util.Objects.requireNonNull; - /** * This class is used to serialize/deserialize a table using the Kudo format. * @@ -148,8 +157,9 @@ public class KudoSerializer { private static final byte[] PADDING = new byte[64]; - private static final BufferType[] ALL_BUFFER_TYPES = new BufferType[]{BufferType.VALIDITY, BufferType.OFFSET, - BufferType.DATA}; + private static final BufferType[] ALL_BUFFER_TYPES = + new BufferType[] {BufferType.VALIDITY, BufferType.OFFSET, + BufferType.DATA}; static { Arrays.fill(PADDING, (byte) 0); @@ -176,7 +186,7 @@ public KudoSerializer(Schema schema) { * @param numRows number of rows to write * @return number of bytes written */ - long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) { + WriteMetrics writeToStreamWithMetrics(Table table, OutputStream out, int rowOffset, int numRows) { HostColumnVector[] columns = null; try { columns = IntStream.range(0, table.getNumberOfColumns()) @@ -185,7 +195,7 @@ long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) { .toArray(HostColumnVector[]::new); Cuda.DEFAULT_STREAM.sync(); - return writeToStream(columns, out, rowOffset, numRows); + return writeToStreamWithMetrics(columns, out, rowOffset, numRows); } finally { if (columns != null) { for (HostColumnVector column : columns) { @@ -195,6 +205,18 @@ long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) { } } + /** + * Write partition of an array of {@link HostColumnVector} to an output stream. + * See {@link #writeToStreamWithMetrics(HostColumnVector[], OutputStream, int, int)} for more + * details. + * + * @return number of bytes written + */ + public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowOffset, + int numRows) { + return writeToStreamWithMetrics(columns, out, rowOffset, numRows).getWrittenBytes(); + } + /** * Write partition of an array of {@link HostColumnVector} to an output stream. *
@@ -208,7 +230,8 @@ long writeToStream(Table table, OutputStream out, int rowOffset, int numRows) { * @param numRows number of rows to write * @return number of bytes written */ - public long writeToStream(HostColumnVector[] columns, OutputStream out, int rowOffset, int numRows) { + public WriteMetrics writeToStreamWithMetrics(HostColumnVector[] columns, OutputStream out, + int rowOffset, int numRows) { ensure(numRows > 0, () -> "numRows must be > 0, but was " + numRows); ensure(columns.length > 0, () -> "columns must not be empty, for row count only records " + "please call writeRowCountToStream"); @@ -286,17 +309,25 @@ public Pair mergeToTable(List kudoTables) throws } } - private long writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffset, int numRows) throws Exception { - KudoTableHeaderCalc headerCalc = new KudoTableHeaderCalc(rowOffset, numRows, flattenedColumnCount); - Visitors.visitColumns(columns, headerCalc); + private WriteMetrics writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffset, + int numRows) throws Exception { + WriteMetrics metrics = new WriteMetrics(); + KudoTableHeaderCalc headerCalc = + new KudoTableHeaderCalc(rowOffset, numRows, flattenedColumnCount); + withTime(() -> Visitors.visitColumns(columns, headerCalc), metrics::addCalcHeaderTime); KudoTableHeader header = headerCalc.getHeader(); + long currentTime = System.nanoTime(); header.writeTo(out); + metrics.addCopyHeaderTime(System.nanoTime() - currentTime); + metrics.addWrittenBytes(header.getSerializedSize()); long bytesWritten = 0; for (BufferType bufferType : ALL_BUFFER_TYPES) { - SlicedBufferSerializer serializer = new SlicedBufferSerializer(rowOffset, numRows, bufferType, out); + SlicedBufferSerializer serializer = new SlicedBufferSerializer(rowOffset, numRows, bufferType, + out, metrics); Visitors.visitColumns(columns, serializer); bytesWritten += serializer.getTotalDataLen(); + metrics.addWrittenBytes(serializer.getTotalDataLen()); } if (bytesWritten != header.getTotalDataLen()) { @@ -307,7 +338,7 @@ private long writeSliced(HostColumnVector[] columns, DataWriter out, int rowOffs out.flush(); - return header.getSerializedSize() + bytesWritten; + return metrics; } private static DataWriter writerFrom(OutputStream out) { @@ -348,6 +379,12 @@ static T withTime(Supplier task, LongConsumer timeConsumer) { return ret; } + static void withTime(Runnable task, LongConsumer timeConsumer) { + long now = System.nanoTime(); + task.run(); + timeConsumer.accept(System.nanoTime() - now); + } + /** * This method returns the length in bytes needed to represent X number of rows * e.g. getValidityLengthInBytes(5) => 1 byte diff --git a/src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java b/src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java index e22a523855..080cb5eda6 100644 --- a/src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java +++ b/src/main/java/com/nvidia/spark/rapids/jni/kudo/SlicedBufferSerializer.java @@ -16,19 +16,18 @@ package com.nvidia.spark.rapids.jni.kudo; +import static com.nvidia.spark.rapids.jni.kudo.KudoSerializer.padForHostAlignment; + import ai.rapids.cudf.BufferType; import ai.rapids.cudf.DType; import ai.rapids.cudf.HostColumnVectorCore; import ai.rapids.cudf.HostMemoryBuffer; import com.nvidia.spark.rapids.jni.schema.HostColumnsVisitor; - import java.io.IOException; import java.util.ArrayDeque; import java.util.Deque; import java.util.List; -import static com.nvidia.spark.rapids.jni.kudo.KudoSerializer.padForHostAlignment; - /** * This class visits a list of columns and serialize one of the buffers (validity, offset, or data) into with kudo * format. @@ -48,13 +47,16 @@ class SlicedBufferSerializer implements HostColumnsVisitor { private final DataWriter writer; private final Deque sliceInfos = new ArrayDeque<>(); + private final WriteMetrics metrics; private long totalDataLen; - SlicedBufferSerializer(int rowOffset, int numRows, BufferType bufferType, DataWriter writer) { + SlicedBufferSerializer(int rowOffset, int numRows, BufferType bufferType, DataWriter writer, + WriteMetrics metrics) { this.root = new SliceInfo(rowOffset, numRows); this.bufferType = bufferType; this.writer = writer; this.sliceInfos.addLast(root); + this.metrics = metrics; this.totalDataLen = 0; } @@ -153,28 +155,26 @@ public Void visit(HostColumnVectorCore col) { } } - private long copySlicedValidity(HostColumnVectorCore column, SliceInfo sliceInfo) throws IOException { + private long copySlicedValidity(HostColumnVectorCore column, SliceInfo sliceInfo) + throws IOException { if (column.getValidity() != null && sliceInfo.getRowCount() > 0) { HostMemoryBuffer buff = column.getValidity(); long len = sliceInfo.getValidityBufferInfo().getBufferLength(); - writer.copyDataFrom(buff, sliceInfo.getValidityBufferInfo().getBufferOffset(), - len); - return padForHostAlignment(writer, len); + return copyBufferAndPadForHost(buff, sliceInfo.getValidityBufferInfo().getBufferOffset(), len); } else { return 0; } } - private long copySlicedOffset(HostColumnVectorCore column, SliceInfo sliceInfo) throws IOException { + private long copySlicedOffset(HostColumnVectorCore column, SliceInfo sliceInfo) + throws IOException { if (sliceInfo.rowCount <= 0 || column.getOffsets() == null) { // Don't copy anything, there are no rows return 0; } long bytesToCopy = (sliceInfo.rowCount + 1) * Integer.BYTES; long srcOffset = sliceInfo.offset * Integer.BYTES; - HostMemoryBuffer buff = column.getOffsets(); - writer.copyDataFrom(buff, srcOffset, bytesToCopy); - return padForHostAlignment(writer, bytesToCopy); + return copyBufferAndPadForHost(column.getOffsets(), srcOffset, bytesToCopy); } private long copySlicedData(HostColumnVectorCore column, SliceInfo sliceInfo) throws IOException { @@ -182,7 +182,8 @@ private long copySlicedData(HostColumnVectorCore column, SliceInfo sliceInfo) th DType type = column.getType(); if (type.equals(DType.STRING)) { long startByteOffset = column.getOffsets().getInt(sliceInfo.offset * Integer.BYTES); - long endByteOffset = column.getOffsets().getInt((sliceInfo.offset + sliceInfo.rowCount) * Integer.BYTES); + long endByteOffset = + column.getOffsets().getInt((sliceInfo.offset + sliceInfo.rowCount) * Integer.BYTES); long bytesToCopy = endByteOffset - startByteOffset; if (column.getData() == null) { if (bytesToCopy != 0) { @@ -192,14 +193,12 @@ private long copySlicedData(HostColumnVectorCore column, SliceInfo sliceInfo) th return 0; } else { - writer.copyDataFrom(column.getData(), startByteOffset, bytesToCopy); - return padForHostAlignment(writer, bytesToCopy); + return copyBufferAndPadForHost(column.getData(), startByteOffset, bytesToCopy); } } else if (type.getSizeInBytes() > 0) { long bytesToCopy = sliceInfo.rowCount * type.getSizeInBytes(); long srcOffset = sliceInfo.offset * type.getSizeInBytes(); - writer.copyDataFrom(column.getData(), srcOffset, bytesToCopy); - return padForHostAlignment(writer, bytesToCopy); + return copyBufferAndPadForHost(column.getData(), srcOffset, bytesToCopy); } else { return 0; } @@ -207,4 +206,13 @@ private long copySlicedData(HostColumnVectorCore column, SliceInfo sliceInfo) th return 0; } } + + private long copyBufferAndPadForHost(HostMemoryBuffer buffer, long offset, long length) + throws IOException { + long now = System.nanoTime(); + writer.copyDataFrom(buffer, offset, length); + long ret = padForHostAlignment(writer, length); + metrics.addCopyBufferTime(System.nanoTime() - now); + return ret; + } } diff --git a/src/main/java/com/nvidia/spark/rapids/jni/kudo/WriteMetrics.java b/src/main/java/com/nvidia/spark/rapids/jni/kudo/WriteMetrics.java new file mode 100644 index 0000000000..d34564e776 --- /dev/null +++ b/src/main/java/com/nvidia/spark/rapids/jni/kudo/WriteMetrics.java @@ -0,0 +1,79 @@ +/* + * Copyright (c) 2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.jni.kudo; + +/** + * This class contains metrics for serializing table using kudo format. + */ +public class WriteMetrics { + private long calcHeaderTime; + private long copyHeaderTime; + private long copyBufferTime; + private long writtenBytes; + + + public WriteMetrics() { + this.calcHeaderTime = 0; + this.copyHeaderTime = 0; + this.copyBufferTime = 0; + this.writtenBytes = 0; + } + + /** + * Get the time spent on calculating the header. + */ + public long getCalcHeaderTime() { + return calcHeaderTime; + } + + /** + * Get the time spent on copying the buffer. + */ + public long getCopyBufferTime() { + return copyBufferTime; + } + + public void addCopyBufferTime(long time) { + copyBufferTime += time; + } + + /** + * Get the time spent on copying the header. + */ + public long getCopyHeaderTime() { + return copyHeaderTime; + } + + public void addCalcHeaderTime(long time) { + calcHeaderTime += time; + } + + public void addCopyHeaderTime(long time) { + copyHeaderTime += time; + } + + /** + * Get the number of bytes written. + */ + public long getWrittenBytes() { + return writtenBytes; + } + + public void addWrittenBytes(long bytes) { + writtenBytes += bytes; + } +} diff --git a/src/test/java/com/nvidia/spark/rapids/jni/DateTimeRebaseTest.java b/src/test/java/com/nvidia/spark/rapids/jni/DateTimeRebaseTest.java deleted file mode 100644 index 5508d56d4d..0000000000 --- a/src/test/java/com/nvidia/spark/rapids/jni/DateTimeRebaseTest.java +++ /dev/null @@ -1,115 +0,0 @@ -/* - * Copyright (c) 2023, NVIDIA CORPORATION. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package com.nvidia.spark.rapids.jni; - -import static ai.rapids.cudf.AssertUtils.assertColumnsAreEqual; - -import org.junit.jupiter.api.Test; - -import ai.rapids.cudf.ColumnVector; - -public class DateTimeRebaseTest { - @Test - void rebaseDaysToJulianTest() { - try (ColumnVector input = ColumnVector.timestampDaysFromBoxedInts(-719162, -354285, null, - -141714, -141438, -141437, - null, null, - -141432, -141427, -31463, -31453, -1, 0, 18335); - ColumnVector expected = ColumnVector.timestampDaysFromBoxedInts(-719164, -354280, null, - -141704, -141428, -141427, - null, null, - -141427, -141427, -31463, -31453, -1, 0, 18335); - ColumnVector result = DateTimeRebase.rebaseGregorianToJulian(input)) { - assertColumnsAreEqual(expected, result); - } - } - - @Test - void rebaseDaysToGregorianTest() { - try (ColumnVector input = ColumnVector.timestampDaysFromBoxedInts(-719164, -354280, null, - -141704, -141428, -141427, - null, null, - -141427, -141427, -31463, -31453, -1, 0, 18335); - ColumnVector expected = ColumnVector.timestampDaysFromBoxedInts(-719162, -354285, null, - -141714, -141438, -141427, - null, null, - -141427, -141427, -31463, -31453, -1, 0, 18335); - ColumnVector result = DateTimeRebase.rebaseJulianToGregorian(input)) { - assertColumnsAreEqual(expected, result); - } - } - - @Test - void rebaseMicroToJulian() { - try (ColumnVector input = ColumnVector.timestampMicroSecondsFromBoxedLongs(-62135593076345679L, - -30610213078876544L, - null, - -12244061221876544L, - -12220243200000000L, - -12219639001448163L, - -12219292799000001L, - -45446999900L, - 1L, - null, - 1584178381500000L); - ColumnVector expected = - ColumnVector.timestampMicroSecondsFromBoxedLongs(-62135765876345679L, - -30609781078876544L, - null, - -12243197221876544L, - -12219379200000000L, - -12219207001448163L, - -12219292799000001L, - -45446999900L, - 1L, - null, - 1584178381500000L); - ColumnVector result = DateTimeRebase.rebaseGregorianToJulian(input)) { - assertColumnsAreEqual(expected, result); - } - } - - @Test - void rebaseMicroToGregorian() { - try (ColumnVector input = ColumnVector.timestampMicroSecondsFromBoxedLongs(-62135765876345679L, - -30609781078876544L, - null, - -12243197221876544L, - -12219379200000000L, - -12219207001448163L, - -12219292799000001L, - -45446999900L, - 1L, - null, - 1584178381500000L); - ColumnVector expected = - ColumnVector.timestampMicroSecondsFromBoxedLongs(-62135593076345679L, - -30610213078876544L, - null, - -12244061221876544L, - -12220243200000000L, - -12219207001448163L, - -12219292799000001L, - -45446999900L, - 1L, - null, - 1584178381500000L); - ColumnVector result = DateTimeRebase.rebaseJulianToGregorian(input)) { - assertColumnsAreEqual(expected, result); - } - } -} diff --git a/src/test/java/com/nvidia/spark/rapids/jni/DateTimeUtilsTest.java b/src/test/java/com/nvidia/spark/rapids/jni/DateTimeUtilsTest.java new file mode 100644 index 0000000000..6007dfe219 --- /dev/null +++ b/src/test/java/com/nvidia/spark/rapids/jni/DateTimeUtilsTest.java @@ -0,0 +1,150 @@ +/* + * Copyright (c) 2023-2024, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.jni; + +import static ai.rapids.cudf.AssertUtils.assertColumnsAreEqual; + +import org.junit.jupiter.api.Test; + +import ai.rapids.cudf.ColumnVector; + +public class DateTimeUtilsTest { + @Test + void rebaseDaysToJulianTest() { + try ( + ColumnVector input = ColumnVector.timestampDaysFromBoxedInts(-719162, -354285, null, + -141714, -141438, -141437, + null, null, + -141432, -141427, -31463, -31453, -1, 0, 18335); + ColumnVector expected = ColumnVector.timestampDaysFromBoxedInts(-719164, -354280, null, + -141704, -141428, -141427, + null, null, + -141427, -141427, -31463, -31453, -1, 0, 18335); + ColumnVector result = DateTimeUtils.rebaseGregorianToJulian(input)) { + assertColumnsAreEqual(expected, result); + } + } + + @Test + void rebaseDaysToGregorianTest() { + try ( + ColumnVector input = ColumnVector.timestampDaysFromBoxedInts(-719164, -354280, null, + -141704, -141428, -141427, + null, null, + -141427, -141427, -31463, -31453, -1, 0, 18335); + ColumnVector expected = ColumnVector.timestampDaysFromBoxedInts(-719162, -354285, null, + -141714, -141438, -141427, + null, null, + -141427, -141427, -31463, -31453, -1, 0, 18335); + ColumnVector result = DateTimeUtils.rebaseJulianToGregorian(input)) { + assertColumnsAreEqual(expected, result); + } + } + + @Test + void rebaseMicroToJulian() { + try ( + ColumnVector input = ColumnVector.timestampMicroSecondsFromBoxedLongs(-62135593076345679L, + -30610213078876544L, + null, + -12244061221876544L, + -12220243200000000L, + -12219639001448163L, + -12219292799000001L, + -45446999900L, + 1L, + null, + 1584178381500000L); + ColumnVector expected = + ColumnVector.timestampMicroSecondsFromBoxedLongs(-62135765876345679L, + -30609781078876544L, + null, + -12243197221876544L, + -12219379200000000L, + -12219207001448163L, + -12219292799000001L, + -45446999900L, + 1L, + null, + 1584178381500000L); + ColumnVector result = DateTimeUtils.rebaseGregorianToJulian(input)) { + assertColumnsAreEqual(expected, result); + } + } + + @Test + void rebaseMicroToGregorian() { + try ( + ColumnVector input = ColumnVector.timestampMicroSecondsFromBoxedLongs(-62135765876345679L, + -30609781078876544L, + null, + -12243197221876544L, + -12219379200000000L, + -12219207001448163L, + -12219292799000001L, + -45446999900L, + 1L, + null, + 1584178381500000L); + ColumnVector expected = + ColumnVector.timestampMicroSecondsFromBoxedLongs(-62135593076345679L, + -30610213078876544L, + null, + -12244061221876544L, + -12220243200000000L, + -12219207001448163L, + -12219292799000001L, + -45446999900L, + 1L, + null, + 1584178381500000L); + ColumnVector result = DateTimeUtils.rebaseJulianToGregorian(input)) { + assertColumnsAreEqual(expected, result); + } + } + + @Test + void truncateDateTest() { + try (ColumnVector input = ColumnVector.timestampDaysFromBoxedInts(-31463, -31453, null, 0, 18335); + ColumnVector format = ColumnVector.fromStrings("YEAR", "MONTH", "WEEK", "QUARTER", "YY"); + ColumnVector expected = ColumnVector.timestampDaysFromBoxedInts(-31776, -31472, null, 0, 18262); + ColumnVector result = DateTimeUtils.truncate(input, format)) { + assertColumnsAreEqual(expected, result); + } + } + + @Test + void truncateTimestampTest() { + try ( + ColumnVector input = ColumnVector.timestampMicroSecondsFromBoxedLongs( + -12219292799000001L, + -45446999900L, + 1L, + null, + 1584178381500000L); + ColumnVector format = ColumnVector.fromStrings("YEAR", "HOUR", "WEEK", "QUARTER", "SECOND"); + ColumnVector expected = ColumnVector.timestampMicroSecondsFromBoxedLongs( + -12244089600000000L, + -46800000000L, + -259200000000L, + null, + 1584178381000000L); + ColumnVector result = DateTimeUtils.truncate(input, format)) { + assertColumnsAreEqual(expected, result); + } + } +} diff --git a/src/test/java/com/nvidia/spark/rapids/jni/HashTest.java b/src/test/java/com/nvidia/spark/rapids/jni/HashTest.java index 19172a8d33..874cb84b5e 100644 --- a/src/test/java/com/nvidia/spark/rapids/jni/HashTest.java +++ b/src/test/java/com/nvidia/spark/rapids/jni/HashTest.java @@ -387,6 +387,182 @@ void testXXHash64Mixed() { } } + @Test + void testXXHash64Struct() { + try (ColumnVector strings = ColumnVector.fromStrings( + "a", "B\n", "dE\"\u0100\t\u0101 \ud720\ud721", + "A very long (greater than 128 bytes/char string) to test a multi hash-step data point " + + "in the MD5 hash function. This string needed to be longer.", + null, null); + ColumnVector integers = ColumnVector.fromBoxedInts(0, 100, -100, Integer.MIN_VALUE, Integer.MAX_VALUE, null); + ColumnVector doubles = ColumnVector.fromBoxedDoubles( + 0.0, 100.0, -100.0, POSITIVE_DOUBLE_NAN_LOWER_RANGE, POSITIVE_DOUBLE_NAN_UPPER_RANGE, null); + ColumnVector floats = ColumnVector.fromBoxedFloats( + 0f, 100f, -100f, NEGATIVE_FLOAT_NAN_LOWER_RANGE, NEGATIVE_FLOAT_NAN_UPPER_RANGE, null); + ColumnVector bools = ColumnVector.fromBoxedBooleans(true, false, null, false, true, null); + ColumnView structs = ColumnView.makeStructView(strings, integers, doubles, floats, bools); + ColumnVector result = Hash.xxhash64(new ColumnView[]{structs}); + ColumnVector expected = ColumnVector.fromBoxedLongs(7451748878409563026L, 6024043102550151964L, 3380664624738534402L, 8444697026100086329L, -5888679192448042852L, Hash.DEFAULT_XXHASH64_SEED)) { + assertColumnsAreEqual(expected, result); + } + } + + @Test + void testXXHash64NestedStruct() { + try (ColumnVector strings = ColumnVector.fromStrings( + "a", "B\n", "dE\"\u0100\t\u0101 \ud720\ud721", + "A very long (greater than 128 bytes/char string) to test a multi hash-step data point " + + "in the MD5 hash function. This string needed to be longer.", + null, null); + ColumnVector integers = ColumnVector.fromBoxedInts(0, 100, -100, Integer.MIN_VALUE, Integer.MAX_VALUE, null); + ColumnVector doubles = ColumnVector.fromBoxedDoubles( + 0.0, 100.0, -100.0, POSITIVE_DOUBLE_NAN_LOWER_RANGE, POSITIVE_DOUBLE_NAN_UPPER_RANGE, null); + ColumnVector floats = ColumnVector.fromBoxedFloats( + 0f, 100f, -100f, NEGATIVE_FLOAT_NAN_LOWER_RANGE, NEGATIVE_FLOAT_NAN_UPPER_RANGE, null); + ColumnVector bools = ColumnVector.fromBoxedBooleans(true, false, null, false, true, null); + ColumnView structs1 = ColumnView.makeStructView(strings, integers); + ColumnView structs2 = ColumnView.makeStructView(structs1, doubles); + ColumnView structs3 = ColumnView.makeStructView(bools); + ColumnView structs = ColumnView.makeStructView(structs2, floats, structs3); + ColumnVector result = Hash.xxhash64(new ColumnView[]{structs}); + ColumnVector expected = ColumnVector.fromBoxedLongs(7451748878409563026L, 6024043102550151964L, 3380664624738534402L, 8444697026100086329L, -5888679192448042852L, Hash.DEFAULT_XXHASH64_SEED)) { + assertColumnsAreEqual(expected, result); + } + } + + @Test + void testXXHash64Lists() { + try (ColumnVector stringListCV = ColumnVector.fromLists( + new ListType(true, new BasicType(true, DType.STRING)), + Arrays.asList(null, "a"), + Arrays.asList("B\n", ""), + Arrays.asList("dE\"\u0100\t\u0101", " \ud720\ud721"), + Collections.singletonList("A very long (greater than 128 bytes/char string) to test a multi hash-step data point " + + "in the MD5 hash function. This string needed to be longer."), + Collections.singletonList(""), + null); + ColumnVector stringExpected = ColumnVector.fromBoxedLongs(-8582455328737087284L, 7160715839242204087L, -862482741676457612L, -3700309651391443614L, -7444071767201028348L, Hash.DEFAULT_XXHASH64_SEED); + ColumnVector stringResult = Hash.xxhash64(new ColumnView[]{stringListCV}); + ColumnVector intListCV = ColumnVector.fromLists( + new ListType(true, new BasicType(true, DType.INT32)), + Collections.emptyList(), + Arrays.asList(0, -2, 3), + Collections.singletonList(Integer.MAX_VALUE), + Arrays.asList(5, -6, null), + Collections.singletonList(Integer.MIN_VALUE), + null); + ColumnVector intExpected = ColumnVector.fromBoxedLongs(Hash.DEFAULT_XXHASH64_SEED, -4022702357093761688L, 1508894993788531228L, 7329154841501342665L, 2073849959933241805L, Hash.DEFAULT_XXHASH64_SEED); + ColumnVector intResult = Hash.xxhash64(new ColumnVector[]{intListCV})) { + assertColumnsAreEqual(stringExpected, stringResult); + assertColumnsAreEqual(intExpected, intResult); + } + } + + @Test + void testXXHash64NestedLists() { + try (ColumnVector nestedStringListCV = ColumnVector.fromLists( + new ListType(true, new ListType(true, new BasicType(true, DType.STRING))), + Arrays.asList(null, Collections.singletonList("a")), + Collections.singletonList(Arrays.asList("B\n", "")), + Arrays.asList(Collections.singletonList("dE\"\u0100\t\u0101"), Collections.singletonList(" \ud720\ud721")), + Collections.singletonList(Collections.singletonList("A very long (greater than 128 bytes/char string) to test a multi hash-step data point " + + "in the MD5 hash function. This string needed to be longer.")), + Collections.singletonList(Collections.singletonList("")), + null); + ColumnVector stringExpected = ColumnVector.fromBoxedLongs(-8582455328737087284L, 7160715839242204087L, -862482741676457612L, -3700309651391443614L, -7444071767201028348L, Hash.DEFAULT_XXHASH64_SEED); + ColumnVector stringResult = Hash.xxhash64(new ColumnView[]{nestedStringListCV}); + ColumnVector nestedIntListCV = ColumnVector.fromLists( + new ListType(true, new ListType(true, new BasicType(true, DType.INT32))), + Collections.emptyList(), + Arrays.asList(Collections.singletonList(0), Collections.singletonList(-2), Collections.singletonList(3)), + Collections.singletonList(Collections.singletonList(Integer.MAX_VALUE)), + Arrays.asList(Collections.singletonList(5), Arrays.asList(-6, null)), + Collections.singletonList(Collections.singletonList(Integer.MIN_VALUE)), + null); + ColumnVector intExpected = ColumnVector.fromBoxedLongs(Hash.DEFAULT_XXHASH64_SEED, -4022702357093761688L, 1508894993788531228L, 7329154841501342665L, 2073849959933241805L, Hash.DEFAULT_XXHASH64_SEED); + ColumnVector intResult = Hash.xxhash64(new ColumnVector[]{nestedIntListCV});) { + assertColumnsAreEqual(stringExpected, stringResult); + assertColumnsAreEqual(intExpected, intResult); + } + } + + @Test + void testXXHash64StructOfList() { + try (ColumnVector stringListCV = ColumnVector.fromLists( + new ListType(true, new BasicType(true, DType.STRING)), + Arrays.asList(null, "a"), + Arrays.asList("B\n", ""), + Arrays.asList("dE\"\u0100\t\u0101", " \ud720\ud721"), + Collections.singletonList("A very long (greater than 128 bytes/char string) to test a multi hash-step data point " + + "in the MD5 hash function. This string needed to be longer."), + Collections.singletonList(""), + null); + ColumnVector intListCV = ColumnVector.fromLists( + new ListType(true, new BasicType(true, DType.INT32)), + Collections.emptyList(), + Arrays.asList(0, -2, 3), + Collections.singletonList(Integer.MAX_VALUE), + Arrays.asList(5, -6, null), + Collections.singletonList(Integer.MIN_VALUE), + null); + ColumnVector doubles = ColumnVector.fromBoxedDoubles( + 0.0, 100.0, -100.0, POSITIVE_DOUBLE_NAN_LOWER_RANGE, POSITIVE_DOUBLE_NAN_UPPER_RANGE, null); + ColumnVector floats = ColumnVector.fromBoxedFloats( + 0f, 100f, -100f, NEGATIVE_FLOAT_NAN_LOWER_RANGE, NEGATIVE_FLOAT_NAN_UPPER_RANGE, null); + ColumnView structCV = ColumnView.makeStructView(intListCV, stringListCV, doubles, floats); + ColumnVector nestedExpected = ColumnVector.fromBoxedLongs(-8492741646850220468L, -6547737320918905493L, -8718220625378038731L, 5441580647216064522L, 3645801243834961127L, Hash.DEFAULT_XXHASH64_SEED); + ColumnVector nestedResult = Hash.xxhash64(new ColumnView[]{structCV})) { + assertColumnsAreEqual(nestedExpected, nestedResult); + } + } + + @Test + void testXXHash64ListOfStruct() { + try (ColumnVector structListCV = ColumnVector.fromLists(new ListType(true, new StructType(true, + new BasicType(true, DType.STRING), new BasicType(true, DType.INT32), new BasicType(true, DType.FLOAT64), new BasicType(true, DType.FLOAT32), new BasicType(true, DType.BOOL8))), + Collections.emptyList(), + Collections.singletonList(new StructData("a", 0, 0.0, 0f, true)), + Arrays.asList(new StructData("B\n", 100, 100.0, 100f, false), new StructData("dE\"\u0100\t\u0101 \ud720\ud721", -100, -100.0, -100f, null)), + Collections.singletonList(new StructData("A very long (greater than 128 bytes/char string) to test a multi hash-step data point " + + "in the MD5 hash function. This string needed to be longer.", Integer.MIN_VALUE, POSITIVE_DOUBLE_NAN_LOWER_RANGE, NEGATIVE_FLOAT_NAN_LOWER_RANGE, false)), + Arrays.asList(new StructData(null, Integer.MAX_VALUE, POSITIVE_DOUBLE_NAN_UPPER_RANGE, NEGATIVE_FLOAT_NAN_UPPER_RANGE, true), new StructData(null, null, null, null, null)), + null); + ColumnVector result = Hash.xxhash64(new ColumnView[]{structListCV}); + ColumnVector expected = ColumnVector.fromBoxedLongs(Hash.DEFAULT_XXHASH64_SEED, 7451748878409563026L, 948372773124634350L, 8444697026100086329L, -5888679192448042852L, Hash.DEFAULT_XXHASH64_SEED)) { + assertColumnsAreEqual(expected, result); + } + } + + @Test + void testXXHash64NestedDepthExceedsLimit() { + try (ColumnVector nestedIntListCV = ColumnVector.fromLists( + new ListType(true, new ListType(true, new BasicType(true, DType.INT32))), + Arrays.asList(Arrays.asList(null, null), null), + Arrays.asList(Collections.singletonList(0), Collections.singletonList(-2), Collections.singletonList(3)), + Arrays.asList(null, Collections.singletonList(Integer.MAX_VALUE)), + Arrays.asList(Collections.singletonList(5), Arrays.asList(-6, null)), + Arrays.asList(Collections.singletonList(Integer.MIN_VALUE), null), + null); + ColumnVector integers = ColumnVector.fromBoxedInts( + 0, 100, -100, Integer.MIN_VALUE, Integer.MAX_VALUE, null); + ColumnVector doubles = ColumnVector.fromBoxedDoubles(0.0, 100.0, -100.0, + POSITIVE_DOUBLE_NAN_LOWER_RANGE, POSITIVE_DOUBLE_NAN_UPPER_RANGE, null); + ColumnVector floats = ColumnVector.fromBoxedFloats(0f, 100f, -100f, + NEGATIVE_FLOAT_NAN_LOWER_RANGE, NEGATIVE_FLOAT_NAN_UPPER_RANGE, null); + ColumnVector bools = ColumnVector.fromBoxedBooleans( + true, false, null, false, true, null); + ColumnView structs1 = ColumnView.makeStructView(nestedIntListCV, integers); + ColumnView structs2 = ColumnView.makeStructView(structs1, doubles); + ColumnView structs3 = ColumnView.makeStructView(structs2, bools); + ColumnView structs4 = ColumnView.makeStructView(structs3); + ColumnView structs5 = ColumnView.makeStructView(structs4, floats); + ColumnView structs6 = ColumnView.makeStructView(structs5); + ColumnView structs7 = ColumnView.makeStructView(structs6); + ColumnView nestedResult = ColumnView.makeStructView(structs7);) { + assertThrows(CudfException.class, () -> Hash.xxhash64(new ColumnView[]{nestedResult})); + } + } + @Test void testHiveHashBools() { try (ColumnVector v0 = ColumnVector.fromBoxedBooleans(true, false, null); diff --git a/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java b/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java index 270a4266cd..f618f945b0 100644 --- a/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java +++ b/src/test/java/com/nvidia/spark/rapids/jni/RmmSparkTest.java @@ -1,5 +1,5 @@ /* - * Copyright (c) 2020-2023, NVIDIA CORPORATION. + * Copyright (c) 2020-2024, NVIDIA CORPORATION. * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. diff --git a/src/test/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializerTest.java b/src/test/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializerTest.java index 210777accf..3ffcb5e61b 100644 --- a/src/test/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializerTest.java +++ b/src/test/java/com/nvidia/spark/rapids/jni/kudo/KudoSerializerTest.java @@ -75,7 +75,7 @@ public void testWriteSimple() throws Exception { try (Table t = buildSimpleTable()) { ByteArrayOutputStream out = new ByteArrayOutputStream(); - long bytesWritten = serializer.writeToStream(t, out, 0, 4); + long bytesWritten = serializer.writeToStreamWithMetrics(t, out, 0, 4).getWrittenBytes(); assertEquals(189, bytesWritten); ByteArrayInputStream in = new ByteArrayInputStream(out.toByteArray()); @@ -365,7 +365,7 @@ private static void checkMergeTable(Table expected, List tableSlices ByteArrayOutputStream bout = new ByteArrayOutputStream(); for (TableSlice slice : tableSlices) { - serializer.writeToStream(slice.getBaseTable(), bout, slice.getStartRow(), slice.getNumRows()); + serializer.writeToStreamWithMetrics(slice.getBaseTable(), bout, slice.getStartRow(), slice.getNumRows()); } bout.flush(); diff --git a/thirdparty/cudf b/thirdparty/cudf index fa62ff45ed..a081a573b6 160000 --- a/thirdparty/cudf +++ b/thirdparty/cudf @@ -1 +1 @@ -Subproject commit fa62ff45eddd8256f0a3e8cebf077970dd70cb67 +Subproject commit a081a573b6ca626f7b77ec21322acff5012e7ada diff --git a/thirdparty/cudf-pins/rapids-cmake.sha b/thirdparty/cudf-pins/rapids-cmake.sha index fad760dd73..005c3541bc 100644 --- a/thirdparty/cudf-pins/rapids-cmake.sha +++ b/thirdparty/cudf-pins/rapids-cmake.sha @@ -1 +1 @@ -fa61767d1584a9c8d083aa9ce8636af89cc63923 +faef975d488895186cf04a714faeb335f73757f0 diff --git a/thirdparty/cudf-pins/versions.json b/thirdparty/cudf-pins/versions.json index 752f515108..ec2b83f4ad 100644 --- a/thirdparty/cudf-pins/versions.json +++ b/thirdparty/cudf-pins/versions.json @@ -5,20 +5,10 @@ { "always_download" : true, "git_shallow" : false, - "git_tag" : "e21d607157218540cd7c45461213fb96adf720b7", + "git_tag" : "05e019afe53f9b0e4454cbd822f9bdda18df49bb", "git_url" : "https://github.com/NVIDIA/cccl.git", "patches" : [ - { - "file" : "${current_json_dir}/../cudf/cpp/cmake/thirdparty/patches/cccl_symbol_visibility.diff", - "fixed_in" : "2.6", - "issue" : "Correct symbol visibility issues in libcudacxx [https://github.com/NVIDIA/cccl/pull/1832/]" - }, - { - "file" : "${current_json_dir}/../cudf/cpp/cmake/thirdparty/patches/thrust_disable_64bit_dispatching.diff", - "fixed_in" : "", - "issue" : "Remove 64bit dispatching as not needed by libcudf and results in compiling twice as many kernels [https://github.com/rapidsai/cudf/pull/11437]" - }, { "file" : "${current_json_dir}/../cudf/cpp/cmake/thirdparty/patches/thrust_faster_sort_compile_times.diff", "fixed_in" : "", @@ -30,7 +20,7 @@ "issue" : "Improve Thrust scan compile times by reducing the number of kernels generated [https://github.com/rapidsai/cudf/pull/8183]" } ], - "version" : "2.5.0" + "version" : "2.7.0" }, "GTest" : { @@ -52,7 +42,7 @@ { "always_download" : true, "git_shallow" : false, - "git_tag" : "dc0f9fc20c2a544e53099e640a681b347532391a", + "git_tag" : "096346b739da3fb1d9c3b402190c0a3a7e554440", "git_url" : "https://github.com/NVIDIA/cuCollections.git", "version" : "0.0.1" }, @@ -99,7 +89,7 @@ { "always_download" : true, "git_shallow" : false, - "git_tag" : "e82574ba3787d5c6b1d1cd3f6aba02b52b233f45", + "git_tag" : "288535770abbe950ab8ec655d44f5aa9d6704cea", "git_url" : "https://github.com/rapidsai/kvikio.git", "version" : "25.02" }, @@ -109,14 +99,6 @@ "git_shallow" : false, "git_tag" : "1e2664a70ec14907409cadcceb14d79b9670bcdb", "git_url" : "https://github.com/apache/arrow-nanoarrow.git", - "patches" : - [ - { - "file" : "${current_json_dir}/../cudf/cpp/cmake/thirdparty/patches/nanoarrow_clang_tidy_compliance.diff", - "fixed_in" : "", - "issue" : "https://github.com/apache/arrow-nanoarrow/issues/537" - } - ], "version" : "0.6.0.dev" }, "nvcomp" : @@ -145,11 +127,19 @@ "git_url" : "https://github.com/NVIDIA/NVTX.git", "version" : "3.1.0" }, + "rapids_logger" : + { + "always_download" : true, + "git_shallow" : false, + "git_tag" : "c510947ae9d3a67530cfe3e5eaccb5a3b8ea0e55", + "git_url" : "https://github.com/rapidsai/rapids-logger.git", + "version" : "c510947ae9d3a67530cfe3e5eaccb5a3b8ea0e55" + }, "rmm" : { "always_download" : true, "git_shallow" : false, - "git_tag" : "c9c6039ab71f91fb41376abea7ec36b8a2563de1", + "git_tag" : "1af03eb55ce51a376c3df2dc0cdf3c81738b2dd6", "git_url" : "https://github.com/rapidsai/rmm.git", "version" : "25.02" },