diff --git a/src/main/cpp/CMakeLists.txt b/src/main/cpp/CMakeLists.txt index 600e6ac245..5e2fa2bed6 100644 --- a/src/main/cpp/CMakeLists.txt +++ b/src/main/cpp/CMakeLists.txt @@ -156,6 +156,7 @@ add_library( src/HashJni.cpp src/MapUtilsJni.cpp src/NativeParquetJni.cpp + src/ParseURIJni.cpp src/RowConversionJni.cpp src/SparkResourceAdaptorJni.cpp src/ZOrderJni.cpp @@ -167,6 +168,7 @@ add_library( src/decimal_utils.cu src/map_utils.cu src/murmur_hash.cu + src/parse_uri.cu src/row_conversion.cu src/utilities.cu src/xxhash64.cu diff --git a/src/main/cpp/benchmarks/CMakeLists.txt b/src/main/cpp/benchmarks/CMakeLists.txt index 7ce778b035..23d35b0bea 100644 --- a/src/main/cpp/benchmarks/CMakeLists.txt +++ b/src/main/cpp/benchmarks/CMakeLists.txt @@ -1,5 +1,5 @@ #============================================================================= -# Copyright (c) 2022, NVIDIA CORPORATION. +# Copyright (c) 2022-2023, NVIDIA CORPORATION. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. 
@@ -23,7 +23,7 @@ target_compile_options( ) target_link_libraries( - spark_rapids_jni_datagen PUBLIC cudf::cudf + spark_rapids_jni_datagen PUBLIC cudf::cudf ) target_include_directories( @@ -56,7 +56,7 @@ function(ConfigureBench CMAKE_BENCH_NAME) CUDA_STANDARD_REQUIRED ON ) target_link_libraries(${CMAKE_BENCH_NAME} nvbench::main spark_rapids_jni_datagen ${CUDF_BENCHMARK_COMMON} - cudf::cudf spark_rapids_jni Threads::Threads) + cudf::cudf spark_rapids_jni Threads::Threads cudf::cudftestutil) install( TARGETS ${CMAKE_BENCH_NAME} COMPONENT testing @@ -77,3 +77,6 @@ ConfigureBench(STRING_TO_FLOAT_BENCH ConfigureBench(BLOOM_FILTER_BENCH bloom_filter.cu) + +ConfigureBench(PARSE_URI_BENCH + parse_uri.cpp) diff --git a/src/main/cpp/benchmarks/parse_uri.cpp b/src/main/cpp/benchmarks/parse_uri.cpp new file mode 100644 index 0000000000..26e46117d8 --- /dev/null +++ b/src/main/cpp/benchmarks/parse_uri.cpp @@ -0,0 +1,145 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +#include + +#include + +#include + +#include +#include +#include +#include +#include + +static void bench_random_parse_uri(nvbench::state& state) +{ + cudf::size_type const n_rows{(cudf::size_type)state.get_int64("num_rows")}; + + auto const table = create_random_table({cudf::type_id::STRING}, row_count{n_rows}); + + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + auto out = + spark_rapids_jni::parse_uri_to_protocol(cudf::strings_column_view{table->get_column(0)}); + }); + + state.add_buffer_size(n_rows, "trc", "Total Rows"); +} + +static void bench_parse_uri(nvbench::state& state) +{ + auto const n_rows = static_cast(state.get_int64("num_rows")); + auto const hit_rate = static_cast(state.get_int64("hit_rate")); + + // build input table using the following data + auto raw_data = + cudf::test::strings_column_wrapper( + { + "https://www.google.com/s/" + "query?parameternumber0=0¶meternumber1=1¶meternumber2=2parameternumber2=" + "2parameternumber2=2parameternumber2=2parameternumber2=2parameternumber2=2parameternumber2=" + "2parameternumber2=2parameternumber2=2parameternumber2=2parameternumber2=2parameternumber2=" + "2parameternumber2=2parameternumber2=2parameternumber2=2parameternumber2=2parameternumber2=" + "2parameternumber2=2parameternumber2=2parameternumber2=2parameternumber2=2parameternumber2=" + "2parameternumber2=2parameternumber2=2parameternumber2=2", // valid uri + "abcdefghijklmnopqrstuvwxyz 01234abcdefghijklmnopqrstuvwxyz " + "01234abcdefghijklmnopqrstuvwxyz 01234abcdefghijklmnopqrstuvwxyz " + "01234abcdefghijklmnopqrstuvwxyz 01234abcdefghijklmnopqrstuvwxyz " + "01234abcdefghijklmnopqrstuvwxyz 01234abcdefghijklmnopqrstuvwxyz " + "01234abcdefghijklmnopqrstuvwxyz 01234abcdefghijklmnopqrstuvwxyz " + "01234abcdefghijklmnopqrstuvwxyz 01234abcdefghijklmnopqrstuvwxyz " + "01234abcdefghijklmnopqrstuvwxyz 01234abcdefghijklmnopqrstuvwxyz " + "01234abcdefghijklmnopqrstuvwxyz 01234abcdefghijklmnopqrstuvwxyz " + 
"01234abcdefghijklmnopqrstuvwxyz 01234abcdefghijklmnopqrstuvwxyz 01234", // the rest are + // invalid + "", + "AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ " + "01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ " + "01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ " + "01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ " + "01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ " + "01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ " + "01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01AbcéDEFGHIJKLMNOPQRSTUVWXYZ 01", + "9876543210,abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU", + "9876543210,abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + 
"abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU9876543210," + "abcdefghijklmnopqrstU9876543210,abcdefghijklmnopqrstU", + "123 édf 4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 " + "édf 4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf " + "4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf " + "4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf " + "4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf " + "4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf " + "4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf " + "4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5123 édf " + "4567890 DéFG 0987 X5123 édf 4567890 DéFG 0987 X5", + "1", + }) + .release(); + + auto data_view = raw_data->view(); + + // compute number of rows in n_rows that should match + auto const matches = static_cast(n_rows * hit_rate) / 100; + + // Create a randomized gather-map to build a column out of the strings in data. 
+ data_profile gather_profile = + data_profile_builder().cardinality(0).null_probability(0.0).distribution( + cudf::type_id::INT32, distribution_id::UNIFORM, 1, data_view.size() - 1); + auto gather_table = + create_random_table({cudf::type_id::INT32}, row_count{n_rows}, gather_profile); + gather_table->get_column(0).set_null_mask(rmm::device_buffer{}, 0); + + // Create scatter map by placing 0-index values throughout the gather-map + auto scatter_data = cudf::sequence( + matches, cudf::numeric_scalar(0), cudf::numeric_scalar(n_rows / matches)); + auto zero_scalar = cudf::numeric_scalar(0); + auto table = cudf::scatter({zero_scalar}, scatter_data->view(), gather_table->view()); + auto gather_map = table->view().column(0); + table = cudf::gather(cudf::table_view({data_view}), gather_map); + + state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { + auto out = + spark_rapids_jni::parse_uri_to_protocol(cudf::strings_column_view{table->get_column(0)}); + }); + + state.add_buffer_size(n_rows, "trc", "Total Rows"); +} + +NVBENCH_BENCH(bench_random_parse_uri) + .set_name("Strings") + .add_int64_axis("num_rows", {512 * 1024, 1 * 1024 * 1024}); + +NVBENCH_BENCH(bench_parse_uri) + .set_name("URIStringMix") + .add_int64_axis("num_rows", {512 * 1024, 1 * 1024 * 1024}) + .add_int64_axis("hit_rate", {5, 50, 100}); diff --git a/src/main/cpp/src/ParseURIJni.cpp b/src/main/cpp/src/ParseURIJni.cpp new file mode 100644 index 0000000000..0d2d245108 --- /dev/null +++ b/src/main/cpp/src/ParseURIJni.cpp @@ -0,0 +1,36 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. 
+ * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "cudf_jni_apis.hpp" +#include "dtype_utils.hpp" +#include "parse_uri.hpp" + +extern "C" { + +JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_ParseURI_parseProtocol(JNIEnv* env, + jclass, + jlong input_column) +{ + JNI_NULL_CHECK(env, input_column, "input column is null", 0); + + try { + cudf::jni::auto_set_device(env); + auto const input = reinterpret_cast(input_column); + return cudf::jni::ptr_as_jlong(spark_rapids_jni::parse_uri_to_protocol(*input).release()); + } + CATCH_STD(env, 0); +} +} diff --git a/src/main/cpp/src/parse_uri.cu b/src/main/cpp/src/parse_uri.cu new file mode 100644 index 0000000000..54e79ab022 --- /dev/null +++ b/src/main/cpp/src/parse_uri.cu @@ -0,0 +1,368 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
// Utility to validate that a single byte is allowed at its position in a URI.
//
// `alphanum_only == true` selects the restricted set used for bytes that precede the first ':'
// (the scheme). RFC 3986 section 3.1 defines that set as ALPHA / DIGIT / "+" / "-" / ".".
// `alphanum_only == false` selects the wider set used from the ':' onwards. It includes '%',
// but escape sequences are not checked for well-formedness here.
//
// Bytes outside printable ASCII (including every byte of a multi-byte UTF-8 sequence) are
// rejected in both modes.
constexpr bool is_valid_character(char ch, bool alphanum_only)
{
  if (alphanum_only) {
    if (ch >= '-' && ch <= '9' && ch != '/') return true;  // 0-9 and -.
    if (ch >= 'A' && ch <= 'Z') return true;               // A-Z
    if (ch >= 'a' && ch <= 'z') return true;               // a-z
    if (ch == '+') return true;                            // legal scheme character
  } else {
    if (ch >= '!' && ch <= ';' && ch != '"') return true;  // 0-9 and !#$%&'()*+,-./:;
    if (ch >= '=' && ch <= 'Z' && ch != '>') return true;  // A-Z and =?@
    if (ch >= '_' && ch <= 'z' && ch != '`') return true;  // a-z and _
    if (ch == '~') return true;                            // unreserved per RFC 3986 section 2.3
  }
  return false;
}
+ * + * @param in_strings Input string column + * @param out_counts Number of characters in each decode URL + * @param out_validity Bitmask of validity data, updated in funcion + */ +template +__global__ void parse_uri_protocol_char_counter(column_device_view const in_strings, + size_type* const out_counts, + bitmask_type* out_validity) +{ + __shared__ char temporary_buffer[num_warps_per_threadblock][char_block_size]; + __shared__ typename cub::WarpScan::TempStorage cub_storage[num_warps_per_threadblock]; + __shared__ bool found_token[num_warps_per_threadblock]; + + auto const global_thread_id = cudf::detail::grid_1d::global_thread_id(); + auto const global_warp_id = static_cast(global_thread_id / cudf::detail::warp_size); + auto const local_warp_id = static_cast(threadIdx.x / cudf::detail::warp_size); + auto const warp_lane = static_cast(threadIdx.x % cudf::detail::warp_size); + auto const nwarps = static_cast(gridDim.x * blockDim.x / cudf::detail::warp_size); + char* in_chars_shared = temporary_buffer[local_warp_id]; + + // Loop through strings, and assign each string to a warp. + for (thread_index_type tidx = global_warp_id; tidx < in_strings.size(); tidx += nwarps) { + auto const row_idx = static_cast(tidx); + if (in_strings.is_null(row_idx)) { + if (warp_lane == 0) out_counts[row_idx] = 0; + continue; + } + + auto const in_string = in_strings.element(row_idx); + auto const in_chars = in_string.data(); + auto const string_length = in_string.size_bytes(); + auto const nblocks = cudf::util::div_rounding_up_unsafe(string_length, char_block_size); + size_type output_string_size = 0; + + // valid until proven otherwise + bool valid{true}; + + // Use the last thread of the warp to initialize `found_token` to false. 
+ if (warp_lane == cudf::detail::warp_size - 1) { found_token[local_warp_id] = false; } + + for (size_type block_idx = 0; block_idx < nblocks && valid; block_idx++) { + auto const string_length_block = + std::min(char_block_size, string_length - char_block_size * block_idx); + + // Each warp collectively loads input characters of the current block to the shared memory. + for (auto char_idx = warp_lane; char_idx < string_length_block; + char_idx += cudf::detail::warp_size) { + auto const in_idx = block_idx * char_block_size + char_idx; + in_chars_shared[char_idx] = in_idx < string_length ? in_chars[in_idx] : 0; + } + + __syncwarp(); + + // `char_idx_start` represents the start character index of the current warp. + for (size_type char_idx_start = 0; char_idx_start < string_length_block; + char_idx_start += cudf::detail::warp_size) { + auto const char_idx = char_idx_start + warp_lane; + char const* const ch_ptr = in_chars_shared + char_idx; + + // need to know if the character we are validating is before or after the token + // as valid characters changes. Default to 1 to handle the case where we have + // alreayd found the token and do not search for it again. + int8_t out_tokens{1}; + if (!found_token[local_warp_id]) { + // Warp-wise prefix sum to establish tokens of string. + // All threads in the warp participate in the prefix sum, even if `char_idx` is beyond + // `string_length_block`. + int8_t const is_token = (char_idx < string_length_block && *ch_ptr == ':') ? 1 : 0; + cub::WarpScan(cub_storage[local_warp_id]).InclusiveSum(is_token, out_tokens); + } + + auto const before_token = out_tokens == 0; + valid = valid && __ballot_sync(0xffffffff, + (char_idx >= string_length_block || + is_valid_character(*ch_ptr, before_token)) + ? 
0 + : 1) == 0; + if (!valid) { + // last thread in warp sets validity + if (warp_lane == cudf::detail::warp_size - 1) { + clear_bit(out_validity, row_idx); + out_counts[row_idx] = 0; + } + break; + } + + // if we have already found our token, no more string copy we only need to validate + // characters + if (!found_token[local_warp_id]) { + // If the current character is before the token we will output the character. + int8_t const out_size = (char_idx >= string_length_block || out_tokens > 0) ? 0 : 1; + + // Warp-wise prefix sum to establish output location of the current thread. + // All threads in the warp participate in the prefix sum, even if `char_idx` is beyond + // `string_length_block`. + int8_t out_offset; + cub::WarpScan(cub_storage[local_warp_id]).InclusiveSum(out_size, out_offset); + + // last thread of the warp updates offsets and token since it has the last offset and + // token value + if (warp_lane == cudf::detail::warp_size - 1) { + output_string_size += out_offset; + found_token[local_warp_id] = out_tokens > 0; + } + } + + __syncwarp(); + } + } + + // last thread of the warp sets output size + if (warp_lane == cudf::detail::warp_size - 1) { + if (!found_token[local_warp_id]) { + clear_bit(out_validity, row_idx); + out_counts[row_idx] = 0; + } else if (valid) { + out_counts[row_idx] = output_string_size; + } + } + } +} + +/** + * @brief Parse protocol and copy from the input string column to the output char buffer. + * + * @tparam num_warps_per_threadblock Number of warps in a threadblock. This template argument must + * match the launch configuration, i.e. the kernel must be launched with + * `num_warps_per_threadblock * cudf::detail::warp_size` threads per threadblock. + * @tparam char_block_size Number of characters which will be loaded into the shared memory at a + * time. 
+ * + * @param in_strings Input string column + * @param in_validity Validity vector of output column + * @param out_chars Character buffer for the output string column + * @param out_offsets Offset value of each string associated with `out_chars` + */ +template +__global__ void parse_uri_to_protocol(column_device_view const in_strings, + bitmask_type* in_validity, + char* const out_chars, + size_type const* const out_offsets) +{ + __shared__ char temporary_buffer[num_warps_per_threadblock][char_block_size]; + __shared__ typename cub::WarpScan::TempStorage cub_storage[num_warps_per_threadblock]; + __shared__ size_type out_idx[num_warps_per_threadblock]; + __shared__ bool found_token[num_warps_per_threadblock]; + + auto const global_thread_id = cudf::detail::grid_1d::global_thread_id(); + auto const global_warp_id = static_cast(global_thread_id / cudf::detail::warp_size); + auto const local_warp_id = static_cast(threadIdx.x / cudf::detail::warp_size); + auto const warp_lane = static_cast(threadIdx.x % cudf::detail::warp_size); + auto const nwarps = static_cast(gridDim.x * blockDim.x / cudf::detail::warp_size); + char* in_chars_shared = temporary_buffer[local_warp_id]; + + // Loop through strings, and assign each string to a warp + for (thread_index_type tidx = global_warp_id; tidx < in_strings.size(); tidx += nwarps) { + auto const row_idx = static_cast(tidx); + if (!bit_is_set(in_validity, row_idx)) { continue; } + + auto const in_string = in_strings.element(row_idx); + auto const in_chars = in_string.data(); + auto const string_length = in_string.size_bytes(); + auto out_chars_string = out_chars + out_offsets[row_idx]; + auto const nblocks = cudf::util::div_rounding_up_unsafe(string_length, char_block_size); + + // Use the last thread of the warp to initialize `out_idx` to 0 and `found_token` to false. 
+ if (warp_lane == cudf::detail::warp_size - 1) { + out_idx[local_warp_id] = 0; + found_token[local_warp_id] = false; + } + + __syncwarp(); + + for (size_type block_idx = 0; block_idx < nblocks && !found_token[local_warp_id]; block_idx++) { + auto const string_length_block = + std::min(char_block_size, string_length - char_block_size * block_idx); + + // Each warp collectively loads input characters of the current block to shared memory. + for (auto char_idx = warp_lane; char_idx < string_length_block; + char_idx += cudf::detail::warp_size) { + auto const in_idx = block_idx * char_block_size + char_idx; + in_chars_shared[char_idx] = in_idx >= 0 && in_idx < string_length ? in_chars[in_idx] : 0; + } + + __syncwarp(); + + // `char_idx_start` represents the start character index of the current warp. + for (size_type char_idx_start = 0; + char_idx_start < string_length_block && !found_token[local_warp_id]; + char_idx_start += cudf::detail::warp_size) { + auto const char_idx = char_idx_start + warp_lane; + char const* const ch_ptr = in_chars_shared + char_idx; + + // Warp-wise prefix sum to establish tokens of string. + // All threads in the warp participate in the prefix sum, even if `char_idx` is beyond + // `string_length_block`. + int8_t const is_token = (char_idx < string_length_block && *ch_ptr == ':') ? 1 : 0; + int8_t out_tokens; + cub::WarpScan(cub_storage[local_warp_id]).InclusiveSum(is_token, out_tokens); + + // If the current character is before the token we will output the character. + int8_t const out_size = (char_idx >= string_length_block || out_tokens > 0) ? 0 : 1; + + // Warp-wise prefix sum to establish output location of the current thread. + // All threads in the warp participate in the prefix sum, even if `char_idx` is beyond + // `string_length_block`. 
+ int8_t out_offset; + cub::WarpScan(cub_storage[local_warp_id]).ExclusiveSum(out_size, out_offset); + + // out_size of 1 means this thread writes a byte + if (out_size == 1) { out_chars_string[out_idx[local_warp_id] + out_offset] = *ch_ptr; } + + // last thread of the warp updates the offset and the token + if (warp_lane == cudf::detail::warp_size - 1) { + out_idx[local_warp_id] += (out_offset + out_size); + found_token[local_warp_id] = out_tokens > 0; + } + + __syncwarp(); + } + } + } +} + +} // namespace + +std::unique_ptr parse_uri_to_protocol(strings_column_view const& input, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + size_type strings_count = input.size(); + if (strings_count == 0) return make_empty_column(type_id::STRING); + + constexpr size_type num_warps_per_threadblock = 4; + constexpr size_type threadblock_size = num_warps_per_threadblock * cudf::detail::warp_size; + constexpr size_type char_block_size = 256; + auto const num_threadblocks = + std::min(65536, cudf::util::div_rounding_up_unsafe(strings_count, num_warps_per_threadblock)); + + auto offset_count = strings_count + 1; + auto const d_strings = column_device_view::create(input.parent(), stream); + + // build offsets column + auto offsets_column = make_numeric_column( + data_type{type_to_id()}, offset_count, mask_state::UNALLOCATED, stream, mr); + + // copy null mask + rmm::device_buffer null_mask = + input.parent().nullable() + ? 
cudf::detail::copy_bitmask(input.parent(), stream, mr) + : cudf::detail::create_null_mask(input.size(), mask_state::ALL_VALID, stream, mr); + + // count number of bytes in each string after parsing and store it in offsets_column + auto offsets_view = offsets_column->view(); + auto offsets_mutable_view = offsets_column->mutable_view(); + parse_uri_protocol_char_counter + <<>>( + *d_strings, + offsets_mutable_view.begin(), + reinterpret_cast(null_mask.data())); + + // use scan to transform number of bytes into offsets + thrust::exclusive_scan(rmm::exec_policy(stream), + offsets_view.begin(), + offsets_view.end(), + offsets_mutable_view.begin()); + + // copy the total number of characters of all strings combined (last element of the offset column) + // to the host memory + auto out_chars_bytes = cudf::detail::get_value(offsets_view, offset_count - 1, stream); + + // create the chars column + auto chars_column = cudf::strings::detail::create_chars_child_column(out_chars_bytes, stream, mr); + auto d_out_chars = chars_column->mutable_view().data(); + + // parse and copy the characters from the input column to the output column + parse_uri_to_protocol + <<>>( + *d_strings, + reinterpret_cast(null_mask.data()), + d_out_chars, + offsets_column->view().begin()); + + auto null_count = + cudf::null_count(reinterpret_cast(null_mask.data()), 0, strings_count); + + return make_strings_column(strings_count, + std::move(offsets_column), + std::move(chars_column), + null_count, + std::move(null_mask)); +} + +} // namespace detail + +// external API + +std::unique_ptr parse_uri_to_protocol(strings_column_view const& input, + rmm::cuda_stream_view stream, + rmm::mr::device_memory_resource* mr) +{ + CUDF_FUNC_RANGE(); + return detail::parse_uri_to_protocol(input, stream, mr); +} + +} // namespace spark_rapids_jni \ No newline at end of file diff --git a/src/main/cpp/src/parse_uri.hpp b/src/main/cpp/src/parse_uri.hpp new file mode 100644 index 0000000000..3237e347ab --- /dev/null +++ 
b/src/main/cpp/src/parse_uri.hpp @@ -0,0 +1,41 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#pragma once + +#include +#include + +#include + +#include + +namespace spark_rapids_jni { + +/** + * @brief Parse protocol and copy from the input string column to the output char buffer. + * + * @param input Input string column of URIs to parse + * @param stream Stream on which to operate. + * @param mr Memory resource for returned column + * @return std::unique_ptr String column of protocols parsed. + */ +std::unique_ptr parse_uri_to_protocol( + cudf::strings_column_view const& input, + rmm::cuda_stream_view stream = rmm::cuda_stream_default, + rmm::mr::device_memory_resource* mr = rmm::mr::get_current_device_resource()); + +} // namespace spark_rapids_jni diff --git a/src/main/cpp/tests/CMakeLists.txt b/src/main/cpp/tests/CMakeLists.txt index 5b95291351..fcc956bc34 100644 --- a/src/main/cpp/tests/CMakeLists.txt +++ b/src/main/cpp/tests/CMakeLists.txt @@ -66,3 +66,5 @@ ConfigureTest(BLOOM_FILTER ConfigureTest(UTILITIES utilities.cpp) +ConfigureTest(PARSE_URI + parse_uri.cpp) diff --git a/src/main/cpp/tests/parse_uri.cpp b/src/main/cpp/tests/parse_uri.cpp new file mode 100644 index 0000000000..3ff14a6075 --- /dev/null +++ b/src/main/cpp/tests/parse_uri.cpp @@ -0,0 +1,95 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include + +#include +#include +#include +#include + +struct ParseURIProtocolTests : public cudf::test::BaseFixture {}; + +TEST_F(ParseURIProtocolTests, Simple) +{ + cudf::test::strings_column_wrapper col({ + "https://www.nvidia.com/s/uri?param1=2", + "http://www.nvidia.com", + "file://path/to/a/cool/file", + "smb://network/path/to/file", + "http:/www.nvidia.com", + "file:path/to/a/cool/file", + }); + auto result = spark_rapids_jni::parse_uri_to_protocol(cudf::strings_column_view{col}); + + cudf::test::strings_column_wrapper expected({"https", "http", "file", "smb", "http", "file"}); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(result->view(), expected); +} + +TEST_F(ParseURIProtocolTests, Negatives) +{ + cudf::test::strings_column_wrapper col({ + "https//www.nvidia.com/s/uri?param1=2", + "/network/path/to/file", + "nvidia.com", + "www.nvidia.com/s/uri", + }); + auto result = spark_rapids_jni::parse_uri_to_protocol(cudf::strings_column_view{col}); + + cudf::test::strings_column_wrapper expected({"", "", "", ""}, {0, 0, 0, 0}); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(result->view(), expected); +} + +TEST_F(ParseURIProtocolTests, SparkEdges) +{ + cudf::test::strings_column_wrapper col( + {"https://nvidia.com/https&#://nvidia.com", + "https://http://www.nvidia.com", + "filesystemmagicthing://bob.yaml", + "nvidia.com:8080", + "http://thisisinvalid.data/due/to-the_character%s/inside*the#url`~", + 
"file:/absolute/path", + "//www.nvidia.com", + "#bob", + "#this%doesnt#make//sense://to/me", + "HTTP:&bob", + "/absolute/path", + "http://%77%77%77.%4EV%49%44%49%41.com", + "https:://broken.url", + "https://www.nvidia.com/q/This%20is%20a%20query"}); + + auto result = spark_rapids_jni::parse_uri_to_protocol(cudf::strings_column_view{col}); + + cudf::test::strings_column_wrapper expected({"https", + "https", + "filesystemmagicthing", + "nvidia.com", + "", + "file", + "", + "", + "", + "HTTP", + "", + "http", + "https", + "https"}, + {1, 1, 1, 1, 0, 1, 0, 0, 0, 1, 0, 1, 1, 1}); + + CUDF_TEST_EXPECT_COLUMNS_EQUIVALENT(result->view(), expected); +} \ No newline at end of file diff --git a/src/main/java/com/nvidia/spark/rapids/jni/ParseURI.java b/src/main/java/com/nvidia/spark/rapids/jni/ParseURI.java new file mode 100644 index 0000000000..0c0b046f15 --- /dev/null +++ b/src/main/java/com/nvidia/spark/rapids/jni/ParseURI.java @@ -0,0 +1,44 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.nvidia.spark.rapids.jni; + +import ai.rapids.cudf.ColumnVector; +import ai.rapids.cudf.ColumnView; +import ai.rapids.cudf.DType; +import ai.rapids.cudf.NativeDepsLoader; + +public class ParseURI { + static { + NativeDepsLoader.loadNativeDeps(); + } + + + /** + * Parse protocol for each URI from the incoming column. + * + * @param URIColumn The input strings column in which each row contains a URI. 
+ * @return A string column with protocol data extracted. + */ + public static ColumnVector parseURIProtocol(ColumnView uriColumn) { + assert uriColumn.getType().equals(DType.STRING) : "Input type must be String"; + return new ColumnVector(parseProtocol(uriColumn.getNativeView())); + } + + + private static native long parseProtocol(long jsonColumnHandle); + +} diff --git a/src/test/java/com/nvidia/spark/rapids/jni/ParseURITest.java b/src/test/java/com/nvidia/spark/rapids/jni/ParseURITest.java new file mode 100644 index 0000000000..7289d110b2 --- /dev/null +++ b/src/test/java/com/nvidia/spark/rapids/jni/ParseURITest.java @@ -0,0 +1,73 @@ +/* + * Copyright (c) 2023, NVIDIA CORPORATION. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package com.nvidia.spark.rapids.jni; + +import java.net.URI; +import java.net.URISyntaxException; + +import org.junit.jupiter.api.Test; + +import ai.rapids.cudf.AssertUtils; +import ai.rapids.cudf.ColumnVector; + +public class ParseURITest { + @Test + void parseURIToProtocolTest() { + String[] testData = {"https://nvidia.com/https&#://nvidia.com", + "https://http://www.nvidia.com", + "filesystemmagicthing://bob.yaml", + "nvidia.com:8080", + "http://thisisinvalid.data/due/to-the_character%s/inside*the#url`~", + "file:/absolute/path", + "//www.nvidia.com", + "#bob", + "#this%doesnt#make//sense://to/me", + "HTTP:&bob", + "/absolute/path", + "http://%77%77%77.%4EV%49%44%49%41.com", + "https:://broken.url", + "https://www.nvidia.com/q/This%20is%20a%20query", + "http:/www.nvidia.com", + "http://:www.nvidia.com/", + "http:///nvidia.com/q", + "https://www.nvidia.com:8080/q", + "https://www.nvidia.com#8080", + "file://path/to/cool/file", + "http//www.nvidia.com/q", + "", + null}; + + String[] expectedStrings = new String[testData.length]; + for (int i=0; i