Skip to content

Commit

Permalink
[#799] wb-manager: Add wacz archives to collection with --uncompress-…
Browse files Browse the repository at this point in the history
…wacz (#800)

Add WACZ support for `wb-manager add` by unpacking WACZ files with --uncompress-wacz.

A future commit will add pywb support for WACZ files without requiring them to be unpacked.
  • Loading branch information
kuechensofa authored Feb 15, 2023
1 parent b869330 commit 454486b
Show file tree
Hide file tree
Showing 6 changed files with 225 additions and 23 deletions.
2 changes: 1 addition & 1 deletion docs/manual/apps.rst
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ The tool can be used while ``wayback`` is running, and pywb will detect many cha
It can be used to:

* Create a new collection -- ``wb-manager init <coll>``
* Add WARCs to collection -- ``wb-manager add <coll> <warc>``
* Add WARCs or WACZs to collection -- ``wb-manager add <coll> <warc/wacz>``
* Add override templates
* Add and remove metadata to a collections ``metadata.yaml``
* List all collections
Expand Down
149 changes: 127 additions & 22 deletions pywb/manager/manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,15 @@
import heapq
import yaml
import re
import gzip
import six

from distutils.util import strtobool
from pkg_resources import resource_string, get_distribution

from argparse import ArgumentParser, RawTextHelpFormatter
from tempfile import mkdtemp
from zipfile import ZipFile

from pywb.utils.loaders import load_yaml_config
from warcio.timeutils import timestamp20_now
Expand Down Expand Up @@ -47,6 +50,9 @@ class CollectionsManager(object):

COLLS_DIR = 'collections'

WARC_RX = re.compile(r'.*\.w?arc(\.gz)?$')
WACZ_RX = re.compile(r'.*\.wacz$')

def __init__(self, coll_name, colls_dir=None, must_exist=True):
colls_dir = colls_dir or self.COLLS_DIR
self.default_config = load_yaml_config(DEFAULT_CONFIG)
Expand Down Expand Up @@ -115,29 +121,127 @@ def _assert_coll_exists(self):
'To create a new collection, run\n\n{1} init {0}')
raise IOError(msg.format(self.coll_name, sys.argv[0]))

def add_warcs(self, warcs):
def add_archives(self, archives, uncompress_wacz=False):
if not os.path.isdir(self.archive_dir):
raise IOError('Directory {0} does not exist'.
format(self.archive_dir))

full_paths = []
duplicate_warcs = []
for filename in warcs:
filename = os.path.abspath(filename)
invalid_archives = []
warc_paths = []
for archive in archives:
if self.WARC_RX.match(archive):
full_path = self._add_warc(archive)
if full_path:
warc_paths.append(full_path)
elif self.WACZ_RX.match(archive):
if uncompress_wacz:
self._add_wacz_uncompressed(archive)
else:
raise NotImplementedError('Adding waczs without unpacking is not yet implemented. Use '
'\'--uncompress-wacz\' flag to add the wacz\'s content.')
else:
invalid_archives.append(archive)

self._index_merge_warcs(warc_paths, self.DEF_INDEX_FILE)

if invalid_archives:
logging.warning(f'Invalid archives weren\'t added: {", ".join(invalid_archives)}')

def _add_warc(self, warc):
filename = os.path.abspath(warc)

# don't overwrite existing warcs with duplicate names
if os.path.exists(os.path.join(self.archive_dir, os.path.basename(filename))):
logging.warning(f'Warc {filename} wasn\'t added because of duplicate name.')
return None

shutil.copy2(filename, self.archive_dir)
full_path = os.path.join(self.archive_dir, filename)
logging.info('Copied ' + filename + ' to ' + self.archive_dir)
return full_path

def _add_wacz_uncompressed(self, wacz):
wacz = os.path.abspath(wacz)
temp_dir = mkdtemp()
warc_regex = re.compile(r'.+\.warc(\.gz)?$')
cdx_regex = re.compile(r'.+\.cdx(\.gz)?$')
with ZipFile(wacz, 'r') as wacz_zip_file:
archive_members = wacz_zip_file.namelist()
warc_files = [file for file in archive_members if warc_regex.match(file)]
if not warc_files:
logging.warning(f'WACZ {wacz} does not contain any warc files.')
return

# extract warc files
for warc_file in warc_files:
wacz_zip_file.extract(warc_file, temp_dir)

# don't overwrite existing warcs with duplicate names
if os.path.exists(os.path.join(self.archive_dir, os.path.basename(filename))):
duplicate_warcs.append(filename)
cdx_files = [file for file in archive_members if cdx_regex.match(file)]
if not cdx_files:
logging.warning(f'WACZ {wacz} does not contain any indices.')
return

for cdx_file in cdx_files:
wacz_zip_file.extract(cdx_file, temp_dir)

# copy extracted warc files to collections archive dir, use wacz filename as filename with added index if
# multiple warc files exist
warc_filename_mapping = {}
full_paths = []
for idx, extracted_warc_file in enumerate(warc_files):
_, warc_ext = os.path.splitext(extracted_warc_file)
if warc_ext == '.gz':
warc_ext = '.warc.gz'
warc_filename = os.path.basename(wacz)
warc_filename, _ = os.path.splitext(warc_filename)
warc_filename = f'{warc_filename}-{idx}{warc_ext}'
warc_destination_path = os.path.join(self.archive_dir, warc_filename)

if os.path.exists(warc_destination_path):
logging.warning(f'Warc {warc_filename} wasn\'t added because of duplicate name.')
continue

shutil.copy2(filename, self.archive_dir)
full_paths.append(os.path.join(self.archive_dir, filename))
logging.info('Copied ' + filename + ' to ' + self.archive_dir)
warc_filename_mapping[os.path.basename(extracted_warc_file)] = warc_filename
shutil.copy2(os.path.join(temp_dir, extracted_warc_file), warc_destination_path)
full_paths.append(warc_destination_path)

self._index_merge_warcs(full_paths, self.DEF_INDEX_FILE)
# rewrite filenames in wacz indices and merge them with collection index file
for cdx_file in cdx_files:
self._add_wacz_index(os.path.join(self.indexes_dir, self.DEF_INDEX_FILE), os.path.join(temp_dir, cdx_file),
warc_filename_mapping)

if duplicate_warcs:
logging.warning(f'Warcs {", ".join(duplicate_warcs)} weren\'t added because of duplicate names.')
# delete temporary files
shutil.rmtree(temp_dir)

@staticmethod
def _add_wacz_index(collection_index_path, wacz_index_path, filename_mapping):
from pywb.warcserver.index.cdxobject import CDXObject

# copy collection index to temporary directory
tempdir = mkdtemp()
collection_index_name = os.path.basename(collection_index_path)
collection_index_temp_path = os.path.join(tempdir, collection_index_name)

if os.path.exists(collection_index_path):
shutil.copy2(collection_index_path, collection_index_temp_path)

with open(collection_index_temp_path, 'a') as collection_index_temp_file:
if wacz_index_path.endswith('.gz'):
wacz_index_file = gzip.open(wacz_index_path, 'rb')
else:
wacz_index_file = open(wacz_index_path, 'rb')
collection_index_temp_file.write('\n')
for line in wacz_index_file.readlines():
cdx_object = CDXObject(cdxline=line)
if cdx_object['filename'] in filename_mapping:
cdx_object['filename'] = filename_mapping[cdx_object['filename']]
collection_index_temp_file.write(cdx_object.to_cdxj())

wacz_index_file.close()

# copy temporary index back to original location and delete temporary directory
shutil.move(collection_index_temp_path, collection_index_path)
shutil.rmtree(tempdir)

def reindex(self):
cdx_file = os.path.join(self.indexes_dir, self.DEF_INDEX_FILE)
Expand Down Expand Up @@ -383,16 +487,17 @@ def do_list(r):
listcmd = subparsers.add_parser('list', help=list_help)
listcmd.set_defaults(func=do_list)

# Add Warcs
# Add Warcs or Waczs
def do_add(r):
m = CollectionsManager(r.coll_name)
m.add_warcs(r.files)

addwarc_help = 'Copy ARCS/WARCS to collection directory and reindex'
addwarc = subparsers.add_parser('add', help=addwarc_help)
addwarc.add_argument('coll_name')
addwarc.add_argument('files', nargs='+')
addwarc.set_defaults(func=do_add)
m.add_archives(r.files, r.uncompress_wacz)

add_archives_help = 'Copy ARCS/WARCS/WACZ to collection directory and reindex'
add_archives = subparsers.add_parser('add', help=add_archives_help)
add_archives.add_argument('--uncompress-wacz', dest='uncompress_wacz', action='store_true')
add_archives.add_argument('coll_name')
add_archives.add_argument('files', nargs='+')
add_archives.set_defaults(func=do_add)

# Reindex All
def do_reindex(r):
Expand Down
Binary file added sample_archive/cdxj/example.cdx.gz
Binary file not shown.
Binary file added sample_archive/waczs/invalid_example_1.wacz
Binary file not shown.
Binary file added sample_archive/waczs/valid_example_1.wacz
Binary file not shown.
97 changes: 97 additions & 0 deletions tests/test_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,97 @@
import os

import pytest

from pywb.manager.manager import CollectionsManager

VALID_WACZ_PATH = 'sample_archive/waczs/valid_example_1.wacz'
INVALID_WACZ_PATH = 'sample_archive/waczs/invalid_example_1.wacz'

TEST_COLLECTION_NAME = 'test-col'


class TestManager:
def test_add_valid_wacz_uncompressed(self, tmp_path):
"""Test if adding a valid wacz file to a collection succeeds"""
manager = self.get_test_collections_manager(tmp_path)
manager._add_wacz_uncompressed(VALID_WACZ_PATH)
assert 'valid_example_1-0.warc' in os.listdir(manager.archive_dir)
assert manager.DEF_INDEX_FILE in os.listdir(manager.indexes_dir)
with open(os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE), 'r') as f:
assert '"filename": "valid_example_1-0.warc"' in f.read()

