diff --git a/cpp/src/groupby/hash/compute_groupby.cu b/cpp/src/groupby/hash/compute_groupby.cu index e1dbf2a3d9e..9648d942513 100644 --- a/cpp/src/groupby/hash/compute_groupby.cu +++ b/cpp/src/groupby/hash/compute_groupby.cu @@ -61,7 +61,7 @@ std::unique_ptr compute_groupby(table_view const& keys, d_row_equal, probing_scheme_t{d_row_hash}, cuco::thread_scope_device, - cuco::storage{}, + cuco::storage{}, cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}, stream.value()}; diff --git a/cpp/src/groupby/hash/compute_mapping_indices.cuh b/cpp/src/groupby/hash/compute_mapping_indices.cuh index d353830780f..f86a93109be 100644 --- a/cpp/src/groupby/hash/compute_mapping_indices.cuh +++ b/cpp/src/groupby/hash/compute_mapping_indices.cuh @@ -106,15 +106,15 @@ CUDF_KERNEL void mapping_indices_kernel(cudf::size_type num_input_rows, __shared__ cudf::size_type shared_set_indices[GROUPBY_SHM_MAX_ELEMENTS]; // Shared set initialization - __shared__ cuco::window windows[window_extent.value()]; + __shared__ cuco::bucket buckets[bucket_extent.value()]; auto raw_set = cuco::static_set_ref{ cuco::empty_key{cudf::detail::CUDF_SIZE_TYPE_SENTINEL}, global_set.key_eq(), probing_scheme_t{global_set.hash_function()}, cuco::thread_scope_block, - cuco::aow_storage_ref{ - window_extent, windows}}; + cuco::bucket_storage_ref{ + bucket_extent, buckets}}; auto shared_set = raw_set.rebind_operators(cuco::insert_and_find); auto const block = cooperative_groups::this_thread_block(); diff --git a/cpp/src/groupby/hash/helpers.cuh b/cpp/src/groupby/hash/helpers.cuh index f950e03e0fb..92925e11bac 100644 --- a/cpp/src/groupby/hash/helpers.cuh +++ b/cpp/src/groupby/hash/helpers.cuh @@ -27,7 +27,7 @@ namespace cudf::groupby::detail::hash { CUDF_HOST_DEVICE auto constexpr GROUPBY_CG_SIZE = 1; /// Number of slots per thread -CUDF_HOST_DEVICE auto constexpr GROUPBY_WINDOW_SIZE = 1; +CUDF_HOST_DEVICE auto constexpr GROUPBY_BUCKET_SIZE = 1; /// Thread block size CUDF_HOST_DEVICE auto constexpr GROUPBY_BLOCK_SIZE = 128; @@ -48,9 +48,9 @@ using shmem_extent_t = cuco::extent(static_cast(GROUPBY_SHM_MAX_ELEMENTS) * 1.43)>; -/// Number of windows needed by each shared memory hash set -CUDF_HOST_DEVICE auto constexpr window_extent = - cuco::make_window_extent(shmem_extent_t{}); +/// Number of buckets needed by each shared memory hash set +CUDF_HOST_DEVICE auto constexpr bucket_extent = + cuco::make_bucket_extent(shmem_extent_t{}); using row_hash_t = cudf::experimental::row::hash::device_row_hasher, - cuco::storage>; + cuco::storage>; using nullable_global_set_t = cuco::static_set, @@ -83,7 +83,7 @@ using nullable_global_set_t = cuco::static_set, - cuco::storage>; + cuco::storage>; template using hash_set_ref_t = cuco::static_set_ref< @@ -91,7 +91,7 @@ using hash_set_ref_t = cuco::static_set_ref< cuda::thread_scope_device, row_comparator_t, probing_scheme_t, - cuco::aow_storage_ref>, + cuco::bucket_storage_ref>, Op>; template @@ -100,6 +100,6 @@ using nullable_hash_set_ref_t = cuco::static_set_ref< cuda::thread_scope_device, nullable_row_comparator_t, probing_scheme_t, - cuco::aow_storage_ref>, + cuco::bucket_storage_ref>, Op>; } // namespace cudf::groupby::detail::hash diff --git a/cpp/src/io/orc/dict_enc.cu b/cpp/src/io/orc/dict_enc.cu index 0cb5c382631..7facc6497ed 100644 --- a/cpp/src/io/orc/dict_enc.cu +++ b/cpp/src/io/orc/dict_enc.cu @@ -180,9 +180,9 @@ CUDF_KERNEL void __launch_bounds__(block_size) for (size_type i = 0; i < dict.map_slots.size(); i += block_size) { if (t + i < dict.map_slots.size()) { - auto window = dict.map_slots.begin() + t + i; - // Collect all slots from each window. - for (auto& slot : *window) { + auto bucket = dict.map_slots.begin() + t + i; + // Collect all slots from each bucket. + for (auto& slot : *bucket) { auto const key = slot.first; if (key != KEY_SENTINEL) { auto loc = counter.fetch_add(1, memory_order_relaxed); diff --git a/cpp/src/io/orc/orc_gpu.hpp b/cpp/src/io/orc/orc_gpu.hpp index daff429c087..f4e75f78dec 100644 --- a/cpp/src/io/orc/orc_gpu.hpp +++ b/cpp/src/io/orc/orc_gpu.hpp @@ -47,16 +47,16 @@ using slot_type = cuco::pair; auto constexpr map_cg_size = 1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset. ///< Note: Adjust insert and find loops to use `cg::tile` if increasing this. -auto constexpr window_size = +auto constexpr bucket_size = 1; ///< Number of concurrent slots (set for best performance) handled by each thread. auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size ///< N * (1/0.7) = 1.43 to target a 70% occupancy factor. -using storage_type = cuco::aow_storage, - cudf::detail::cuco_allocator>; +using storage_type = cuco::bucket_storage, + cudf::detail::cuco_allocator>; using storage_ref_type = typename storage_type::ref_type; -using window_type = typename storage_type::window_type; +using bucket_type = typename storage_type::bucket_type; using slot_type = cuco::pair; auto constexpr KEY_SENTINEL = size_type{-1}; @@ -193,7 +193,7 @@ struct StripeStream { */ struct stripe_dictionary { // input - device_span map_slots; // hash map (windows) storage + device_span map_slots; // hash map (buckets) storage uint32_t column_idx = 0; // column index size_type start_row = 0; // first row in the stripe size_type start_rowgroup = 0; // first rowgroup in the stripe diff --git a/cpp/src/io/parquet/chunk_dict.cu b/cpp/src/io/parquet/chunk_dict.cu index b85ebf2fa1a..b5f9b894c46 100644 --- a/cpp/src/io/parquet/chunk_dict.cu +++ b/cpp/src/io/parquet/chunk_dict.cu @@ -210,7 +210,7 @@ struct map_find_fn { template CUDF_KERNEL void __launch_bounds__(block_size) - populate_chunk_hash_maps_kernel(device_span const map_storage, + populate_chunk_hash_maps_kernel(device_span const map_storage, cudf::detail::device_2dspan frags) { auto const col_idx = blockIdx.y; @@ -239,7 +239,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) template CUDF_KERNEL void __launch_bounds__(block_size) - collect_map_entries_kernel(device_span const map_storage, + collect_map_entries_kernel(device_span const map_storage, device_span chunks) { auto& chunk = chunks[blockIdx.x]; @@ -251,11 +251,11 @@ CUDF_KERNEL void __launch_bounds__(block_size) if (t == 0) { new (&counter) cuda::atomic{0}; } __syncthreads(); - // Iterate over all windows in the map. + // Iterate over all buckets in the map. for (; t < chunk.dict_map_size; t += block_size) { - auto window = map_storage.data() + chunk.dict_map_offset + t; - // Collect all slots from each window. - for (auto& slot : *window) { + auto bucket = map_storage.data() + chunk.dict_map_offset + t; + // Collect all slots from each bucket. + for (auto& slot : *bucket) { auto const key = slot.first; if (key != KEY_SENTINEL) { auto const loc = counter.fetch_add(1, memory_order_relaxed); @@ -272,7 +272,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) template CUDF_KERNEL void __launch_bounds__(block_size) - get_dictionary_indices_kernel(device_span const map_storage, + get_dictionary_indices_kernel(device_span const map_storage, cudf::detail::device_2dspan frags) { auto const col_idx = blockIdx.y; @@ -302,7 +302,7 @@ CUDF_KERNEL void __launch_bounds__(block_size) s_ck_start_val_idx); } -void populate_chunk_hash_maps(device_span const map_storage, +void populate_chunk_hash_maps(device_span const map_storage, cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream) { @@ -311,7 +311,7 @@ void populate_chunk_hash_maps(device_span const map_storage, <<>>(map_storage, frags); } -void collect_map_entries(device_span const map_storage, +void collect_map_entries(device_span const map_storage, device_span chunks, rmm::cuda_stream_view stream) { @@ -320,7 +320,7 @@ void collect_map_entries(device_span const map_storage, <<>>(map_storage, chunks); } -void get_dictionary_indices(device_span const map_storage, +void get_dictionary_indices(device_span const map_storage, cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream) { diff --git a/cpp/src/io/parquet/parquet_gpu.cuh b/cpp/src/io/parquet/parquet_gpu.cuh index 7c09764da2d..800875f7448 100644 --- a/cpp/src/io/parquet/parquet_gpu.cuh +++ b/cpp/src/io/parquet/parquet_gpu.cuh @@ -34,7 +34,7 @@ using slot_type = cuco::pair; auto constexpr map_cg_size = 1; ///< A CUDA Cooperative Group of 1 thread (set for best performance) to handle each subset. ///< Note: Adjust insert and find loops to use `cg::tile` if increasing this. -auto constexpr window_size = +auto constexpr bucket_size = 1; ///< Number of concurrent slots (set for best performance) handled by each thread. auto constexpr occupancy_factor = 1.43f; ///< cuCollections suggests using a hash map of size ///< N * (1/0.7) = 1.43 to target a 70% occupancy factor. @@ -43,12 +43,12 @@ auto constexpr KEY_SENTINEL = key_type{-1}; auto constexpr VALUE_SENTINEL = mapped_type{-1}; auto constexpr SCOPE = cuda::thread_scope_block; -using storage_type = cuco::aow_storage, - cudf::detail::cuco_allocator>; +using storage_type = cuco::bucket_storage, + cudf::detail::cuco_allocator>; using storage_ref_type = typename storage_type::ref_type; -using window_type = typename storage_type::window_type; +using bucket_type = typename storage_type::bucket_type; /** * @brief Return the byte length of parquet dtypes that are physically represented by INT32 @@ -100,7 +100,7 @@ inline size_type __device__ row_to_value_idx(size_type idx, * @param frags Column fragments * @param stream CUDA stream to use */ -void populate_chunk_hash_maps(device_span const map_storage, +void populate_chunk_hash_maps(device_span const map_storage, cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream); @@ -111,7 +111,7 @@ void populate_chunk_hash_maps(device_span const map_storage, * @param chunks Flat span of chunks to compact hash maps for * @param stream CUDA stream to use */ -void collect_map_entries(device_span const map_storage, +void collect_map_entries(device_span const map_storage, device_span chunks, rmm::cuda_stream_view stream); @@ -128,7 +128,7 @@ void collect_map_entries(device_span const map_storage, * @param frags Column fragments * @param stream CUDA stream to use */ -void get_dictionary_indices(device_span const map_storage, +void get_dictionary_indices(device_span const map_storage, cudf::detail::device_2dspan frags, rmm::cuda_stream_view stream); diff --git a/cpp/src/io/parquet/writer_impl.cu b/cpp/src/io/parquet/writer_impl.cu index 2eb9c49fd88..6b1a20701f9 100644 --- a/cpp/src/io/parquet/writer_impl.cu +++ b/cpp/src/io/parquet/writer_impl.cu @@ -1303,7 +1303,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, } else { chunk.use_dictionary = true; chunk.dict_map_size = - static_cast(cuco::make_window_extent( + static_cast(cuco::make_bucket_extent( static_cast(occupancy_factor * chunk.num_values))); chunk.dict_map_offset = total_map_storage_size; total_map_storage_size += chunk.dict_map_size; @@ -1318,7 +1318,7 @@ build_chunk_dictionaries(hostdevice_2dvector& chunks, total_map_storage_size, cudf::detail::cuco_allocator{rmm::mr::polymorphic_allocator{}, stream}}; // Create a span of non-const map_storage as map_storage_ref takes in a non-const pointer. - device_span const map_storage_data{map_storage.data(), total_map_storage_size}; + device_span const map_storage_data{map_storage.data(), total_map_storage_size}; // Synchronize chunks.host_to_device_async(stream); diff --git a/python/cudf/cudf/io/json.py b/python/cudf/cudf/io/json.py index e0c9e535e6f..39a85465deb 100644 --- a/python/cudf/cudf/io/json.py +++ b/python/cudf/cudf/io/json.py @@ -161,13 +161,15 @@ def read_json( if cudf.get_option("io.json.low_memory") and lines: res_cols, res_col_names, res_child_names = ( plc.io.json.chunked_read_json( - plc.io.SourceInfo(filepaths_or_buffers), - processed_dtypes, - c_compression, - keep_quotes=keep_quotes, - mixed_types_as_string=mixed_types_as_string, - prune_columns=prune_columns, - recovery_mode=c_on_bad_lines, + plc.io.json._setup_json_reader_options( + plc.io.SourceInfo(filepaths_or_buffers), + processed_dtypes, + c_compression, + keep_quotes=keep_quotes, + mixed_types_as_string=mixed_types_as_string, + prune_columns=prune_columns, + recovery_mode=c_on_bad_lines, + ) ) ) df = cudf.DataFrame._from_data( @@ -181,19 +183,23 @@ def read_json( return df else: table_w_meta = plc.io.json.read_json( - plc.io.SourceInfo(filepaths_or_buffers), - processed_dtypes, - c_compression, - lines, - byte_range_offset=byte_range[0] - if byte_range is not None - else 0, - byte_range_size=byte_range[1] if byte_range is not None else 0, - keep_quotes=keep_quotes, - mixed_types_as_string=mixed_types_as_string, - prune_columns=prune_columns, - recovery_mode=c_on_bad_lines, - extra_parameters=kwargs, + plc.io.json._setup_json_reader_options( + plc.io.SourceInfo(filepaths_or_buffers), + processed_dtypes, + c_compression, + lines, + byte_range_offset=byte_range[0] + if byte_range is not None + else 0, + byte_range_size=byte_range[1] + if byte_range is not None + else 0, + keep_quotes=keep_quotes, + mixed_types_as_string=mixed_types_as_string, + prune_columns=prune_columns, + recovery_mode=c_on_bad_lines, + extra_parameters=kwargs, + ) ) df = cudf.DataFrame._from_data( diff --git a/python/cudf_polars/cudf_polars/dsl/ir.py b/python/cudf_polars/cudf_polars/dsl/ir.py index b5af3bb80bf..1c1d4860eec 100644 --- a/python/cudf_polars/cudf_polars/dsl/ir.py +++ b/python/cudf_polars/cudf_polars/dsl/ir.py @@ -604,10 +604,12 @@ def slice_skip(tbl: plc.Table): (name, typ, []) for name, typ in schema.items() ] plc_tbl_w_meta = plc.io.json.read_json( - plc.io.SourceInfo(paths), - lines=True, - dtypes=json_schema, - prune_columns=True, + plc.io.json._setup_json_reader_options( + plc.io.SourceInfo(paths), + lines=True, + dtypes=json_schema, + prune_columns=True, + ) ) # TODO: I don't think cudf-polars supports nested types in general right now # (but when it does, we should pass child column names from nested columns in) diff --git a/python/pylibcudf/pylibcudf/io/json.pxd b/python/pylibcudf/pylibcudf/io/json.pxd index 4894ca3bd6e..7e446298ba9 100644 --- a/python/pylibcudf/pylibcudf/io/json.pxd +++ b/python/pylibcudf/pylibcudf/io/json.pxd @@ -8,6 +8,8 @@ from pylibcudf.io.types cimport ( ) from pylibcudf.libcudf.io.json cimport ( json_recovery_mode_t, + json_reader_options, + json_reader_options_builder, json_writer_options, json_writer_options_builder, ) @@ -15,19 +17,43 @@ from pylibcudf.libcudf.types cimport size_type from pylibcudf.table cimport Table -cpdef TableWithMetadata read_json( - SourceInfo source_info, - list dtypes = *, - compression_type compression = *, - bool lines = *, - size_t byte_range_offset = *, - size_t byte_range_size = *, - bool keep_quotes = *, - bool mixed_types_as_string = *, - bool prune_columns = *, - json_recovery_mode_t recovery_mode = *, - dict extra_parameters = *, -) +cdef class JsonReaderOptions: + cdef json_reader_options c_obj + cdef SourceInfo source + cpdef void set_dtypes(self, list types) + cpdef void enable_keep_quotes(self, bool keep_quotes) + cpdef void enable_mixed_types_as_string(self, bool mixed_types_as_string) + cpdef void enable_prune_columns(self, bool prune_columns) + cpdef void set_byte_range_offset(self, size_t offset) + cpdef void set_byte_range_size(self, size_t size) + cpdef void enable_lines(self, bool val) + # These hidden options are subjected to change without deprecation cycle. + # These are used to test libcudf JSON reader features, not used in cuDF. + cpdef void set_delimiter(self, str val) + cpdef void enable_dayfirst(self, bool val) + cpdef void enable_experimental(self, bool val) + cpdef void enable_normalize_single_quotes(self, bool val) + cpdef void enable_normalize_whitespace(self, bool val) + cpdef void set_strict_validation(self, bool val) + cpdef void allow_unquoted_control_chars(self, bool val) + cpdef void allow_numeric_leading_zeros(self, bool val) + cpdef void allow_nonnumeric_numbers(self, bool val) + cpdef void set_na_values(self, list vals) + +cdef class JsonReaderOptionsBuilder: + cdef json_reader_options_builder c_obj + cdef SourceInfo source + cpdef JsonReaderOptionsBuilder compression(self, compression_type compression) + cpdef JsonReaderOptionsBuilder lines(self, bool val) + cpdef JsonReaderOptionsBuilder keep_quotes(self, bool val) + cpdef JsonReaderOptionsBuilder byte_range_offset(self, size_t byte_range_offset) + cpdef JsonReaderOptionsBuilder byte_range_size(self, size_t byte_range_size) + cpdef JsonReaderOptionsBuilder recovery_mode( + self, json_recovery_mode_t recovery_mode + ) + cpdef build(self) + +cpdef TableWithMetadata read_json(JsonReaderOptions options) cdef class JsonWriterOptions: cdef json_writer_options c_obj @@ -50,12 +76,6 @@ cdef class JsonWriterOptionsBuilder: cpdef void write_json(JsonWriterOptions options) cpdef tuple chunked_read_json( - SourceInfo source_info, - list dtypes = *, - compression_type compression = *, - bool keep_quotes = *, - bool mixed_types_as_string = *, - bool prune_columns = *, - json_recovery_mode_t recovery_mode = *, + JsonReaderOptions options, int chunk_size= *, ) diff --git a/python/pylibcudf/pylibcudf/io/json.pyi b/python/pylibcudf/pylibcudf/io/json.pyi index e0489742cd0..b84b437a3a2 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyi +++ b/python/pylibcudf/pylibcudf/io/json.pyi @@ -19,18 +19,40 @@ ChildNameToTypeMap: TypeAlias = Mapping[str, ChildNameToTypeMap] NameAndType: TypeAlias = tuple[str, DataType, list[NameAndType]] -def read_json( - source_info: SourceInfo, - dtypes: list[NameAndType] | None = None, - compression: CompressionType = CompressionType.AUTO, - lines: bool = False, - byte_range_offset: int = 0, - byte_range_size: int = 0, - keep_quotes: bool = False, - mixed_types_as_string: bool = False, - prune_columns: bool = False, - recovery_mode: JSONRecoveryMode = JSONRecoveryMode.FAIL, -) -> TableWithMetadata: ... +class JsonReaderOptions: + def set_dtypes( + self, types: list[DataType] | list[NameAndType] + ) -> None: ... + def enable_keep_quotes(self, keep_quotes: bool) -> None: ... + def enable_mixed_types_as_string( + self, mixed_types_as_string: bool + ) -> None: ... + def enable_prune_columns(self, prune_columns: bool) -> None: ... + def set_byte_range_offset(self, offset: int) -> None: ... + def set_byte_range_size(self, size: int) -> None: ... + def enable_lines(self, val: bool) -> None: ... + def set_delimiter(self, val: str) -> None: ... + def enable_dayfirst(self, val: bool) -> None: ... + def enable_experimental(self, val: bool) -> None: ... + def enable_normalize_single_quotes(self, val: bool) -> None: ... + def enable_normalize_whitespace(self, val: bool) -> None: ... + def set_strict_validation(self, val: bool) -> None: ... + def allow_unquoted_control_chars(self, val: bool) -> None: ... + def allow_numeric_leading_zeros(self, val: bool) -> None: ... + def allow_nonnumeric_numbers(self, val: bool) -> None: ... + def set_na_values(self, vals: list[str]) -> None: ... + @staticmethod + def builder(source: SourceInfo) -> JsonReaderOptionsBuilder: ... + +class JsonReaderOptionsBuilder: + def compression(self, compression: CompressionType) -> Self: ... + def lines(self, lines: bool) -> Self: ... + def byte_range_offset(self, byte_range_offset: int) -> Self: ... + def byte_range_size(self, byte_range_size: int) -> Self: ... + def recovery_mode(self, recovery_mode: JSONRecoveryMode) -> Self: ... + def build(self) -> JsonReaderOptions: ... + +def read_json(options: JsonReaderOptions) -> TableWithMetadata: ... class JsonWriterOptions: @staticmethod @@ -48,12 +70,6 @@ class JsonWriterOptionsBuilder: def write_json(options: JsonWriterOptions) -> None: ... def chunked_read_json( - source_info: SourceInfo, - dtypes: list[NameAndType] | None = None, - compression: CompressionType = CompressionType.AUTO, - keep_quotes: bool = False, - mixed_types_as_string: bool = False, - prune_columns: bool = False, - recovery_mode: JSONRecoveryMode = JSONRecoveryMode.FAIL, + options: JsonReaderOptions, chunk_size: int = 100_000_000, ) -> tuple[list[Column], list[str], ChildNameToTypeMap]: ... diff --git a/python/pylibcudf/pylibcudf/io/json.pyx b/python/pylibcudf/pylibcudf/io/json.pyx index 16078b31566..1d8a559afad 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyx +++ b/python/pylibcudf/pylibcudf/io/json.pyx @@ -25,6 +25,8 @@ __all__ = [ "chunked_read_json", "read_json", "write_json", + "JsonReaderOptions", + "JsonReaderOptionsBuilder", "JsonWriterOptions", "JsonWriterOptionsBuilder" ] @@ -51,23 +53,21 @@ cdef map[string, schema_element] _generate_schema_map(list dtypes): return schema_map -cdef json_reader_options _setup_json_reader_options( +cpdef JsonReaderOptions _setup_json_reader_options( SourceInfo source_info, list dtypes, - compression_type compression, - bool lines, - size_t byte_range_offset, - size_t byte_range_size, - bool keep_quotes, - bool mixed_types_as_string, - bool prune_columns, - json_recovery_mode_t recovery_mode, - dict extra_parameters=None): - - cdef vector[string] na_vec - cdef vector[data_type] types_vec - cdef json_reader_options opts = ( - json_reader_options.builder(source_info.c_obj) + compression_type compression = compression_type.AUTO, + bool lines = False, + size_t byte_range_offset = 0, + size_t byte_range_size = 0, + bool keep_quotes = False, + bool mixed_types_as_string = False, + bool prune_columns = False, + json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL, + dict extra_parameters=None, +): + options = ( + JsonReaderOptions.builder(source_info) .compression(compression) .lines(lines) .byte_range_offset(byte_range_offset) @@ -77,88 +77,359 @@ cdef json_reader_options _setup_json_reader_options( ) if dtypes is not None: - if isinstance(dtypes[0], tuple): - opts.set_dtypes(move(_generate_schema_map(dtypes))) - else: - for dtype in dtypes: - types_vec.push_back((dtype).c_obj) - opts.set_dtypes(types_vec) + options.set_dtypes(dtypes) - opts.enable_keep_quotes(keep_quotes) - opts.enable_mixed_types_as_string(mixed_types_as_string) - opts.enable_prune_columns(prune_columns) + options.enable_keep_quotes(keep_quotes) + options.enable_mixed_types_as_string(mixed_types_as_string) + options.enable_prune_columns(prune_columns) # These hidden options are subjected to change without deprecation cycle. # These are used to test libcudf JSON reader features, not used in cuDF. if extra_parameters is not None: for key, value in extra_parameters.items(): if key == 'delimiter': - opts.set_delimiter(ord(value)) + options.set_delimiter(value) elif key == 'dayfirst': - opts.enable_dayfirst(value) + options.enable_dayfirst(value) elif key == 'experimental': - opts.enable_experimental(value) + options.enable_experimental(value) elif key == 'normalize_single_quotes': - opts.enable_normalize_single_quotes(value) + options.enable_normalize_single_quotes(value) elif key == 'normalize_whitespace': - opts.enable_normalize_whitespace(value) + options.enable_normalize_whitespace(value) elif key == 'strict_validation': - opts.set_strict_validation(value) + options.set_strict_validation(value) elif key == 'allow_unquoted_control_chars': - opts.allow_unquoted_control_chars(value) + options.allow_unquoted_control_chars(value) elif key == 'allow_numeric_leading_zeros': - opts.allow_numeric_leading_zeros(value) + options.allow_numeric_leading_zeros(value) elif key == 'allow_nonnumeric_numbers': - opts.allow_nonnumeric_numbers(value) + options.allow_nonnumeric_numbers(value) elif key == 'na_values': - for na_val in value: - if isinstance(na_val, str): - na_vec.push_back(na_val.encode()) - opts.set_na_values(na_vec) + options.set_na_values(value) else: raise ValueError( "cudf engine doesn't support the " f"'{key}' keyword argument for read_json" ) - return opts + return options + + +cdef class JsonReaderOptions: + """ + The settings to use for ``read_json`` + + For details, see `:cpp:class:`cudf::io::json_reader_options` + """ + @staticmethod + def builder(SourceInfo source): + """ + Create a JsonReaderOptionsBuilder object + + For details, see :cpp:func:`cudf::io::json_reader_options::builder` + + Parameters + ---------- + sink : SourceInfo + The source to read the JSON file from. + + Returns + ------- + JsonReaderOptionsBuilder + Builder to build JsonReaderOptions + """ + cdef JsonReaderOptionsBuilder json_builder = ( + JsonReaderOptionsBuilder.__new__(JsonReaderOptionsBuilder) + ) + json_builder.c_obj = json_reader_options.builder(source.c_obj) + json_builder.source = source + return json_builder + + cpdef void set_dtypes(self, list types): + """ + Set data types for columns to be read. + + Parameters + ---------- + types : list + List of dtypes or a list of tuples of + column names, dtypes, and list of tuples + (to support nested column hierarchy) + + Returns + ------- + None + """ + cdef vector[data_type] types_vec + if isinstance(types[0], tuple): + self.c_obj.set_dtypes(_generate_schema_map(types)) + else: + types_vec.reserve(len(types)) + for dtype in types: + types_vec.push_back((dtype).c_obj) + self.c_obj.set_dtypes(types_vec) + + cpdef void enable_keep_quotes(self, bool keep_quotes): + """ + Set whether the reader should keep quotes of string values. + + Parameters + ---------- + keep_quotes : bool + Boolean value to indicate whether the reader should + keep quotes of string values + + Returns + ------- + None + """ + self.c_obj.enable_keep_quotes(keep_quotes) + + cpdef void enable_mixed_types_as_string(self, bool mixed_types_as_string): + """ + Set whether to parse mixed types as a string column. + Also enables forcing to read a struct as string column using schema. + + Parameters + ---------- + mixed_types_as_string : bool + Boolean value to enable/disable parsing mixed types + as a string column + + Returns + ------- + None + """ + self.c_obj.enable_mixed_types_as_string(mixed_types_as_string) + + cpdef void enable_prune_columns(self, bool prune_columns): + """ + Set whether to prune columns on read, selected + based on the ``set_dtypes`` option. + + Parameters + ---------- + prune_columns : bool + When set as true, if the reader options include + ``set_dtypes``, then the reader will only return those + columns which are mentioned in ``set_dtypes``. If false, + then all columns are returned, independent of the + ``set_dtypes`` setting. + + Returns + ------- + None + """ + self.c_obj.enable_prune_columns(prune_columns) + + cpdef void set_byte_range_offset(self, size_t offset): + """ + Set number of bytes to skip from source start. + + Parameters + ---------- + offset : size_t + Number of bytes of offset + + Returns + ------- + None + """ + self.c_obj.set_byte_range_offset(offset) + + cpdef void set_byte_range_size(self, size_t size): + """ + Set number of bytes to read. + + Parameters + ---------- + size : size_t + Number of bytes to read + + Returns + ------- + None + """ + self.c_obj.set_byte_range_size(size) + + cpdef void enable_lines(self, bool val): + """ + Set whether to read the file as a json object per line. + + Parameters + ---------- + val : bool + Boolean value to enable/disable the option + to read each line as a json object + + Returns + ------- + None + """ + self.c_obj.enable_lines(val) + + # These hidden options are subjected to change without deprecation cycle. + # These are used to test libcudf JSON reader features, not used in cuDF. + + cpdef void set_delimiter(self, str val): + self.c_obj.set_delimiter(val.encode()) + + cpdef void enable_dayfirst(self, bool val): + self.c_obj.enable_dayfirst(val) + + cpdef void enable_experimental(self, bool val): + self.c_obj.enable_experimental(val) + + cpdef void enable_normalize_single_quotes(self, bool val): + self.c_obj.enable_normalize_single_quotes(val) + + cpdef void enable_normalize_whitespace(self, bool val): + self.c_obj.enable_normalize_whitespace(val) + + cpdef void set_strict_validation(self, bool val): + self.c_obj.set_strict_validation(val) + + cpdef void allow_unquoted_control_chars(self, bool val): + self.c_obj.allow_unquoted_control_chars(val) + + cpdef void allow_numeric_leading_zeros(self, bool val): + self.c_obj.allow_numeric_leading_zeros(val) + + cpdef void allow_nonnumeric_numbers(self, bool val): + self.c_obj.allow_nonnumeric_numbers(val) + + cpdef void set_na_values(self, list vals): + cdef vector[string] vec + for val in vals: + if isinstance(val, str): + vec.push_back(val.encode()) + self.c_obj.set_na_values(vec) + + +cdef class JsonReaderOptionsBuilder: + cpdef JsonReaderOptionsBuilder compression(self, compression_type compression): + """ + Sets compression type. + + Parameters + ---------- + compression : CompressionType + The compression type to use + + Returns + ------- + Self + """ + self.c_obj.compression(compression) + return self + + cpdef JsonReaderOptionsBuilder lines(self, bool val): + """ + Set whether to read the file as a json object per line. + + Parameters + ---------- + val : bool + Boolean value to enable/disable the option + to read each line as a json object + + Returns + ------- + Self + """ + self.c_obj.lines(val) + return self + + cpdef JsonReaderOptionsBuilder keep_quotes(self, bool val): + """ + Set whether the reader should keep quotes of string values. + + Parameters + ---------- + val : bool + Boolean value to indicate whether the + reader should keep quotes of string values + + Returns + ------- + Self + """ + self.c_obj.keep_quotes(val) + return self + + cpdef JsonReaderOptionsBuilder byte_range_offset(self, size_t byte_range_offset): + """ + Set number of bytes to skip from source start. + + Parameters + ---------- + byte_range_offset : size_t + Number of bytes of offset + + Returns + ------- + Self + """ + self.c_obj.byte_range_offset(byte_range_offset) + return self + + cpdef JsonReaderOptionsBuilder byte_range_size(self, size_t byte_range_size): + """ + Set number of bytes to read. + + Parameters + ---------- + byte_range_size : size_t + Number of bytes to read + + Returns + ------- + Self + """ + self.c_obj.byte_range_size(byte_range_size) + return self + + cpdef JsonReaderOptionsBuilder recovery_mode( + self, + json_recovery_mode_t recovery_mode + ): + """ + Specifies the JSON reader's behavior on invalid JSON lines. + + Parameters + ---------- + recovery_mode : json_recovery_mode_t + An enum value to indicate the JSON reader's + behavior on invalid JSON lines. + + Returns + ------- + Self + """ + self.c_obj.recovery_mode(recovery_mode) + return self + + cpdef build(self): + """Create a JsonReaderOptions object""" + cdef JsonReaderOptions json_options = JsonReaderOptions.__new__( + JsonReaderOptions + ) + json_options.c_obj = move(self.c_obj.build()) + json_options.source = self.source + return json_options cpdef tuple chunked_read_json( - SourceInfo source_info, - list dtypes = None, - compression_type compression = compression_type.AUTO, - bool keep_quotes = False, - bool mixed_types_as_string = False, - bool prune_columns = False, - json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL, + JsonReaderOptions options, int chunk_size=100_000_000, ): - """Reads an JSON file into a :py:class:`~.types.TableWithMetadata`. + """ + Reads chunks of a JSON file into a :py:class:`~.types.TableWithMetadata`. Parameters ---------- - source_info : SourceInfo - The SourceInfo object to read the JSON file from. - dtypes : list, default None - Set data types for the columns in the JSON file. - - Each element of the list has the format - (column_name, column_dtype, list of child dtypes), where - the list of child dtypes is an empty list if the child is not - a nested type (list or struct dtype), and is of format - (column_child_name, column_child_type, list of grandchild dtypes). - compression: CompressionType, default CompressionType.AUTO - The compression format of the JSON source. - keep_quotes : bool, default False - Whether the reader should keep quotes of string values. - mixed_types_as_string : bool, default False - If True, mixed type columns are returned as string columns. - If `False` parsing mixed type columns will thrown an error. - prune_columns : bool, default False - Whether to only read columns specified in dtypes. - recover_mode : JSONRecoveryMode, default JSONRecoveryMode.FAIL - Whether to raise an error or set corresponding values to null - when encountering an invalid JSON line. + options : JsonReaderOptions + Settings for controlling reading behavior chunk_size : int, default 100_000_000 bytes. The number of bytes to be read in chunks. The chunk_size should be set to at least row_size. @@ -171,20 +442,6 @@ cpdef tuple chunked_read_json( cdef size_type c_range_size = ( chunk_size if chunk_size is not None else 0 ) - cdef json_reader_options opts = _setup_json_reader_options( - source_info=source_info, - dtypes=dtypes, - compression=compression, - lines=True, - byte_range_offset=0, - byte_range_size=0, - keep_quotes=keep_quotes, - mixed_types_as_string=mixed_types_as_string, - prune_columns=prune_columns, - recovery_mode=recovery_mode, - ) - - # Read JSON cdef table_with_metadata c_result final_columns = [] @@ -192,12 +449,13 @@ cpdef tuple chunked_read_json( child_names = None i = 0 while True: - opts.set_byte_range_offset(c_range_size * i) - opts.set_byte_range_size(c_range_size) + options.enable_lines(True) + options.set_byte_range_offset(c_range_size * i) + options.set_byte_range_size(c_range_size) try: with nogil: - c_result = move(cpp_read_json(opts)) + c_result = move(cpp_read_json(options.c_obj)) except (ValueError, OverflowError): break if meta_names is None: @@ -225,75 +483,30 @@ cpdef tuple chunked_read_json( cpdef TableWithMetadata read_json( - SourceInfo source_info, - list dtypes = None, - compression_type compression = compression_type.AUTO, - bool lines = False, - size_t byte_range_offset = 0, - size_t byte_range_size = 0, - bool keep_quotes = False, - bool mixed_types_as_string = False, - bool prune_columns = False, - json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL, - dict extra_parameters = None, + JsonReaderOptions options ): - """Reads an JSON file into a :py:class:`~.types.TableWithMetadata`. + """ + Read from JSON format. + + The source to read from and options are encapsulated + by the `options` object. + + For details, see :cpp:func:`read_json`. Parameters ---------- - source_info : SourceInfo - The SourceInfo object to read the JSON file from. - dtypes : list, default None - Set data types for the columns in the JSON file. - - Each element of the list has the format - (column_name, column_dtype, list of child dtypes), where - the list of child dtypes is an empty list if the child is not - a nested type (list or struct dtype), and is of format - (column_child_name, column_child_type, list of grandchild dtypes). - compression: CompressionType, default CompressionType.AUTO - The compression format of the JSON source. - byte_range_offset : size_t, default 0 - Number of bytes to skip from source start. - byte_range_size : size_t, default 0 - Number of bytes to read. By default, will read all bytes. - keep_quotes : bool, default False - Whether the reader should keep quotes of string values. - mixed_types_as_string : bool, default False - If True, mixed type columns are returned as string columns. - If `False` parsing mixed type columns will thrown an error. - prune_columns : bool, default False - Whether to only read columns specified in dtypes. - recover_mode : JSONRecoveryMode, default JSONRecoveryMode.FAIL - Whether to raise an error or set corresponding values to null - when encountering an invalid JSON line. - extra_parameters : dict, default None - Additional hidden parameters to pass to the JSON reader. + options: JsonReaderOptions + Settings for controlling reading behavior Returns ------- TableWithMetadata The Table and its corresponding metadata (column names) that were read in. """ - cdef json_reader_options opts = _setup_json_reader_options( - source_info=source_info, - dtypes=dtypes, - compression=compression, - lines=lines, - byte_range_offset=byte_range_offset, - byte_range_size=byte_range_size, - keep_quotes=keep_quotes, - mixed_types_as_string=mixed_types_as_string, - prune_columns=prune_columns, - recovery_mode=recovery_mode, - extra_parameters=extra_parameters, - ) - - # Read JSON cdef table_with_metadata c_result with nogil: - c_result = move(cpp_read_json(opts)) + c_result = move(cpp_read_json(options.c_obj)) return TableWithMetadata.from_libcudf(c_result) diff --git a/python/pylibcudf/pylibcudf/tests/io/test_json.py b/python/pylibcudf/pylibcudf/tests/io/test_json.py index 9b0c5a29fe8..747bbfa1370 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_json.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_json.py @@ -167,9 +167,12 @@ def test_read_json_basic( source.seek(0) res = plc.io.json.read_json( - plc.io.SourceInfo([source]), - compression=compression_type, - lines=lines, + ( + plc.io.json.JsonReaderOptions.builder(plc.io.SourceInfo([source])) + .compression(compression_type) + .lines(lines) + .build() + ) ) # Adjustments to correct for the fact orient=records is lossy @@ -243,9 +246,14 @@ def get_child_types(typ): new_schema = pa.schema(new_fields) - res = plc.io.json.read_json( - plc.io.SourceInfo([source]), dtypes=dtypes, lines=True + options = ( + plc.io.json.JsonReaderOptions.builder(plc.io.SourceInfo([source])) + .lines(True) + .build() ) + options.set_dtypes(dtypes) + + res = plc.io.json.read_json(options) new_table = pa_table.cast(new_schema) # orient=records is lossy @@ -269,10 +277,15 @@ def test_read_json_lines_byte_range(source_or_sink, chunk_size): for chunk_start in range(0, len(json_str.encode("utf-8")), chunk_size): tbls_w_meta.append( plc.io.json.read_json( - plc.io.SourceInfo([source]), - lines=True, - byte_range_offset=chunk_start, - byte_range_size=chunk_start + chunk_size, + ( + plc.io.json.JsonReaderOptions.builder( + plc.io.SourceInfo([source]) + ) + .lines(True) + .byte_range_offset(chunk_start) + .byte_range_size(chunk_start + chunk_size) + .build() + ) ) ) @@ -302,7 +315,12 @@ def test_read_json_lines_keep_quotes(keep_quotes, source_or_sink): write_source_str(source, json_bytes) tbl_w_meta = plc.io.json.read_json( - plc.io.SourceInfo([source]), lines=True, keep_quotes=keep_quotes + ( + plc.io.json.JsonReaderOptions.builder(plc.io.SourceInfo([source])) + .lines(True) + .keep_quotes(keep_quotes) + .build() + ) ) template = "{0}" @@ -330,20 +348,19 @@ def test_read_json_lines_recovery_mode(recovery_mode, source_or_sink): json_str = '{"a":1,"b":10}\n{"a":2,"b":11}\nabc\n{"a":3,"b":12}\n' write_source_str(source, json_str) + options = ( + plc.io.json.JsonReaderOptions.builder(plc.io.SourceInfo([source])) + .lines(True) + .recovery_mode(recovery_mode) + .build() + ) + if recovery_mode == plc.io.types.JSONRecoveryMode.FAIL: with pytest.raises(RuntimeError): - plc.io.json.read_json( - plc.io.SourceInfo([source]), - lines=True, - recovery_mode=recovery_mode, - ) + plc.io.json.read_json(options) else: # Recover case (bad values replaced with nulls) - tbl_w_meta = plc.io.json.read_json( - plc.io.SourceInfo([source]), - lines=True, - recovery_mode=recovery_mode, - ) + tbl_w_meta = plc.io.json.read_json(options) exp = pa.Table.from_arrays( [[1, 2, None, 3], [10, 11, None, 12]], names=["a", "b"] )