-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
6 changed files
with
166 additions
and
3 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
"""Tools to simplify zipping and unzipping of files.""" | ||
|
||
import tempfile | ||
import zipfile | ||
from pathlib import Path | ||
from typing import TypeAlias | ||
|
||
PathType: TypeAlias = str | Path | ||
|
||
|
||
def zipflat(*, files: list[PathType], outfile: PathType) -> None: | ||
"""Zip files into a single archive with no directory structure. | ||
Args: | ||
files: List of files to zip. | ||
outfile: Path to the resulting zip archive. | ||
Raises: | ||
ValueError: If the names of the provided files are not unique. | ||
""" | ||
filepaths = [Path(file) for file in files] | ||
names = [file.name for file in filepaths] | ||
if len(names) != len(set(names)): | ||
for name in set(names): | ||
if names.count(name) > 1: | ||
raise ValueError(f"Filename {name} is not unique.") | ||
# This should not be reachable, but just in case: | ||
raise ValueError("All files must have unique names.") | ||
with zipfile.ZipFile(outfile, "w") as zipf: | ||
for file in filepaths: | ||
zipf.write(file, arcname=file.name) | ||
|
||
|
||
def unzip(*, file: PathType, dest_dir: PathType) -> None: | ||
"""Unzip a file into a destination directory. | ||
Args: | ||
file: Path to the zip archive. | ||
dest_dir: Directory to unzip the archive into. | ||
""" | ||
with zipfile.ZipFile(file, "r") as zipf: | ||
zipf.extractall(dest_dir) | ||
|
||
|
||
def unzipflat(*, file: PathType, dest_dir: PathType, overwrite: bool = False) -> None: | ||
"""Unzip a file into a destination directory. | ||
Args: | ||
file: Path to the zip archive. | ||
dest_dir: An existing directory to unzip the archive into. | ||
overwrite: If True, overwrite any existing files in the destination directory. | ||
Raises: | ||
ValueError: If the input file is not a zip archive. | ||
ValueError: If any contents of the input zip archive are directories -- expect a flat archive. | ||
FileExistsError: If any files in the archive would overwrite existing files in the destination directory. | ||
""" | ||
if not zipfile.is_zipfile(file): | ||
raise ValueError(f"File {file} is not a zip archive.") | ||
for item in zipfile.ZipFile(file, "r").infolist(): | ||
item_path = Path(item.filename) | ||
if item_path.parts[0] != item_path.name: | ||
raise ValueError( | ||
f"Input zip archive contains directory structure in element {item_path}; expected a flat archive." | ||
) | ||
if overwrite: | ||
unzip(file=file, dest_dir=dest_dir) | ||
else: | ||
# unzip files into a temporary directory, then move them to the destination only after | ||
# verifying that no files will be overwritten: | ||
with tempfile.TemporaryDirectory() as tempdir: | ||
unzip(file=file, dest_dir=tempdir) | ||
for file in Path(tempdir).iterdir(): | ||
target = Path(dest_dir) / file.name | ||
if target.exists(): | ||
raise FileExistsError(f"File {target} already exists.") | ||
for file in Path(tempdir).iterdir(): | ||
file.rename(Path(dest_dir) / file.name) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -1,6 +1,6 @@ | ||
[project] | ||
name = "packio" | ||
version = "0.0.5" | ||
version = "0.1.0" | ||
description = "IO for multiple python objects to/from a single file" | ||
authors = [{ name = "Zach Kurtz", email = "[email protected]" }] | ||
readme = "README.md" | ||
|
File renamed without changes.
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,78 @@ | ||
"""Test the zip utilities.""" | ||
|
||
import zipfile | ||
from pathlib import Path | ||
|
||
import pytest | ||
|
||
from packio import unzipflat, zipflat | ||
|
||
|
||
def test_zipflat(tmp_path: Path) -> None: | ||
"""Test the zipflat function.""" | ||
# Create some files to zip. | ||
file1 = tmp_path / "file1.txt" | ||
file1.write_text("Hello, world!") | ||
file2 = tmp_path / "file2.txt" | ||
file2.write_text("Goodbye, world!") | ||
# Zip the files. | ||
zip_path = tmp_path / "archive.zip" | ||
zipflat(files=[file1, file2], outfile=zip_path) | ||
# Unzip the files. | ||
unzip_path = tmp_path / "unzipped" | ||
unzip_path.mkdir() | ||
unzipflat(file=zip_path, dest_dir=unzip_path) | ||
# Check the unzipped files. | ||
assert (unzip_path / "file1.txt").read_text() == "Hello, world!" | ||
assert (unzip_path / "file2.txt").read_text() == "Goodbye, world!" | ||
|
||
|
||
def test_zipflat_duplicate(tmp_path: Path) -> None: | ||
"""Test zipflat with duplicate filenames.""" | ||
# Create some files to zip. | ||
file1 = tmp_path / "file.txt" | ||
file1.write_text("Hello, world!") | ||
file2 = tmp_path / "file.txt" | ||
file2.write_text("Goodbye, world!") | ||
# Zip the files. | ||
zip_path = tmp_path / "archive.zip" | ||
with pytest.raises(ValueError, match="Filename file.txt is not unique."): | ||
zipflat(files=[file1, file2], outfile=zip_path) | ||
|
||
|
||
def test_unzipflat_unflat(tmp_path: Path) -> None: | ||
"""Exception should be raised if unzipped archive has directory structure.""" | ||
# Create a zip archive with a directory structure. | ||
zip_path = tmp_path / "archive.zip" | ||
with zipfile.ZipFile(zip_path, "w") as zipf: | ||
zipf.writestr("dir/file.txt", "Hello, world!") | ||
# Try to unzip the file. | ||
with pytest.raises(ValueError, match="expected a flat archive."): | ||
unzipflat(file=zip_path, dest_dir=tmp_path) | ||
|
||
|
||
def test_unzipflat_overwrite(tmp_path: Path) -> None: | ||
"""Test unzipflat with overwrite.""" | ||
# Create a file to zip. | ||
file = tmp_path / "file.txt" | ||
file.write_text("Hello, world!") | ||
# Zip the file. | ||
zip_path = tmp_path / "archive.zip" | ||
zipflat(files=[file], outfile=zip_path) | ||
# Unzip the file. | ||
unzip_path = tmp_path / "unzipped" | ||
unzip_path.mkdir() | ||
unzipflat(file=zip_path, dest_dir=unzip_path) | ||
# Try to unzip the file again. | ||
with pytest.raises(FileExistsError, match="file.txt"): | ||
unzipflat(file=zip_path, dest_dir=unzip_path) | ||
|
||
|
||
def test_unzipflat_not_zip(tmp_path: Path) -> None: | ||
"""Test unzipflat with a non-zip file.""" | ||
# Create a file to unzip. | ||
file = tmp_path / "file.txt" | ||
file.write_text("Hello, world!") | ||
# Try to unzip the file. | ||
with pytest.raises(ValueError, match="is not a zip archive."): | ||
unzipflat(file=file, dest_dir=tmp_path) |
Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.
Oops, something went wrong.