diff --git a/python/pylibcudf/pylibcudf/io/avro.pxd b/python/pylibcudf/pylibcudf/io/avro.pxd index a0fca95d459..c29e0ff98b2 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pxd +++ b/python/pylibcudf/pylibcudf/io/avro.pxd @@ -4,8 +4,6 @@ from pylibcudf.libcudf.io.avro cimport avro_reader_options, avro_reader_options_ from pylibcudf.libcudf.types cimport size_type -from pylibcudf.libcudf.types cimport size_type - cdef class AvroReaderOptions: cdef avro_reader_options c_obj cdef SourceInfo source diff --git a/python/pylibcudf/pylibcudf/io/csv.pxd b/python/pylibcudf/pylibcudf/io/csv.pxd index 95f3ff4fe45..c4da27ffc05 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pxd +++ b/python/pylibcudf/pylibcudf/io/csv.pxd @@ -18,6 +18,7 @@ from pylibcudf.libcudf.io.types cimport ( table_with_metadata, ) from pylibcudf.libcudf.types cimport size_type +from rmm._cuda.stream cimport Stream cdef class CsvReaderOptions: cdef csv_reader_options c_obj @@ -61,7 +62,7 @@ cdef class CsvReaderOptionsBuilder: cpdef CsvReaderOptionsBuilder dayfirst(self, bool dayfirst) cpdef CsvReaderOptions build(self) -cpdef TableWithMetadata read_csv(CsvReaderOptions options) +cpdef TableWithMetadata read_csv(CsvReaderOptions options, Stream stream = *) cdef class CsvWriterOptions: cdef csv_writer_options c_obj @@ -84,4 +85,4 @@ cdef class CsvWriterOptionsBuilder: cpdef CsvWriterOptions build(self) -cpdef void write_csv(CsvWriterOptions options) +cpdef void write_csv(CsvWriterOptions options, Stream stream = *) diff --git a/python/pylibcudf/pylibcudf/io/csv.pyi b/python/pylibcudf/pylibcudf/io/csv.pyi index 540cbc778ea..24d5cb5fada 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pyi +++ b/python/pylibcudf/pylibcudf/io/csv.pyi @@ -1,9 +1,9 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from collections.abc import Mapping - from typing_extensions import Self +from rmm._cuda.stream import Stream + from pylibcudf.io.types import ( CompressionType, QuoteStyle, @@ -56,48 +56,10 @@ class CsvReaderOptionsBuilder: def build(self) -> CsvReaderOptions: ... def read_csv( - source_info: SourceInfo, - *, - compression: CompressionType = CompressionType.AUTO, - byte_range_offset: int = 0, - byte_range_size: int = 0, - col_names: list[str] | None = None, - prefix: str = "", - mangle_dupe_cols: bool = True, - usecols: list[int] | list[str] | None = None, - nrows: int = -1, - skiprows: int = 0, - skipfooter: int = 0, - header: int = 0, - lineterminator: str = "\n", - delimiter: str | None = None, - thousands: str | None = None, - decimal: str = ".", - comment: str | None = None, - delim_whitespace: bool = False, - skipinitialspace: bool = False, - skip_blank_lines: bool = True, - quoting: QuoteStyle = QuoteStyle.MINIMAL, - quotechar: str = '"', - doublequote: bool = True, - parse_dates: list[str] | list[int] | None = None, - parse_hex: list[str] | list[int] | None = None, - # Technically this should be dict/list - # but using a fused type prevents using None as default - dtypes: Mapping[str, DataType] | list[DataType] | None = None, - true_values: list[str] | None = None, - false_values: list[str] | None = None, - na_values: list[str] | None = None, - keep_default_na: bool = True, - na_filter: bool = True, - dayfirst: bool = False, - # Note: These options are supported by the libcudf reader - # but are not exposed here since there is no demand for them - # on the Python side yet. - # detect_whitespace_around_quotes: bool = False, - # timestamp_type: DataType = DataType(type_id.EMPTY), + options: CsvReaderOptions, + stream: Stream = None, ) -> TableWithMetadata: ... -def write_csv(options: CsvWriterOptionsBuilder): ... +def write_csv(options: CsvWriterOptionsBuilder, stream: Stream = None): ... class CsvWriterOptions: def __init__(self): ... diff --git a/python/pylibcudf/pylibcudf/io/csv.pyx b/python/pylibcudf/pylibcudf/io/csv.pyx index efc9bb813a1..8c56190d76d 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pyx +++ b/python/pylibcudf/pylibcudf/io/csv.pyx @@ -22,6 +22,7 @@ from pylibcudf.libcudf.io.types cimport ( from pylibcudf.libcudf.types cimport data_type, size_type from pylibcudf.types cimport DataType from pylibcudf.table cimport Table +from rmm._cuda.stream cimport Stream __all__ = [ "read_csv", @@ -629,7 +630,8 @@ cdef class CsvReaderOptionsBuilder: cpdef TableWithMetadata read_csv( - CsvReaderOptions options + CsvReaderOptions options, + Stream stream = None, ): """ Read from CSV format. @@ -644,9 +646,11 @@ cpdef TableWithMetadata read_csv( options: CsvReaderOptions Settings for controlling reading behavior """ + if stream is None: + stream = Stream() cdef table_with_metadata c_result with nogil: - c_result = move(cpp_read_csv(options.c_obj)) + c_result = move(cpp_read_csv(options.c_obj, stream.view())) cdef TableWithMetadata tbl_meta = TableWithMetadata.from_libcudf(c_result) return tbl_meta @@ -831,7 +835,8 @@ cdef class CsvWriterOptionsBuilder: cpdef void write_csv( - CsvWriterOptions options + CsvWriterOptions options, + Stream stream = None, ): """ Write to CSV format. @@ -846,6 +851,7 @@ cpdef void write_csv( options: CsvWriterOptions Settings for controlling writing behavior """ - + if stream is None: + stream = Stream() with nogil: - cpp_write_csv(move(options.c_obj)) + cpp_write_csv(move(options.c_obj), stream.view()) diff --git a/python/pylibcudf/pylibcudf/io/json.pxd b/python/pylibcudf/pylibcudf/io/json.pxd index 7ce3cb859a5..93481cb9c1c 100644 --- a/python/pylibcudf/pylibcudf/io/json.pxd +++ b/python/pylibcudf/pylibcudf/io/json.pxd @@ -14,6 +14,7 @@ from pylibcudf.libcudf.io.json cimport ( json_writer_options_builder, ) from pylibcudf.libcudf.types cimport size_type +from rmm._cuda.stream cimport Stream from pylibcudf.table cimport Table @@ -75,9 +76,10 @@ cdef class JsonWriterOptionsBuilder: cpdef JsonWriterOptionsBuilder compression(self, compression_type comptype) cpdef JsonWriterOptions build(self) -cpdef void write_json(JsonWriterOptions options) +cpdef void write_json(JsonWriterOptions options, Stream stream = *) cpdef tuple chunked_read_json( JsonReaderOptions options, int chunk_size= *, + Stream stream = *, ) diff --git a/python/pylibcudf/pylibcudf/io/json.pyi b/python/pylibcudf/pylibcudf/io/json.pyi index db4546f138d..c8e16e13baf 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyi +++ b/python/pylibcudf/pylibcudf/io/json.pyi @@ -4,6 +4,8 @@ from typing import TypeAlias from typing_extensions import Self +from rmm._cuda.stream import Stream + from pylibcudf.column import Column from pylibcudf.io.types import ( CompressionType, @@ -70,8 +72,9 @@ class JsonWriterOptionsBuilder: def compression(self, comptype: CompressionType) -> Self: ... def build(self) -> JsonWriterOptions: ... -def write_json(options: JsonWriterOptions) -> None: ... +def write_json(options: JsonWriterOptions, stream: Stream = None) -> None: ... def chunked_read_json( options: JsonReaderOptions, chunk_size: int = 100_000_000, + stream: Stream = None, ) -> tuple[list[Column], list[str], ChildNameToTypeMap]: ... diff --git a/python/pylibcudf/pylibcudf/io/json.pyx b/python/pylibcudf/pylibcudf/io/json.pyx index cf286378902..b999503083b 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyx +++ b/python/pylibcudf/pylibcudf/io/json.pyx @@ -20,6 +20,8 @@ from pylibcudf.libcudf.io.types cimport ( ) from pylibcudf.libcudf.types cimport data_type, size_type from pylibcudf.types cimport DataType +from rmm._cuda.stream cimport Stream + __all__ = [ "chunked_read_json", @@ -422,6 +424,7 @@ cdef class JsonReaderOptionsBuilder: cpdef tuple chunked_read_json( JsonReaderOptions options, int chunk_size=100_000_000, + Stream stream = None, ): """ Reads chunks of a JSON file into a :py:class:`~.types.TableWithMetadata`. @@ -442,6 +445,9 @@ cpdef tuple chunked_read_json( cdef size_type c_range_size = ( chunk_size if chunk_size is not None else 0 ) + if stream is None: + stream = Stream() + cdef table_with_metadata c_result final_columns = [] @@ -455,7 +461,7 @@ cpdef tuple chunked_read_json( try: with nogil: - c_result = move(cpp_read_json(options.c_obj)) + c_result = move(cpp_read_json(options.c_obj, stream.view())) except (ValueError, OverflowError): break if meta_names is None: @@ -483,7 +489,8 @@ cpdef tuple chunked_read_json( cpdef TableWithMetadata read_json( - JsonReaderOptions options + JsonReaderOptions options, + Stream stream = None, ): """ Read from JSON format. @@ -503,10 +510,13 @@ cpdef TableWithMetadata read_json( TableWithMetadata The Table and its corresponding metadata (column names) that were read in. """ + if stream is None: + stream = Stream() + cdef table_with_metadata c_result with nogil: - c_result = move(cpp_read_json(options.c_obj)) + c_result = move(cpp_read_json(options.c_obj, stream.view())) return TableWithMetadata.from_libcudf(c_result) @@ -694,7 +704,7 @@ cdef class JsonWriterOptionsBuilder: return json_options -cpdef void write_json(JsonWriterOptions options): +cpdef void write_json(JsonWriterOptions options, Stream stream = None): """ Writes a set of columns to JSON format. @@ -707,5 +717,7 @@ cpdef void write_json(JsonWriterOptions options): ------- None """ + if stream is None: + stream = Stream() with nogil: - cpp_write_json(options.c_obj) + cpp_write_json(options.c_obj, stream.view()) diff --git a/python/pylibcudf/pylibcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/io/orc.pxd index 7531608519c..4ef1941a6f5 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pxd +++ b/python/pylibcudf/pylibcudf/io/orc.pxd @@ -33,6 +33,8 @@ from pylibcudf.libcudf.io.types cimport ( compression_type, statistics_freq, ) +from rmm._cuda.stream cimport Stream + cdef class OrcReaderOptions: cdef orc_reader_options c_obj @@ -93,7 +95,7 @@ cdef class OrcWriterOptionsBuilder: cpdef OrcWriterOptionsBuilder metadata(self, TableInputMetadata meta) cpdef OrcWriterOptions build(self) -cpdef void write_orc(OrcWriterOptions options) +cpdef void write_orc(OrcWriterOptions options, Stream stream = *) cdef class OrcChunkedWriter: cdef unique_ptr[orc_chunked_writer] c_obj diff --git a/python/pylibcudf/pylibcudf/io/orc.pyx b/python/pylibcudf/pylibcudf/io/orc.pyx index c125d7e76fa..477d10bf9dc 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pyx +++ b/python/pylibcudf/pylibcudf/io/orc.pyx @@ -39,6 +39,8 @@ from pylibcudf.libcudf.io.orc cimport ( orc_writer_options, chunked_orc_writer_options, ) +from rmm._cuda.stream cimport Stream + __all__ = [ "OrcColumnStatistics", @@ -405,7 +407,7 @@ cdef class OrcReaderOptionsBuilder: return orc_options -cpdef TableWithMetadata read_orc(OrcReaderOptions options): +cpdef TableWithMetadata read_orc(OrcReaderOptions options, Stream stream = None): """ Read from ORC format. @@ -419,10 +421,12 @@ cpdef TableWithMetadata read_orc(OrcReaderOptions options): options: OrcReaderOptions Settings for controlling reading behavior """ + if stream is None: + stream = Stream() cdef table_with_metadata c_result with nogil: - c_result = move(cpp_read_orc(options.c_obj)) + c_result = move(cpp_read_orc(options.c_obj, stream.view())) return TableWithMetadata.from_libcudf(c_result) @@ -604,7 +608,7 @@ cdef class OrcWriterOptionsBuilder: return orc_options -cpdef void write_orc(OrcWriterOptions options): +cpdef void write_orc(OrcWriterOptions options, Stream stream = None): """ Write to ORC format. @@ -622,8 +626,10 @@ cpdef void write_orc(OrcWriterOptions options): ------- None """ + if stream is None: + stream = Stream() with nogil: - cpp_write_orc(move(options.c_obj)) + cpp_write_orc(move(options.c_obj), stream.view()) cdef class OrcChunkedWriter: @@ -655,7 +661,7 @@ cdef class OrcChunkedWriter: self.c_obj.get()[0].write(table.view()) @staticmethod - def from_options(ChunkedOrcWriterOptions options): + def from_options(ChunkedOrcWriterOptions options, Stream stream = None): """ Creates a chunked ORC writer from options @@ -668,10 +674,12 @@ cdef class OrcChunkedWriter: ------- OrcChunkedWriter """ + if stream is None: + stream = Stream() cdef OrcChunkedWriter orc_writer = OrcChunkedWriter.__new__( OrcChunkedWriter ) - orc_writer.c_obj.reset(new orc_chunked_writer(options.c_obj)) + orc_writer.c_obj.reset(new orc_chunked_writer(options.c_obj, stream.view())) return orc_writer diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index 84f47cf5305..ab168189032 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -27,6 +27,7 @@ from pylibcudf.libcudf.io.parquet cimport ( from pylibcudf.libcudf.types cimport size_type from pylibcudf.table cimport Table from pylibcudf.types cimport DataType +from rmm._cuda.stream cimport Stream cdef class ParquetReaderOptions: @@ -55,7 +56,7 @@ cdef class ChunkedParquetReader: cpdef TableWithMetadata read_chunk(self) -cpdef read_parquet(ParquetReaderOptions options) +cpdef read_parquet(ParquetReaderOptions options, Stream stream = *) cdef class ParquetChunkedWriter: @@ -145,6 +146,6 @@ cdef class ParquetWriterOptionsBuilder: cpdef ParquetWriterOptions build(self) -cpdef memoryview write_parquet(ParquetWriterOptions options) +cpdef memoryview write_parquet(ParquetWriterOptions options, Stream stream = *) cpdef memoryview merge_row_group_metadata(list metdata_list) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyi b/python/pylibcudf/pylibcudf/io/parquet.pyi index 2d8d12c1a45..d0f3c2451b8 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyi +++ b/python/pylibcudf/pylibcudf/io/parquet.pyi @@ -4,6 +4,8 @@ from collections.abc import Mapping from typing_extensions import Self +from rmm._cuda.stream import Stream + from pylibcudf.expressions import Expression from pylibcudf.io.types import ( CompressionType, @@ -53,18 +55,8 @@ class ChunkedParquetReader: def read_chunk(self) -> TableWithMetadata: ... def read_parquet( - source_info: SourceInfo, - columns: list[str] | None = None, - row_groups: list[list[int]] | None = None, - filters: Expression | None = None, - convert_strings_to_categories: bool = False, - use_pandas_metadata: bool = True, - skip_rows: int = 0, - nrows: int = -1, - allow_mismatched_pq_schemas: bool = False, - # disabled see comment in parquet.pyx for more - # reader_column_schema: ReaderColumnSchema = *, - # timestamp_type: DataType = * + options: ParquetReaderOptions, + stream: Stream = None, ) -> TableWithMetadata: ... class ParquetWriterOptions: @@ -96,14 +88,18 @@ class ParquetWriterOptionsBuilder: def write_arrow_schema(self, enabled: bool) -> Self: ... def build(self) -> ParquetWriterOptions: ... -def write_parquet(options: ParquetWriterOptions) -> memoryview: ... +def write_parquet( + options: ParquetWriterOptions, stream: Stream = None +) -> memoryview: ... class ParquetChunkedWriter: def __init__(self): ... def close(self, metadata_file_path: list) -> memoryview: ... def write(self, table: Table) -> None: ... @staticmethod - def from_options(options: ChunkedParquetWriterOptions) -> Self: ... + def from_options( + options: ChunkedParquetWriterOptions, stream: Stream = None + ) -> Self: ... class ChunkedParquetWriterOptions: def __init__(self): ... diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 672fe2be847..7d100769beb 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -35,6 +35,7 @@ from pylibcudf.libcudf.io.types cimport ( ) from pylibcudf.libcudf.types cimport size_type from pylibcudf.table cimport Table +from rmm._cuda.stream cimport Stream __all__ = [ "ChunkedParquetReader", @@ -306,7 +307,7 @@ cdef class ChunkedParquetReader: return TableWithMetadata.from_libcudf(c_result) -cpdef read_parquet(ParquetReaderOptions options): +cpdef read_parquet(ParquetReaderOptions options, Stream stream = None): """ Read from Parquet format. @@ -320,8 +321,10 @@ cpdef read_parquet(ParquetReaderOptions options): options: ParquetReaderOptions Settings for controlling reading behavior """ + if stream is None: + stream = Stream() with nogil: - c_result = move(cpp_read_parquet(options.c_obj)) + c_result = move(cpp_read_parquet(options.c_obj, stream.view())) return TableWithMetadata.from_libcudf(c_result) @@ -378,7 +381,7 @@ cdef class ParquetChunkedWriter: self.c_obj.get()[0].write(table.view(), partitions) @staticmethod - def from_options(ChunkedParquetWriterOptions options): + def from_options(ChunkedParquetWriterOptions options, Stream stream = None): """ Creates a chunked Parquet writer from options @@ -391,10 +394,16 @@ cdef class ParquetChunkedWriter: ------- ParquetChunkedWriter """ + if stream is None: + stream = Stream() cdef ParquetChunkedWriter parquet_writer = ParquetChunkedWriter.__new__( ParquetChunkedWriter ) - parquet_writer.c_obj.reset(new cpp_parquet_chunked_writer(options.c_obj)) + parquet_writer.c_obj.reset( + new cpp_parquet_chunked_writer( + options.c_obj, stream.view() + ) + ) return parquet_writer @@ -916,7 +925,7 @@ cdef class ParquetWriterOptionsBuilder: return parquet_options -cpdef memoryview write_parquet(ParquetWriterOptions options): +cpdef memoryview write_parquet(ParquetWriterOptions options, Stream stream = None): """ Writes a set of columns to parquet format. @@ -932,10 +941,12 @@ cpdef memoryview write_parquet(ParquetWriterOptions options): (parquet FileMetadata thrift message) if requested in parquet_writer_options (empty blob otherwise). """ + if stream is None: + stream = Stream() cdef unique_ptr[vector[uint8_t]] c_result with nogil: - c_result = cpp_write_parquet(move(options.c_obj)) + c_result = cpp_write_parquet(move(options.c_obj), stream.view()) return memoryview(HostBuffer.from_unique_ptr(move(c_result))) diff --git a/python/pylibcudf/pylibcudf/libcudf/io/avro.pxd b/python/pylibcudf/pylibcudf/libcudf/io/avro.pxd index cac55640ac9..9e571609d9c 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/avro.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/avro.pxd @@ -4,6 +4,7 @@ from libcpp.string cimport string from libcpp.vector cimport vector from pylibcudf.exception_handler cimport libcudf_exception_handler from pylibcudf.libcudf.types cimport size_type +from rmm.librmm.cuda_stream_view cimport cuda_stream_view cdef extern from "cudf/io/avro.hpp" \ @@ -45,5 +46,5 @@ cdef extern from "cudf/io/avro.hpp" \ avro_reader_options build() except +libcudf_exception_handler cdef cudf_io_types.table_with_metadata read_avro( - avro_reader_options &options + avro_reader_options &options, ) except +libcudf_exception_handler diff --git a/python/pylibcudf/pylibcudf/libcudf/io/csv.pxd b/python/pylibcudf/pylibcudf/libcudf/io/csv.pxd index 7ca158016a2..ae1dcd64174 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/csv.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/csv.pxd @@ -9,7 +9,7 @@ from libcpp.string cimport string from libcpp.vector cimport vector from pylibcudf.exception_handler cimport libcudf_exception_handler from pylibcudf.libcudf.types cimport data_type, size_type - +from rmm.librmm.cuda_stream_view cimport cuda_stream_view cdef extern from "cudf/io/csv.hpp" \ namespace "cudf::io" nogil: @@ -259,7 +259,8 @@ cdef extern from "cudf/io/csv.hpp" \ csv_reader_options build() except +libcudf_exception_handler cdef cudf_io_types.table_with_metadata read_csv( - csv_reader_options &options + csv_reader_options &options, + cuda_stream_view stream, ) except +libcudf_exception_handler cdef cppclass csv_writer_options: @@ -330,4 +331,7 @@ cdef extern from "cudf/io/csv.hpp" \ csv_writer_options build() except +libcudf_exception_handler - cdef void write_csv(csv_writer_options args) except +libcudf_exception_handler + cdef void write_csv( + csv_writer_options args, + cuda_stream_view stream, + ) except +libcudf_exception_handler diff --git a/python/pylibcudf/pylibcudf/libcudf/io/json.pxd b/python/pylibcudf/pylibcudf/libcudf/io/json.pxd index d23dd0685d1..a836538e990 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/json.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/json.pxd @@ -10,6 +10,7 @@ from libcpp.string cimport string from libcpp.vector cimport vector from pylibcudf.exception_handler cimport libcudf_exception_handler from pylibcudf.libcudf.types cimport data_type, size_type +from rmm.librmm.cuda_stream_view cimport cuda_stream_view cdef extern from "cudf/io/json.hpp" \ @@ -154,7 +155,9 @@ cdef extern from "cudf/io/json.hpp" \ json_reader_options build() except +libcudf_exception_handler cdef cudf_io_types.table_with_metadata read_json( - json_reader_options &options) except +libcudf_exception_handler + json_reader_options &options, + cuda_stream_view stream, + ) except +libcudf_exception_handler cdef cppclass json_writer_options: json_writer_options() except +libcudf_exception_handler @@ -230,4 +233,6 @@ cdef extern from "cudf/io/json.hpp" \ json_writer_options build() except +libcudf_exception_handler cdef cudf_io_types.table_with_metadata write_json( - json_writer_options &options) except +libcudf_exception_handler + json_writer_options &options, + cuda_stream_view stream, + ) except +libcudf_exception_handler diff --git a/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd index f5485da1d51..4915565956b 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd @@ -10,6 +10,7 @@ from libcpp.string cimport string from libcpp.vector cimport vector from pylibcudf.exception_handler cimport libcudf_exception_handler from pylibcudf.libcudf.types cimport data_type, size_type +from rmm.librmm.cuda_stream_view cimport cuda_stream_view cdef extern from "cudf/io/orc.hpp" \ @@ -76,7 +77,8 @@ cdef extern from "cudf/io/orc.hpp" \ orc_reader_options build() except +libcudf_exception_handler cdef cudf_io_types.table_with_metadata read_orc( - orc_reader_options opts + orc_reader_options opts, + cuda_stream_view stream, ) except +libcudf_exception_handler cdef cppclass orc_writer_options: @@ -144,7 +146,8 @@ cdef extern from "cudf/io/orc.hpp" \ orc_writer_options build() except +libcudf_exception_handler cdef void write_orc( - orc_writer_options options + orc_writer_options options, + cuda_stream_view stream, ) except +libcudf_exception_handler cdef cppclass chunked_orc_writer_options: @@ -213,7 +216,8 @@ cdef extern from "cudf/io/orc.hpp" \ cdef cppclass orc_chunked_writer: orc_chunked_writer() except +libcudf_exception_handler orc_chunked_writer( - chunked_orc_writer_options args + chunked_orc_writer_options args, + cuda_stream_view stream, ) except +libcudf_exception_handler orc_chunked_writer& write( cudf_table_view.table_view table_, diff --git a/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd index e03fe7e921e..0697bf8a65e 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd @@ -21,6 +21,7 @@ from pylibcudf.libcudf.io.types cimport ( ) from pylibcudf.libcudf.table.table_view cimport table_view from pylibcudf.libcudf.types cimport data_type, size_type +from rmm.librmm.cuda_stream_view cimport cuda_stream_view cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: @@ -87,7 +88,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: parquet_reader_options build() except +libcudf_exception_handler cdef table_with_metadata read_parquet( - parquet_reader_options args) except +libcudf_exception_handler + parquet_reader_options args, + cuda_stream_view stream, + ) except +libcudf_exception_handler cdef cppclass parquet_writer_options_base: parquet_writer_options_base() except +libcudf_exception_handler @@ -212,7 +215,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: ) except +libcudf_exception_handler cdef unique_ptr[vector[uint8_t]] write_parquet( - parquet_writer_options options + parquet_writer_options options, + cuda_stream_view stream, ) except +libcudf_exception_handler cdef cppclass chunked_parquet_writer_options(parquet_writer_options_base): @@ -235,7 +239,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cdef cppclass parquet_chunked_writer: parquet_chunked_writer() except +libcudf_exception_handler parquet_chunked_writer( - chunked_parquet_writer_options args + chunked_parquet_writer_options args, + cuda_stream_view stream, ) except +libcudf_exception_handler parquet_chunked_writer& write( table_view table_, diff --git a/python/pylibcudf/pylibcudf/tests/io/test_csv.py b/python/pylibcudf/pylibcudf/tests/io/test_csv.py index 555ca2fb02c..d1e2f42ffb5 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_csv.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_csv.py @@ -14,6 +14,8 @@ write_source_str, ) +from rmm._cuda.stream import Stream + import pylibcudf as plc from pylibcudf.io.types import CompressionType @@ -44,6 +46,16 @@ def csv_table_data(table_data): return plc.interop.from_arrow(pa_table), pa_table +@pytest.fixture(scope="module") +def simple_csv_table_data(): + return [ + "1,2,3,4_4,'z'", + '4,5,6,5_5,""', + "7,8,9,9_87,'123'", + "1,1,1,10_11,abc", + ] + + @pytest.mark.parametrize("delimiter", [",", ";"]) def test_read_csv_basic( csv_table_data, @@ -389,3 +401,21 @@ def test_write_csv_na_rep(na_rep): pd_result = pa_tbl.to_pandas().to_csv(na_rep=na_rep, index=False) assert str_result == pd_result + + +@pytest.mark.parametrize("stream", [None, Stream()]) +def test_read_csv_with_default_stream( + source_or_sink, simple_csv_table_data, stream +): + buffer = "\n".join(simple_csv_table_data) + + write_source_str(source_or_sink, buffer) + + options = plc.io.csv.CsvReaderOptions.builder( + plc.io.SourceInfo([source_or_sink]) + ).build() + plc_table_w_meta = plc.io.csv.read_csv(options, stream) + df = pd.read_csv( + StringIO(buffer), + ) + assert_table_and_meta_eq(pa.Table.from_pandas(df), plc_table_w_meta)