switch to async streaming download #1982

Merged · 9 commits · Oct 3, 2024

Changes from 8 commits
backend/btrixcloud/basecrawls.py (9 additions, 2 deletions)

@@ -54,7 +54,7 @@


 # ============================================================================
-# pylint: disable=too-many-instance-attributes, too-many-public-methods
+# pylint: disable=too-many-instance-attributes, too-many-public-methods, too-many-lines
 class BaseCrawlOps:
     """operations that apply to all crawls"""

@@ -823,7 +823,14 @@ async def download_crawl_as_single_wacz(self, crawl_id: str, org: Organization):
         if not crawl.resources:
             raise HTTPException(status_code=400, detail="no_crawl_resources")

-        resp = await self.storage_ops.download_streaming_wacz(org, crawl.resources)
+        metadata = {"type": crawl.type, "id": crawl_id, "organization": org.slug}
+        if crawl.name:
+            metadata["title"] = crawl.name
+
+        if crawl.description:
+            metadata["description"] = crawl.description
+
+        resp = await self.storage_ops.download_streaming_wacz(metadata, crawl.resources)

         headers = {"Content-Disposition": f'attachment; filename="{crawl_id}.wacz"'}
         return StreamingResponse(
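For context, the metadata dict added here is merged into the datapackage.json that _sync_dl writes into the zip (see storages.py below). A sketch of the resulting file for a single-crawl download, where every value is a made-up placeholder:

# Illustrative datapackage.json for a single-crawl download, shown as the
# Python dict that _sync_dl serializes; all values below are placeholders.
{
    "profile": "multi-wacz-package",
    "resources": [
        {
            "name": "crawl-0000.wacz",      # file_.name
            "path": "crawl-0000.wacz",      # same as name inside the archive
            "hash": "sha256:<hex digest>",  # "sha256:" + file_.hash
            "bytes": 123456789,             # file_.size
        }
    ],
    "software": "Browsertrix v1.x.y",       # f"Browsertrix v{__version__}"
    "type": "crawl",                        # from the metadata dict above
    "id": "<crawl id>",
    "organization": "<org slug>",
    "title": "<crawl name, if set>",
    "description": "<crawl description, if set>",
}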
backend/btrixcloud/colls.py (10 additions, 1 deletion)

@@ -323,7 +323,16 @@ async def download_collection(self, coll_id: UUID, org: Organization):
         """Download all WACZs in collection as streaming nested WACZ"""
         coll = await self.get_collection(coll_id, org, resources=True)

-        resp = await self.storage_ops.download_streaming_wacz(org, coll.resources)
+        metadata = {
+            "type": "collection",
+            "id": str(coll_id),
+            "title": coll.name,
+            "organization": org.slug,
+        }
+        if coll.description:
+            metadata["description"] = coll.description
+
+        resp = await self.storage_ops.download_streaming_wacz(metadata, coll.resources)

         headers = {"Content-Disposition": f'attachment; filename="{coll.name}.wacz"'}
         return StreamingResponse(
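Once downloaded, the result is an ordinary zip, so the embedded metadata can be inspected directly. A hypothetical consumer-side sketch (the file name is a placeholder) that mirrors the assertions added to the tests below:

# Hypothetical check of a downloaded multi-WACZ; the path is a placeholder.
import json
from zipfile import ZipFile

def read_datapackage(wacz_path: str) -> dict:
    with ZipFile(wacz_path) as zf:
        return json.loads(zf.read("datapackage.json"))

if __name__ == "__main__":
    meta = read_datapackage("my-collection.wacz")
    print(meta["profile"], meta.get("title"), len(meta["resources"]))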
backend/btrixcloud/crawls.py (10 additions, 1 deletion)

@@ -1034,7 +1034,16 @@ async def download_qa_run_as_single_wacz(
         if not qa_run.resources:
             raise HTTPException(status_code=400, detail="qa_run_no_resources")

-        resp = await self.storage_ops.download_streaming_wacz(org, qa_run.resources)
+        metadata = {
+            "type": "qa_run",
+            "id": qa_run_id,
+            "crawlId": crawl_id,
+            "organization": org.slug,
+        }
+
+        resp = await self.storage_ops.download_streaming_wacz(
+            metadata, qa_run.resources
+        )

         finished = qa_run.finished.isoformat()
backend/btrixcloud/storages.py (29 additions, 50 deletions)

@@ -31,11 +31,10 @@
 from remotezip import RemoteZip

 import aiobotocore.session
-import boto3
 import requests

-from mypy_boto3_s3.client import S3Client
-from mypy_boto3_s3.type_defs import CompletedPartTypeDef
+from types_aiobotocore_s3 import S3Client as AIOS3Client
+from types_aiobotocore_s3.type_defs import CompletedPartTypeDef

 from .models import (
     BaseFile,
@@ -52,6 +51,7 @@
 )

 from .utils import is_bool, slug_from_name
+from .version import __version__


 if TYPE_CHECKING:
@@ -289,35 +289,6 @@ async def get_s3_client(
         ) as client:
             yield client, bucket, key

-    @asynccontextmanager
-    async def get_sync_client(
-        self, org: Organization
-    ) -> AsyncIterator[tuple[S3Client, str, str]]:
-        """context manager for s3 client"""
-        storage = self.get_org_primary_storage(org)
-
-        endpoint_url = storage.endpoint_url
-
-        if not endpoint_url.endswith("/"):
-            endpoint_url += "/"
-
-        parts = urlsplit(endpoint_url)
-        bucket, key = parts.path[1:].split("/", 1)
-
-        endpoint_url = parts.scheme + "://" + parts.netloc
-
-        try:
-            client = boto3.client(
-                "s3",
-                region_name=storage.region,
-                endpoint_url=endpoint_url,
-                aws_access_key_id=storage.access_key,
-                aws_secret_access_key=storage.secret_key,
-            )
-            yield client, bucket, key
-        finally:
-            client.close()
-
     async def verify_storage_upload(self, storage: S3Storage, filename: str) -> None:
         """Test credentials and storage endpoint by uploading an empty test file"""
@@ -683,21 +654,32 @@ def _sync_get_filestream(self, wacz_url: str, filename: str) -> Iterator[bytes]:
             yield from file_stream

     def _sync_dl(
-        self, all_files: List[CrawlFileOut], client: S3Client, bucket: str, key: str
+        self, metadata: dict[str, str], all_files: List[CrawlFileOut]
     ) -> Iterator[bytes]:
         """generate streaming zip as sync"""
-        for file_ in all_files:
-            file_.path = file_.name
-
         datapackage = {
             "profile": "multi-wacz-package",
-            "resources": [file_.dict() for file_ in all_files],
+            "resources": [
+                {
+                    "name": file_.name,
+                    "path": file_.name,
+                    "hash": "sha256:" + file_.hash,
+                    "bytes": file_.size,
+                }
+                for file_ in all_files
+            ],
+            "software": f"Browsertrix v{__version__}",
+            **metadata,
         }
-        datapackage_bytes = json.dumps(datapackage).encode("utf-8")
+        datapackage_bytes = json.dumps(datapackage, indent=2).encode("utf-8")

-        def get_file(name) -> Iterator[bytes]:
-            response = client.get_object(Bucket=bucket, Key=key + name)
-            return response["Body"].iter_chunks(chunk_size=CHUNK_SIZE)
+        def get_datapackage() -> Iterable[bytes]:
+            yield datapackage_bytes
+
+        def get_file(path: str) -> Iterable[bytes]:
+            path = self.resolve_internal_access_path(path)
+            r = requests.get(path, stream=True, timeout=None)
+            yield from r.iter_content(CHUNK_SIZE)

         def member_files() -> (
             Iterable[tuple[str, datetime, int, Method, Iterable[bytes]]]
@@ -710,7 +692,7 @@ def member_files() -> (
                     modified_at,
                     perms,
                     NO_COMPRESSION_64(file_.size, 0),
-                    get_file(file_.name),
+                    get_file(file_.path),
                 )

             yield (
@@ -720,25 +702,22 @@
                 NO_COMPRESSION_64(
                     len(datapackage_bytes), zlib.crc32(datapackage_bytes)
                 ),
-                (datapackage_bytes,),
+                get_datapackage(),
             )

         # stream_zip() is an Iterator but defined as an Iterable, can cast
         return cast(Iterator[bytes], stream_zip(member_files(), chunk_size=CHUNK_SIZE))

     async def download_streaming_wacz(
-        self, org: Organization, files: List[CrawlFileOut]
+        self, metadata: dict[str, str], files: List[CrawlFileOut]
     ) -> Iterator[bytes]:
         """return an iter for downloading a stream nested wacz file
         from list of files"""
-        async with self.get_sync_client(org) as (client, bucket, key):
-            loop = asyncio.get_event_loop()
+        loop = asyncio.get_event_loop()

-            resp = await loop.run_in_executor(
-                None, self._sync_dl, files, client, bucket, key
-            )
+        resp = await loop.run_in_executor(None, self._sync_dl, metadata, files)

-            return resp
+        return resp


 # ============================================================================
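With get_sync_client removed, download_streaming_wacz holds no storage client at all: it only hops to a worker thread to construct the zip generator, and the StreamingResponse consuming it then pulls chunks (each doing blocking requests I/O) on its own thread pool. A minimal sketch of that pattern, using hypothetical names rather than the PR's code:

# Sketch: build a blocking generator on an executor, consume it lazily.
import asyncio
from typing import Iterator

def make_zip_stream(chunks: list[bytes]) -> Iterator[bytes]:
    # Stand-in for _sync_dl: calling a generator function does no I/O;
    # each chunk is produced only when the consumer iterates.
    for chunk in chunks:
        yield chunk

async def stream_download() -> Iterator[bytes]:
    loop = asyncio.get_event_loop()
    # Only generator construction runs in the executor here; iteration
    # happens later, wherever the response body is consumed.
    return await loop.run_in_executor(None, make_zip_stream, [b"a", b"b"])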
backend/requirements.txt (1 addition, 2 deletions)

@@ -11,17 +11,16 @@ aiofiles
 kubernetes-asyncio==29.0.0
 kubernetes
 aiobotocore
 requests
 redis>=5.0.0
 pyyaml
 jinja2
 humanize
 python-multipart
 pathvalidate
 https://github.com/ikreymer/stream-zip/archive/refs/heads/crc32-optional.zip
-boto3
 backoff>=2.2.1
 python-slugify>=8.0.1
-mypy_boto3_s3
+types_aiobotocore_s3
 types-redis
 types-python-slugify
backend/test/test_run_crawl.py (10 additions, 0 deletions)

@@ -6,6 +6,7 @@
 import re
 import csv
 import codecs
+import json
 from tempfile import TemporaryFile
 from zipfile import ZipFile, ZIP_STORED

@@ -406,6 +407,15 @@ def test_download_wacz_crawls(
         assert filename.endswith(".wacz") or filename == "datapackage.json"
         assert zip_file.getinfo(filename).compress_type == ZIP_STORED

+        if filename == "datapackage.json":
+            data = zip_file.read(filename).decode("utf-8")
+            datapackage = json.loads(data)
+            assert len(datapackage["resources"]) == 1
+            for resource in datapackage["resources"]:
+                assert resource["name"] == resource["path"]
+                assert resource["hash"]
+                assert resource["bytes"]
+

 def test_update_crawl(
     admin_auth_headers,
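For completeness, a hypothetical client-side sketch of consuming the streaming download endpoint; the URL shape and bearer-token auth are assumptions modeled on this API, not taken from the PR:

# Hypothetical client: stream a crawl's single-WACZ download to disk.
import requests

def download_crawl_wacz(
    api_base: str, org_id: str, crawl_id: str, token: str, out_path: str
) -> None:
    # Endpoint path is an assumption; adjust to the deployed API.
    url = f"{api_base}/api/orgs/{org_id}/all-crawls/{crawl_id}/download"
    headers = {"Authorization": f"Bearer {token}"}
    with requests.get(url, headers=headers, stream=True, timeout=None) as resp:
        resp.raise_for_status()
        with open(out_path, "wb") as fh:
            # write the zip as it streams; nothing is buffered in memory
            for chunk in resp.iter_content(chunk_size=65536):
                fh.write(chunk)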