Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: add tree to virtual array conversion #1393

Draft
wants to merge 1 commit into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
94 changes: 86 additions & 8 deletions src/uproot/_dask.py
Original file line number Diff line number Diff line change
Expand Up @@ -950,11 +950,93 @@ def load_buffers(

return container

def load_virtual_arrays(
    self,
    tree: HasBranches,
    keys: frozenset[str],
    start: int,
    stop: int,
    decompression_executor,
    interpretation_executor,
    options: Any,
) -> Mapping[str, AwkArray]:
    """
    Build a container of lazy buffer generators for ``self._form``.

    For every buffer key reported by ``self._form.expected_from_buffers()``
    the container holds a zero-argument callable. Calling it invokes
    ``branch.array(...)`` for the entry range ``[start, stop)`` and returns
    the requested raw buffer of the resulting layout.

    Args:
        tree: Object indexed by branch name to obtain each branch.
        keys: Accepted for signature parity; not used by this method.
        start: Passed to ``branch.array`` as ``entry_start``.
        stop: Passed to ``branch.array`` as ``entry_stop``.
        decompression_executor: Forwarded to ``branch.array``.
        interpretation_executor: Forwarded to ``branch.array``.
        options: Mapping-like object; only ``options.get("ak_add_doc")``
            is read.

    Returns:
        A dict mapping each expected buffer key to a generator callable.

    Raises:
        NotImplementedError: Raised lazily, when a generator is called,
            if the branch layout is neither a ``NumpyArray`` nor a
            ``ListOffsetArray``, or if a ``ListOffsetArray`` buffer other
            than ``"data"`` / ``"offsets"`` is requested.
    """
    awkward = uproot.extras.awkward()

    class Generator:
        """Callable that produces one buffer of one branch on demand."""

        def __init__(self, branch, attribute):
            self.branch = branch
            self.attribute = attribute

        def __repr__(self):
            return f"Generator({self.branch}, {self.attribute})"

        def __call__(self):
            # ``branch.array`` is invoked on every call; nothing is cached
            # on this object.
            layout = self.branch.array(
                entry_start=start,
                entry_stop=stop,
                interpretation_executor=interpretation_executor,
                decompression_executor=decompression_executor,
                library="ak",
                ak_add_doc=options.get("ak_add_doc"),
            ).layout

            # HACK: only two layout shapes are recognised here. For a
            # NumpyArray the attribute is not inspected at all.
            if isinstance(layout, awkward.contents.NumpyArray):
                return layout.data

            if isinstance(layout, awkward.contents.ListOffsetArray):
                if self.attribute == "data":
                    return layout.content.data
                if self.attribute == "offsets":
                    return layout.offsets.data
                raise NotImplementedError(
                    f"unsupported buffer attribute {self.attribute!r} "
                    f"for ListOffsetArray layout of {self.branch!r}"
                )

            raise NotImplementedError(
                "virtual arrays are not implemented for layout type "
                f"{type(layout).__name__} (from {self.branch!r})"
            )

    container = {}
    # Only the buffer keys are needed, so iterate the mapping directly.
    for buffer_key in self._form.expected_from_buffers():
        form_key, attribute = self.parse_buffer_key(buffer_key)
        branch_name = self._form_key_to_key[form_key]
        container[buffer_key] = Generator(tree[branch_name], attribute)

    return container


def form_with_unique_keys(form: Form, key: str) -> Form:
    """
    Return a copy of ``form`` in which every node has a path-like form key.

    The root node receives ``key``. Children are keyed from their parent:
    record fields get ``"<parent>.<field>"``, union members get
    ``"<parent>#<index>"`` and any other single-content node passes
    ``"<parent>.content"`` to its content. The input form is not modified;
    the copy is made by round-tripping through ``to_dict`` / ``from_dict``.
    """
    awkward = uproot.extras.awkward()

    def assign_keys(node: Form, node_key: str) -> None:
        node.form_key = node_key

        # Records: descend into each field, extending the key with its name.
        if node.is_record:
            for field_name in node.fields:
                assign_keys(node.content(field_name), f"{node_key}.{field_name}")
            return

        # Unions: descend into each member, extending the key with its index.
        if node.is_union:
            for position, member in enumerate(node.contents):
                assign_keys(member, f"{node_key}#{position}")
            return

        # Leaves have nothing below them.
        if node.is_numpy or node.is_unknown:
            return

        # Everything else wraps exactly one content.
        assign_keys(node.content, f"{node_key}.content")

    # Round-trip through a dict so the result shares no references with
    # the caller's form.
    copied = awkward.forms.from_dict(form.to_dict())
    assign_keys(copied, key)
    return copied


class TrivialFormMapping(ImplementsFormMapping):
    """Form mapping that only assigns unique form keys rooted at ``"<root>"``."""

    def __call__(self, form: Form) -> tuple[Form, TrivialFormMappingInfo]:
        """
        Return a uniquely keyed copy of ``form`` and its mapping info.

        Uses this module's ``form_with_unique_keys``. The earlier call
        through ``dask_awkward.lib.utils`` was dropped: its result was
        overwritten immediately, and it required ``dask_awkward`` to be
        importable for no effect.
        """
        new_form = form_with_unique_keys(form, "<root>")
        return new_form, TrivialFormMappingInfo(new_form)


Expand Down Expand Up @@ -1548,9 +1630,7 @@ def real_filter_branch(branch):
partition_args.append((0, 0, 0))

if form_mapping is None:
expected_form = dask_awkward.lib.utils.form_with_unique_keys(
base_form, "<root>"
)
expected_form = form_with_unique_keys(base_form, "<root>")
form_mapping_info = TrivialFormMappingInfo(expected_form)
else:
expected_form, form_mapping_info = form_mapping(base_form)
Expand Down Expand Up @@ -1651,9 +1731,7 @@ def _get_dak_array_delay_open(
)

if form_mapping is None:
expected_form = dask_awkward.lib.utils.form_with_unique_keys(
base_form, "<root>"
)
expected_form = form_with_unique_keys(base_form, "<root>")
form_mapping_info = TrivialFormMappingInfo(expected_form)
else:
expected_form, form_mapping_info = form_mapping(base_form)
Expand Down
74 changes: 74 additions & 0 deletions src/uproot/behaviors/TBranch.py
Original file line number Diff line number Diff line change
Expand Up @@ -673,6 +673,80 @@ def show(

stream.write(formatter.format(name, typename, interp).rstrip(" ") + "\n")

def virtual_arrays(
    self,
    *,
    filter_name=no_filter,
    filter_typename=no_filter,
    filter_branch=no_filter,
    aliases=None,
    recursive=True,
    full_paths=True,
    ignore_duplicates=False,
    language=uproot.language.python.python_language,
    form_mapping=None,
    entry_start=None,
    entry_stop=None,
    decompression_executor=None,
    interpretation_executor=None,
    array_cache="inherit",
    ak_add_doc=False,
):
    """
    Assemble an Awkward Array from the selected branches via
    ``awkward.from_buffers``, using the container produced by the form
    mapping's ``load_virtual_arrays``.

    Branches are selected with ``self.keys`` using the ``filter_*``,
    ``recursive``, ``full_paths`` and ``ignore_duplicates`` arguments.
    When ``form_mapping`` is None the form is keyed with
    ``form_with_unique_keys(..., "<root>")`` and wrapped in a
    ``TrivialFormMappingInfo``; otherwise ``form_mapping(base_form)``
    supplies both the form and the mapping info.

    NOTE(review): ``aliases`` and ``language`` are accepted but never
    referenced in this method, and ``array_cache`` is regularized but the
    result is not used afterwards; confirm whether that is intended.
    """
    from uproot._dask import (
        TrivialFormMappingInfo,
        _get_ttree_form,
        form_with_unique_keys,
    )

    awkward = uproot.extras.awkward()

    # Normalise the user-facing arguments first.
    entry_start, entry_stop = _regularize_entries_start_stop(
        self.num_entries, entry_start, entry_stop
    )
    decompression_executor, interpretation_executor = _regularize_executors(
        decompression_executor, interpretation_executor, self._file
    )
    array_cache = _regularize_array_cache(array_cache, self._file)

    # Pick the branches and derive the form describing them.
    selected_keys = self.keys(
        filter_name=filter_name,
        filter_typename=filter_typename,
        filter_branch=filter_branch,
        recursive=recursive,
        full_paths=full_paths,
        ignore_duplicates=ignore_duplicates,
    )
    base_form = _get_ttree_form(awkward, self, selected_keys, ak_add_doc)

    if form_mapping is not None:
        expected_form, form_mapping_info = form_mapping(base_form)
    else:
        expected_form = form_with_unique_keys(base_form, "<root>")
        form_mapping_info = TrivialFormMappingInfo(expected_form)

    load_options = {"ak_add_doc": ak_add_doc}
    buffers = form_mapping_info.load_virtual_arrays(
        self,
        selected_keys,
        entry_start,
        entry_stop,
        decompression_executor,
        interpretation_executor,
        load_options,
    )

    num_entries = entry_stop - entry_start
    return awkward.from_buffers(
        expected_form,
        num_entries,
        buffers,
        behavior=form_mapping_info.behavior,
        buffer_key=form_mapping_info.buffer_key,
    )

def arrays(
self,
expressions=None,
Expand Down
Loading