Skip to content

Commit

Permalink
tests: Test resulting filestructure for utils.archives.extract() (#570)
Browse files Browse the repository at this point in the history
  • Loading branch information
dkrako authored Sep 22, 2023
1 parent 4edba55 commit 6927c7c
Showing 1 changed file with 205 additions and 52 deletions.
257 changes: 205 additions & 52 deletions tests/utils/archives_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import bz2
import gzip
import lzma
import os
import pathlib
import tarfile
import zipfile
Expand Down Expand Up @@ -55,12 +56,9 @@ def test_detect_file_type_no_suffixes():
params=[
(None, 'tar'),
(None, 'zip'),
('bz2', None),
('gz', None),
('tbz', None),
('tbz2', None),
('tgz', None),
('xz', None),
('bz2', 'tar'),
('bz2', 'zip'),
('gz', 'tar'),
Expand All @@ -70,12 +68,9 @@ def test_detect_file_type_no_suffixes():
ids=[
'tar_archive',
'zip_archive',
'bz2_compressed_archive',
'gz_compressed_archive',
'tbz_compressed_archive',
'tbz2_compressed_archive',
'tgz_compressed_archive',
'xz_compressed_archive',
'bz2_compressed_tar_archive',
'bz2_compressed_zip_archive',
'gz_compressed_tar_archive',
Expand All @@ -89,18 +84,14 @@ def fixture_archive(request, tmp_path):

# write tmp filepath
test_filepath = rootpath / 'test.file'
if extension in {'zip', 'tar'}:
test_filepath.write_text('test')
if compression in {'bz2', 'xz', 'tbz', 'tbz2', 'tgz', 'gz'} and extension is None:
test_filepath.write_bytes(b'test')
test_filepath.write_text('test')

top_level_directory = 'toplevel'

# add additional archive
filepath = rootpath / 'recursive.zip'
with zipfile.ZipFile(filepath, 'w') as zip_open:
zip_open.write(test_filepath)

# now remove original file again
test_filepath.unlink()
zip_open.write(test_filepath, arcname=test_filepath.name)

# declare archive path
if compression is None:
Expand All @@ -112,19 +103,16 @@ def fixture_archive(request, tmp_path):

if compression is None and extension == 'zip':
with zipfile.ZipFile(archive_path, 'w') as zip_open:
zip_open.write(filepath)
yield archive_path
zip_open.write(filepath, arcname=os.path.join(top_level_directory, filepath.name))

elif compression is not None and extension == 'zip':
comp_type = _ZIP_COMPRESSION_MAP[f'.{compression}']
with zipfile.ZipFile(archive_path, 'w', compression=comp_type) as zip_open:
zip_open.write(filepath)
yield archive_path
zip_open.write(filepath, arcname=os.path.join(top_level_directory, filepath.name))

elif compression is None and extension == 'tar':
with tarfile.TarFile.open(archive_path, 'w') as fp:
fp.add(filepath)
yield archive_path
fp.add(filepath, arcname=os.path.join(top_level_directory, filepath.name))

elif (
(compression is not None and extension == 'tar') or
Expand All @@ -135,26 +123,61 @@ def fixture_archive(request, tmp_path):
if compression in {'tgz'}:
compression = 'gz'
with tarfile.TarFile.open(archive_path, f'w:{compression}') as fp:
fp.add(filepath)
yield archive_path
fp.add(filepath, arcname=os.path.join(top_level_directory, filepath.name))

else:
raise ValueError(f'{request.param} not supported for archive fixture')

# now remove original files again
test_filepath.unlink()
filepath.unlink()

yield archive_path


@pytest.fixture(
name='compressed_file',
params=[
'bz2',
'gz',
'xz',
],
ids=[
'bz2_compressed_file',
'gz_compressed_file',
'xz_compressed_file',
],
)
def fixture_compressed_file(request, tmp_path):
rootpath = tmp_path
compression = request.param

elif compression == 'bz2' and extension is None:
with bz2.open(archive_path, 'wb') as fp:
fp.write(filepath.read_bytes())
yield archive_path
# write tmp filepath
test_filepath = rootpath / 'test.file'
test_filepath.write_bytes(b'test')

elif compression == 'gz' and extension is None:
with gzip.open(archive_path, 'wb') as fp:
fp.write(filepath.read_bytes())
yield archive_path
# declare archive path
compressed_filepath = rootpath / f'test.{compression}'

elif compression == 'xz' and extension is None:
with lzma.open(archive_path, 'wb') as fp:
fp.write(filepath.read_bytes())
yield archive_path
if compression == 'bz2':
with bz2.open(compressed_filepath, 'wb') as fp:
fp.write(test_filepath.read_bytes())

elif compression == 'gz':
with gzip.open(compressed_filepath, 'wb') as fp:
fp.write(test_filepath.read_bytes())

elif compression == 'xz':
with lzma.open(compressed_filepath, 'wb') as fp:
fp.write(test_filepath.read_bytes())

else:
raise ValueError(f'{request.param} not supported for archive fixture')
raise ValueError(f'{request.param} not supported for compressed file fixture')

# now remove original file again
test_filepath.unlink()

yield compressed_filepath


@pytest.fixture(
Expand All @@ -181,26 +204,90 @@ def fixture_unsupported_archive(request, tmp_path):


@pytest.mark.parametrize(
'recursive',
('recursive', 'remove_finished', 'expected_files'),
[
pytest.param(False, id='recursive_false'),
pytest.param(True, id='recursive_true'),
pytest.param(
False, False,
(
'toplevel',
os.path.join('toplevel', 'recursive.zip'),
),
id='recursive_false_remove_finished_false',
),
pytest.param(
False, True,
(
'toplevel',
os.path.join('toplevel', 'recursive.zip'),
),
id='recursive_false_remove_finished_true',
),
pytest.param(
True, False,
(
'toplevel',
os.path.join('toplevel', 'recursive.zip'),
os.path.join('toplevel', 'recursive'),
os.path.join('toplevel', 'recursive', 'test.file'),
),
id='recursive_true_remove_finished_false',
),
pytest.param(
True, True,
(
'toplevel',
os.path.join('toplevel', 'recursive'),
os.path.join('toplevel', 'recursive', 'test.file'),
),
id='recursive_true_remove_finished_true',
),
],
)
def test_extract_archive_destination_path_None(
recursive, remove_finished, expected_files, archive, tmp_path,
):
extract_archive(
source_path=archive,
destination_path=None,
recursive=recursive,
remove_finished=remove_finished,
)
result_files = {
str(file.relative_to(archive.parent)) for file in archive.parent.rglob('*')
}

expected_files = set(expected_files)
if not remove_finished:
expected_files.add(archive.name)
assert result_files == expected_files


@pytest.mark.parametrize(
'remove_finished',
('recursive', 'remove_finished'),
[
pytest.param(False, id='remove_finished_false'),
pytest.param(True, id='remove_finished_true'),
pytest.param(False, False, id='recursive_false_remove_finished_false'),
pytest.param(False, True, id='recursive_false_remove_finished_true'),
pytest.param(True, False, id='recursive_true_remove_finished_false'),
pytest.param(True, True, id='recursive_true_remove_finished_true'),
],
)
def test_extract_archive_destination_path_None(recursive, remove_finished, archive):
def test_extract_compressed_file_destination_path_None(
recursive, remove_finished, compressed_file, tmp_path,
):
extract_archive(
source_path=archive,
source_path=compressed_file,
destination_path=None,
recursive=recursive,
remove_finished=remove_finished,
)
result_files = {
str(file.relative_to(compressed_file.parent)) for file in compressed_file.parent.rglob('*')
}

expected_files = {'test'}
if not remove_finished:
expected_files.add(compressed_file.name)
assert result_files == expected_files


@pytest.mark.parametrize(
Expand Down Expand Up @@ -235,27 +322,93 @@ def test_extract_unsupported_archive_destination_path_None(


@pytest.mark.parametrize(
'recursive',
('recursive', 'remove_finished', 'expected_files'),
[
pytest.param(False, id='recursive_false'),
pytest.param(True, id='recursive_true'),
pytest.param(
False, False,
(
'toplevel',
os.path.join('toplevel', 'recursive.zip'),
),
id='recursive_false_remove_finished_false',
),
pytest.param(
False, True,
(
'toplevel',
os.path.join('toplevel', 'recursive.zip'),
),
id='recursive_false_remove_finished_true',
),
pytest.param(
True, False,
(
'toplevel',
os.path.join('toplevel', 'recursive.zip'),
os.path.join('toplevel', 'recursive'),
os.path.join('toplevel', 'recursive', 'test.file'),
),
id='recursive_true_remove_finished_false',
),
pytest.param(
True, True,
(
'toplevel',
os.path.join('toplevel', 'recursive'),
os.path.join('toplevel', 'recursive', 'test.file'),
),
id='recursive_true_remove_finished_true',
),
],
)
def test_extract_archive_destination_path_not_None(
recursive, remove_finished, archive, tmp_path, expected_files,
):
destination_path = tmp_path / pathlib.Path('tmpfoo')
extract_archive(
source_path=archive,
destination_path=destination_path,
recursive=recursive,
remove_finished=remove_finished,
)

if destination_path.is_file():
destination_path = destination_path.parent

result_files = {str(file.relative_to(destination_path)) for file in destination_path.rglob('*')}

assert result_files == set(expected_files)
assert archive.is_file() != remove_finished


@pytest.mark.parametrize(
'remove_finished',
('recursive', 'remove_finished'),
[
pytest.param(False, id='remove_finished_false'),
pytest.param(True, id='remove_finished_true'),
pytest.param(False, False, id='recursive_false_remove_finished_false'),
pytest.param(False, True, id='recursive_false_remove_finished_true'),
pytest.param(True, False, id='recursive_true_remove_finished_false'),
pytest.param(True, True, id='recursive_true_remove_finished_true'),
],
)
def test_extract_archive_destination_path_not_None(recursive, remove_finished, archive, tmp_path):
destination_path = tmp_path / pathlib.Path('tmpfoo')
def test_extract_compressed_file_destination_path_not_None(
recursive, remove_finished, compressed_file, tmp_path,
):
destination_filename = 'tmpfoo'
destination_path = tmp_path / pathlib.Path(destination_filename)
extract_archive(
source_path=archive,
source_path=compressed_file,
destination_path=destination_path,
recursive=recursive,
remove_finished=remove_finished,
)
result_files = {
str(file.relative_to(compressed_file.parent)) for file in compressed_file.parent.rglob('*')
}

expected_files = {destination_filename}
if not remove_finished:
expected_files.add(compressed_file.name)
assert result_files == expected_files


@pytest.mark.parametrize(
Expand Down

0 comments on commit 6927c7c

Please sign in to comment.