Skip to content

Commit

Permalink
Merge pull request #56 from NeurodataWithoutBorders/copy-chunks-to-staging-area
Browse files Browse the repository at this point in the history

Copy chunks to staging area (2)
  • Loading branch information
magland authored May 9, 2024
2 parents 5640e17 + 833b3f3 commit f8c9cc0
Show file tree
Hide file tree
Showing 7 changed files with 82 additions and 29 deletions.
27 changes: 9 additions & 18 deletions lindi/LindiH5ZarrStore/LindiH5ZarrStore.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import json
import base64
from typing import Union, List, IO, Any, Dict
from dataclasses import dataclass
import numpy as np
import zarr
from zarr.storage import Store, MemoryStore
Expand All @@ -21,20 +20,7 @@
from ..LindiH5pyFile.LindiReferenceFileSystemStore import LindiReferenceFileSystemStore
from ..LocalCache.LocalCache import LocalCache
from ..LindiRemfile.LindiRemfile import LindiRemfile


@dataclass
class LindiH5ZarrStoreOpts:
"""
Options for the LindiH5ZarrStore class.
Attributes:
num_dataset_chunks_threshold (Union[int, None]): For each dataset in the
HDF5 file, if the number of chunks is greater than this threshold, then
the dataset will be represented as an external array link. If None, then
the threshold is not used. Default is 1000.
"""
num_dataset_chunks_threshold: Union[int, None] = 1000
from .LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts


class LindiH5ZarrStore(Store):
Expand Down Expand Up @@ -83,7 +69,7 @@ def __init__(
def from_file(
hdf5_file_name_or_url: str,
*,
opts: LindiH5ZarrStoreOpts = LindiH5ZarrStoreOpts(),
opts: Union[LindiH5ZarrStoreOpts, None] = None,
url: Union[str, None] = None,
local_cache: Union[LocalCache, None] = None
):
Expand All @@ -94,7 +80,7 @@ def from_file(
----------
hdf5_file_name_or_url : str
The name of the HDF5 file or a URL to the HDF5 file.
opts : LindiH5ZarrStoreOpts
opts : LindiH5ZarrStoreOpts or None
Options for the store.
url : str or None
If hdf5_file_name_or_url is a local file name, then this can
Expand All @@ -107,6 +93,8 @@ def from_file(
A local cache to use when reading chunks from a remote file. If None,
then no local cache is used.
"""
if opts is None:
opts = LindiH5ZarrStoreOpts() # default options
if hdf5_file_name_or_url.startswith(
"http://"
) or hdf5_file_name_or_url.startswith("https://"):
Expand Down Expand Up @@ -618,7 +606,10 @@ def __init__(self, h5_dataset: h5py.Dataset):
self._is_inline = True
...
elif h5_dataset.dtype.kind in ['i', 'u', 'f']: # integer or float
self._is_inline = False
if h5_dataset.size and h5_dataset.size < 1000:
self._is_inline = True
else:
self._is_inline = False
else:
self._is_inline = True
if h5_dataset.dtype.kind == "V" and h5_dataset.dtype.fields is not None: # compound type
Expand Down
16 changes: 16 additions & 0 deletions lindi/LindiH5ZarrStore/LindiH5ZarrStoreOpts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
from typing import Union
from dataclasses import dataclass


@dataclass(frozen=True)
class LindiH5ZarrStoreOpts:
    """Configuration options for ``LindiH5ZarrStore``.

    Attributes
    ----------
    num_dataset_chunks_threshold : int or None
        For each dataset in the HDF5 file, if the number of chunks exceeds
        this threshold, the dataset is represented as an external array
        link rather than having every chunk referenced individually. When
        None, no threshold is applied. Defaults to 1000.
    """
    num_dataset_chunks_threshold: Union[int, None] = 1000
3 changes: 2 additions & 1 deletion lindi/LindiH5ZarrStore/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
from .LindiH5ZarrStore import LindiH5ZarrStore, LindiH5ZarrStoreOpts
from .LindiH5ZarrStore import LindiH5ZarrStore
from .LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts

__all__ = [
"LindiH5ZarrStore",
Expand Down
12 changes: 6 additions & 6 deletions lindi/LindiH5pyFile/LindiH5pyFile.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

from ..LindiStagingStore.StagingArea import StagingArea
from ..LindiStagingStore.LindiStagingStore import LindiStagingStore
from ..LindiH5ZarrStore.LindiH5ZarrStoreOpts import LindiH5ZarrStoreOpts

from ..LocalCache.LocalCache import LocalCache

Expand Down Expand Up @@ -45,7 +46,7 @@ def from_lindi_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", staging
return LindiH5pyFile.from_reference_file_system(url_or_path, mode=mode, staging_area=staging_area, local_cache=local_cache)

@staticmethod
def from_hdf5_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", local_cache: Union[LocalCache, None] = None):
def from_hdf5_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", local_cache: Union[LocalCache, None] = None, zarr_store_opts: Union[LindiH5ZarrStoreOpts, None] = None):
"""
Create a LindiH5pyFile from a URL or path to an HDF5 file.
Expand All @@ -58,11 +59,13 @@ def from_hdf5_file(url_or_path: str, *, mode: Literal["r", "r+"] = "r", local_ca
supported, by default "r".
local_cache : Union[LocalCache, None], optional
The local cache to use for caching data chunks, by default None.
zarr_store_opts : Union[LindiH5ZarrStoreOpts, None], optional
The options to use for the zarr store, by default None.
"""
from ..LindiH5ZarrStore.LindiH5ZarrStore import LindiH5ZarrStore # avoid circular import
if mode == 'r+':
raise Exception("Opening hdf5 file in r+ mode is not supported")
zarr_store = LindiH5ZarrStore.from_file(url_or_path, local_cache=local_cache)
zarr_store = LindiH5ZarrStore.from_file(url_or_path, local_cache=local_cache, opts=zarr_store_opts, url=url_or_path)
return LindiH5pyFile.from_zarr_store(
zarr_store=zarr_store,
mode=mode,
Expand Down Expand Up @@ -102,12 +105,9 @@ def from_reference_file_system(rfs: Union[dict, str, None], *, mode: Literal["r"
}
},
}
if staging_area is not None:
if mode not in ['r+']:
raise Exception("Staging area cannot be used in read-only mode")

if isinstance(rfs, str):
if rfs.startswith("http") or rfs.startswith("https"):
if rfs.startswith("http://") or rfs.startswith("https://"):
with tempfile.TemporaryDirectory() as tmpdir:
filename = f"{tmpdir}/temp.lindi.json"
_download_file(rfs, filename)
Expand Down
2 changes: 1 addition & 1 deletion lindi/LindiH5pyFile/LindiReferenceFileSystemStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ def __getitem__(self, key: str):
url = x[0]
offset = x[1]
length = x[2]
if '{{' in url and 'templates' in self.rfs:
if '{{' in url and '}}' in url and 'templates' in self.rfs:
for k, v in self.rfs["templates"].items():
url = url.replace("{{" + k + "}}", v)
if self.local_cache is not None:
Expand Down
45 changes: 45 additions & 0 deletions lindi/LindiStagingStore/LindiStagingStore.py
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,45 @@ def consolidate_chunks(self):
for fname in files:
os.remove(f"{root}/{fname}")

def copy_chunks_to_staging_area(self, *, download_remote: bool):
    """
    Copy the chunks in the base store to the staging area. This is done
    in preparation for uploading to a storage system.

    Parameters
    ----------
    download_remote : bool
        If True, download the remote chunks to the staging area. If False,
        just copy the local chunks.

    Raises
    ------
    NotImplementedError
        If download_remote is True (not yet supported).
    """
    if download_remote:
        # Fail fast: remote downloads are not supported yet.
        raise NotImplementedError("Downloading remote chunks not yet implemented")
    rfs = self._base_store.rfs
    templates = rfs.get('templates', {})
    # NOTE(review): assumes _set_ref_reference only replaces values of
    # existing keys in rfs['refs'] (adding/removing keys while iterating
    # would raise) — confirm against its implementation.
    for k, v in rfs['refs'].items():
        # Chunk references have the form [url, offset, length]; skip
        # anything else (e.g. inline/base64 entries).
        if not (isinstance(v, list) and len(v) == 3):
            continue
        url = _apply_templates(v[0], templates)
        if url.startswith(('http://', 'https://')):
            # Remote chunk. The duplicated raise that used to live here was
            # unreachable: download_remote=True already raised above.
            continue
        if url.startswith(self._staging_area.directory + '/'):
            # Already in the staging area; nothing to copy.
            continue
        # Copy the local chunk bytes into the staging area and re-point the
        # reference at the stored file (offset becomes 0, length unchanged).
        chunk_data = _read_chunk_data(url, v[1], v[2])
        stored_file_path = self._staging_area.store_file(k, chunk_data)
        self._set_ref_reference(k, stored_file_path, 0, v[2])


def _apply_templates(x: str, templates: dict) -> str:
if '{{' in x and '}}' in x:
for key, val in templates.items():
x = x.replace('{{' + key + '}}', val)
return x


def _sort_by_chunk_key(files: list) -> list:
# first verify that all the files have the same number of parts
Expand Down Expand Up @@ -266,3 +305,9 @@ def _format_size_bytes(size_bytes: int) -> str:
return f"{size_bytes / 1024 / 1024:.1f} MB"
else:
return f"{size_bytes / 1024 / 1024 / 1024:.1f} GB"


def _read_chunk_data(filename: str, offset: int, size: int) -> bytes:
with open(filename, "rb") as f:
f.seek(offset)
return f.read(size)
6 changes: 3 additions & 3 deletions tests/test_fletcher32.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,8 @@ def test_fletcher32():
with tempfile.TemporaryDirectory() as tmpdir:
filename = f'{tmpdir}/test.h5'
with h5py.File(filename, 'w') as f:
dset = f.create_dataset('dset', shape=(100,), dtype='i4', fletcher32=True)
dset[...] = range(100)
dset = f.create_dataset('dset', shape=(2000,), dtype='i4', fletcher32=True)
dset[...] = range(2000) # this needs to be large enough so it doesn't get inlined
assert dset.fletcher32
store = lindi.LindiH5ZarrStore.from_file(filename, url=filename)
rfs = store.to_reference_file_system()
Expand All @@ -20,7 +20,7 @@ def test_fletcher32():
data = ds0[...]
assert isinstance(data, np.ndarray)
assert data.dtype == np.dtype('int32')
assert np.all(data == np.arange(100))
assert np.all(data == np.arange(2000))


if __name__ == '__main__':
Expand Down

0 comments on commit f8c9cc0

Please sign in to comment.