Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add stream parameters in pylibcudf IO APIs #17620

Draft
wants to merge 23 commits into
base: branch-25.02
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from 15 commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions python/pylibcudf/pylibcudf/io/avro.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,7 @@
from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
from pylibcudf.libcudf.io.avro cimport avro_reader_options, avro_reader_options_builder
from pylibcudf.libcudf.types cimport size_type

from rmm._cuda.stream cimport Stream

from pylibcudf.libcudf.types cimport size_type

Expand All @@ -20,4 +20,4 @@ cdef class AvroReaderOptionsBuilder:
cpdef AvroReaderOptionsBuilder num_rows(self, size_type num_rows)
cpdef AvroReaderOptions build(self)

cpdef TableWithMetadata read_avro(AvroReaderOptions options)
cpdef TableWithMetadata read_avro(AvroReaderOptions options, Stream stream = *)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What does = * do in Cython? I don't think I've used this syntax but I have seen it in a few places.

Copy link
Contributor Author

@Matt711 Matt711 Dec 18, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Indicates the argument is optional: ref

7 changes: 6 additions & 1 deletion python/pylibcudf/pylibcudf/io/avro.pyi
Original file line number Diff line number Diff line change
@@ -1,4 +1,7 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from rmm._cuda.stream import Stream

from pylibcudf.io.types import SourceInfo, TableWithMetadata

__all__ = ["AvroReaderOptions", "AvroReaderOptionsBuilder", "read_avro"]
Expand All @@ -13,4 +16,6 @@ class AvroReaderOptionsBuilder:
def num_rows(num_rows: int) -> AvroReaderOptionsBuilder: ...
def build(self) -> AvroReaderOptions: ...

def read_avro(options: AvroReaderOptions) -> TableWithMetadata: ...
def read_avro(
options: AvroReaderOptions, stream: Stream = None
) -> TableWithMetadata: ...
8 changes: 6 additions & 2 deletions python/pylibcudf/pylibcudf/io/avro.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ from pylibcudf.libcudf.io.avro cimport (
read_avro as cpp_read_avro,
)
from pylibcudf.libcudf.types cimport size_type
from rmm._cuda.stream cimport Stream

__all__ = ["read_avro", "AvroReaderOptions", "AvroReaderOptionsBuilder"]

Expand Down Expand Up @@ -126,7 +127,8 @@ cdef class AvroReaderOptionsBuilder:


cpdef TableWithMetadata read_avro(
AvroReaderOptions options
AvroReaderOptions options,
Stream stream = None,
):
"""
Read from Avro format.
Expand All @@ -141,7 +143,9 @@ cpdef TableWithMetadata read_avro(
options: AvroReaderOptions
Settings for controlling reading behavior
"""
if stream is None:
stream = Stream()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We need to streamline this. Maybe we can use a default besides None, or perhaps we need a cdef stream_or_default(stream) function.

with nogil:
c_result = move(cpp_read_avro(options.c_obj))
c_result = move(cpp_read_avro(options.c_obj, stream.view()))

return TableWithMetadata.from_libcudf(c_result)
5 changes: 3 additions & 2 deletions python/pylibcudf/pylibcudf/io/csv.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ from pylibcudf.libcudf.io.types cimport (
table_with_metadata,
)
from pylibcudf.libcudf.types cimport size_type
from rmm._cuda.stream cimport Stream

cdef class CsvReaderOptions:
cdef csv_reader_options c_obj
Expand Down Expand Up @@ -61,7 +62,7 @@ cdef class CsvReaderOptionsBuilder:
cpdef CsvReaderOptionsBuilder dayfirst(self, bool dayfirst)
cpdef CsvReaderOptions build(self)

cpdef TableWithMetadata read_csv(CsvReaderOptions options)
cpdef TableWithMetadata read_csv(CsvReaderOptions options, Stream stream = *)

cdef class CsvWriterOptions:
cdef csv_writer_options c_obj
Expand All @@ -84,4 +85,4 @@ cdef class CsvWriterOptionsBuilder:
cpdef CsvWriterOptions build(self)


cpdef void write_csv(CsvWriterOptions options)
cpdef void write_csv(CsvWriterOptions options, Stream stream = *)
48 changes: 5 additions & 43 deletions python/pylibcudf/pylibcudf/io/csv.pyi
Original file line number Diff line number Diff line change
@@ -1,9 +1,9 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from collections.abc import Mapping

from typing_extensions import Self

from rmm._cuda.stream import Stream

