Skip to content

Commit

Permalink
Merge branch 'branch-24.10' into join-stream-ordering
Browse files Browse the repository at this point in the history
  • Loading branch information
lamarrr authored Sep 17, 2024
2 parents a2bb2ae + 7285efb commit 0b976c7
Show file tree
Hide file tree
Showing 9 changed files with 145 additions and 36 deletions.
8 changes: 7 additions & 1 deletion java/src/main/java/ai/rapids/cudf/ColumnVector.java
Original file line number Diff line number Diff line change
Expand Up @@ -218,7 +218,13 @@ static long initViewHandle(DType type, int numRows, int nullCount,
od, vd, nullCount, numRows, childHandles);
}

static ColumnVector fromViewWithContiguousAllocation(long columnViewAddress, DeviceMemoryBuffer buffer) {
/**
* Creates a ColumnVector from a native column_view using a contiguous device allocation.
*
* @param columnViewAddress address of the native column_view
* @param buffer device buffer containing the data referenced by the column view
*/
public static ColumnVector fromViewWithContiguousAllocation(long columnViewAddress, DeviceMemoryBuffer buffer) {
return new ColumnVector(columnViewAddress, buffer);
}

Expand Down
26 changes: 25 additions & 1 deletion java/src/main/java/ai/rapids/cudf/ParquetWriterOptions.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
/*
*
* Copyright (c) 2019-2021, NVIDIA CORPORATION.
* Copyright (c) 2019-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand All @@ -24,9 +24,13 @@
*/
public final class ParquetWriterOptions extends CompressionMetadataWriterOptions {
private final StatisticsFrequency statsGranularity;
private int rowGroupSizeRows;
private long rowGroupSizeBytes;

private ParquetWriterOptions(Builder builder) {
super(builder);
this.rowGroupSizeRows = builder.rowGroupSizeRows;
this.rowGroupSizeBytes = builder.rowGroupSizeBytes;
this.statsGranularity = builder.statsGranularity;
}

Expand All @@ -51,18 +55,38 @@ public static Builder builder() {
return new Builder();
}

public int getRowGroupSizeRows() {
return rowGroupSizeRows;
}

public long getRowGroupSizeBytes() {
return rowGroupSizeBytes;
}

public StatisticsFrequency getStatisticsFrequency() {
return statsGranularity;
}

public static class Builder extends CompressionMetadataWriterOptions.Builder
<Builder, ParquetWriterOptions> {
private int rowGroupSizeRows = 1000000; //Max of 1 million rows per row group
private long rowGroupSizeBytes = 128 * 1024 * 1024; //Max of 128MB per row group
private StatisticsFrequency statsGranularity = StatisticsFrequency.ROWGROUP;

public Builder() {
super();
}

public Builder withRowGroupSizeRows(int rowGroupSizeRows) {
this.rowGroupSizeRows = rowGroupSizeRows;
return this;
}

public Builder withRowGroupSizeBytes(long rowGroupSizeBytes) {
this.rowGroupSizeBytes = rowGroupSizeBytes;
return this;
}

public Builder withStatisticsFrequency(StatisticsFrequency statsGranularity) {
this.statsGranularity = statsGranularity;
return this;
Expand Down
68 changes: 40 additions & 28 deletions java/src/main/java/ai/rapids/cudf/Table.java
Original file line number Diff line number Diff line change
Expand Up @@ -332,20 +332,22 @@ private static native long[] readAvroFromDataSource(String[] filterColumnNames,

/**
* Setup everything to write parquet formatted data to a file.
* @param columnNames names that correspond to the table columns
* @param numChildren Children of the top level
* @param flatNumChildren flattened list of children per column
* @param nullable true if the column can have nulls else false
* @param metadataKeys Metadata key names to place in the Parquet file
* @param metadataValues Metadata values corresponding to metadataKeys
* @param compression native compression codec ID
* @param statsFreq native statistics frequency ID
* @param isInt96 true if timestamp type is int96
* @param precisions precision list containing all the precisions of the decimal types in
* the columns
* @param isMapValues true if a column is a map
* @param isBinaryValues true if a column is a binary
* @param filename local output path
* @param columnNames names that correspond to the table columns
* @param numChildren Children of the top level
* @param flatNumChildren flattened list of children per column
* @param nullable true if the column can have nulls else false
* @param metadataKeys Metadata key names to place in the Parquet file
* @param metadataValues Metadata values corresponding to metadataKeys
* @param compression native compression codec ID
* @param rowGroupSizeRows max #rows in a row group
* @param rowGroupSizeBytes max #bytes in a row group
* @param statsFreq native statistics frequency ID
* @param isInt96 true if timestamp type is int96
* @param precisions precision list containing all the precisions of the decimal types in
* the columns
* @param isMapValues true if a column is a map
* @param isBinaryValues true if a column is a binary
* @param filename local output path
* @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd.
*/
private static native long writeParquetFileBegin(String[] columnNames,
Expand All @@ -355,6 +357,8 @@ private static native long writeParquetFileBegin(String[] columnNames,
String[] metadataKeys,
String[] metadataValues,
int compression,
int rowGroupSizeRows,
long rowGroupSizeBytes,
int statsFreq,
boolean[] isInt96,
int[] precisions,
Expand All @@ -366,20 +370,22 @@ private static native long writeParquetFileBegin(String[] columnNames,

/**
* Setup everything to write parquet formatted data to a buffer.
* @param columnNames names that correspond to the table columns
* @param numChildren Children of the top level
* @param flatNumChildren flattened list of children per column
* @param nullable true if the column can have nulls else false
* @param metadataKeys Metadata key names to place in the Parquet file
* @param metadataValues Metadata values corresponding to metadataKeys
* @param compression native compression codec ID
* @param statsFreq native statistics frequency ID
* @param isInt96 true if timestamp type is int96
* @param precisions precision list containing all the precisions of the decimal types in
* the columns
* @param isMapValues true if a column is a map
* @param isBinaryValues true if a column is a binary
* @param consumer consumer of host buffers produced.
* @param columnNames names that correspond to the table columns
* @param numChildren Children of the top level
* @param flatNumChildren flattened list of children per column
* @param nullable true if the column can have nulls else false
* @param metadataKeys Metadata key names to place in the Parquet file
* @param metadataValues Metadata values corresponding to metadataKeys
* @param compression native compression codec ID
* @param rowGroupSizeRows max #rows in a row group
* @param rowGroupSizeBytes max #bytes in a row group
* @param statsFreq native statistics frequency ID
* @param isInt96 true if timestamp type is int96
* @param precisions precision list containing all the precisions of the decimal types in
* the columns
* @param isMapValues true if a column is a map
* @param isBinaryValues true if a column is a binary
* @param consumer consumer of host buffers produced.
* @return a handle that is used in later calls to writeParquetChunk and writeParquetEnd.
*/
private static native long writeParquetBufferBegin(String[] columnNames,
Expand All @@ -389,6 +395,8 @@ private static native long writeParquetBufferBegin(String[] columnNames,
String[] metadataKeys,
String[] metadataValues,
int compression,
int rowGroupSizeRows,
long rowGroupSizeBytes,
int statsFreq,
boolean[] isInt96,
int[] precisions,
Expand Down Expand Up @@ -1820,6 +1828,8 @@ private ParquetTableWriter(ParquetWriterOptions options, File outputFile) {
options.getMetadataKeys(),
options.getMetadataValues(),
options.getCompressionType().nativeId,
options.getRowGroupSizeRows(),
options.getRowGroupSizeBytes(),
options.getStatisticsFrequency().nativeId,
options.getFlatIsTimeTypeInt96(),
options.getFlatPrecision(),
Expand All @@ -1840,6 +1850,8 @@ private ParquetTableWriter(ParquetWriterOptions options, HostBufferConsumer cons
options.getMetadataKeys(),
options.getMetadataValues(),
options.getCompressionType().nativeId,
options.getRowGroupSizeRows(),
options.getRowGroupSizeBytes(),
options.getStatisticsFrequency().nativeId,
options.getFlatIsTimeTypeInt96(),
options.getFlatPrecision(),
Expand Down
8 changes: 8 additions & 0 deletions java/src/main/native/src/TableJni.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2150,6 +2150,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env,
jobjectArray j_metadata_keys,
jobjectArray j_metadata_values,
jint j_compression,
jint j_row_group_size_rows,
jlong j_row_group_size_bytes,
jint j_stats_freq,
jbooleanArray j_isInt96,
jintArray j_precisions,
Expand Down Expand Up @@ -2205,6 +2207,8 @@ Java_ai_rapids_cudf_Table_writeParquetBufferBegin(JNIEnv* env,
chunked_parquet_writer_options::builder(sink)
.metadata(std::move(metadata))
.compression(static_cast<compression_type>(j_compression))
.row_group_size_rows(j_row_group_size_rows)
.row_group_size_bytes(j_row_group_size_bytes)
.stats_level(static_cast<statistics_freq>(j_stats_freq))
.key_value_metadata({kv_metadata})
.compression_statistics(stats)
Expand All @@ -2227,6 +2231,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env,
jobjectArray j_metadata_keys,
jobjectArray j_metadata_values,
jint j_compression,
jint j_row_group_size_rows,
jlong j_row_group_size_bytes,
jint j_stats_freq,
jbooleanArray j_isInt96,
jintArray j_precisions,
Expand Down Expand Up @@ -2280,6 +2286,8 @@ Java_ai_rapids_cudf_Table_writeParquetFileBegin(JNIEnv* env,
chunked_parquet_writer_options::builder(sink)
.metadata(std::move(metadata))
.compression(static_cast<compression_type>(j_compression))
.row_group_size_rows(j_row_group_size_rows)
.row_group_size_bytes(j_row_group_size_bytes)
.stats_level(static_cast<statistics_freq>(j_stats_freq))
.key_value_metadata({kv_metadata})
.compression_statistics(stats)
Expand Down
8 changes: 7 additions & 1 deletion java/src/test/java/ai/rapids/cudf/TableTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -9122,7 +9122,11 @@ void testParquetWriteToBufferChunked() {
columns.add(Columns.STRUCT.name);
WriteUtils.buildWriterOptions(optBuilder, columns);
ParquetWriterOptions options = optBuilder.build();
ParquetWriterOptions optionsNoCompress = optBuilder.withCompressionType(CompressionType.NONE).build();
ParquetWriterOptions optionsNoCompress =
optBuilder.withCompressionType(CompressionType.NONE)
.withRowGroupSizeRows(10000)
.withRowGroupSizeBytes(10000)
.build();
try (Table table0 = getExpectedFileTable(columns);
MyBufferConsumer consumer = new MyBufferConsumer()) {
try (TableWriter writer = Table.writeParquetChunked(options, consumer)) {
Expand Down Expand Up @@ -9208,6 +9212,8 @@ void testParquetWriteToFileUncompressedNoStats() throws IOException {
.withDecimalColumn("_c7", 4)
.withDecimalColumn("_c8", 6)
.withCompressionType(CompressionType.NONE)
.withRowGroupSizeRows(10000)
.withRowGroupSizeBytes(10000)
.withStatisticsFrequency(ParquetWriterOptions.StatisticsFrequency.NONE)
.build();
try (TableWriter writer = Table.writeParquetChunked(options, tempFile.getAbsoluteFile())) {
Expand Down
11 changes: 7 additions & 4 deletions python/cudf/cudf/core/reshape.py
Original file line number Diff line number Diff line change
Expand Up @@ -738,7 +738,8 @@ def get_dummies(
sparse : boolean, optional
Right now this is NON-FUNCTIONAL argument in rapids.
drop_first : boolean, optional
Right now this is NON-FUNCTIONAL argument in rapids.
Whether to get k-1 dummies out of k categorical levels by removing the
first level.
columns : sequence of str, optional
Names of columns to encode. If not provided, will attempt to encode all
columns. Note this is different from pandas default behavior, which
Expand Down Expand Up @@ -806,9 +807,6 @@ def get_dummies(
if sparse:
raise NotImplementedError("sparse is not supported yet")

if drop_first:
raise NotImplementedError("drop_first is not supported yet")

if isinstance(data, cudf.DataFrame):
encode_fallback_dtypes = ["object", "category"]

Expand Down Expand Up @@ -862,6 +860,7 @@ def get_dummies(
prefix=prefix_map.get(name, prefix),
prefix_sep=prefix_sep_map.get(name, prefix_sep),
dtype=dtype,
drop_first=drop_first,
)
result_data.update(col_enc_data)
return cudf.DataFrame._from_data(result_data, index=data.index)
Expand All @@ -874,6 +873,7 @@ def get_dummies(
prefix=prefix,
prefix_sep=prefix_sep,
dtype=dtype,
drop_first=drop_first,
)
return cudf.DataFrame._from_data(data, index=ser.index)

Expand Down Expand Up @@ -1256,6 +1256,7 @@ def _one_hot_encode_column(
prefix: str | None,
prefix_sep: str | None,
dtype: Dtype | None,
drop_first: bool,
) -> dict[str, ColumnBase]:
"""Encode a single column with one hot encoding. The return dictionary
contains pairs of (category, encodings). The keys may be prefixed with
Expand All @@ -1276,6 +1277,8 @@ def _one_hot_encode_column(
)
data = one_hot_encode(column, categories)

if drop_first and len(data):
data.pop(next(iter(data)))
if prefix is not None and prefix_sep is not None:
data = {f"{prefix}{prefix_sep}{col}": enc for col, enc in data.items()}
if dtype:
Expand Down
17 changes: 17 additions & 0 deletions python/cudf/cudf/tests/test_onehot.py
Original file line number Diff line number Diff line change
Expand Up @@ -161,3 +161,20 @@ def test_get_dummies_cats_deprecated():
df = cudf.DataFrame(range(3))
with pytest.warns(FutureWarning):
cudf.get_dummies(df, cats={0: [0, 1, 2]})


def test_get_dummies_drop_first_series():
result = cudf.get_dummies(cudf.Series(list("abcaa")), drop_first=True)
expected = pd.get_dummies(pd.Series(list("abcaa")), drop_first=True)
assert_eq(result, expected)


def test_get_dummies_drop_first_dataframe():
result = cudf.get_dummies(
cudf.DataFrame({"A": list("abcaa"), "B": list("bcaab")}),
drop_first=True,
)
expected = pd.get_dummies(
pd.DataFrame({"A": list("abcaa"), "B": list("bcaab")}), drop_first=True
)
assert_eq(result, expected)
18 changes: 17 additions & 1 deletion python/dask_cudf/dask_cudf/expr/_collection.py
Original file line number Diff line number Diff line change
Expand Up @@ -49,8 +49,24 @@ def to_dask_dataframe(self, **kwargs):

return self.to_backend("pandas", **kwargs)

def _prepare_cov_corr(self, min_periods, numeric_only):
# Upstream version of this method sets min_periods
# to 2 by default (which is not supported by cudf)
# TODO: Remove when cudf supports both min_periods
# and numeric_only
# See: https://github.com/rapidsai/cudf/issues/12626
# See: https://github.com/rapidsai/cudf/issues/9009
self._meta.cov(min_periods=min_periods)

frame = self
if numeric_only:
numerics = self._meta._get_numeric_data()
if len(numerics.columns) != len(self.columns):
frame = frame[list(numerics.columns)]
return frame, min_periods

# var can be removed if cudf#15179 is addressed.
# See: https://github.com/rapidsai/cudf/issues/15179
# See: https://github.com/rapidsai/cudf/issues/14935
def var(
self,
axis=0,
Expand Down
17 changes: 17 additions & 0 deletions python/dask_cudf/dask_cudf/tests/test_core.py
Original file line number Diff line number Diff line change
Expand Up @@ -1007,3 +1007,20 @@ def test_to_backend_simplify():
df2 = df.to_backend("cudf")[["y"]].simplify()
df3 = df[["y"]].to_backend("cudf").to_backend("cudf").simplify()
assert df2._name == df3._name


@pytest.mark.parametrize("numeric_only", [True, False])
@pytest.mark.parametrize("op", ["corr", "cov"])
def test_cov_corr(op, numeric_only):
df = cudf.DataFrame.from_dict(
{
"x": np.random.randint(0, 5, size=10),
"y": np.random.normal(size=10),
}
)
ddf = dd.from_pandas(df, npartitions=2)
res = getattr(ddf, op)(numeric_only=numeric_only)
# Use to_pandas until cudf supports numeric_only
# (See: https://github.com/rapidsai/cudf/issues/12626)
expect = getattr(df.to_pandas(), op)(numeric_only=numeric_only)
dd.assert_eq(res, expect)

0 comments on commit 0b976c7

Please sign in to comment.