Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add write_parquet to pylibcudf #17263

Merged
merged 30 commits into from
Nov 22, 2024
Merged
Show file tree
Hide file tree
Changes from 18 commits
Commits
Show all changes
30 commits
Select commit Hold shift + click to select a range
ff0d51f
Add writer, supporting objects, and tests; compilation passes
mroeschke Nov 7, 2024
bb2c258
Add fix test, add python method for construction
mroeschke Nov 7, 2024
625e254
Merge remote-tracking branch 'upstream/branch-24.12' into plc/io/parq…
mroeschke Nov 7, 2024
4b8402d
Merge remote-tracking branch 'upstream/branch-24.12' into plc/io/parq…
mroeschke Nov 14, 2024
097decb
Use HostBuffer, rename to c_obj
mroeschke Nov 15, 2024
3133ee1
fix up tests
mroeschke Nov 15, 2024
35984c9
Merge remote-tracking branch 'upstream/branch-24.12' into plc/io/parq…
mroeschke Nov 15, 2024
14c4501
keep table and sink references alive
mroeschke Nov 15, 2024
46cbb46
Return memoryview
mroeschke Nov 15, 2024
46db84e
Adjust test too
mroeschke Nov 15, 2024
efe24d4
Add back contiguous split changes
mroeschke Nov 15, 2024
0d0b5ba
Merge remote-tracking branch 'upstream/branch-24.12' into plc/io/parq…
mroeschke Nov 15, 2024
a0fdcfa
Allow construction of HostBuffer from nullptr
wence- Nov 19, 2024
4d802f2
Parquet writing does not support gzip compression
wence- Nov 19, 2024
91e847e
Use valid values for row_group/max_page_size_bytes
wence- Nov 19, 2024
f2a905e
Skip zero-sized table and non-None partition info
wence- Nov 19, 2024
010c1da
Add type stub information
wence- Nov 19, 2024
da6b730
Merge branch 'branch-24.12' into plc/io/parquet_writer_only
wence- Nov 19, 2024
81c9839
Address reviews
mroeschke Nov 20, 2024
c057bf7
merge conflict
Matt711 Nov 20, 2024
85a5505
Update python/pylibcudf/pylibcudf/io/parquet.pxd
mroeschke Nov 21, 2024
2f65032
Update python/pylibcudf/pylibcudf/io/parquet.pyi
mroeschke Nov 21, 2024
44a7bba
Update python/pylibcudf/pylibcudf/io/parquet.pyx
mroeschke Nov 21, 2024
1564fae
Update python/pylibcudf/pylibcudf/io/parquet.pyx
mroeschke Nov 21, 2024
91ed038
Update python/pylibcudf/pylibcudf/io/parquet.pyx
mroeschke Nov 21, 2024
2faa151
Update python/pylibcudf/pylibcudf/io/parquet.pyx
mroeschke Nov 21, 2024
ed096f3
Update python/pylibcudf/pylibcudf/io/parquet.pyx
mroeschke Nov 21, 2024
66f555f
Update python/pylibcudf/pylibcudf/io/parquet.pyx
mroeschke Nov 21, 2024
0df14b1
address docstring review, reduce test parameterization
mroeschke Nov 21, 2024
741c95c
Merge remote-tracking branch 'upstream/branch-24.12' into plc/io/parq…
mroeschke Nov 21, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 13 additions & 0 deletions python/pylibcudf/pylibcudf/contiguous_split.pxd
Original file line number Diff line number Diff line change
@@ -1,12 +1,25 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libc.stdint cimport uint8_t
from libcpp.memory cimport unique_ptr
from libcpp.vector cimport vector
from pylibcudf.libcudf.contiguous_split cimport packed_columns

from .gpumemoryview cimport gpumemoryview
from .table cimport Table


cdef class HostBuffer:
cdef unique_ptr[vector[uint8_t]] c_obj
cdef size_t nbytes
cdef Py_ssize_t[1] shape
cdef Py_ssize_t[1] strides

@staticmethod
cdef HostBuffer from_unique_ptr(
unique_ptr[vector[uint8_t]] vec
)

cdef class PackedColumns:
cdef unique_ptr[packed_columns] c_obj

