Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Compression changes to eo-learn #731

Merged
merged 10 commits into from
Aug 31, 2023
13 changes: 9 additions & 4 deletions eolearn/core/core_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import copy
import datetime as dt
import warnings
from abc import ABCMeta
from typing import Any, Callable, Iterable, Literal, Tuple, Union, cast

Expand Down Expand Up @@ -102,11 +103,11 @@ def __init__(
config: SHConfig | None = None,
features: FeaturesSpecification = ...,
overwrite_permission: OverwritePermission = OverwritePermission.ADD_ONLY,
compress_level: int = 1,
*,
save_timestamps: bool | Literal["auto"] = "auto",
use_zarr: bool = False,
temporal_selection: None | slice | list[int] | Literal["infer"] = None,
compress_level: int | None = None,
):
"""
:param path: root path where all EOPatches are saved
Expand All @@ -116,7 +117,6 @@ def __init__(
:param features: A collection of features types specifying features of which type will be saved. By default,
all features will be saved.
:param overwrite_permission: A level of permission for overwriting an existing EOPatch
:param compress_level: A level of data compression and can be specified with an integer from 0 (no compression)
to 9 (highest compression).
:param save_timestamps: Whether to save the timestamps of the EOPatch. With the `"auto"` setting timestamps are saved
if `features=...` or if other temporal features are being saved.
Expand All @@ -127,12 +127,18 @@ def __init__(
"""
self.features = features
self.overwrite_permission = overwrite_permission
self.compress_level = compress_level
self.use_zarr = use_zarr
self.temporal_selection = temporal_selection
self.save_timestamps = save_timestamps
super().__init__(path, filesystem=filesystem, create=True, config=config)

if compress_level is not None:
warnings.warn(
"The `compress_level` parameter has been deprecated, data is now compressed by default.",
category=EODeprecationWarning,
stacklevel=2,
)