def test_add_invalid_wacz_uncompressed(self, tmp_path, caplog):
"""Test if adding an invalid wacz file to a collection fails"""
manager = self.get_test_collections_manager(tmp_path)
manager._add_wacz_uncompressed(INVALID_WACZ_PATH)
assert 'invalid_example_1-0.warc' not in os.listdir(manager.archive_dir)
assert 'sample_archive/waczs/invalid_example_1.wacz does not contain any warc files.' in caplog.text

index_path = os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE)
if os.path.exists(index_path):
with open(index_path, 'r') as f:
assert '"filename": "invalid_example_1-0.warc"' not in f.read()

def test_add_valid_archives_uncompressed_wacz(self, tmp_path):
manager = self.get_test_collections_manager(tmp_path)
archives = ['sample_archive/warcs/example.arc', 'sample_archive/warcs/example.arc.gz',
'sample_archive/warcs/example.warc', 'sample_archive/warcs/example.warc.gz',
'sample_archive/waczs/valid_example_1.wacz']
manager.add_archives(archives, uncompress_wacz=True)

with open(os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE), 'r') as f:
index_text = f.read()

for archive in archives:
archive = os.path.basename(archive)

if archive.endswith('wacz'):
archive = 'valid_example_1-0.warc'

assert archive in os.listdir(manager.archive_dir)
assert archive in index_text

