From 9df816beab257345e7ba0e342bb7b33b2371dd21 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Thu, 25 Jul 2024 10:14:19 -0700 Subject: [PATCH 1/9] switch to async streaming download: - remove unused sync functions - use async methods from stream-zip - note that stream-zip still does a sync->async conversion under the hood - follow-up to #1933 for streaming download improvements --- backend/btrixcloud/storages.py | 82 +++++++++------------------------- 1 file changed, 20 insertions(+), 62 deletions(-) diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index d5529c2801..3c783e3acc 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -9,9 +9,9 @@ List, Dict, AsyncIterator, + AsyncIterable, TYPE_CHECKING, Any, - cast, ) from urllib.parse import urlsplit from contextlib import asynccontextmanager @@ -27,13 +27,12 @@ from zipfile import ZipInfo from fastapi import Depends, HTTPException -from stream_zip import stream_zip, NO_COMPRESSION_64, Method +from stream_zip import async_stream_zip, NO_COMPRESSION_64, Method from remotezip import RemoteZip import aiobotocore.session -import boto3 +import aiohttp -from mypy_boto3_s3.client import S3Client from mypy_boto3_s3.type_defs import CompletedPartTypeDef from types_aiobotocore_s3 import S3Client as AIOS3Client @@ -289,35 +288,6 @@ async def get_s3_client( ) as client: yield client, bucket, key - @asynccontextmanager - async def get_sync_client( - self, org: Organization - ) -> AsyncIterator[tuple[S3Client, str, str]]: - """context manager for s3 client""" - storage = self.get_org_primary_storage(org) - - endpoint_url = storage.endpoint_url - - if not endpoint_url.endswith("/"): - endpoint_url += "/" - - parts = urlsplit(endpoint_url) - bucket, key = parts.path[1:].split("/", 1) - - endpoint_url = parts.scheme + "://" + parts.netloc - - try: - client = boto3.client( - "s3", - region_name=storage.region, - endpoint_url=endpoint_url, - aws_access_key_id=storage.access_key, - aws_secret_access_key=storage.secret_key, - ) - yield client, bucket, key - finally: - client.close() - async def verify_storage_upload(self, storage: S3Storage, filename: str) -> None: """Test credentials and storage endpoint by uploading an empty test file""" @@ -682,25 +652,28 @@ def _sync_get_filestream(self, wacz_url: str, filename: str) -> Iterator[bytes]: with remote_zip.open(filename) as file_stream: yield from file_stream - def _sync_dl( - self, all_files: List[CrawlFileOut], client: S3Client, bucket: str, key: str - ) -> Iterator[bytes]: + async def download_streaming_wacz( + self, _: Organization, all_files: List[CrawlFileOut] + ) -> AsyncIterable[bytes]: """generate streaming zip as sync""" - for file_ in all_files: - file_.path = file_.name - datapackage = { "profile": "multi-wacz-package", "resources": [file_.dict() for file_ in all_files], } datapackage_bytes = json.dumps(datapackage).encode("utf-8") - def get_file(name) -> Iterator[bytes]: - response = client.get_object(Bucket=bucket, Key=key + name) - return response["Body"].iter_chunks(chunk_size=CHUNK_SIZE) + async def get_datapackage() -> AsyncIterable[bytes]: + yield datapackage_bytes + + async def get_file(path: str) -> AsyncIterable[bytes]: + path = self.resolve_internal_access_path(path) + async with aiohttp.ClientSession() as session: + async with session.get(path) as response: + async for chunk in response.content.iter_chunked(CHUNK_SIZE): + yield chunk - def member_files() -> ( - Iterable[tuple[str, datetime, int, Method, Iterable[bytes]]] + async def member_files() -> ( + AsyncIterable[tuple[str, datetime, int, Method, AsyncIterable[bytes]]] ): modified_at = datetime(year=1980, month=1, day=1) perms = 0o664 @@ -710,7 +683,7 @@ def member_files() -> ( modified_at, perms, NO_COMPRESSION_64(file_.size, 0), - get_file(file_.name), + get_file(file_.path), ) yield ( @@ -720,25 +693,10 @@ def member_files() -> ( NO_COMPRESSION_64( len(datapackage_bytes), zlib.crc32(datapackage_bytes) ), - (datapackage_bytes,), - ) - - # stream_zip() is an Iterator but defined as an Iterable, can cast - return cast(Iterator[bytes], stream_zip(member_files(), chunk_size=CHUNK_SIZE)) - - async def download_streaming_wacz( - self, org: Organization, files: List[CrawlFileOut] - ) -> Iterator[bytes]: - """return an iter for downloading a stream nested wacz file - from list of files""" - async with self.get_sync_client(org) as (client, bucket, key): - loop = asyncio.get_event_loop() - - resp = await loop.run_in_executor( - None, self._sync_dl, files, client, bucket, key + get_datapackage(), ) - return resp + return async_stream_zip(member_files(), chunk_size=CHUNK_SIZE) # ============================================================================ From 6036319fd6307f674c34f620f7b4f66b0d08be07 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Wed, 2 Oct 2024 20:12:33 -0700 Subject: [PATCH 2/9] fix path == name in datapackage.json, to match previous behavior fully remove boto tests: update test to ensure multi-wacz resources name == path --- backend/btrixcloud/storages.py | 4 ++-- backend/requirements.txt | 2 -- backend/test/test_run_crawl.py | 9 +++++++++ 3 files changed, 11 insertions(+), 4 deletions(-) diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index 3c783e3acc..5d352e0daf 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -33,8 +33,8 @@ import aiobotocore.session import aiohttp -from mypy_boto3_s3.type_defs import CompletedPartTypeDef from types_aiobotocore_s3 import S3Client as AIOS3Client +from types_aiobotocore_s3.type_defs import CompletedPartTypeDef from .models import ( BaseFile, @@ -658,7 +658,7 @@ async def download_streaming_wacz( """generate streaming zip as sync""" datapackage = { "profile": "multi-wacz-package", - "resources": [file_.dict() for file_ in all_files], + "resources": [{**file_.dict(), "path": file_.name} for file_ in all_files], } datapackage_bytes = json.dumps(datapackage).encode("utf-8") diff --git a/backend/requirements.txt b/backend/requirements.txt index fc49b9506e..f1d9116fbf 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -18,10 +18,8 @@ humanize python-multipart pathvalidate https://github.com/ikreymer/stream-zip/archive/refs/heads/crc32-optional.zip -boto3 backoff>=2.2.1 python-slugify>=8.0.1 -mypy_boto3_s3 types_aiobotocore_s3 types-redis types-python-slugify diff --git a/backend/test/test_run_crawl.py b/backend/test/test_run_crawl.py index d794ee9c99..7156ee098c 100644 --- a/backend/test/test_run_crawl.py +++ b/backend/test/test_run_crawl.py @@ -406,6 +406,15 @@ def test_download_wacz_crawls( assert filename.endswith(".wacz") or filename == "datapackage.json" assert zip_file.getinfo(filename).compress_type == ZIP_STORED + if filename == "datapackage.json": + data = zip_file.read(filename).decode("utf-8") + datapackage = json.loads(data) + assert len(datapackage["resources"]) == 1 + for resource in datapackage["resources"]: + assert resource["name"] == resource["path"] + assert resource["size"] + assert resource["hash"] + def test_update_crawl( admin_auth_headers, From 68deb3fa9ed8dd5e1ecbfe854b8a81f65a7994fc Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Wed, 2 Oct 2024 20:28:14 -0700 Subject: [PATCH 3/9] fix multi-wacz datapackage.json: only include name, path, hash, bytes in each resource entry! --- backend/btrixcloud/storages.py | 12 ++++++++++-- 1 file changed, 10 insertions(+), 2 deletions(-) diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index 5d352e0daf..ac8a68087c 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -658,9 +658,17 @@ async def download_streaming_wacz( """generate streaming zip as sync""" datapackage = { "profile": "multi-wacz-package", - "resources": [{**file_.dict(), "path": file_.name} for file_ in all_files], + "resources": [ + { + "name": file_.name, + "path": file_.name, + "hash": "sha256:" + file_.hash, + "bytes": file_.size, + } + for file_ in all_files + ], } - datapackage_bytes = json.dumps(datapackage).encode("utf-8") + datapackage_bytes = json.dumps(datapackage, indent=2).encode("utf-8") async def get_datapackage() -> AsyncIterable[bytes]: yield datapackage_bytes From 62dd80b49fcc8a53630e7cffdf738b32864cc833 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Wed, 2 Oct 2024 20:28:58 -0700 Subject: [PATCH 4/9] update test --- backend/test/test_run_crawl.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/test/test_run_crawl.py b/backend/test/test_run_crawl.py index 7156ee098c..82677d9b98 100644 --- a/backend/test/test_run_crawl.py +++ b/backend/test/test_run_crawl.py @@ -412,8 +412,8 @@ def test_download_wacz_crawls( assert len(datapackage["resources"]) == 1 for resource in datapackage["resources"]: assert resource["name"] == resource["path"] - assert resource["size"] assert resource["hash"] + assert resource["bytes"] def test_update_crawl( From cf6b7b02984e80e1aa0ac16e842de76271172049 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Wed, 2 Oct 2024 20:37:27 -0700 Subject: [PATCH 5/9] fix missing import --- backend/test/test_run_crawl.py | 1 + 1 file changed, 1 insertion(+) diff --git a/backend/test/test_run_crawl.py b/backend/test/test_run_crawl.py index 82677d9b98..dfcfaeb421 100644 --- a/backend/test/test_run_crawl.py +++ b/backend/test/test_run_crawl.py @@ -6,6 +6,7 @@ import re import csv import codecs +import json from tempfile import TemporaryFile from zipfile import ZipFile, ZIP_STORED From c7babf94d170154ab2b54f05e8b1876fa1db4c99 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Thu, 3 Oct 2024 09:38:25 -0700 Subject: [PATCH 6/9] switch back to mostly sync download with requests, but simplified to not require boto --- backend/btrixcloud/storages.py | 39 ++++++++++++++++++++++------------ backend/requirements.txt | 1 + 2 files changed, 26 insertions(+), 14 deletions(-) diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index ac8a68087c..54c4f78c79 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -9,9 +9,9 @@ List, Dict, AsyncIterator, - AsyncIterable, TYPE_CHECKING, Any, + cast, ) from urllib.parse import urlsplit from contextlib import asynccontextmanager @@ -23,11 +23,13 @@ import json import os +import requests + from datetime import datetime from zipfile import ZipInfo from fastapi import Depends, HTTPException -from stream_zip import async_stream_zip, NO_COMPRESSION_64, Method +from stream_zip import stream_zip, NO_COMPRESSION_64, Method from remotezip import RemoteZip import aiobotocore.session @@ -652,9 +654,7 @@ def _sync_get_filestream(self, wacz_url: str, filename: str) -> Iterator[bytes]: with remote_zip.open(filename) as file_stream: yield from file_stream - async def download_streaming_wacz( - self, _: Organization, all_files: List[CrawlFileOut] - ) -> AsyncIterable[bytes]: + def _sync_dl(self, all_files: List[CrawlFileOut]) -> Iterator[bytes]: """generate streaming zip as sync""" datapackage = { "profile": "multi-wacz-package", @@ -670,18 +670,17 @@ async def download_streaming_wacz( } datapackage_bytes = json.dumps(datapackage, indent=2).encode("utf-8") - async def get_datapackage() -> AsyncIterable[bytes]: + def get_datapackage() -> Iterable[bytes]: yield datapackage_bytes - async def get_file(path: str) -> AsyncIterable[bytes]: + def get_file(path: str) -> Iterable[bytes]: path = self.resolve_internal_access_path(path) - async with aiohttp.ClientSession() as session: - async with session.get(path) as response: - async for chunk in response.content.iter_chunked(CHUNK_SIZE): - yield chunk + r = requests.get(path, stream=True) + for chunk in r.iter_content(CHUNK_SIZE): + yield chunk - async def member_files() -> ( - AsyncIterable[tuple[str, datetime, int, Method, AsyncIterable[bytes]]] + def member_files() -> ( + Iterable[tuple[str, datetime, int, Method, Iterable[bytes]]] ): modified_at = datetime(year=1980, month=1, day=1) perms = 0o664 @@ -704,7 +703,19 @@ async def member_files() -> ( get_datapackage(), ) - return async_stream_zip(member_files(), chunk_size=CHUNK_SIZE) + # stream_zip() is an Iterator but defined as an Iterable, can cast + return cast(Iterator[bytes], stream_zip(member_files(), chunk_size=CHUNK_SIZE)) + + async def download_streaming_wacz( + self, org: Organization, files: List[CrawlFileOut] + ) -> Iterator[bytes]: + """return an iter for downloading a stream nested wacz file + from list of files""" + loop = asyncio.get_event_loop() + + resp = await loop.run_in_executor(None, self._sync_dl, files) + + return resp # ============================================================================ diff --git a/backend/requirements.txt b/backend/requirements.txt index f1d9116fbf..e1a863cc41 100644 --- a/backend/requirements.txt +++ b/backend/requirements.txt @@ -11,6 +11,7 @@ aiofiles kubernetes-asyncio==29.0.0 kubernetes aiobotocore +requests redis>=5.0.0 pyyaml jinja2 From 4e5c5ed86bfafccebf8c612c219126136aad7df4 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Thu, 3 Oct 2024 10:46:48 -0700 Subject: [PATCH 7/9] add additional metadata for multi-wacz to be conformant with datapackage v1: - include 'id', but also 'title', 'description' and 'organization' fields, as well as 'crawlId' when possible - add 'type' indicating 'crawl', 'upload', 'collection', 'qa-run' --- backend/btrixcloud/basecrawls.py | 11 +++++++++-- backend/btrixcloud/colls.py | 11 ++++++++++- backend/btrixcloud/crawls.py | 11 ++++++++++- backend/btrixcloud/storages.py | 18 +++++++++--------- 4 files changed, 38 insertions(+), 13 deletions(-) diff --git a/backend/btrixcloud/basecrawls.py b/backend/btrixcloud/basecrawls.py index 67df51be5a..d913d3362b 100644 --- a/backend/btrixcloud/basecrawls.py +++ b/backend/btrixcloud/basecrawls.py @@ -54,7 +54,7 @@ # ============================================================================ -# pylint: disable=too-many-instance-attributes, too-many-public-methods +# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines class BaseCrawlOps: """operations that apply to all crawls""" @@ -823,7 +823,14 @@ async def download_crawl_as_single_wacz(self, crawl_id: str, org: Organization): if not crawl.resources: raise HTTPException(status_code=400, detail="no_crawl_resources") - resp = await self.storage_ops.download_streaming_wacz(org, crawl.resources) + metadata = {"type": crawl.type, "id": crawl_id, "organization": org.slug} + if crawl.name: + metadata["title"] = crawl.name + + if crawl.description: + metadata["description"] = crawl.description + + resp = await self.storage_ops.download_streaming_wacz(metadata, crawl.resources) headers = {"Content-Disposition": f'attachment; filename="{crawl_id}.wacz"'} return StreamingResponse( diff --git a/backend/btrixcloud/colls.py b/backend/btrixcloud/colls.py index 5b84ea8e5b..d0fdd43a7c 100644 --- a/backend/btrixcloud/colls.py +++ b/backend/btrixcloud/colls.py @@ -323,7 +323,16 @@ async def download_collection(self, coll_id: UUID, org: Organization): """Download all WACZs in collection as streaming nested WACZ""" coll = await self.get_collection(coll_id, org, resources=True) - resp = await self.storage_ops.download_streaming_wacz(org, coll.resources) + metadata = { + "type": "collection", + "id": str(coll_id), + "title": coll.name, + "organization": org.slug, + } + if coll.description: + metadata["description"] = coll.description + + resp = await self.storage_ops.download_streaming_wacz(metadata, coll.resources) headers = {"Content-Disposition": f'attachment; filename="{coll.name}.wacz"'} return StreamingResponse( diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py index 81cf731e87..f6851adb18 100644 --- a/backend/btrixcloud/crawls.py +++ b/backend/btrixcloud/crawls.py @@ -1034,7 +1034,16 @@ async def download_qa_run_as_single_wacz( if not qa_run.resources: raise HTTPException(status_code=400, detail="qa_run_no_resources") - resp = await self.storage_ops.download_streaming_wacz(org, qa_run.resources) + metadata = { + "type": "qa_run", + "id": qa_run_id, + "crawlId": crawl_id, + "organization": org.slug, + } + + resp = await self.storage_ops.download_streaming_wacz( + metadata, qa_run.resources + ) finished = qa_run.finished.isoformat() diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index 54c4f78c79..51b9086668 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -23,8 +23,6 @@ import json import os -import requests - from datetime import datetime from zipfile import ZipInfo @@ -33,7 +31,7 @@ from remotezip import RemoteZip import aiobotocore.session -import aiohttp +import requests from types_aiobotocore_s3 import S3Client as AIOS3Client from types_aiobotocore_s3.type_defs import CompletedPartTypeDef @@ -654,7 +652,9 @@ def _sync_get_filestream(self, wacz_url: str, filename: str) -> Iterator[bytes]: with remote_zip.open(filename) as file_stream: yield from file_stream - def _sync_dl(self, all_files: List[CrawlFileOut]) -> Iterator[bytes]: + def _sync_dl( + self, metadata: dict[str, str], all_files: List[CrawlFileOut] + ) -> Iterator[bytes]: """generate streaming zip as sync""" datapackage = { "profile": "multi-wacz-package", @@ -667,6 +667,7 @@ def _sync_dl(self, all_files: List[CrawlFileOut]) -> Iterator[bytes]: } for file_ in all_files ], + **metadata, } datapackage_bytes = json.dumps(datapackage, indent=2).encode("utf-8") @@ -675,9 +676,8 @@ def get_datapackage() -> Iterable[bytes]: def get_file(path: str) -> Iterable[bytes]: path = self.resolve_internal_access_path(path) - r = requests.get(path, stream=True) - for chunk in r.iter_content(CHUNK_SIZE): - yield chunk + r = requests.get(path, stream=True, timeout=None) + yield from r.iter_content(CHUNK_SIZE) def member_files() -> ( Iterable[tuple[str, datetime, int, Method, Iterable[bytes]]] @@ -707,13 +707,13 @@ def member_files() -> ( return cast(Iterator[bytes], stream_zip(member_files(), chunk_size=CHUNK_SIZE)) async def download_streaming_wacz( - self, org: Organization, files: List[CrawlFileOut] + self, metadata: dict[str, str], files: List[CrawlFileOut] ) -> Iterator[bytes]: """return an iter for downloading a stream nested wacz file from list of files""" loop = asyncio.get_event_loop() - resp = await loop.run_in_executor(None, self._sync_dl, files) + resp = await loop.run_in_executor(None, self._sync_dl, metadata, files) return resp From 124f4c118e8a50824c49ca610e4a9be5693fbcb9 Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Thu, 3 Oct 2024 12:01:39 -0700 Subject: [PATCH 8/9] add 'software' to metadata --- backend/btrixcloud/storages.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/backend/btrixcloud/storages.py b/backend/btrixcloud/storages.py index 51b9086668..cee943dcbf 100644 --- a/backend/btrixcloud/storages.py +++ b/backend/btrixcloud/storages.py @@ -51,6 +51,7 @@ ) from .utils import is_bool, slug_from_name +from .version import __version__ if TYPE_CHECKING: @@ -667,6 +668,7 @@ def _sync_dl( } for file_ in all_files ], + "software": f"Browsertrix v{__version__}", **metadata, } datapackage_bytes = json.dumps(datapackage, indent=2).encode("utf-8") From fceafdcc9abfc3fe8ea80bda58de71729d7c718d Mon Sep 17 00:00:00 2001 From: Ilya Kreymer Date: Thu, 3 Oct 2024 16:10:24 -0700 Subject: [PATCH 9/9] qa_run -> qaRun --- backend/btrixcloud/crawls.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/backend/btrixcloud/crawls.py b/backend/btrixcloud/crawls.py index f6851adb18..5a0994fe70 100644 --- a/backend/btrixcloud/crawls.py +++ b/backend/btrixcloud/crawls.py @@ -1035,7 +1035,7 @@ async def download_qa_run_as_single_wacz( raise HTTPException(status_code=400, detail="qa_run_no_resources") metadata = { - "type": "qa_run", + "type": "qaRun", "id": qa_run_id, "crawlId": crawl_id, "organization": org.slug,