Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream parameters in pylibcudf IO APIs #17620

Draft
wants to merge 23 commits into
base: branch-25.02
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 0 additions & 2 deletions python/pylibcudf/pylibcudf/io/avro.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ from pylibcudf.libcudf.io.avro cimport avro_reader_options, avro_reader_options_
from pylibcudf.libcudf.types cimport size_type


from pylibcudf.libcudf.types cimport size_type

cdef class AvroReaderOptions:
cdef avro_reader_options c_obj
cdef SourceInfo source
Expand Down
5 changes: 3 additions & 2 deletions python/pylibcudf/pylibcudf/io/csv.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ from pylibcudf.libcudf.io.types cimport (
table_with_metadata,
)
from pylibcudf.libcudf.types cimport size_type
from rmm._cuda.stream cimport Stream

cdef class CsvReaderOptions:
cdef csv_reader_options c_obj
Expand Down Expand Up @@ -61,7 +62,7 @@ cdef class CsvReaderOptionsBuilder:
cpdef CsvReaderOptionsBuilder dayfirst(self, bool dayfirst)
cpdef CsvReaderOptions build(self)

cpdef TableWithMetadata read_csv(CsvReaderOptions options)
cpdef TableWithMetadata read_csv(CsvReaderOptions options, Stream stream = *)

cdef class CsvWriterOptions:
cdef csv_writer_options c_obj
Expand All @@ -84,4 +85,4 @@ cdef class CsvWriterOptionsBuilder:
cpdef CsvWriterOptions build(self)


cpdef void write_csv(CsvWriterOptions options)
cpdef void write_csv(CsvWriterOptions options, Stream stream = *)
48 changes: 5 additions & 43 deletions python/pylibcudf/pylibcudf/io/csv.pyi
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from collections.abc import Mapping

from typing_extensions import Self

from rmm._cuda.stream import Stream

