Feature/upload provider (#69)
* support additional headers in upload
dbernaciak authored Sep 13, 2024
1 parent afc5e4d commit cc9ced0
Showing 7 changed files with 50 additions and 18 deletions.
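The change threads an optional additional_headers argument from the public Fusion.upload API down through upload_files and the filesystem's put/_put_file/_cloud_copy methods, so callers can attach extra HTTP headers to upload requests. A minimal usage sketch, assuming credentials are already configured; the dataset name, file path, and header values below are hypothetical:

from fusion import Fusion

fusion = Fusion()  # assumes a default credentials file is available
fusion.upload(
    path="data/my_dataset__20240913.csv",             # hypothetical local file
    dataset="MY_DATASET",                             # hypothetical dataset name
    additional_headers={"x-custom-tag": "example"},   # new optional parameter in this change
)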
2 changes: 1 addition & 1 deletion Cargo.lock

Some generated files are not rendered by default.

2 changes: 1 addition & 1 deletion Cargo.toml
@@ -1,6 +1,6 @@
[package]
name = "pyfusion"
version = "1.3.2"
version = "1.3.3-dev0"
edition = "2021"


2 changes: 1 addition & 1 deletion py_src/fusion/__init__.py
@@ -2,7 +2,7 @@

__author__ = """Fusion Devs"""
__email__ = "[email protected]"
__version__ = "1.3.2"
__version__ = "1.3.3-dev0"

from fusion._fusion import FusionCredentials
from fusion.fs_sync import fsync
4 changes: 3 additions & 1 deletion py_src/fusion/fusion.py
@@ -1025,12 +1025,13 @@ def upload( # noqa: PLR0913
from_date: Optional[str] = None,
to_date: Optional[str] = None,
preserve_original_name: Optional[bool] = False,
additional_headers: Optional[dict[str, str]] = None,
) -> Optional[list[tuple[bool, str, Optional[str]]]]:
"""Uploads the requested files/files to Fusion.
Args:
path (str): path to a file or a folder with files
dataset (str, optional): Dataset name to which the file will be uplaoded (for single file only).
dataset (str, optional): Dataset name to which the file will be uploaded (for single file only).
If not provided the dataset will be implied from file's name.
dt_str (str, optional): A file name. Can be any string but is usually a date.
Defaults to 'latest' which will return the most recent.
@@ -1118,6 +1119,7 @@ def upload( # noqa: PLR0913
show_progress=show_progress,
from_date=from_date,
to_date=to_date,
additional_headers=additional_headers,
)

if not all(r[0] for r in res):
50 changes: 38 additions & 12 deletions py_src/fusion/fusion_filesystem.py
@@ -520,14 +520,26 @@ def get( # disable: W0221
self.loop, self._download_single_file_async, str(rpath), lpath, file_size, chunk_size, n_threads
)

async def _put_file( # noqa: PLR0915
@staticmethod
def _update_kwargs(
kw: dict[str, Any], headers: dict[str, str], additional_headers: Optional[dict[str, str]]
) -> dict[str, Any]:
if "File-Name" in headers: # noqa: PLR0915
kw.setdefault("headers", {})
kw["headers"]["File-Name"] = headers["File-Name"]
if additional_headers:
kw["headers"].update(additional_headers)
return kw

async def _put_file( # noqa: PLR0915, PLR0913
self,
lpath: Union[str, io.IOBase, fsspec.spec.AbstractBufferedFile],
rpath: str,
chunk_size: int = 5 * 2**20,
callback: fsspec.callbacks.Callback = _DEFAULT_CALLBACK,
method: str = "post",
multipart: bool = False,
additional_headers: Optional[dict[str, str]] = None,
**kwargs: Any,
) -> None:
async def put_data() -> AsyncGenerator[dict[Any, Any], None]:
@@ -553,6 +565,7 @@ async def put_data() -> AsyncGenerator[dict[Any, Any], None]:
kw = self.kwargs.copy()
url = rpath + f"/operations/upload?operationId={operation_id}&partNumber={i+1}"
kw.update({"headers": kwargs["chunk_headers_lst"][i]})
kw = FusionHTTPFileSystem._update_kwargs(kw, headers, additional_headers)
async with meth(url=url, data=chunk, **kw) as resp:
await self._async_raise_not_found_for_status(resp, rpath)
yield await resp.json()
@@ -572,15 +585,16 @@ async def put_data() -> AsyncGenerator[dict[Any, Any], None]:
if not multipart:
kw = self.kwargs.copy()
kw.update({"headers": headers})
if additional_headers:
kw["headers"].update(additional_headers)
if isinstance(lpath, io.BytesIO):
lpath.seek(0)
async with meth(rpath, data=lpath.read(), **kw) as resp: # type: ignore
await self._async_raise_not_found_for_status(resp, rpath)
else:
kw = self.kwargs.copy()
if "File-Name" in headers: # noqa: PLR0915
kw.setdefault("headers", {})
kw["headers"]["File-Name"] = headers["File-Name"]
kw = FusionHTTPFileSystem._update_kwargs(kw, headers, additional_headers)

async with session.post(rpath + "/operationType/upload", **kw) as resp:
await self._async_raise_not_found_for_status(resp, rpath)
operation_id = await resp.json()
@@ -589,6 +603,7 @@ async def put_data() -> AsyncGenerator[dict[Any, Any], None]:
resps = [resp async for resp in put_data()]
kw = self.kwargs.copy()
kw.update({"headers": headers})
kw = FusionHTTPFileSystem._update_kwargs(kw, headers, additional_headers)
async with session.post(
url=rpath + f"/operations/upload?operationId={operation_id}",
json={"parts": resps},
@@ -615,6 +630,7 @@ def _construct_headers(
}
if file_name:
headers["File-Name"] = file_name

headers["Content-Type"] = "application/json" if multipart else headers["Content-Type"]
headers_chunks = {"Content-Type": "application/octet-stream", "Digest": ""}

@@ -649,6 +665,7 @@ def _cloud_copy( # noqa: PLR0913, PLR0915
callback: fsspec.callbacks.Callback = _DEFAULT_CALLBACK,
method: str = "put",
file_name: Optional[str] = None,
additional_headers: Optional[dict[str, str]] = None,
) -> None:
async def _get_operation_id(kw: dict[str, str]) -> dict[str, Any]:
session = await self.set_session()
@@ -702,6 +719,7 @@ async def _meth(url: Any, kw: Any) -> None:
headers_chunks["Digest"] = "SHA-256=" + base64.b64encode(hash_sha256_chunk.digest()).decode()
kw = self.kwargs.copy()
kw.update({"headers": headers_chunks})
kw = FusionHTTPFileSystem._update_kwargs(kw, headers, additional_headers)
url = rpath + f"/operations/upload?operationId={operation_id}&partNumber={i+1}"
yield sync(self.loop, _meth, url, kw)
i += 1
@@ -722,19 +740,23 @@ async def _meth(url: Any, kw: Any) -> None:
}
if file_name:
headers["File-Name"] = file_name

if additional_headers:
for k, v in additional_headers.items():
headers[k] = v

lpath.seek(0)
kw = self.kwargs.copy()
kw.update({"headers": headers})

kw_op = self.kwargs.copy()
if "File-Name" in headers: # noqa: SIM102
kw_op.setdefault("headers", {})
kw_op["headers"]["File-Name"] = headers["File-Name"]
kw_op = FusionHTTPFileSystem._update_kwargs(kw_op, headers, additional_headers)

operation_id = sync(self.loop, _get_operation_id, kw_op)["operationId"]
resps = list(put_data())
hash_sha256 = hash_sha256_lst[0]
headers["Digest"] = "SHA-256=" + base64.b64encode(hash_sha256.digest()).decode()
kw = self.kwargs.copy()
kw.update({"headers": headers})
kw = FusionHTTPFileSystem._update_kwargs(kw, headers, additional_headers)
sync(self.loop, _finish_operation, operation_id, kw)

def put( # noqa: PLR0913
@@ -748,6 +770,7 @@ def put( # noqa: PLR0913
from_date: Optional[str] = None,
to_date: Optional[str] = None,
file_name: Optional[str] = None,
additional_headers: Optional[dict[str, str]] = None,
**kwargs: Any,
) -> Any:
"""Copy file(s) from local.
@@ -762,6 +785,7 @@ def put( # noqa: PLR0913
from_date: earliest date of data in upload file
to_date: latest date of data in upload file
file_name: Name of the file.
additional_headers: Additional headers.
**kwargs: Kwargs.
Returns:
@@ -778,16 +802,18 @@ def put( # noqa: PLR0913
dt_created = pd.Timestamp.now().strftime("%Y-%m-%d")
rpath = self._decorate_url(rpath)
if type(lpath).__name__ in ["S3File"]:
return self._cloud_copy(lpath, rpath, dt_from, dt_to, dt_created, chunk_size, callback, method, file_name)
return self._cloud_copy(
lpath, rpath, dt_from, dt_to, dt_created, chunk_size, callback, method, file_name, additional_headers
)
headers, chunk_headers_lst = self._construct_headers(
lpath, dt_from, dt_to, dt_created, chunk_size, multipart, file_name
)
kwargs.update({"headers": headers})
if multipart:
kwargs.update({"chunk_headers_lst": chunk_headers_lst})
args = [lpath, rpath, chunk_size, callback, method, multipart]
args = [lpath, rpath, chunk_size, callback, method, multipart, additional_headers]
else:
args = [lpath, rpath, None, callback, method, multipart]
args = [lpath, rpath, None, callback, method, multipart, additional_headers]

return sync(super().loop, self._put_file, *args, **kwargs)

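For reference, a self-contained sketch of the header-merge behaviour that the new _update_kwargs static method centralises. This is a simplified, slightly defensive illustration, not the committed implementation; the function name, headers, and values are illustrative only:

from typing import Any, Optional

def merge_upload_headers(
    kw: dict[str, Any],
    headers: dict[str, str],
    additional_headers: Optional[dict[str, str]] = None,
) -> dict[str, Any]:
    # Carry the File-Name header over, then layer any caller-supplied headers on top.
    if "File-Name" in headers:
        kw.setdefault("headers", {})
        kw["headers"]["File-Name"] = headers["File-Name"]
    if additional_headers:
        kw.setdefault("headers", {})
        kw["headers"].update(additional_headers)
    return kw

kw = merge_upload_headers({}, {"File-Name": "report.csv"}, {"x-trace-id": "abc123"})
# kw == {"headers": {"File-Name": "report.csv", "x-trace-id": "abc123"}}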
4 changes: 4 additions & 0 deletions py_src/fusion/utils.py
@@ -763,6 +763,7 @@ def upload_files( # noqa: PLR0913
show_progress: bool = True,
from_date: str | None = None,
to_date: str | None = None,
additional_headers: dict[str, str] | None = None,
) -> list[tuple[bool, str, str | None]]:
"""Upload file into Fusion.
@@ -777,6 +778,7 @@ def upload_files( # noqa: PLR0913
show_progress (bool): Show progress bar
from_date (str, optional): earliest date of data contained in distribution.
to_date (str, optional): latest date of data contained in distribution.
additional_headers (dict, optional): Additional headers to include in the request.
Returns: List of update statuses.
@@ -796,6 +798,7 @@ def _upload(p_url: str, path: str, file_name: str | None = None) -> tuple[bool,
from_date=from_date,
to_date=to_date,
file_name=file_name,
additional_headers=additional_headers,
)
else:
with fs_local.open(path, "rb") as file_local:
@@ -808,6 +811,7 @@ def _upload(p_url: str, path: str, file_name: str | None = None) -> tuple[bool,
from_date=from_date,
to_date=to_date,
file_name=file_name,
additional_headers=additional_headers,
)
return (True, path, None)
except Exception as ex: # noqa: BLE001
4 changes: 2 additions & 2 deletions pyproject.toml
@@ -1,6 +1,6 @@
[project]
name = "pyfusion"
version = "1.3.2"
version = "1.3.3-dev0"

homepage = "https://github.com/jpmorganchase/fusion"
description = "JPMC Fusion Developer Tools"
@@ -224,7 +224,7 @@ omit = [


[tool.bumpversion]
current_version = "1.3.2"
current_version = "1.3.3-dev0"
parse = '(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?:-(?P<release>[a-z]+)(?P<candidate>\d+))?'
serialize = [
'{major}.{minor}.{patch}-{release}{candidate}',
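As a sanity check (not part of the commit), the bumpversion parse pattern above does accept the new pre-release version string; a quick sketch:

import re

pattern = r"(?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)(?:-(?P<release>[a-z]+)(?P<candidate>\d+))?"
match = re.fullmatch(pattern, "1.3.3-dev0")
print(match.groupdict())
# {'major': '1', 'minor': '3', 'patch': '3', 'release': 'dev', 'candidate': '0'}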
