From ca5d453c8715518329455b353a2242365aece165 Mon Sep 17 00:00:00 2001
From: pfackeldey
Date: Wed, 26 Feb 2025 15:34:48 -0500
Subject: [PATCH] add tree to virtual array conversion

---
 src/uproot/_dask.py             | 132 +++++++++++++++++++++++++++++---
 src/uproot/behaviors/TBranch.py | 104 +++++++++++++++++++++++++
 2 files changed, 228 insertions(+), 8 deletions(-)

diff --git a/src/uproot/_dask.py b/src/uproot/_dask.py
index a33abb23f..e810d135f 100644
--- a/src/uproot/_dask.py
+++ b/src/uproot/_dask.py
@@ -950,11 +950,131 @@ def load_buffers(
 
         return container
 
+    def load_virtual_arrays(
+        self,
+        tree: HasBranches,
+        keys: frozenset[str],
+        start: int,
+        stop: int,
+        decompression_executor,
+        interpretation_executor,
+        options: Any,
+    ) -> Mapping[str, AwkArray]:
+        """
+        Args:
+            tree (HasBranches): Object whose branches are looked up by name.
+            keys (frozenset of str): Not used by this method; the buffers to
+                provide are taken from ``self._form``.
+            start (int): Passed to ``branch.array`` as ``entry_start``.
+            stop (int): Passed to ``branch.array`` as ``entry_stop``.
+            decompression_executor: Forwarded to ``branch.array``.
+            interpretation_executor: Forwarded to ``branch.array``.
+            options: Only ``options.get("ak_add_doc")`` is read.
+
+        Returns a dict that maps every buffer key expected by ``self._form``
+        to a zero-argument callable. Nothing is read from ``tree`` until one
+        of those callables is invoked; each call reads its branch and
+        returns the requested buffer.
+        """
+        awkward = uproot.extras.awkward()
+
+        class Generator:
+            """Zero-argument callable that reads one buffer of one branch."""
+
+            def __init__(self, branch, attribute):
+                self.branch = branch
+                self.attribute = attribute
+
+            def __repr__(self):
+                return f"Generator({self.branch}, {self.attribute})"
+
+            def __call__(self):
+                # NOTE: the branch is read again on every call, so the "data"
+                # and "offsets" generators of one jagged branch each trigger
+                # their own read.
+                layout = self.branch.array(
+                    entry_start=start,
+                    entry_stop=stop,
+                    interpretation_executor=interpretation_executor,
+                    decompression_executor=decompression_executor,
+                    library="ak",
+                    ak_add_doc=options.get("ak_add_doc"),
+                ).layout
+                # this is a bit of a hack, but it works for now: only
+                # NumpyArray and ListOffsetArray layouts are unpacked
+                if isinstance(layout, awkward.contents.NumpyArray):
+                    return layout.data
+                elif isinstance(layout, awkward.contents.ListOffsetArray):
+                    if self.attribute == "data":
+                        return layout.content.data
+                    elif self.attribute == "offsets":
+                        return layout.offsets.data
+                    else:
+                        raise NotImplementedError(
+                            f"unsupported buffer attribute {self.attribute!r}"
+                        )
+                else:
+                    raise NotImplementedError(
+                        f"unsupported layout type {type(layout).__name__}"
+                    )
+
+        # Build the keys with the template that awkward.from_buffers is given
+        # (this object's buffer_key), so that its lookups cannot miss.
+        expected = self._form.expected_from_buffers(buffer_key=self.buffer_key)
+        container = {}
+        for buffer_key in expected:
+            form_key, attribute = self.parse_buffer_key(buffer_key)
+            branch_name = self._form_key_to_key[form_key]
+            branch = tree[branch_name]
+            container[buffer_key] = Generator(branch, attribute)
+
+        return container
+
+
+def form_with_unique_keys(form: Form, key: str) -> Form:
+    """
+    Args:
+        form (awkward Form): Form to copy; the argument is not modified.
+        key (str): Form key of the outermost node and prefix of all others.
+
+    Returns a copy of ``form`` in which every node has a ``form_key`` derived
+    from its position: record fields append ``.{field}``, union contents
+    append ``#{index}``, and any other nested content appends ``.content``.
+    """
+    awkward = uproot.extras.awkward()
+
+    def impl(form: Form, key: str) -> None:
+        # Set form key
+        form.form_key = key
+
+        # If the form is a record we need to loop over all fields in the
+        # record and set form keys that include the field name; this will
+        # keep recursing as well.
+        if form.is_record:
+            for field in form.fields:
+                impl(form.content(field), f"{key}.{field}")
+
+        elif form.is_union:
+            for i, entry in enumerate(form.contents):
+                impl(entry, f"{key}#{i}")
+
+        # NumPy like array is easy
+        elif form.is_numpy or form.is_unknown:
+            pass
+
+        # Anything else grab the content and keep recursing
+        else:
+            impl(form.content, f"{key}.content")
+
+    # Perform a "deep" copy without preserving references
+    form = awkward.forms.from_dict(form.to_dict())
+    impl(form, key)
+    return form
+
 
 class TrivialFormMapping(ImplementsFormMapping):
     def __call__(self, form: Form) -> tuple[Form, TrivialFormMappingInfo]:
-        dask_awkward = uproot.extras.dask_awkward()
-        new_form = dask_awkward.lib.utils.form_with_unique_keys(form, "")
+        new_form = form_with_unique_keys(form, "")
         return new_form, TrivialFormMappingInfo(new_form)
 
 
@@ -1548,9 +1668,7 @@ def real_filter_branch(branch):
         partition_args.append((0, 0, 0))
 
     if form_mapping is None:
-        expected_form = dask_awkward.lib.utils.form_with_unique_keys(
-            base_form, ""
-        )
+        expected_form = form_with_unique_keys(base_form, "")
         form_mapping_info = TrivialFormMappingInfo(expected_form)
     else:
         expected_form, form_mapping_info = form_mapping(base_form)
@@ -1651,9 +1769,7 @@ def _get_dak_array_delay_open(
     )
 
     if form_mapping is None:
-        expected_form = dask_awkward.lib.utils.form_with_unique_keys(
-            base_form, ""
-        )
+        expected_form = form_with_unique_keys(base_form, "")
         form_mapping_info = TrivialFormMappingInfo(expected_form)
     else:
         expected_form, form_mapping_info = form_mapping(base_form)
diff --git a/src/uproot/behaviors/TBranch.py b/src/uproot/behaviors/TBranch.py
index 3fb57ab7e..44de73186 100644
--- a/src/uproot/behaviors/TBranch.py
+++ b/src/uproot/behaviors/TBranch.py
@@ -673,6 +673,110 @@ def show(
 
             stream.write(formatter.format(name, typename, interp).rstrip(" ") + "\n")
 
+    def virtual_arrays(
+        self,
+        *,
+        filter_name=no_filter,
+        filter_typename=no_filter,
+        filter_branch=no_filter,
+        aliases=None,
+        recursive=True,
+        full_paths=True,
+        ignore_duplicates=False,
+        language=uproot.language.python.python_language,
+        form_mapping=None,
+        entry_start=None,
+        entry_stop=None,
+        decompression_executor=None,
+        interpretation_executor=None,
+        array_cache="inherit",
+        ak_add_doc=False,
+    ):
+        """
+        Args:
+            filter_name: Passed to ``self.keys`` to select branches.
+            filter_typename: Passed to ``self.keys`` to select branches.
+            filter_branch: Passed to ``self.keys`` to select branches.
+            aliases: Accepted but not used by this method yet.
+            recursive (bool): Passed to ``self.keys``.
+            full_paths (bool): Passed to ``self.keys``.
+            ignore_duplicates (bool): Passed to ``self.keys``.
+            language: Accepted but not used by this method yet.
+            form_mapping (None or callable): If given, it is called with the
+                form of the selected branches and must return
+                ``(form, form_mapping_info)``; if None, form keys are assigned
+                by ``form_with_unique_keys``.
+            entry_start: Start of the entry range; regularized against
+                ``self.num_entries`` before use.
+            entry_stop: End of the entry range; regularized against
+                ``self.num_entries`` before use.
+            decompression_executor: Regularized, then passed to
+                ``load_virtual_arrays``.
+            interpretation_executor: Regularized, then passed to
+                ``load_virtual_arrays``.
+            array_cache: Regularized, but not otherwise used yet.
+            ak_add_doc (bool): Forwarded to ``_get_ttree_form`` and, as the
+                ``"ak_add_doc"`` option, to ``load_virtual_arrays``.
+
+        Returns an Awkward Array built by ``awkward.from_buffers`` from the
+        buffers supplied by ``form_mapping_info.load_virtual_arrays``.
+        """
+        from uproot._dask import (
+            TrivialFormMappingInfo,
+            _get_ttree_form,
+            form_with_unique_keys,
+        )
+
+        awkward = uproot.extras.awkward()
+
+        entry_start, entry_stop = _regularize_entries_start_stop(
+            self.num_entries, entry_start, entry_stop
+        )
+        decompression_executor, interpretation_executor = _regularize_executors(
+            decompression_executor, interpretation_executor, self._file
+        )
+        # NOTE: array_cache is regularized but not used further yet
+        array_cache = _regularize_array_cache(array_cache, self._file)
+
+        keys = self.keys(
+            filter_name=filter_name,
+            filter_typename=filter_typename,
+            filter_branch=filter_branch,
+            recursive=recursive,
+            full_paths=full_paths,
+            ignore_duplicates=ignore_duplicates,
+        )
+
+        base_form = _get_ttree_form(
+            awkward,
+            self,
+            keys,
+            ak_add_doc,
+        )
+
+        if form_mapping is None:
+            expected_form = form_with_unique_keys(base_form, "")
+            form_mapping_info = TrivialFormMappingInfo(expected_form)
+        else:
+            expected_form, form_mapping_info = form_mapping(base_form)
+
+        container = form_mapping_info.load_virtual_arrays(
+            self,
+            keys,
+            entry_start,
+            entry_stop,
+            decompression_executor,
+            interpretation_executor,
+            {"ak_add_doc": ak_add_doc},
+        )
+        return awkward.from_buffers(
+            expected_form,
+            entry_stop - entry_start,
+            container,
+            behavior=form_mapping_info.behavior,
+            buffer_key=form_mapping_info.buffer_key,
+        )
+
     def arrays(
         self,
         expressions=None,