diff --git a/tests/utils/archives_test.py b/tests/utils/archives_test.py index 1f20818a1..a0a997e6e 100644 --- a/tests/utils/archives_test.py +++ b/tests/utils/archives_test.py @@ -21,6 +21,7 @@ import bz2 import gzip import lzma +import os import pathlib import tarfile import zipfile @@ -55,12 +56,9 @@ def test_detect_file_type_no_suffixes(): params=[ (None, 'tar'), (None, 'zip'), - ('bz2', None), - ('gz', None), ('tbz', None), ('tbz2', None), ('tgz', None), - ('xz', None), ('bz2', 'tar'), ('bz2', 'zip'), ('gz', 'tar'), @@ -70,12 +68,9 @@ def test_detect_file_type_no_suffixes(): ids=[ 'tar_archive', 'zip_archive', - 'bz2_compressed_archive', - 'gz_compressed_archive', 'tbz_compressed_archive', 'tbz2_compressed_archive', 'tgz_compressed_archive', - 'xz_compressed_archive', 'bz2_compressed_tar_archive', 'bz2_compressed_zip_archive', 'gz_compressed_tar_archive', @@ -89,18 +84,14 @@ def fixture_archive(request, tmp_path): # write tmp filepath test_filepath = rootpath / 'test.file' - if extension in {'zip', 'tar'}: - test_filepath.write_text('test') - if compression in {'bz2', 'xz', 'tbz', 'tbz2', 'tgz', 'gz'} and extension is None: - test_filepath.write_bytes(b'test') + test_filepath.write_text('test') + + top_level_directory = 'toplevel' # add additional archive filepath = rootpath / 'recursive.zip' with zipfile.ZipFile(filepath, 'w') as zip_open: - zip_open.write(test_filepath) - - # now remove original file again - test_filepath.unlink() + zip_open.write(test_filepath, arcname=test_filepath.name) # declare archive path if compression is None: @@ -112,19 +103,16 @@ def fixture_archive(request, tmp_path): if compression is None and extension == 'zip': with zipfile.ZipFile(archive_path, 'w') as zip_open: - zip_open.write(filepath) - yield archive_path + zip_open.write(filepath, arcname=os.path.join(top_level_directory, filepath.name)) elif compression is not None and extension == 'zip': comp_type = _ZIP_COMPRESSION_MAP[f'.{compression}'] with zipfile.ZipFile(archive_path, 'w', compression=comp_type) as zip_open: - zip_open.write(filepath) - yield archive_path + zip_open.write(filepath, arcname=os.path.join(top_level_directory, filepath.name)) elif compression is None and extension == 'tar': with tarfile.TarFile.open(archive_path, 'w') as fp: - fp.add(filepath) - yield archive_path + fp.add(filepath, arcname=os.path.join(top_level_directory, filepath.name)) elif ( (compression is not None and extension == 'tar') or @@ -135,26 +123,61 @@ def fixture_archive(request, tmp_path): if compression in {'tgz'}: compression = 'gz' with tarfile.TarFile.open(archive_path, f'w:{compression}') as fp: - fp.add(filepath) - yield archive_path + fp.add(filepath, arcname=os.path.join(top_level_directory, filepath.name)) + + else: + raise ValueError(f'{request.param} not supported for archive fixture') + + # now remove original files again + test_filepath.unlink() + filepath.unlink() + + yield archive_path + + +@pytest.fixture( + name='compressed_file', + params=[ + 'bz2', + 'gz', + 'xz', + ], + ids=[ + 'bz2_compressed_file', + 'gz_compressed_file', + 'xz_compressed_file', + ], +) +def fixture_compressed_file(request, tmp_path): + rootpath = tmp_path + compression = request.param - elif compression == 'bz2' and extension is None: - with bz2.open(archive_path, 'wb') as fp: - fp.write(filepath.read_bytes()) - yield archive_path + # write tmp filepath + test_filepath = rootpath / 'test.file' + test_filepath.write_bytes(b'test') - elif compression == 'gz' and extension is None: - with gzip.open(archive_path, 'wb') as fp: - fp.write(filepath.read_bytes()) - yield archive_path + # declare archive path + compressed_filepath = rootpath / f'test.{compression}' - elif compression == 'xz' and extension is None: - with lzma.open(archive_path, 'wb') as fp: - fp.write(filepath.read_bytes()) - yield archive_path + if compression == 'bz2': + with bz2.open(compressed_filepath, 'wb') as fp: + fp.write(test_filepath.read_bytes()) + + elif compression == 'gz': + with gzip.open(compressed_filepath, 'wb') as fp: + fp.write(test_filepath.read_bytes()) + + elif compression == 'xz': + with lzma.open(compressed_filepath, 'wb') as fp: + fp.write(test_filepath.read_bytes()) else: - raise ValueError(f'{request.param} not supported for archive fixture') + raise ValueError(f'{request.param} not supported for compressed file fixture') + + # now remove original file again + test_filepath.unlink() + + yield compressed_filepath @pytest.fixture( @@ -181,26 +204,90 @@ def fixture_unsupported_archive(request, tmp_path): @pytest.mark.parametrize( - 'recursive', + ('recursive', 'remove_finished', 'expected_files'), [ - pytest.param(False, id='recursive_false'), - pytest.param(True, id='recursive_true'), + pytest.param( + False, False, + ( + 'toplevel', + os.path.join('toplevel', 'recursive.zip'), + ), + id='recursive_false_remove_finished_false', + ), + pytest.param( + False, True, + ( + 'toplevel', + os.path.join('toplevel', 'recursive.zip'), + ), + id='recursive_false_remove_finished_true', + ), + pytest.param( + True, False, + ( + 'toplevel', + os.path.join('toplevel', 'recursive.zip'), + os.path.join('toplevel', 'recursive'), + os.path.join('toplevel', 'recursive', 'test.file'), + ), + id='recursive_true_remove_finished_false', + ), + pytest.param( + True, True, + ( + 'toplevel', + os.path.join('toplevel', 'recursive'), + os.path.join('toplevel', 'recursive', 'test.file'), + ), + id='recursive_true_remove_finished_true', + ), ], ) +def test_extract_archive_destination_path_None( + recursive, remove_finished, expected_files, archive, tmp_path, +): + extract_archive( + source_path=archive, + destination_path=None, + recursive=recursive, + remove_finished=remove_finished, + ) + result_files = { + str(file.relative_to(archive.parent)) for file in archive.parent.rglob('*') + } + + expected_files = set(expected_files) + if not remove_finished: + expected_files.add(archive.name) + assert result_files == expected_files + + @pytest.mark.parametrize( - 'remove_finished', + ('recursive', 'remove_finished'), [ - pytest.param(False, id='remove_finished_false'), - pytest.param(True, id='remove_finished_true'), + pytest.param(False, False, id='recursive_false_remove_finished_false'), + pytest.param(False, True, id='recursive_false_remove_finished_true'), + pytest.param(True, False, id='recursive_true_remove_finished_false'), + pytest.param(True, True, id='recursive_true_remove_finished_true'), ], ) -def test_extract_archive_destination_path_None(recursive, remove_finished, archive): +def test_extract_compressed_file_destination_path_None( + recursive, remove_finished, compressed_file, tmp_path, +): extract_archive( - source_path=archive, + source_path=compressed_file, destination_path=None, recursive=recursive, remove_finished=remove_finished, ) + result_files = { + str(file.relative_to(compressed_file.parent)) for file in compressed_file.parent.rglob('*') + } + + expected_files = {'test'} + if not remove_finished: + expected_files.add(compressed_file.name) + assert result_files == expected_files @pytest.mark.parametrize( @@ -235,27 +322,93 @@ def test_extract_unsupported_archive_destination_path_None( @pytest.mark.parametrize( - 'recursive', + ('recursive', 'remove_finished', 'expected_files'), [ - pytest.param(False, id='recursive_false'), - pytest.param(True, id='recursive_true'), + pytest.param( + False, False, + ( + 'toplevel', + os.path.join('toplevel', 'recursive.zip'), + ), + id='recursive_false_remove_finished_false', + ), + pytest.param( + False, True, + ( + 'toplevel', + os.path.join('toplevel', 'recursive.zip'), + ), + id='recursive_false_remove_finished_true', + ), + pytest.param( + True, False, + ( + 'toplevel', + os.path.join('toplevel', 'recursive.zip'), + os.path.join('toplevel', 'recursive'), + os.path.join('toplevel', 'recursive', 'test.file'), + ), + id='recursive_true_remove_finished_false', + ), + pytest.param( + True, True, + ( + 'toplevel', + os.path.join('toplevel', 'recursive'), + os.path.join('toplevel', 'recursive', 'test.file'), + ), + id='recursive_true_remove_finished_true', + ), ], ) +def test_extract_archive_destination_path_not_None( + recursive, remove_finished, archive, tmp_path, expected_files, +): + destination_path = tmp_path / pathlib.Path('tmpfoo') + extract_archive( + source_path=archive, + destination_path=destination_path, + recursive=recursive, + remove_finished=remove_finished, + ) + + if destination_path.is_file(): + destination_path = destination_path.parent + + result_files = {str(file.relative_to(destination_path)) for file in destination_path.rglob('*')} + + assert result_files == set(expected_files) + assert archive.is_file() != remove_finished + + @pytest.mark.parametrize( - 'remove_finished', + ('recursive', 'remove_finished'), [ - pytest.param(False, id='remove_finished_false'), - pytest.param(True, id='remove_finished_true'), + pytest.param(False, False, id='recursive_false_remove_finished_false'), + pytest.param(False, True, id='recursive_false_remove_finished_true'), + pytest.param(True, False, id='recursive_true_remove_finished_false'), + pytest.param(True, True, id='recursive_true_remove_finished_true'), ], ) -def test_extract_archive_destination_path_not_None(recursive, remove_finished, archive, tmp_path): - destination_path = tmp_path / pathlib.Path('tmpfoo') +def test_extract_compressed_file_destination_path_not_None( + recursive, remove_finished, compressed_file, tmp_path, +): + destination_filename = 'tmpfoo' + destination_path = tmp_path / pathlib.Path(destination_filename) extract_archive( - source_path=archive, + source_path=compressed_file, destination_path=destination_path, recursive=recursive, remove_finished=remove_finished, ) + result_files = { + str(file.relative_to(compressed_file.parent)) for file in compressed_file.parent.rglob('*') + } + + expected_files = {destination_filename} + if not remove_finished: + expected_files.add(compressed_file.name) + assert result_files == expected_files @pytest.mark.parametrize(