def test_add_valid_archives_dont_uncompress_wacz(self, tmp_path):
manager = self.get_test_collections_manager(tmp_path)
archives = ['sample_archive/warcs/example.arc', 'sample_archive/warcs/example.arc.gz',
'sample_archive/warcs/example.warc', 'sample_archive/warcs/example.warc.gz',
'sample_archive/waczs/valid_example_1.wacz']

with pytest.raises(NotImplementedError):
manager.add_archives(archives, uncompress_wacz=False)

def test_add_invalid_archives_uncompress_wacz(self, tmp_path, caplog):
manager = self.get_test_collections_manager(tmp_path)
manager.add_archives(['sample_archive/warcs/example.warc', 'sample_archive/text_content/sample.html'],
uncompress_wacz=True)
assert 'sample.html' not in os.listdir(manager.archive_dir)
assert 'example.warc' in os.listdir(manager.archive_dir)
assert "Invalid archives weren't added: sample_archive/text_content/sample.html" in caplog.messages

def test_merge_wacz_index(self, tmp_path):
manager = self.get_test_collections_manager(tmp_path)
manager._add_wacz_index(os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE),
'sample_archive/cdxj/example.cdxj',
{'example.warc.gz': 'rewritten.warc.gz'})
with open(os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE), 'r') as f:
index_content = f.read()

assert 'example.warc.gz' not in index_content
assert 'rewritten.warc.gz' in index_content

def test_merge_wacz_index_gzip(self, tmp_path):
manager = self.get_test_collections_manager(tmp_path)
manager._add_wacz_index(os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE),
'sample_archive/cdxj/example.cdx.gz',
{'example-collection.warc': 'rewritten.warc'})
with open(os.path.join(manager.indexes_dir, manager.DEF_INDEX_FILE), 'r') as f:
index_content = f.read()

assert 'example-collection.warc' not in index_content
assert 'rewritten.warc' in index_content

@staticmethod
def get_test_collections_manager(collections_path):
manager = CollectionsManager(TEST_COLLECTION_NAME, colls_dir=collections_path, must_exist=False)
manager.add_collection()
return manager

0 comments on commit 454486b

Please sign in to comment.