From e27cad231c106154d9bcf4d43be9b5d3dafab53f Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Wed, 18 Dec 2024 06:35:38 +0000 Subject: [PATCH 01/18] Add stream parameters in pylibcudf IO APIs --- python/pylibcudf/pylibcudf/io/csv.pxd | 3 +- python/pylibcudf/pylibcudf/io/csv.pyi | 43 ++------------------------- python/pylibcudf/pylibcudf/io/csv.pyx | 6 ++-- 3 files changed, 9 insertions(+), 43 deletions(-) diff --git a/python/pylibcudf/pylibcudf/io/csv.pxd b/python/pylibcudf/pylibcudf/io/csv.pxd index 95f3ff4fe45..ad8e6e39595 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pxd +++ b/python/pylibcudf/pylibcudf/io/csv.pxd @@ -18,6 +18,7 @@ from pylibcudf.libcudf.io.types cimport ( table_with_metadata, ) from pylibcudf.libcudf.types cimport size_type +from rmm._cuda.stream import Stream cdef class CsvReaderOptions: cdef csv_reader_options c_obj @@ -61,7 +62,7 @@ cdef class CsvReaderOptionsBuilder: cpdef CsvReaderOptionsBuilder dayfirst(self, bool dayfirst) cpdef CsvReaderOptions build(self) -cpdef TableWithMetadata read_csv(CsvReaderOptions options) +cpdef TableWithMetadata read_csv(CsvReaderOptions options, Stream stream) cdef class CsvWriterOptions: cdef csv_writer_options c_obj diff --git a/python/pylibcudf/pylibcudf/io/csv.pyi b/python/pylibcudf/pylibcudf/io/csv.pyi index 540cbc778ea..0f95f45d136 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pyi +++ b/python/pylibcudf/pylibcudf/io/csv.pyi @@ -13,6 +13,7 @@ from pylibcudf.io.types import ( ) from pylibcudf.table import Table from pylibcudf.types import DataType +from rmm._cuda.stream import Stream class CsvReaderOptions: def __init__(self): ... @@ -56,46 +57,8 @@ class CsvReaderOptionsBuilder: def build(self) -> CsvReaderOptions: ... 
def read_csv( - source_info: SourceInfo, - *, - compression: CompressionType = CompressionType.AUTO, - byte_range_offset: int = 0, - byte_range_size: int = 0, - col_names: list[str] | None = None, - prefix: str = "", - mangle_dupe_cols: bool = True, - usecols: list[int] | list[str] | None = None, - nrows: int = -1, - skiprows: int = 0, - skipfooter: int = 0, - header: int = 0, - lineterminator: str = "\n", - delimiter: str | None = None, - thousands: str | None = None, - decimal: str = ".", - comment: str | None = None, - delim_whitespace: bool = False, - skipinitialspace: bool = False, - skip_blank_lines: bool = True, - quoting: QuoteStyle = QuoteStyle.MINIMAL, - quotechar: str = '"', - doublequote: bool = True, - parse_dates: list[str] | list[int] | None = None, - parse_hex: list[str] | list[int] | None = None, - # Technically this should be dict/list - # but using a fused type prevents using None as default - dtypes: Mapping[str, DataType] | list[DataType] | None = None, - true_values: list[str] | None = None, - false_values: list[str] | None = None, - na_values: list[str] | None = None, - keep_default_na: bool = True, - na_filter: bool = True, - dayfirst: bool = False, - # Note: These options are supported by the libcudf reader - # but are not exposed here since there is no demand for them - # on the Python side yet. - # detect_whitespace_around_quotes: bool = False, - # timestamp_type: DataType = DataType(type_id.EMPTY), + options: CsvReaderOptions, + stream: Stream, ) -> TableWithMetadata: ... def write_csv(options: CsvWriterOptionsBuilder): ... 
diff --git a/python/pylibcudf/pylibcudf/io/csv.pyx b/python/pylibcudf/pylibcudf/io/csv.pyx index efc9bb813a1..5cc48419631 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pyx +++ b/python/pylibcudf/pylibcudf/io/csv.pyx @@ -22,6 +22,7 @@ from pylibcudf.libcudf.io.types cimport ( from pylibcudf.libcudf.types cimport data_type, size_type from pylibcudf.types cimport DataType from pylibcudf.table cimport Table +from rmm._cuda.stream import Stream __all__ = [ "read_csv", @@ -629,7 +630,8 @@ cdef class CsvReaderOptionsBuilder: cpdef TableWithMetadata read_csv( - CsvReaderOptions options + CsvReaderOptions options, + Stream stream, ): """ Read from CSV format. @@ -646,7 +648,7 @@ cpdef TableWithMetadata read_csv( """ cdef table_with_metadata c_result with nogil: - c_result = move(cpp_read_csv(options.c_obj)) + c_result = move(cpp_read_csv(options.c_obj), stream.view()) cdef TableWithMetadata tbl_meta = TableWithMetadata.from_libcudf(c_result) return tbl_meta From a72988f2d8740dbeb414ddacd1ffc1c95e790095 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Wed, 18 Dec 2024 14:13:32 +0000 Subject: [PATCH 02/18] add remaining streams --- python/pylibcudf/pylibcudf/io/avro.pxd | 4 ++-- python/pylibcudf/pylibcudf/io/avro.pyi | 3 ++- python/pylibcudf/pylibcudf/io/avro.pyx | 8 +++++-- python/pylibcudf/pylibcudf/io/csv.pxd | 6 ++--- python/pylibcudf/pylibcudf/io/csv.pyi | 4 ++-- python/pylibcudf/pylibcudf/io/csv.pyx | 14 +++++++----- python/pylibcudf/pylibcudf/io/json.pxd | 6 ++++- python/pylibcudf/pylibcudf/io/json.pyi | 6 +++++ python/pylibcudf/pylibcudf/io/json.pyx | 22 +++++++++++++------ python/pylibcudf/pylibcudf/io/orc.pxd | 7 ++++-- python/pylibcudf/pylibcudf/io/orc.pyi | 2 ++ python/pylibcudf/pylibcudf/io/orc.pyx | 20 ++++++++++++----- python/pylibcudf/pylibcudf/io/parquet.pxd | 5 +++-- python/pylibcudf/pylibcudf/io/parquet.pyi | 19 +++++----------- python/pylibcudf/pylibcudf/io/parquet.pyx | 17 +++++++++----- .../pylibcudf/pylibcudf/libcudf/io/avro.pxd | 4 +++- 
python/pylibcudf/pylibcudf/libcudf/io/csv.pxd | 10 ++++++--- .../pylibcudf/pylibcudf/libcudf/io/json.pxd | 9 ++++++-- python/pylibcudf/pylibcudf/libcudf/io/orc.pxd | 7 ++++-- .../pylibcudf/libcudf/io/parquet.pxd | 8 +++++-- 20 files changed, 119 insertions(+), 62 deletions(-) diff --git a/python/pylibcudf/pylibcudf/io/avro.pxd b/python/pylibcudf/pylibcudf/io/avro.pxd index a0fca95d459..5036ef04533 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pxd +++ b/python/pylibcudf/pylibcudf/io/avro.pxd @@ -2,7 +2,7 @@ from pylibcudf.io.types cimport SourceInfo, TableWithMetadata from pylibcudf.libcudf.io.avro cimport avro_reader_options, avro_reader_options_builder from pylibcudf.libcudf.types cimport size_type - +from rmm._cuda.stream cimport Stream from pylibcudf.libcudf.types cimport size_type @@ -20,4 +20,4 @@ cdef class AvroReaderOptionsBuilder: cpdef AvroReaderOptionsBuilder num_rows(self, size_type num_rows) cpdef AvroReaderOptions build(self) -cpdef TableWithMetadata read_avro(AvroReaderOptions options) +cpdef TableWithMetadata read_avro(AvroReaderOptions options, Stream stream = *) diff --git a/python/pylibcudf/pylibcudf/io/avro.pyi b/python/pylibcudf/pylibcudf/io/avro.pyi index 8cafc9a6573..281cf38889f 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pyi +++ b/python/pylibcudf/pylibcudf/io/avro.pyi @@ -1,5 +1,6 @@ # Copyright (c) 2024, NVIDIA CORPORATION. from pylibcudf.io.types import SourceInfo, TableWithMetadata +from rmm._cuda.stream import Stream __all__ = ["AvroReaderOptions", "AvroReaderOptionsBuilder", "read_avro"] @@ -13,4 +14,4 @@ class AvroReaderOptionsBuilder: def num_rows(num_rows: int) -> AvroReaderOptionsBuilder: ... def build(self) -> AvroReaderOptions: ... -def read_avro(options: AvroReaderOptions) -> TableWithMetadata: ... +def read_avro(options: AvroReaderOptions, stream: stream = None) -> TableWithMetadata: ... 
diff --git a/python/pylibcudf/pylibcudf/io/avro.pyx b/python/pylibcudf/pylibcudf/io/avro.pyx index c378fca0415..e5e1e9755fd 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pyx +++ b/python/pylibcudf/pylibcudf/io/avro.pyx @@ -9,6 +9,7 @@ from pylibcudf.libcudf.io.avro cimport ( read_avro as cpp_read_avro, ) from pylibcudf.libcudf.types cimport size_type +from rmm._cuda.stream cimport Stream __all__ = ["read_avro", "AvroReaderOptions", "AvroReaderOptionsBuilder"] @@ -126,7 +127,8 @@ cdef class AvroReaderOptionsBuilder: cpdef TableWithMetadata read_avro( - AvroReaderOptions options + AvroReaderOptions options, + Stream stream = None, ): """ Read from Avro format. @@ -141,7 +143,9 @@ cpdef TableWithMetadata read_avro( options: AvroReaderOptions Settings for controlling reading behavior """ + if stream is None: + stream = Stream() with nogil: - c_result = move(cpp_read_avro(options.c_obj)) + c_result = move(cpp_read_avro(options.c_obj, stream.view())) return TableWithMetadata.from_libcudf(c_result) diff --git a/python/pylibcudf/pylibcudf/io/csv.pxd b/python/pylibcudf/pylibcudf/io/csv.pxd index ad8e6e39595..c4da27ffc05 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pxd +++ b/python/pylibcudf/pylibcudf/io/csv.pxd @@ -18,7 +18,7 @@ from pylibcudf.libcudf.io.types cimport ( table_with_metadata, ) from pylibcudf.libcudf.types cimport size_type -from rmm._cuda.stream import Stream +from rmm._cuda.stream cimport Stream cdef class CsvReaderOptions: cdef csv_reader_options c_obj @@ -62,7 +62,7 @@ cdef class CsvReaderOptionsBuilder: cpdef CsvReaderOptionsBuilder dayfirst(self, bool dayfirst) cpdef CsvReaderOptions build(self) -cpdef TableWithMetadata read_csv(CsvReaderOptions options, Stream stream) +cpdef TableWithMetadata read_csv(CsvReaderOptions options, Stream stream = *) cdef class CsvWriterOptions: cdef csv_writer_options c_obj @@ -85,4 +85,4 @@ cdef class CsvWriterOptionsBuilder: cpdef CsvWriterOptions build(self) -cpdef void write_csv(CsvWriterOptions options) +cpdef 
void write_csv(CsvWriterOptions options, Stream stream = *) diff --git a/python/pylibcudf/pylibcudf/io/csv.pyi b/python/pylibcudf/pylibcudf/io/csv.pyi index 0f95f45d136..ec4a6dd3115 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pyi +++ b/python/pylibcudf/pylibcudf/io/csv.pyi @@ -58,9 +58,9 @@ class CsvReaderOptionsBuilder: def read_csv( options: CsvReaderOptions, - stream: Stream, + stream: stream = None, ) -> TableWithMetadata: ... -def write_csv(options: CsvWriterOptionsBuilder): ... +def write_csv(options: CsvWriterOptionsBuilder, stream: stream = None): ... class CsvWriterOptions: def __init__(self): ... diff --git a/python/pylibcudf/pylibcudf/io/csv.pyx b/python/pylibcudf/pylibcudf/io/csv.pyx index 5cc48419631..0c07e4ac037 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pyx +++ b/python/pylibcudf/pylibcudf/io/csv.pyx @@ -22,7 +22,7 @@ from pylibcudf.libcudf.io.types cimport ( from pylibcudf.libcudf.types cimport data_type, size_type from pylibcudf.types cimport DataType from pylibcudf.table cimport Table -from rmm._cuda.stream import Stream +from rmm._cuda.stream cimport Stream __all__ = [ "read_csv", @@ -631,7 +631,7 @@ cdef class CsvReaderOptionsBuilder: cpdef TableWithMetadata read_csv( CsvReaderOptions options, - Stream stream, + Stream stream = None, ): """ Read from CSV format. @@ -646,6 +646,8 @@ cpdef TableWithMetadata read_csv( options: CsvReaderOptions Settings for controlling reading behavior """ + if stream is None: + stream = Stream() cdef table_with_metadata c_result with nogil: - c_result = move(cpp_read_csv(options.c_obj), stream.view()) + c_result = move(cpp_read_csv(options.c_obj, stream.view())) @@ -833,7 +835,8 @@ cdef class CsvWriterOptionsBuilder: cpdef void write_csv( - CsvWriterOptions options + CsvWriterOptions options, + Stream stream = None, ): """ Write to CSV format. 
@@ -848,6 +851,7 @@ cpdef void write_csv( options: CsvWriterOptions Settings for controlling writing behavior """ - + if stream is None: + stream = Stream() with nogil: - cpp_write_csv(move(options.c_obj)) + cpp_write_csv(move(options.c_obj), stream.view()) diff --git a/python/pylibcudf/pylibcudf/io/json.pxd b/python/pylibcudf/pylibcudf/io/json.pxd index d7726971351..175ab42592e 100644 --- a/python/pylibcudf/pylibcudf/io/json.pxd +++ b/python/pylibcudf/pylibcudf/io/json.pxd @@ -8,6 +8,7 @@ from pylibcudf.io.types cimport ( ) from pylibcudf.libcudf.io.json cimport json_recovery_mode_t from pylibcudf.libcudf.types cimport size_type +from rmm._cuda.stream cimport Stream cpdef TableWithMetadata read_json( @@ -22,6 +23,7 @@ cpdef TableWithMetadata read_json( bool prune_columns = *, json_recovery_mode_t recovery_mode = *, dict extra_parameters = *, + Stream stream = *, ) @@ -33,7 +35,8 @@ cpdef void write_json( bool lines = *, size_type rows_per_chunk = *, str true_value = *, - str false_value = * + str false_value = *, + Stream stream = *, ) cpdef tuple chunked_read_json( @@ -45,4 +48,5 @@ cpdef tuple chunked_read_json( bool prune_columns = *, json_recovery_mode_t recovery_mode = *, int chunk_size= *, + Stream stream = *, ) diff --git a/python/pylibcudf/pylibcudf/io/json.pyi b/python/pylibcudf/pylibcudf/io/json.pyi index b2bc6a43700..e77960336e9 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyi +++ b/python/pylibcudf/pylibcudf/io/json.pyi @@ -11,6 +11,8 @@ from pylibcudf.io.types import ( TableWithMetadata, ) from pylibcudf.types import DataType +from rmm._cuda.stream import Stream + ChildNameToTypeMap: TypeAlias = Mapping[str, ChildNameToTypeMap] @@ -27,6 +29,7 @@ def read_json( mixed_types_as_string: bool = False, prune_columns: bool = False, recovery_mode: JSONRecoveryMode = JSONRecoveryMode.FAIL, + stream: stream = None, ) -> TableWithMetadata: ... 
def write_json( sink_info: SinkInfo, @@ -37,6 +40,8 @@ def write_json( rows_per_chunk: int = 2**32 - 1, true_value: str = "true", false_value: str = "false", + stream: stream = None, + ) -> None: ... def chunked_read_json( source_info: SourceInfo, @@ -47,4 +52,5 @@ def chunked_read_json( prune_columns: bool = False, recovery_mode: JSONRecoveryMode = JSONRecoveryMode.FAIL, chunk_size: int = 100_000_000, + stream: stream = None, ) -> tuple[list[Column], list[str], ChildNameToTypeMap]: ... diff --git a/python/pylibcudf/pylibcudf/io/json.pyx b/python/pylibcudf/pylibcudf/io/json.pyx index 32f737fbff4..2623d64ce32 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyx +++ b/python/pylibcudf/pylibcudf/io/json.pyx @@ -22,6 +22,8 @@ from pylibcudf.libcudf.io.types cimport ( ) from pylibcudf.libcudf.types cimport data_type, size_type from pylibcudf.types cimport DataType +from rmm._cuda.stream cimport Stream + __all__ = ["chunked_read_json", "read_json", "write_json"] @@ -128,6 +130,7 @@ cpdef tuple chunked_read_json( bool prune_columns = False, json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL, int chunk_size=100_000_000, + Stream stream = None, ): """Reads an JSON file into a :py:class:`~.types.TableWithMetadata`. @@ -179,7 +182,8 @@ cpdef tuple chunked_read_json( prune_columns=prune_columns, recovery_mode=recovery_mode, ) - + if stream is None: + stream = Stream() # Read JSON cdef table_with_metadata c_result @@ -193,7 +197,7 @@ cpdef tuple chunked_read_json( try: with nogil: - c_result = move(cpp_read_json(opts)) + c_result = move(cpp_read_json(opts, stream.view())) except (ValueError, OverflowError): break if meta_names is None: @@ -232,6 +236,7 @@ cpdef TableWithMetadata read_json( bool prune_columns = False, json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL, dict extra_parameters = None, + Stream stream = None, ): """Reads an JSON file into a :py:class:`~.types.TableWithMetadata`. 
@@ -284,12 +289,13 @@ cpdef TableWithMetadata read_json( recovery_mode=recovery_mode, extra_parameters=extra_parameters, ) - + if stream is None: + stream = Stream() # Read JSON cdef table_with_metadata c_result with nogil: - c_result = move(cpp_read_json(opts)) + c_result = move(cpp_read_json(opts, stream.view())) return TableWithMetadata.from_libcudf(c_result) @@ -302,7 +308,8 @@ cpdef void write_json( bool lines = False, size_type rows_per_chunk = numeric_limits[size_type].max(), str true_value = "true", - str false_value = "false" + str false_value = "false", + Stream stream = None, ): """ Writes a :py:class:`~pylibcudf.table.Table` to JSON format. @@ -344,6 +351,7 @@ cpdef void write_json( options.set_true_value(true_value.encode()) if false_value != "false": options.set_false_value(false_value.encode()) - + if stream is None: + stream = Stream() with nogil: - cpp_write_json(options) + cpp_write_json(options, stream.view()) diff --git a/python/pylibcudf/pylibcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/io/orc.pxd index 671f0692444..ca7b794b38c 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pxd +++ b/python/pylibcudf/pylibcudf/io/orc.pxd @@ -31,6 +31,8 @@ from pylibcudf.libcudf.io.types cimport ( compression_type, statistics_freq, ) +from rmm._cuda.stream cimport Stream + cpdef TableWithMetadata read_orc( SourceInfo source_info, @@ -41,7 +43,8 @@ cpdef TableWithMetadata read_orc( bool use_index = *, bool use_np_dtypes = *, DataType timestamp_type = *, - list decimal128_columns = * + list decimal128_columns = *, + Stream stream = *, ) cdef class OrcColumnStatistics: @@ -85,7 +88,7 @@ cdef class OrcWriterOptionsBuilder: cpdef OrcWriterOptionsBuilder metadata(self, TableInputMetadata meta) cpdef OrcWriterOptions build(self) -cpdef void write_orc(OrcWriterOptions options) +cpdef void write_orc(OrcWriterOptions options, Stream stream = *) cdef class OrcChunkedWriter: cdef unique_ptr[orc_chunked_writer] c_obj diff --git a/python/pylibcudf/pylibcudf/io/orc.pyi 
b/python/pylibcudf/pylibcudf/io/orc.pyi index 516f97981e9..56426224be8 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pyi +++ b/python/pylibcudf/pylibcudf/io/orc.pyi @@ -12,6 +12,7 @@ from pylibcudf.io.types import ( ) from pylibcudf.table import Table from pylibcudf.types import DataType +from rmm._cuda.stream import Stream def read_orc( source_info: SourceInfo, @@ -23,6 +24,7 @@ def read_orc( use_np_dtypes: bool = True, timestamp_type: DataType | None = None, decimal128_columns: list[str] | None = None, + stream: Strem = Stream(), ) -> TableWithMetadata: ... class OrcColumnStatistics: diff --git a/python/pylibcudf/pylibcudf/io/orc.pyx b/python/pylibcudf/pylibcudf/io/orc.pyx index 63eab4a9634..fc5ae404aaa 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pyx +++ b/python/pylibcudf/pylibcudf/io/orc.pyx @@ -39,6 +39,8 @@ from pylibcudf.libcudf.io.orc cimport ( orc_writer_options, chunked_orc_writer_options, ) +from rmm._cuda.stream cimport Stream + __all__ = [ "OrcColumnStatistics", @@ -247,6 +249,7 @@ cpdef TableWithMetadata read_orc( bool use_np_dtypes = True, DataType timestamp_type = None, list decimal128_columns = None, + Stream stream = None, ): """Reads an ORC file into a :py:class:`~.types.TableWithMetadata`. @@ -310,11 +313,12 @@ cpdef TableWithMetadata read_orc( raise TypeError("Column names must be strings!") c_column_names.push_back(col.encode()) opts.set_columns(c_column_names) - + if stream is None: + stream = Stream() cdef table_with_metadata c_result with nogil: - c_result = move(cpp_read_orc(opts)) + c_result = move(cpp_read_orc(opts, stream.view())) return TableWithMetadata.from_libcudf(c_result) @@ -496,7 +500,7 @@ cdef class OrcWriterOptionsBuilder: return orc_options -cpdef void write_orc(OrcWriterOptions options): +cpdef void write_orc(OrcWriterOptions options, Stream stream = None): """ Write to ORC format. 
@@ -514,8 +518,10 @@ cpdef void write_orc(OrcWriterOptions options): ------- None """ + if stream is None: + stream = Stream() with nogil: - cpp_write_orc(move(options.c_obj)) + cpp_write_orc(move(options.c_obj), stream.view()) cdef class OrcChunkedWriter: @@ -547,7 +553,7 @@ cdef class OrcChunkedWriter: self.c_obj.get()[0].write(table.view()) @staticmethod - def from_options(ChunkedOrcWriterOptions options): + def from_options(ChunkedOrcWriterOptions options, Stream stream = None): """ Creates a chunked ORC writer from options @@ -560,10 +566,12 @@ cdef class OrcChunkedWriter: ------- OrcChunkedWriter """ + if stream is None: + stream = Stream() cdef OrcChunkedWriter orc_writer = OrcChunkedWriter.__new__( OrcChunkedWriter ) - orc_writer.c_obj.reset(new orc_chunked_writer(options.c_obj)) + orc_writer.c_obj.reset(new orc_chunked_writer(options.c_obj, stream.view())) return orc_writer diff --git a/python/pylibcudf/pylibcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/io/parquet.pxd index 84f47cf5305..ab168189032 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/io/parquet.pxd @@ -27,6 +27,7 @@ from pylibcudf.libcudf.io.parquet cimport ( from pylibcudf.libcudf.types cimport size_type from pylibcudf.table cimport Table from pylibcudf.types cimport DataType +from rmm._cuda.stream cimport Stream cdef class ParquetReaderOptions: @@ -55,7 +56,7 @@ cdef class ChunkedParquetReader: cpdef TableWithMetadata read_chunk(self) -cpdef read_parquet(ParquetReaderOptions options) +cpdef read_parquet(ParquetReaderOptions options, Stream stream = *) cdef class ParquetChunkedWriter: @@ -145,6 +146,6 @@ cdef class ParquetWriterOptionsBuilder: cpdef ParquetWriterOptions build(self) -cpdef memoryview write_parquet(ParquetWriterOptions options) +cpdef memoryview write_parquet(ParquetWriterOptions options, Stream stream = *) cpdef memoryview merge_row_group_metadata(list metdata_list) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyi 
b/python/pylibcudf/pylibcudf/io/parquet.pyi index 2d8d12c1a45..2923471b82b 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyi +++ b/python/pylibcudf/pylibcudf/io/parquet.pyi @@ -16,6 +16,7 @@ from pylibcudf.io.types import ( TableWithMetadata, ) from pylibcudf.table import Table +from rmm._cuda.stream import Stream class ParquetReaderOptions: def __init__(self): ... @@ -53,18 +54,8 @@ class ChunkedParquetReader: def read_chunk(self) -> TableWithMetadata: ... def read_parquet( - source_info: SourceInfo, - columns: list[str] | None = None, - row_groups: list[list[int]] | None = None, - filters: Expression | None = None, - convert_strings_to_categories: bool = False, - use_pandas_metadata: bool = True, - skip_rows: int = 0, - nrows: int = -1, - allow_mismatched_pq_schemas: bool = False, - # disabled see comment in parquet.pyx for more - # reader_column_schema: ReaderColumnSchema = *, - # timestamp_type: DataType = * + options: ParquetReaderOptions, + stream: stream = None, ) -> TableWithMetadata: ... class ParquetWriterOptions: @@ -96,14 +87,14 @@ class ParquetWriterOptionsBuilder: def write_arrow_schema(self, enabled: bool) -> Self: ... def build(self) -> ParquetWriterOptions: ... -def write_parquet(options: ParquetWriterOptions) -> memoryview: ... +def write_parquet(options: ParquetWriterOptions, stream: stream = None) -> memoryview: ... class ParquetChunkedWriter: def __init__(self): ... def close(self, metadata_file_path: list) -> memoryview: ... def write(self, table: Table) -> None: ... @staticmethod - def from_options(options: ChunkedParquetWriterOptions) -> Self: ... + def from_options(options: ChunkedParquetWriterOptions, stream: stream = None) -> Self: ... class ChunkedParquetWriterOptions: def __init__(self): ... 
diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 672fe2be847..32870246c13 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -35,6 +35,7 @@ from pylibcudf.libcudf.io.types cimport ( ) from pylibcudf.libcudf.types cimport size_type from pylibcudf.table cimport Table +from rmm._cuda.stream cimport Stream __all__ = [ "ChunkedParquetReader", @@ -306,7 +307,7 @@ cdef class ChunkedParquetReader: return TableWithMetadata.from_libcudf(c_result) -cpdef read_parquet(ParquetReaderOptions options): +cpdef read_parquet(ParquetReaderOptions options, Stream stream = None): """ Read from Parquet format. @@ -320,8 +321,10 @@ cpdef read_parquet(ParquetReaderOptions options): options: ParquetReaderOptions Settings for controlling reading behavior """ + if stream is None: + stream = Stream() with nogil: - c_result = move(cpp_read_parquet(options.c_obj)) + c_result = move(cpp_read_parquet(options.c_obj, stream.view())) return TableWithMetadata.from_libcudf(c_result) @@ -378,7 +381,7 @@ cdef class ParquetChunkedWriter: self.c_obj.get()[0].write(table.view(), partitions) @staticmethod - def from_options(ChunkedParquetWriterOptions options): + def from_options(ChunkedParquetWriterOptions options, Stream stream = None): """ Creates a chunked Parquet writer from options @@ -391,6 +394,8 @@ cdef class ParquetChunkedWriter: ------- ParquetChunkedWriter """ + if stream is None: + stream = Stream() cdef ParquetChunkedWriter parquet_writer = ParquetChunkedWriter.__new__( ParquetChunkedWriter ) @@ -916,7 +921,7 @@ cdef class ParquetWriterOptionsBuilder: return parquet_options -cpdef memoryview write_parquet(ParquetWriterOptions options): +cpdef memoryview write_parquet(ParquetWriterOptions options, Stream stream = None): """ Writes a set of columns to parquet format. 
@@ -932,10 +937,12 @@ cpdef memoryview write_parquet(ParquetWriterOptions options): (parquet FileMetadata thrift message) if requested in parquet_writer_options (empty blob otherwise). """ + if stream is None: + stream = Stream() cdef unique_ptr[vector[uint8_t]] c_result with nogil: - c_result = cpp_write_parquet(move(options.c_obj)) + c_result = cpp_write_parquet(move(options.c_obj), stream.view()) return memoryview(HostBuffer.from_unique_ptr(move(c_result))) diff --git a/python/pylibcudf/pylibcudf/libcudf/io/avro.pxd b/python/pylibcudf/pylibcudf/libcudf/io/avro.pxd index cac55640ac9..5252d6d7186 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/avro.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/avro.pxd @@ -4,6 +4,7 @@ from libcpp.string cimport string from libcpp.vector cimport vector from pylibcudf.exception_handler cimport libcudf_exception_handler from pylibcudf.libcudf.types cimport size_type +from rmm.librmm.cuda_stream_view cimport cuda_stream_view cdef extern from "cudf/io/avro.hpp" \ @@ -45,5 +46,6 @@ cdef extern from "cudf/io/avro.hpp" \ avro_reader_options build() except +libcudf_exception_handler cdef cudf_io_types.table_with_metadata read_avro( - avro_reader_options &options + avro_reader_options &options, + cuda_stream_view stream, ) except +libcudf_exception_handler diff --git a/python/pylibcudf/pylibcudf/libcudf/io/csv.pxd b/python/pylibcudf/pylibcudf/libcudf/io/csv.pxd index 7ca158016a2..ae1dcd64174 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/csv.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/csv.pxd @@ -9,7 +9,7 @@ from libcpp.string cimport string from libcpp.vector cimport vector from pylibcudf.exception_handler cimport libcudf_exception_handler from pylibcudf.libcudf.types cimport data_type, size_type - +from rmm.librmm.cuda_stream_view cimport cuda_stream_view cdef extern from "cudf/io/csv.hpp" \ namespace "cudf::io" nogil: @@ -259,7 +259,8 @@ cdef extern from "cudf/io/csv.hpp" \ csv_reader_options build() except 
+libcudf_exception_handler cdef cudf_io_types.table_with_metadata read_csv( - csv_reader_options &options + csv_reader_options &options, + cuda_stream_view stream, ) except +libcudf_exception_handler cdef cppclass csv_writer_options: @@ -330,4 +331,7 @@ cdef extern from "cudf/io/csv.hpp" \ csv_writer_options build() except +libcudf_exception_handler - cdef void write_csv(csv_writer_options args) except +libcudf_exception_handler + cdef void write_csv( + csv_writer_options args, + cuda_stream_view stream, + ) except +libcudf_exception_handler diff --git a/python/pylibcudf/pylibcudf/libcudf/io/json.pxd b/python/pylibcudf/pylibcudf/libcudf/io/json.pxd index c241c478f25..1037f13c834 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/json.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/json.pxd @@ -10,6 +10,7 @@ from libcpp.string cimport string from libcpp.vector cimport vector from pylibcudf.exception_handler cimport libcudf_exception_handler from pylibcudf.libcudf.types cimport data_type, size_type +from rmm.librmm.cuda_stream_view cimport cuda_stream_view cdef extern from "cudf/io/json.hpp" \ @@ -154,7 +155,9 @@ cdef extern from "cudf/io/json.hpp" \ json_reader_options build() except +libcudf_exception_handler cdef cudf_io_types.table_with_metadata read_json( - json_reader_options &options) except +libcudf_exception_handler + json_reader_options &options, + cuda_stream_view stream, + ) except +libcudf_exception_handler cdef cppclass json_writer_options: json_writer_options() except +libcudf_exception_handler @@ -222,4 +225,6 @@ cdef extern from "cudf/io/json.hpp" \ json_writer_options build() except +libcudf_exception_handler cdef cudf_io_types.table_with_metadata write_json( - json_writer_options &options) except +libcudf_exception_handler + json_writer_options &options, + cuda_stream_view stream, + ) except +libcudf_exception_handler diff --git a/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd index 
f5485da1d51..20f4f9bd367 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd @@ -10,6 +10,7 @@ from libcpp.string cimport string from libcpp.vector cimport vector from pylibcudf.exception_handler cimport libcudf_exception_handler from pylibcudf.libcudf.types cimport data_type, size_type +from rmm.librmm.cuda_stream_view cimport cuda_stream_view cdef extern from "cudf/io/orc.hpp" \ @@ -76,7 +77,8 @@ cdef extern from "cudf/io/orc.hpp" \ orc_reader_options build() except +libcudf_exception_handler cdef cudf_io_types.table_with_metadata read_orc( - orc_reader_options opts + orc_reader_options opts, + cuda_stream_view stream, ) except +libcudf_exception_handler cdef cppclass orc_writer_options: @@ -144,7 +146,8 @@ cdef extern from "cudf/io/orc.hpp" \ orc_writer_options build() except +libcudf_exception_handler cdef void write_orc( - orc_writer_options options + orc_writer_options options, + cuda_stream_view stream, ) except +libcudf_exception_handler cdef cppclass chunked_orc_writer_options: diff --git a/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd index e03fe7e921e..410bcf3dab4 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd @@ -21,6 +21,7 @@ from pylibcudf.libcudf.io.types cimport ( ) from pylibcudf.libcudf.table.table_view cimport table_view from pylibcudf.libcudf.types cimport data_type, size_type +from rmm.librmm.cuda_stream_view cimport cuda_stream_view cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: @@ -87,7 +88,9 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: parquet_reader_options build() except +libcudf_exception_handler cdef table_with_metadata read_parquet( - parquet_reader_options args) except +libcudf_exception_handler + parquet_reader_options args, + cuda_stream_view stream, + ) except +libcudf_exception_handler cdef cppclass 
parquet_writer_options_base: parquet_writer_options_base() except +libcudf_exception_handler @@ -212,7 +215,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: ) except +libcudf_exception_handler cdef unique_ptr[vector[uint8_t]] write_parquet( - parquet_writer_options options + parquet_writer_options options, + cuda_stream_view stream, ) except +libcudf_exception_handler cdef cppclass chunked_parquet_writer_options(parquet_writer_options_base): From 48fc8a939bb1651dfc049923da9fad616fae3b70 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Wed, 18 Dec 2024 15:06:13 +0000 Subject: [PATCH 03/18] clean up --- python/pylibcudf/pylibcudf/io/avro.pyi | 8 ++++++-- python/pylibcudf/pylibcudf/io/csv.pyi | 9 ++++----- python/pylibcudf/pylibcudf/io/json.pyi | 11 +++++------ python/pylibcudf/pylibcudf/io/orc.pyi | 5 +++-- python/pylibcudf/pylibcudf/io/parquet.pyi | 13 +++++++++---- 5 files changed, 27 insertions(+), 19 deletions(-) diff --git a/python/pylibcudf/pylibcudf/io/avro.pyi b/python/pylibcudf/pylibcudf/io/avro.pyi index 281cf38889f..3de4c4bf9c2 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pyi +++ b/python/pylibcudf/pylibcudf/io/avro.pyi @@ -1,7 +1,9 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from pylibcudf.io.types import SourceInfo, TableWithMetadata + from rmm._cuda.stream import Stream +from pylibcudf.io.types import SourceInfo, TableWithMetadata + __all__ = ["AvroReaderOptions", "AvroReaderOptionsBuilder", "read_avro"] class AvroReaderOptions: @@ -14,4 +16,6 @@ class AvroReaderOptionsBuilder: def num_rows(num_rows: int) -> AvroReaderOptionsBuilder: ... def build(self) -> AvroReaderOptions: ... -def read_avro(options: AvroReaderOptions, stream: stream = None) -> TableWithMetadata: ... +def read_avro( + options: AvroReaderOptions, stream: Stream | None = None +) -> TableWithMetadata: ... 
diff --git a/python/pylibcudf/pylibcudf/io/csv.pyi b/python/pylibcudf/pylibcudf/io/csv.pyi index ec4a6dd3115..24d5cb5fada 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pyi +++ b/python/pylibcudf/pylibcudf/io/csv.pyi @@ -1,9 +1,9 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from collections.abc import Mapping - from typing_extensions import Self +from rmm._cuda.stream import Stream + from pylibcudf.io.types import ( CompressionType, QuoteStyle, @@ -13,7 +13,6 @@ from pylibcudf.io.types import ( ) from pylibcudf.table import Table from pylibcudf.types import DataType -from rmm._cuda.stream import Stream class CsvReaderOptions: def __init__(self): ... @@ -58,9 +57,9 @@ class CsvReaderOptionsBuilder: def read_csv( options: CsvReaderOptions, - stream: stream = None, + stream: Stream | None = None, ) -> TableWithMetadata: ... -def write_csv(options: CsvWriterOptionsBuilder, stream: stream = None): ... +def write_csv(options: CsvWriterOptions, stream: Stream | None = None): ... class CsvWriterOptions: def __init__(self): ... diff --git a/python/pylibcudf/pylibcudf/io/json.pyi b/python/pylibcudf/pylibcudf/io/json.pyi index 6b43cc3db6d..3f02e53a899 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyi +++ b/python/pylibcudf/pylibcudf/io/json.pyi @@ -4,6 +4,8 @@ from typing import TypeAlias from typing_extensions import Self +from rmm._cuda.stream import Stream + from pylibcudf.column import Column from pylibcudf.io.types import ( CompressionType, @@ -14,8 +16,6 @@ from pylibcudf.io.types import ( ) from pylibcudf.table import Table from pylibcudf.types import DataType -from rmm._cuda.stream import Stream - ChildNameToTypeMap: TypeAlias = Mapping[str, ChildNameToTypeMap] @@ -32,7 +32,7 @@ def read_json( mixed_types_as_string: bool = False, prune_columns: bool = False, recovery_mode: JSONRecoveryMode = JSONRecoveryMode.FAIL, - stream: stream = None, + stream: Stream | None = None, ) -> TableWithMetadata: ... 
class JsonWriterOptions: @@ -49,8 +49,7 @@ class JsonWriterOptionsBuilder: def lines(self, val: bool) -> Self: ... def build(self) -> JsonWriterOptions: ... -def write_json(options: JsonWriterOptions, stream: stream = None) -> None: ... - +def write_json(options: JsonWriterOptions, stream: Stream = None) -> None: ... def chunked_read_json( source_info: SourceInfo, dtypes: list[NameAndType] | None = None, @@ -60,5 +59,5 @@ def chunked_read_json( prune_columns: bool = False, recovery_mode: JSONRecoveryMode = JSONRecoveryMode.FAIL, chunk_size: int = 100_000_000, - stream: stream = None, + stream: Stream = None, ) -> tuple[list[Column], list[str], ChildNameToTypeMap]: ... diff --git a/python/pylibcudf/pylibcudf/io/orc.pyi b/python/pylibcudf/pylibcudf/io/orc.pyi index 56426224be8..892fc5905b6 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pyi +++ b/python/pylibcudf/pylibcudf/io/orc.pyi @@ -2,6 +2,8 @@ from typing import Any, Self +from rmm._cuda.stream import Stream + from pylibcudf.io.types import ( CompressionType, SinkInfo, @@ -12,7 +14,6 @@ from pylibcudf.io.types import ( ) from pylibcudf.table import Table from pylibcudf.types import DataType -from rmm._cuda.stream import Stream def read_orc( source_info: SourceInfo, @@ -24,7 +25,7 @@ def read_orc( use_np_dtypes: bool = True, timestamp_type: DataType | None = None, decimal128_columns: list[str] | None = None, - stream: Strem = Stream(), + stream: Stream = Stream(), ) -> TableWithMetadata: ... 
class OrcColumnStatistics: diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyi b/python/pylibcudf/pylibcudf/io/parquet.pyi index 2923471b82b..d0f3c2451b8 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyi +++ b/python/pylibcudf/pylibcudf/io/parquet.pyi @@ -4,6 +4,8 @@ from collections.abc import Mapping from typing_extensions import Self +from rmm._cuda.stream import Stream + from pylibcudf.expressions import Expression from pylibcudf.io.types import ( CompressionType, @@ -16,7 +18,6 @@ from pylibcudf.io.types import ( TableWithMetadata, ) from pylibcudf.table import Table -from rmm._cuda.stream import Stream class ParquetReaderOptions: def __init__(self): ... @@ -55,7 +56,7 @@ class ChunkedParquetReader: def read_parquet( options: ParquetReaderOptions, - stream: stream = None, + stream: Stream = None, ) -> TableWithMetadata: ... class ParquetWriterOptions: @@ -87,14 +88,18 @@ class ParquetWriterOptionsBuilder: def write_arrow_schema(self, enabled: bool) -> Self: ... def build(self) -> ParquetWriterOptions: ... -def write_parquet(options: ParquetWriterOptions, stream: stream = None) -> memoryview: ... +def write_parquet( + options: ParquetWriterOptions, stream: Stream = None +) -> memoryview: ... class ParquetChunkedWriter: def __init__(self): ... def close(self, metadata_file_path: list) -> memoryview: ... def write(self, table: Table) -> None: ... @staticmethod - def from_options(options: ChunkedParquetWriterOptions, stream: stream = None) -> Self: ... + def from_options( + options: ChunkedParquetWriterOptions, stream: Stream = None + ) -> Self: ... class ChunkedParquetWriterOptions: def __init__(self): ... 
From 8cf0eb12db36b6cfa4a54f23ed996bc4920b91de Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Wed, 18 Dec 2024 11:03:24 -0500 Subject: [PATCH 04/18] use stream --- python/pylibcudf/pylibcudf/io/parquet.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 32870246c13..1d25f740565 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -399,7 +399,7 @@ cdef class ParquetChunkedWriter: cdef ParquetChunkedWriter parquet_writer = ParquetChunkedWriter.__new__( ParquetChunkedWriter ) - parquet_writer.c_obj.reset(new cpp_parquet_chunked_writer(options.c_obj)) + parquet_writer.c_obj.reset(new cpp_parquet_chunked_writer(options.c_obj, stream.view())) return parquet_writer From dee9fda17287bc521ffccbea16253b4cde1b1834 Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Wed, 18 Dec 2024 11:05:24 -0500 Subject: [PATCH 05/18] add stream to parquet_chunked_writer --- python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd b/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd index 410bcf3dab4..0697bf8a65e 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/parquet.pxd @@ -239,7 +239,8 @@ cdef extern from "cudf/io/parquet.hpp" namespace "cudf::io" nogil: cdef cppclass parquet_chunked_writer: parquet_chunked_writer() except +libcudf_exception_handler parquet_chunked_writer( - chunked_parquet_writer_options args + chunked_parquet_writer_options args, + cuda_stream_view stream, ) except +libcudf_exception_handler parquet_chunked_writer& write( table_view table_, From b06dabc862845038a4ac9c53ef681982ef287922 Mon Sep 17 00:00:00 2001 From: Matthew Murray 
<41342305+Matt711@users.noreply.github.com> Date: Wed, 18 Dec 2024 11:06:47 -0500 Subject: [PATCH 06/18] add stream to orc_chunked_writer --- python/pylibcudf/pylibcudf/libcudf/io/orc.pxd | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd index 20f4f9bd367..4915565956b 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/orc.pxd @@ -216,7 +216,8 @@ cdef extern from "cudf/io/orc.hpp" \ cdef cppclass orc_chunked_writer: orc_chunked_writer() except +libcudf_exception_handler orc_chunked_writer( - chunked_orc_writer_options args + chunked_orc_writer_options args, + cuda_stream_view stream, ) except +libcudf_exception_handler orc_chunked_writer& write( cudf_table_view.table_view table_, From 4fa1a1afcaa7d43d0a0dfd1b5459084f29d3e1aa Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Wed, 18 Dec 2024 11:08:24 -0500 Subject: [PATCH 07/18] Update python/pylibcudf/pylibcudf/io/csv.pyx --- python/pylibcudf/pylibcudf/io/csv.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/csv.pyx b/python/pylibcudf/pylibcudf/io/csv.pyx index 0c07e4ac037..8c56190d76d 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pyx +++ b/python/pylibcudf/pylibcudf/io/csv.pyx @@ -650,7 +650,7 @@ cpdef TableWithMetadata read_csv( stream = Stream() cdef table_with_metadata c_result with nogil: - c_result = move(cpp_read_csv(options.c_obj), stream.view()) + c_result = move(cpp_read_csv(options.c_obj, stream.view())) cdef TableWithMetadata tbl_meta = TableWithMetadata.from_libcudf(c_result) return tbl_meta From a60f80042e26fe3cc1bd4eae6acb55a91c8af72c Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Wed, 18 Dec 2024 11:08:28 -0500 Subject: [PATCH 08/18] Update python/pylibcudf/pylibcudf/io/csv.pyx --- 
python/pylibcudf/pylibcudf/io/csv.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/csv.pyx b/python/pylibcudf/pylibcudf/io/csv.pyx index 8c56190d76d..95389cd5729 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pyx +++ b/python/pylibcudf/pylibcudf/io/csv.pyx @@ -854,4 +854,4 @@ cpdef void write_csv( if stream is None: stream = Stream() with nogil: - cpp_write_csv(move(options.c_obj), stream.view()) + cpp_write_csv(move(options.c_obj, stream.view())) From 5727aa119cbf09a3db4ae3297044ed9ee6ccb5d3 Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Wed, 18 Dec 2024 11:08:33 -0500 Subject: [PATCH 09/18] Update python/pylibcudf/pylibcudf/io/json.pyx --- python/pylibcudf/pylibcudf/io/json.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/json.pyx b/python/pylibcudf/pylibcudf/io/json.pyx index a7ba3af49c9..b3472a73997 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyx +++ b/python/pylibcudf/pylibcudf/io/json.pyx @@ -473,4 +473,4 @@ cpdef void write_json(JsonWriterOptions options, Stream stream = None): if stream is None: stream = Stream() with nogil: - cpp_write_json(options.c_obj) + cpp_write_json(options.c_obj, stream.view()) From 9d1cc92e42eeaa3d2b188c30bb854186fccff5e0 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Wed, 18 Dec 2024 16:14:24 +0000 Subject: [PATCH 10/18] clean up --- python/pylibcudf/pylibcudf/io/orc.pyi | 2 -- python/pylibcudf/pylibcudf/io/parquet.pyx | 6 +++++- 2 files changed, 5 insertions(+), 3 deletions(-) diff --git a/python/pylibcudf/pylibcudf/io/orc.pyi b/python/pylibcudf/pylibcudf/io/orc.pyi index e1d6366b561..c496b7a2152 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pyi +++ b/python/pylibcudf/pylibcudf/io/orc.pyi @@ -4,8 +4,6 @@ from typing import Any from typing_extensions import Self -from rmm._cuda.stream import Stream - from pylibcudf.io.types import ( CompressionType, SinkInfo, diff --git 
a/python/pylibcudf/pylibcudf/io/parquet.pyx b/python/pylibcudf/pylibcudf/io/parquet.pyx index 1d25f740565..7d100769beb 100644 --- a/python/pylibcudf/pylibcudf/io/parquet.pyx +++ b/python/pylibcudf/pylibcudf/io/parquet.pyx @@ -399,7 +399,11 @@ cdef class ParquetChunkedWriter: cdef ParquetChunkedWriter parquet_writer = ParquetChunkedWriter.__new__( ParquetChunkedWriter ) - parquet_writer.c_obj.reset(new cpp_parquet_chunked_writer(options.c_obj, stream.view())) + parquet_writer.c_obj.reset( + new cpp_parquet_chunked_writer( + options.c_obj, stream.view() + ) + ) return parquet_writer From 847786983ca74869c3e2ddd4cb5c37363c8543b9 Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Wed, 18 Dec 2024 12:05:09 -0500 Subject: [PATCH 11/18] fix typo --- python/pylibcudf/pylibcudf/io/csv.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/csv.pyx b/python/pylibcudf/pylibcudf/io/csv.pyx index 95389cd5729..8c56190d76d 100644 --- a/python/pylibcudf/pylibcudf/io/csv.pyx +++ b/python/pylibcudf/pylibcudf/io/csv.pyx @@ -854,4 +854,4 @@ cpdef void write_csv( if stream is None: stream = Stream() with nogil: - cpp_write_csv(move(options.c_obj, stream.view())) + cpp_write_csv(move(options.c_obj), stream.view()) From e117d64f67ebf84039b834da153dc6f48575049d Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Wed, 18 Dec 2024 20:11:59 +0000 Subject: [PATCH 12/18] add stream param to cpp_read_orc --- python/pylibcudf/pylibcudf/io/orc.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/orc.pyx b/python/pylibcudf/pylibcudf/io/orc.pyx index f385b7af9ca..477d10bf9dc 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pyx +++ b/python/pylibcudf/pylibcudf/io/orc.pyx @@ -426,7 +426,7 @@ cpdef TableWithMetadata read_orc(OrcReaderOptions options, Stream stream = None) cdef table_with_metadata c_result with nogil: - c_result = move(cpp_read_orc(options.c_obj)) + 
c_result = move(cpp_read_orc(options.c_obj, stream.view())) return TableWithMetadata.from_libcudf(c_result) From 391322918d9b6d0d8fc1c037fd14f373b1ced387 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Thu, 19 Dec 2024 02:07:23 +0000 Subject: [PATCH 13/18] add a test --- .../pylibcudf/pylibcudf/tests/io/test_csv.py | 30 +++++++++++++++++++ 1 file changed, 30 insertions(+) diff --git a/python/pylibcudf/pylibcudf/tests/io/test_csv.py b/python/pylibcudf/pylibcudf/tests/io/test_csv.py index 555ca2fb02c..d1e2f42ffb5 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_csv.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_csv.py @@ -14,6 +14,8 @@ write_source_str, ) +from rmm._cuda.stream import Stream + import pylibcudf as plc from pylibcudf.io.types import CompressionType @@ -44,6 +46,16 @@ def csv_table_data(table_data): return plc.interop.from_arrow(pa_table), pa_table +@pytest.fixture(scope="module") +def simple_csv_table_data(): + return [ + "1,2,3,4_4,'z'", + '4,5,6,5_5,""', + "7,8,9,9_87,'123'", + "1,1,1,10_11,abc", + ] + + @pytest.mark.parametrize("delimiter", [",", ";"]) def test_read_csv_basic( csv_table_data, @@ -389,3 +401,21 @@ def test_write_csv_na_rep(na_rep): pd_result = pa_tbl.to_pandas().to_csv(na_rep=na_rep, index=False) assert str_result == pd_result + + +@pytest.mark.parametrize("stream", [None, Stream()]) +def test_read_csv_with_default_stream( + source_or_sink, simple_csv_table_data, stream +): + buffer = "\n".join(simple_csv_table_data) + + write_source_str(source_or_sink, buffer) + + options = plc.io.csv.CsvReaderOptions.builder( + plc.io.SourceInfo([source_or_sink]) + ).build() + plc_table_w_meta = plc.io.csv.read_csv(options, stream) + df = pd.read_csv( + StringIO(buffer), + ) + assert_table_and_meta_eq(pa.Table.from_pandas(df), plc_table_w_meta) From 5affbbcaf19297ae05d1284d14071a40a8fa8b7c Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Thu, 19 Dec 2024 09:59:21 -0500 Subject: [PATCH 
14/18] Update python/pylibcudf/pylibcudf/io/json.pyx --- python/pylibcudf/pylibcudf/io/json.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/json.pyx b/python/pylibcudf/pylibcudf/io/json.pyx index e18f63c8f76..3681f1692d6 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyx +++ b/python/pylibcudf/pylibcudf/io/json.pyx @@ -461,7 +461,7 @@ cpdef tuple chunked_read_json( try: with nogil: - c_result = move(cpp_read_json(opts, stream.view())) + c_result = move(cpp_read_json(options.c_obj, stream.view())) except (ValueError, OverflowError): break if meta_names is None: From 218e73be7e88f0a30e21defde90844b64cc5ed7b Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Thu, 19 Dec 2024 09:59:27 -0500 Subject: [PATCH 15/18] Update python/pylibcudf/pylibcudf/io/json.pyx --- python/pylibcudf/pylibcudf/io/json.pyx | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/json.pyx b/python/pylibcudf/pylibcudf/io/json.pyx index 3681f1692d6..ecfa6785fa5 100644 --- a/python/pylibcudf/pylibcudf/io/json.pyx +++ b/python/pylibcudf/pylibcudf/io/json.pyx @@ -516,7 +516,7 @@ cpdef TableWithMetadata read_json( cdef table_with_metadata c_result with nogil: - c_result = move(cpp_read_json(opts, stream.view())) + c_result = move(cpp_read_json(options.c_obj, stream.view())) return TableWithMetadata.from_libcudf(c_result) From c29cdc45d4f28cdbf95e378b13378ace9129c192 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Thu, 19 Dec 2024 16:06:41 +0000 Subject: [PATCH 16/18] remove stream param from avro reader --- python/pylibcudf/pylibcudf/io/avro.pxd | 3 +-- python/pylibcudf/pylibcudf/io/avro.pyi | 6 +----- python/pylibcudf/pylibcudf/io/avro.pyx | 4 ---- python/pylibcudf/pylibcudf/libcudf/io/avro.pxd | 1 - 4 files changed, 2 insertions(+), 12 deletions(-) diff --git a/python/pylibcudf/pylibcudf/io/avro.pxd b/python/pylibcudf/pylibcudf/io/avro.pxd index 
5036ef04533..9981a954b6c 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pxd +++ b/python/pylibcudf/pylibcudf/io/avro.pxd @@ -2,7 +2,6 @@ from pylibcudf.io.types cimport SourceInfo, TableWithMetadata from pylibcudf.libcudf.io.avro cimport avro_reader_options, avro_reader_options_builder from pylibcudf.libcudf.types cimport size_type -from rmm._cuda.stream cimport Stream from pylibcudf.libcudf.types cimport size_type @@ -20,4 +19,4 @@ cdef class AvroReaderOptionsBuilder: cpdef AvroReaderOptionsBuilder num_rows(self, size_type num_rows) cpdef AvroReaderOptions build(self) -cpdef TableWithMetadata read_avro(AvroReaderOptions options, Stream stream = *) +cpdef TableWithMetadata read_avro(AvroReaderOptions options) diff --git a/python/pylibcudf/pylibcudf/io/avro.pyi b/python/pylibcudf/pylibcudf/io/avro.pyi index 3de4c4bf9c2..764b15f37d9 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pyi +++ b/python/pylibcudf/pylibcudf/io/avro.pyi @@ -1,7 +1,5 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from rmm._cuda.stream import Stream - from pylibcudf.io.types import SourceInfo, TableWithMetadata __all__ = ["AvroReaderOptions", "AvroReaderOptionsBuilder", "read_avro"] @@ -16,6 +14,4 @@ class AvroReaderOptionsBuilder: def num_rows(num_rows: int) -> AvroReaderOptionsBuilder: ... def build(self) -> AvroReaderOptions: ... -def read_avro( - options: AvroReaderOptions, stream: Stream = None -) -> TableWithMetadata: ... +def read_avro(options: AvroReaderOptions) -> TableWithMetadata: ... 
diff --git a/python/pylibcudf/pylibcudf/io/avro.pyx b/python/pylibcudf/pylibcudf/io/avro.pyx index e5e1e9755fd..5b37f0a0a67 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pyx +++ b/python/pylibcudf/pylibcudf/io/avro.pyx @@ -9,7 +9,6 @@ from pylibcudf.libcudf.io.avro cimport ( read_avro as cpp_read_avro, ) from pylibcudf.libcudf.types cimport size_type -from rmm._cuda.stream cimport Stream __all__ = ["read_avro", "AvroReaderOptions", "AvroReaderOptionsBuilder"] @@ -128,7 +127,6 @@ cdef class AvroReaderOptionsBuilder: cpdef TableWithMetadata read_avro( AvroReaderOptions options, - Stream stream = None, ): """ Read from Avro format. @@ -143,8 +141,6 @@ cpdef TableWithMetadata read_avro( options: AvroReaderOptions Settings for controlling reading behavior """ - if stream is None: - stream = Stream() with nogil: c_result = move(cpp_read_avro(options.c_obj, stream.view())) diff --git a/python/pylibcudf/pylibcudf/libcudf/io/avro.pxd b/python/pylibcudf/pylibcudf/libcudf/io/avro.pxd index 5252d6d7186..9e571609d9c 100644 --- a/python/pylibcudf/pylibcudf/libcudf/io/avro.pxd +++ b/python/pylibcudf/pylibcudf/libcudf/io/avro.pxd @@ -47,5 +47,4 @@ cdef extern from "cudf/io/avro.hpp" \ cdef cudf_io_types.table_with_metadata read_avro( avro_reader_options &options, - cuda_stream_view stream, ) except +libcudf_exception_handler From 9f7e87e7d28fd2181e0d45e43a05d41a02bc71f5 Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Thu, 19 Dec 2024 16:08:37 +0000 Subject: [PATCH 17/18] clean up --- python/pylibcudf/pylibcudf/io/avro.pxd | 1 - python/pylibcudf/pylibcudf/io/avro.pyi | 1 - python/pylibcudf/pylibcudf/io/avro.pyx | 4 ++-- 3 files changed, 2 insertions(+), 4 deletions(-) diff --git a/python/pylibcudf/pylibcudf/io/avro.pxd b/python/pylibcudf/pylibcudf/io/avro.pxd index 9981a954b6c..7be3d61c4a0 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pxd +++ b/python/pylibcudf/pylibcudf/io/avro.pxd @@ -2,7 +2,6 @@ from pylibcudf.io.types cimport SourceInfo, TableWithMetadata from 
pylibcudf.libcudf.io.avro cimport avro_reader_options, avro_reader_options_builder from pylibcudf.libcudf.types cimport size_type - from pylibcudf.libcudf.types cimport size_type cdef class AvroReaderOptions: diff --git a/python/pylibcudf/pylibcudf/io/avro.pyi b/python/pylibcudf/pylibcudf/io/avro.pyi index 764b15f37d9..8cafc9a6573 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pyi +++ b/python/pylibcudf/pylibcudf/io/avro.pyi @@ -1,5 +1,4 @@ # Copyright (c) 2024, NVIDIA CORPORATION. - from pylibcudf.io.types import SourceInfo, TableWithMetadata __all__ = ["AvroReaderOptions", "AvroReaderOptionsBuilder", "read_avro"] diff --git a/python/pylibcudf/pylibcudf/io/avro.pyx b/python/pylibcudf/pylibcudf/io/avro.pyx index 5b37f0a0a67..c378fca0415 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pyx +++ b/python/pylibcudf/pylibcudf/io/avro.pyx @@ -126,7 +126,7 @@ cdef class AvroReaderOptionsBuilder: cpdef TableWithMetadata read_avro( - AvroReaderOptions options, + AvroReaderOptions options ): """ Read from Avro format. 
@@ -142,6 +142,6 @@ cpdef TableWithMetadata read_avro( Settings for controlling reading behavior """ with nogil: - c_result = move(cpp_read_avro(options.c_obj, stream.view())) + c_result = move(cpp_read_avro(options.c_obj)) return TableWithMetadata.from_libcudf(c_result) From 3f84e5c57767fd8317ea6892bb6f67a4429d54ca Mon Sep 17 00:00:00 2001 From: Matthew Murray Date: Thu, 19 Dec 2024 16:09:26 +0000 Subject: [PATCH 18/18] clean up --- python/pylibcudf/pylibcudf/io/avro.pxd | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/pylibcudf/pylibcudf/io/avro.pxd b/python/pylibcudf/pylibcudf/io/avro.pxd index 7be3d61c4a0..c29e0ff98b2 100644 --- a/python/pylibcudf/pylibcudf/io/avro.pxd +++ b/python/pylibcudf/pylibcudf/io/avro.pxd @@ -2,7 +2,7 @@ from pylibcudf.io.types cimport SourceInfo, TableWithMetadata from pylibcudf.libcudf.io.avro cimport avro_reader_options, avro_reader_options_builder from pylibcudf.libcudf.types cimport size_type -from pylibcudf.libcudf.types cimport size_type + cdef class AvroReaderOptions: cdef avro_reader_options c_obj