Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: One-pass column optimization #491

Draft
wants to merge 29 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
b18b3af
start
martindurant Mar 28, 2024
3cdc778
Remove unused
martindurant Mar 28, 2024
c57693c
Pass reports around
martindurant Apr 4, 2024
0c367df
remember to commit
martindurant Apr 5, 2024
9c654b1
first working (for parquet)
martindurant Apr 9, 2024
e6c5ce3
Merge branch 'main' into one-pass
martindurant Apr 11, 2024
f56f77a
stop
martindurant Apr 12, 2024
75b4416
most pass
martindurant Apr 15, 2024
a5c319d
fix most
martindurant Apr 18, 2024
243fbc1
probably better
martindurant Apr 19, 2024
6eebf87
Reinstate necessary_columns (needs doc)
martindurant Apr 22, 2024
8bbe409
Merge branch 'main' into one-pass
martindurant Apr 22, 2024
43b2e43
pass buffer names around, not columns
martindurant Apr 25, 2024
e5828fb
Clear cache between tests
martindurant May 13, 2024
df0cf2f
Merge branch 'main' into one-pass
martindurant May 13, 2024
b593f87
Another one squashed
martindurant May 21, 2024
e410b61
Squash errors that only show when uproot.dask and hist.dask are insta…
martindurant May 22, 2024
cd537e2
fix uproot
martindurant May 23, 2024
baf6f46
fix report
martindurant May 27, 2024
e29e929
if meta fails
martindurant Jun 6, 2024
f61fdd7
rev
martindurant Jul 23, 2024
2c3abd0
concat enforce condition
martindurant Jul 24, 2024
04abbc8
temp
martindurant Jul 29, 2024
d876f00
squached some
martindurant Jul 30, 2024
8e1d507
add note
martindurant Jul 30, 2024
d1922ab
Merge branch 'main' into one-pass
martindurant Jul 30, 2024
fc9589b
Fix concat form comparison
martindurant Jul 31, 2024
961dd0c
one more squashed
martindurant Jul 31, 2024
c8b254b
fix IO report
martindurant Aug 2, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -165,5 +165,5 @@ omit = [
source = ["src/"]

[tool.ruff]
ignore = ["E501", "E402"]
per-file-ignores = {"__init__.py" = ["E402", "F401"]}
lint.ignore = ["E501", "E402"]
lint.per-file-ignores = {"__init__.py" = ["E402", "F401"]}
10 changes: 2 additions & 8 deletions src/dask_awkward/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,14 +19,7 @@
partition_compatibility,
)
from dask_awkward.lib.describe import backend, fields
from dask_awkward.lib.inspect import (
report_necessary_buffers,
report_necessary_columns,
sample,
)

necessary_columns = report_necessary_columns # Export for backwards compatibility.

from dask_awkward.lib.inspect import sample
from dask_awkward.lib.io.io import (
from_awkward,
from_dask_array,
Expand All @@ -42,6 +35,7 @@
from dask_awkward.lib.io.parquet import from_parquet, to_parquet
from dask_awkward.lib.io.text import from_text
from dask_awkward.lib.operations import concatenate
from dask_awkward.lib.optimize import necessary_columns
from dask_awkward.lib.reducers import (
all,
any,
Expand Down
4 changes: 0 additions & 4 deletions src/dask_awkward/layers/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@
AwkwardMaterializedLayer,
AwkwardTreeReductionLayer,
ImplementsIOFunction,
ImplementsProjection,
IOFunctionWithMocking,
io_func_implements_projection,
)

Expand All @@ -14,8 +12,6 @@
"AwkwardBlockwiseLayer",
"AwkwardMaterializedLayer",
"AwkwardTreeReductionLayer",
"ImplementsProjection",
"ImplementsIOFunction",
"IOFunctionWithMocking",
"io_func_implements_projection",
)
229 changes: 10 additions & 219 deletions src/dask_awkward/layers/layers.py
Original file line number Diff line number Diff line change
@@ -1,23 +1,14 @@
from __future__ import annotations

import copy
from collections.abc import Callable, Mapping
from typing import TYPE_CHECKING, Any, Literal, Protocol, TypeVar, Union, cast
from typing import Any, Protocol, TypeVar

from dask.blockwise import Blockwise, BlockwiseDepDict, blockwise_token
from dask.highlevelgraph import MaterializedLayer
from dask.layers import DataFrameTreeReduction
from typing_extensions import TypeAlias

from dask_awkward.utils import LazyInputsDict

if TYPE_CHECKING:
from awkward import Array as AwkwardArray
from awkward._nplikes.typetracer import TypeTracerReport


