Merge pull request #31 from jpmorganchase/feature/multipart-download
multipart/range-request download if only 1 file is downloaded
robertsdrm authored Mar 27, 2024
2 parents a1baa10 + 0964578 commit 6923f1a
Showing 2 changed files with 166 additions and 32 deletions.
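The change adds a multipart download path for single files: the Content-Length reported by the server is split into fixed-size byte ranges, the ranges are fetched concurrently, and each piece is written at its own offset in the output file. Below is a minimal, generic sketch of that pattern, not the library's own code: it assumes a server honoring standard HTTP Range headers (the Fusion API in this commit uses a downloadRange query parameter instead), and the URL, chunk size, and helper name are illustrative.

import math
from concurrent.futures import ThreadPoolExecutor

import requests

def range_download(url: str, output_path: str, chunk_size: int = 5 * 2**20) -> None:
    # Ask the server how large the file is, then derive the chunk boundaries.
    content_length = int(requests.head(url).headers["Content-Length"])
    n_chunks = math.ceil(content_length / chunk_size)
    ranges = [
        (i * chunk_size, min((i + 1) * chunk_size, content_length) - 1)
        for i in range(n_chunks)
    ]

    def fetch(byte_range):
        start, end = byte_range
        resp = requests.get(url, headers={"Range": f"bytes={start}-{end}"})
        resp.raise_for_status()
        return start, resp.content

    # Pre-size the file, then write each chunk at its own offset as it arrives.
    with open(output_path, "wb") as fh:
        fh.truncate(content_length)
        with ThreadPoolExecutor() as pool:
            for start, data in pool.map(fetch, ranges):
                fh.seek(start)
                fh.write(data)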
87 changes: 57 additions & 30 deletions fusion/fusion.py
@@ -25,6 +25,7 @@
csv_to_table,
distribution_to_filename,
distribution_to_url,
download_single_file_threading,
get_session,
is_dataset_raw,
json_to_table,
@@ -691,41 +692,67 @@ def download(
for d in download_folders:
if not self.fs.exists(d):
self.fs.mkdir(d, create_parents=True)
download_spec = [
{
"credentials": self.credentials,
"url": distribution_to_url(
self.root_url, series[1], series[2], series[3], series[0]
),
"output_file": distribution_to_filename(
download_folders[i],
series[1],
series[2],
series[3],
series[0],
partitioning=partitioning,
),
"overwrite": force_download,
"fs": self.fs,
}
for i, series in enumerate(required_series)
]

logger.log(
VERBOSE_LVL,
f"Beginning {len(download_spec)} downloads in batches of {n_par}",
)
if show_progress:
with tqdm_joblib(tqdm(total=len(download_spec))) as _:
if len(required_series) == 1:
with tqdm(total=1) as pbar:
res = download_single_file_threading(
self.credentials,
distribution_to_url(
self.root_url,
required_series[0][1],
required_series[0][2],
required_series[0][3],
required_series[0][0],
),
distribution_to_filename(
download_folders[0],
required_series[0][1],
required_series[0][2],
required_series[0][3],
required_series[0][0],
partitioning=partitioning,
),
fs=self.fs,
)
if (len(res) > 0) and all((r[0] for r in res)):
pbar.update(1)

else:
download_spec = [
{
"credentials": self.credentials,
"url": distribution_to_url(
self.root_url, series[1], series[2], series[3], series[0]
),
"output_file": distribution_to_filename(
download_folders[i],
series[1],
series[2],
series[3],
series[0],
partitioning=partitioning,
),
"overwrite": force_download,
"fs": self.fs,
}
for i, series in enumerate(required_series)
]

logger.log(
VERBOSE_LVL,
f"Beginning {len(download_spec)} downloads in batches of {n_par}",
)
if show_progress:
with tqdm_joblib(tqdm(total=len(download_spec))) as _:
res = Parallel(n_jobs=n_par)(
delayed(stream_single_file_new_session)(**spec)
for spec in download_spec
)
else:
res = Parallel(n_jobs=n_par)(
delayed(stream_single_file_new_session)(**spec)
for spec in download_spec
)
else:
res = Parallel(n_jobs=n_par)(
delayed(stream_single_file_new_session)(**spec)
for spec in download_spec
)

if (len(res) > 0) and (not all((r[0] for r in res))):
for r in res:
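Both branches leave res as a list of (success, file, error) tuples, so the shared failure check above works for either path. A small illustration with hypothetical entries:

# Hypothetical result tuples in the shape both download paths produce.
res = [
    (True, "downloads/my_dataset__common__20240315.parquet", None),
    (False, "downloads/my_dataset__common__20240316.parquet", TimeoutError("read timed out")),
]

if (len(res) > 0) and (not all(r[0] for r in res)):
    for ok, path, err in res:
        if not ok:
            print(f"Download failed for {path}: {err}")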
111 changes: 109 additions & 2 deletions fusion/utils.py
@@ -9,9 +9,12 @@
import sys
from datetime import timedelta
from io import BytesIO
import math
from pathlib import Path
from typing import Union
from urllib.parse import urlparse, urlunparse
from threading import Thread
from threading import Lock

import aiohttp
import joblib
@@ -514,7 +517,10 @@ def distribution_to_url(
f"{root_url}catalogs/{catalog}/datasets/{dataset}/sample/distributions/csv"
)

return f"{root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/{datasetseries}/distributions/{file_format}"
return (
f"{root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/"
f"{datasetseries}/distributions/{file_format}/operationType/download"
)


def _get_canonical_root_url(any_url: str) -> str:
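With the added suffix, distribution URLs now end in /operationType/download. A hypothetical example, assuming the argument order (root_url, dataset, datasetseries, file_format, catalog) implied by the call sites in fusion.py; the root URL and identifiers are illustrative.

# Hypothetical inputs; only the URL shape follows the return statement above.
root_url = "https://fusion.example.com/api/v1/"
url = distribution_to_url(root_url, "my_dataset", "20240315", "parquet", "common")
# url == ("https://fusion.example.com/api/v1/catalogs/common/datasets/my_dataset/"
#         "datasetseries/20240315/distributions/parquet/operationType/download")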
@@ -724,6 +730,107 @@ def _stream_single_file_new_session_dry_run(credentials, url: str, output_file:
return False, output_file, ex


def stream_single_file_new_session_chunks(
credentials,
url: str,
output_file,
start: int,
end: int,
lock,
results: list,
idx: int,
overwrite: bool = True,
fs: fsspec.AbstractFileSystem = fsspec.filesystem("file"),
) -> int:
"""Function to stream a single file from the API to a file on disk.
Args:
credentials (FusionCredentials): Valid user credentials to provide an access token
url (str): The URL to call.
output_file: The file handle for the target write file.
start (int): Start byte.
end (int): End byte.
lock (threading.Lock): Lock serializing writes to the shared output file handle.
results (list): Results list.
idx (int): Results list index.
overwrite (bool, optional): True if previously downloaded files should be overwritten. Defaults to True.
fs (fsspec.filesystem): Filesystem.
Returns:
int: Exit status
"""

if not overwrite and fs.exists(output_file):
results[idx] = True, output_file, None
return 0

try:
url = url + f"?downloadRange=bytes={start}-{end-1}"
with get_session(credentials, url).get(url, stream=False) as r:
r.raise_for_status()
with lock:
output_file.seek(start)
output_file.write(r.content)

logger.log(
VERBOSE_LVL,
f"Wrote {start} - {end} bytes to {output_file}",
)
results[idx] = (True, output_file, None)
return 0
except Exception as ex:
logger.log(
VERBOSE_LVL,
f"Failed to write to {output_file}. ex - {ex}",
)
results[idx] = (False, output_file, ex)
return 1


def download_single_file_threading(
credentials,
url: str,
output_file,
chunk_size: int = 5 * 2**20,
fs: fsspec.AbstractFileSystem = fsspec.filesystem("file"),
):
"""Download single file using range requests.
Args:
credentials (FusionCredentials): Valid user credentials to provide an access token
url (str): The URL to call.
output_file (str): The filename that the data will be saved into.
chunk_size (int): Chunk size for parallelization.
fs (fsspec.filesystem): Filesystem.
Returns: List[Tuple]
"""
header = get_session(credentials, url).head(url).headers
content_length = int(header["Content-Length"])
n_chunks = int(math.ceil(content_length / chunk_size))
starts = [i * chunk_size for i in range(n_chunks)]
ends = [min((i + 1) * chunk_size, content_length) for i in range(n_chunks)]
lock = Lock()
output_file = fs.open(output_file, "wb")
results = [None] * n_chunks
threads = [
Thread(
target=stream_single_file_new_session_chunks,
args=(credentials, url, output_file, start, end, lock, results, idx),
)
for idx, (start, end) in enumerate(zip(starts, ends))
]
for thread in threads:
thread.start()

for thread in threads:
thread.join()
output_file.close()
return results
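Chunk boundaries are computed as half-open [start, end) pairs, while the downloadRange parameter is inclusive, which is why stream_single_file_new_session_chunks requests bytes={start}-{end - 1}. Worked through for an illustrative 12 MiB file with the default 5 MiB chunk size:

import math

content_length = 12 * 2**20   # 12 MiB file size, illustrative
chunk_size = 5 * 2**20        # default chunk_size above

n_chunks = int(math.ceil(content_length / chunk_size))                       # 3
starts = [i * chunk_size for i in range(n_chunks)]                           # [0, 5242880, 10485760]
ends = [min((i + 1) * chunk_size, content_length) for i in range(n_chunks)]  # [5242880, 10485760, 12582912]

# Each thread then requests ?downloadRange=bytes={start}-{end - 1}:
# bytes=0-5242879, bytes=5242880-10485759, bytes=10485760-12582911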


def stream_single_file_new_session(
credentials,
url: str,
@@ -736,7 +843,7 @@ def stream_single_file_new_session(
"""Function to stream a single file from the API to a file on disk.
Args:
credentials (FusionCredentials): Valid user credentials to provide an acces token
credentials (FusionCredentials): Valid user credentials to provide an access token
url (str): The URL to call.
output_file (str): The filename that the data will be saved into.
overwrite (bool, optional): True if previously downloaded files should be overwritten. Defaults to True.
