From 9af7b3bcc426722e3614c5732f5ebd2d0c314169 Mon Sep 17 00:00:00 2001 From: dbernaciak Date: Tue, 26 Mar 2024 21:36:38 -0400 Subject: [PATCH 1/7] multipart/range-request download if only 1 file is downloaded --- fusion/fusion.py | 85 ++++++++++++++++++++++++------------- fusion/utils.py | 107 ++++++++++++++++++++++++++++++++++++++++++++++- 2 files changed, 160 insertions(+), 32 deletions(-) diff --git a/fusion/fusion.py b/fusion/fusion.py index 76d8d55c..69155d5e 100755 --- a/fusion/fusion.py +++ b/fusion/fusion.py @@ -25,6 +25,7 @@ csv_to_table, distribution_to_filename, distribution_to_url, + download_single_file_threading, get_session, is_dataset_raw, json_to_table, @@ -691,41 +692,65 @@ def download( for d in download_folders: if not self.fs.exists(d): self.fs.mkdir(d, create_parents=True) - download_spec = [ - { - "credentials": self.credentials, - "url": distribution_to_url( - self.root_url, series[1], series[2], series[3], series[0] - ), - "output_file": distribution_to_filename( - download_folders[i], - series[1], - series[2], - series[3], - series[0], - partitioning=partitioning, - ), - "overwrite": force_download, - "fs": self.fs, - } - for i, series in enumerate(required_series) - ] - logger.log( - VERBOSE_LVL, - f"Beginning {len(download_spec)} downloads in batches of {n_par}", - ) - if show_progress: - with tqdm_joblib(tqdm(total=len(download_spec))) as _: + if len(required_series) == 1: + with tqdm(total=1) as _: + res = download_single_file_threading( + self.credentials, + distribution_to_url( + self.root_url, + required_series[0][1], + required_series[0][2], + required_series[0][3], + required_series[0][0], + ), + distribution_to_filename( + download_folders[0], + required_series[0][1], + required_series[0][2], + required_series[0][3], + required_series[0][0], + partitioning=partitioning, + ), + fs=self.fs, + ) + + else: + download_spec = [ + { + "credentials": self.credentials, + "url": distribution_to_url( + self.root_url, series[1], series[2], series[3], series[0] + ), + "output_file": distribution_to_filename( + download_folders[i], + series[1], + series[2], + series[3], + series[0], + partitioning=partitioning, + ), + "overwrite": force_download, + "fs": self.fs, + } + for i, series in enumerate(required_series) + ] + + logger.log( + VERBOSE_LVL, + f"Beginning {len(download_spec)} downloads in batches of {n_par}", + ) + if show_progress: + with tqdm_joblib(tqdm(total=len(download_spec))) as _: + res = Parallel(n_jobs=n_par)( + delayed(stream_single_file_new_session)(**spec) + for spec in download_spec + ) + else: res = Parallel(n_jobs=n_par)( delayed(stream_single_file_new_session)(**spec) for spec in download_spec ) - else: - res = Parallel(n_jobs=n_par)( - delayed(stream_single_file_new_session)(**spec) - for spec in download_spec - ) if (len(res) > 0) and (not all((r[0] for r in res))): for r in res: diff --git a/fusion/utils.py b/fusion/utils.py index cbab39eb..e12808af 100644 --- a/fusion/utils.py +++ b/fusion/utils.py @@ -9,9 +9,12 @@ import sys from datetime import timedelta from io import BytesIO +import math from pathlib import Path from typing import Union from urllib.parse import urlparse, urlunparse +from threading import Thread +from threading import Lock import aiohttp import joblib @@ -514,7 +517,7 @@ def distribution_to_url( f"{root_url}catalogs/{catalog}/datasets/{dataset}/sample/distributions/csv" ) - return f"{root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/{datasetseries}/distributions/{file_format}" + return f"{root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/{datasetseries}/distributions/{file_format}/operationType/download" def _get_canonical_root_url(any_url: str) -> str: @@ -724,6 +727,106 @@ def _stream_single_file_new_session_dry_run(credentials, url: str, output_file: return False, output_file, ex +def stream_single_file_new_session_chunks( + credentials, + url: str, + output_file: str, + start: int, + end: int, + lock, + results: list, + idx: int, + overwrite: bool = True, + fs: fsspec.AbstractFileSystem = fsspec.filesystem("file"), +) -> tuple: + """Function to stream a single file from the API to a file on disk. + + Args: + credentials (FusionCredentials): Valid user credentials to provide an acces token + url (str): The URL to call. + output_file (str): The file handle for the target write file. + start (int): Start byte. + end(int): End byte. + lock (Threading.Lock): Lock. + overwrite (bool, optional): True if previously downloaded files should be overwritten. Defaults to True. + fs (fsspec.filesystem): Filesystem. + + Returns: + tuple: A tuple + + """ + + if not overwrite and fs.exists(output_file): + return True, output_file, None + + try: + url = url + f"?downloadRange=bytes={start}-{end-1}" + with get_session(credentials, url).get(url, stream=False) as r: + r.raise_for_status() + byte_cnt = 0 + with lock: + # with fs.open(output_file, "wb") as outfile: + # outfile.seek(start) + # outfile.write(r.content) + output_file.seek(start) + output_file.write(r.content) + + logger.log( + VERBOSE_LVL, + f"Wrote {byte_cnt:,} bytes to {output_file}", + ) + results[idx] = (True, output_file, None) + except Exception as ex: + logger.log( + VERBOSE_LVL, + f"Failed to write to {output_file}. ex - {ex}", + ) + results[idx] = (False, output_file, ex) + + +def download_single_file_threading( + credentials, + url: str, + output_file, + chunk_size: int = 5 * 2**20, + fs: fsspec.AbstractFileSystem = fsspec.filesystem("file"), +): + """ + + Args: + credentials (FusionCredentials): Valid user credentials to provide an access token + url (str): The URL to call. + output_file (str): The filename that the data will be saved into. + chunk_size (int): Chunk size for parallelization. + fs (fsspec.filesystem): Filesystem. + + Returns: List[Tuple] + + """ + header = get_session(credentials, url).head(url).headers + content_length = int(header["Content-Length"]) + n_chunks = int(math.ceil(content_length / chunk_size)) + starts = [i * chunk_size for i in range(n_chunks)] + ends = [min((i + 1) * chunk_size, content_length) for i in range(n_chunks)] + lock = Lock() + output_file = fs.open(output_file, "wb") + results = [None] * n_chunks + threads = [ + Thread( + target=stream_single_file_new_session_chunks, + args=(credentials, url, output_file, start, end, lock, results, idx), + ) + for idx, (start, end) in enumerate(zip(starts, ends)) + ] + for thread in threads: + thread.start() + + for thread in threads: + thread.join() + output_file.close() + return results + + def stream_single_file_new_session( credentials, url: str, @@ -736,7 +839,7 @@ def stream_single_file_new_session( """Function to stream a single file from the API to a file on disk. Args: - credentials (FusionCredentials): Valid user credentials to provide an acces token + credentials (FusionCredentials): Valid user credentials to provide an access token url (str): The URL to call. output_file (str): The filename that the data will be saved into. overwrite (bool, optional): True if previously downloaded files should be overwritten. Defaults to True. From 4096747b062ff8b21dca547f7e64be3f28617951 Mon Sep 17 00:00:00 2001 From: dbernaciak Date: Tue, 26 Mar 2024 21:47:05 -0400 Subject: [PATCH 2/7] multipart/range-request download if only 1 file is downloaded --- fusion/utils.py | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/fusion/utils.py b/fusion/utils.py index e12808af..02e8fa35 100644 --- a/fusion/utils.py +++ b/fusion/utils.py @@ -748,6 +748,8 @@ def stream_single_file_new_session_chunks( start (int): Start byte. end(int): End byte. lock (Threading.Lock): Lock. + results (list): Results list. + idx (int): Results list index. overwrite (bool, optional): True if previously downloaded files should be overwritten. Defaults to True. fs (fsspec.filesystem): Filesystem. @@ -791,7 +793,7 @@ def download_single_file_threading( chunk_size: int = 5 * 2**20, fs: fsspec.AbstractFileSystem = fsspec.filesystem("file"), ): - """ + """Download single file using range requests. Args: credentials (FusionCredentials): Valid user credentials to provide an access token From 6bf110daa59b7b36debc27faeaa2466adbdfb7d8 Mon Sep 17 00:00:00 2001 From: dbernaciak Date: Tue, 26 Mar 2024 22:14:02 -0400 Subject: [PATCH 3/7] multipart/range-request download if only 1 file is downloaded --- fusion/utils.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/fusion/utils.py b/fusion/utils.py index 02e8fa35..7f0c9161 100644 --- a/fusion/utils.py +++ b/fusion/utils.py @@ -517,7 +517,10 @@ def distribution_to_url( f"{root_url}catalogs/{catalog}/datasets/{dataset}/sample/distributions/csv" ) - return f"{root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/{datasetseries}/distributions/{file_format}/operationType/download" + return ( + f"{root_url}catalogs/{catalog}/datasets/{dataset}/datasetseries/" + f"{datasetseries}/distributions/{file_format}/operationType/download" + ) def _get_canonical_root_url(any_url: str) -> str: From 87986eaf494f5c4397953a3226b5fed3aef5b6ee Mon Sep 17 00:00:00 2001 From: dbernaciak Date: Tue, 26 Mar 2024 22:26:31 -0400 Subject: [PATCH 4/7] multipart/range-request download if only 1 file is downloaded --- fusion/utils.py | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/fusion/utils.py b/fusion/utils.py index 7f0c9161..070ad44a 100644 --- a/fusion/utils.py +++ b/fusion/utils.py @@ -733,7 +733,7 @@ def _stream_single_file_new_session_dry_run(credentials, url: str, output_file: def stream_single_file_new_session_chunks( credentials, url: str, - output_file: str, + output_file, start: int, end: int, lock, @@ -741,13 +741,13 @@ def stream_single_file_new_session_chunks( idx: int, overwrite: bool = True, fs: fsspec.AbstractFileSystem = fsspec.filesystem("file"), -) -> tuple: +) -> int: """Function to stream a single file from the API to a file on disk. Args: credentials (FusionCredentials): Valid user credentials to provide an acces token url (str): The URL to call. - output_file (str): The file handle for the target write file. + output_file: The file handle for the target write file. start (int): Start byte. end(int): End byte. lock (Threading.Lock): Lock. @@ -757,12 +757,13 @@ def stream_single_file_new_session_chunks( fs (fsspec.filesystem): Filesystem. Returns: - tuple: A tuple + int: Exit status """ if not overwrite and fs.exists(output_file): - return True, output_file, None + results[idx] = True, output_file, None + return 0 try: url = url + f"?downloadRange=bytes={start}-{end-1}" @@ -770,9 +771,6 @@ def stream_single_file_new_session_chunks( r.raise_for_status() byte_cnt = 0 with lock: - # with fs.open(output_file, "wb") as outfile: - # outfile.seek(start) - # outfile.write(r.content) output_file.seek(start) output_file.write(r.content) @@ -781,12 +779,14 @@ def stream_single_file_new_session_chunks( f"Wrote {byte_cnt:,} bytes to {output_file}", ) results[idx] = (True, output_file, None) + return 0 except Exception as ex: logger.log( VERBOSE_LVL, f"Failed to write to {output_file}. ex - {ex}", ) results[idx] = (False, output_file, ex) + return 1 def download_single_file_threading( From 758ca23948c1487b97aae53cf16b73f70ac36bc4 Mon Sep 17 00:00:00 2001 From: dbernaciak Date: Wed, 27 Mar 2024 10:40:54 -0400 Subject: [PATCH 5/7] multipart/range-request download if only 1 file is downloaded --- fusion/utils.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/fusion/utils.py b/fusion/utils.py index 070ad44a..40dba754 100644 --- a/fusion/utils.py +++ b/fusion/utils.py @@ -769,14 +769,13 @@ def stream_single_file_new_session_chunks( url = url + f"?downloadRange=bytes={start}-{end-1}" with get_session(credentials, url).get(url, stream=False) as r: r.raise_for_status() - byte_cnt = 0 with lock: output_file.seek(start) output_file.write(r.content) logger.log( VERBOSE_LVL, - f"Wrote {byte_cnt:,} bytes to {output_file}", + f"Wrote {start} - {end} bytes to {output_file}", ) results[idx] = (True, output_file, None) return 0 From b5ff3d6667cdde8deb9c98027970821faef99fdb Mon Sep 17 00:00:00 2001 From: dbernaciak Date: Wed, 27 Mar 2024 11:31:57 -0400 Subject: [PATCH 6/7] multipart/range-request download if only 1 file is downloaded --- fusion/fusion.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fusion/fusion.py b/fusion/fusion.py index 69155d5e..1eb760b6 100755 --- a/fusion/fusion.py +++ b/fusion/fusion.py @@ -694,7 +694,7 @@ def download( self.fs.mkdir(d, create_parents=True) if len(required_series) == 1: - with tqdm(total=1) as _: + with tqdm(total=1) as pbar: res = download_single_file_threading( self.credentials, distribution_to_url( @@ -714,6 +714,7 @@ def download( ), fs=self.fs, ) + pbar.update(1) else: download_spec = [ From 0964578f64bb58a1a1811e1271d596c29a757cd5 Mon Sep 17 00:00:00 2001 From: dbernaciak Date: Wed, 27 Mar 2024 12:36:22 -0400 Subject: [PATCH 7/7] multipart/range-request download if only 1 file is downloaded --- fusion/fusion.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/fusion/fusion.py b/fusion/fusion.py index 1eb760b6..cc45517e 100755 --- a/fusion/fusion.py +++ b/fusion/fusion.py @@ -714,7 +714,8 @@ def download( ), fs=self.fs, ) - pbar.update(1) + if (len(res) > 0) and all((r[0] for r in res)): + pbar.update(1) else: download_spec = [