BackendT: TypeAlias = Union[Literal["cpu"], Literal["jax"], Literal["cuda"]]


class AwkwardBlockwiseLayer(Blockwise):
"""Just like upstream Blockwise, except we override pickling"""
Expand All @@ -30,17 +21,11 @@ def from_blockwise(cls, layer: Blockwise) -> AwkwardBlockwiseLayer:
ob.__dict__.update(layer.__dict__)
return ob

def mock(self) -> AwkwardBlockwiseLayer:
layer = copy.copy(self)
nb = layer.numblocks
layer.numblocks = {k: tuple(1 for _ in v) for k, v in nb.items()}
layer.__dict__.pop("_dims", None)
return layer

def __getstate__(self) -> dict:
# Indicator that this layer has been serialised
state = self.__dict__.copy()
# Indicator that this layer has been serialised
state["has_been_unpickled"] = True
state.pop("meta", None) # this is a typetracer
return state

def __repr__(self) -> str:
Expand All @@ -54,59 +39,13 @@ def __call__(self, *args, **kwargs): ...
T = TypeVar("T")


class ImplementsMocking(ImplementsIOFunction, Protocol):
def mock(self) -> AwkwardArray: ...


class ImplementsMockEmpty(ImplementsIOFunction, Protocol):
def mock_empty(self, backend: BackendT) -> AwkwardArray: ...


class ImplementsReport(ImplementsIOFunction, Protocol):
@property
def return_report(self) -> bool: ...


class ImplementsProjection(ImplementsMocking, Protocol[T]):
def prepare_for_projection(self) -> tuple[AwkwardArray, TypeTracerReport, T]: ...

def project(self, report: TypeTracerReport, state: T) -> ImplementsIOFunction: ...


class ImplementsNecessaryColumns(ImplementsProjection[T], Protocol):
def necessary_columns(
self, report: TypeTracerReport, state: T
) -> frozenset[str]: ...


class IOFunctionWithMocking(ImplementsMocking, ImplementsIOFunction):
def __init__(self, meta: AwkwardArray, io_func: ImplementsIOFunction):
self._meta = meta
self._io_func = io_func

def __getstate__(self) -> dict:
state = self.__dict__.copy()
state["_meta"] = None
return state

def __call__(self, *args, **kwargs):
return self._io_func(*args, **kwargs)

def mock(self) -> AwkwardArray:
assert self._meta is not None
return self._meta


def io_func_implements_projection(func: ImplementsIOFunction) -> bool:
return hasattr(func, "prepare_for_projection")


def io_func_implements_mocking(func: ImplementsIOFunction) -> bool:
return hasattr(func, "mock")


def io_func_implements_mock_empty(func: ImplementsIOFunction) -> bool:
return hasattr(func, "mock_empty")
return hasattr(func, "project")


