diff --git a/softcopy/copier.py b/softcopy/copier.py index fb770fc..e77e758 100644 --- a/softcopy/copier.py +++ b/softcopy/copier.py @@ -4,6 +4,7 @@ LOG = logging.getLogger(__name__) + class AbstractCopier(ABC): def __init__(self, source: Path, destination: Path, n_copy_procs: int, log: logging.Logger = LOG): self._source = source diff --git a/softcopy/main.py b/softcopy/main.py index 64b4cc6..332c9dd 100644 --- a/softcopy/main.py +++ b/softcopy/main.py @@ -62,6 +62,7 @@ def main(targets_file, verbose, nprocs): for copier in copiers: copier.stop() + def set_low_io_priority(): try: if sys.platform == "linux": diff --git a/softcopy/ome_zarr_copier.py b/softcopy/ome_zarr_copier.py index b9d0696..bed7002 100644 --- a/softcopy/ome_zarr_copier.py +++ b/softcopy/ome_zarr_copier.py @@ -9,6 +9,7 @@ LOG = logging.getLogger(__name__) + class OMEZarrCopier(AbstractCopier): """ Wrapper around a ZarrCopier that also copies the metadata files for an OME-Zarr archive. diff --git a/softcopy/packed_name.py b/softcopy/packed_name.py index f1ed0d7..1dee888 100644 --- a/softcopy/packed_name.py +++ b/softcopy/packed_name.py @@ -18,14 +18,14 @@ class PackedName: # __slots__ is a special attribute that tells Python to not use a dict, and only allocate space for a fixed set of # attributes. This is a performance optimization which saves memory. - __slots__ = ('_path', '_index') + __slots__ = ("_path", "_index") def __init__(self, name: str, files_nd: np.ndarray, dim_separator: Literal["/", "."], zarr_format: Literal[2, 3]): if dim_separator == "/": parts = name.split("/") else: last_slash = name.rfind("/") - parts = name[last_slash + 1:].split(dim_separator) + parts = name[last_slash + 1 :].split(dim_separator) require_c_prefix = zarr_format == 3 needed_parts = files_nd.size + (1 if require_c_prefix else 0) @@ -49,7 +49,7 @@ def __init__(self, name: str, files_nd: np.ndarray, dim_separator: Literal["/", # parts = parts[-needed_parts:] try: - chunk_index_nd = tuple(int(p) for p in parts[-files_nd.size:]) + chunk_index_nd = tuple(int(p) for p in parts[-files_nd.size :]) except ValueError: self._path = name self._index = None @@ -73,7 +73,13 @@ def from_index(index: int): ret._path = None return ret - def path_from_index(index: int, files_nd: np.ndarray, zarr_location: Path, dim_separator: Literal["/", "."], zarr_format: Literal[2, 3]) -> Path: + def path_from_index( + index: int, + files_nd: np.ndarray, + zarr_location: Path, + dim_separator: Literal["/", "."], + zarr_format: Literal[2, 3], + ) -> Path: chunk_index_nd = np.unravel_index(index, files_nd) prefixless_chunk_key = dim_separator.join(map(str, chunk_index_nd)) @@ -81,7 +87,9 @@ def path_from_index(index: int, files_nd: np.ndarray, zarr_location: Path, dim_s return zarr_location / "c" / prefixless_chunk_key return zarr_location / prefixless_chunk_key - def get_path(self, files_nd: np.ndarray, zarr_location: Path, dim_separator: Literal["/", "."], zarr_format: Literal[2, 3]) -> Path: + def get_path( + self, files_nd: np.ndarray, zarr_location: Path, dim_separator: Literal["/", "."], zarr_format: Literal[2, 3] + ) -> Path: if self._path is not None: return zarr_location / self._path elif self._index is not None: diff --git a/softcopy/slow_write.py b/softcopy/slow_write.py index c4d2171..5e6c05b 100644 --- a/softcopy/slow_write.py +++ b/softcopy/slow_write.py @@ -23,7 +23,12 @@ @click.option("--method", type=click.Choice(["v2", "v3", "v3_shard"]), default="v3") @click.option("--sleep", type=float, default=1.0) @click.option("--timepoints", type=int, default=3) -@click.option("--no-complete-file", is_flag=True, help="Disable using a file called 'complete' to signal that the write is done", default=False) +@click.option( + "--no-complete-file", + is_flag=True, + help="Disable using a file called 'complete' to signal that the write is done", + default=False, +) def main(source, destination, method, sleep, timepoints, no_complete_file): ensure_high_io_priority() @@ -66,13 +71,11 @@ def main(source, destination, method, sleep, timepoints, no_complete_file): # target of 1000 * timepoints, we will warn the user: num_files = np.prod(data.shape[1:] // chunks[1:]) if num_files / np.prod(target_files_nd) < 0.1: - print(f"Warning: the number of files being written is very low ({num_files} / timepoint). In real acquisitions, softcopy moves much more data than this, and so this may not be a good test of its performance.") + print( + f"Warning: the number of files being written is very low ({num_files} / timepoint). In real acquisitions, softcopy moves much more data than this, and so this may not be a good test of its performance." + ) - preparation_methods = { - "v2": prepare_zarr_v2, - "v3": prepare_zarr_v3, - "v3_shard": prepare_zarr_v3_shard - } + preparation_methods = {"v2": prepare_zarr_v2, "v3": prepare_zarr_v3, "v3_shard": prepare_zarr_v3_shard} prepare_zarr = preparation_methods[method] dataset = prepare_zarr(destination, data, timepoints, chunks) diff --git a/softcopy/zarr_copier.py b/softcopy/zarr_copier.py index 29392d8..cee87f6 100644 --- a/softcopy/zarr_copier.py +++ b/softcopy/zarr_copier.py @@ -69,7 +69,9 @@ def __init__(self, source: Path, destination: Path, n_copy_procs: int = 1, log: self._dimension_separator = zarr_json["dimension_separator"] self._log.debug(f"Dimension separator: {self._dimension_separator}") if self._dimension_separator in (None, ""): - log.critical(f"Could not determine dimension separator from zarr.json file {zarr_json_path}: {self._dimension_separator!r}") + log.critical( + f"Could not determine dimension separator from zarr.json file {zarr_json_path}: {self._dimension_separator!r}" + ) exit(1) elif self._zarr_format == 3: chunks = np.array(zarr_json["chunk_grid"]["configuration"]["chunk_shape"]) @@ -93,7 +95,12 @@ def __init__(self, source: Path, destination: Path, n_copy_procs: int = 1, log: self._queue = Queue() self._observer = Observer() event_handler = ZarrFileEventHandler( - self._zarr_format, self._dimension_separator, self._files_nd, self._observation_finished, self._queue, self._log + self._zarr_format, + self._dimension_separator, + self._files_nd, + self._observation_finished, + self._queue, + self._log, ) self._observer.schedule(event_handler, source, recursive=True) self._copy_procs = [] @@ -190,7 +197,9 @@ def print_copy_status(): missed_count = 0 for chunk_index in range(np.prod(self._files_nd)): chunk_packed_name: PackedName = PackedName.from_index(chunk_index) - chunk_path = chunk_packed_name.get_path(self._files_nd, self._destination, self._dimension_separator, self._zarr_format) + chunk_path = chunk_packed_name.get_path( + self._files_nd, self._destination, self._dimension_separator, self._zarr_format + ) if not chunk_path.exists(): self._log.debug(f"File {chunk_path} was missed by the observer! Adding to queue for retry.") @@ -232,7 +241,9 @@ def _queue_existing_files(self): # and then pack the name relative_dir_path = os.path.relpath(dir_path, self._source) relative_filepath = os.path.join(relative_dir_path, file) - packed_name = PackedName(relative_filepath, self._files_nd, self._dimension_separator, self._zarr_format) + packed_name = PackedName( + relative_filepath, self._files_nd, self._dimension_separator, self._zarr_format + ) if packed_name.is_zarr_chunk(): chunk_count += 1 if Path(file).stem == "complete": @@ -313,9 +324,16 @@ def _copy_worker( if queue_draining.value == 1 and queue.empty(): break + class ZarrFileEventHandler(FileSystemEventHandler): def __init__( - self, zarr_format: Literal[2, 3], dimension_separator: Literal[".", "/"], files_nd: np.ndarray, observation_finished: Synchronized, queue: Queue, log: Logger = LOG + self, + zarr_format: Literal[2, 3], + dimension_separator: Literal[".", "/"], + files_nd: np.ndarray, + observation_finished: Synchronized, + queue: Queue, + log: Logger = LOG, ): super().__init__() self.zarr_format = zarr_format @@ -326,10 +344,10 @@ def __init__( self.queue = queue def on_created(self, event: FileCreatedEvent): - if isinstance(event, FileCreatedEvent): # noqa: SIM102 + if isinstance(event, FileCreatedEvent): # noqa: SIM102 # This is probably pointless, but I am worried about path parsing overhead given how many file transactions # can occur - so only parse the path if we know the filepath ends with "complete" - if event.src_path.endswith("complete"): # noqa: SIM102 + if event.src_path.endswith("complete"): # noqa: SIM102 if Path(event.src_path).stem == "complete": self._log.info("Detected 'complete' file. Stopping observer.") with self.observation_finished.get_lock(): @@ -340,7 +358,9 @@ def on_deleted(self, event): # remove .__lock suffix from right side of path lock_index = event.src_path.rfind(".__lock") if lock_index != -1: - packed_name = PackedName(event.src_path[:lock_index], self.files_nd, self._dimension_separator, self.zarr_format) + packed_name = PackedName( + event.src_path[:lock_index], self.files_nd, self._dimension_separator, self.zarr_format + ) if packed_name._index is None: print(f"screwed up: {event.src_path}") self.queue.put(packed_name) diff --git a/softcopy/zarr_utils.py b/softcopy/zarr_utils.py index de08498..fb15b86 100644 --- a/softcopy/zarr_utils.py +++ b/softcopy/zarr_utils.py @@ -14,6 +14,7 @@ ALL_METADATA_FILES = set(functools.reduce(operator.iadd, METADATA_FILES_BY_VERSION.values(), [])) KNOWN_VERSIONS = set(METADATA_FILES_BY_VERSION.keys()) + def identify_zarr_format(archive_path: Path, log: Logger = LOG) -> Optional[Literal[2, 3]]: """ Identify the zarr version of the archive by identifying a metadata file and reading its zarr_format key. @@ -38,6 +39,7 @@ def identify_zarr_format(archive_path: Path, log: Logger = LOG) -> Optional[Lite log.debug(f"Could not identify zarr version from metadata files in archive folder {archive_path}") return None + def dtype_string_zarr2(dtype): endianness = dtype.byteorder if endianness == "=": diff --git a/test.py b/test.py deleted file mode 100644 index 67d226f..0000000 --- a/test.py +++ /dev/null @@ -1,118 +0,0 @@ -import numpy as np -import timeit -from pathlib import Path - -# Original Implementation (for comparison) -class PackedNameOriginal: - _path: Path = None - _index: int = None - - def __init__(self, name: str, files_nd: np.ndarray, version: int = 2, dim_separator: str = "."): - path = Path(name) - - if version == 3: - parts = path.parts - else: - parts = path.name.split(dim_separator) - - needed_parts = (1 + files_nd.size) if version == 3 else files_nd.size - - if len(parts) < needed_parts: - self._path = path - return - - if version == 3: - if parts[-needed_parts][0] != "c": - self._path = path - return - parts = parts[-needed_parts:] - - try: - chunk_index_nd = list(map(int, parts[-files_nd.size:])) - if all(0 <= coord < files_nd[i] for i, coord in enumerate(chunk_index_nd)): - self._index = np.ravel_multi_index(chunk_index_nd, files_nd) - self._path = None - except ValueError: - self._path = path - - def get_path(self, files_nd: np.ndarray, zarr_location: Path, dim_separator: str = "."): - if self._path is not None: - return zarr_location / self._path - elif self._index is not None: - chunk_index_nd = np.unravel_index(self._index, files_nd) - if dim_separator == "/": - return zarr_location / Path(*map(str, chunk_index_nd)) - return zarr_location / dim_separator.join(map(str, chunk_index_nd)) - - -# Optimized Implementation -class PackedNameOptimized: - __slots__ = ('_path', '_index') - - def __init__(self, name: str, files_nd: np.ndarray, version: int = 2, dim_separator: str = "."): - if version == 3: - parts = name.split('/') - else: - parts = name.split(dim_separator) - - needed_parts = (1 + files_nd.size) if version == 3 else files_nd.size - - if len(parts) < needed_parts: - self._path = name - self._index = None - return - - if version == 3: - if parts[-needed_parts][0] != "c": - self._path = name - self._index = None - return - parts = parts[-needed_parts:] - - try: - chunk_index_nd = tuple(int(p) for p in parts[-files_nd.size:]) - except ValueError: - self._path = name - self._index = None - return - - if not all(0 <= coord < files_nd[i] for i, coord in enumerate(chunk_index_nd)): - self._path = name - self._index = None - return - - self._index = np.ravel_multi_index(chunk_index_nd, files_nd) - self._path = None - - def get_path(self, files_nd: np.ndarray, zarr_location: Path, dim_separator: str = "."): - if self._path is not None: - return zarr_location / self._path - elif self._index is not None: - chunk_index_nd = np.unravel_index(self._index, files_nd) - if dim_separator == "/": - return zarr_location / "/".join(map(str, chunk_index_nd)) - return zarr_location / dim_separator.join(map(str, chunk_index_nd)) - - -# Set up the test environment -files_nd = np.array([100, 100, 100]) # Example 3D file array -zarr_location = Path("/zarr_location") -name = "c/50/50/50" -dim_separator = "/" - -# Benchmark both implementations -def benchmark_original(): - packed = PackedNameOriginal(name, files_nd, version=3, dim_separator=dim_separator) - return packed.get_path(files_nd, zarr_location, dim_separator=dim_separator) - -def benchmark_optimized(): - packed = PackedNameOptimized(name, files_nd, version=3, dim_separator=dim_separator) - return packed.get_path(files_nd, zarr_location, dim_separator=dim_separator) - -# Run benchmarks -original_time = timeit.timeit(benchmark_original, number=100000) -optimized_time = timeit.timeit(benchmark_optimized, number=100000) - -print(f"Original Implementation Time: {original_time} seconds") -print(f"Optimized Implementation Time: {optimized_time} seconds") - diff --git a/tests/test_softcopy.py b/tests/test_softcopy.py index 68e70fb..3ba65a8 100644 --- a/tests/test_softcopy.py +++ b/tests/test_softcopy.py @@ -14,11 +14,19 @@ def run_slow_write(dummy_path: Path, input_path: Path, no_complete_file: bool = False): - softcopy.slow_write.main([str(dummy_path), str(input_path), "--method", "v2", "--no-complete-file" if no_complete_file else "--"], standalone_mode=False) + softcopy.slow_write.main( + [str(dummy_path), str(input_path), "--method", "v2", "--no-complete-file" if no_complete_file else "--"], + standalone_mode=False, + ) def run_softcopy(targets_file_path: Path): - softcopy.main.main([str(targets_file_path),], standalone_mode=False) + softcopy.main.main( + [ + str(targets_file_path), + ], + standalone_mode=False, + ) def create_targets_yaml(targets_file_path: Path, input_path: Path, output_path: Path): @@ -85,6 +93,7 @@ def test_slow_write_and_softcopy(workspace, dummy_zarr_path, create_zarr2_archiv assert np.all(np.equal(slow_write_output[:].read().result(), copied_dataset[:].read().result())) + # In the tests above, we just test copying a regular zarr archive. However, we also need limited support for ome-zarr FOV data. # Specifically, the ome-zarr subset that is written by DaXi. This looks like the following: # aquisition.ome.zarr/ diff --git a/tests/test_to_ome.py b/tests/test_to_ome.py index 6c91f95..6f26ed3 100644 --- a/tests/test_to_ome.py +++ b/tests/test_to_ome.py @@ -21,4 +21,5 @@ def test_full_run(dummy_zarr_path, create_zarr2_archive): expected_contents = {".zattrs", ".zgroup", "0"} assert top_level_contents == expected_contents + # TODO: test for / dimension separator input, remove dim diff --git a/tests/test_zarr_copier.py b/tests/test_zarr_copier.py index fa01873..d8aa0ba 100644 --- a/tests/test_zarr_copier.py +++ b/tests/test_zarr_copier.py @@ -6,6 +6,7 @@ TEST_DATA = "test data" + def test_zarr_copier(workspace, dummy_zarr_path): destination = workspace / "destination" destination.mkdir()