diff --git a/cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh b/cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh index e0c7ce840d7..69edf38e359 100644 --- a/cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh +++ b/cpp/include/cudf/hashing/detail/murmurhash3_x86_32.cuh @@ -57,62 +57,71 @@ struct MurmurHash3_x86_32 { }; template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()(bool const& key) const +MurmurHash3_x86_32::result_type __device__ inline MurmurHash3_x86_32::operator()( + bool const& key) const { return this->compute(static_cast(key)); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()(float const& key) const +MurmurHash3_x86_32::result_type __device__ inline MurmurHash3_x86_32::operator()( + float const& key) const { return this->compute(normalize_nans_and_zeros(key)); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()(double const& key) const +MurmurHash3_x86_32::result_type __device__ inline MurmurHash3_x86_32::operator()( + double const& key) const { return this->compute(normalize_nans_and_zeros(key)); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - cudf::string_view const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + cudf::string_view const& key) const { return this->compute_bytes(reinterpret_cast(key.data()), key.size_bytes()); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - numeric::decimal32 const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + numeric::decimal32 const& key) const { return this->compute(key.value()); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - numeric::decimal64 const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + numeric::decimal64 const& key) const { return this->compute(key.value()); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - numeric::decimal128 const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + numeric::decimal128 const& key) const { return this->compute(key.value()); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - cudf::list_view const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + cudf::list_view const& key) const { CUDF_UNREACHABLE("List column hashing is not supported"); } template <> -hash_value_type __device__ inline MurmurHash3_x86_32::operator()( - cudf::struct_view const& key) const +MurmurHash3_x86_32::result_type + __device__ inline MurmurHash3_x86_32::operator()( + cudf::struct_view const& key) const { CUDF_UNREACHABLE("Direct hashing of struct_view is not supported"); } diff --git a/cpp/src/groupby/hash/compute_groupby.cu b/cpp/src/groupby/hash/compute_groupby.cu index e1dbf2a3d9e..9648d942513 100644 --- a/cpp/src/groupby/hash/compute_groupby.cu +++ b/cpp/src/groupby/hash/compute_groupby.cu @@ -61,7 +61,7 @@ std::unique_ptr compute_groupby(table_view const& keys, d_row_equal, probing_scheme_t{d_row_hash}, cuco::thread_scope_device, - cuco::storage{}, + cuco::storage{}, cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}, stream.value()}; diff --git a/cpp/src/groupby/hash/compute_mapping_indices.cuh b/cpp/src/groupby/hash/compute_mapping_indices.cuh index d353830780f..f86a93109be 100644 --- a/cpp/src/groupby/hash/compute_mapping_indices.cuh +++ b/cpp/src/groupby/hash/compute_mapping_indices.cuh @@ -106,15 +106,15 @@ CUDF_KERNEL void mapping_indices_kernel(cudf::size_type num_input_rows, __shared__ cudf::size_type shared_set_indices[GROUPBY_SHM_MAX_ELEMENTS]; // Shared set initialization - __shared__ cuco::window windows[window_extent.value()]; + __shared__ cuco::bucket buckets[bucket_extent.value()]; auto raw_set = cuco::static_set_ref{ cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL}, global_set.key_eq(), probing_scheme_t{global_set.hash_function()}, cuco::thread_scope_block, - cuco::aow_storage_ref{ - window_extent, windows}}; + cuco::bucket_storage_ref{ + bucket_extent, buckets}}; auto shared_set = raw_set.rebind_operators(cuco::insert_and_find); auto const block = cooperative_groups::this_thread_block(); diff --git a/cpp/src/groupby/hash/helpers.cuh b/cpp/src/groupby/hash/helpers.cuh index f950e03e0fb..92925e11bac 100644 --- a/cpp/src/groupby/hash/helpers.cuh +++ b/cpp/src/groupby/hash/helpers.cuh @@ -27,7 +27,7 @@ namespace cudf::groupby::detail::hash { CUDF_HOST_DEVICE auto constexpr GROUPBY_CG_SIZE = 1; /// Number of slots per thread -CUDF_HOST_DEVICE auto constexpr GROUPBY_WINDOW_SIZE = 1; +CUDF_HOST_DEVICE auto constexpr GROUPBY_BUCKET_SIZE = 1; /// Thread block size CUDF_HOST_DEVICE auto constexpr GROUPBY_BLOCK_SIZE = 128; @@ -48,9 +48,9 @@ using shmem_extent_t = cuco::extent(static_cast(GROUPBY_SHM_MAX_ELEMENTS) * 1.43)>; -/// Number of windows needed by each shared memory hash set -CUDF_HOST_DEVICE auto constexpr window_extent = - cuco::make_window_extent(shmem_extent_t{}); +/// Number of buckets needed by each shared memory hash set +CUDF_HOST_DEVICE auto constexpr bucket_extent = + cuco::make_bucket_extent(shmem_extent_t{}); using row_hash_t = cudf::experimental::row::hash::device_row_hasher, - cuco::storage>; + cuco::storage>; using nullable_global_set_t = cuco::static_set, @@ -83,7 +83,7 @@ using nullable_global_set_t = cuco::static_set, - cuco::storage>; + cuco::storage>; template using hash_set_ref_t = cuco::static_set_ref< @@ -91,7 +91,7 @@ using hash_set_ref_t = cuco::static_set_ref< cuda::thread_scope_device, row_comparator_t, probing_scheme_t, - cuco::aow_storage_ref>, + cuco::bucket_storage_ref>, Op>; template @@ -100,6 +100,6 @@ using nullable_hash_set_ref_t = cuco::static_set_ref< cuda::thread_scope_device, nullable_row_comparator_t, probing_scheme_t, - cuco::aow_storage_ref>, + cuco::bucket_storage_ref>, Op>; } // namespace cudf::groupby::detail::hash diff --git a/cpp/src/io/fst/logical_stack.cuh b/cpp/src/io/fst/logical_stack.cuh index 0f1fc7d572b..98641f2c893 100644 --- a/cpp/src/io/fst/logical_stack.cuh +++ b/cpp/src/io/fst/logical_stack.cuh @@ -513,6 +513,12 @@ void sparse_stack_op_to_top_of_stack(StackSymbolItT d_symbols, stream)); } + // Check if the last element of d_kv_operations is 0. If not, then we have a problem. + if (num_symbols_in && !supports_reset_op) { + StackOpT last_symbol = d_kv_ops_current.element(num_symbols_in - 1, stream); + CUDF_EXPECTS(last_symbol.stack_level == 0, "The logical stack is not empty!"); + } + // Stable radix sort, sorting by stack level of the operations d_kv_operations_unsigned = cub::DoubleBuffer{ reinterpret_cast(d_kv_operations.Current()), diff --git a/cpp/src/io/json/nested_json_gpu.cu b/cpp/src/io/json/nested_json_gpu.cu index f1c2826c62a..30a28a1cf98 100644 --- a/cpp/src/io/json/nested_json_gpu.cu +++ b/cpp/src/io/json/nested_json_gpu.cu @@ -1473,10 +1473,11 @@ void get_stack_context(device_span json_in, to_stack_op::start_state, stream); - auto stack_ops_bufsize = d_num_stack_ops.value(stream); + // Copy back to actual number of stack operations + auto num_stack_ops = d_num_stack_ops.value(stream); // Sequence of stack symbols and their position in the original input (sparse representation) - rmm::device_uvector stack_ops{stack_ops_bufsize, stream}; - rmm::device_uvector stack_op_indices{stack_ops_bufsize, stream}; + rmm::device_uvector stack_ops{num_stack_ops, stream}; + rmm::device_uvector stack_op_indices{num_stack_ops, stream}; // Run bracket-brace FST to retrieve starting positions of structs and lists json_to_stack_ops_fst.Transduce(json_in.begin(), @@ -1487,9 +1488,6 @@ void get_stack_context(device_span json_in, to_stack_op::start_state, stream); - // Copy back to actual number of stack operations - auto const num_stack_ops = d_num_stack_ops.value(stream); - // Stack operations with indices are converted to top of the stack for each character in the input if (stack_behavior == stack_behavior_t::ResetOnDelimiter) { fst::sparse_stack_op_to_top_of_stack( diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 0cb5c382631..7facc6497ed 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -180,9 +180,9 @@ CUDF_KERNEL void __launch_bounds__(block_size) for (size_type i = 0; i < dict.map_slots.size(); i += block_size) { if (t + i < dict.map_slots.size()) { - auto window = dict.map_slots.begin() + t + i; - // Collect all slots from each window. - for (auto& slot : *window) { + auto bucket = dict.map_slots.begin() + t + i; + // Collect all slots from each bucket. + for (auto& slot : *bucket) { auto const key = slot.first; if (key != KEY_SENTINEL) { auto loc = counter.fetch_add(1, memory_order_relaxed); diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index 0949fafe9a4..654ee1e012c 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -47,16 +47,16 @@ using slot_type = cuco::pair; auto constexpr map_cg_size = 1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset. ///< Note: Adjust insert and find loops to use `cg::tile` if increasing this. -auto constexpr window_size = +auto constexpr bucket_size = 1; ///< Number of concurrent slots (set for best performance) handled by each thread. auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size ///< N * (1/0.7) = 1.43 to target a 70% occupancy factor. -using storage_type = cuco::aow_storage, - cudf::detail::cuco_allocator>; +using storage_type = cuco::bucket_storage, + cudf::detail::cuco_allocator>; using storage_ref_type = typename storage_type::ref_type; -using window_type = typename storage_type::window_type; +using bucket_type = typename storage_type::bucket_type; using slot_type = cuco::pair; auto constexpr KEY_SENTINEL = size_type{-1}; @@ -193,7 +193,7 @@ struct StripeStream { */ struct stripe_dictionary { // input - device_span map_slots; // hash map (windows) storage + device_span map_slots; // hash map (buckets) storage uint32_t column_idx = 0; // column index size_type start_row = 0; // first row in the stripe size_type start_rowgroup = 0; // first rowgroup in the stripe diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index b85ebf2fa1a..b5f9b894c46 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -210,7 +210,7 @@ struct map_find_fn { template CUDF_KERNEL void __launch_bounds__(block_size) - populate_chunk_hash_maps_kernel(device_span const map_storage, + populate_chunk_hash_maps_kernel(device_span const map_storage, cudf::detail::device_2dspan frags) { auto const col_idx = blockIdx.y; @@ -239,7 +239,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) template CUDF_KERNEL void __launch_bounds__(block_size) - collect_map_entries_kernel(device_span const map_storage, + collect_map_entries_kernel(device_span const map_storage, device_span chunks) { auto& chunk = chunks[blockIdx.x]; @@ -251,11 +251,11 @@ CUDF_KERNEL void __launch_bounds__(block_size) if (t == 0) { new (&counter) cuda::atomic{0}; } __syncthreads(); - // Iterate over all windows in the map. + // Iterate over all buckets in the map. for (; t < chunk.dict_map_size; t += block_size) { - auto window = map_storage.data() + chunk.dict_map_offset + t; - // Collect all slots from each window. - for (auto& slot : *window) { + auto bucket = map_storage.data() + chunk.dict_map_offset + t; + // Collect all slots from each bucket. + for (auto& slot : *bucket) { auto const key = slot.first; if (key != KEY_SENTINEL) { auto const loc = counter.fetch_add(1, memory_order_relaxed); @@ -272,7 +272,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) template CUDF_KERNEL void __launch_bounds__(block_size) - get_dictionary_indices_kernel(device_span const map_storage, + get_dictionary_indices_kernel(device_span const map_storage, cudf::detail::device_2dspan frags) { auto const col_idx = blockIdx.y; @@ -302,7 +302,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) s_ck_start_val_idx); } -void populate_chunk_hash_maps(device_span const map_storage, +void populate_chunk_hash_maps(device_span const map_storage, cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream) { @@ -311,7 +311,7 @@ void populate_chunk_hash_maps(device_span const map_storage, <<>>(map_storage, frags); } -void collect_map_entries(device_span const map_storage, +void collect_map_entries(device_span const map_storage, device_span chunks, rmm::cuda_stream_view stream) { @@ -320,7 +320,7 @@ void collect_map_entries(device_span const map_storage, <<>>(map_storage, chunks); } -void get_dictionary_indices(device_span const map_storage, +void get_dictionary_indices(device_span const map_storage, cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream) { diff --git a/cpp/src/io/parquet/parquet_gpu.cuh b/cpp/src/io/parquet/parquet_gpu.cuh index 7c09764da2d..800875f7448 100644 --- a/cpp/src/io/parquet/parquet_gpu.cuh +++ b/cpp/src/io/parquet/parquet_gpu.cuh @@ -34,7 +34,7 @@ using slot_type = cuco::pair; auto constexpr map_cg_size = 1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset. ///< Note: Adjust insert and find loops to use `cg::tile` if increasing this. -auto constexpr window_size = +auto constexpr bucket_size = 1; ///< Number of concurrent slots (set for best performance) handled by each thread. auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size ///< N * (1/0.7) = 1.43 to target a 70% occupancy factor. @@ -43,12 +43,12 @@ auto constexpr KEY_SENTINEL = key_type{-1}; auto constexpr VALUE_SENTINEL = mapped_type{-1}; auto constexpr SCOPE = cuda::thread_scope_block; -using storage_type = cuco::aow_storage, - cudf::detail::cuco_allocator>; +using storage_type = cuco::bucket_storage, + cudf::detail::cuco_allocator>; using storage_ref_type = typename storage_type::ref_type; -using window_type = typename storage_type::window_type; +using bucket_type = typename storage_type::bucket_type; /** * @brief Return the byte length of parquet dtypes that are physically represented by INT32 @@ -100,7 +100,7 @@ inline size_type __device__ row_to_value_idx(size_type idx, * @param frags Column fragments * @param stream CUDA stream to use */ -void populate_chunk_hash_maps(device_span const map_storage, +void populate_chunk_hash_maps(device_span const map_storage, cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream); @@ -111,7 +111,7 @@ void populate_chunk_hash_maps(device_span const map_storage, * @param chunks Flat span of chunks to compact hash maps for * @param stream CUDA stream to use */ -void collect_map_entries(device_span const map_storage, +void collect_map_entries(device_span const map_storage, device_span chunks, rmm::cuda_stream_view stream); @@ -128,7 +128,7 @@ void collect_map_entries(device_span const map_storage, * @param frags Column fragments * @param stream CUDA stream to use */ -void get_dictionary_indices(device_span const map_storage, +void get_dictionary_indices(device_span const map_storage, cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 188e6a8c0d8..6db92462498 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1302,7 +1302,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, } else { chunk.use_dictionary = true; chunk.dict_map_size = - static_cast(cuco::make_window_extent( + static_cast(cuco::make_bucket_extent( static_cast(occupancy_factor * chunk.num_values))); chunk.dict_map_offset = total_map_storage_size; total_map_storage_size += chunk.dict_map_size; @@ -1317,7 +1317,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, total_map_storage_size, cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}}; // Create a span of non-const map_storage as map_storage_ref takes in a non-const pointer. - device_span const map_storage_data{map_storage.data(), total_map_storage_size}; + device_span const map_storage_data{map_storage.data(), total_map_storage_size}; // Synchronize chunks.host_to_device_async(stream); diff --git a/cpp/tests/io/json/json_test.cpp b/cpp/tests/io/json/json_test.cpp index 37a750330fa..23ca5734ded 100644 --- a/cpp/tests/io/json/json_test.cpp +++ b/cpp/tests/io/json/json_test.cpp @@ -3450,4 +3450,15 @@ TEST_P(JsonCompressedIOTest, BasicJsonLines) CUDF_TEST_EXPECT_COLUMNS_EQUAL(result.tbl->get_column(1), float64_wrapper{{1.1, 2.2, 3.3}}); } +TEST_F(JsonReaderTest, MismatchedBeginEndTokens) +{ + std::string data = R"({"not_valid": "json)"; + auto opts = + cudf::io::json_reader_options::builder(cudf::io::source_info{data.data(), data.size()}) + .lines(true) + .recovery_mode(cudf::io::json_recovery_mode_t::FAIL) + .build(); + EXPECT_THROW(cudf::io::read_json(opts), cudf::logic_error); +} + CUDF_TEST_PROGRAM_MAIN() diff --git a/python/cudf/cudf/_lib/CMakeLists.txt b/python/cudf/cudf/_lib/CMakeLists.txt index 410fd57691e..da4faabf189 100644 --- a/python/cudf/cudf/_lib/CMakeLists.txt +++ b/python/cudf/cudf/_lib/CMakeLists.txt @@ -12,7 +12,7 @@ # the License. # ============================================================================= -set(cython_sources column.pyx groupby.pyx scalar.pyx strings_udf.pyx types.pyx utils.pyx) +set(cython_sources column.pyx scalar.pyx strings_udf.pyx types.pyx utils.pyx) set(linked_libraries cudf::cudf) rapids_cython_create_modules( diff --git a/python/cudf/cudf/_lib/__init__.py b/python/cudf/cudf/_lib/__init__.py index 6b5a7814e48..10f9d813ccc 100644 --- a/python/cudf/cudf/_lib/__init__.py +++ b/python/cudf/cudf/_lib/__init__.py @@ -1,10 +1,7 @@ # Copyright (c) 2020-2024, NVIDIA CORPORATION. import numpy as np -from . import ( - groupby, - strings_udf, -) +from . import strings_udf MAX_COLUMN_SIZE = np.iinfo(np.int32).max MAX_COLUMN_SIZE_STR = "INT32_MAX" diff --git a/python/cudf/cudf/_lib/groupby.pyx b/python/cudf/cudf/_lib/groupby.pyx deleted file mode 100644 index 80a77ef2267..00000000000 --- a/python/cudf/cudf/_lib/groupby.pyx +++ /dev/null @@ -1,281 +0,0 @@ -# Copyright (c) 2020-2024, NVIDIA CORPORATION. -from functools import singledispatch - -from pandas.errors import DataError - -from cudf.api.types import _is_categorical_dtype, is_string_dtype -from cudf.core.buffer import acquire_spill_lock -from cudf.core.dtypes import ( - CategoricalDtype, - DecimalDtype, - IntervalDtype, - ListDtype, - StructDtype, -) - -from cudf._lib.scalar cimport DeviceScalar -from cudf._lib.utils cimport columns_from_pylibcudf_table - -from cudf._lib.scalar import as_device_scalar - -import pylibcudf - -from cudf.core._internals.aggregation import make_aggregation - -# The sets below define the possible aggregations that can be performed on -# different dtypes. These strings must be elements of the AggregationKind enum. -# The libcudf infrastructure exists for "COLLECT" support on -# categoricals, but the dtype support in python does not. -_CATEGORICAL_AGGS = {"COUNT", "NUNIQUE", "SIZE", "UNIQUE"} -_STRING_AGGS = { - "COLLECT", - "COUNT", - "MAX", - "MIN", - "NTH", - "NUNIQUE", - "SIZE", - "UNIQUE", -} -_LIST_AGGS = {"COLLECT"} -_STRUCT_AGGS = {"COLLECT", "CORRELATION", "COVARIANCE"} -_INTERVAL_AGGS = {"COLLECT"} -_DECIMAL_AGGS = { - "ARGMIN", - "ARGMAX", - "COLLECT", - "COUNT", - "MAX", - "MIN", - "NTH", - "NUNIQUE", - "SUM", -} - - -@singledispatch -def get_valid_aggregation(dtype): - if is_string_dtype(dtype): - return _STRING_AGGS - return "ALL" - - -@get_valid_aggregation.register -def _(dtype: ListDtype): - return _LIST_AGGS - - -@get_valid_aggregation.register -def _(dtype: CategoricalDtype): - return _CATEGORICAL_AGGS - - -@get_valid_aggregation.register -def _(dtype: ListDtype): - return _LIST_AGGS - - -@get_valid_aggregation.register -def _(dtype: StructDtype): - return _STRUCT_AGGS - - -@get_valid_aggregation.register -def _(dtype: IntervalDtype): - return _INTERVAL_AGGS - - -@get_valid_aggregation.register -def _(dtype: DecimalDtype): - return _DECIMAL_AGGS - - -cdef class GroupBy: - cdef dict __dict__ - - def __init__(self, keys, dropna=True): - with acquire_spill_lock() as spill_lock: - self._groupby = pylibcudf.groupby.GroupBy( - pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in keys]), - pylibcudf.types.NullPolicy.EXCLUDE if dropna - else pylibcudf.types.NullPolicy.INCLUDE - ) - - # We spill lock the columns while this GroupBy instance is alive. - self._spill_lock = spill_lock - - def groups(self, list values): - """ - Perform a sort groupby, using the keys used to construct the Groupby as the key - columns and ``values`` as the value columns. - - Parameters - ---------- - values: list of Columns - The value columns - - Returns - ------- - offsets: list of integers - Integer offsets such that offsets[i+1] - offsets[i] - represents the size of group `i`. - grouped_keys: list of Columns - The grouped key columns - grouped_values: list of Columns - The grouped value columns - """ - offsets, grouped_keys, grouped_values = self._groupby.get_groups( - pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in values]) - if values else None - ) - - return ( - offsets, - columns_from_pylibcudf_table(grouped_keys), - ( - columns_from_pylibcudf_table(grouped_values) - if grouped_values is not None else [] - ), - ) - - def aggregate(self, values, aggregations): - """ - Parameters - ---------- - values : Frame - aggregations - A dict mapping column names in `Frame` to a list of aggregations - to perform on that column - - Each aggregation may be specified as: - - a string (e.g., "max") - - a lambda/function - - Returns - ------- - Frame of aggregated values - """ - included_aggregations = [] - column_included = [] - requests = [] - for i, (col, aggs) in enumerate(zip(values, aggregations)): - valid_aggregations = get_valid_aggregation(col.dtype) - included_aggregations_i = [] - col_aggregations = [] - for agg in aggs: - str_agg = str(agg) - if ( - is_string_dtype(col) - and agg not in _STRING_AGGS - and - ( - str_agg in {"cumsum", "cummin", "cummax"} - or not ( - any(a in str_agg for a in { - "count", - "max", - "min", - "first", - "last", - "nunique", - "unique", - "nth" - }) - or (agg is list) - ) - ) - ): - raise TypeError( - f"function is not supported for this dtype: {agg}" - ) - elif ( - _is_categorical_dtype(col) - and agg not in _CATEGORICAL_AGGS - and ( - str_agg in {"cumsum", "cummin", "cummax"} - or - not ( - any(a in str_agg for a in {"count", "max", "min", "unique"}) - ) - ) - ): - raise TypeError( - f"{col.dtype} type does not support {agg} operations" - ) - - agg_obj = make_aggregation(agg) - if valid_aggregations == "ALL" or agg_obj.kind in valid_aggregations: - included_aggregations_i.append((agg, agg_obj.kind)) - col_aggregations.append(agg_obj.c_obj) - included_aggregations.append(included_aggregations_i) - if col_aggregations: - requests.append(pylibcudf.groupby.GroupByRequest( - col.to_pylibcudf(mode="read"), col_aggregations - )) - column_included.append(i) - - if not requests and any(len(v) > 0 for v in aggregations): - raise DataError("All requested aggregations are unsupported.") - - keys, results = self._groupby.scan(requests) if \ - _is_all_scan_aggregate(aggregations) else self._groupby.aggregate(requests) - - result_columns = [[] for _ in range(len(values))] - for i, result in zip(column_included, results): - result_columns[i] = columns_from_pylibcudf_table(result) - - return result_columns, columns_from_pylibcudf_table(keys), included_aggregations - - def shift(self, list values, int periods, list fill_values): - keys, shifts = self._groupby.shift( - pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in values]), - [periods] * len(values), - [ - ( as_device_scalar(val, dtype=col.dtype)).c_value - for val, col in zip(fill_values, values) - ], - ) - - return columns_from_pylibcudf_table(shifts), columns_from_pylibcudf_table(keys) - - def replace_nulls(self, list values, object method): - _, replaced = self._groupby.replace_nulls( - pylibcudf.table.Table([c.to_pylibcudf(mode="read") for c in values]), - [ - pylibcudf.replace.ReplacePolicy.PRECEDING - if method == 'ffill' else pylibcudf.replace.ReplacePolicy.FOLLOWING - ] * len(values), - ) - - return columns_from_pylibcudf_table(replaced) - - -_GROUPBY_SCANS = {"cumcount", "cumsum", "cummin", "cummax", "cumprod", "rank"} - - -def _is_all_scan_aggregate(all_aggs): - """ - Returns true if all are scan aggregations. - Raises - ------ - NotImplementedError - If both reduction aggregations and scan aggregations are present. - """ - - def get_name(agg): - return agg.__name__ if callable(agg) else agg - - all_scan = all( - get_name(agg_name) in _GROUPBY_SCANS for aggs in all_aggs - for agg_name in aggs - ) - any_scan = any( - get_name(agg_name) in _GROUPBY_SCANS for aggs in all_aggs - for agg_name in aggs - ) - - if not all_scan and any_scan: - raise NotImplementedError( - "Cannot perform both aggregation and scan in one operation" - ) - return all_scan and any_scan diff --git a/python/cudf/cudf/core/_base_index.py b/python/cudf/cudf/core/_base_index.py index f4543bc6156..c2f3c782d10 100644 --- a/python/cudf/cudf/core/_base_index.py +++ b/python/cudf/cudf/core/_base_index.py @@ -1447,7 +1447,7 @@ def _union(self, other, sort=None): other_df["order"] = other_df.index res = self_df.merge(other_df, on=[0], how="outer") res = res.sort_values( - by=res._data.to_pandas_index()[1:], ignore_index=True + by=res._data.to_pandas_index[1:], ignore_index=True ) union_result = cudf.core.index._index_from_data({0: res._data[0]}) diff --git a/python/cudf/cudf/core/_internals/aggregation.py b/python/cudf/cudf/core/_internals/aggregation.py index fe8ea5a947a..1d21d34b1bf 100644 --- a/python/cudf/cudf/core/_internals/aggregation.py +++ b/python/cudf/cudf/core/_internals/aggregation.py @@ -29,11 +29,11 @@ class Aggregation: def __init__(self, agg: plc.aggregation.Aggregation) -> None: - self.c_obj = agg + self.plc_obj = agg @property def kind(self) -> str: - name = self.c_obj.kind().name + name = self.plc_obj.kind().name return _agg_name_map.get(name, name) @classmethod diff --git a/python/cudf/cudf/core/column/column.py b/python/cudf/cudf/core/column/column.py index cccafaeba88..75b9070b53f 100644 --- a/python/cudf/cudf/core/column/column.py +++ b/python/cudf/cudf/core/column/column.py @@ -1605,7 +1605,7 @@ def scan(self, scan_op: str, inclusive: bool, **kwargs) -> Self: return type(self).from_pylibcudf( # type: ignore[return-value] plc.reduce.scan( self.to_pylibcudf(mode="read"), - aggregation.make_aggregation(scan_op, kwargs).c_obj, + aggregation.make_aggregation(scan_op, kwargs).plc_obj, plc.reduce.ScanType.INCLUSIVE if inclusive else plc.reduce.ScanType.EXCLUSIVE, @@ -1637,7 +1637,7 @@ def reduce(self, reduction_op: str, dtype=None, **kwargs) -> ScalarLike: with acquire_spill_lock(): plc_scalar = plc.reduce.reduce( self.to_pylibcudf(mode="read"), - aggregation.make_aggregation(reduction_op, kwargs).c_obj, + aggregation.make_aggregation(reduction_op, kwargs).plc_obj, dtype_to_pylibcudf_type(col_dtype), ) result_col = type(self).from_pylibcudf( diff --git a/python/cudf/cudf/core/column_accessor.py b/python/cudf/cudf/core/column_accessor.py index e4fd82e819b..aaf7d071dff 100644 --- a/python/cudf/cudf/core/column_accessor.py +++ b/python/cudf/cudf/core/column_accessor.py @@ -207,11 +207,16 @@ def _from_columns_like_self( @property def level_names(self) -> tuple[abc.Hashable, ...]: + if self.is_cached("to_pandas_index"): + return self.to_pandas_index.names if self._level_names is None or len(self._level_names) == 0: return tuple((None,) * max(1, self.nlevels)) else: return self._level_names + def is_cached(self, attr_name: str) -> bool: + return attr_name in self.__dict__ + @property def nlevels(self) -> int: if len(self) == 0: @@ -262,7 +267,12 @@ def _clear_cache(self, old_ncols: int, new_ncols: int) -> None: new_ncols: int len(self) after self._data was modified """ - cached_properties = ("columns", "names", "_grouped_data") + cached_properties = ( + "columns", + "names", + "_grouped_data", + "to_pandas_index", + ) for attr in cached_properties: try: self.__delattr__(attr) @@ -276,6 +286,7 @@ def _clear_cache(self, old_ncols: int, new_ncols: int) -> None: except AttributeError: pass + @cached_property def to_pandas_index(self) -> pd.Index: """Convert the keys of the ColumnAccessor to a Pandas Index object.""" if self.multiindex and len(self.level_names) > 0: @@ -726,10 +737,10 @@ def droplevel(self, level: int) -> None: } new_ncols = len(self) self._level_names = ( - self._level_names[:level] + self._level_names[level + 1 :] + self.level_names[:level] + self.level_names[level + 1 :] ) - if len(self._level_names) == 1: + if len(self.level_names) == 1: # can't use nlevels, as it depends on multiindex self.multiindex = False self._clear_cache(old_ncols, new_ncols) diff --git a/python/cudf/cudf/core/dataframe.py b/python/cudf/cudf/core/dataframe.py index e66e4f41642..3334b57ce1b 100644 --- a/python/cudf/cudf/core/dataframe.py +++ b/python/cudf/cudf/core/dataframe.py @@ -961,7 +961,7 @@ def _init_from_series_list(self, data, columns, index): warnings.simplefilter("ignore", FutureWarning) concat_df = cudf.concat(data, axis=1) - cols = concat_df._data.to_pandas_index() + cols = concat_df._data.to_pandas_index if cols.dtype == "object": concat_df.columns = cols.astype("str") @@ -2092,7 +2092,7 @@ def _make_operands_and_index_for_binop( equal_columns = True elif isinstance(other, Series): if ( - not (self_pd_columns := self._data.to_pandas_index()).equals( + not (self_pd_columns := self._data.to_pandas_index).equals( other_pd_index := other.index.to_pandas() ) and not can_reindex @@ -2117,8 +2117,8 @@ def _make_operands_and_index_for_binop( and fn in cudf.utils.utils._EQUALITY_OPS and ( not self.index.equals(other.index) - or not self._data.to_pandas_index().equals( - other._data.to_pandas_index() + or not self._data.to_pandas_index.equals( + other._data.to_pandas_index ) ) ): @@ -2162,11 +2162,11 @@ def _make_operands_and_index_for_binop( if not equal_columns: if isinstance(other, DataFrame): - column_names_list = self._data.to_pandas_index().join( - other._data.to_pandas_index(), how="outer" + column_names_list = self._data.to_pandas_index.join( + other._data.to_pandas_index, how="outer" ) elif isinstance(other, Series): - column_names_list = self._data.to_pandas_index().join( + column_names_list = self._data.to_pandas_index.join( other.index.to_pandas(), how="outer" ) else: @@ -2626,8 +2626,8 @@ def update( if not isinstance(other, DataFrame): other = DataFrame(other) - self_cols = self._data.to_pandas_index() - if not self_cols.equals(other._data.to_pandas_index()): + self_cols = self._data.to_pandas_index + if not self_cols.equals(other._data.to_pandas_index): other = other.reindex(self_cols, axis=1) if not self.index.equals(other.index): other = other.reindex(self.index, axis=0) @@ -2663,7 +2663,7 @@ def __iter__(self): def __contains__(self, item): # This must check against containment in the pandas Index and not # self._column_names to handle NA, None, nan, etc. correctly. - return item in self._data.to_pandas_index() + return item in self._data.to_pandas_index @_performance_tracking def items(self): @@ -2700,14 +2700,14 @@ def at(self): @property # type: ignore @_external_only_api( - "Use _column_names instead, or _data.to_pandas_index() if a pandas " + "Use _column_names instead, or _data.to_pandas_index if a pandas " "index is absolutely necessary. For checking if the columns are a " "MultiIndex, use _data.multiindex." ) @_performance_tracking def columns(self): """Returns a tuple of columns""" - return self._data.to_pandas_index() + return self._data.to_pandas_index @columns.setter # type: ignore @_performance_tracking @@ -2916,7 +2916,7 @@ def reindex( df = self else: columns = cudf.Index(columns) - intersection = self._data.to_pandas_index().intersection( + intersection = self._data.to_pandas_index.intersection( columns.to_pandas() ) df = self.loc[:, intersection] @@ -3430,7 +3430,7 @@ def axes(self): Index(['key', 'k2', 'val', 'temp'], dtype='object')] """ - return [self.index, self._data.to_pandas_index()] + return [self.index, self._data.to_pandas_index] def diff(self, periods=1, axis=0): """ @@ -4129,7 +4129,7 @@ def transpose(self): Not supporting *copy* because default and only behavior is copy=True """ - index = self._data.to_pandas_index() + index = self._data.to_pandas_index columns = self.index.copy(deep=False) if self._num_columns == 0 or self._num_rows == 0: return DataFrame(index=index, columns=columns) @@ -5535,7 +5535,7 @@ def to_pandas( } out_df = pd.DataFrame(out_data, index=out_index) - out_df.columns = self._data.to_pandas_index() + out_df.columns = self._data.to_pandas_index return out_df @@ -6487,7 +6487,7 @@ def _reduce( source = self._get_columns_by_label(numeric_cols) if source.empty: return Series( - index=self._data.to_pandas_index()[:0] + index=self._data.to_pandas_index[:0] if axis == 0 else source.index, dtype="float64", @@ -6540,7 +6540,7 @@ def _reduce( "Columns must all have the same dtype to " f"perform {op=} with {axis=}" ) - pd_index = source._data.to_pandas_index() + pd_index = source._data.to_pandas_index if source._data.multiindex: idx = MultiIndex.from_pandas(pd_index) else: @@ -7242,7 +7242,7 @@ def stack( ] has_unnamed_levels = len(unnamed_levels_indices) > 0 - column_name_idx = self._data.to_pandas_index() + column_name_idx = self._data.to_pandas_index # Construct new index from the levels specified by `level` named_levels = pd.MultiIndex.from_arrays( [column_name_idx.get_level_values(lv) for lv in level_indices] @@ -7432,7 +7432,7 @@ def cov(self, min_periods=None, ddof: int = 1, numeric_only: bool = False): ) cov = cupy.cov(self.values, ddof=ddof, rowvar=False) - cols = self._data.to_pandas_index() + cols = self._data.to_pandas_index df = DataFrame(cupy.asfortranarray(cov), index=cols) df._set_columns_like(self._data) return df @@ -7475,7 +7475,7 @@ def corr( ) corr = cupy.corrcoef(values, rowvar=False) - cols = self._data.to_pandas_index() + cols = self._data.to_pandas_index df = DataFrame(cupy.asfortranarray(corr), index=cols) df._set_columns_like(self._data) return df @@ -7544,7 +7544,7 @@ def keys(self): >>> df.keys() Index([0, 1, 2, 3], dtype='int64') """ - return self._data.to_pandas_index() + return self._data.to_pandas_index def itertuples(self, index=True, name="Pandas"): """ @@ -7778,7 +7778,7 @@ def nunique(self, axis=0, dropna: bool = True) -> Series: raise NotImplementedError("axis parameter is not supported yet.") counts = [col.distinct_count(dropna=dropna) for col in self._columns] return self._constructor_sliced( - counts, index=self._data.to_pandas_index() + counts, index=self._data.to_pandas_index ) def _sample_axis_1( diff --git a/python/cudf/cudf/core/groupby/groupby.py b/python/cudf/cudf/core/groupby/groupby.py index 6cd8e11695f..be3cc410174 100644 --- a/python/cudf/cudf/core/groupby/groupby.py +++ b/python/cudf/cudf/core/groupby/groupby.py @@ -4,9 +4,10 @@ import copy import itertools import textwrap +import types import warnings from collections import abc -from functools import cached_property +from functools import cached_property, singledispatch from typing import TYPE_CHECKING, Any, Literal import cupy as cp @@ -18,17 +19,27 @@ import cudf import cudf.core._internals from cudf import _lib as libcudf -from cudf._lib import groupby as libgroupby from cudf._lib.types import size_type_dtype from cudf.api.extensions import no_default -from cudf.api.types import is_list_like, is_numeric_dtype +from cudf.api.types import ( + is_list_like, + is_numeric_dtype, + is_string_dtype, +) from cudf.core._compat import PANDAS_LT_300 -from cudf.core._internals import sorting +from cudf.core._internals import aggregation, sorting from cudf.core.abc import Serializable from cudf.core.buffer import acquire_spill_lock -from cudf.core.column.column import ColumnBase, StructDtype, as_column +from cudf.core.column.column import ColumnBase, as_column from cudf.core.column_accessor import ColumnAccessor from cudf.core.copy_types import GatherMap +from cudf.core.dtypes import ( + CategoricalDtype, + DecimalDtype, + IntervalDtype, + ListDtype, + StructDtype, +) from cudf.core.join._join_helpers import _match_join_keys from cudf.core.mixins import Reducible, Scannable from cudf.core.multiindex import MultiIndex @@ -37,7 +48,7 @@ from cudf.utils.utils import GetAttrGetItemMixin if TYPE_CHECKING: - from collections.abc import Iterable + from collections.abc import Generator, Iterable from cudf._typing import ( AggType, @@ -46,6 +57,152 @@ ScalarLike, ) +# The sets below define the possible aggregations that can be performed on +# different dtypes. These strings must be elements of the AggregationKind enum. +# The libcudf infrastructure exists for "COLLECT" support on +# categoricals, but the dtype support in python does not. +_CATEGORICAL_AGGS = {"COUNT", "NUNIQUE", "SIZE", "UNIQUE"} +_STRING_AGGS = { + "COLLECT", + "COUNT", + "MAX", + "MIN", + "NTH", + "NUNIQUE", + "SIZE", + "UNIQUE", +} +_LIST_AGGS = {"COLLECT"} +_STRUCT_AGGS = {"COLLECT", "CORRELATION", "COVARIANCE"} +_INTERVAL_AGGS = {"COLLECT"} +_DECIMAL_AGGS = { + "ARGMIN", + "ARGMAX", + "COLLECT", + "COUNT", + "MAX", + "MIN", + "NTH", + "NUNIQUE", + "SUM", +} + + +@singledispatch +def get_valid_aggregation(dtype): + if is_string_dtype(dtype): + return _STRING_AGGS + return "ALL" + + +@get_valid_aggregation.register +def _(dtype: ListDtype): + return _LIST_AGGS + + +@get_valid_aggregation.register +def _(dtype: CategoricalDtype): + return _CATEGORICAL_AGGS + + +@get_valid_aggregation.register +def _(dtype: ListDtype): + return _LIST_AGGS + + +@get_valid_aggregation.register +def _(dtype: StructDtype): + return _STRUCT_AGGS + + +@get_valid_aggregation.register +def _(dtype: IntervalDtype): + return _INTERVAL_AGGS + + +@get_valid_aggregation.register +def _(dtype: DecimalDtype): + return _DECIMAL_AGGS + + +@singledispatch +def _is_unsupported_agg_for_type(dtype, str_agg: str) -> bool: + return False + + +@_is_unsupported_agg_for_type.register +def _(dtype: np.dtype, str_agg: str) -> bool: + # string specifically + cumulative_agg = str_agg in {"cumsum", "cummin", "cummax"} + basic_agg = any( + a in str_agg + for a in ( + "count", + "max", + "min", + "first", + "last", + "nunique", + "unique", + "nth", + ) + ) + return ( + dtype.kind == "O" + and str_agg not in _STRING_AGGS + and (cumulative_agg or not (basic_agg or str_agg == "")) + ) + + +@_is_unsupported_agg_for_type.register +def _(dtype: CategoricalDtype, str_agg: str) -> bool: + cumulative_agg = str_agg in {"cumsum", "cummin", "cummax"} + not_basic_agg = not any( + a in str_agg for a in ("count", "max", "min", "unique") + ) + return str_agg not in _CATEGORICAL_AGGS and ( + cumulative_agg or not_basic_agg + ) + + +def _is_all_scan_aggregate(all_aggs: list[list[str]]) -> bool: + """ + Returns True if all are scan aggregations. + + Raises + ------ + NotImplementedError + If both reduction aggregations and scan aggregations are present. + """ + groupby_scans = { + "cumcount", + "cumsum", + "cummin", + "cummax", + "cumprod", + "rank", + } + + def get_name(agg): + return agg.__name__ if callable(agg) else agg + + all_scan = all( + get_name(agg_name) in groupby_scans + for aggs in all_aggs + for agg_name in aggs + ) + any_scan = any( + get_name(agg_name) in groupby_scans + for aggs in all_aggs + for agg_name in aggs + ) + + if not all_scan and any_scan: + raise NotImplementedError( + "Cannot perform both aggregation and scan in one operation" + ) + return all_scan and any_scan + def _deprecate_collect(): warnings.warn( @@ -423,7 +580,7 @@ def indices(self) -> dict[ScalarLike, cp.ndarray]: >>> df.groupby(by=["a"]).indices {10: array([0, 1]), 40: array([2])} """ - offsets, group_keys, (indices,) = self._groupby.groups( + offsets, group_keys, (indices,) = self._groups( [ cudf.core.column.as_column( range(len(self.obj)), dtype=size_type_dtype @@ -582,11 +739,137 @@ def rank(x): return result @cached_property - def _groupby(self): - return libgroupby.GroupBy( - [*self.grouping.keys._columns], dropna=self._dropna + def _groupby(self) -> types.SimpleNamespace: + with acquire_spill_lock() as spill_lock: + plc_groupby = plc.groupby.GroupBy( + plc.Table( + [ + col.to_pylibcudf(mode="read") + for col in self.grouping.keys._columns + ] + ), + plc.types.NullPolicy.EXCLUDE + if self._dropna + else plc.types.NullPolicy.INCLUDE, + ) + # Do we need this because we just check _spill_locks in test_spillable_df_groupby? + return types.SimpleNamespace( + plc_groupby=plc_groupby, _spill_locks=spill_lock + ) + + def _groups( + self, values: Iterable[ColumnBase] + ) -> tuple[list[int], list[ColumnBase], list[ColumnBase]]: + plc_columns = [col.to_pylibcudf(mode="read") for col in values] + if not plc_columns: + plc_table = None + else: + plc_table = plc.Table(plc_columns) + offsets, grouped_keys, grouped_values = ( + self._groupby.plc_groupby.get_groups(plc_table) + ) + + return ( + offsets, + [ColumnBase.from_pylibcudf(col) for col in grouped_keys.columns()], + ( + [ + ColumnBase.from_pylibcudf(col) + for col in grouped_values.columns() + ] + if grouped_values is not None + else [] + ), + ) + + def _aggregate( + self, values: tuple[ColumnBase, ...], aggregations + ) -> tuple[ + list[list[ColumnBase]], + list[ColumnBase], + list[list[tuple[str, str]]], + ]: + included_aggregations = [] + column_included = [] + requests = [] + result_columns: list[list[ColumnBase]] = [] + for i, (col, aggs) in enumerate(zip(values, aggregations)): + valid_aggregations = get_valid_aggregation(col.dtype) + included_aggregations_i = [] + col_aggregations = [] + for agg in aggs: + str_agg = str(agg) + if _is_unsupported_agg_for_type(col.dtype, str_agg): + raise TypeError( + f"{col.dtype} type does not support {agg} operations" + ) + agg_obj = aggregation.make_aggregation(agg) + if ( + valid_aggregations == "ALL" + or agg_obj.kind in valid_aggregations + ): + included_aggregations_i.append((agg, agg_obj.kind)) + col_aggregations.append(agg_obj.plc_obj) + included_aggregations.append(included_aggregations_i) + result_columns.append([]) + if col_aggregations: + requests.append( + plc.groupby.GroupByRequest( + col.to_pylibcudf(mode="read"), col_aggregations + ) + ) + column_included.append(i) + + if not requests and any(len(v) > 0 for v in aggregations): + raise pd.errors.DataError( + "All requested aggregations are unsupported." + ) + + keys, results = ( + self._groupby.plc_groupby.scan(requests) + if _is_all_scan_aggregate(aggregations) + else self._groupby.plc_groupby.aggregate(requests) ) + for i, result in zip(column_included, results): + result_columns[i] = [ + ColumnBase.from_pylibcudf(col) for col in result.columns() + ] + + return ( + result_columns, + [ColumnBase.from_pylibcudf(key) for key in keys.columns()], + included_aggregations, + ) + + def _shift( + self, values: tuple[ColumnBase, ...], periods: int, fill_values: list + ) -> Generator[ColumnBase]: + _, shifts = self._groupby.plc_groupby.shift( + plc.table.Table([col.to_pylibcudf(mode="read") for col in values]), + [periods] * len(values), + [ + cudf.Scalar(val, dtype=col.dtype).device_value.c_value + for val, col in zip(fill_values, values) + ], + ) + return (ColumnBase.from_pylibcudf(col) for col in shifts.columns()) + + def _replace_nulls( + self, values: tuple[ColumnBase, ...], method: str + ) -> Generator[ColumnBase]: + _, replaced = self._groupby.plc_groupby.replace_nulls( + plc.Table([col.to_pylibcudf(mode="read") for col in values]), + [ + plc.replace.ReplacePolicy.PRECEDING + if method == "ffill" + else plc.replace.ReplacePolicy.FOLLOWING + ] + * len(values), + ) + + return (ColumnBase.from_pylibcudf(col) for col in replaced.columns()) + @_performance_tracking def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): """ @@ -702,7 +985,7 @@ def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): result_columns, grouped_key_cols, included_aggregations, - ) = self._groupby.aggregate(columns, normalized_aggs) + ) = self._aggregate(columns, normalized_aggs) result_index = self.grouping.keys._from_columns_like_self( grouped_key_cols, @@ -761,7 +1044,7 @@ def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): else: if cudf.get_option( "mode.pandas_compatible" - ) and not libgroupby._is_all_scan_aggregate(normalized_aggs): + ) and not _is_all_scan_aggregate(normalized_aggs): # Even with `sort=False`, pandas guarantees that # groupby preserves the order of rows within each group. left_cols = list(self.grouping.keys.drop_duplicates()._columns) @@ -810,7 +1093,7 @@ def agg(self, func=None, *args, engine=None, engine_kwargs=None, **kwargs): if not self._as_index: result = result.reset_index() - if libgroupby._is_all_scan_aggregate(normalized_aggs): + if _is_all_scan_aggregate(normalized_aggs): # Scan aggregations return rows in original index order return self._mimic_pandas_order(result) @@ -920,7 +1203,7 @@ def _head_tail(self, n, *, take_head: bool, preserve_order: bool): # Can't use _mimic_pandas_order because we need to # subsample the gather map from the full input ordering, # rather than permuting the gather map of the output. - _, _, (ordering,) = self._groupby.groups( + _, _, (ordering,) = self._groups( [as_column(range(0, len(self.obj)))] ) # Invert permutation from original order to groups on the @@ -1312,8 +1595,8 @@ def deserialize(cls, header, frames): return cls(obj, grouping, **kwargs) def _grouped(self, *, include_groups: bool = True): - offsets, grouped_key_cols, grouped_value_cols = self._groupby.groups( - [*self.obj.index._columns, *self.obj._columns] + offsets, grouped_key_cols, grouped_value_cols = self._groups( + itertools.chain(self.obj.index._columns, self.obj._columns) ) grouped_keys = cudf.core.index._index_from_data( dict(enumerate(grouped_key_cols)) @@ -1945,7 +2228,7 @@ def transform( "Currently, `transform()` supports only aggregations." ) from e # If the aggregation is a scan, don't broadcast - if libgroupby._is_all_scan_aggregate([[func]]): + if _is_all_scan_aggregate([[func]]): if len(result) != len(self.obj): raise AssertionError( "Unexpected result length for scan transform" @@ -2409,7 +2692,7 @@ def _scan_fill(self, method: str, limit: int) -> DataFrameOrSeries: dict( zip( values._column_names, - self._groupby.replace_nulls([*values._columns], method), + self._replace_nulls(values._columns, method), ) ) ) @@ -2513,7 +2796,7 @@ def fillna( @_performance_tracking def shift( self, - periods=1, + periods: int = 1, freq=None, axis=0, fill_value=None, @@ -2560,7 +2843,7 @@ def shift( if freq is not None: raise NotImplementedError("Parameter freq is unsupported.") - if not axis == 0: + if axis != 0: raise NotImplementedError("Only axis=0 is supported.") if suffix is not None: @@ -2568,20 +2851,18 @@ def shift( values = self.grouping.values if is_list_like(fill_value): - if len(fill_value) != len(values._data): + if len(fill_value) != values._num_columns: raise ValueError( "Mismatched number of columns and values to fill." ) else: - fill_value = [fill_value] * len(values._data) + fill_value = [fill_value] * values._num_columns result = self.obj.__class__._from_data( dict( zip( values._column_names, - self._groupby.shift( - [*values._columns], periods, fill_value - )[0], + self._shift(values._columns, periods, fill_value), ) ) ) @@ -2680,9 +2961,7 @@ def _mimic_pandas_order( # result coming back from libcudf has null_count few rows than # the input, so we must produce an ordering from the full # input range. - _, _, (ordering,) = self._groupby.groups( - [as_column(range(0, len(self.obj)))] - ) + _, _, (ordering,) = self._groups([as_column(range(0, len(self.obj)))]) if self._dropna and any( c.has_nulls(include_nan=True) > 0 for c in self.grouping._key_columns @@ -3087,7 +3366,7 @@ def agg(self, func, *args, engine=None, engine_kwargs=None, **kwargs): # drop the first level if we have a multiindex if result._data.nlevels > 1: - result.columns = result._data.to_pandas_index().droplevel(0) + result.columns = result._data.to_pandas_index.droplevel(0) return result diff --git a/python/cudf/cudf/core/indexed_frame.py b/python/cudf/cudf/core/indexed_frame.py index 72bb85821fa..6854cb02aa5 100644 --- a/python/cudf/cudf/core/indexed_frame.py +++ b/python/cudf/cudf/core/indexed_frame.py @@ -1106,13 +1106,11 @@ def dot(self, other, reflect=False): lhs = self.reindex(index=common, copy=False).values rhs = other.reindex(index=common, copy=False).values if isinstance(other, cudf.DataFrame): - result_index = other._data.to_pandas_index() + result_index = other._data.to_pandas_index elif isinstance(self, cudf.DataFrame) and isinstance( other, (cudf.Series, cudf.DataFrame) ): - common = self._data.to_pandas_index().union( - other.index.to_pandas() - ) + common = self._data.to_pandas_index.union(other.index.to_pandas()) if len(common) > self._num_columns or len(common) > len( other.index ): @@ -1124,7 +1122,7 @@ def dot(self, other, reflect=False): rhs = other.reindex(index=common, copy=False).values lhs = lhs.values if isinstance(other, cudf.DataFrame): - result_cols = other._data.to_pandas_index() + result_cols = other._data.to_pandas_index elif isinstance( other, (cp.ndarray, np.ndarray) @@ -2244,7 +2242,7 @@ def truncate(self, before=None, after=None, axis=0, copy=True): if not copy: raise ValueError("Truncating with copy=False is not supported.") axis = self._get_axis_from_axis_arg(axis) - ax = self.index if axis == 0 else self._data.to_pandas_index() + ax = self.index if axis == 0 else self._data.to_pandas_index if not ax.is_monotonic_increasing and not ax.is_monotonic_decreasing: raise ValueError("truncate requires a sorted index") @@ -6770,7 +6768,7 @@ def _drop_rows_by_labels( return obj.__class__._from_data( join_res.iloc[:, idx_nlv:]._data, index=midx, - columns=obj._data.to_pandas_index(), + columns=obj._data.to_pandas_index, ) else: diff --git a/python/cudf/cudf/core/multiindex.py b/python/cudf/cudf/core/multiindex.py index d2afe643dc4..1e613e49ffc 100644 --- a/python/cudf/cudf/core/multiindex.py +++ b/python/cudf/cudf/core/multiindex.py @@ -1123,7 +1123,7 @@ def _concat(cls, objs) -> Self: # TODO: Verify if this is really necessary or if we can rely on # DataFrame._concat. if len(source_data) > 1: - colnames = source_data[0]._data.to_pandas_index() + colnames = source_data[0]._data.to_pandas_index for obj in source_data[1:]: obj.columns = colnames @@ -2068,7 +2068,7 @@ def _union(self, other, sort=None) -> Self: result_df = self_df.merge(other_df, on=col_names, how="outer") result_df = result_df.sort_values( - by=result_df._data.to_pandas_index()[self.nlevels :], + by=result_df._data.to_pandas_index[self.nlevels :], ignore_index=True, ) diff --git a/python/cudf/cudf/core/reshape.py b/python/cudf/cudf/core/reshape.py index 3ab6ed306b6..0abd42d4d4e 100644 --- a/python/cudf/cudf/core/reshape.py +++ b/python/cudf/cudf/core/reshape.py @@ -431,8 +431,9 @@ def concat( result_columns = ( objs[0] - ._data.to_pandas_index() - .append([obj._data.to_pandas_index() for obj in objs[1:]]) + ._data.to_pandas_index.append( + [obj._data.to_pandas_index for obj in objs[1:]] + ) .unique() ) @@ -689,7 +690,7 @@ def _tile(A, reps): if not value_vars: # TODO: Use frame._data.label_dtype when it's more consistently set var_data = cudf.Series( - value_vars, dtype=frame._data.to_pandas_index().dtype + value_vars, dtype=frame._data.to_pandas_index.dtype ) else: var_data = ( @@ -1273,7 +1274,7 @@ def unstack(df, level, fill_value=None, sort: bool = True): res = df.T.stack(future_stack=False) # Result's index is a multiindex res.index.names = ( - tuple(df._data.to_pandas_index().names) + df.index.names + tuple(df._data.to_pandas_index.names) + df.index.names ) return res else: diff --git a/python/cudf/cudf/core/window/rolling.py b/python/cudf/cudf/core/window/rolling.py index a580c35ccbf..2f8a6d9e5e7 100644 --- a/python/cudf/cudf/core/window/rolling.py +++ b/python/cudf/cudf/core/window/rolling.py @@ -315,7 +315,7 @@ def _apply_agg_column(self, source_column, agg_name): {"dtype": source_column.dtype} if callable(agg_name) else self.agg_params, - ).c_obj, + ).plc_obj, ) ) diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index 39a85465deb..4be556e1d67 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -54,6 +54,22 @@ def _get_cudf_schema_element_from_dtype( return lib_type, child_types +def _to_plc_compression( + compression: Literal["infer", "gzip", "bz2", "zip", "xz", None], +) -> plc.io.types.CompressionType: + if compression is not None: + if compression == "gzip": + return plc.io.types.CompressionType.GZIP + elif compression == "bz2": + return plc.io.types.CompressionType.BZIP2 + elif compression == "zip": + return plc.io.types.CompressionType.ZIP + else: + return plc.io.types.CompressionType.AUTO + else: + return plc.io.types.CompressionType.NONE + + @ioutils.doc_read_json() def read_json( path_or_buf, @@ -115,17 +131,7 @@ def read_json( if isinstance(source, str) and not os.path.isfile(source): filepaths_or_buffers[idx] = source.encode() - if compression is not None: - if compression == "gzip": - c_compression = plc.io.types.CompressionType.GZIP - elif compression == "bz2": - c_compression = plc.io.types.CompressionType.BZIP2 - elif compression == "zip": - c_compression = plc.io.types.CompressionType.ZIP - else: - c_compression = plc.io.types.CompressionType.AUTO - else: - c_compression = plc.io.types.CompressionType.NONE + c_compression = _to_plc_compression(compression) if on_bad_lines.lower() == "error": c_on_bad_lines = plc.io.types.JSONRecoveryMode.FAIL @@ -291,6 +297,7 @@ def _plc_write_json( include_nulls: bool = True, lines: bool = False, rows_per_chunk: int = 1024 * 64, # 64K rows + compression: Literal["infer", "gzip", "bz2", "zip", "xz", None] = None, ) -> None: try: tbl_w_meta = plc.io.TableWithMetadata( @@ -307,6 +314,7 @@ def _plc_write_json( .na_rep(na_rep) .include_nulls(include_nulls) .lines(lines) + .compression(_to_plc_compression(compression)) .build() ) if rows_per_chunk != np.iinfo(np.int32).max: diff --git a/python/cudf/cudf/testing/testing.py b/python/cudf/cudf/testing/testing.py index 0b09cf7dc34..a1df2c7d857 100644 --- a/python/cudf/cudf/testing/testing.py +++ b/python/cudf/cudf/testing/testing.py @@ -692,8 +692,8 @@ def assert_frame_equal( ) pd.testing.assert_index_equal( - left._data.to_pandas_index(), - right._data.to_pandas_index(), + left._data.to_pandas_index, + right._data.to_pandas_index, exact=check_column_type, check_names=check_names, check_exact=check_exact, diff --git a/python/cudf/cudf/tests/test_column_accessor.py b/python/cudf/cudf/tests/test_column_accessor.py index 5cef077c18d..27ec4fcd1f3 100644 --- a/python/cudf/cudf/tests/test_column_accessor.py +++ b/python/cudf/cudf/tests/test_column_accessor.py @@ -64,7 +64,7 @@ def test_to_pandas_simple(simple_data): # Index([], dtype='object'), and `integer` for RangeIndex() # to ignore this `inferred_type` comparison, we pass exact=False. assert_eq( - ca.to_pandas_index(), + ca.to_pandas_index, pd.DataFrame( {key: value.values_host for key, value in simple_data.items()} ).columns, @@ -75,7 +75,7 @@ def test_to_pandas_simple(simple_data): def test_to_pandas_multiindex(mi_data): ca = ColumnAccessor(mi_data, multiindex=True) assert_eq( - ca.to_pandas_index(), + ca.to_pandas_index, pd.DataFrame( {key: value.values_host for key, value in mi_data.items()} ).columns, @@ -89,7 +89,7 @@ def test_to_pandas_multiindex_names(): level_names=("foo", "bar"), ) assert_eq( - ca.to_pandas_index(), + ca.to_pandas_index, pd.MultiIndex.from_tuples( (("a", "b"), ("c", "d")), names=("foo", "bar") ), diff --git a/python/cudf/cudf/tests/test_dataframe.py b/python/cudf/cudf/tests/test_dataframe.py index d04fd97dcbd..11a9b398b50 100644 --- a/python/cudf/cudf/tests/test_dataframe.py +++ b/python/cudf/cudf/tests/test_dataframe.py @@ -11193,3 +11193,32 @@ def test_dataframe_init_column(): expect = cudf.DataFrame({"a": s}) actual = cudf.DataFrame._from_arrays(s._column, columns=["a"]) assert_eq(expect, actual) + + +@pytest.mark.parametrize("name", [None, "foo", 1, 1.0]) +def test_dataframe_column_name(name): + df = cudf.DataFrame({"a": [1, 2, 3]}) + pdf = df.to_pandas() + + df.columns.name = name + pdf.columns.name = name + + assert_eq(df, pdf) + assert_eq(df.columns.name, pdf.columns.name) + + +@pytest.mark.parametrize("names", [["abc", "def"], [1, 2], ["abc", 10]]) +def test_dataframe_multiindex_column_names(names): + arrays = [["A", "A", "B", "B"], ["one", "two", "one", "two"]] + tuples = list(zip(*arrays)) + index = pd.MultiIndex.from_tuples(tuples, names=["first", "second"]) + + pdf = pd.DataFrame([[1, 2, 3, 4], [5, 6, 7, 8]], columns=index) + df = cudf.from_pandas(pdf) + + assert_eq(df, pdf) + assert_eq(df.columns.names, pdf.columns.names) + pdf.columns.names = names + df.columns.names = names + assert_eq(df, pdf) + assert_eq(df.columns.names, pdf.columns.names) diff --git a/python/cudf/cudf/tests/test_groupby.py b/python/cudf/cudf/tests/test_groupby.py index d8a2528230e..db4f3cd3c9f 100644 --- a/python/cudf/cudf/tests/test_groupby.py +++ b/python/cudf/cudf/tests/test_groupby.py @@ -3960,8 +3960,8 @@ def test_group_by_value_counts_with_count_column(): def test_groupby_internal_groups_empty(gdf): # test that we don't segfault when calling the internal # .groups() method with an empty list: - gb = gdf.groupby("y")._groupby - _, _, grouped_vals = gb.groups([]) + gb = gdf.groupby("y") + _, _, grouped_vals = gb._groups([]) assert grouped_vals == [] diff --git a/python/cudf/cudf/tests/test_json.py b/python/cudf/cudf/tests/test_json.py index aaa8d7d07ee..db34329261f 100644 --- a/python/cudf/cudf/tests/test_json.py +++ b/python/cudf/cudf/tests/test_json.py @@ -1453,3 +1453,12 @@ def test_chunked_json_reader(): with cudf.option_context("io.json.low_memory", True): gdf = cudf.read_json(buf, lines=True) assert_eq(df, gdf) + + +@pytest.mark.parametrize("compression", ["gzip", None]) +def test_roundtrip_compression(compression, tmp_path): + expected = cudf.DataFrame({"a": 1, "b": "2"}) + fle = BytesIO() + expected.to_json(fle, engine="cudf", compression=compression) + result = cudf.read_json(fle, engine="cudf", compression=compression) + assert_eq(result, expected) diff --git a/python/cudf_polars/cudf_polars/callback.py b/python/cudf_polars/cudf_polars/callback.py index 29d3dc4ae79..074096446fd 100644 --- a/python/cudf_polars/cudf_polars/callback.py +++ b/python/cudf_polars/cudf_polars/callback.py @@ -231,7 +231,8 @@ def validate_config_options(config: dict) -> None: executor = config.get("executor", "pylibcudf") if executor == "dask-experimental": unsupported = config.get("executor_options", {}).keys() - { - "max_rows_per_partition" + "max_rows_per_partition", + "parquet_blocksize", } else: unsupported = config.get("executor_options", {}).keys() diff --git a/python/cudf_polars/cudf_polars/experimental/io.py b/python/cudf_polars/cudf_polars/experimental/io.py index 3a1fec36079..2a5b400af4c 100644 --- a/python/cudf_polars/cudf_polars/experimental/io.py +++ b/python/cudf_polars/cudf_polars/experimental/io.py @@ -4,18 +4,24 @@ from __future__ import annotations +import enum import math -from typing import TYPE_CHECKING +import random +from enum import IntEnum +from typing import TYPE_CHECKING, Any -from cudf_polars.dsl.ir import DataFrameScan, Union +import pylibcudf as plc + +from cudf_polars.dsl.ir import IR, DataFrameScan, Scan, Union from cudf_polars.experimental.base import PartitionInfo from cudf_polars.experimental.dispatch import lower_ir_node if TYPE_CHECKING: from collections.abc import MutableMapping - from cudf_polars.dsl.ir import IR + from cudf_polars.dsl.expr import NamedExpr from cudf_polars.experimental.dispatch import LowerIRTransformer + from cudf_polars.typing import Schema @lower_ir_node.register(DataFrameScan) @@ -47,3 +53,274 @@ def _( } return ir, {ir: PartitionInfo(count=1)} + + +class ScanPartitionFlavor(IntEnum): + """Flavor of Scan partitioning.""" + + SINGLE_FILE = enum.auto() # 1:1 mapping between files and partitions + SPLIT_FILES = enum.auto() # Split each file into >1 partition + FUSED_FILES = enum.auto() # Fuse multiple files into each partition + + +class ScanPartitionPlan: + """ + Scan partitioning plan. + + Notes + ----- + The meaning of `factor` depends on the value of `flavor`: + - SINGLE_FILE: `factor` must be `1`. + - SPLIT_FILES: `factor` is the number of partitions per file. + - FUSED_FILES: `factor` is the number of files per partition. + """ + + __slots__ = ("factor", "flavor") + factor: int + flavor: ScanPartitionFlavor + + def __init__(self, factor: int, flavor: ScanPartitionFlavor) -> None: + if ( + flavor == ScanPartitionFlavor.SINGLE_FILE and factor != 1 + ): # pragma: no cover + raise ValueError(f"Expected factor == 1 for {flavor}, got: {factor}") + self.factor = factor + self.flavor = flavor + + @staticmethod + def from_scan(ir: Scan) -> ScanPartitionPlan: + """Extract the partitioning plan of a Scan operation.""" + if ir.typ == "parquet": + # TODO: Use system info to set default blocksize + parallel_options = ir.config_options.get("executor_options", {}) + blocksize: int = parallel_options.get("parquet_blocksize", 1024**3) + stats = _sample_pq_statistics(ir) + file_size = sum(float(stats[column]) for column in ir.schema) + if file_size > 0: + if file_size > blocksize: + # Split large files + return ScanPartitionPlan( + math.ceil(file_size / blocksize), + ScanPartitionFlavor.SPLIT_FILES, + ) + else: + # Fuse small files + return ScanPartitionPlan( + max(blocksize // int(file_size), 1), + ScanPartitionFlavor.FUSED_FILES, + ) + + # TODO: Use file sizes for csv and json + return ScanPartitionPlan(1, ScanPartitionFlavor.SINGLE_FILE) + + +class SplitScan(IR): + """ + Input from a split file. + + This class wraps a single-file `Scan` object. At + IO/evaluation time, this class will only perform + a partial read of the underlying file. The range + (skip_rows and n_rows) is calculated at IO time. + """ + + __slots__ = ( + "base_scan", + "schema", + "split_index", + "total_splits", + ) + _non_child = ( + "schema", + "base_scan", + "split_index", + "total_splits", + ) + base_scan: Scan + """Scan operation this node is based on.""" + split_index: int + """Index of the current split.""" + total_splits: int + """Total number of splits.""" + + def __init__( + self, schema: Schema, base_scan: Scan, split_index: int, total_splits: int + ): + self.schema = schema + self.base_scan = base_scan + self.split_index = split_index + self.total_splits = total_splits + self._non_child_args = ( + split_index, + total_splits, + *base_scan._non_child_args, + ) + self.children = () + if base_scan.typ not in ("parquet",): # pragma: no cover + raise NotImplementedError( + f"Unhandled Scan type for file splitting: {base_scan.typ}" + ) + + @classmethod + def do_evaluate( + cls, + split_index: int, + total_splits: int, + schema: Schema, + typ: str, + reader_options: dict[str, Any], + config_options: dict[str, Any], + paths: list[str], + with_columns: list[str] | None, + skip_rows: int, + n_rows: int, + row_index: tuple[str, int] | None, + predicate: NamedExpr | None, + ): + """Evaluate and return a dataframe.""" + if typ not in ("parquet",): # pragma: no cover + raise NotImplementedError(f"Unhandled Scan type for file splitting: {typ}") + + if len(paths) > 1: # pragma: no cover + raise ValueError(f"Expected a single path, got: {paths}") + + # Parquet logic: + # - We are one of "total_splits" SplitScan nodes + # assigned to the same file. + # - We know our index within this file ("split_index") + # - We can also use parquet metadata to query the + # total number of rows in each row-group of the file. + # - We can use all this information to calculate the + # "skip_rows" and "n_rows" options to use locally. + + rowgroup_metadata = plc.io.parquet_metadata.read_parquet_metadata( + plc.io.SourceInfo(paths) + ).rowgroup_metadata() + total_row_groups = len(rowgroup_metadata) + if total_splits <= total_row_groups: + # We have enough row-groups in the file to align + # all "total_splits" of our reads with row-group + # boundaries. Calculate which row-groups to include + # in the current read, and use metadata to translate + # the row-group indices to "skip_rows" and "n_rows". + rg_stride = total_row_groups // total_splits + skip_rgs = rg_stride * split_index + skip_rows = sum(rg["num_rows"] for rg in rowgroup_metadata[:skip_rgs]) + n_rows = sum( + rg["num_rows"] + for rg in rowgroup_metadata[skip_rgs : skip_rgs + rg_stride] + ) + else: + # There are not enough row-groups to align + # all "total_splits" of our reads with row-group + # boundaries. Use metadata to directly calculate + # "skip_rows" and "n_rows" for the current read. + total_rows = sum(rg["num_rows"] for rg in rowgroup_metadata) + n_rows = total_rows // total_splits + skip_rows = n_rows * split_index + + # Last split should always read to end of file + if split_index == (total_splits - 1): + n_rows = -1 + + # Perform the partial read + return Scan.do_evaluate( + schema, + typ, + reader_options, + config_options, + paths, + with_columns, + skip_rows, + n_rows, + row_index, + predicate, + ) + + +def _sample_pq_statistics(ir: Scan) -> dict[str, float]: + import numpy as np + import pyarrow.dataset as pa_ds + + # Use average total_uncompressed_size of three files + # TODO: Use plc.io.parquet_metadata.read_parquet_metadata + n_sample = 3 + column_sizes = {} + ds = pa_ds.dataset(random.sample(ir.paths, n_sample), format="parquet") + for i, frag in enumerate(ds.get_fragments()): + md = frag.metadata + for rg in range(md.num_row_groups): + row_group = md.row_group(rg) + for col in range(row_group.num_columns): + column = row_group.column(col) + name = column.path_in_schema + if name not in column_sizes: + column_sizes[name] = np.zeros(n_sample, dtype="int64") + column_sizes[name][i] += column.total_uncompressed_size + + return {name: np.mean(sizes) for name, sizes in column_sizes.items()} + + +@lower_ir_node.register(Scan) +def _( + ir: Scan, rec: LowerIRTransformer +) -> tuple[IR, MutableMapping[IR, PartitionInfo]]: + partition_info: MutableMapping[IR, PartitionInfo] + if ir.typ in ("csv", "parquet", "ndjson") and ir.n_rows == -1 and ir.skip_rows == 0: + plan = ScanPartitionPlan.from_scan(ir) + paths = list(ir.paths) + if plan.flavor == ScanPartitionFlavor.SPLIT_FILES: + # Disable chunked reader when splitting files + config_options = ir.config_options.copy() + config_options["parquet_options"] = config_options.get( + "parquet_options", {} + ).copy() + config_options["parquet_options"]["chunked"] = False + + slices: list[SplitScan] = [] + for path in paths: + base_scan = Scan( + ir.schema, + ir.typ, + ir.reader_options, + ir.cloud_options, + config_options, + [path], + ir.with_columns, + ir.skip_rows, + ir.n_rows, + ir.row_index, + ir.predicate, + ) + slices.extend( + SplitScan(ir.schema, base_scan, sindex, plan.factor) + for sindex in range(plan.factor) + ) + new_node = Union(ir.schema, None, *slices) + partition_info = {slice: PartitionInfo(count=1) for slice in slices} | { + new_node: PartitionInfo(count=len(slices)) + } + else: + groups: list[Scan] = [ + Scan( + ir.schema, + ir.typ, + ir.reader_options, + ir.cloud_options, + ir.config_options, + paths[i : i + plan.factor], + ir.with_columns, + ir.skip_rows, + ir.n_rows, + ir.row_index, + ir.predicate, + ) + for i in range(0, len(paths), plan.factor) + ] + new_node = Union(ir.schema, None, *groups) + partition_info = {group: PartitionInfo(count=1) for group in groups} | { + new_node: PartitionInfo(count=len(groups)) + } + return new_node, partition_info + + return ir, {ir: PartitionInfo(count=1)} # pragma: no cover diff --git a/python/cudf_polars/tests/experimental/test_scan.py b/python/cudf_polars/tests/experimental/test_scan.py new file mode 100644 index 00000000000..a26d751dc86 --- /dev/null +++ b/python/cudf_polars/tests/experimental/test_scan.py @@ -0,0 +1,80 @@ +# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES. +# SPDX-License-Identifier: Apache-2.0 + +from __future__ import annotations + +import pytest + +import polars as pl + +from cudf_polars import Translator +from cudf_polars.experimental.parallel import lower_ir_graph +from cudf_polars.testing.asserts import assert_gpu_result_equal + + +@pytest.fixture(scope="module") +def df(): + return pl.DataFrame( + { + "x": range(3_000), + "y": ["cat", "dog", "fish"] * 1_000, + "z": [1.0, 2.0, 3.0, 4.0, 5.0] * 600, + } + ) + + +def make_source(df, path, fmt, n_files=3): + n_rows = len(df) + stride = int(n_rows / n_files) + for i in range(n_files): + offset = stride * i + part = df.slice(offset, stride) + if fmt == "csv": + part.write_csv(path / f"part.{i}.csv") + elif fmt == "ndjson": + part.write_ndjson(path / f"part.{i}.ndjson") + else: + part.write_parquet( + path / f"part.{i}.parquet", + row_group_size=int(stride / 2), + ) + + +@pytest.mark.parametrize( + "fmt, scan_fn", + [ + ("csv", pl.scan_csv), + ("ndjson", pl.scan_ndjson), + ("parquet", pl.scan_parquet), + ], +) +def test_parallel_scan(tmp_path, df, fmt, scan_fn): + make_source(df, tmp_path, fmt) + q = scan_fn(tmp_path) + engine = pl.GPUEngine( + raise_on_fail=True, + executor="dask-experimental", + ) + assert_gpu_result_equal(q, engine=engine) + + +@pytest.mark.parametrize("blocksize", [1_000, 10_000, 1_000_000]) +def test_parquet_blocksize(tmp_path, df, blocksize): + n_files = 3 + make_source(df, tmp_path, "parquet", n_files) + q = pl.scan_parquet(tmp_path) + engine = pl.GPUEngine( + raise_on_fail=True, + executor="dask-experimental", + executor_options={"parquet_blocksize": blocksize}, + ) + assert_gpu_result_equal(q, engine=engine) + + # Check partitioning + qir = Translator(q._ldf.visit(), engine).translate_ir() + ir, info = lower_ir_graph(qir) + count = info[ir].count + if blocksize <= 12_000: + assert count > n_files + else: + assert count < n_files diff --git a/python/pylibcudf/pylibcudf/interop.pyx b/python/pylibcudf/pylibcudf/interop.pyx index bd5397ac328..7a102cf0c88 100644 --- a/python/pylibcudf/pylibcudf/interop.pyx +++ b/python/pylibcudf/pylibcudf/interop.pyx @@ -273,10 +273,19 @@ cdef void _release_array(object array_capsule) noexcept: free(array) +def _maybe_create_nested_column_metadata(Column col): + return ColumnMetadata( + children_meta=[ + _maybe_create_nested_column_metadata(child) for child in col.children() + ] + ) + + def _table_to_schema(Table tbl, metadata): if metadata is None: - metadata = [ColumnMetadata() for _ in range(len(tbl.columns()))] - metadata = [ColumnMetadata(m) if isinstance(m, str) else m for m in metadata] + metadata = [_maybe_create_nested_column_metadata(col) for col in tbl.columns()] + else: + metadata = [ColumnMetadata(m) if isinstance(m, str) else m for m in metadata] cdef vector[column_metadata] c_metadata c_metadata.reserve(len(metadata)) diff --git a/python/pylibcudf/pylibcudf/io/json.pxd b/python/pylibcudf/pylibcudf/io/json.pxd index be1edca5089..93481cb9c1c 100644 --- a/python/pylibcudf/pylibcudf/io/json.pxd +++ b/python/pylibcudf/pylibcudf/io/json.pxd @@ -63,6 +63,7 @@ cdef class JsonWriterOptions: cpdef void set_rows_per_chunk(self, size_type val) cpdef void set_true_value(self, str val) cpdef void set_false_value(self, str val) + cpdef void set_compression(self, compression_type comptype) cdef class JsonWriterOptionsBuilder: cdef json_writer_options_builder c_obj @@ -72,6 +73,7 @@ cdef class JsonWriterOptionsBuilder: cpdef JsonWriterOptionsBuilder na_rep(self, str val) cpdef JsonWriterOptionsBuilder include_nulls(self, bool val) cpdef JsonWriterOptionsBuilder lines(self, bool val) + cpdef JsonWriterOptionsBuilder compression(self, compression_type comptype) cpdef JsonWriterOptions build(self) cpdef void write_json(JsonWriterOptions options, Stream stream = *) diff --git a/python/pylibcudf/pylibcudf/io/json.pyi b/python/pylibcudf/pylibcudf/io/json.pyi index 70b7106c075..c8e16e13baf 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyi +++ b/python/pylibcudf/pylibcudf/io/json.pyi @@ -62,12 +62,14 @@ class JsonWriterOptions: def set_rows_per_chunk(self, val: int) -> None: ... def set_true_value(self, val: str) -> None: ... def set_false_value(self, val: str) -> None: ... + def set_compression(self, comptype: CompressionType) -> None: ... class JsonWriterOptionsBuilder: def metadata(self, tbl_w_meta: TableWithMetadata) -> Self: ... def na_rep(self, val: str) -> Self: ... def include_nulls(self, val: bool) -> Self: ... def lines(self, val: bool) -> Self: ... + def compression(self, comptype: CompressionType) -> Self: ... def build(self) -> JsonWriterOptions: ... def write_json(options: JsonWriterOptions, stream: Stream = None) -> None: ... diff --git a/python/pylibcudf/pylibcudf/io/json.pyx b/python/pylibcudf/pylibcudf/io/json.pyx index ecfa6785fa5..b999503083b 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyx +++ b/python/pylibcudf/pylibcudf/io/json.pyx @@ -597,6 +597,20 @@ cdef class JsonWriterOptions: """ self.c_obj.set_false_value(val.encode()) + cpdef void set_compression(self, compression_type comptype): + """ + Sets compression type to be used + + Parameters + ---------- + comptype : CompressionType + Compression type for sink + + Returns + ------- + None + """ + self.c_obj.set_compression(comptype) cdef class JsonWriterOptionsBuilder: cpdef JsonWriterOptionsBuilder metadata(self, TableWithMetadata tbl_w_meta): @@ -663,6 +677,22 @@ cdef class JsonWriterOptionsBuilder: self.c_obj.lines(val) return self + cpdef JsonWriterOptionsBuilder compression(self, compression_type comptype): + """ + Sets compression type of output sink. + + Parameters + ---------- + comptype : CompressionType + Compression type used + + Returns + ------- + Self + """ + self.c_obj.compression(comptype) + return self + cpdef JsonWriterOptions build(self): """Create a JsonWriterOptions object""" cdef JsonWriterOptions json_options = JsonWriterOptions.__new__( diff --git a/python/pylibcudf/pylibcudf/libcudf/io/json.pxd b/python/pylibcudf/pylibcudf/libcudf/io/json.pxd index 1037f13c834..a836538e990 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/json.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/json.pxd @@ -170,6 +170,8 @@ cdef extern from "cudf/io/json.hpp" \ size_type get_rows_per_chunk() except +libcudf_exception_handler string get_true_value() except +libcudf_exception_handler string get_false_value() except +libcudf_exception_handler + cudf_io_types.compression_type get_compression()\ + except +libcudf_exception_handler # setter void set_table( @@ -184,6 +186,9 @@ cdef extern from "cudf/io/json.hpp" \ void set_rows_per_chunk(size_type val) except +libcudf_exception_handler void set_true_value(string val) except +libcudf_exception_handler void set_false_value(string val) except +libcudf_exception_handler + void set_compression( + cudf_io_types.compression_type comptype + ) except +libcudf_exception_handler @staticmethod json_writer_options_builder builder( @@ -221,6 +226,9 @@ cdef extern from "cudf/io/json.hpp" \ json_writer_options_builder& false_value( string val ) except +libcudf_exception_handler + json_writer_options_builder& compression( + cudf_io_types.compression_type comptype + ) except +libcudf_exception_handler json_writer_options build() except +libcudf_exception_handler diff --git a/python/pylibcudf/pylibcudf/tests/test_interop.py b/python/pylibcudf/pylibcudf/tests/test_interop.py index af80b6e5978..ca42eacdfdb 100644 --- a/python/pylibcudf/pylibcudf/tests/test_interop.py +++ b/python/pylibcudf/pylibcudf/tests/test_interop.py @@ -40,6 +40,28 @@ def test_struct_dtype_roundtrip(): assert arrow_type == struct_type +def test_table_with_nested_dtype_to_arrow(): + pa_array = pa.array([[{"": 1}]]) + plc_table = plc.Table([plc.interop.from_arrow(pa_array)]) + result = plc.interop.to_arrow(plc_table) + expected_schema = pa.schema( + [ + pa.field( + "", + pa.list_( + pa.field( + "", + pa.struct([pa.field("", pa.int64(), nullable=False)]), + nullable=False, + ) + ), + nullable=False, + ) + ] + ) + assert result.schema == expected_schema + + def test_decimal128_roundtrip(): decimal_type = pa.decimal128(10, 2) plc_type = plc.interop.from_arrow(decimal_type)