from pylibcudf.io.types import (
CompressionType,
QuoteStyle,
Expand Down Expand Up @@ -56,48 +56,10 @@ class CsvReaderOptionsBuilder:
def build(self) -> CsvReaderOptions: ...

def read_csv(
source_info: SourceInfo,
*,
compression: CompressionType = CompressionType.AUTO,
byte_range_offset: int = 0,
byte_range_size: int = 0,
col_names: list[str] | None = None,
prefix: str = "",
mangle_dupe_cols: bool = True,
usecols: list[int] | list[str] | None = None,
nrows: int = -1,
skiprows: int = 0,
skipfooter: int = 0,
header: int = 0,
lineterminator: str = "\n",
delimiter: str | None = None,
thousands: str | None = None,
decimal: str = ".",
comment: str | None = None,
delim_whitespace: bool = False,
skipinitialspace: bool = False,
skip_blank_lines: bool = True,
quoting: QuoteStyle = QuoteStyle.MINIMAL,
quotechar: str = '"',
doublequote: bool = True,
parse_dates: list[str] | list[int] | None = None,
parse_hex: list[str] | list[int] | None = None,
# Technically this should be dict/list
# but using a fused type prevents using None as default
dtypes: Mapping[str, DataType] | list[DataType] | None = None,
true_values: list[str] | None = None,
false_values: list[str] | None = None,
na_values: list[str] | None = None,
keep_default_na: bool = True,
na_filter: bool = True,
dayfirst: bool = False,
# Note: These options are supported by the libcudf reader
# but are not exposed here since there is no demand for them
# on the Python side yet.
# detect_whitespace_around_quotes: bool = False,
# timestamp_type: DataType = DataType(type_id.EMPTY),
options: CsvReaderOptions,
stream: Stream = None,
) -> TableWithMetadata: ...
def write_csv(options: CsvWriterOptionsBuilder): ...
def write_csv(options: CsvWriterOptionsBuilder, stream: Stream = None): ...

class CsvWriterOptions:
def __init__(self): ...
Expand Down
16 changes: 11 additions & 5 deletions python/pylibcudf/pylibcudf/io/csv.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ from pylibcudf.libcudf.io.types cimport (
from pylibcudf.libcudf.types cimport data_type, size_type
from pylibcudf.types cimport DataType
from pylibcudf.table cimport Table
from rmm._cuda.stream cimport Stream

__all__ = [
"read_csv",
Expand Down Expand Up @@ -629,7 +630,8 @@ cdef class CsvReaderOptionsBuilder:


cpdef TableWithMetadata read_csv(
CsvReaderOptions options
CsvReaderOptions options,
Stream stream = None,
):
"""
Read from CSV format.
Expand All @@ -644,9 +646,11 @@ cpdef TableWithMetadata read_csv(
options: CsvReaderOptions
Settings for controlling reading behavior
"""
if stream is None:
stream = Stream()
cdef table_with_metadata c_result
with nogil:
c_result = move(cpp_read_csv(options.c_obj))
c_result = move(cpp_read_csv(options.c_obj, stream.view()))

cdef TableWithMetadata tbl_meta = TableWithMetadata.from_libcudf(c_result)
return tbl_meta
Expand Down Expand Up @@ -831,7 +835,8 @@ cdef class CsvWriterOptionsBuilder:


cpdef void write_csv(
CsvWriterOptions options
CsvWriterOptions options,
Stream stream = None,
):
"""
Write to CSV format.
Expand All @@ -846,6 +851,7 @@ cpdef void write_csv(
options: CsvWriterOptions
Settings for controlling writing behavior
"""

if stream is None:
stream = Stream()
with nogil:
cpp_write_csv(move(options.c_obj))
cpp_write_csv(move(options.c_obj), stream.view())
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
5 changes: 4 additions & 1 deletion python/pylibcudf/pylibcudf/io/json.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ from pylibcudf.libcudf.io.json cimport (
json_writer_options_builder,
)
from pylibcudf.libcudf.types cimport size_type
from rmm._cuda.stream cimport Stream
from pylibcudf.table cimport Table


Expand All @@ -27,6 +28,7 @@ cpdef TableWithMetadata read_json(
bool prune_columns = *,
json_recovery_mode_t recovery_mode = *,
dict extra_parameters = *,
Stream stream = *,
)

cdef class JsonWriterOptions:
Expand All @@ -47,7 +49,7 @@ cdef class JsonWriterOptionsBuilder:
cpdef JsonWriterOptionsBuilder lines(self, bool val)
cpdef JsonWriterOptions build(self)

cpdef void write_json(JsonWriterOptions options)
cpdef void write_json(JsonWriterOptions options, Stream stream = *)

cpdef tuple chunked_read_json(
SourceInfo source_info,
Expand All @@ -58,4 +60,5 @@ cpdef tuple chunked_read_json(
bool prune_columns = *,
json_recovery_mode_t recovery_mode = *,
int chunk_size= *,
Stream stream = *,
)
6 changes: 5 additions & 1 deletion python/pylibcudf/pylibcudf/io/json.pyi
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@ from typing import TypeAlias

from typing_extensions import Self

from rmm._cuda.stream import Stream

from pylibcudf.column import Column
from pylibcudf.io.types import (
CompressionType,
Expand All @@ -30,6 +32,7 @@ def read_json(
mixed_types_as_string: bool = False,
prune_columns: bool = False,
recovery_mode: JSONRecoveryMode = JSONRecoveryMode.FAIL,
stream: Stream = None,
) -> TableWithMetadata: ...

class JsonWriterOptions:
Expand All @@ -46,7 +49,7 @@ class JsonWriterOptionsBuilder:
def lines(self, val: bool) -> Self: ...
def build(self) -> JsonWriterOptions: ...

def write_json(options: JsonWriterOptions) -> None: ...
def write_json(options: JsonWriterOptions, stream: Stream = None) -> None: ...
def chunked_read_json(
source_info: SourceInfo,
dtypes: list[NameAndType] | None = None,
Expand All @@ -56,4 +59,5 @@ def chunked_read_json(
prune_columns: bool = False,
recovery_mode: JSONRecoveryMode = JSONRecoveryMode.FAIL,
chunk_size: int = 100_000_000,
stream: Stream = None,
) -> tuple[list[Column], list[str], ChildNameToTypeMap]: ...
20 changes: 14 additions & 6 deletions python/pylibcudf/pylibcudf/io/json.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@ from pylibcudf.libcudf.io.types cimport (
)
from pylibcudf.libcudf.types cimport data_type, size_type
from pylibcudf.types cimport DataType
from rmm._cuda.stream cimport Stream


__all__ = [
"chunked_read_json",
Expand Down Expand Up @@ -132,6 +134,7 @@ cpdef tuple chunked_read_json(
bool prune_columns = False,
json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL,
int chunk_size=100_000_000,
Stream stream = None,
):
"""Reads an JSON file into a :py:class:`~.types.TableWithMetadata`.

Expand Down Expand Up @@ -183,7 +186,8 @@ cpdef tuple chunked_read_json(
prune_columns=prune_columns,
recovery_mode=recovery_mode,
)

if stream is None:
stream = Stream()
# Read JSON
cdef table_with_metadata c_result

Expand All @@ -197,7 +201,7 @@ cpdef tuple chunked_read_json(

try:
with nogil:
c_result = move(cpp_read_json(opts))
c_result = move(cpp_read_json(opts, stream.view()))
Matt711 marked this conversation as resolved.
Show resolved Hide resolved
except (ValueError, OverflowError):
break
if meta_names is None:
Expand Down Expand Up @@ -236,6 +240,7 @@ cpdef TableWithMetadata read_json(
bool prune_columns = False,
json_recovery_mode_t recovery_mode = json_recovery_mode_t.FAIL,
dict extra_parameters = None,
Stream stream = None,
):
"""Reads an JSON file into a :py:class:`~.types.TableWithMetadata`.

Expand Down Expand Up @@ -288,12 +293,13 @@ cpdef TableWithMetadata read_json(
recovery_mode=recovery_mode,
extra_parameters=extra_parameters,
)

if stream is None:
stream = Stream()
# Read JSON
cdef table_with_metadata c_result

with nogil:
c_result = move(cpp_read_json(opts))
c_result = move(cpp_read_json(opts, stream.view()))
Matt711 marked this conversation as resolved.
Show resolved Hide resolved

return TableWithMetadata.from_libcudf(c_result)

Expand Down Expand Up @@ -451,7 +457,7 @@ cdef class JsonWriterOptionsBuilder:
return json_options


cpdef void write_json(JsonWriterOptions options):
cpdef void write_json(JsonWriterOptions options, Stream stream = None):
"""
Writes a set of columns to JSON format.

Expand All @@ -464,5 +470,7 @@ cpdef void write_json(JsonWriterOptions options):
-------
None
"""
if stream is None:
stream = Stream()
with nogil:
cpp_write_json(options.c_obj)
cpp_write_json(options.c_obj, stream.view())
4 changes: 3 additions & 1 deletion python/pylibcudf/pylibcudf/io/orc.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ from pylibcudf.libcudf.io.types cimport (
compression_type,
statistics_freq,
)
from rmm._cuda.stream cimport Stream


cdef class OrcReaderOptions:
cdef orc_reader_options c_obj
Expand Down Expand Up @@ -93,7 +95,7 @@ cdef class OrcWriterOptionsBuilder:
cpdef OrcWriterOptionsBuilder metadata(self, TableInputMetadata meta)
cpdef OrcWriterOptions build(self)

cpdef void write_orc(OrcWriterOptions options)
cpdef void write_orc(OrcWriterOptions options, Stream stream = *)

cdef class OrcChunkedWriter:
cdef unique_ptr[orc_chunked_writer] c_obj
Expand Down
Loading
Loading