diff --git a/packio/__init__.py b/packio/__init__.py index 8ea450e..78a9168 100644 --- a/packio/__init__.py +++ b/packio/__init__.py @@ -4,6 +4,13 @@ from packio.io import Reader as Reader from packio.io import Writer as Writer +from packio.zip import unzipflat as unzipflat +from packio.zip import zipflat as zipflat __version__ = version("packio") -__all__ = ["Reader", "Writer"] +__all__ = [ + "Reader", + "Writer", + "zipflat", + "unzipflat", +] diff --git a/packio/zip.py b/packio/zip.py new file mode 100644 index 0000000..615f707 --- /dev/null +++ b/packio/zip.py @@ -0,0 +1,78 @@ +"""Tools to simplify zipping and unzipping of files.""" + +import tempfile +import zipfile +from pathlib import Path +from typing import TypeAlias + +PathType: TypeAlias = str | Path + + +def zipflat(*, files: list[PathType], outfile: PathType) -> None: + """Zip files into a single archive with no directory structure. + + Args: + files: List of files to zip. + outfile: Path to the resulting zip archive. + + Raises: + ValueError: If the names of the provided files are not unique. + """ + filepaths = [Path(file) for file in files] + names = [file.name for file in filepaths] + if len(names) != len(set(names)): + for name in set(names): + if names.count(name) > 1: + raise ValueError(f"Filename {name} is not unique.") + # This should not be reachable, but just in case: + raise ValueError("All files must have unique names.") + with zipfile.ZipFile(outfile, "w") as zipf: + for file in filepaths: + zipf.write(file, arcname=file.name) + + +def unzip(*, file: PathType, dest_dir: PathType) -> None: + """Unzip a file into a destination directory. + + Args: + file: Path to the zip archive. + dest_dir: Directory to unzip the archive into. + """ + with zipfile.ZipFile(file, "r") as zipf: + zipf.extractall(dest_dir) + + +def unzipflat(*, file: PathType, dest_dir: PathType, overwrite: bool = False) -> None: + """Unzip a file into a destination directory. + + Args: + file: Path to the zip archive. + dest_dir: An existing directory to unzip the archive into. + overwrite: If True, overwrite any existing files in the destination directory. + + Raises: + ValueError: If the input file is not a zip archive. + ValueError: If any contents of the input zip archive are directories -- expect a flat archive. + FileExistsError: If any files in the archive would overwrite existing files in the destination directory. + """ + if not zipfile.is_zipfile(file): + raise ValueError(f"File {file} is not a zip archive.") + for item in zipfile.ZipFile(file, "r").infolist(): + item_path = Path(item.filename) + if item_path.parts[0] != item_path.name: + raise ValueError( + f"Input zip archive contains directory structure in element {item_path}; expected a flat archive." + ) + if overwrite: + unzip(file=file, dest_dir=dest_dir) + else: + # unzip files into a temporary directory, then move them to the destination only after + # verifying that no files will be overwritten: + with tempfile.TemporaryDirectory() as tempdir: + unzip(file=file, dest_dir=tempdir) + for file in Path(tempdir).iterdir(): + target = Path(dest_dir) / file.name + if target.exists(): + raise FileExistsError(f"File {target} already exists.") + for file in Path(tempdir).iterdir(): + file.rename(Path(dest_dir) / file.name) diff --git a/pyproject.toml b/pyproject.toml index 618c3e1..a7facb2 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "packio" -version = "0.0.5" +version = "0.1.0" description = "IO for multiple python objects to/from a single file" authors = [{ name = "Zach Kurtz", email = "zkurtz@gmail.com" }] readme = "README.md" diff --git a/tests/test_packio.py b/tests/test_io.py similarity index 100% rename from tests/test_packio.py rename to tests/test_io.py diff --git a/tests/test_zip.py b/tests/test_zip.py new file mode 100644 index 0000000..511d3d6 --- /dev/null +++ b/tests/test_zip.py @@ -0,0 +1,78 @@ +"""Test the zip utilities.""" + +import zipfile +from pathlib import Path + +import pytest + +from packio import unzipflat, zipflat + + +def test_zipflat(tmp_path: Path) -> None: + """Test the zipflat function.""" + # Create some files to zip. + file1 = tmp_path / "file1.txt" + file1.write_text("Hello, world!") + file2 = tmp_path / "file2.txt" + file2.write_text("Goodbye, world!") + # Zip the files. + zip_path = tmp_path / "archive.zip" + zipflat(files=[file1, file2], outfile=zip_path) + # Unzip the files. + unzip_path = tmp_path / "unzipped" + unzip_path.mkdir() + unzipflat(file=zip_path, dest_dir=unzip_path) + # Check the unzipped files. + assert (unzip_path / "file1.txt").read_text() == "Hello, world!" + assert (unzip_path / "file2.txt").read_text() == "Goodbye, world!" + + +def test_zipflat_duplicate(tmp_path: Path) -> None: + """Test zipflat with duplicate filenames.""" + # Create some files to zip. + file1 = tmp_path / "file.txt" + file1.write_text("Hello, world!") + file2 = tmp_path / "file.txt" + file2.write_text("Goodbye, world!") + # Zip the files. + zip_path = tmp_path / "archive.zip" + with pytest.raises(ValueError, match="Filename file.txt is not unique."): + zipflat(files=[file1, file2], outfile=zip_path) + + +def test_unzipflat_unflat(tmp_path: Path) -> None: + """Exception should be raised if unzipped archive has directory structure.""" + # Create a zip archive with a directory structure. + zip_path = tmp_path / "archive.zip" + with zipfile.ZipFile(zip_path, "w") as zipf: + zipf.writestr("dir/file.txt", "Hello, world!") + # Try to unzip the file. + with pytest.raises(ValueError, match="expected a flat archive."): + unzipflat(file=zip_path, dest_dir=tmp_path) + + +def test_unzipflat_overwrite(tmp_path: Path) -> None: + """Test unzipflat with overwrite.""" + # Create a file to zip. + file = tmp_path / "file.txt" + file.write_text("Hello, world!") + # Zip the file. + zip_path = tmp_path / "archive.zip" + zipflat(files=[file], outfile=zip_path) + # Unzip the file. + unzip_path = tmp_path / "unzipped" + unzip_path.mkdir() + unzipflat(file=zip_path, dest_dir=unzip_path) + # Try to unzip the file again. + with pytest.raises(FileExistsError, match="file.txt"): + unzipflat(file=zip_path, dest_dir=unzip_path) + + +def test_unzipflat_not_zip(tmp_path: Path) -> None: + """Test unzipflat with a non-zip file.""" + # Create a file to unzip. + file = tmp_path / "file.txt" + file.write_text("Hello, world!") + # Try to unzip the file. + with pytest.raises(ValueError, match="is not a zip archive."): + unzipflat(file=file, dest_dir=tmp_path) diff --git a/uv.lock b/uv.lock index d301fa1..e641600 100644 --- a/uv.lock +++ b/uv.lock @@ -477,7 +477,7 @@ wheels = [ [[package]] name = "packio" -version = "0.0.5" +version = "0.1.0" source = { editable = "." } [package.dev-dependencies]