Skip to content

Commit

Permalink
Compression changes to eo-learn (#731)
Browse files Browse the repository at this point in the history
* remove compress_level parameter from usable functions

* update savers, hard-code compression values

* remove compression param from tests

* fix compression-related tests

* formatting + linters

* add deprecation warnings when `compress_level` param is used

* add test for compression deprecation

* add test for compression cleanup and backwards compatibility

* review comments

* add backwards compatibility test for loading uncompressed data
  • Loading branch information
Matic Lubej authored Aug 31, 2023
1 parent 02e2042 commit 5940817
Show file tree
Hide file tree
Showing 4 changed files with 85 additions and 81 deletions.
13 changes: 9 additions & 4 deletions eolearn/core/core_tasks.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@

import copy
import datetime as dt
import warnings
from abc import ABCMeta
from typing import Any, Callable, Iterable, Literal, Tuple, Union, cast

Expand Down Expand Up @@ -102,11 +103,11 @@ def __init__(
config: SHConfig | None = None,
features: FeaturesSpecification = ...,
overwrite_permission: OverwritePermission = OverwritePermission.ADD_ONLY,
compress_level: int = 1,
*,
save_timestamps: bool | Literal["auto"] = "auto",
use_zarr: bool = False,
temporal_selection: None | slice | list[int] | Literal["infer"] = None,
compress_level: int | None = None,
):
"""
:param path: root path where all EOPatches are saved
Expand All @@ -116,7 +117,6 @@ def __init__(
:param features: A collection of features types specifying features of which type will be saved. By default,
all features will be saved.
:param overwrite_permission: A level of permission for overwriting an existing EOPatch
:param compress_level: A level of data compression and can be specified with an integer from 0 (no compression)
to 9 (highest compression).
:save_timestamps: Whether to save the timestamps of the EOPatch. With the `"auto"` setting timestamps are saved
if `features=...` or if other temporal features are being saved.
Expand All @@ -127,12 +127,18 @@ def __init__(
"""
self.features = features
self.overwrite_permission = overwrite_permission
self.compress_level = compress_level
self.use_zarr = use_zarr
self.temporal_selection = temporal_selection
self.save_timestamps = save_timestamps
super().__init__(path, filesystem=filesystem, create=True, config=config)

if compress_level is not None:
warnings.warn(
"The `compress_level` parameter has been deprecated, data is now compressed by default.",
category=EODeprecationWarning,
stacklevel=2,
)

def execute(
self,
eopatch: EOPatch,
Expand All @@ -155,7 +161,6 @@ def execute(
filesystem=self.filesystem,
features=self.features,
overwrite_permission=self.overwrite_permission,
compress_level=self.compress_level,
save_timestamps=self.save_timestamps,
use_zarr=self.use_zarr,
temporal_selection=temporal_selection,
Expand Down
12 changes: 8 additions & 4 deletions eolearn/core/eodata.py
Original file line number Diff line number Diff line change
Expand Up @@ -604,21 +604,19 @@ def save(
path: str,
features: FeaturesSpecification = ...,
overwrite_permission: OverwritePermission = OverwritePermission.ADD_ONLY,
compress_level: int = 1,
filesystem: FS | None = None,
*,
save_timestamps: bool | Literal["auto"] = "auto",
use_zarr: bool = False,
temporal_selection: None | slice | list[int] | Literal["infer"] = None,
compress_level: int | None = None,
) -> None:
"""Method to save an EOPatch from memory to a storage.
:param path: A location where to save EOPatch. It can be either a local path or a remote URL path.
:param features: A collection of features types specifying features of which type will be saved. By default,
all features will be saved.
:param overwrite_permission: A level of permission for overwriting an existing EOPatch
:param compress_level: A level of data compression and can be specified with an integer from 0 (no compression)
to 9 (highest compression).
:param filesystem: An existing filesystem object. If not given it will be initialized according to the `path`
parameter.
:save_timestamps: Whether to save the timestamps of the EOPatch. With the `"auto"` setting timestamps are saved
Expand All @@ -632,12 +630,18 @@ def save(
filesystem = get_filesystem(path, create=True)
path = "/"

if compress_level is not None:
warnings.warn(
"The `compress_level` parameter has been deprecated, data is now compressed by default.",
category=EODeprecationWarning,
stacklevel=2,
)

save_eopatch(
self,
filesystem,
path,
features=features,
compress_level=compress_level,
overwrite_permission=OverwritePermission(overwrite_permission),
save_timestamps=save_timestamps,
use_zarr=use_zarr,
Expand Down
23 changes: 16 additions & 7 deletions eolearn/core/eodata_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -116,7 +116,6 @@ def save_eopatch(
features: FeaturesSpecification,
save_timestamps: bool | Literal["auto"],
overwrite_permission: OverwritePermission,
compress_level: int,
use_zarr: bool,
temporal_selection: None | slice | list[int] | Literal["infer"],
) -> None:
Expand Down Expand Up @@ -144,7 +143,6 @@ def save_eopatch(
patch_location=patch_location,
filesystem=filesystem,
use_zarr=use_zarr,
compress_level=compress_level,
save_timestamps=save_timestamps,
temporal_selection=_infer_temporal_selection(temporal_selection, filesystem, file_information, eopatch),
)
Expand Down Expand Up @@ -203,7 +201,6 @@ def _yield_savers(
features: Features,
patch_location: str,
filesystem: FS,
compress_level: int,
save_timestamps: bool,
use_zarr: bool,
temporal_selection: TemporalSelection,
Expand All @@ -213,20 +210,32 @@ def _yield_savers(

if eopatch.bbox is not None: # remove after BBox is never None
bbox: BBox = eopatch.bbox # mypy has problems
yield partial(FeatureIOBBox.save, bbox, filesystem, get_file_path(BBOX_FILENAME), compress_level)
yield partial(
FeatureIOBBox.save,
data=bbox,
filesystem=filesystem,
feature_path=get_file_path(BBOX_FILENAME),
compress_level=0,
)

if eopatch.timestamps is not None and save_timestamps:
path = get_file_path(TIMESTAMPS_FILENAME)
yield partial(FeatureIOTimestamps.save, eopatch.timestamps, filesystem, path, compress_level)
yield partial(
FeatureIOTimestamps.save,
data=eopatch.timestamps,
filesystem=filesystem,
feature_path=path,
compress_level=0,
)

for ftype, fname in features:
io_constructor = _get_feature_io_constructor(ftype, use_zarr)
feature_saver = partial(
io_constructor.save,
compress_level=compress_level,
filesystem=filesystem,
data=eopatch[ftype, fname],
filesystem=filesystem,
feature_path=get_file_path(ftype.value, fname),
compress_level=1,
)

if ftype.is_temporal() and issubclass(io_constructor, FeatureIOZarr):
Expand Down
118 changes: 52 additions & 66 deletions tests/core/test_eodata_io.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@
from __future__ import annotations

import datetime
import json
import os
import sys
import warnings
Expand Down Expand Up @@ -104,7 +103,7 @@ def test_saving_in_empty_folder(eopatch, fs_loader, use_zarr: bool):

subfolder = "new-subfolder"
eopatch.save("new-subfolder", filesystem=temp_fs, use_zarr=use_zarr)
assert temp_fs.exists(f"/{subfolder}/bbox.geojson.gz")
assert temp_fs.exists(f"/{subfolder}/bbox.geojson")


@mock_s3
Expand Down Expand Up @@ -218,7 +217,7 @@ def test_save_add_only_features(eopatch, fs_loader, use_zarr: bool):
def test_bbox_always_saved(eopatch, fs_loader):
with fs_loader() as temp_fs:
eopatch.save("/", filesystem=temp_fs, features=[FeatureType.DATA])
assert temp_fs.exists("/bbox.geojson.gz")
assert temp_fs.exists("/bbox.geojson")


@mock_s3
Expand All @@ -239,15 +238,15 @@ def test_bbox_always_saved(eopatch, fs_loader):
def test_save_timestamps(eopatch, fs_loader, save_timestamps, features, should_save):
with fs_loader() as temp_fs:
eopatch.save("/", filesystem=temp_fs, features=features, save_timestamps=save_timestamps)
assert temp_fs.exists("/timestamps.json.gz") == should_save
assert temp_fs.exists("/timestamps.json") == should_save


def test_auto_save_load_timestamps(eopatch):
"""Saving and loading with default values should process timestamps."""
test_patch = EOPatch(bbox=eopatch.bbox, timestamps=eopatch.timestamps) # no temporal stuff
with TempFS() as temp_fs:
test_patch.save("/", filesystem=temp_fs)
assert temp_fs.exists("/timestamps.json.gz")
assert temp_fs.exists("/timestamps.json")
assert EOPatch.load("/", filesystem=temp_fs).timestamps is not None


Expand Down Expand Up @@ -328,6 +327,25 @@ def test_overwrite_failure(fs_loader, use_zarr: bool):
)


@mock_s3
@pytest.mark.parametrize("fs_loader", FS_LOADERS)
@pytest.mark.parametrize("compress_level", [0, 1])
def test_compression_deprecation(eopatch, fs_loader, compress_level: int | None):
folder = "foo-folder"

with warnings.catch_warnings(): # make warnings errors
warnings.simplefilter("error")

with fs_loader() as temp_fs:
SaveTask(folder, filesystem=temp_fs)

with fs_loader() as temp_fs, pytest.warns(EODeprecationWarning):
SaveTask(folder, filesystem=temp_fs, compress_level=compress_level)

with fs_loader() as temp_fs, pytest.warns(EODeprecationWarning):
eopatch.save(folder, filesystem=temp_fs, compress_level=compress_level)


@mock_s3
@pytest.mark.parametrize("fs_loader", FS_LOADERS)
@pytest.mark.parametrize("use_zarr", [True, False])
Expand All @@ -338,11 +356,11 @@ def test_save_and_load_tasks(eopatch, fs_loader, use_zarr: bool):
with fs_loader() as temp_fs:
temp_fs.makedir(folder)

save_task = SaveTask(folder, filesystem=temp_fs, compress_level=9, use_zarr=use_zarr)
save_task = SaveTask(folder, filesystem=temp_fs, use_zarr=use_zarr)
load_task = LoadTask(folder, filesystem=temp_fs, lazy_loading=False)

saved_eop = save_task(eopatch, eopatch_folder=patch_folder)
bbox_path = fs.path.join(folder, patch_folder, "bbox.geojson.gz")
bbox_path = fs.path.join(folder, patch_folder, "bbox.geojson")
assert temp_fs.exists(bbox_path)
assert saved_eop == eopatch

Expand Down Expand Up @@ -408,65 +426,32 @@ def test_cleanup_different_compression(fs_loader, eopatch):
folder = "foo-folder"
patch_folder = "patch-folder"
with fs_loader() as temp_fs:
temp_fs.makedir(folder)
temp_fs.makedirs(fs.path.join(folder, patch_folder))
save_task = SaveTask(folder, filesystem=temp_fs, overwrite_permission="OVERWRITE_FEATURES")

save_compressed_task = SaveTask(
folder, filesystem=temp_fs, compress_level=9, overwrite_permission="OVERWRITE_FEATURES"
)
save_noncompressed_task = SaveTask(
folder, filesystem=temp_fs, compress_level=0, overwrite_permission="OVERWRITE_FEATURES"
)
# need to manually save uncompressed features
bbox_path = fs.path.join(folder, patch_folder, "bbox.geojson")
compressed_bbox_path = bbox_path + ".gz"
mask_timeless_path = fs.path.join(folder, patch_folder, "mask_timeless", "mask.npy")
compressed_mask_timeless_path = mask_timeless_path + ".gz"

save_compressed_task(eopatch, eopatch_folder=patch_folder)
save_noncompressed_task(eopatch, eopatch_folder=patch_folder)
assert temp_fs.exists(bbox_path)
assert temp_fs.exists(mask_timeless_path)
assert not temp_fs.exists(compressed_bbox_path)
assert not temp_fs.exists(compressed_mask_timeless_path)

save_compressed_task(eopatch, eopatch_folder=patch_folder)
assert not temp_fs.exists(bbox_path)
assert not temp_fs.exists(mask_timeless_path)
assert temp_fs.exists(compressed_bbox_path)
assert temp_fs.exists(compressed_mask_timeless_path)
FeatureIOBBox.save(eopatch.bbox, temp_fs, bbox_path, compress_level=0)

timestamps_path = fs.path.join(folder, patch_folder, "timestamps.json")
FeatureIOTimestamps.save(eopatch.timestamps, temp_fs, timestamps_path, compress_level=0)

def test_cleanup_different_compression_zarr(eopatch):
# zarr and moto dont work atm anyway
# slightly different than regular one, since zarr does not use gzip to compress itself
_skip_when_appropriate(None, True)
folder = "foo-folder"
patch_folder = "patch-folder"
with TempFS() as temp_fs:
temp_fs.makedir(folder)
ftype, fname = (FeatureType.MASK_TIMELESS, "mask")
temp_fs.makedir(fs.path.join(folder, patch_folder, ftype.value))
mask_timeless_path = fs.path.join(folder, patch_folder, ftype.value, f"{fname}.npy")
FeatureIONumpy.save(eopatch[(ftype, fname)], temp_fs, mask_timeless_path, compress_level=0)

save_compressed_task = SaveTask(
folder, filesystem=temp_fs, compress_level=9, overwrite_permission="OVERWRITE_FEATURES", use_zarr=True
)
save_noncompressed_task = SaveTask(
folder, filesystem=temp_fs, compress_level=0, overwrite_permission="OVERWRITE_FEATURES", use_zarr=True
)
bbox_path = fs.path.join(folder, patch_folder, "bbox.geojson")
compressed_bbox_path = bbox_path + ".gz"
mask_timeless_path = fs.path.join(folder, patch_folder, "mask_timeless", "mask.zarr")
wrong_compressed_mask_timeless_path = mask_timeless_path + ".gz"
# test EOPatch load of uncompressed features
EOPatch.load(fs.path.join(folder, patch_folder), filesystem=temp_fs)

save_compressed_task(eopatch, eopatch_folder=patch_folder)
save_noncompressed_task(eopatch, eopatch_folder=patch_folder)
# re-save compressed and check cleanup, bbox and timestamps are not compressed
save_task(eopatch, eopatch_folder=patch_folder)
assert temp_fs.exists(bbox_path)
assert temp_fs.exists(mask_timeless_path)
assert not temp_fs.exists(compressed_bbox_path)
assert not temp_fs.exists(wrong_compressed_mask_timeless_path)

save_compressed_task(eopatch, eopatch_folder=patch_folder)
assert not temp_fs.exists(bbox_path)
assert temp_fs.exists(mask_timeless_path)
assert temp_fs.exists(compressed_bbox_path)
assert not temp_fs.exists(wrong_compressed_mask_timeless_path)
assert temp_fs.exists(timestamps_path)
assert temp_fs.exists(mask_timeless_path + ".gz")
assert not temp_fs.exists(bbox_path + ".gz")
assert not temp_fs.exists(timestamps_path + ".gz")
assert not temp_fs.exists(mask_timeless_path)


@mock_s3
Expand Down Expand Up @@ -530,7 +515,7 @@ def test_feature_io(constructor: type[FeatureIO], data: Any, compress_level: int
file_name = "name"
with TempFS("testing_file_sistem") as temp_fs:
feat_io = constructor(file_name + file_extension, filesystem=temp_fs)
constructor.save(data, temp_fs, file_name, compress_level)
constructor.save(data, temp_fs, file_name, compress_level=compress_level)
loaded_data = feat_io.load()
assert_feature_data_equal(loaded_data, data)

Expand Down Expand Up @@ -739,19 +724,20 @@ def test_partial_temporal_saving_fails(eopatch: EOPatch):
@pytest.mark.parametrize("patch_location", [".", "patch-folder", "some/long/path"])
def test_old_style_meta_info(patch_location):
with TempFS() as temp_fs:
EOPatch(bbox=DUMMY_BBOX).save(path=patch_location, filesystem=temp_fs, compress_level=0)
EOPatch(bbox=DUMMY_BBOX).save(path=patch_location, filesystem=temp_fs)
meta_info = {"this": ["list"], "something": "else"}
with temp_fs.open(f"{patch_location}/meta_info.json", "w") as old_style_file:
json.dump(meta_info, old_style_file)
file_name, file_extension = f"{patch_location}/meta_info", ".json.gz"
old_style_io = FeatureIOJson(file_name + file_extension, filesystem=temp_fs)
old_style_io.save(meta_info, temp_fs, file_name, compress_level=1)

with pytest.warns(EODeprecationWarning):
loaded_patch = EOPatch.load(path=patch_location, filesystem=temp_fs)
assert dict(loaded_patch.meta_info.items()) == meta_info

loaded_patch.meta_info = {"beep": "boop"}
loaded_patch.save(path=patch_location, filesystem=temp_fs, compress_level=0)
assert not temp_fs.exists(f"{patch_location}/meta_info.json")
assert temp_fs.exists(f"{patch_location}/meta_info/beep.json")
loaded_patch.save(path=patch_location, filesystem=temp_fs)
assert not temp_fs.exists(f"{patch_location}/meta_info.json.gz")
assert temp_fs.exists(f"{patch_location}/meta_info/beep.json.gz")

loaded_patch = EOPatch.load(path=patch_location, filesystem=temp_fs)
assert dict(loaded_patch.meta_info.items()) == {"beep": "boop"}

0 comments on commit 5940817

Please sign in to comment.