Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/branch-24.12' into ray/kudo-alloc-output
Browse files Browse the repository at this point in the history
  • Loading branch information
liurenjie1024 committed Nov 15, 2024
2 parents 2365b0a + e5e1603 commit e405a88
Show file tree
Hide file tree
Showing 30 changed files with 2,238 additions and 305 deletions.
4 changes: 4 additions & 0 deletions ci/submodule-sync.sh
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ rapids_cmake_sha=$(git -C ${LIBCUDF_BUILD_PATH}/_deps/rapids-cmake-src/ rev-pars
echo "Update rapids-cmake pinned SHA1 to ${rapids_cmake_sha}"
echo "${rapids_cmake_sha}" > thirdparty/cudf-pins/rapids-cmake.sha

echo "Workaround for https://github.com/NVIDIA/spark-rapids-jni/issues/2582"
cudf_patch_path="cudf/cpp/cmake/thirdparty/patches"
sed -i "s|\${current_json_dir}|\${current_json_dir}/../${cudf_patch_path}|g" thirdparty/cudf-pins/versions.json

# Do the git add after the build so that we get
# the updated versions.json generated by the build
echo "Update cudf submodule to ${cudf_sha} with updated pinned versions"
Expand Down
15 changes: 13 additions & 2 deletions src/main/cpp/src/JSONUtilsJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -142,15 +142,26 @@ Java_com_nvidia_spark_rapids_jni_JSONUtils_getJsonObjectMultiplePaths(JNIEnv* en
}

/**
 * @brief JNI entry point for JSONUtils.extractRawMapFromJsonString.
 *
 * Wraps spark_rapids_jni::from_json_to_raw_map, forwarding the JSON reader
 * option flags supplied from the Java side. Returns the address of a newly
 * allocated cudf::column (ownership transfers to the Java caller), or 0 on
 * failure (the JNI_NULL_CHECK / CATCH_STD macros set a pending Java exception).
 */
JNIEXPORT jlong JNICALL Java_com_nvidia_spark_rapids_jni_JSONUtils_extractRawMapFromJsonString(
  JNIEnv* env,
  jclass,
  jlong j_input,
  jboolean normalize_single_quotes,
  jboolean allow_leading_zeros,
  jboolean allow_nonnumeric_numbers,
  jboolean allow_unquoted_control)
{
  JNI_NULL_CHECK(env, j_input, "j_input is null", 0);

  try {
    cudf::jni::auto_set_device(env);
    // j_input is the native address of a cudf::column_view owned by the Java side.
    auto const input_cv = reinterpret_cast<cudf::column_view const*>(j_input);
    return cudf::jni::ptr_as_jlong(
      spark_rapids_jni::from_json_to_raw_map(cudf::strings_column_view{*input_cv},
                                             normalize_single_quotes,
                                             allow_leading_zeros,
                                             allow_nonnumeric_numbers,
                                             allow_unquoted_control)
        .release());
  }
  CATCH_STD(env, 0);
}
Expand Down
507 changes: 298 additions & 209 deletions src/main/cpp/src/from_json_to_raw_map.cu

Large diffs are not rendered by default.

23 changes: 15 additions & 8 deletions src/main/cpp/src/from_json_to_raw_map_debug.cuh
Original file line number Diff line number Diff line change
Expand Up @@ -108,37 +108,44 @@ void print_pair_debug(rmm::device_uvector<T> const& input,
}

// Print the final output map data (Spark's MapType, i.e., List<Struct<String,String>>).
void print_output_spark_map(rmm::device_uvector<cudf::size_type> const& list_offsets,
void print_output_spark_map(std::unique_ptr<cudf::column> const& list_offsets,
std::unique_ptr<cudf::column> const& extracted_keys,
std::unique_ptr<cudf::column> const& extracted_values,
rmm::cuda_stream_view stream)
{
auto const keys_child = extracted_keys->child(cudf::strings_column_view::chars_column_index);
if (extracted_keys->size() == 0) {
std::cerr << "Extract keys-values are all empty.\n" << std::endl;
return;
}

auto const keys_sv = cudf::strings_column_view{extracted_keys->view()};
auto const values_sv = cudf::strings_column_view{extracted_values->view()};
auto const keys_offsets = extracted_keys->child(cudf::strings_column_view::offsets_column_index);
auto const values_child = extracted_values->child(cudf::strings_column_view::chars_column_index);
auto const values_offsets =
extracted_values->child(cudf::strings_column_view::offsets_column_index);

auto const h_extracted_keys_child = cudf::detail::make_host_vector_sync(
cudf::device_span<char const>{keys_child.view().data<char>(),
static_cast<size_t>(keys_child.size())},
cudf::device_span<char const>{keys_sv.chars_begin(stream),
static_cast<size_t>(keys_sv.chars_size(stream))},
stream);
auto const h_extracted_keys_offsets = cudf::detail::make_host_vector_sync(
cudf::device_span<int const>{keys_offsets.view().data<int>(),
static_cast<size_t>(keys_offsets.size())},
stream);

auto const h_extracted_values_child = cudf::detail::make_host_vector_sync(
cudf::device_span<char const>{values_child.view().data<char>(),
static_cast<size_t>(values_child.size())},
cudf::device_span<char const>{values_sv.chars_begin(stream),
static_cast<size_t>(values_sv.chars_size(stream))},
stream);
auto const h_extracted_values_offsets = cudf::detail::make_host_vector_sync(
cudf::device_span<int const>{values_offsets.view().data<int>(),
static_cast<size_t>(values_offsets.size())},
stream);

auto const h_list_offsets = cudf::detail::make_host_vector_sync(
cudf::device_span<cudf::size_type const>{list_offsets.data(), list_offsets.size()}, stream);
cudf::device_span<cudf::size_type const>{list_offsets->view().begin<cudf::size_type>(),
static_cast<std::size_t>(list_offsets->size())},
stream);
CUDF_EXPECTS(h_list_offsets.back() == extracted_keys->size(),
"Invalid list offsets computation.");

Expand Down
11 changes: 8 additions & 3 deletions src/main/cpp/src/json_utils.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

#include <cudf/strings/strings_column_view.hpp>
#include <cudf/utilities/default_stream.hpp>
#include <cudf/utilities/memory_resource.hpp>

#include <rmm/cuda_stream_view.hpp>
#include <rmm/device_buffer.hpp>
Expand All @@ -29,14 +30,18 @@ namespace spark_rapids_jni {

/**
 * @brief Extract key-value pairs from JSON strings into a raw map column
 * (List<Struct<String,String>>) without any type conversion of the values.
 *
 * @param input Strings column in which each row is one JSON object
 * @param normalize_single_quotes Allow single-quoted strings by normalizing them to double quotes
 * @param allow_leading_zeros Allow numbers with leading zeros
 * @param allow_nonnumeric_numbers Allow non-numeric literals such as NaN/Infinity
 * @param allow_unquoted_control Allow unquoted control characters inside strings
 * @param stream CUDA stream on which to execute kernels
 * @param mr Device memory resource used for the output's allocation
 */
std::unique_ptr<cudf::column> from_json_to_raw_map(
  cudf::strings_column_view const& input,
  bool normalize_single_quotes,
  bool allow_leading_zeros,
  bool allow_nonnumeric_numbers,
  bool allow_unquoted_control,
  rmm::cuda_stream_view stream      = cudf::get_default_stream(),
  rmm::device_async_resource_ref mr = cudf::get_current_device_resource());

/**
 * @brief Assemble the given column views into a structs column with the given
 * null mask applied.
 *
 * @param input Child column views for the output structs column
 * @param is_null Boolean column marking rows of the output that are null
 * @param stream CUDA stream on which to execute kernels
 * @param mr Device memory resource used for the output's allocation
 */
std::unique_ptr<cudf::column> make_structs(
  std::vector<cudf::column_view> const& input,
  cudf::column_view const& is_null,
  rmm::cuda_stream_view stream      = cudf::get_default_stream(),
  rmm::device_async_resource_ref mr = cudf::get_current_device_resource());

/**
* @brief Concatenate the JSON objects given by a strings column into one single character buffer,
Expand All @@ -62,6 +67,6 @@ std::tuple<std::unique_ptr<rmm::device_buffer>, char, std::unique_ptr<cudf::colu
cudf::strings_column_view const& input,
bool nullify_invalid_rows = false,
rmm::cuda_stream_view stream = cudf::get_default_stream(),
rmm::device_async_resource_ref mr = rmm::mr::get_current_device_resource());
rmm::device_async_resource_ref mr = cudf::get_current_device_resource());

} // namespace spark_rapids_jni
21 changes: 19 additions & 2 deletions src/main/java/com/nvidia/spark/rapids/jni/Arms.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
/**
* This class contains utility methods for automatic resource management.
*/
class Arms {
public class Arms {
/**
* This method close the resource if an exception is thrown while executing the function.
*/
Expand Down Expand Up @@ -54,7 +54,10 @@ public static <R extends AutoCloseable> void closeAll(Iterator<R> resources) {
Throwable t = null;
while (resources.hasNext()) {
try {
resources.next().close();
R resource = resources.next();
if (resource != null) {
resource.close();
}
} catch (Exception e) {
if (t == null) {
t = e;
Expand All @@ -81,4 +84,18 @@ public static <R extends AutoCloseable> void closeAll(R... resources) {
public static <R extends AutoCloseable> void closeAll(Collection<R> resources) {
closeAll(resources.iterator());
}

/**
* Applies the function to the collection of resources, then closes every resource even if the
* function throws. The function's result (or its exception) propagates to the caller; exceptions
* raised while closing are handled as described in {@link #closeAll(Iterator)}.
* <br/>
* See {@link #closeAll(Iterator)} for more details.
*
* @param resource the collection of resources to operate on and then close
* @param function the computation to apply to the resources before they are closed
* @return the value produced by {@code function}
*/
public static <R extends AutoCloseable, C extends Collection<R>, V> V withResource(
C resource, Function<C, V> function) {
try {
return function.apply(resource);
} finally {
closeAll(resource);
}
}
}
36 changes: 31 additions & 5 deletions src/main/java/com/nvidia/spark/rapids/jni/JSONUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -141,23 +141,45 @@ public static ColumnVector[] getJsonObjectMultiplePaths(ColumnVector input,
/**
* Extract key-value pairs for each output map from the given json strings. These key-value
* pairs are copied directly as substrings of the input without any type conversion.
* <p/>
* Since there is no validity check, the output of this function may be different from
* what is generated by Spark's `from_json` function. Situations that can lead to
* different/incorrect outputs may include:<br>
* - The value in the input json string is invalid, such as 'abc' value for an integer key.<br>
* - The value string can be non-clean format for floating-point type, such as '1.00000'.
* <p/>
* The output of these situations should be NULL or a value '1.0', respectively. However, this
* function will just simply copy the input value strings to the output.
*
* @param input The input strings column in which each row specifies a json object
* @param opts The options for parsing JSON strings
* @return A map column (i.e., a column of type {@code List<Struct<String,String>>}) in
* which the key-value pairs are extracted directly from the input json strings
*/
public static ColumnVector extractRawMapFromJsonString(ColumnView input, JSONOptions opts) {
assert (input.getType().equals(DType.STRING)) : "Input must be of STRING type";
return new ColumnVector(extractRawMapFromJsonString(input.getNativeView(),
opts.isNormalizeSingleQuotes(),
opts.leadingZerosAllowed(),
opts.nonNumericNumbersAllowed(),
opts.unquotedControlChars()));
}

/**
 * Extract key-value pairs for each output map from the given json strings. This method is
 * similar to {@link #extractRawMapFromJsonString(ColumnView, JSONOptions)} but is deprecated.
 *
 * @deprecated This method is deprecated since it does not have parameters to control various
 * JSON reader behaviors.
 *
 * @param input The input strings column in which each row specifies a json object
 * @return A map column (i.e., a column of type {@code List<Struct<String,String>>}) in
 * which the key-value pairs are extracted directly from the input json strings
 */
@Deprecated
public static ColumnVector extractRawMapFromJsonString(ColumnView input) {
  assert (input.getType().equals(DType.STRING)) : "Input must be of STRING type";
  // Permissive defaults match the historical behavior of this overload:
  // single quotes normalized, leading zeros / non-numeric numbers / unquoted controls allowed.
  return new ColumnVector(extractRawMapFromJsonString(input.getNativeView(),
      true, true, true, true));
}

/**
Expand Down Expand Up @@ -236,7 +258,11 @@ private static native long[] getJsonObjectMultiplePaths(long input,
int parallelOverride);


// Native backing of extractRawMapFromJsonString; the boolean flags map one-to-one onto the
// cudf JSON reader options forwarded by the JNI layer.
private static native long extractRawMapFromJsonString(long input,
                                                       boolean normalizeSingleQuotes,
                                                       boolean leadingZerosAllowed,
                                                       boolean nonNumericNumbersAllowed,
                                                       boolean unquotedControlChars);

private static native long[] concatenateJsonStrings(long input);

Expand Down
2 changes: 1 addition & 1 deletion src/main/java/com/nvidia/spark/rapids/jni/Pair.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@
/**
* A utility class for holding a pair of values.
*/
class Pair<K, V> {
public class Pair<K, V> {
private final K left;
private final V right;

Expand Down
14 changes: 14 additions & 0 deletions src/main/java/com/nvidia/spark/rapids/jni/Preconditions.java
Original file line number Diff line number Diff line change
Expand Up @@ -53,4 +53,18 @@ public static int ensureNonNegative(int value, String name) {
}
return value;
}

/**
 * Ensure that the given long value is non-negative.
 * @param value the value to check
 * @param name the name of the value, used in the exception message
 * @return the value if it is non-negative
 * @throws IllegalArgumentException if the value is negative
 */
public static long ensureNonNegative(long value, String name) {
  // Fast path: valid values pass straight through.
  if (value >= 0) {
    return value;
  }
  throw new IllegalArgumentException(name + " must be non-negative, but was " + value);
}
}
121 changes: 121 additions & 0 deletions src/main/java/com/nvidia/spark/rapids/jni/kudo/ColumnOffsetInfo.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,121 @@
/*
* Copyright (c) 2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.nvidia.spark.rapids.jni.kudo;

import ai.rapids.cudf.DeviceMemoryBufferView;

import static com.nvidia.spark.rapids.jni.Preconditions.ensureNonNegative;

/**
 * This class is used to store the offsets of the buffers of a column in the serialized data.
 * <p/>
 * Offsets are relative to a base address supplied by the caller; {@value #INVALID_OFFSET}
 * marks a buffer that is not present for the column.
 */
class ColumnOffsetInfo {
  static final long INVALID_OFFSET = -1L;
  // Offset of the validity buffer, or INVALID_OFFSET if the column has no validity buffer.
  private final long validity;
  private final long validityBufferLen;
  // Offset of the offsets buffer, or INVALID_OFFSET if the column has no offsets buffer.
  private final long offset;
  private final long offsetBufferLen;
  // Offset of the data buffer, or INVALID_OFFSET if the column has no data buffer.
  private final long data;
  private final long dataBufferLen;

  public ColumnOffsetInfo(long validity, long validityBufferLen, long offset, long offsetBufferLen, long data,
                          long dataBufferLen) {
    // Fix: the message previously read "validityBuffeLen", which misnamed the failing argument.
    ensureNonNegative(validityBufferLen, "validityBufferLen");
    ensureNonNegative(offsetBufferLen, "offsetBufferLen");
    ensureNonNegative(dataBufferLen, "dataBufferLen");
    this.validity = validity;
    this.validityBufferLen = validityBufferLen;
    this.offset = offset;
    this.offsetBufferLen = offsetBufferLen;
    this.data = data;
    this.dataBufferLen = dataBufferLen;
  }

  /**
   * Get the validity buffer offset.
   * @return {@value #INVALID_OFFSET} if the validity buffer is not present, otherwise the offset.
   */
  long getValidity() {
    return validity;
  }

  /**
   * Get a view of the validity buffer from underlying buffer.
   * @param baseAddress the base address of underlying buffer.
   * @return null if the validity buffer is not present, otherwise a view of the buffer.
   */
  DeviceMemoryBufferView getValidityBuffer(long baseAddress) {
    if (validity == INVALID_OFFSET) {
      return null;
    }
    return new DeviceMemoryBufferView(validity + baseAddress, validityBufferLen);
  }

  /**
   * Get the offset buffer offset.
   * @return {@value #INVALID_OFFSET} if the offset buffer is not present, otherwise the offset.
   */
  long getOffset() {
    return offset;
  }

  /**
   * Get a view of the offset buffer from underlying buffer.
   * @param baseAddress the base address of underlying buffer.
   * @return null if the offset buffer is not present, otherwise a view of the buffer.
   */
  DeviceMemoryBufferView getOffsetBuffer(long baseAddress) {
    if (offset == INVALID_OFFSET) {
      return null;
    }
    return new DeviceMemoryBufferView(offset + baseAddress, offsetBufferLen);
  }

  /**
   * Get the data buffer offset.
   * @return {@value #INVALID_OFFSET} if the data buffer is not present, otherwise the offset.
   */
  long getData() {
    return data;
  }

  /**
   * Get a view of the data buffer from underlying buffer.
   * @param baseAddress the base address of underlying buffer.
   * @return null if the data buffer is not present, otherwise a view of the buffer.
   */
  DeviceMemoryBufferView getDataBuffer(long baseAddress) {
    if (data == INVALID_OFFSET) {
      return null;
    }
    return new DeviceMemoryBufferView(data + baseAddress, dataBufferLen);
  }

  long getDataBufferLen() {
    return dataBufferLen;
  }

  @Override
  public String toString() {
    return "ColumnOffsets{" +
        "validity=" + validity +
        ", offset=" + offset +
        ", data=" + data +
        ", dataLen=" + dataBufferLen +
        '}';
  }
}
Loading

0 comments on commit e405a88

Please sign in to comment.