Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

tests: Test resulting filestructure for utils.archives.extract() #570

Merged
merged 8 commits into from
Sep 22, 2023
257 changes: 205 additions & 52 deletions tests/utils/archives_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import bz2
import gzip
import lzma
import os
import pathlib
import tarfile
import zipfile
Expand Down Expand Up @@ -55,12 +56,9 @@ def test_detect_file_type_no_suffixes():
params=[
(None, 'tar'),
(None, 'zip'),
('bz2', None),
('gz', None),
('tbz', None),
('tbz2', None),
('tgz', None),
('xz', None),
('bz2', 'tar'),
('bz2', 'zip'),
('gz', 'tar'),
Expand All @@ -70,12 +68,9 @@ def test_detect_file_type_no_suffixes():
ids=[
'tar_archive',
'zip_archive',
'bz2_compressed_archive',
'gz_compressed_archive',
'tbz_compressed_archive',
'tbz2_compressed_archive',
'tgz_compressed_archive',
'xz_compressed_archive',
'bz2_compressed_tar_archive',
'bz2_compressed_zip_archive',
'gz_compressed_tar_archive',
Expand All @@ -89,18 +84,14 @@ def fixture_archive(request, tmp_path):

# write tmp filepath
test_filepath = rootpath / 'test.file'
if extension in {'zip', 'tar'}:
test_filepath.write_text('test')
if compression in {'bz2', 'xz', 'tbz', 'tbz2', 'tgz', 'gz'} and extension is None:
test_filepath.write_bytes(b'test')
test_filepath.write_text('test')

top_level_directory = 'toplevel'

# add additional archive
filepath = rootpath / 'recursive.zip'
with zipfile.ZipFile(filepath, 'w') as zip_open:
zip_open.write(test_filepath)

# now remove original file again
test_filepath.unlink()
zip_open.write(test_filepath, arcname=test_filepath.name)

# declare archive path
if compression is None:
Expand All @@ -112,19 +103,16 @@ def fixture_archive(request, tmp_path):

if compression is None and extension == 'zip':
with zipfile.ZipFile(archive_path, 'w') as zip_open:
zip_open.write(filepath)
yield archive_path
zip_open.write(filepath, arcname=os.path.join(top_level_directory, filepath.name))

elif compression is not None and extension == 'zip':
comp_type = _ZIP_COMPRESSION_MAP[f'.{compression}']
with zipfile.ZipFile(archive_path, 'w', compression=comp_type) as zip_open:
zip_open.write(filepath)
yield archive_path
zip_open.write(filepath, arcname=os.path.join(top_level_directory, filepath.name))

elif compression is None and extension == 'tar':
with tarfile.TarFile.open(archive_path, 'w') as fp:
fp.add(filepath)
yield archive_path
fp.add(filepath, arcname=os.path.join(top_level_directory, filepath.name))

elif (
(compression is not None and extension == 'tar') or
Expand All @@ -135,26 +123,61 @@ def fixture_archive(request, tmp_path):
if compression in {'tgz'}:
compression = 'gz'
with tarfile.TarFile.open(archive_path, f'w:{compression}') as fp:
fp.add(filepath)
yield archive_path
fp.add(filepath, arcname=os.path.join(top_level_directory, filepath.name))

else:
raise ValueError(f'{request.param} not supported for archive fixture')

# now remove original files again
test_filepath.unlink()
filepath.unlink()

yield archive_path


@pytest.fixture(
name='compressed_file',
params=[
'bz2',
'gz',
'xz',
],
ids=[
'bz2_compressed_file',
'gz_compressed_file',
'xz_compressed_file',
],
)
def fixture_compressed_file(request, tmp_path):
rootpath = tmp_path
compression = request.param

elif compression == 'bz2' and extension is None:
with bz2.open(archive_path, 'wb') as fp:
fp.write(filepath.read_bytes())
yield archive_path
# write tmp filepath
test_filepath = rootpath / 'test.file'
test_filepath.write_bytes(b'test')

elif compression == 'gz' and extension is None:
with gzip.open(archive_path, 'wb') as fp:
fp.write(filepath.read_bytes())
yield archive_path
# declare archive path
compressed_filepath = rootpath / f'test.{compression}'

elif compression == 'xz' and extension is None:
with lzma.open(archive_path, 'wb') as fp:
fp.write(filepath.read_bytes())
yield archive_path
if compression == 'bz2':
with bz2.open(compressed_filepath, 'wb') as fp:
fp.write(test_filepath.read_bytes())

elif compression == 'gz':
with gzip.open(compressed_filepath, 'wb') as fp:
fp.write(test_filepath.read_bytes())

elif compression == 'xz':
with lzma.open(compressed_filepath, 'wb') as fp:
fp.write(test_filepath.read_bytes())

else:
raise ValueError(f'{request.param} not supported for archive fixture')
raise ValueError(f'{request.param} not supported for compressed file fixture')

# now remove original file again
test_filepath.unlink()

yield compressed_filepath


@pytest.fixture(
Expand All @@ -181,26 +204,90 @@ def fixture_unsupported_archive(request, tmp_path):


@pytest.mark.parametrize(
'recursive',
('recursive', 'remove_finished', 'expected_files'),
[
pytest.param(False, id='recursive_false'),
pytest.param(True, id='recursive_true'),
pytest.param(
False, False,
(
'toplevel',
os.path.join('toplevel', 'recursive.zip'),
),
id='recursive_false_remove_finished_false',
),
pytest.param(
False, True,
(
'toplevel',
os.path.join('toplevel', 'recursive.zip'),
),
id='recursive_false_remove_finished_true',
),
pytest.param(
True, False,
(
'toplevel',
os.path.join('toplevel', 'recursive.zip'),
os.path.join('toplevel', 'recursive'),
os.path.join('toplevel', 'recursive', 'test.file'),
),
id='recursive_true_remove_finished_false',
),
pytest.param(
True, True,
(
'toplevel',
os.path.join('toplevel', 'recursive'),
os.path.join('toplevel', 'recursive', 'test.file'),
),
id='recursive_true_remove_finished_true',
),
],
)
def test_extract_archive_destination_path_None(
recursive, remove_finished, expected_files, archive, tmp_path,
):
extract_archive(
source_path=archive,
destination_path=None,
recursive=recursive,
remove_finished=remove_finished,
)
result_files = {
str(file.relative_to(archive.parent)) for file in archive.parent.rglob('*')
}

expected_files = set(expected_files)
if not remove_finished:
expected_files.add(archive.name)
assert result_files == expected_files


@pytest.mark.parametrize(
'remove_finished',
('recursive', 'remove_finished'),
[
pytest.param(False, id='remove_finished_false'),
pytest.param(True, id='remove_finished_true'),
pytest.param(False, False, id='recursive_false_remove_finished_false'),
pytest.param(False, True, id='recursive_false_remove_finished_true'),
pytest.param(True, False, id='recursive_true_remove_finished_false'),
pytest.param(True, True, id='recursive_true_remove_finished_true'),
],
)
def test_extract_archive_destination_path_None(recursive, remove_finished, archive):
def test_extract_compressed_file_destination_path_None(
recursive, remove_finished, compressed_file, tmp_path,
):
extract_archive(
source_path=archive,
source_path=compressed_file,
destination_path=None,
recursive=recursive,
remove_finished=remove_finished,
)
result_files = {
str(file.relative_to(compressed_file.parent)) for file in compressed_file.parent.rglob('*')
}

expected_files = {'test'}
if not remove_finished:
expected_files.add(compressed_file.name)
assert result_files == expected_files


@pytest.mark.parametrize(
Expand Down Expand Up @@ -235,27 +322,93 @@ def test_extract_unsupported_archive_destination_path_None(


@pytest.mark.parametrize(
'recursive',
('recursive', 'remove_finished', 'expected_files'),
[
pytest.param(False, id='recursive_false'),
pytest.param(True, id='recursive_true'),
pytest.param(
False, False,
(
'toplevel',
os.path.join('toplevel', 'recursive.zip'),
),
id='recursive_false_remove_finished_false',
),
pytest.param(
False, True,
(
'toplevel',
os.path.join('toplevel', 'recursive.zip'),
),
id='recursive_false_remove_finished_true',
),
pytest.param(
True, False,
(
'toplevel',
os.path.join('toplevel', 'recursive.zip'),
os.path.join('toplevel', 'recursive'),
os.path.join('toplevel', 'recursive', 'test.file'),
),
id='recursive_true_remove_finished_false',
),
pytest.param(
True, True,
(
'toplevel',
os.path.join('toplevel', 'recursive'),
os.path.join('toplevel', 'recursive', 'test.file'),
),
id='recursive_true_remove_finished_true',
),
],
)
def test_extract_archive_destination_path_not_None(
recursive, remove_finished, archive, tmp_path, expected_files,
):
destination_path = tmp_path / pathlib.Path('tmpfoo')
extract_archive(
source_path=archive,
destination_path=destination_path,
recursive=recursive,
remove_finished=remove_finished,
)

if destination_path.is_file():
destination_path = destination_path.parent

result_files = {str(file.relative_to(destination_path)) for file in destination_path.rglob('*')}

assert result_files == set(expected_files)
assert archive.is_file() != remove_finished


@pytest.mark.parametrize(
'remove_finished',
('recursive', 'remove_finished'),
[
pytest.param(False, id='remove_finished_false'),
pytest.param(True, id='remove_finished_true'),
pytest.param(False, False, id='recursive_false_remove_finished_false'),
pytest.param(False, True, id='recursive_false_remove_finished_true'),
pytest.param(True, False, id='recursive_true_remove_finished_false'),
pytest.param(True, True, id='recursive_true_remove_finished_true'),
],
)
def test_extract_archive_destination_path_not_None(recursive, remove_finished, archive, tmp_path):
destination_path = tmp_path / pathlib.Path('tmpfoo')
def test_extract_compressed_file_destination_path_not_None(
recursive, remove_finished, compressed_file, tmp_path,
):
destination_filename = 'tmpfoo'
destination_path = tmp_path / pathlib.Path(destination_filename)
extract_archive(
source_path=archive,
source_path=compressed_file,
destination_path=destination_path,
recursive=recursive,
remove_finished=remove_finished,
)
result_files = {
str(file.relative_to(compressed_file.parent)) for file in compressed_file.parent.rglob('*')
}

expected_files = {destination_filename}
if not remove_finished:
expected_files.add(compressed_file.name)
assert result_files == expected_files


@pytest.mark.parametrize(
Expand Down