Expand Down
13 changes: 5 additions & 8 deletions python/pylibcudf/pylibcudf/contiguous_split.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -29,26 +29,23 @@ __all__ = [

cdef class HostBuffer:
"""Owning host buffer that implements the buffer protocol"""
cdef unique_ptr[vector[uint8_t]] c_obj
cdef size_t nbytes
cdef Py_ssize_t[1] shape
cdef Py_ssize_t[1] strides

@staticmethod
cdef HostBuffer from_unique_ptr(
unique_ptr[vector[uint8_t]] vec
):
cdef HostBuffer out = HostBuffer()
cdef HostBuffer out = HostBuffer.__new__(HostBuffer)
# Allow construction from nullptr
out.nbytes = 0 if vec.get() == NULL else dereference(vec).size()
out.c_obj = move(vec)
out.nbytes = dereference(out.c_obj).size()
out.shape[0] = out.nbytes
out.strides[0] = 1
return out

__hash__ = None

def __getbuffer__(self, Py_buffer *buffer, int flags):
buffer.buf = dereference(self.c_obj).data()
# Empty vec produces empty buffer
buffer.buf = NULL if self.nbytes == 0 else dereference(self.c_obj).data()
buffer.format = NULL # byte
buffer.internal = NULL
buffer.itemsize = 1
Expand Down
62 changes: 60 additions & 2 deletions python/pylibcudf/pylibcudf/io/parquet.pxd
Original file line number Diff line number Diff line change
@@ -1,14 +1,26 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libc.stdint cimport int64_t
from libc.stdint cimport int64_t, uint8_t
from libcpp cimport bool
from libcpp.memory cimport unique_ptr
from libcpp.vector cimport vector
from pylibcudf.expressions cimport Expression
from pylibcudf.io.types cimport SourceInfo, TableWithMetadata
from pylibcudf.io.types cimport (
compression_type,
dictionary_policy,
statistics_freq,
SinkInfo,
SourceInfo,
TableInputMetadata,
TableWithMetadata,
)
from pylibcudf.libcudf.io.parquet cimport (
chunked_parquet_reader as cpp_chunked_parquet_reader,
parquet_writer_options,
parquet_writer_options_builder,
)
from pylibcudf.libcudf.types cimport size_type
from pylibcudf.table cimport Table
from pylibcudf.types cimport DataType


Expand All @@ -33,3 +45,49 @@ cpdef read_parquet(
# ReaderColumnSchema reader_column_schema = *,
# DataType timestamp_type = *
)

cdef class ParquetWriterOptions:
cdef parquet_writer_options c_obj
cdef Table table_ref
cdef SinkInfo sink_ref

cpdef void set_partitions(self, list partitions)

cpdef void set_column_chunks_file_paths(self, list file_paths)

cpdef void set_row_group_size_bytes(self, int size_bytes)

cpdef void set_row_group_size_rows(self, int size_rows)

cpdef void set_max_page_size_bytes(self, int size_bytes)

cpdef void set_max_page_size_rows(self, int size_rows)

cpdef void set_max_dictionary_size(self, int size_rows)
bdice marked this conversation as resolved.
Show resolved Hide resolved

cdef class ParquetWriterOptionsBuilder:
cdef parquet_writer_options_builder c_obj
cdef Table table_ref
cdef SinkInfo sink_ref

cpdef ParquetWriterOptionsBuilder metadata(self, TableInputMetadata metadata)

cpdef ParquetWriterOptionsBuilder key_value_metadata(self, list metadata)

cpdef ParquetWriterOptionsBuilder compression(self, compression_type compression)

cpdef ParquetWriterOptionsBuilder stats_level(self, statistics_freq sf)

cpdef ParquetWriterOptionsBuilder int96_timestamps(self, bool enabled)

cpdef ParquetWriterOptionsBuilder write_v2_headers(self, bool enabled)

cpdef ParquetWriterOptionsBuilder dictionary_policy(self, dictionary_policy val)

cpdef ParquetWriterOptionsBuilder utc_timestamps(self, bool enabled)

cpdef ParquetWriterOptionsBuilder write_arrow_schema(self, bool enabled)

cpdef ParquetWriterOptions build(self)

cpdef memoryview write_parquet(ParquetWriterOptions options)
46 changes: 45 additions & 1 deletion python/pylibcudf/pylibcudf/io/parquet.pyi
Original file line number Diff line number Diff line change
@@ -1,7 +1,20 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from collections.abc import Mapping
from typing import Self

from pylibcudf.expressions import Expression
from pylibcudf.io.types import SourceInfo, TableWithMetadata
from pylibcudf.io.types import (
CompressionType,
DictionaryPolicy,
PartitionInfo,
SinkInfo,
SourceInfo,
StatisticsFreq,
TableInputMetadata,
TableWithMetadata,
)
from pylibcudf.table import Table

class ChunkedParquetReader:
def __init__(
Expand Down Expand Up @@ -34,3 +47,34 @@ def read_parquet(
# reader_column_schema: ReaderColumnSchema = *,
# timestamp_type: DataType = *
) -> TableWithMetadata: ...

class ParquetWriterOptions:
def __init__(self): ...
@staticmethod
def builder(
sink: SinkInfo, table: Table
) -> ParquetWriterOptionsBuilder: ...
def set_partitions(self, partitions: list[PartitionInfo]) -> None: ...
def set_column_chunks_file_paths(self, file_paths: list[str]) -> None: ...
def set_row_group_size_bytes(self, size_bytes: int) -> None: ...
def set_row_group_size_rows(self, size_rows: int) -> None: ...
def set_max_page_size_bytes(self, size_bytes: int) -> None: ...
def set_max_page_size_rows(self, size_rows: int) -> None: ...
def set_max_dictionary_size(self, size_rows: int) -> None: ...
mroeschke marked this conversation as resolved.
Show resolved Hide resolved

class ParquetWriterOptionsBuilder:
def __init__(self): ...
def metadata(self, metadata: TableInputMetadata) -> Self: ...
def key_value_metadata(
self, metadata: list[Mapping[str, str]]
) -> Self: ...
def compression(self, compression: CompressionType) -> Self: ...
def stats_level(self, sf: StatisticsFreq) -> Self: ...
def int96_timestamps(self, enabled: bool) -> Self: ...
def write_v2_headers(self, enabled: bool) -> Self: ...
def dictionary_policy(self, val: DictionaryPolicy) -> Self: ...
def utc_timestamps(self, enabled: bool) -> Self: ...
def write_arrow_schema(self, enabled: bool) -> Self: ...
def build(self) -> ParquetWriterOptions: ...

def write_parquet(options: ParquetWriterOptions) -> memoryview: ...
Loading
Loading