def execute(
self,
eopatch: EOPatch,
Expand All @@ -155,7 +161,6 @@ def execute(
filesystem=self.filesystem,
features=self.features,
overwrite_permission=self.overwrite_permission,
compress_level=self.compress_level,
save_timestamps=self.save_timestamps,
use_zarr=self.use_zarr,
temporal_selection=temporal_selection,
Expand Down
12 changes: 8 additions & 4 deletions eolearn/core/eodata.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,21 +604,19 @@ def save(
path: str,
features: FeaturesSpecification = ...,
overwrite_permission: OverwritePermission = OverwritePermission.ADD_ONLY,
compress_level: int = 1,
filesystem: FS | None = None,
*,
save_timestamps: bool | Literal["auto"] = "auto",
use_zarr: bool = False,
temporal_selection: None | slice | list[int] | Literal["infer"] = None,
compress_level: int | None = None,
) -> None:
"""Method to save an EOPatch from memory to a storage.

:param path: A location where to save EOPatch. It can be either a local path or a remote URL path.
:param features: A collection of features types specifying features of which type will be saved. By default,
all features will be saved.
:param overwrite_permission: A level of permission for overwriting an existing EOPatch
:param compress_level: A level of data compression and can be specified with an integer from 0 (no compression)
to 9 (highest compression).
:param filesystem: An existing filesystem object. If not given it will be initialized according to the `path`
parameter.
:param save_timestamps: Whether to save the timestamps of the EOPatch. With the `"auto"` setting timestamps are saved
Expand All @@ -632,12 +630,18 @@ def save(
filesystem = get_filesystem(path, create=True)
path = "/"

if compress_level is not None:
warnings.warn(
"The `compress_level` parameter has been deprecated, data is now compressed by default.",
category=EODeprecationWarning,
stacklevel=2,
)

save_eopatch(
self,
filesystem,
path,
features=features,
compress_level=compress_level,
overwrite_permission=OverwritePermission(overwrite_permission),
save_timestamps=save_timestamps,
use_zarr=use_zarr,
Expand Down
23 changes: 16 additions & 7 deletions eolearn/core/eodata_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def save_eopatch(
features: FeaturesSpecification,
save_timestamps: bool | Literal["auto"],
overwrite_permission: OverwritePermission,
compress_level: int,
use_zarr: bool,
temporal_selection: None | slice | list[int] | Literal["infer"],
) -> None:
Expand Down Expand Up @@ -144,7 +143,6 @@ def save_eopatch(
patch_location=patch_location,
filesystem=filesystem,
use_zarr=use_zarr,
compress_level=compress_level,
save_timestamps=save_timestamps,
temporal_selection=_infer_temporal_selection(temporal_selection, filesystem, file_information, eopatch),
)
Expand Down Expand Up @@ -203,7 +201,6 @@ def _yield_savers(
features: Features,
patch_location: str,
filesystem: FS,
compress_level: int,
save_timestamps: bool,
use_zarr: bool,
temporal_selection: TemporalSelection,
Expand All @@ -213,20 +210,32 @@ def _yield_savers(

if eopatch.bbox is not None: # remove after BBox is never None
bbox: BBox = eopatch.bbox # mypy has problems
yield partial(FeatureIOBBox.save, bbox, filesystem, get_file_path(BBOX_FILENAME), compress_level)
yield partial(
FeatureIOBBox.save,
data=bbox,
filesystem=filesystem,
feature_path=get_file_path(BBOX_FILENAME),
compress_level=0,
)

if eopatch.timestamps is not None and save_timestamps:
path = get_file_path(TIMESTAMPS_FILENAME)
yield partial(FeatureIOTimestamps.save, eopatch.timestamps, filesystem, path, compress_level)
yield partial(
FeatureIOTimestamps.save,
data=eopatch.timestamps,
filesystem=filesystem,
feature_path=path,
compress_level=0,
)

for ftype, fname in features:
io_constructor = _get_feature_io_constructor(ftype, use_zarr)
feature_saver = partial(
io_constructor.save,
compress_level=compress_level,
filesystem=filesystem,
data=eopatch[ftype, fname],
filesystem=filesystem,
feature_path=get_file_path(ftype.value, fname),
compress_level=1,
)

if ftype.is_temporal() and issubclass(io_constructor, FeatureIOZarr):
Expand Down
114 changes: 42 additions & 72 deletions tests/core/test_eodata_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from __future__ import annotations

import datetime
import json
import os
import sys
import warnings
Expand Down Expand Up @@ -104,7 +103,7 @@ def test_saving_in_empty_folder(eopatch, fs_loader, use_zarr: bool):

subfolder = "new-subfolder"
eopatch.save("new-subfolder", filesystem=temp_fs, use_zarr=use_zarr)
assert temp_fs.exists(f"/{subfolder}/bbox.geojson.gz")
assert temp_fs.exists(f"/{subfolder}/bbox.geojson")


@mock_s3
Expand Down Expand Up @@ -218,7 +217,7 @@ def test_save_add_only_features(eopatch, fs_loader, use_zarr: bool):
def test_bbox_always_saved(eopatch, fs_loader):
with fs_loader() as temp_fs:
eopatch.save("/", filesystem=temp_fs, features=[FeatureType.DATA])
assert temp_fs.exists("/bbox.geojson.gz")
assert temp_fs.exists("/bbox.geojson")


@mock_s3
Expand All @@ -239,15 +238,15 @@ def test_bbox_always_saved(eopatch, fs_loader):
def test_save_timestamps(eopatch, fs_loader, save_timestamps, features, should_save):
with fs_loader() as temp_fs:
eopatch.save("/", filesystem=temp_fs, features=features, save_timestamps=save_timestamps)
assert temp_fs.exists("/timestamps.json.gz") == should_save
assert temp_fs.exists("/timestamps.json") == should_save


def test_auto_save_load_timestamps(eopatch):
"""Saving and loading with default values should process timestamps."""
test_patch = EOPatch(bbox=eopatch.bbox, timestamps=eopatch.timestamps) # no temporal stuff
with TempFS() as temp_fs:
test_patch.save("/", filesystem=temp_fs)
assert temp_fs.exists("/timestamps.json.gz")
assert temp_fs.exists("/timestamps.json")
assert EOPatch.load("/", filesystem=temp_fs).timestamps is not None


Expand Down Expand Up @@ -328,6 +327,19 @@ def test_overwrite_failure(fs_loader, use_zarr: bool):
)


@mock_s3
@pytest.mark.parametrize("fs_loader", FS_LOADERS)
@pytest.mark.parametrize("compress_level", [0, 1])
def test_compression_deprecation(eopatch, fs_loader, compress_level: int | None):
folder = "foo-folder"

zigaLuksic marked this conversation as resolved.
Show resolved Hide resolved
with fs_loader() as temp_fs, pytest.warns(EODeprecationWarning):
SaveTask(folder, filesystem=temp_fs, compress_level=compress_level)

with fs_loader() as temp_fs, pytest.warns(EODeprecationWarning):
eopatch.save(folder, filesystem=temp_fs, compress_level=compress_level)


@mock_s3
@pytest.mark.parametrize("fs_loader", FS_LOADERS)
@pytest.mark.parametrize("use_zarr", [True, False])
Expand All @@ -338,11 +350,11 @@ def test_save_and_load_tasks(eopatch, fs_loader, use_zarr: bool):
with fs_loader() as temp_fs:
temp_fs.makedir(folder)

save_task = SaveTask(folder, filesystem=temp_fs, compress_level=9, use_zarr=use_zarr)
save_task = SaveTask(folder, filesystem=temp_fs, use_zarr=use_zarr)
load_task = LoadTask(folder, filesystem=temp_fs, lazy_loading=False)

saved_eop = save_task(eopatch, eopatch_folder=patch_folder)
bbox_path = fs.path.join(folder, patch_folder, "bbox.geojson.gz")
bbox_path = fs.path.join(folder, patch_folder, "bbox.geojson")
assert temp_fs.exists(bbox_path)
assert saved_eop == eopatch

Expand Down Expand Up @@ -408,65 +420,23 @@ def test_cleanup_different_compression(fs_loader, eopatch):
folder = "foo-folder"
patch_folder = "patch-folder"
with fs_loader() as temp_fs:
temp_fs.makedir(folder)

save_compressed_task = SaveTask(
folder, filesystem=temp_fs, compress_level=9, overwrite_permission="OVERWRITE_FEATURES"
)
save_noncompressed_task = SaveTask(
folder, filesystem=temp_fs, compress_level=0, overwrite_permission="OVERWRITE_FEATURES"
)
save_task = SaveTask(folder, filesystem=temp_fs, overwrite_permission="OVERWRITE_FEATURES")
bbox_path = fs.path.join(folder, patch_folder, "bbox.geojson")
compressed_bbox_path = bbox_path + ".gz"
mask_timeless_path = fs.path.join(folder, patch_folder, "mask_timeless", "mask.npy")
compressed_mask_timeless_path = mask_timeless_path + ".gz"

save_compressed_task(eopatch, eopatch_folder=patch_folder)
save_noncompressed_task(eopatch, eopatch_folder=patch_folder)
assert temp_fs.exists(bbox_path)
assert temp_fs.exists(mask_timeless_path)
assert not temp_fs.exists(compressed_bbox_path)
assert not temp_fs.exists(compressed_mask_timeless_path)

save_compressed_task(eopatch, eopatch_folder=patch_folder)
assert not temp_fs.exists(bbox_path)
assert not temp_fs.exists(mask_timeless_path)
assert temp_fs.exists(compressed_bbox_path)
assert temp_fs.exists(compressed_mask_timeless_path)


def test_cleanup_different_compression_zarr(eopatch):
# zarr and moto don't work atm anyway
# slightly different than regular one, since zarr does not use gzip to compress itself
_skip_when_appropriate(None, True)
folder = "foo-folder"
patch_folder = "patch-folder"
with TempFS() as temp_fs:
temp_fs.makedir(folder)
timestamps_path = fs.path.join(folder, patch_folder, "timestamps.json")

save_compressed_task = SaveTask(
folder, filesystem=temp_fs, compress_level=9, overwrite_permission="OVERWRITE_FEATURES", use_zarr=True
)
save_noncompressed_task = SaveTask(
folder, filesystem=temp_fs, compress_level=0, overwrite_permission="OVERWRITE_FEATURES", use_zarr=True
)
bbox_path = fs.path.join(folder, patch_folder, "bbox.geojson")
compressed_bbox_path = bbox_path + ".gz"
mask_timeless_path = fs.path.join(folder, patch_folder, "mask_timeless", "mask.zarr")
wrong_compressed_mask_timeless_path = mask_timeless_path + ".gz"
# need to manually save uncompressed feature
zigaLuksic marked this conversation as resolved.
Show resolved Hide resolved
ftype, fname = (FeatureType.MASK_TIMELESS, "mask")
ftype_path = fs.path.join(folder, patch_folder, ftype.value)
temp_fs.makedirs(ftype_path)
feature_io = FeatureIONumpy(os.path.join(ftype_path, fname + ".npy"), filesystem=temp_fs)
feature_io.save(eopatch[(ftype, fname)], temp_fs, os.path.join(ftype_path, fname), compress_level=0)
mlubej marked this conversation as resolved.
Show resolved Hide resolved

save_compressed_task(eopatch, eopatch_folder=patch_folder)
save_noncompressed_task(eopatch, eopatch_folder=patch_folder)
# re-save compressed and check cleanup
save_task(eopatch, eopatch_folder=patch_folder)
assert temp_fs.exists(bbox_path)
assert temp_fs.exists(mask_timeless_path)
assert not temp_fs.exists(compressed_bbox_path)
assert not temp_fs.exists(wrong_compressed_mask_timeless_path)

save_compressed_task(eopatch, eopatch_folder=patch_folder)
assert not temp_fs.exists(bbox_path)
assert temp_fs.exists(mask_timeless_path)
assert temp_fs.exists(compressed_bbox_path)
assert not temp_fs.exists(wrong_compressed_mask_timeless_path)
assert temp_fs.exists(timestamps_path)
assert temp_fs.exists(os.path.join(ftype_path, fname) + ".npy.gz")
assert not temp_fs.exists(os.path.join(ftype_path, fname) + ".npy")


@mock_s3
Expand Down Expand Up @@ -520,17 +490,16 @@ def test_lazy_loading_plus_overwrite_patch(fs_loader, folder_name, eopatch, use_
(FeatureIOTimestamps, [datetime.datetime(2017, 1, 1, 10, 4, 7), datetime.datetime(2017, 1, 4, 10, 14, 5)]),
],
)
@pytest.mark.parametrize("compress_level", [0, 1])
zigaLuksic marked this conversation as resolved.
Show resolved Hide resolved
def test_feature_io(constructor: type[FeatureIO], data: Any, compress_level: int) -> None:
def test_feature_io(constructor: type[FeatureIO], data: Any) -> None:
"""
Tests verifying that FeatureIO subclasses correctly save, load, and lazy-load data.
Test cases do not include subfolders, because subfolder management is currently done by the `save_eopatch` function.
"""
file_extension = constructor.get_file_extension(compress_level=compress_level)
file_extension = constructor.get_file_extension(compress_level=1)
file_name = "name"
with TempFS("testing_file_sistem") as temp_fs:
feat_io = constructor(file_name + file_extension, filesystem=temp_fs)
constructor.save(data, temp_fs, file_name, compress_level)
constructor.save(data, temp_fs, file_name, compress_level=1)
loaded_data = feat_io.load()
assert_feature_data_equal(loaded_data, data)

Expand Down Expand Up @@ -739,19 +708,20 @@ def test_partial_temporal_saving_fails(eopatch: EOPatch):
@pytest.mark.parametrize("patch_location", [".", "patch-folder", "some/long/path"])
def test_old_style_meta_info(patch_location):
with TempFS() as temp_fs:
EOPatch(bbox=DUMMY_BBOX).save(path=patch_location, filesystem=temp_fs, compress_level=0)
EOPatch(bbox=DUMMY_BBOX).save(path=patch_location, filesystem=temp_fs)
meta_info = {"this": ["list"], "something": "else"}
with temp_fs.open(f"{patch_location}/meta_info.json", "w") as old_style_file:
json.dump(meta_info, old_style_file)
file_name, file_extension = f"{patch_location}/meta_info", ".json.gz"
old_style_io = FeatureIOJson(file_name + file_extension, filesystem=temp_fs)
old_style_io.save(meta_info, temp_fs, file_name, compress_level=1)

with pytest.warns(EODeprecationWarning):
loaded_patch = EOPatch.load(path=patch_location, filesystem=temp_fs)
assert dict(loaded_patch.meta_info.items()) == meta_info

loaded_patch.meta_info = {"beep": "boop"}
loaded_patch.save(path=patch_location, filesystem=temp_fs, compress_level=0)
assert not temp_fs.exists(f"{patch_location}/meta_info.json")
assert temp_fs.exists(f"{patch_location}/meta_info/beep.json")
loaded_patch.save(path=patch_location, filesystem=temp_fs)
assert not temp_fs.exists(f"{patch_location}/meta_info.json.gz")
assert temp_fs.exists(f"{patch_location}/meta_info/beep.json.gz")

loaded_patch = EOPatch.load(path=patch_location, filesystem=temp_fs)
assert dict(loaded_patch.meta_info.items()) == {"beep": "boop"}