diff --git a/docs/manual/apps.rst b/docs/manual/apps.rst index 630d5657d..4c7f1b995 100644 --- a/docs/manual/apps.rst +++ b/docs/manual/apps.rst @@ -45,7 +45,7 @@ The tool can be used while ``wayback`` is running, and pywb will detect many cha It can be used to: * Create a new collection -- ``wb-manager init `` -* Add WARCs to collection -- ``wb-manager add `` +* Add WARCs or WACZs to collection -- ``wb-manager add `` * Add override templates * Add and remove metadata to a collections ``metadata.yaml`` * List all collections diff --git a/pywb/manager/manager.py b/pywb/manager/manager.py index 18c76ee77..40a8bef8f 100644 --- a/pywb/manager/manager.py +++ b/pywb/manager/manager.py @@ -5,12 +5,15 @@ import heapq import yaml import re +import gzip import six from distutils.util import strtobool from pkg_resources import resource_string, get_distribution from argparse import ArgumentParser, RawTextHelpFormatter +from tempfile import mkdtemp +from zipfile import ZipFile from pywb.utils.loaders import load_yaml_config from warcio.timeutils import timestamp20_now @@ -47,6 +50,9 @@ class CollectionsManager(object): COLLS_DIR = 'collections' + WARC_RX = re.compile(r'.*\.w?arc(\.gz)?$') + WACZ_RX = re.compile(r'.*\.wacz$') + def __init__(self, coll_name, colls_dir=None, must_exist=True): colls_dir = colls_dir or self.COLLS_DIR self.default_config = load_yaml_config(DEFAULT_CONFIG) @@ -115,29 +121,127 @@ def _assert_coll_exists(self): 'To create a new collection, run\n\n{1} init {0}') raise IOError(msg.format(self.coll_name, sys.argv[0])) - def add_warcs(self, warcs): + def add_archives(self, archives, uncompress_wacz=False): if not os.path.isdir(self.archive_dir): raise IOError('Directory {0} does not exist'. format(self.archive_dir)) - full_paths = [] - duplicate_warcs = [] - for filename in warcs: - filename = os.path.abspath(filename) + invalid_archives = [] + warc_paths = [] + for archive in archives: + if self.WARC_RX.match(archive): + full_path = self._add_warc(archive) + if full_path: + warc_paths.append(full_path) + elif self.WACZ_RX.match(archive): + if uncompress_wacz: + self._add_wacz_uncompressed(archive) + else: + raise NotImplementedError('Adding waczs without unpacking is not yet implemented. Use ' + '\'--uncompress-wacz\' flag to add the wacz\'s content.') + else: + invalid_archives.append(archive) + + self._index_merge_warcs(warc_paths, self.DEF_INDEX_FILE) + + if invalid_archives: + logging.warning(f'Invalid archives weren\'t added: {", ".join(invalid_archives)}') + + def _add_warc(self, warc): + filename = os.path.abspath(warc) + + # don't overwrite existing warcs with duplicate names + if os.path.exists(os.path.join(self.archive_dir, os.path.basename(filename))): + logging.warning(f'Warc {filename} wasn\'t added because of duplicate name.') + return None + + shutil.copy2(filename, self.archive_dir) + full_path = os.path.join(self.archive_dir, filename) + logging.info('Copied ' + filename + ' to ' + self.archive_dir) + return full_path + + def _add_wacz_uncompressed(self, wacz): + wacz = os.path.abspath(wacz) + temp_dir = mkdtemp() + warc_regex = re.compile(r'.+\.warc(\.gz)?$') + cdx_regex = re.compile(r'.+\.cdx(\.gz)?$') + with ZipFile(wacz, 'r') as wacz_zip_file: + archive_members = wacz_zip_file.namelist() + warc_files = [file for file in archive_members if warc_regex.match(file)] + if not warc_files: + logging.warning(f'WACZ {wacz} does not contain any warc files.') + return + + # extract warc files + for warc_file in warc_files: + wacz_zip_file.extract(warc_file, temp_dir) - # don't overwrite existing warcs with duplicate names - if os.path.exists(os.path.join(self.archive_dir, os.path.basename(filename))): - duplicate_warcs.append(filename) + cdx_files = [file for file in archive_members if cdx_regex.match(file)] + if not cdx_files: + logging.warning(f'WACZ {wacz} does not contain any indices.') + return + + for cdx_file in cdx_files: + wacz_zip_file.extract(cdx_file, temp_dir) + + # copy extracted warc files to collections archive dir, use wacz filename as filename with added index if + # multiple warc files exist + warc_filename_mapping = {} + full_paths = [] + for idx, extracted_warc_file in enumerate(warc_files): + _, warc_ext = os.path.splitext(extracted_warc_file) + if warc_ext == '.gz': + warc_ext = '.warc.gz' + warc_filename = os.path.basename(wacz) + warc_filename, _ = os.path.splitext(warc_filename) + warc_filename = f'{warc_filename}-{idx}{warc_ext}' + warc_destination_path = os.path.join(self.archive_dir, warc_filename) + + if os.path.exists(warc_destination_path): + logging.warning(f'Warc {warc_filename} wasn\'t added because of duplicate name.') continue - shutil.copy2(filename, self.archive_dir) - full_paths.append(os.path.join(self.archive_dir, filename)) - logging.info('Copied ' + filename + ' to ' + self.archive_dir) + warc_filename_mapping[os.path.basename(extracted_warc_file)] = warc_filename + shutil.copy2(os.path.join(temp_dir, extracted_warc_file), warc_destination_path) + full_paths.append(warc_destination_path) - self._index_merge_warcs(full_paths, self.DEF_INDEX_FILE) + # rewrite filenames in wacz indices and merge them with collection index file + for cdx_file in cdx_files: + self._add_wacz_index(os.path.join(self.indexes_dir, self.DEF_INDEX_FILE), os.path.join(temp_dir, cdx_file), + warc_filename_mapping) - if duplicate_warcs: - logging.warning(f'Warcs {", ".join(duplicate_warcs)} weren\'t added because of duplicate names.') + # delete temporary files + shutil.rmtree(temp_dir) + + @staticmethod + def _add_wacz_index(collection_index_path, wacz_index_path, filename_mapping): + from pywb.warcserver.index.cdxobject import CDXObject + + # copy collection index to temporary directory + tempdir = mkdtemp() + collection_index_name = os.path.basename(collection_index_path) + collection_index_temp_path = os.path.join(tempdir, collection_index_name) + + if os.path.exists(collection_index_path): + shutil.copy2(collection_index_path, collection_index_temp_path) + + with open(collection_index_temp_path, 'a') as collection_index_temp_file: + if wacz_index_path.endswith('.gz'): + wacz_index_file = gzip.open(wacz_index_path, 'rb') + else: + wacz_index_file = open(wacz_index_path, 'rb') + collection_index_temp_file.write('\n') + for line in wacz_index_file.readlines(): + cdx_object = CDXObject(cdxline=line) + if cdx_object['filename'] in filename_mapping: + cdx_object['filename'] = filename_mapping[cdx_object['filename']] + collection_index_temp_file.write(cdx_object.to_cdxj()) + + wacz_index_file.close() + + # copy temporary index back to original location and delete temporary directory + shutil.move(collection_index_temp_path, collection_index_path) + shutil.rmtree(tempdir) def reindex(self): cdx_file = os.path.join(self.indexes_dir, self.DEF_INDEX_FILE) @@ -383,16 +487,17 @@ def do_list(r): listcmd = subparsers.add_parser('list', help=list_help) listcmd.set_defaults(func=do_list) - # Add Warcs + # Add Warcs or Waczs def do_add(r): m = CollectionsManager(r.coll_name) - m.add_warcs(r.files) - - addwarc_help = 'Copy ARCS/WARCS to collection directory and reindex' - addwarc = subparsers.add_parser('add', help=addwarc_help) - addwarc.add_argument('coll_name') - addwarc.add_argument('files', nargs='+') - addwarc.set_defaults(func=do_add) + m.add_archives(r.files, r.uncompress_wacz) + + add_archives_help = 'Copy ARCS/WARCS/WACZ to collection directory and reindex' + add_archives = subparsers.add_parser('add', help=add_archives_help) + add_archives.add_argument('--uncompress-wacz', dest='uncompress_wacz', action='store_true') + add_archives.add_argument('coll_name') + add_archives.add_argument('files', nargs='+') + add_archives.set_defaults(func=do_add) # Reindex All def do_reindex(r): diff --git a/sample_archive/cdxj/example.cdx.gz b/sample_archive/cdxj/example.cdx.gz new file mode 100644 index 000000000..974659635 Binary files /dev/null and b/sample_archive/cdxj/example.cdx.gz differ diff --git a/sample_archive/waczs/invalid_example_1.wacz b/sample_archive/waczs/invalid_example_1.wacz new file mode 100644 index 000000000..f5dc1abf3 Binary files /dev/null and b/sample_archive/waczs/invalid_example_1.wacz differ diff --git a/sample_archive/waczs/valid_example_1.wacz b/sample_archive/waczs/valid_example_1.wacz new file mode 100644 index 000000000..2d4b7516d Binary files /dev/null and b/sample_archive/waczs/valid_example_1.wacz differ diff --git a/tests/test_manager.py b/tests/test_manager.py new file mode 100644 index 000000000..285e64f8e --- /dev/null +++ b/tests/test_manager.py @@ -0,0 +1,97 @@ +import os + +import pytest + +from pywb.manager.manager import CollectionsManager + +VALID_WACZ_PATH = 'sample_archive/waczs/valid_example_1.wacz' +INVALID_WACZ_PATH = 'sample_archive/waczs/invalid_example_1.wacz' + +TEST_COLLECTION_NAME = 'test-col' + + +class TestManager: + def test_add_valid_wacz_uncompressed(self, tmp_path): + """Test if adding a valid wacz file to a collection succeeds""" + manager = self.get_test_collections_manager(tmp_path) + manager._add_wacz_uncompressed(VALID_WACZ_PATH) + assert 'valid_example_1-0.warc' in os.listdir(manager.archive_dir) + assert manager.DEF_INDEX_FILE in os.listdir(manager.indexes_dir) + with open(os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE), 'r') as f: + assert '"filename": "valid_example_1-0.warc"' in f.read() + + def test_add_invalid_wacz_uncompressed(self, tmp_path, caplog): + """Test if adding an invalid wacz file to a collection fails""" + manager = self.get_test_collections_manager(tmp_path) + manager._add_wacz_uncompressed(INVALID_WACZ_PATH) + assert 'invalid_example_1-0.warc' not in os.listdir(manager.archive_dir) + assert 'sample_archive/waczs/invalid_example_1.wacz does not contain any warc files.' in caplog.text + + index_path = os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE) + if os.path.exists(index_path): + with open(index_path, 'r') as f: + assert '"filename": "invalid_example_1-0.warc"' not in f.read() + + def test_add_valid_archives_uncompressed_wacz(self, tmp_path): + manager = self.get_test_collections_manager(tmp_path) + archives = ['sample_archive/warcs/example.arc', 'sample_archive/warcs/example.arc.gz', + 'sample_archive/warcs/example.warc', 'sample_archive/warcs/example.warc.gz', + 'sample_archive/waczs/valid_example_1.wacz'] + manager.add_archives(archives, uncompress_wacz=True) + + with open(os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE), 'r') as f: + index_text = f.read() + + for archive in archives: + archive = os.path.basename(archive) + + if archive.endswith('wacz'): + archive = 'valid_example_1-0.warc' + + assert archive in os.listdir(manager.archive_dir) + assert archive in index_text + + def test_add_valid_archives_dont_uncompress_wacz(self, tmp_path): + manager = self.get_test_collections_manager(tmp_path) + archives = ['sample_archive/warcs/example.arc', 'sample_archive/warcs/example.arc.gz', + 'sample_archive/warcs/example.warc', 'sample_archive/warcs/example.warc.gz', + 'sample_archive/waczs/valid_example_1.wacz'] + + with pytest.raises(NotImplementedError): + manager.add_archives(archives, uncompress_wacz=False) + + def test_add_invalid_archives_uncompress_wacz(self, tmp_path, caplog): + manager = self.get_test_collections_manager(tmp_path) + manager.add_archives(['sample_archive/warcs/example.warc', 'sample_archive/text_content/sample.html'], + uncompress_wacz=True) + assert 'sample.html' not in os.listdir(manager.archive_dir) + assert 'example.warc' in os.listdir(manager.archive_dir) + assert "Invalid archives weren't added: sample_archive/text_content/sample.html" in caplog.messages + + def test_merge_wacz_index(self, tmp_path): + manager = self.get_test_collections_manager(tmp_path) + manager._add_wacz_index(os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE), + 'sample_archive/cdxj/example.cdxj', + {'example.warc.gz': 'rewritten.warc.gz'}) + with open(os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE), 'r') as f: + index_content = f.read() + + assert 'example.warc.gz' not in index_content + assert 'rewritten.warc.gz' in index_content + + def test_merge_wacz_index_gzip(self, tmp_path): + manager = self.get_test_collections_manager(tmp_path) + manager._add_wacz_index(os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE), + 'sample_archive/cdxj/example.cdx.gz', + {'example-collection.warc': 'rewritten.warc'}) + with open(os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE), 'r') as f: + index_content = f.read() + + assert 'example-collection.warc' not in index_content + assert 'rewritten.warc' in index_content + + @staticmethod + def get_test_collections_manager(collections_path): + manager = CollectionsManager(TEST_COLLECTION_NAME, colls_dir=collections_path, must_exist=False) + manager.add_collection() + return manager