def io_func_implements_columnar(func: ImplementsIOFunction) -> bool:
Expand Down Expand Up @@ -172,94 +111,15 @@ def __init__(
def __repr__(self) -> str:
return f"AwkwardInputLayer<{self.output}>"

@property
def is_projectable(self) -> bool:
# isinstance(self.io_func, ImplementsProjection)
return (
io_func_implements_projection(self.io_func) and not self.has_been_unpickled
)

@property
def is_mockable(self) -> bool:
# isinstance(self.io_func, ImplementsMocking)
return io_func_implements_mocking(self.io_func)

@property
def is_columnar(self) -> bool:
return io_func_implements_columnar(self.io_func)

def mock(self) -> AwkwardInputLayer:
assert self.is_mockable
return AwkwardInputLayer(
name=self.name,
inputs=[None][: int(list(self.numblocks.values())[0][0])],
io_func=lambda *_, **__: cast(ImplementsMocking, self.io_func).mock(),
label=self.label,
produces_tasks=self.produces_tasks,
creation_info=self.creation_info,
annotations=self.annotations,
)

def prepare_for_projection(self) -> tuple[AwkwardInputLayer, TypeTracerReport, T]:
"""Mock the input layer as starting with a data-less typetracer.
This method is used to create new dask task graphs that
operate purely on typetracer Arrays (that is, array with
awkward structure but without real data buffers). This allows
us to test which parts of a real awkward array will be used in
a real computation. We do this by running a graph which starts
with mocked AwkwardInputLayers.

We mock an AwkwardInputLayer in these steps:
1. Ask the IO function to prepare a new meta array, and return
any transient state.
2. Build a new AwkwardInputLayer whose IO function just returns
this meta (typetracer) array
3. Return the new input layer and the transient state

When this new layer is added to a dask task graph and that
graph is computed, the report object will be mutated.
Inspecting the report object after the compute tells us which
buffers from the original form would be required for a real
compute with the same graph.
Returns
-------
AwkwardInputLayer
Copy of the input layer with data-less input.
TypeTracerReport
The report object used to track touched buffers.
Any
The black-box state object returned by the IO function.
"""
assert self.is_projectable
new_meta_array, report, state = cast(
ImplementsProjection, self.io_func
).prepare_for_projection()

new_return = new_meta_array
if io_func_implements_report(self.io_func):
if cast(ImplementsReport, self.io_func).return_report:
new_return = (new_meta_array, type(new_meta_array)([]))

new_input_layer = AwkwardInputLayer(
name=self.name,
inputs=[None][: int(list(self.numblocks.values())[0][0])],
io_func=AwkwardTokenizable(new_return, self.name),
label=self.label,
produces_tasks=self.produces_tasks,
creation_info=self.creation_info,
annotations=self.annotations,
)
return new_input_layer, report, state

def project(
self,
report: TypeTracerReport,
state: T,
) -> AwkwardInputLayer:
assert self.is_projectable
io_func = cast(ImplementsProjection, self.io_func).project(
report=report, state=state
)
def project(self, columns: list[str]) -> AwkwardInputLayer:
if hasattr(self.io_func, "project"):
io_func = self.io_func.project(columns)
else:
return self
return AwkwardInputLayer(
name=self.name,
inputs=self.inputs,
Expand All @@ -270,12 +130,6 @@ def project(
annotations=self.annotations,
)

def necessary_columns(self, report: TypeTracerReport, state: T) -> frozenset[str]:
assert self.is_columnar
return cast(ImplementsNecessaryColumns, self.io_func).necessary_columns(
report=report, state=state
)


class AwkwardMaterializedLayer(MaterializedLayer):
def __init__(
Expand All @@ -290,68 +144,5 @@ def __init__(
self.fn = fn
super().__init__(mapping, **kwargs)

def mock(self) -> MaterializedLayer:
mapping = copy.copy(self.mapping)
if not mapping:
# no partitions at all
return self
name = next(iter(mapping))[0]

npln = len(self.previous_layer_names)
# one previous layer name
#
# this case is used for mocking repartition or slicing where
# we maybe have multiple partitions that need to be included
# in a task.
if npln == 1:
prev_name: str = self.previous_layer_names[0]
if (name, 0) in mapping:
task = mapping[(name, 0)]
task = tuple(
(
(prev_name, 0)
if isinstance(v, tuple) and len(v) == 2 and v[0] == prev_name
else v
)
for v in task
)

# when using Array.partitions we need to mock that we
# just want the first partition.
if len(task) == 2 and isinstance(task[1], int) and task[1] > 0:
task = (task[0], 0)
return MaterializedLayer({(name, 0): task})
return self

# zero previous layers; this is likely a known scalar.
#
# we just use the existing mapping
elif npln == 0:
return MaterializedLayer({(name, 0): mapping[(name, 0)]})

# more than one previous_layer_names
#
# this case is needed for dak.concatenate on axis=0; we need
# the first partition of _each_ of the previous layer names!
else:
if self.fn is None:
raise ValueError(
"For multiple previous layers the fn argument cannot be None."
)
name0s = tuple((name, 0) for name in self.previous_layer_names)
task = (self.fn, *name0s)
return MaterializedLayer({(name, 0): task})


class AwkwardTreeReductionLayer(DataFrameTreeReduction):
def mock(self) -> AwkwardTreeReductionLayer:
return AwkwardTreeReductionLayer(
name=self.name,
name_input=self.name_input,
npartitions_input=1,
concat_func=self.concat_func,
tree_node_func=self.tree_node_func,
finalize_func=self.finalize_func,
split_every=self.split_every,
tree_node_name=self.tree_node_name,
)
class AwkwardTreeReductionLayer(DataFrameTreeReduction): ...
7 changes: 2 additions & 5 deletions src/dask_awkward/lib/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,11 +8,7 @@
partition_compatibility,
)
from dask_awkward.lib.describe import backend, fields
from dask_awkward.lib.inspect import (
report_necessary_buffers,
report_necessary_columns,
sample,
)
from dask_awkward.lib.inspect import sample
from dask_awkward.lib.io.io import (
from_awkward,
from_dask_array,
Expand All @@ -28,6 +24,7 @@
from dask_awkward.lib.io.parquet import from_parquet, to_parquet
from dask_awkward.lib.io.text import from_text
from dask_awkward.lib.operations import concatenate
from dask_awkward.lib.optimize import necessary_columns
from dask_awkward.lib.reducers import (
all,
any,
Expand Down
Loading
Loading