from pylibcudf.io.types import (
CompressionType,
QuoteStyle,
Expand Down Expand Up @@ -56,48 +56,10 @@ class CsvReaderOptionsBuilder:
def build(self) -> CsvReaderOptions: ...

def read_csv(
source_info: SourceInfo,
*,
compression: CompressionType = CompressionType.AUTO,
byte_range_offset: int = 0,
byte_range_size: int = 0,
col_names: list[str] | None = None,
prefix: str = "",
mangle_dupe_cols: bool = True,
usecols: list[int] | list[str] | None = None,
nrows: int = -1,
skiprows: int = 0,
skipfooter: int = 0,
header: int = 0,
lineterminator: str = "\n",
delimiter: str | None = None,
thousands: str | None = None,
decimal: str = ".",
comment: str | None = None,
delim_whitespace: bool = False,
skipinitialspace: bool = False,
skip_blank_lines: bool = True,
quoting: QuoteStyle = QuoteStyle.MINIMAL,
quotechar: str = '"',
doublequote: bool = True,
parse_dates: list[str] | list[int] | None = None,
parse_hex: list[str] | list[int] | None = None,
# Technically this should be dict/list
# but using a fused type prevents using None as default
dtypes: Mapping[str, DataType] | list[DataType] | None = None,
true_values: list[str] | None = None,
false_values: list[str] | None = None,
na_values: list[str] | None = None,
keep_default_na: bool = True,
na_filter: bool = True,
dayfirst: bool = False,
# Note: These options are supported by the libcudf reader
# but are not exposed here since there is no demand for them
# on the Python side yet.
# detect_whitespace_around_quotes: bool = False,
# timestamp_type: DataType = DataType(type_id.EMPTY),
options: CsvReaderOptions,
stream: Stream = None,
) -> TableWithMetadata: ...
def write_csv(options: CsvWriterOptionsBuilder): ...
def write_csv(options: CsvWriterOptionsBuilder, stream: Stream = None): ...

class CsvWriterOptions:
def __init__(self): ...
Expand Down
16 changes: 11 additions & 5 deletions python/pylibcudf/pylibcudf/io/csv.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ from pylibcudf.libcudf.io.types cimport (
from pylibcudf.libcudf.types cimport data_type, size_type
from pylibcudf.types cimport DataType
from pylibcudf.table cimport Table
from rmm._cuda.stream cimport Stream

__all__ = [
"read_csv",
Expand Down Expand Up @@ -629,7 +630,8 @@ cdef class CsvReaderOptionsBuilder:


cpdef TableWithMetadata read_csv(
CsvReaderOptions options
CsvReaderOptions options,
Stream stream = None,
):
"""
Read from CSV format.
Expand All @@ -644,9 +646,11 @@ cpdef TableWithMetadata read_csv(
options: CsvReaderOptions
Settings for controlling reading behavior
"""
if stream is None:
stream = Stream()
cdef table_with_metadata c_result
with nogil:
c_result = move(cpp_read_csv(options.c_obj))
c_result = move(cpp_read_csv(options.c_obj, stream.view()))

cdef TableWithMetadata tbl_meta = TableWithMetadata.from_libcudf(c_result)
return tbl_meta
Expand Down Expand Up @@ -831,7 +835,8 @@ cdef class CsvWriterOptionsBuilder:


cpdef void write_csv(
CsvWriterOptions options
CsvWriterOptions options,
Stream stream = None,
):
"""
Write to CSV format.
Expand All @@ -846,6 +851,7 @@ cpdef void write_csv(
options: CsvWriterOptions
Settings for controlling writing behavior
"""

if stream is None:
stream = Stream()
with nogil:
cpp_write_csv(move(options.c_obj))
cpp_write_csv(move(options.c_obj), stream.view())
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
4 changes: 3 additions & 1 deletion python/pylibcudf/pylibcudf/io/json.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ from pylibcudf.libcudf.io.json cimport (
json_writer_options_builder,
)
from pylibcudf.libcudf.types cimport size_type
from rmm._cuda.stream cimport Stream
from pylibcudf.table cimport Table


Expand Down Expand Up @@ -75,9 +76,10 @@ cdef class JsonWriterOptionsBuilder:
cpdef JsonWriterOptionsBuilder compression(self, compression_type comptype)
cpdef JsonWriterOptions build(self)

cpdef void write_json(JsonWriterOptions options)
cpdef void write_json(JsonWriterOptions options, Stream stream = *)

cpdef tuple chunked_read_json(
JsonReaderOptions options,
int chunk_size= *,
Stream stream = *,
)
5 changes: 4 additions & 1 deletion python/pylibcudf/pylibcudf/io/json.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ from typing import TypeAlias

from typing_extensions import Self

from rmm._cuda.stream import Stream

from pylibcudf.column import Column
from pylibcudf.io.types import (
CompressionType,
Expand Down Expand Up @@ -70,8 +72,9 @@ class JsonWriterOptionsBuilder:
def compression(self, comptype: CompressionType) -> Self: ...
def build(self) -> JsonWriterOptions: ...

def write_json(options: JsonWriterOptions) -> None: ...
def write_json(options: JsonWriterOptions, stream: Stream = None) -> None: ...
def chunked_read_json(
options: JsonReaderOptions,
chunk_size: int = 100_000_000,
stream: Stream = None,
) -> tuple[list[Column], list[str], ChildNameToTypeMap]: ...
22 changes: 17 additions & 5 deletions python/pylibcudf/pylibcudf/io/json.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ from pylibcudf.libcudf.io.types cimport (
)
from pylibcudf.libcudf.types cimport data_type, size_type
from pylibcudf.types cimport DataType
from rmm._cuda.stream cimport Stream


__all__ = [
"chunked_read_json",
Expand Down Expand Up @@ -422,6 +424,7 @@ cdef class JsonReaderOptionsBuilder:
cpdef tuple chunked_read_json(
JsonReaderOptions options,
int chunk_size=100_000_000,
Stream stream = None,
):
"""
Reads chunks of a JSON file into a :py:class:`~.types.TableWithMetadata`.
Expand All @@ -442,6 +445,9 @@ cpdef tuple chunked_read_json(
cdef size_type c_range_size = (
chunk_size if chunk_size is not None else 0
)
if stream is None:
stream = Stream()

cdef table_with_metadata c_result

final_columns = []
Expand All @@ -455,7 +461,7 @@ cpdef tuple chunked_read_json(

try:
with nogil:
c_result = move(cpp_read_json(options.c_obj))
c_result = move(cpp_read_json(options.c_obj, stream.view()))
except (ValueError, OverflowError):
break
if meta_names is None:
Expand Down Expand Up @@ -483,7 +489,8 @@ cpdef tuple chunked_read_json(


cpdef TableWithMetadata read_json(
JsonReaderOptions options
JsonReaderOptions options,
Stream stream = None,
):
"""
Read from JSON format.
Expand All @@ -503,10 +510,13 @@ cpdef TableWithMetadata read_json(
TableWithMetadata
The Table and its corresponding metadata (column names) that were read in.
"""
if stream is None:
stream = Stream()

cdef table_with_metadata c_result

with nogil:
c_result = move(cpp_read_json(options.c_obj))
c_result = move(cpp_read_json(options.c_obj, stream.view()))

return TableWithMetadata.from_libcudf(c_result)

Expand Down Expand Up @@ -694,7 +704,7 @@ cdef class JsonWriterOptionsBuilder:
return json_options


cpdef void write_json(JsonWriterOptions options):
cpdef void write_json(JsonWriterOptions options, Stream stream = None):
"""
Writes a set of columns to JSON format.

Expand All @@ -707,5 +717,7 @@ cpdef void write_json(JsonWriterOptions options):
-------
None
"""
if stream is None:
stream = Stream()
with nogil:
cpp_write_json(options.c_obj)
cpp_write_json(options.c_obj, stream.view())
4 changes: 3 additions & 1 deletion python/pylibcudf/pylibcudf/io/orc.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ from pylibcudf.libcudf.io.types cimport (
compression_type,
statistics_freq,
)
from rmm._cuda.stream cimport Stream


cdef class OrcReaderOptions:
cdef orc_reader_options c_obj
Expand Down Expand Up @@ -93,7 +95,7 @@ cdef class OrcWriterOptionsBuilder:
cpdef OrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
cpdef OrcWriterOptions build(self)

cpdef void write_orc(OrcWriterOptions options)
cpdef void write_orc(OrcWriterOptions options, Stream stream = *)

cdef class OrcChunkedWriter:
cdef unique_ptr[orc_chunked_writer] c_obj
Expand Down
20 changes: 14 additions & 6 deletions python/pylibcudf/pylibcudf/io/orc.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ from pylibcudf.libcudf.io.orc cimport (
orc_writer_options,
chunked_orc_writer_options,
)
from rmm._cuda.stream cimport Stream


__all__ = [
"OrcColumnStatistics",
Expand Down Expand Up @@ -405,7 +407,7 @@ cdef class OrcReaderOptionsBuilder:
return orc_options


cpdef TableWithMetadata read_orc(OrcReaderOptions options):
cpdef TableWithMetadata read_orc(OrcReaderOptions options, Stream stream = None):
"""
Read from ORC format.

Expand All @@ -419,10 +421,12 @@ cpdef TableWithMetadata read_orc(OrcReaderOptions options):
options: OrcReaderOptions
Settings for controlling reading behavior
"""
if stream is None:
stream = Stream()
cdef table_with_metadata c_result

with nogil:
c_result = move(cpp_read_orc(options.c_obj))
c_result = move(cpp_read_orc(options.c_obj, stream.view()))

return TableWithMetadata.from_libcudf(c_result)

Expand Down Expand Up @@ -604,7 +608,7 @@ cdef class OrcWriterOptionsBuilder:
return orc_options


cpdef void write_orc(OrcWriterOptions options):
cpdef void write_orc(OrcWriterOptions options, Stream stream = None):
"""
Write to ORC format.

Expand All @@ -622,8 +626,10 @@ cpdef void write_orc(OrcWriterOptions options):
-------
None
"""
if stream is None:
stream = Stream()
with nogil:
cpp_write_orc(move(options.c_obj))
cpp_write_orc(move(options.c_obj), stream.view())


cdef class OrcChunkedWriter:
Expand Down Expand Up @@ -655,7 +661,7 @@ cdef class OrcChunkedWriter:
self.c_obj.get()[0].write(table.view())

@staticmethod
def from_options(ChunkedOrcWriterOptions options):
def from_options(ChunkedOrcWriterOptions options, Stream stream = None):
"""
Creates a chunked ORC writer from options

Expand All @@ -668,10 +674,12 @@ cdef class OrcChunkedWriter:
-------
OrcChunkedWriter
"""
if stream is None:
stream = Stream()
cdef OrcChunkedWriter orc_writer = OrcChunkedWriter.__new__(
OrcChunkedWriter
)
orc_writer.c_obj.reset(new orc_chunked_writer(options.c_obj))
orc_writer.c_obj.reset(new orc_chunked_writer(options.c_obj, stream.view()))
return orc_writer


Expand Down
5 changes: 3 additions & 2 deletions python/pylibcudf/pylibcudf/io/parquet.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ from pylibcudf.libcudf.io.parquet cimport (
from pylibcudf.libcudf.types cimport size_type
from pylibcudf.table cimport Table
from pylibcudf.types cimport DataType
from rmm._cuda.stream cimport Stream


cdef class ParquetReaderOptions:
Expand Down Expand Up @@ -55,7 +56,7 @@ cdef class ChunkedParquetReader:
cpdef TableWithMetadata read_chunk(self)


cpdef read_parquet(ParquetReaderOptions options)
cpdef read_parquet(ParquetReaderOptions options, Stream stream = *)


cdef class ParquetChunkedWriter:
Expand Down Expand Up @@ -145,6 +146,6 @@ cdef class ParquetWriterOptionsBuilder:

cpdef ParquetWriterOptions build(self)

cpdef memoryview write_parquet(ParquetWriterOptions options)
cpdef memoryview write_parquet(ParquetWriterOptions options, Stream stream = *)

cpdef memoryview merge_row_group_metadata(list metdata_list)
Loading
Loading