Commit c62ee59

Merge branch 'branch-25.02' into wordpiece-tokenize
davidwendt committed Dec 18, 2024
2 parents 4b47be9 + 5160989 commit c62ee59
Showing 24 changed files with 403 additions and 121 deletions.
13 changes: 13 additions & 0 deletions .github/workflows/pr.yaml
@@ -13,6 +13,7 @@ jobs:
# Please keep pr-builder as the top job here
pr-builder:
needs:
- check-nightly-ci
- changed-files
- checks
- conda-cpp-build
@@ -54,6 +55,18 @@ jobs:
- name: Telemetry setup
if: ${{ vars.TELEMETRY_ENABLED == 'true' }}
uses: rapidsai/shared-actions/telemetry-dispatch-stash-base-env-vars@main
check-nightly-ci:
# Switch to ubuntu-latest once it defaults to a version of Ubuntu that
# provides at least Python 3.11 (see
# https://docs.python.org/3/library/datetime.html#datetime.date.fromisoformat)
runs-on: ubuntu-24.04
env:
RAPIDS_GH_TOKEN: ${{ secrets.GITHUB_TOKEN }}
steps:
- name: Check if nightly CI is passing
uses: rapidsai/shared-actions/check_nightly_success/dispatch@main
with:
repo: cudf
changed-files:
secrets: inherit
needs: telemetry-setup
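The new check-nightly-ci job pins ubuntu-24.04 because the comment above ties the Python 3.11 requirement to datetime.date.fromisoformat, which gained broader ISO 8601 support in 3.11. A small illustration of that behavior difference (example values are mine, not from the workflow):

from datetime import date

# Accepted on Python 3.7+: the basic YYYY-MM-DD form.
print(date.fromisoformat("2024-12-18"))
# Accepted only on Python 3.11+: the compact ISO 8601 form; earlier
# versions raise ValueError here.
print(date.fromisoformat("20241218"))
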
4 changes: 2 additions & 2 deletions cpp/src/io/parquet/reader_impl_preprocess.cu
@@ -550,7 +550,7 @@ void decode_page_headers(pass_intermediate_data& pass,
{
CUDF_FUNC_RANGE();

auto iter = thrust::make_counting_iterator(0);
auto iter = thrust::counting_iterator<size_t>(0);
rmm::device_uvector<size_t> chunk_page_counts(pass.chunks.size() + 1, stream);
thrust::transform_exclusive_scan(
rmm::exec_policy_nosync(stream),
@@ -562,7 +562,7 @@ void decode_page_headers(pass_intermediate_data& pass,
return static_cast<size_t>(
i >= num_chunks ? 0 : chunks[i].num_data_pages + chunks[i].num_dict_pages);
}),
0,
size_t{0},
thrust::plus<size_t>{});
rmm::device_uvector<chunk_page_info> d_chunk_page_info(pass.chunks.size(), stream);
thrust::for_each(rmm::exec_policy_nosync(stream),
28 changes: 20 additions & 8 deletions python/cudf/cudf/io/orc.py
@@ -240,15 +240,27 @@ def read_orc(
elif not isinstance(num_rows, int) or num_rows < -1:
raise TypeError("num_rows must be an int >= -1")

tbl_w_meta = plc.io.orc.read_orc(
plc.io.SourceInfo(filepaths_or_buffers),
columns,
stripes,
skiprows,
num_rows,
use_index,
dtype_to_pylibcudf_type(cudf.dtype(timestamp_type)),
options = (
plc.io.orc.OrcReaderOptions.builder(
plc.io.types.SourceInfo(filepaths_or_buffers)
)
.use_index(use_index)
.build()
)
if num_rows >= 0:
options.set_num_rows(num_rows)
if skiprows >= 0:
options.set_skip_rows(skiprows)
if stripes is not None and len(stripes) > 0:
options.set_stripes(stripes)
if timestamp_type is not None:
options.set_timestamp_type(
dtype_to_pylibcudf_type(cudf.dtype(timestamp_type))
)
if columns is not None and len(columns) > 0:
options.set_columns(columns)

tbl_w_meta = plc.io.orc.read_orc(options)

if isinstance(columns, list) and len(columns) == 0:
# When `columns=[]`, index needs to be
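The rewritten read_orc path above replaces the positional plc.io.orc.read_orc call with an OrcReaderOptions builder, applying each optional setting only when it was supplied. A minimal sketch of the same builder pattern used directly against pylibcudf (the file name, row limit, and column name are placeholders, not from the diff):

import pylibcudf as plc

# Build reader options from a source, then apply optional settings.
options = (
    plc.io.orc.OrcReaderOptions.builder(plc.io.types.SourceInfo(["example.orc"]))
    .use_index(True)
    .build()
)
options.set_num_rows(1000)        # read at most 1000 rows
options.set_columns(["col_a"])    # read a subset of columns
tbl_w_meta = plc.io.orc.read_orc(options)
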
@@ -40,6 +40,7 @@ def __init__(
self.dtype = dtype
self.name = name
self.options = options
self.is_pointwise = False
self.children = children
if name not in Agg._SUPPORTED:
raise NotImplementedError(
7 changes: 6 additions & 1 deletion python/cudf_polars/cudf_polars/dsl/expressions/base.py
@@ -36,9 +36,11 @@ class ExecutionContext(IntEnum):
class Expr(Node["Expr"]):
"""An abstract expression object."""

__slots__ = ("dtype",)
__slots__ = ("dtype", "is_pointwise")
dtype: plc.DataType
"""Data type of the expression."""
is_pointwise: bool
"""Whether this expression acts pointwise on its inputs."""
# This annotation is needed because of https://github.com/python/mypy/issues/17981
_non_child: ClassVar[tuple[str, ...]] = ("dtype",)
"""Names of non-child data (not Exprs) for reconstruction."""
@@ -164,6 +166,7 @@ def __init__(self, dtype: plc.DataType, error: str) -> None:
self.dtype = dtype
self.error = error
self.children = ()
self.is_pointwise = True


class NamedExpr:
@@ -243,6 +246,7 @@ class Col(Expr):
def __init__(self, dtype: plc.DataType, name: str) -> None:
self.dtype = dtype
self.name = name
self.is_pointwise = True
self.children = ()

def do_evaluate(
@@ -280,6 +284,7 @@ def __init__(
self.dtype = dtype
self.index = index
self.table_ref = table_ref
self.is_pointwise = True
self.children = (column,)

def do_evaluate(
1 change: 1 addition & 0 deletions python/cudf_polars/cudf_polars/dsl/expressions/binaryop.py
@@ -42,6 +42,7 @@ def __init__(
op = BinOp._BOOL_KLEENE_MAPPING.get(op, op)
self.op = op
self.children = (left, right)
self.is_pointwise = True
if not plc.binaryop.is_supported_operation(
self.dtype, left.dtype, right.dtype, op
):
8 changes: 8 additions & 0 deletions python/cudf_polars/cudf_polars/dsl/expressions/boolean.py
@@ -81,6 +81,14 @@ def __init__(
self.options = options
self.name = name
self.children = children
self.is_pointwise = self.name not in (
BooleanFunction.Name.All,
BooleanFunction.Name.Any,
BooleanFunction.Name.IsDuplicated,
BooleanFunction.Name.IsFirstDistinct,
BooleanFunction.Name.IsLastDistinct,
BooleanFunction.Name.IsUnique,
)
if self.name is BooleanFunction.Name.IsIn and not all(
c.dtype == self.children[0].dtype for c in self.children
):
1 change: 1 addition & 0 deletions python/cudf_polars/cudf_polars/dsl/expressions/datetime.py
@@ -114,6 +114,7 @@ def __init__(
self.options = options
self.name = name
self.children = children
self.is_pointwise = True
if self.name not in self._COMPONENT_MAP:
raise NotImplementedError(f"Temporal function {self.name}")

2 changes: 2 additions & 0 deletions python/cudf_polars/cudf_polars/dsl/expressions/literal.py
@@ -38,6 +38,7 @@ def __init__(self, dtype: plc.DataType, value: pa.Scalar[Any]) -> None:
assert value.type == plc.interop.to_arrow(dtype)
self.value = value
self.children = ()
self.is_pointwise = True

def do_evaluate(
self,
@@ -65,6 +66,7 @@ def __init__(self, dtype: plc.DataType, value: pl.Series) -> None:
data = value.to_arrow()
self.value = data.cast(dtypes.downcast_arrow_lists(data.type))
self.children = ()
self.is_pointwise = True

def get_hashable(self) -> Hashable:
"""Compute a hash of the column."""
2 changes: 2 additions & 0 deletions python/cudf_polars/cudf_polars/dsl/expressions/rolling.py
@@ -24,6 +24,7 @@ def __init__(self, dtype: plc.DataType, options: Any, agg: Expr) -> None:
self.dtype = dtype
self.options = options
self.children = (agg,)
self.is_pointwise = False
raise NotImplementedError("Rolling window not implemented")


@@ -35,4 +36,5 @@ def __init__(self, dtype: plc.DataType, options: Any, agg: Expr, *by: Expr) -> None:
self.dtype = dtype
self.options = options
self.children = (agg, *by)
self.is_pointwise = False
raise NotImplementedError("Grouped rolling window not implemented")
2 changes: 2 additions & 0 deletions python/cudf_polars/cudf_polars/dsl/expressions/selection.py
@@ -30,6 +30,7 @@ class Gather(Expr):
def __init__(self, dtype: plc.DataType, values: Expr, indices: Expr) -> None:
self.dtype = dtype
self.children = (values, indices)
self.is_pointwise = False

def do_evaluate(
self,
@@ -71,6 +72,7 @@ class Filter(Expr):
def __init__(self, dtype: plc.DataType, values: Expr, indices: Expr):
self.dtype = dtype
self.children = (values, indices)
self.is_pointwise = True

def do_evaluate(
self,
2 changes: 2 additions & 0 deletions python/cudf_polars/cudf_polars/dsl/expressions/sorting.py
@@ -32,6 +32,7 @@ def __init__(
self.dtype = dtype
self.options = options
self.children = (column,)
self.is_pointwise = False

def do_evaluate(
self,
@@ -71,6 +72,7 @@ def __init__(
self.dtype = dtype
self.options = options
self.children = (column, *by)
self.is_pointwise = False

def do_evaluate(
self,
1 change: 1 addition & 0 deletions python/cudf_polars/cudf_polars/dsl/expressions/string.py
@@ -106,6 +106,7 @@ def __init__(
self.options = options
self.name = name
self.children = children
self.is_pointwise = True
self._validate_input()

def _validate_input(self):
1 change: 1 addition & 0 deletions python/cudf_polars/cudf_polars/dsl/expressions/ternary.py
@@ -34,6 +34,7 @@
) -> None:
self.dtype = dtype
self.children = (when, then, otherwise)
self.is_pointwise = True

def do_evaluate(
self,
10 changes: 10 additions & 0 deletions python/cudf_polars/cudf_polars/dsl/expressions/unary.py
@@ -33,6 +33,7 @@ class Cast(Expr):
def __init__(self, dtype: plc.DataType, value: Expr) -> None:
self.dtype = dtype
self.children = (value,)
self.is_pointwise = True
if not dtypes.can_cast(value.dtype, self.dtype):
raise NotImplementedError(
f"Can't cast {value.dtype.id().name} to {self.dtype.id().name}"
@@ -63,6 +64,7 @@ class Len(Expr):
def __init__(self, dtype: plc.DataType) -> None:
self.dtype = dtype
self.children = ()
self.is_pointwise = False

def do_evaluate(
self,
@@ -147,6 +149,14 @@ def __init__(
self.name = name
self.options = options
self.children = children
self.is_pointwise = self.name not in (
"cum_min",
"cum_max",
"cum_prod",
"cum_sum",
"drop_nulls",
"unique",
)

if self.name not in UnaryFunction._supported_fns:
raise NotImplementedError(f"Unary function {name=}")
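For intuition on the flag set above: a pointwise function produces each output row from the matching input row alone, while the excluded functions either depend on other rows (the cumulative scans) or change the row count (drop_nulls, unique). A small, assumed Polars example (not part of the diff) contrasting the two:

import polars as pl

df = pl.DataFrame({"a": [1, -2, 3]})
# abs() is pointwise: each output row depends only on the same input row.
# cum_sum() is not: every output row depends on all earlier rows, so it
# cannot be evaluated independently per partition.
print(
    df.select(
        pl.col("a").abs().alias("abs_a"),
        pl.col("a").cum_sum().alias("cumsum_a"),
    )
)
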
14 changes: 7 additions & 7 deletions python/cudf_polars/cudf_polars/dsl/traversal.py
@@ -10,7 +10,7 @@
from cudf_polars.typing import U_contra, V_co

if TYPE_CHECKING:
from collections.abc import Callable, Generator, Mapping, MutableMapping
from collections.abc import Callable, Generator, Mapping, MutableMapping, Sequence

from cudf_polars.typing import GenericTransformer, NodeT

@@ -23,22 +23,22 @@
]


def traversal(node: NodeT) -> Generator[NodeT, None, None]:
def traversal(nodes: Sequence[NodeT]) -> Generator[NodeT, None, None]:
"""
Pre-order traversal of nodes in an expression.
Parameters
----------
node
Root of expression to traverse.
nodes
Roots of expressions to traverse.
Yields
------
Unique nodes in the expression, parent before child, children
Unique nodes in the expressions, parent before child, children
in-order from left to right.
"""
seen = {node}
lifo = [node]
seen = set(nodes)
lifo = list(nodes)

while lifo:
node = lifo.pop()
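With the signature change above, traversal takes a sequence of roots rather than a single node, so several expression trees can be walked in one pass while shared nodes are still visited only once; single-root callers now wrap the node in a list, as task_graph does below with traversal([ir]). A small sketch of the new call, reusing the Col constructor shown in the base.py hunk (the dtype and column names are arbitrary):

import pylibcudf as plc
from cudf_polars.dsl.expressions.base import Col
from cudf_polars.dsl.traversal import traversal

# Two root expressions; traversal now walks both in a single pass.
dt = plc.DataType(plc.TypeId.INT64)
roots = [Col(dt, "a"), Col(dt, "b")]
for node in traversal(roots):
    print(type(node).__name__)
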
12 changes: 9 additions & 3 deletions python/cudf_polars/cudf_polars/experimental/parallel.py
@@ -9,8 +9,9 @@
from functools import reduce
from typing import TYPE_CHECKING, Any

import cudf_polars.experimental.io # noqa: F401
from cudf_polars.dsl.ir import IR, Cache, Projection, Union
import cudf_polars.experimental.io
import cudf_polars.experimental.select # noqa: F401
from cudf_polars.dsl.ir import IR, Cache, Filter, HStack, Projection, Select, Union
from cudf_polars.dsl.traversal import CachingVisitor, traversal
from cudf_polars.experimental.base import PartitionInfo, _concat, get_key_name
from cudf_polars.experimental.dispatch import (
@@ -112,7 +113,7 @@ def task_graph(
"""
graph = reduce(
operator.or_,
(generate_ir_tasks(node, partition_info) for node in traversal(ir)),
(generate_ir_tasks(node, partition_info) for node in traversal([ir])),
)

key_name = get_key_name(ir)
@@ -226,6 +227,8 @@ def _lower_ir_pwise(

lower_ir_node.register(Projection, _lower_ir_pwise)
lower_ir_node.register(Cache, _lower_ir_pwise)
lower_ir_node.register(Filter, _lower_ir_pwise)
lower_ir_node.register(HStack, _lower_ir_pwise)


def _generate_ir_tasks_pwise(
@@ -245,3 +248,6 @@ def _generate_ir_tasks_pwise(

generate_ir_tasks.register(Projection, _generate_ir_tasks_pwise)
generate_ir_tasks.register(Cache, _generate_ir_tasks_pwise)
generate_ir_tasks.register(Filter, _generate_ir_tasks_pwise)
generate_ir_tasks.register(HStack, _generate_ir_tasks_pwise)
generate_ir_tasks.register(Select, _generate_ir_tasks_pwise)
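The extra registrations above treat Filter, HStack, and Select as partition-wise IR nodes. Roughly, these correspond to a lazy query's filter, with_columns, and select steps, so a query along the following lines (a hypothetical example; whether it actually runs multi-partition depends on the experimental executor configuration, which this commit does not show) touches exactly the newly registered nodes:

import polars as pl

# Hypothetical lazy query: filter -> Filter, with_columns -> HStack,
# select -> Select in the cudf-polars IR.
q = (
    pl.scan_parquet("data/*.parquet")
    .filter(pl.col("a") > 0)
    .with_columns((pl.col("a") * 2).alias("a2"))
    .select("a", "a2")
)
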
36 changes: 36 additions & 0 deletions python/cudf_polars/cudf_polars/experimental/select.py
@@ -0,0 +1,36 @@
# SPDX-FileCopyrightText: Copyright (c) 2024 NVIDIA CORPORATION & AFFILIATES.
# SPDX-License-Identifier: Apache-2.0
"""Parallel Select Logic."""

from __future__ import annotations

from typing import TYPE_CHECKING

from cudf_polars.dsl.ir import Select
from cudf_polars.dsl.traversal import traversal
from cudf_polars.experimental.dispatch import lower_ir_node

if TYPE_CHECKING:
from collections.abc import MutableMapping

from cudf_polars.dsl.ir import IR
from cudf_polars.experimental.base import PartitionInfo
from cudf_polars.experimental.parallel import LowerIRTransformer


@lower_ir_node.register(Select)
def _(
ir: Select, rec: LowerIRTransformer
) -> tuple[IR, MutableMapping[IR, PartitionInfo]]:
child, partition_info = rec(ir.children[0])
pi = partition_info[child]
if pi.count > 1 and not all(
expr.is_pointwise for expr in traversal([e.value for e in ir.exprs])
):
# TODO: Handle non-pointwise expressions.
raise NotImplementedError(
f"Selection {ir} does not support multiple partitions."
)
new_node = ir.reconstruct([child])
partition_info[new_node] = pi
return new_node, partition_info
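
The lowering above keeps a Select partitioned only when every selected expression is pointwise; otherwise, with more than one partition, it raises NotImplementedError. A hedged sketch of a query that stays on the pointwise path (file and column names are made up):

import polars as pl

# Every expression here is pointwise (arithmetic and a cast), so each
# partition can be selected independently.
q = pl.scan_parquet("data/*.parquet").select(
    (pl.col("a") + pl.col("b")).alias("a_plus_b"),
    pl.col("a").cast(pl.Float64).alias("a_f64"),
)

# A whole-column reduction such as pl.col("a").sum() is not pointwise and,
# with multiple partitions, would hit the NotImplementedError above.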