From 63c2052082af46351a3f4fee58e1f06b538a39d7 Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 8 Dec 2023 15:42:49 +0200 Subject: [PATCH 01/27] added descriptive comments for mounting command --- packages/grid/seaweedfs/mount_command.sh | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/packages/grid/seaweedfs/mount_command.sh b/packages/grid/seaweedfs/mount_command.sh index f9c813ef8d5..1360cbddf62 100644 --- a/packages/grid/seaweedfs/mount_command.sh +++ b/packages/grid/seaweedfs/mount_command.sh @@ -1,3 +1,8 @@ +# 1 = remote config name +# 2 = azure account name +# 3 = seaweedfs bucket name +# 4 = azure container name +# 5 = azure key echo "remote.configure -name=$1 -type=azure -azure.account_name=$2 -azure.account_key=$5" | weed shell && \ echo "s3.bucket.create -name=$3" | weed shell && \ echo "remote.mount -dir=/buckets/$3 -remote=$1/$4" | weed shell && \ From 4f0833548fb96538dce8ad9b4523829acda19dcc Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 8 Dec 2023 15:43:21 +0200 Subject: [PATCH 02/27] added remote profiles --- .../src/syft/service/blob_storage/service.py | 31 +++++++++++++++++-- 1 file changed, 29 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/service/blob_storage/service.py b/packages/syft/src/syft/service/blob_storage/service.py index 250998c314f..2b0b90ca429 100644 --- a/packages/syft/src/syft/service/blob_storage/service.py +++ b/packages/syft/src/syft/service/blob_storage/service.py @@ -6,6 +6,7 @@ # third party import requests +from syft.service.blob_storage.remote_profile import AzureRemoteProfile, RemoteProfileStash # relative from ...serde.serializable import serializable @@ -37,10 +38,12 @@ class BlobStorageService(AbstractService): store: DocumentStore stash: BlobStorageStash + remote_profile_stash: RemoteProfileStash def __init__(self, store: DocumentStore) -> None: self.store = store self.stash = BlobStorageStash(store=store) + self.remote_profile_stash = RemoteProfileStash(store=store) @service_method(path="blob_storage.get_all", name="get_all") def get_all_blob_storage_entries( @@ -65,15 +68,31 @@ def mount_azure( # TODO: fix arguments + remote_name = f"{account_name}-{container_name}" args_dict = { "account_name": account_name, "account_key": account_key, "container_name": container_name, - "remote_name": f"{account_name}{container_name}", + "remote_name": remote_name, "bucket_name": bucket_name, } + + new_profile = AzureRemoteProfile( + profile_name=remote_name, + account_name=account_name, + account_key=account_key, + container_name=container_name, + ) + res = self.remote_profile_stash.set(context.credentials, new_profile) + if res.is_err(): + return SyftError(message=res.value) + remote_profile = res.ok() + seaweed_config = context.node.blob_storage_client.config + seaweed_config.remote_profiles[remote_name] = remote_profile + # TODO: possible wrap this in try catch cfg = context.node.blob_store_config.client_config + print(cfg.mount_url) init_request = requests.post(url=cfg.mount_url, json=args_dict) # nosec print(init_request.content) # TODO check return code @@ -83,7 +102,8 @@ def mount_azure( res = context.node.blob_storage_client.connect().client.list_objects( Bucket=bucket_name ) - print(res) + import sys + print(res, file=sys.stderr) objects = res["Contents"] file_sizes = [object["Size"] for object in objects] file_paths = [object["Key"] for object in objects] @@ -118,6 +138,11 @@ def get_files_from_bucket(self, context: AuthedServiceContext, bucket_name: str) blob_files = [] for bse in bse_list: self.stash.set(obj=bse, 
credentials=context.credentials) + # We create an empty ActionObject and set its blob_storage_entry to bse + # so that we can call reloac_cache where + # we create the BlobRetrieval (user needs permission to do this) + # This could be a BlobRetrievalByURL that creates a BlobFile + # and then sets it in the cache (it does not contain the data, only the BlobFile) blob_file = ActionObject.empty() blob_file.syft_blob_storage_entry_id = bse.id blob_file.syft_client_verify_key = context.credentials @@ -146,6 +171,7 @@ def get_blob_storage_metadata_by_uid( return blob_storage_entry.to(BlobStorageMetadata) return SyftError(message=result.err()) + # TODO: replace name with `create_blob_retrieval` @service_method( path="blob_storage.read", name="read", @@ -164,6 +190,7 @@ def read( res: BlobRetrieval = conn.read( obj.location, obj.type_, bucket_name=obj.bucket_name ) + print(res) res.syft_blob_storage_entry_id = uid res.file_size = obj.file_size return res From e6281b0ab2b70ab3a860710b8b9f33add9604a65 Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 8 Dec 2023 17:39:45 +0200 Subject: [PATCH 03/27] new object versions --- .../src/syft/protocol/protocol_version.json | 30 ++++++++++++++++++- 1 file changed, 29 insertions(+), 1 deletion(-) diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 938f0409c62..1b0c14f2597 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -833,7 +833,7 @@ }, "2": { "version": 2, - "hash": "8059ee03016c4d74e408dad9529e877f91829672e0cc42d8cfff9c8e14058adc", + "hash": "c78a998aa1c6d9700c775ca65a4efc9c6ae2d2ffdf677219f7b004e635b2be42", "action": "add" } }, @@ -956,6 +956,34 @@ "hash": "776fc7cf7498b93e656a00fff03b86160d1b63e508e2143ac7932e7e38021b0c", "action": "add" } + }, + "SeaweedSecureFilePathLocation": { + "2": { + "version": 2, + "hash": "35ad807e2282b97a6022649b2efa186d0c1705e2eb8f3753d55efda1b6510a9a", + "action": "add" + } + }, + "AzureSecureFilePathLocation": { + "1": { + "version": 1, + "hash": "1bb15f3f9d7082779f1c9f58de94011487924cb8a8c9c2ec18fd7c161c27fd0e", + "action": "add" + } + }, + "RemoteConfig": { + "1": { + "version": 1, + "hash": "ad7bc4780a8ad52e14ce68601852c93d2fe07bda489809cad7cae786d2461754", + "action": "add" + } + }, + "AzureRemoteConfig": { + "1": { + "version": 1, + "hash": "c05c6caa27db4e385c642536d4b0ecabc0c71e91220d2e6ce21a2761ca68a673", + "action": "add" + } } } } From a016ba13a4d8cac67fbd3362e41635b8ef674cdd Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 8 Dec 2023 17:40:14 +0200 Subject: [PATCH 04/27] allow BlobRetrievalByURL to use str url --- packages/syft/src/syft/store/blob_storage/__init__.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/store/blob_storage/__init__.py b/packages/syft/src/syft/store/blob_storage/__init__.py index 48699df9db1..a6c459ad5e5 100644 --- a/packages/syft/src/syft/store/blob_storage/__init__.py +++ b/packages/syft/src/syft/store/blob_storage/__init__.py @@ -173,7 +173,7 @@ class BlobRetrievalByURL(BlobRetrieval): __canonical_name__ = "BlobRetrievalByURL" __version__ = SYFT_OBJECT_VERSION_2 - url: GridURL + url: Union[GridURL, str] def read(self) -> Union[SyftObject, SyftError]: if self.type_ is BlobFileType: @@ -195,7 +195,7 @@ def _read_data(self, stream=False, chunk_size=512): node_uid=self.syft_node_location, user_verify_key=self.syft_client_verify_key, ) - if api is not None: + if api is not None and 
isinstance(self.url, GridURL): blob_url = api.connection.to_blob_route( self.url.url_path, host=self.url.host_or_ip ) From e6762b86be9f9671f4d2741e7bb427dc96c2e392 Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 8 Dec 2023 17:40:43 +0200 Subject: [PATCH 05/27] clear debug print --- .../src/syft/service/blob_storage/service.py | 21 ++++++------------- 1 file changed, 6 insertions(+), 15 deletions(-) diff --git a/packages/syft/src/syft/service/blob_storage/service.py b/packages/syft/src/syft/service/blob_storage/service.py index 2b0b90ca429..9bb034f6c5f 100644 --- a/packages/syft/src/syft/service/blob_storage/service.py +++ b/packages/syft/src/syft/service/blob_storage/service.py @@ -6,7 +6,6 @@ # third party import requests -from syft.service.blob_storage.remote_profile import AzureRemoteProfile, RemoteProfileStash # relative from ...serde.serializable import serializable @@ -29,6 +28,8 @@ from ..service import TYPE_TO_SERVICE from ..service import service_method from ..user.user_roles import GUEST_ROLE_LEVEL +from .remote_profile import AzureRemoteProfile +from .remote_profile import RemoteProfileStash from .stash import BlobStorageStash BlobDepositType = Union[OnDiskBlobDeposit, SeaweedFSBlobDeposit] @@ -64,7 +65,6 @@ def mount_azure( bucket_name: str, ): # stdlib - import sys # TODO: fix arguments @@ -76,7 +76,7 @@ def mount_azure( "remote_name": remote_name, "bucket_name": bucket_name, } - + new_profile = AzureRemoteProfile( profile_name=remote_name, account_name=account_name, @@ -88,22 +88,16 @@ def mount_azure( return SyftError(message=res.value) remote_profile = res.ok() seaweed_config = context.node.blob_storage_client.config - seaweed_config.remote_profiles[remote_name] = remote_profile - + seaweed_config.remote_profiles[remote_name] = remote_profile + # TODO: possible wrap this in try catch cfg = context.node.blob_store_config.client_config - print(cfg.mount_url) init_request = requests.post(url=cfg.mount_url, json=args_dict) # nosec print(init_request.content) # TODO check return code - - print(bucket_name, file=sys.stderr) - res = context.node.blob_storage_client.connect().client.list_objects( Bucket=bucket_name ) - import sys - print(res, file=sys.stderr) objects = res["Contents"] file_sizes = [object["Size"] for object in objects] file_paths = [object["Key"] for object in objects] @@ -132,14 +126,12 @@ def get_files_from_bucket(self, context: AuthedServiceContext, bucket_name: str) return result bse_list = result.ok() # stdlib - import sys - print(bse_list, file=sys.stderr) blob_files = [] for bse in bse_list: self.stash.set(obj=bse, credentials=context.credentials) # We create an empty ActionObject and set its blob_storage_entry to bse - # so that we can call reloac_cache where + # so that we can call reloac_cache where # we create the BlobRetrieval (user needs permission to do this) # This could be a BlobRetrievalByURL that creates a BlobFile # and then sets it in the cache (it does not contain the data, only the BlobFile) @@ -190,7 +182,6 @@ def read( res: BlobRetrieval = conn.read( obj.location, obj.type_, bucket_name=obj.bucket_name ) - print(res) res.syft_blob_storage_entry_id = uid res.file_size = obj.file_size return res From c79172aa7b30d7ed6adec1bb7f2bbe960052f228 Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 8 Dec 2023 17:41:11 +0200 Subject: [PATCH 06/27] fix lint --- packages/syft/src/syft/store/sqlite_document_store.py | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/packages/syft/src/syft/store/sqlite_document_store.py 
b/packages/syft/src/syft/store/sqlite_document_store.py
index 2ec313a1a8a..43cd5e2952b 100644
--- a/packages/syft/src/syft/store/sqlite_document_store.py
+++ b/packages/syft/src/syft/store/sqlite_document_store.py
@@ -206,8 +206,8 @@ def _set(self, key: UID, value: Any) -> None:
         self._update(key, value)
     else:
         insert_sql = (
-            f"insert into {self.table_name} (uid, repr, value) VALUES (?, ?, ?)"  # nosec
-        )
+            f"insert into {self.table_name} (uid, repr, value) VALUES (?, ?, ?)"
+        )  # nosec
         data = _serialize(value, to_bytes=True)
         res = self._execute(insert_sql, [str(key), _repr_debug_(value), data])
         if res.is_err():
@@ -215,8 +215,8 @@ def _update(self, key: UID, value: Any) -> None:
     insert_sql = (
-        f"update {self.table_name} set uid = ?, repr = ?, value = ? where uid = ?"  # nosec
-    )
+        f"update {self.table_name} set uid = ?, repr = ?, value = ? where uid = ?"
+    )  # nosec
     data = _serialize(value, to_bytes=True)
     res = self._execute(insert_sql, [str(key), _repr_debug_(value), data, str(key)])
     if res.is_err():

From c2981eb48a3ca577e3b39f604043e0608919a2c7 Mon Sep 17 00:00:00 2001
From: teo
Date: Fri, 8 Dec 2023 17:41:43 +0200
Subject: [PATCH 07/27] moved url generation logic into file path objects

---
 .../src/syft/store/blob_storage/seaweedfs.py | 30 ++++++++-----------
 1 file changed, 13 insertions(+), 17 deletions(-)

diff --git a/packages/syft/src/syft/store/blob_storage/seaweedfs.py b/packages/syft/src/syft/store/blob_storage/seaweedfs.py
index c1314663d46..246edabe6d4 100644
--- a/packages/syft/src/syft/store/blob_storage/seaweedfs.py
+++ b/packages/syft/src/syft/store/blob_storage/seaweedfs.py
@@ -1,7 +1,7 @@
 # stdlib
 from io import BytesIO
 import math
-from pathlib import Path
+from typing import Dict
 from typing import Generator
 from typing import List
 from typing import Optional
 # relative
 from . import BlobDeposit
 from . import BlobRetrieval
-from . import BlobRetrievalByURL
 from . import BlobStorageClient
 from . import BlobStorageClientConfig
 from . import BlobStorageConfig
 from .
import BlobStorageConnection from ...serde.serializable import serializable +from ...service.blob_storage.remote_profile import AzureRemoteProfile from ...service.response import SyftError -from ...service.response import SyftException from ...service.response import SyftSuccess from ...service.service import from_api_or_context from ...types.blob_storage import BlobStorageEntry @@ -37,7 +36,6 @@ from ...types.syft_object import SYFT_OBJECT_VERSION_1 from ...util.constants import DEFAULT_TIMEOUT -READ_EXPIRATION_TIME = 1800 # seconds WRITE_EXPIRATION_TIME = 900 # seconds DEFAULT_CHUNK_SIZE = 1024**3 # 1 GB @@ -109,6 +107,7 @@ class SeaweedFSClientConfig(BlobStorageClientConfig): secret_key: str region: str default_bucket_name: str = "defaultbucket" + remote_profiles: Dict[str, AzureRemoteProfile] = {} @property def endpoint_url(self) -> str: @@ -137,6 +136,7 @@ def connect(self) -> BlobStorageConnection: region_name=self.config.region, ), default_bucket_name=self.config.default_bucket_name, + config=self.config, ) @@ -144,10 +144,17 @@ def connect(self) -> BlobStorageConnection: class SeaweedFSConnection(BlobStorageConnection): client: S3BaseClient default_bucket_name: str + config: SeaweedFSClientConfig - def __init__(self, client: S3BaseClient, default_bucket_name: str): + def __init__( + self, + client: S3BaseClient, + default_bucket_name: str, + config: SeaweedFSClientConfig, + ): self.client = client self.default_bucket_name = default_bucket_name + self.config = config def __enter__(self) -> Self: return self @@ -160,18 +167,7 @@ def read( ) -> BlobRetrieval: if bucket_name is None: bucket_name = self.default_bucket_name - try: - url = self.client.generate_presigned_url( - ClientMethod="get_object", - Params={"Bucket": bucket_name, "Key": fp.path}, - ExpiresIn=READ_EXPIRATION_TIME, - ) - - return BlobRetrievalByURL( - url=GridURL.from_url(url), file_name=Path(fp.path).name, type_=type_ - ) - except BotoClientError as e: - raise SyftException(e) + return fp.generate_url(self, type_, bucket_name) def allocate( self, obj: CreateBlobStorageEntry From 82c60286f253180b3c67068c24e41d736cf65c72 Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 8 Dec 2023 17:42:09 +0200 Subject: [PATCH 08/27] added new azure files --- packages/syft/src/syft/types/blob_storage.py | 80 +++++++++++++++++++- 1 file changed, 79 insertions(+), 1 deletion(-) diff --git a/packages/syft/src/syft/types/blob_storage.py b/packages/syft/src/syft/types/blob_storage.py index d695bfd86d6..59e11d1b992 100644 --- a/packages/syft/src/syft/types/blob_storage.py +++ b/packages/syft/src/syft/types/blob_storage.py @@ -1,4 +1,6 @@ # stdlib +from datetime import datetime +from datetime import timedelta import mimetypes from pathlib import Path from queue import Queue @@ -13,6 +15,9 @@ from typing import Union # third party +from azure.storage.blob import BlobSasPermissions +from azure.storage.blob import generate_blob_sas +from botocore.client import ClientError as BotoClientError from typing_extensions import Self # relative @@ -24,6 +29,7 @@ from ..service.action.action_types import action_types from ..service.response import SyftException from ..service.service import from_api_or_context +from ..types.grid_url import GridURL from ..types.transforms import drop from ..types.transforms import keep from ..types.transforms import make_set_default @@ -35,6 +41,8 @@ from .syft_object import SyftObject from .uid import UID +READ_EXPIRATION_TIME = 1800 # seconds + @serializable() class BlobFileV1(SyftObject): @@ -155,13 +163,83 @@ class 
SecureFilePathLocation(SyftObject): def __repr__(self) -> str: return f"{self.path}" + def generate_url(self, connection, type_): + raise NotImplementedError + +@serializable() +class SeaweedSecureFilePathLocationV1(SecureFilePathLocation): + __canonical_name__ = "SeaweedSecureFilePathLocation" + __version__ = SYFT_OBJECT_VERSION_1 + + upload_id: str @serializable() class SeaweedSecureFilePathLocation(SecureFilePathLocation): __canonical_name__ = "SeaweedSecureFilePathLocation" - __version__ = SYFT_OBJECT_VERSION_1 + __version__ = SYFT_OBJECT_VERSION_2 upload_id: str + bucket_name: str + + def generate_url(self, connection, type_): + try: + url = connection.client.generate_presigned_url( + ClientMethod="get_object", + Params={"Bucket": self.bucket_name, "Key": self.path}, + ExpiresIn=READ_EXPIRATION_TIME, + ) + + # relative + from ..store.blob_storage import BlobRetrievalByURL + + return BlobRetrievalByURL( + url=GridURL.from_url(url), file_name=Path(self.path).name, type_=type_ + ) + except BotoClientError as e: + raise SyftException(e) + +@migrate(SeaweedSecureFilePathLocationV1, SeaweedSecureFilePathLocation) +def upgrade_seaweedsecurefilepathlocation_v1_to_v2(): + return [ + make_set_default("bucket_name", "") + ] + +@migrate(SeaweedSecureFilePathLocation, SeaweedSecureFilePathLocationV1) +def downgrade_seaweedsecurefilepathlocation_v2_to_v1(): + return [ + drop(["bucket_name"]), + ] + + +@serializable() +class AzureSecureFilePathLocation(SecureFilePathLocation): + __canonical_name__ = "AzureSecureFilePathLocation" + __version__ = SYFT_OBJECT_VERSION_1 + + # upload_id: str + azure_profile_name: str # Used by Seaweedfs to refer to a remote config + bucket_name: str + + def generate_url(self, connection, type_, bucket_name): + # SAS is almost the same thing as the presigned url + config = connection.config.remote_profiles[self.azure_profile_name] + account_name = config.account_name + container_name = config.container_name + blob_name = self.path + sas_blob = generate_blob_sas( + account_name=account_name, + container_name=container_name, + blob_name=blob_name, + account_key=config.account_key, + permission=BlobSasPermissions(read=True), + expiry=datetime.utcnow() + timedelta(hours=48), + ) + url = f"https://{config.account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_blob}" + + # relative + from ..store.blob_storage import BlobRetrievalByURL + + return BlobRetrievalByURL(url=url, file_name=Path(self.path).name, type_=type_) @serializable() From 65eb5a1460972254ad029d239822fd44cd100aa4 Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 8 Dec 2023 17:44:46 +0200 Subject: [PATCH 09/27] added notebook for testing --- notebooks/helm/direct_azure.ipynb | 298 ++++++++++++++++++++++++++++++ 1 file changed, 298 insertions(+) create mode 100644 notebooks/helm/direct_azure.ipynb diff --git a/notebooks/helm/direct_azure.ipynb b/notebooks/helm/direct_azure.ipynb new file mode 100644 index 00000000000..e17ff61ed01 --- /dev/null +++ b/notebooks/helm/direct_azure.ipynb @@ -0,0 +1,298 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 1, + "metadata": {}, + "outputs": [ + { + "name": "stderr", + "output_type": "stream", + "text": [ + "kj/filesystem-disk-unix.c++:1703: warning: PWD environment variable doesn't match current directory; pwd = /home/teo/OpenMined/PySyft\n" + ] + } + ], + "source": [ + "import syft as sy\n", + "import os" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "```\n", + "docker run --entrypoint /bin/sh -p 8333:8333 -p 8888:8888 
chrislusf/seaweedfs -c \"echo 's3.configure -access_key admin -secret_key admin -user iam -actions Read,Write,List,Tagging,Admin -apply' | weed shell > /dev/null 2>&1 & weed server -s3 -s3.port=8333 -master.volumeSizeLimitMB=2048\"\n", + "```" + ] + }, + { + "cell_type": "code", + "execution_count": 2, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Staging Protocol Changes...\n", + "Data Migrated to latest version !!!\n", + "Logged into as \n" + ] + }, + { + "data": { + "text/html": [ + "
SyftWarning: You are using a default password. Please change the password using `[your_client].me.set_password([new_password])`.
" + ], + "text/plain": [ + "SyftWarning: You are using a default password. Please change the password using `[your_client].me.set_password([new_password])`." + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "node = sy.orchestra.launch(name=\"test-domain-helm2\", dev_mode=True,\n", + " reset=True,\n", + " n_consumers=4,\n", + " create_producer=True)\n", + "client = node.login(email=\"info@openmined.org\", password=\"changethis\")\n", + "# client = sy.login(url=\"http://localhost:8080\", email=\"info@openmined.org\", password=\"changethis\")" + ] + }, + { + "cell_type": "code", + "execution_count": 3, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "from syft.store.blob_storage import BlobStorageConfig, BlobStorageClientConfig\n", + "from syft.store.blob_storage.seaweedfs import SeaweedFSClient, SeaweedFSClientConfig\n", + "blob_config = BlobStorageConfig(client_type=SeaweedFSClient,\n", + " client_config=SeaweedFSClientConfig(host=\"http://0.0.0.0\",\n", + " port=\"8333\",\n", + " access_key=\"admin\",\n", + " secret_key=\"admin\",\n", + " bucket_name=\"test_bucket\",\n", + " region=\"us-east-1\", \n", + " # mount_port=4001\n", + " )\n", + ")\n", + "node.python_node.init_blob_storage(blob_config)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": 4, + "metadata": {}, + "outputs": [], + "source": [ + "os.environ[\"RED_TEAM_AZURE_KEY\"] = \"t7Y5enmCiG2k8o5rvItSn3Ak9tHaVTXQUTn1LQ74jQ1g5bjvs0ui/O2FXJeDaCsfI6xMPz0txtoH+AStss/Xmg==\"" + ] + }, + { + "cell_type": "code", + "execution_count": 5, + "metadata": {}, + "outputs": [], + "source": [ + "# client.api.services.blob_storage.mount_azure(\n", + "# account_name='redteamtest',\n", + "# container_name='manual-test',\n", + "# account_key=os.environ[\"RED_TEAM_AZURE_KEY\"],\n", + "# bucket_name='bucket2',\n", + "# )" + ] + }, + { + "cell_type": "code", + "execution_count": 6, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "{'ResponseMetadata': {'RequestId': '1702049867443665606', 'HostId': '', 'HTTPStatusCode': 200, 'HTTPHeaders': {'accept-ranges': 'bytes', 'content-length': '668', 'content-type': 'application/xml', 'server': 'SeaweedFS S3', 'x-amz-request-id': '1702049867443665606', 'date': 'Fri, 08 Dec 2023 15:37:47 GMT'}, 'RetryAttempts': 0}, 'IsTruncated': False, 'Marker': '', 'Contents': [{'Key': '0266f72a-edae-4812-8ce2-ea2a57b52529.txt', 'LastModified': datetime.datetime(2023, 9, 13, 12, 8, 7, tzinfo=tzutc()), 'ETag': '\"e1de79d74ebc\"', 'Size': 13, 'StorageClass': 'STANDARD', 'Owner': {'ID': '0'}}, {'Key': 'example.txt', 'LastModified': datetime.datetime(2023, 9, 14, 16, 20, 32, tzinfo=tzutc()), 'ETag': '\"f5a69a6dae5e\"', 'Size': 13, 'StorageClass': 'STANDARD', 'Owner': {'ID': '0'}}], 'Name': 'test', 'Prefix': '', 'MaxKeys': 10000}\n", + "[{'Key': '0266f72a-edae-4812-8ce2-ea2a57b52529.txt', 'LastModified': datetime.datetime(2023, 9, 13, 12, 8, 7, tzinfo=tzutc()), 'ETag': '\"e1de79d74ebc\"', 'Size': 13, 'StorageClass': 'STANDARD', 'Owner': {'ID': '0'}}, {'Key': 'example.txt', 'LastModified': datetime.datetime(2023, 9, 14, 16, 20, 32, tzinfo=tzutc()), 'ETag': '\"f5a69a6dae5e\"', 'Size': 13, 'StorageClass': 'STANDARD', 'Owner': {'ID': '0'}}]\n" + ] + } + ], + "source": [ + "from syft.service.blob_storage.remote_profile import AzureRemoteProfile\n", + "from syft.types.blob_storage import AzureSecureFilePathLocation, BlobFileType, 
BlobStorageEntry, SecureFilePathLocation\n", + "\n", + "\n", + "res = node.python_node.blob_storage_client.connect().client.list_objects(\n", + " Bucket='test'\n", + ")\n", + "print(res)\n", + "objects = res[\"Contents\"]\n", + "account_name = 'redteamtest'\n", + "account_key = \"t7Y5enmCiG2k8o5rvItSn3Ak9tHaVTXQUTn1LQ74jQ1g5bjvs0ui/O2FXJeDaCsfI6xMPz0txtoH+AStss/Xmg==\"\n", + "container_name = 'manual-test'\n", + "\n", + "remote_name = f\"{account_name}-{container_name}\"\n", + "new_profile = AzureRemoteProfile(\n", + " profile_name=remote_name,\n", + " account_name=account_name,\n", + " account_key=account_key,\n", + " container_name=container_name,\n", + ")\n", + "blob_storage_service = node.python_node.get_service(\"BlobStorageService\")\n", + "res = blob_storage_service.remote_profile_stash.set(client.credentials.verify_key, new_profile)\n", + "\n", + "remote_profile = res.ok()\n", + "seaweed_config = node.python_node.blob_storage_client.config\n", + "seaweed_config.remote_profiles[remote_name] = remote_profile \n", + "\n", + "print(objects)\n", + "file_sizes = [object[\"Size\"] for object in objects]\n", + "file_paths = [object[\"Key\"] for object in objects]\n", + "secure_file_paths = [\n", + " AzureSecureFilePathLocation(\n", + " path=file_path,\n", + " azure_profile_name=remote_name,\n", + " bucket_name=\"test\",\n", + " ) for file_path in file_paths\n", + "]\n", + "\n", + "for sfp, file_size in zip(secure_file_paths, file_sizes):\n", + " blob_storage_entry = BlobStorageEntry(\n", + " location=sfp,\n", + " uploaded_by=client.credentials.verify_key,\n", + " file_size=file_size,\n", + " type_=BlobFileType,\n", + " bucket_name=\"test\",\n", + " )\n", + " node.python_node.get_service(\"BlobStorageService\").stash.set(client.credentials, blob_storage_entry)\n" + ] + }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "b'Hello, World!'" + ] + }, + "execution_count": 12, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.api.services.blob_storage.get_files_from_bucket(\"test\")[1].read()" + ] + }, + { + "cell_type": "code", + "execution_count": 8, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "" + ] + }, + "execution_count": 8, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "node.python_node.blob_storage_client.connect().client" + ] + }, + { + "cell_type": "code", + "execution_count": 9, + "metadata": {}, + "outputs": [], + "source": [ + "from datetime import datetime, timedelta\n", + "from azure.storage.blob import BlobClient, generate_blob_sas, BlobSasPermissions\n", + "\n", + "def get_blob_sas(account_name,account_key, container_name, blob_name):\n", + " sas_blob = generate_blob_sas(account_name=account_name, \n", + " container_name=container_name,\n", + " blob_name=blob_name,\n", + " account_key=account_key,\n", + " permission=BlobSasPermissions(read=True),\n", + " expiry=datetime.utcnow() + timedelta(hours=1))\n", + " return sas_blob" + ] + }, + { + "cell_type": "code", + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "account_name = 'redteamtest'\n", + "account_key = \"t7Y5enmCiG2k8o5rvItSn3Ak9tHaVTXQUTn1LQ74jQ1g5bjvs0ui/O2FXJeDaCsfI6xMPz0txtoH+AStss/Xmg==\"\n", + "container_name = 'manual-test'\n", + "blob_name = 'example.txt'\n", + "\n", + "blob = get_blob_sas(account_name,account_key, container_name, blob_name)\n", + "url = 
'https://'+account_name+'.blob.core.windows.net/'+container_name+'/'+blob_name+'?'+blob" + ] + }, + { + "cell_type": "code", + "execution_count": 11, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "'https://redteamtest.blob.core.windows.net/manual-test/example.txt?se=2023-12-08T16%3A37%3A47Z&sp=r&sv=2023-08-03&sr=b&sig=jHY%2B1fOi2Mtmim0pWUmcV6Yv9CSucsIBc6k4Vkhke8g%3D'" + ] + }, + "execution_count": 11, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "url" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "syft_3.11", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.11.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} From 2e1f7be3ba6671f816fdefc77dc5b98531276fcf Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 8 Dec 2023 18:39:17 +0200 Subject: [PATCH 10/27] added azure storage blob requirement --- packages/syft/setup.cfg | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/syft/setup.cfg b/packages/syft/setup.cfg index bd6c7d655ee..820cf55c13e 100644 --- a/packages/syft/setup.cfg +++ b/packages/syft/setup.cfg @@ -63,7 +63,7 @@ syft = pandas==1.5.3 docker==6.1.3 PyYAML==6.0.1 - + azure-storage-blob==12.19 install_requires = %(syft)s From 511bfe73e957ac012ad5330c7baf45f36cddaf0b Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 8 Dec 2023 18:39:55 +0200 Subject: [PATCH 11/27] fixed azure path creation --- .../src/syft/protocol/protocol_version.json | 2 +- .../service/blob_storage/remote_profile.py | 37 +++++++++++++++++++ .../src/syft/service/blob_storage/service.py | 13 +++++-- packages/syft/src/syft/types/blob_storage.py | 9 ++--- 4 files changed, 52 insertions(+), 9 deletions(-) create mode 100644 packages/syft/src/syft/service/blob_storage/remote_profile.py diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 1b0c14f2597..2056bb52eff 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -960,7 +960,7 @@ "SeaweedSecureFilePathLocation": { "2": { "version": 2, - "hash": "35ad807e2282b97a6022649b2efa186d0c1705e2eb8f3753d55efda1b6510a9a", + "hash": "3ca49db7536a33d5712485164e95406000df9af2aed78e9f9fa2bb2bbbb34fe6", "action": "add" } }, diff --git a/packages/syft/src/syft/service/blob_storage/remote_profile.py b/packages/syft/src/syft/service/blob_storage/remote_profile.py new file mode 100644 index 00000000000..f1cc8627144 --- /dev/null +++ b/packages/syft/src/syft/service/blob_storage/remote_profile.py @@ -0,0 +1,37 @@ +# syft absolute +from syft.types.syft_object import SYFT_OBJECT_VERSION_1 +from syft.types.syft_object import SyftObject + +# relative +from ...serde.serializable import serializable +from ...store.document_store import BaseUIDStoreStash +from ...store.document_store import DocumentStore +from ...store.document_store import PartitionSettings + + +@serializable() +class RemoteProfile(SyftObject): + __canonical_name__ = "RemoteConfig" + __version__ = SYFT_OBJECT_VERSION_1 + + +@serializable() +class AzureRemoteProfile(RemoteProfile): + __canonical_name__ = "AzureRemoteConfig" + __version__ = SYFT_OBJECT_VERSION_1 + + profile_name: str # used by seaweedfs + account_name: str + account_key: str + 
container_name: str + + +@serializable() +class RemoteProfileStash(BaseUIDStoreStash): + object_type = RemoteProfile + settings: PartitionSettings = PartitionSettings( + name=RemoteProfile.__canonical_name__, object_type=RemoteProfile + ) + + def __init__(self, store: DocumentStore) -> None: + super().__init__(store=store) diff --git a/packages/syft/src/syft/service/blob_storage/service.py b/packages/syft/src/syft/service/blob_storage/service.py index 9bb034f6c5f..7c069fc2c8c 100644 --- a/packages/syft/src/syft/service/blob_storage/service.py +++ b/packages/syft/src/syft/service/blob_storage/service.py @@ -15,7 +15,7 @@ from ...store.blob_storage.seaweedfs import SeaweedFSBlobDeposit from ...store.document_store import DocumentStore from ...store.document_store import UIDPartitionKey -from ...types.blob_storage import BlobFileType +from ...types.blob_storage import AzureSecureFilePathLocation, BlobFileType from ...types.blob_storage import BlobStorageEntry from ...types.blob_storage import BlobStorageMetadata from ...types.blob_storage import CreateBlobStorageEntry @@ -68,7 +68,8 @@ def mount_azure( # TODO: fix arguments - remote_name = f"{account_name}-{container_name}" + remote_name = f"{account_name}{container_name}" + remote_name = ''.join(ch for ch in remote_name if ch.isalnum()) args_dict = { "account_name": account_name, "account_key": account_key, @@ -98,11 +99,17 @@ def mount_azure( res = context.node.blob_storage_client.connect().client.list_objects( Bucket=bucket_name ) + import pdb + pdb.set_trace() objects = res["Contents"] file_sizes = [object["Size"] for object in objects] file_paths = [object["Key"] for object in objects] secure_file_paths = [ - SecureFilePathLocation(path=file_path) for file_path in file_paths + AzureSecureFilePathLocation( + path=file_path, + azure_profile_name=remote_name, + bucket_name=bucket_name, + ) for file_path in file_paths for file_path in file_paths ] for sfp, file_size in zip(secure_file_paths, file_sizes): diff --git a/packages/syft/src/syft/types/blob_storage.py b/packages/syft/src/syft/types/blob_storage.py index 59e11d1b992..39e69a64160 100644 --- a/packages/syft/src/syft/types/blob_storage.py +++ b/packages/syft/src/syft/types/blob_storage.py @@ -163,7 +163,7 @@ class SecureFilePathLocation(SyftObject): def __repr__(self) -> str: return f"{self.path}" - def generate_url(self, connection, type_): + def generate_url(self, *args): raise NotImplementedError @serializable() @@ -179,13 +179,12 @@ class SeaweedSecureFilePathLocation(SecureFilePathLocation): __version__ = SYFT_OBJECT_VERSION_2 upload_id: str - bucket_name: str - def generate_url(self, connection, type_): + def generate_url(self, connection, type_, bucket_name): try: url = connection.client.generate_presigned_url( ClientMethod="get_object", - Params={"Bucket": self.bucket_name, "Key": self.path}, + Params={"Bucket": bucket_name, "Key": self.path}, ExpiresIn=READ_EXPIRATION_TIME, ) @@ -220,7 +219,7 @@ class AzureSecureFilePathLocation(SecureFilePathLocation): azure_profile_name: str # Used by Seaweedfs to refer to a remote config bucket_name: str - def generate_url(self, connection, type_, bucket_name): + def generate_url(self, connection, type_, *args): # SAS is almost the same thing as the presigned url config = connection.config.remote_profiles[self.azure_profile_name] account_name = config.account_name From b182017202aea6a6769869a65348871c9d584c93 Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 8 Dec 2023 18:40:59 +0200 Subject: [PATCH 12/27] test for docker --- 
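This patch points the notebook at a docker-deployed domain instead of an in-process node and exercises the Azure mount end-to-end. For orientation, the client-side flow under test looks roughly like the sketch below (not part of the diff); the account, container, and bucket names are the ones the notebook itself uses, and a seaweedfs container started with the docker one-liner from the earlier notebook cell is assumed to be running:

```python
# Editorial sketch of the flow this patch tests, using names from the notebook.
import os
import syft as sy

# log into the docker-deployed domain rather than launching an in-process node
client = sy.login(
    url="http://localhost:8080", email="info@openmined.org", password="changethis"
)

# registers an AzureRemoteProfile, asks seaweedfs to remote-mount the container,
# and creates one BlobStorageEntry per object found in it
client.api.services.blob_storage.mount_azure(
    account_name="redteamtest",
    container_name="manual-test",
    account_key=os.environ["RED_TEAM_AZURE_KEY"],
    bucket_name="bucket2",
)

# each entry comes back as a BlobFile whose read() follows a SAS URL to Azure
files = client.api.services.blob_storage.get_files_from_bucket("bucket2")
print(files[0].read())
```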
notebooks/helm/direct_azure.ipynb | 170 ++++++++++++++++-------------- 1 file changed, 91 insertions(+), 79 deletions(-) diff --git a/notebooks/helm/direct_azure.ipynb b/notebooks/helm/direct_azure.ipynb index e17ff61ed01..740586a09af 100644 --- a/notebooks/helm/direct_azure.ipynb +++ b/notebooks/helm/direct_azure.ipynb @@ -36,9 +36,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "Staging Protocol Changes...\n", - "Data Migrated to latest version !!!\n", - "Logged into as \n" + "Logged into as \n" ] }, { @@ -55,12 +53,12 @@ } ], "source": [ - "node = sy.orchestra.launch(name=\"test-domain-helm2\", dev_mode=True,\n", - " reset=True,\n", - " n_consumers=4,\n", - " create_producer=True)\n", - "client = node.login(email=\"info@openmined.org\", password=\"changethis\")\n", - "# client = sy.login(url=\"http://localhost:8080\", email=\"info@openmined.org\", password=\"changethis\")" + "# node = sy.orchestra.launch(name=\"test-domain-helm2\", dev_mode=True,\n", + "# reset=True,\n", + "# n_consumers=4,\n", + "# create_producer=True)\n", + "# client = node.login(email=\"info@openmined.org\", password=\"changethis\")\n", + "client = sy.login(url=\"http://localhost:8080\", email=\"info@openmined.org\", password=\"changethis\")" ] }, { @@ -70,19 +68,19 @@ "outputs": [], "source": [ "\n", - "from syft.store.blob_storage import BlobStorageConfig, BlobStorageClientConfig\n", - "from syft.store.blob_storage.seaweedfs import SeaweedFSClient, SeaweedFSClientConfig\n", - "blob_config = BlobStorageConfig(client_type=SeaweedFSClient,\n", - " client_config=SeaweedFSClientConfig(host=\"http://0.0.0.0\",\n", - " port=\"8333\",\n", - " access_key=\"admin\",\n", - " secret_key=\"admin\",\n", - " bucket_name=\"test_bucket\",\n", - " region=\"us-east-1\", \n", - " # mount_port=4001\n", - " )\n", - ")\n", - "node.python_node.init_blob_storage(blob_config)" + "# from syft.store.blob_storage import BlobStorageConfig, BlobStorageClientConfig\n", + "# from syft.store.blob_storage.seaweedfs import SeaweedFSClient, SeaweedFSClientConfig\n", + "# blob_config = BlobStorageConfig(client_type=SeaweedFSClient,\n", + "# client_config=SeaweedFSClientConfig(host=\"http://0.0.0.0\",\n", + "# port=\"8333\",\n", + "# access_key=\"admin\",\n", + "# secret_key=\"admin\",\n", + "# bucket_name=\"test_bucket\",\n", + "# region=\"us-east-1\", \n", + "# # mount_port=4001\n", + "# )\n", + "# )\n", + "# node.python_node.init_blob_storage(blob_config)" ] }, { @@ -105,19 +103,33 @@ "cell_type": "code", "execution_count": 5, "metadata": {}, - "outputs": [], + "outputs": [ + { + "data": { + "text/html": [ + "
SyftSuccess: Mounting Azure Successful!
" + ], + "text/plain": [ + "SyftSuccess: Mounting Azure Successful!" + ] + }, + "execution_count": 5, + "metadata": {}, + "output_type": "execute_result" + } + ], "source": [ - "# client.api.services.blob_storage.mount_azure(\n", - "# account_name='redteamtest',\n", - "# container_name='manual-test',\n", - "# account_key=os.environ[\"RED_TEAM_AZURE_KEY\"],\n", - "# bucket_name='bucket2',\n", - "# )" + "client.api.services.blob_storage.mount_azure(\n", + " account_name='redteamtest',\n", + " container_name='manual-test',\n", + " account_key=os.environ[\"RED_TEAM_AZURE_KEY\"],\n", + " bucket_name='bucket2',\n", + ")" ] }, { "cell_type": "code", - "execution_count": 6, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -130,78 +142,78 @@ } ], "source": [ - "from syft.service.blob_storage.remote_profile import AzureRemoteProfile\n", - "from syft.types.blob_storage import AzureSecureFilePathLocation, BlobFileType, BlobStorageEntry, SecureFilePathLocation\n", + "# from syft.service.blob_storage.remote_profile import AzureRemoteProfile\n", + "# from syft.types.blob_storage import AzureSecureFilePathLocation, BlobFileType, BlobStorageEntry, SecureFilePathLocation\n", "\n", "\n", - "res = node.python_node.blob_storage_client.connect().client.list_objects(\n", - " Bucket='test'\n", - ")\n", - "print(res)\n", - "objects = res[\"Contents\"]\n", - "account_name = 'redteamtest'\n", - "account_key = \"t7Y5enmCiG2k8o5rvItSn3Ak9tHaVTXQUTn1LQ74jQ1g5bjvs0ui/O2FXJeDaCsfI6xMPz0txtoH+AStss/Xmg==\"\n", - "container_name = 'manual-test'\n", + "# res = node.python_node.blob_storage_client.connect().client.list_objects(\n", + "# Bucket='test'\n", + "# )\n", + "# print(res)\n", + "# objects = res[\"Contents\"]\n", + "# account_name = 'redteamtest'\n", + "# account_key = \"t7Y5enmCiG2k8o5rvItSn3Ak9tHaVTXQUTn1LQ74jQ1g5bjvs0ui/O2FXJeDaCsfI6xMPz0txtoH+AStss/Xmg==\"\n", + "# container_name = 'manual-test'\n", "\n", - "remote_name = f\"{account_name}-{container_name}\"\n", - "new_profile = AzureRemoteProfile(\n", - " profile_name=remote_name,\n", - " account_name=account_name,\n", - " account_key=account_key,\n", - " container_name=container_name,\n", - ")\n", - "blob_storage_service = node.python_node.get_service(\"BlobStorageService\")\n", - "res = blob_storage_service.remote_profile_stash.set(client.credentials.verify_key, new_profile)\n", + "# remote_name = f\"{account_name}-{container_name}\"\n", + "# new_profile = AzureRemoteProfile(\n", + "# profile_name=remote_name,\n", + "# account_name=account_name,\n", + "# account_key=account_key,\n", + "# container_name=container_name,\n", + "# )\n", + "# blob_storage_service = node.python_node.get_service(\"BlobStorageService\")\n", + "# res = blob_storage_service.remote_profile_stash.set(client.credentials.verify_key, new_profile)\n", "\n", - "remote_profile = res.ok()\n", - "seaweed_config = node.python_node.blob_storage_client.config\n", - "seaweed_config.remote_profiles[remote_name] = remote_profile \n", + "# remote_profile = res.ok()\n", + "# seaweed_config = node.python_node.blob_storage_client.config\n", + "# seaweed_config.remote_profiles[remote_name] = remote_profile \n", "\n", - "print(objects)\n", - "file_sizes = [object[\"Size\"] for object in objects]\n", - "file_paths = [object[\"Key\"] for object in objects]\n", - "secure_file_paths = [\n", - " AzureSecureFilePathLocation(\n", - " path=file_path,\n", - " azure_profile_name=remote_name,\n", - " bucket_name=\"test\",\n", - " ) for file_path in file_paths\n", - "]\n", + "# print(objects)\n", + "# 
file_sizes = [object[\"Size\"] for object in objects]\n", + "# file_paths = [object[\"Key\"] for object in objects]\n", + "# secure_file_paths = [\n", + "# AzureSecureFilePathLocation(\n", + "# path=file_path,\n", + "# azure_profile_name=remote_name,\n", + "# bucket_name=\"test\",\n", + "# ) for file_path in file_paths\n", + "# ]\n", "\n", - "for sfp, file_size in zip(secure_file_paths, file_sizes):\n", - " blob_storage_entry = BlobStorageEntry(\n", - " location=sfp,\n", - " uploaded_by=client.credentials.verify_key,\n", - " file_size=file_size,\n", - " type_=BlobFileType,\n", - " bucket_name=\"test\",\n", - " )\n", - " node.python_node.get_service(\"BlobStorageService\").stash.set(client.credentials, blob_storage_entry)\n" + "# for sfp, file_size in zip(secure_file_paths, file_sizes):\n", + "# blob_storage_entry = BlobStorageEntry(\n", + "# location=sfp,\n", + "# uploaded_by=client.credentials.verify_key,\n", + "# file_size=file_size,\n", + "# type_=BlobFileType,\n", + "# bucket_name=\"test\",\n", + "# )\n", + "# node.python_node.get_service(\"BlobStorageService\").stash.set(client.credentials, blob_storage_entry)\n" ] }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 9, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "b'Hello, World!'" + "b'Hello world!\\n'" ] }, - "execution_count": 12, + "execution_count": 9, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "client.api.services.blob_storage.get_files_from_bucket(\"test\")[1].read()" + "client.api.services.blob_storage.get_files_from_bucket(\"bucket2\")[0].read()" ] }, { "cell_type": "code", - "execution_count": 8, + "execution_count": null, "metadata": {}, "outputs": [ { @@ -221,7 +233,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -240,7 +252,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": null, "metadata": {}, "outputs": [], "source": [ @@ -255,7 +267,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": null, "metadata": {}, "outputs": [ { From ad546c15ce6913cc08aa4e8d2cdf62ea5e8940f8 Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Thu, 14 Dec 2023 09:37:50 +0000 Subject: [PATCH 13/27] lint en remove pdb --- .../syft/service/blob_storage/remote_profile.py | 6 ++---- .../syft/src/syft/service/blob_storage/service.py | 15 ++++++++------- packages/syft/src/syft/types/blob_storage.py | 12 +++++++----- 3 files changed, 17 insertions(+), 16 deletions(-) diff --git a/packages/syft/src/syft/service/blob_storage/remote_profile.py b/packages/syft/src/syft/service/blob_storage/remote_profile.py index f1cc8627144..8bd92bc9f91 100644 --- a/packages/syft/src/syft/service/blob_storage/remote_profile.py +++ b/packages/syft/src/syft/service/blob_storage/remote_profile.py @@ -1,12 +1,10 @@ -# syft absolute -from syft.types.syft_object import SYFT_OBJECT_VERSION_1 -from syft.types.syft_object import SyftObject - # relative from ...serde.serializable import serializable from ...store.document_store import BaseUIDStoreStash from ...store.document_store import DocumentStore from ...store.document_store import PartitionSettings +from ...types.syft_object import SYFT_OBJECT_VERSION_1 +from ...types.syft_object import SyftObject @serializable() diff --git a/packages/syft/src/syft/service/blob_storage/service.py b/packages/syft/src/syft/service/blob_storage/service.py index 7c069fc2c8c..9cb2e4f90ad 100644 --- a/packages/syft/src/syft/service/blob_storage/service.py +++ 
b/packages/syft/src/syft/service/blob_storage/service.py @@ -15,11 +15,11 @@ from ...store.blob_storage.seaweedfs import SeaweedFSBlobDeposit from ...store.document_store import DocumentStore from ...store.document_store import UIDPartitionKey -from ...types.blob_storage import AzureSecureFilePathLocation, BlobFileType +from ...types.blob_storage import AzureSecureFilePathLocation +from ...types.blob_storage import BlobFileType from ...types.blob_storage import BlobStorageEntry from ...types.blob_storage import BlobStorageMetadata from ...types.blob_storage import CreateBlobStorageEntry -from ...types.blob_storage import SecureFilePathLocation from ...types.uid import UID from ..context import AuthedServiceContext from ..response import SyftError @@ -69,7 +69,7 @@ def mount_azure( # TODO: fix arguments remote_name = f"{account_name}{container_name}" - remote_name = ''.join(ch for ch in remote_name if ch.isalnum()) + remote_name = "".join(ch for ch in remote_name if ch.isalnum()) args_dict = { "account_name": account_name, "account_key": account_key, @@ -99,17 +99,18 @@ def mount_azure( res = context.node.blob_storage_client.connect().client.list_objects( Bucket=bucket_name ) - import pdb - pdb.set_trace() + # stdlib objects = res["Contents"] file_sizes = [object["Size"] for object in objects] file_paths = [object["Key"] for object in objects] secure_file_paths = [ AzureSecureFilePathLocation( - path=file_path, + path=file_path, azure_profile_name=remote_name, bucket_name=bucket_name, - ) for file_path in file_paths for file_path in file_paths + ) + for file_path in file_paths + for file_path in file_paths ] for sfp, file_size in zip(secure_file_paths, file_sizes): diff --git a/packages/syft/src/syft/types/blob_storage.py b/packages/syft/src/syft/types/blob_storage.py index 39e69a64160..5a077bd6da1 100644 --- a/packages/syft/src/syft/types/blob_storage.py +++ b/packages/syft/src/syft/types/blob_storage.py @@ -165,7 +165,8 @@ def __repr__(self) -> str: def generate_url(self, *args): raise NotImplementedError - + + @serializable() class SeaweedSecureFilePathLocationV1(SecureFilePathLocation): __canonical_name__ = "SeaweedSecureFilePathLocation" @@ -173,6 +174,7 @@ class SeaweedSecureFilePathLocationV1(SecureFilePathLocation): upload_id: str + @serializable() class SeaweedSecureFilePathLocation(SecureFilePathLocation): __canonical_name__ = "SeaweedSecureFilePathLocation" @@ -196,12 +198,12 @@ def generate_url(self, connection, type_, bucket_name): ) except BotoClientError as e: raise SyftException(e) - + + @migrate(SeaweedSecureFilePathLocationV1, SeaweedSecureFilePathLocation) def upgrade_seaweedsecurefilepathlocation_v1_to_v2(): - return [ - make_set_default("bucket_name", "") - ] + return [make_set_default("bucket_name", "")] + @migrate(SeaweedSecureFilePathLocation, SeaweedSecureFilePathLocationV1) def downgrade_seaweedsecurefilepathlocation_v2_to_v1(): From 8e210b52f76083b3455b8560f463e30867fafc24 Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Thu, 14 Dec 2023 12:17:56 +0000 Subject: [PATCH 14/27] update notebook with test --- notebooks/helm/direct_azure.ipynb | 825 +++++++++++++++++++++++++----- 1 file changed, 694 insertions(+), 131 deletions(-) diff --git a/notebooks/helm/direct_azure.ipynb b/notebooks/helm/direct_azure.ipynb index 740586a09af..f210caa8f5e 100644 --- a/notebooks/helm/direct_azure.ipynb +++ b/notebooks/helm/direct_azure.ipynb @@ -9,7 +9,7 @@ "name": "stderr", "output_type": "stream", "text": [ - "kj/filesystem-disk-unix.c++:1703: warning: PWD environment 
variable doesn't match current directory; pwd = /home/teo/OpenMined/PySyft\n" + "kj/filesystem-disk-unix.c++:1703: warning: PWD environment variable doesn't match current directory; pwd = /Users/koen/workspace/pysyft\n" ] } ], @@ -18,15 +18,6 @@ "import os" ] }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "```\n", - "docker run --entrypoint /bin/sh -p 8333:8333 -p 8888:8888 chrislusf/seaweedfs -c \"echo 's3.configure -access_key admin -secret_key admin -user iam -actions Read,Write,List,Tagging,Admin -apply' | weed shell > /dev/null 2>&1 & weed server -s3 -s3.port=8333 -master.volumeSizeLimitMB=2048\"\n", - "```" - ] - }, { "cell_type": "code", "execution_count": 2, @@ -36,7 +27,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "Logged into as \n" + "Logged into as \n" ] }, { @@ -94,15 +85,6 @@ "cell_type": "code", "execution_count": 4, "metadata": {}, - "outputs": [], - "source": [ - "os.environ[\"RED_TEAM_AZURE_KEY\"] = \"t7Y5enmCiG2k8o5rvItSn3Ak9tHaVTXQUTn1LQ74jQ1g5bjvs0ui/O2FXJeDaCsfI6xMPz0txtoH+AStss/Xmg==\"" - ] - }, - { - "cell_type": "code", - "execution_count": 5, - "metadata": {}, "outputs": [ { "data": { @@ -113,182 +95,750 @@ "SyftSuccess: Mounting Azure Successful!" ] }, - "execution_count": 5, + "execution_count": 4, "metadata": {}, "output_type": "execute_result" } ], "source": [ "client.api.services.blob_storage.mount_azure(\n", - " account_name='redteamtest',\n", - " container_name='manual-test',\n", - " account_key=os.environ[\"RED_TEAM_AZURE_KEY\"],\n", - " bucket_name='bucket2',\n", + " account_name='helmprojectstorage',\n", + " container_name='helm',\n", + " account_key=os.environ[\"HELM_STORAGE_ACCOUNT_KEY\"],\n", + " bucket_name='helmazurebucket',\n", ")" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 5, "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "{'ResponseMetadata': {'RequestId': '1702049867443665606', 'HostId': '', 'HTTPStatusCode': 200, 'HTTPHeaders': {'accept-ranges': 'bytes', 'content-length': '668', 'content-type': 'application/xml', 'server': 'SeaweedFS S3', 'x-amz-request-id': '1702049867443665606', 'date': 'Fri, 08 Dec 2023 15:37:47 GMT'}, 'RetryAttempts': 0}, 'IsTruncated': False, 'Marker': '', 'Contents': [{'Key': '0266f72a-edae-4812-8ce2-ea2a57b52529.txt', 'LastModified': datetime.datetime(2023, 9, 13, 12, 8, 7, tzinfo=tzutc()), 'ETag': '\"e1de79d74ebc\"', 'Size': 13, 'StorageClass': 'STANDARD', 'Owner': {'ID': '0'}}, {'Key': 'example.txt', 'LastModified': datetime.datetime(2023, 9, 14, 16, 20, 32, tzinfo=tzutc()), 'ETag': '\"f5a69a6dae5e\"', 'Size': 13, 'StorageClass': 'STANDARD', 'Owner': {'ID': '0'}}], 'Name': 'test', 'Prefix': '', 'MaxKeys': 10000}\n", - "[{'Key': '0266f72a-edae-4812-8ce2-ea2a57b52529.txt', 'LastModified': datetime.datetime(2023, 9, 13, 12, 8, 7, tzinfo=tzutc()), 'ETag': '\"e1de79d74ebc\"', 'Size': 13, 'StorageClass': 'STANDARD', 'Owner': {'ID': '0'}}, {'Key': 'example.txt', 'LastModified': datetime.datetime(2023, 9, 14, 16, 20, 32, tzinfo=tzutc()), 'ETag': '\"f5a69a6dae5e\"', 'Size': 13, 'StorageClass': 'STANDARD', 'Owner': {'ID': '0'}}]\n" - ] - } - ], + "outputs": [], "source": [ - "# from syft.service.blob_storage.remote_profile import AzureRemoteProfile\n", - "# from syft.types.blob_storage import AzureSecureFilePathLocation, BlobFileType, BlobStorageEntry, SecureFilePathLocation\n", - "\n", - "\n", - "# res = node.python_node.blob_storage_client.connect().client.list_objects(\n", - "# Bucket='test'\n", - "# )\n", - 
"# print(res)\n", - "# objects = res[\"Contents\"]\n", - "# account_name = 'redteamtest'\n", - "# account_key = \"t7Y5enmCiG2k8o5rvItSn3Ak9tHaVTXQUTn1LQ74jQ1g5bjvs0ui/O2FXJeDaCsfI6xMPz0txtoH+AStss/Xmg==\"\n", - "# container_name = 'manual-test'\n", - "\n", - "# remote_name = f\"{account_name}-{container_name}\"\n", - "# new_profile = AzureRemoteProfile(\n", - "# profile_name=remote_name,\n", - "# account_name=account_name,\n", - "# account_key=account_key,\n", - "# container_name=container_name,\n", - "# )\n", - "# blob_storage_service = node.python_node.get_service(\"BlobStorageService\")\n", - "# res = blob_storage_service.remote_profile_stash.set(client.credentials.verify_key, new_profile)\n", - "\n", - "# remote_profile = res.ok()\n", - "# seaweed_config = node.python_node.blob_storage_client.config\n", - "# seaweed_config.remote_profiles[remote_name] = remote_profile \n", - "\n", - "# print(objects)\n", - "# file_sizes = [object[\"Size\"] for object in objects]\n", - "# file_paths = [object[\"Key\"] for object in objects]\n", - "# secure_file_paths = [\n", - "# AzureSecureFilePathLocation(\n", - "# path=file_path,\n", - "# azure_profile_name=remote_name,\n", - "# bucket_name=\"test\",\n", - "# ) for file_path in file_paths\n", - "# ]\n", - "\n", - "# for sfp, file_size in zip(secure_file_paths, file_sizes):\n", - "# blob_storage_entry = BlobStorageEntry(\n", - "# location=sfp,\n", - "# uploaded_by=client.credentials.verify_key,\n", - "# file_size=file_size,\n", - "# type_=BlobFileType,\n", - "# bucket_name=\"test\",\n", - "# )\n", - "# node.python_node.get_service(\"BlobStorageService\").stash.set(client.credentials, blob_storage_entry)\n" + "files = client.api.services.blob_storage.get_files_from_bucket(\"helmazurebucket\")" ] }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 6, "metadata": {}, "outputs": [ { "data": { + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "
\n", + "
\n", + "
\n", + "

BlobFile List

\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "
\n", + " \n", + "
\n", + " \n", + "
\n", + "\n", + "

0

\n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + " \n", + " \n" + ], "text/plain": [ - "b'Hello world!\\n'" + "[syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile]" ] }, - "execution_count": 9, + "execution_count": 6, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "client.api.services.blob_storage.get_files_from_bucket(\"bucket2\")[0].read()" + "files" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 10, + "metadata": {}, + "outputs": [], + "source": [ + "file = [f for f in files if f.file_name==\"test.json\"][0]" + ] + }, + { + "cell_type": "code", + "execution_count": 11, "metadata": {}, "outputs": [ { "data": { + "text/markdown": [ + "```python\n", + "class BlobFile:\n", + " id: str = bf18dfa66d144e828cf6af41078233ad\n", + " file_name: str = \"test.json\"\n", + "\n", + "```" + ], "text/plain": [ - "" + "syft.types.blob_storage.BlobFile" ] }, - "execution_count": 8, + "execution_count": 11, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "node.python_node.blob_storage_client.connect().client" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "from datetime import datetime, timedelta\n", - "from azure.storage.blob import BlobClient, generate_blob_sas, BlobSasPermissions\n", - "\n", - "def get_blob_sas(account_name,account_key, container_name, blob_name):\n", - " sas_blob = generate_blob_sas(account_name=account_name, \n", - " container_name=container_name,\n", - " blob_name=blob_name,\n", - " account_key=account_key,\n", - " permission=BlobSasPermissions(read=True),\n", - " expiry=datetime.utcnow() + timedelta(hours=1))\n", - " return sas_blob" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "account_name = 'redteamtest'\n", - "account_key = \"t7Y5enmCiG2k8o5rvItSn3Ak9tHaVTXQUTn1LQ74jQ1g5bjvs0ui/O2FXJeDaCsfI6xMPz0txtoH+AStss/Xmg==\"\n", - "container_name = 'manual-test'\n", - "blob_name = 'example.txt'\n", - "\n", - "blob = get_blob_sas(account_name,account_key, container_name, blob_name)\n", - "url = 'https://'+account_name+'.blob.core.windows.net/'+container_name+'/'+blob_name+'?'+blob" + 
"file" ] }, { "cell_type": "code", - "execution_count": null, + "execution_count": 12, "metadata": {}, "outputs": [ { "data": { "text/plain": [ - "'https://redteamtest.blob.core.windows.net/manual-test/example.txt?se=2023-12-08T16%3A37%3A47Z&sp=r&sv=2023-08-03&sr=b&sig=jHY%2B1fOi2Mtmim0pWUmcV6Yv9CSucsIBc6k4Vkhke8g%3D'" + "b'{\\n\"abc\": \"def\"\\n}'" ] }, - "execution_count": 11, + "execution_count": 12, "metadata": {}, "output_type": "execute_result" } ], "source": [ - "url" + "file.read()" ] } ], "metadata": { "kernelspec": { - "display_name": "syft_3.11", + "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, @@ -302,7 +852,20 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.4" + "version": "3.9.16" + }, + "toc": { + "base_numbering": 1, + "nav_menu": {}, + "number_sections": true, + "sideBar": true, + "skip_h1_title": false, + "title_cell": "Table of Contents", + "title_sidebar": "Contents", + "toc_cell": false, + "toc_position": {}, + "toc_section_display": true, + "toc_window_display": true } }, "nbformat": 4, From d35be07869a171d4a3d1dfc163b914b6a7112b21 Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Thu, 14 Dec 2023 14:11:21 +0000 Subject: [PATCH 15/27] some comments + persist and load remote profiles --- notebooks/helm/direct_azure.ipynb | 160 +++++++++--------- packages/syft/src/syft/node/node.py | 13 ++ .../src/syft/service/blob_storage/service.py | 13 +- .../src/syft/store/blob_storage/seaweedfs.py | 2 + 4 files changed, 99 insertions(+), 89 deletions(-) diff --git a/notebooks/helm/direct_azure.ipynb b/notebooks/helm/direct_azure.ipynb index f210caa8f5e..0db87718008 100644 --- a/notebooks/helm/direct_azure.ipynb +++ b/notebooks/helm/direct_azure.ipynb @@ -2,17 +2,9 @@ "cells": [ { "cell_type": "code", - "execution_count": 1, + "execution_count": 40, "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "kj/filesystem-disk-unix.c++:1703: warning: PWD environment variable doesn't match current directory; pwd = /Users/koen/workspace/pysyft\n" - ] - } - ], + "outputs": [], "source": [ "import syft as sy\n", "import os" @@ -20,14 +12,14 @@ }, { "cell_type": "code", - "execution_count": 2, + "execution_count": 41, "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ - "Logged into as \n" + "Logged into as \n" ] }, { @@ -54,7 +46,7 @@ }, { "cell_type": "code", - "execution_count": 3, + "execution_count": 34, "metadata": {}, "outputs": [], "source": [ @@ -76,14 +68,7 @@ }, { "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - }, - { - "cell_type": "code", - "execution_count": 4, + "execution_count": 35, "metadata": {}, "outputs": [ { @@ -95,7 +80,7 @@ "SyftSuccess: Mounting Azure Successful!" ] }, - "execution_count": 4, + "execution_count": 35, "metadata": {}, "output_type": "execute_result" } @@ -111,7 +96,7 @@ }, { "cell_type": "code", - "execution_count": 5, + "execution_count": 42, "metadata": {}, "outputs": [], "source": [ @@ -120,7 +105,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 43, "metadata": {}, "outputs": [ { @@ -329,7 +314,7 @@ " flex-grow: 0;\n", " }\n", "\n", - " .grid-table96755d8ddaea4702b6f54fec2e4d89e9 {\n", + " .grid-table1f0c653a8f0140bb98abde83607cb8db {\n", " display:grid;\n", " grid-template-columns: 1fr repeat(8, 1fr);\n", " grid-template-rows: repeat(2, 1fr);\n", @@ -501,25 +486,25 @@ "
\n",
+       "  [rendered HTML table widget for the BlobFile listing trimmed: styles, icons, checkboxes, and pagination controls]\n",
+       "
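\n",
+       "  [reading one of the listed handles, as the cells below do; bytes are fetched lazily through a BlobRetrieval when read() is called:\n",
+       "     f = next(f for f in files if f.file_name == \"test.json\")\n",
+       "     content = f.read()  # returns the raw bytes\n",
+       "  ]\n",
+       "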
\n", @@ -769,7 +754,7 @@ " syft.types.blob_storage.BlobFile]" ] }, - "execution_count": 6, + "execution_count": 43, "metadata": {}, "output_type": "execute_result" } @@ -780,7 +765,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 44, "metadata": {}, "outputs": [], "source": [ @@ -789,7 +774,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 45, "metadata": {}, "outputs": [ { @@ -797,7 +782,7 @@ "text/markdown": [ "```python\n", "class BlobFile:\n", - " id: str = bf18dfa66d144e828cf6af41078233ad\n", + " id: str = cb2ef738082c49418ed70eb05a193770\n", " file_name: str = \"test.json\"\n", "\n", "```" @@ -806,7 +791,7 @@ "syft.types.blob_storage.BlobFile" ] }, - "execution_count": 11, + "execution_count": 45, "metadata": {}, "output_type": "execute_result" } @@ -817,7 +802,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 46, "metadata": {}, "outputs": [ { @@ -826,7 +811,7 @@ "b'{\\n\"abc\": \"def\"\\n}'" ] }, - "execution_count": 12, + "execution_count": 46, "metadata": {}, "output_type": "execute_result" } @@ -834,6 +819,13 @@ "source": [ "file.read()" ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index c77afa38233..58f99bda06b 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -376,6 +376,19 @@ def init_blob_storage(self, config: Optional[BlobStorageConfig] = None) -> None: self.blob_store_config = config_ self.blob_storage_client = config_.client_type(config=config_.client_config) + # relative + from ..store.blob_storage.seaweedfs import SeaweedFSConfig + + if isinstance(config, SeaweedFSConfig): + blob_storage_service = self.get_service(BlobStorageService) + remote_profiles = blob_storage_service.remote_profile_stash.get_all( + credentials=self.signing_key.verify_key, has_permission=True + ).ok() + for remote_profile in remote_profiles: + self.blob_store_config.client_config.remote_profiles[ + remote_profile.profile_name + ] = remote_profile + def stop(self): for consumer_list in self.queue_manager.consumers.values(): for c in consumer_list: diff --git a/packages/syft/src/syft/service/blob_storage/service.py b/packages/syft/src/syft/service/blob_storage/service.py index 9cb2e4f90ad..f485c2e67bf 100644 --- a/packages/syft/src/syft/service/blob_storage/service.py +++ b/packages/syft/src/syft/service/blob_storage/service.py @@ -89,6 +89,8 @@ def mount_azure( return SyftError(message=res.value) remote_profile = res.ok() seaweed_config = context.node.blob_storage_client.config + # we cache this here such that we can use it when reading a file from azure + # from the remote_name seaweed_config.remote_profiles[remote_name] = remote_profile # TODO: possible wrap this in try catch @@ -110,7 +112,6 @@ def mount_azure( bucket_name=bucket_name, ) for file_path in file_paths - for file_path in file_paths ] for sfp, file_size in zip(secure_file_paths, file_sizes): @@ -138,11 +139,13 @@ def get_files_from_bucket(self, context: AuthedServiceContext, bucket_name: str) blob_files = [] for bse in bse_list: self.stash.set(obj=bse, credentials=context.credentials) - # We create an empty ActionObject and set its blob_storage_entry to bse - # so that we can call reloac_cache where - # we create the BlobRetrieval (user needs permission to do this) + # We create an empty ActionObject and set its blob_storage_entry_id to 
bse.id + # such that we can call reload_cache which creates + # the BlobRetrieval (user needs permission to do this) # This could be a BlobRetrievalByURL that creates a BlobFile - # and then sets it in the cache (it does not contain the data, only the BlobFile) + # and then sets it in the cache (it does not contain the data, only the BlobFile). + # In the client, when reading the file, we will creates **another**, blobretrieval + # object to read the actual data blob_file = ActionObject.empty() blob_file.syft_blob_storage_entry_id = bse.id blob_file.syft_client_verify_key = context.credentials diff --git a/packages/syft/src/syft/store/blob_storage/seaweedfs.py b/packages/syft/src/syft/store/blob_storage/seaweedfs.py index 246edabe6d4..2a27fc2518a 100644 --- a/packages/syft/src/syft/store/blob_storage/seaweedfs.py +++ b/packages/syft/src/syft/store/blob_storage/seaweedfs.py @@ -167,6 +167,8 @@ def read( ) -> BlobRetrieval: if bucket_name is None: bucket_name = self.default_bucket_name + # this will generate the url, the SecureFilePathLocation also handles the logic + # that decides whether to use a direct connection to azure/aws/gcp or via seaweed return fp.generate_url(self, type_, bucket_name) def allocate( From 9231e1a3b8ef0f8b194438a83ca882b345e5f7ed Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Thu, 14 Dec 2023 14:29:51 +0000 Subject: [PATCH 16/27] recompute protocol verison --- .../src/syft/protocol/protocol_version.json | 50 +++++++++---------- 1 file changed, 25 insertions(+), 25 deletions(-) diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 60a6b2f6b45..980ed650518 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -271,7 +271,7 @@ "BlobRetrievalByURL": { "1": { "version": 1, - "hash": "18fd860cb9de296532fc9ff075932e6a4377cc8f043dd88ed4f620517321077d", + "hash": "8059ee03016c4d74e408dad9529e877f91829672e0cc42d8cfff9c8e14058adc", "action": "add" } }, @@ -957,56 +957,56 @@ "action": "add" } }, - "SeaweedSecureFilePathLocation": { - "2": { - "version": 2, - "hash": "3ca49db7536a33d5712485164e95406000df9af2aed78e9f9fa2bb2bbbb34fe6", + "SyftWorker": { + "1": { + "version": 1, + "hash": "dfb3aba52b0223c16251148503e24606c73b1f3f2dfe5fbfac477899adf27420", "action": "add" } }, - "AzureSecureFilePathLocation": { + "WorkerPool": { "1": { "version": 1, - "hash": "1bb15f3f9d7082779f1c9f58de94011487924cb8a8c9c2ec18fd7c161c27fd0e", + "hash": "f498e61ec6d2ec916b7e1f25a67dc468f17c59239e77194cebf8b512993cb555", "action": "add" } - }, - "RemoteConfig": { + } + } + }, + "dev": { + "object_versions": { + "SyftWorkerImage": { "1": { "version": 1, - "hash": "ad7bc4780a8ad52e14ce68601852c93d2fe07bda489809cad7cae786d2461754", + "hash": "fd1067b0bb9a6e630a224162ed92f4746d4d5869bc104923ec48c4b9d597594c", "action": "add" } }, - "AzureRemoteConfig": { - "1": { - "version": 1, - "hash": "c05c6caa27db4e385c642536d4b0ecabc0c71e91220d2e6ce21a2761ca68a673", + "SeaweedSecureFilePathLocation": { + "2": { + "version": 2, + "hash": "3ca49db7536a33d5712485164e95406000df9af2aed78e9f9fa2bb2bbbb34fe6", "action": "add" } }, - "SyftWorker": { + "AzureSecureFilePathLocation": { "1": { "version": 1, - "hash": "dfb3aba52b0223c16251148503e24606c73b1f3f2dfe5fbfac477899adf27420", + "hash": "1bb15f3f9d7082779f1c9f58de94011487924cb8a8c9c2ec18fd7c161c27fd0e", "action": "add" } }, - "WorkerPool": { + "RemoteConfig": { "1": { "version": 1, - "hash": 
"f498e61ec6d2ec916b7e1f25a67dc468f17c59239e77194cebf8b512993cb555", + "hash": "ad7bc4780a8ad52e14ce68601852c93d2fe07bda489809cad7cae786d2461754", "action": "add" } - } - } - }, - "dev": { - "object_versions": { - "SyftWorkerImage": { + }, + "AzureRemoteConfig": { "1": { "version": 1, - "hash": "fd1067b0bb9a6e630a224162ed92f4746d4d5869bc104923ec48c4b9d597594c", + "hash": "c05c6caa27db4e385c642536d4b0ecabc0c71e91220d2e6ce21a2761ca68a673", "action": "add" } } From a89d1d588d0d0bf6a87632c7a410baf64093efb3 Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Thu, 14 Dec 2023 14:40:34 +0000 Subject: [PATCH 17/27] recompute protocol verison --- .../syft/src/syft/protocol/protocol_version.json | 16 ++++++++++++++-- .../syft/src/syft/store/blob_storage/__init__.py | 10 +++++++++- 2 files changed, 23 insertions(+), 3 deletions(-) diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 980ed650518..a84c764c70f 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -271,7 +271,7 @@ "BlobRetrievalByURL": { "1": { "version": 1, - "hash": "8059ee03016c4d74e408dad9529e877f91829672e0cc42d8cfff9c8e14058adc", + "hash": "18fd860cb9de296532fc9ff075932e6a4377cc8f043dd88ed4f620517321077d", "action": "add" } }, @@ -833,7 +833,7 @@ }, "2": { "version": 2, - "hash": "c78a998aa1c6d9700c775ca65a4efc9c6ae2d2ffdf677219f7b004e635b2be42", + "hash": "8059ee03016c4d74e408dad9529e877f91829672e0cc42d8cfff9c8e14058adc", "action": "add" } }, @@ -1009,6 +1009,18 @@ "hash": "c05c6caa27db4e385c642536d4b0ecabc0c71e91220d2e6ce21a2761ca68a673", "action": "add" } + }, + "BlobRetrievalByURL": { + "2": { + "version": 2, + "hash": "8059ee03016c4d74e408dad9529e877f91829672e0cc42d8cfff9c8e14058adc", + "action": "remove" + }, + "3": { + "version": 3, + "hash": "0b664100ea08413ca4ef04665ca910c2cf9535539617ea4ba33687d05cdfe747", + "action": "add" + } } } } diff --git a/packages/syft/src/syft/store/blob_storage/__init__.py b/packages/syft/src/syft/store/blob_storage/__init__.py index a6c459ad5e5..de3c65d844c 100644 --- a/packages/syft/src/syft/store/blob_storage/__init__.py +++ b/packages/syft/src/syft/store/blob_storage/__init__.py @@ -66,6 +66,7 @@ from ...types.syft_migration import migrate from ...types.syft_object import SYFT_OBJECT_VERSION_1 from ...types.syft_object import SYFT_OBJECT_VERSION_2 +from ...types.syft_object import SYFT_OBJECT_VERSION_3 from ...types.syft_object import SyftObject from ...types.transforms import drop from ...types.transforms import make_set_default @@ -168,10 +169,17 @@ class BlobRetrievalByURLV1(BlobRetrievalV1): url: GridURL +class BlobRetrievalByURLV2(BlobRetrievalV1): + __canonical_name__ = "BlobRetrievalByURL" + __version__ = SYFT_OBJECT_VERSION_2 + + url: GridURL + + @serializable() class BlobRetrievalByURL(BlobRetrieval): __canonical_name__ = "BlobRetrievalByURL" - __version__ = SYFT_OBJECT_VERSION_2 + __version__ = SYFT_OBJECT_VERSION_3 url: Union[GridURL, str] From 6282a635dd5f7fcdbf52881cc96ded1b36d8111b Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Thu, 14 Dec 2023 16:23:21 +0000 Subject: [PATCH 18/27] security fixes --- packages/syft/src/syft/store/sqlite_document_store.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/store/sqlite_document_store.py b/packages/syft/src/syft/store/sqlite_document_store.py index 43cd5e2952b..e128a63bc91 100644 --- 
a/packages/syft/src/syft/store/sqlite_document_store.py +++ b/packages/syft/src/syft/store/sqlite_document_store.py @@ -206,7 +206,7 @@ def _set(self, key: UID, value: Any) -> None: self._update(key, value) else: insert_sql = ( - f"insert into {self.table_name} (uid, repr, value) VALUES (?, ?, ?)" + f"insert into {self.table_name} (uid, repr, value) VALUES (?, ?, ?)" # nosec ) # nosec data = _serialize(value, to_bytes=True) res = self._execute(insert_sql, [str(key), _repr_debug_(value), data]) @@ -215,7 +215,7 @@ def _set(self, key: UID, value: Any) -> None: def _update(self, key: UID, value: Any) -> None: insert_sql = ( - f"update {self.table_name} set uid = ?, repr = ?, value = ? where uid = ?" + f"update {self.table_name} set uid = ?, repr = ?, value = ? where uid = ?" # nosec ) # nosec data = _serialize(value, to_bytes=True) res = self._execute(insert_sql, [str(key), _repr_debug_(value), data, str(key)]) From 59300655c7fd829e27e96263f8767361c2d77da4 Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 15 Dec 2023 11:42:46 +0200 Subject: [PATCH 19/27] remove pdb trace --- packages/syft/src/syft/service/blob_storage/service.py | 2 -- 1 file changed, 2 deletions(-) diff --git a/packages/syft/src/syft/service/blob_storage/service.py b/packages/syft/src/syft/service/blob_storage/service.py index 7c069fc2c8c..ef9e15ec1c6 100644 --- a/packages/syft/src/syft/service/blob_storage/service.py +++ b/packages/syft/src/syft/service/blob_storage/service.py @@ -99,8 +99,6 @@ def mount_azure( res = context.node.blob_storage_client.connect().client.list_objects( Bucket=bucket_name ) - import pdb - pdb.set_trace() objects = res["Contents"] file_sizes = [object["Size"] for object in objects] file_paths = [object["Key"] for object in objects] From 4497eff015f6b66456eb30ab332b59ed31044d4c Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 15 Dec 2023 12:21:24 +0200 Subject: [PATCH 20/27] added node_obj_python id to mongo client config --- packages/syft/src/syft/node/node.py | 13 +++++++++++++ 1 file changed, 13 insertions(+) diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index c77afa38233..5248467ece7 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -718,6 +718,13 @@ def init_stores( document_store = document_store_config.store_type self.document_store_config = document_store_config + # We add the python id of the current node in order + # to create one connection per Node object in MongoClientCache + # so that we avoid closing the connection from a + # different thread through the garbage collection + if isinstance(self.document_store_config, MongoStoreConfig): + self.document_store_config.client_config.node_obj_python_id = id(self) + self.document_store = document_store( root_verify_key=self.verify_key, store_config=document_store_config, @@ -741,6 +748,12 @@ def init_stores( root_verify_key=self.verify_key, ) elif isinstance(action_store_config, MongoStoreConfig): + # We add the python id of the current node in order + # to create one connection per Node object in MongoClientCache + # so that we avoid closing the connection from a + # different thread through the garbage collection + action_store_config.client_config.node_obj_python_id = id(self) + self.action_store = MongoActionStore( root_verify_key=self.verify_key, store_config=action_store_config ) From 0dedff2ddf2aa93973b540ea40e5a7db86db5a81 Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 15 Dec 2023 12:21:45 +0200 Subject: [PATCH 21/27] fix repeating for --- 
packages/syft/src/syft/service/blob_storage/service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/syft/src/syft/service/blob_storage/service.py b/packages/syft/src/syft/service/blob_storage/service.py index ef9e15ec1c6..c61c9e84f09 100644 --- a/packages/syft/src/syft/service/blob_storage/service.py +++ b/packages/syft/src/syft/service/blob_storage/service.py @@ -107,7 +107,7 @@ def mount_azure( path=file_path, azure_profile_name=remote_name, bucket_name=bucket_name, - ) for file_path in file_paths for file_path in file_paths + ) for file_path in file_paths ] for sfp, file_size in zip(secure_file_paths, file_sizes): From 2ed1497c35110b034e4856fc93c90dddc1d89d31 Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 15 Dec 2023 12:24:05 +0200 Subject: [PATCH 22/27] added exception for job stash errors --- packages/syft/src/syft/service/queue/queue.py | 5 ++++- 1 file changed, 4 insertions(+), 1 deletion(-) diff --git a/packages/syft/src/syft/service/queue/queue.py b/packages/syft/src/syft/service/queue/queue.py index 11f623edf93..9de19544144 100644 --- a/packages/syft/src/syft/service/queue/queue.py +++ b/packages/syft/src/syft/service/queue/queue.py @@ -235,7 +235,10 @@ def handle_message(message: bytes): credentials = queue_item.syft_client_verify_key - job_item = worker.job_stash.get_by_uid(credentials, queue_item.job_id).ok() + res = worker.job_stash.get_by_uid(credentials, queue_item.job_id) + if res.is_err(): + raise Exception(res.value) + job_item = res.ok() queue_item.status = Status.PROCESSING queue_item.node_uid = worker.id From e3b91de547659324b352eb851bdf19e3d7851075 Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 15 Dec 2023 12:25:44 +0200 Subject: [PATCH 23/27] added custom generator for request retries and custom timeout --- .../src/syft/store/blob_storage/__init__.py | 47 ++++++++++++++++++- 1 file changed, 45 insertions(+), 2 deletions(-) diff --git a/packages/syft/src/syft/store/blob_storage/__init__.py b/packages/syft/src/syft/store/blob_storage/__init__.py index a6c459ad5e5..2e0b02fa7ef 100644 --- a/packages/syft/src/syft/store/blob_storage/__init__.py +++ b/packages/syft/src/syft/store/blob_storage/__init__.py @@ -168,6 +168,49 @@ class BlobRetrievalByURLV1(BlobRetrievalV1): url: GridURL +def generate(blob_url, chunk_size): + max_tries = 20 + pending = None + start_byte = 0 + for attempt in range(max_tries): + try: + headers = {'Range': f'bytes={start_byte}-'} + with requests.get(str(blob_url), stream=True, headers=headers, timeout=(10, 10)) as response: + response.raise_for_status() + for chunk in response.iter_content( + chunk_size=chunk_size, decode_unicode=False + ): + start_byte += len(chunk) + if b'\n' in chunk: + if pending is not None: + chunk = pending + chunk + + lines = chunk.splitlines() + + if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]: + pending = lines.pop() + else: + pending = None + + yield from lines + else: + if pending is None: + pending = chunk + else: + pending = pending + chunk + + if pending is not None: + yield pending + return + + except requests.exceptions.RequestException as e: + if attempt < max_tries: + print(start_byte) + print(f"Attempt {attempt}/{max_tries} failed: {e}. Retrying...") + else: + print(f"Max retries reached. 
Failed with error: {e}") + raise + @serializable() class BlobRetrievalByURL(BlobRetrieval): __canonical_name__ = "BlobRetrievalByURL" @@ -205,8 +248,8 @@ def _read_data(self, stream=False, chunk_size=512): response = requests.get(str(blob_url), stream=stream) # nosec response.raise_for_status() if self.type_ is BlobFileType: - if stream: - return response.iter_lines(chunk_size=chunk_size) + if stream: + return generate(blob_url, chunk_size) else: return response.content return deserialize(response.content, from_bytes=True) From c6d42be8377d4757a90ec9c6ae4a87249209995f Mon Sep 17 00:00:00 2001 From: teo Date: Fri, 15 Dec 2023 12:26:13 +0200 Subject: [PATCH 24/27] added mongo client cache cleaning --- packages/syft/src/syft/store/mongo_client.py | 2 ++ 1 file changed, 2 insertions(+) diff --git a/packages/syft/src/syft/store/mongo_client.py b/packages/syft/src/syft/store/mongo_client.py index b932a665bf3..cc77376b442 100644 --- a/packages/syft/src/syft/store/mongo_client.py +++ b/packages/syft/src/syft/store/mongo_client.py @@ -139,6 +139,7 @@ class MongoClient: client: PyMongoClient = None def __init__(self, config: MongoStoreClientConfig, cache: bool = True) -> None: + self.config=config if config.client is not None: self.client = config.client elif cache: @@ -236,3 +237,4 @@ def with_collection_permissions( def close(self): self.client.close() + MongoClientCache.__client_cache__.pop(hash(str(self.config)), None) From 76971887137db498e427df7e078e60a5b2e2432c Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Fri, 15 Dec 2023 10:50:04 +0000 Subject: [PATCH 25/27] cleanup --- packages/syft/src/syft/node/node.py | 10 +-- .../src/syft/store/blob_storage/__init__.py | 70 +++++++++---------- packages/syft/src/syft/store/mongo_client.py | 6 +- packages/syft/src/syft/types/blob_storage.py | 31 ++++++-- 4 files changed, 68 insertions(+), 49 deletions(-) diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index 2046e93eec2..7c9fb8760ef 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -738,10 +738,10 @@ def init_stores( # We add the python id of the current node in order # to create one connection per Node object in MongoClientCache - # so that we avoid closing the connection from a + # so that we avoid closing the connection from a # different thread through the garbage collection if isinstance(self.document_store_config, MongoStoreConfig): - self.document_store_config.client_config.node_obj_python_id = id(self) + self.document_store_config.client_config.node_obj_python_id = id(self) self.document_store = document_store( root_verify_key=self.verify_key, @@ -768,10 +768,10 @@ def init_stores( elif isinstance(action_store_config, MongoStoreConfig): # We add the python id of the current node in order # to create one connection per Node object in MongoClientCache - # so that we avoid closing the connection from a + # so that we avoid closing the connection from a # different thread through the garbage collection - action_store_config.client_config.node_obj_python_id = id(self) - + action_store_config.client_config.node_obj_python_id = id(self) + self.action_store = MongoActionStore( root_verify_key=self.verify_key, store_config=action_store_config ) diff --git a/packages/syft/src/syft/store/blob_storage/__init__.py b/packages/syft/src/syft/store/blob_storage/__init__.py index 41813a68c9b..36dbb1c4bb9 100644 --- a/packages/syft/src/syft/store/blob_storage/__init__.py +++ 
b/packages/syft/src/syft/store/blob_storage/__init__.py @@ -61,6 +61,7 @@ from ...types.blob_storage import BlobFileType from ...types.blob_storage import BlobStorageEntry from ...types.blob_storage import CreateBlobStorageEntry +from ...types.blob_storage import DEFAULT_CHUNK_SIZE from ...types.blob_storage import SecureFilePathLocation from ...types.grid_url import GridURL from ...types.syft_migration import migrate @@ -72,6 +73,9 @@ from ...types.transforms import make_set_default from ...types.uid import UID +DEFAULT_TIMEOUT = 10 +MAX_RETRIES = 20 + @serializable() class BlobRetrievalV1(SyftObject): @@ -169,48 +173,35 @@ class BlobRetrievalByURLV1(BlobRetrievalV1): url: GridURL -def generate(blob_url, chunk_size): - max_tries = 20 - pending = None - start_byte = 0 - for attempt in range(max_tries): +def syft_iter_content( + blob_url, chunk_size, max_retries=MAX_RETRIES, timeout=DEFAULT_TIMEOUT +): + """custom iter content with smart retries (start from last byte read)""" + current_byte = 0 + for attempt in range(max_retries): try: - headers = {'Range': f'bytes={start_byte}-'} - with requests.get(str(blob_url), stream=True, headers=headers, timeout=(10, 10)) as response: + headers = {"Range": f"bytes={current_byte}-"} + with requests.get( + str(blob_url), stream=True, headers=headers, timeout=(timeout, timeout) + ) as response: response.raise_for_status() for chunk in response.iter_content( chunk_size=chunk_size, decode_unicode=False ): - start_byte += len(chunk) - if b'\n' in chunk: - if pending is not None: - chunk = pending + chunk - - lines = chunk.splitlines() - - if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]: - pending = lines.pop() - else: - pending = None - - yield from lines - else: - if pending is None: - pending = chunk - else: - pending = pending + chunk - - if pending is not None: - yield pending + current_byte += len(chunk) + yield chunk return - + except requests.exceptions.RequestException as e: - if attempt < max_tries: - print(start_byte) - print(f"Attempt {attempt}/{max_tries} failed: {e}. Retrying...") + if attempt < max_retries: + print( + f"Attempt {attempt}/{max_retries} failed: {e} at byte {current_byte}. Retrying..." + ) else: print(f"Max retries reached. 
Failed with error: {e}") raise + + class BlobRetrievalByURLV2(BlobRetrievalV1): __canonical_name__ = "BlobRetrievalByURL" __version__ = SYFT_OBJECT_VERSION_2 @@ -237,7 +228,7 @@ def read(self) -> Union[SyftObject, SyftError]: else: return self._read_data() - def _read_data(self, stream=False, chunk_size=512): + def _read_data(self, stream=False, chunk_size=DEFAULT_CHUNK_SIZE): # relative from ...client.api import APIRegistry @@ -252,14 +243,17 @@ def _read_data(self, stream=False, chunk_size=512): else: blob_url = self.url try: - response = requests.get(str(blob_url), stream=stream) # nosec - response.raise_for_status() if self.type_ is BlobFileType: - if stream: - return generate(blob_url, chunk_size) + if stream: + return syft_iter_content(blob_url, chunk_size) else: + response = requests.get(str(blob_url), stream=False) # nosec + response.raise_for_status() return response.content - return deserialize(response.content, from_bytes=True) + else: + response = requests.get(str(blob_url), stream=stream) # nosec + response.raise_for_status() + return deserialize(response.content, from_bytes=True) except requests.RequestException as e: return SyftError(message=f"Failed to retrieve with Error: {e}") diff --git a/packages/syft/src/syft/store/mongo_client.py b/packages/syft/src/syft/store/mongo_client.py index cc77376b442..cbbf1c5d4f0 100644 --- a/packages/syft/src/syft/store/mongo_client.py +++ b/packages/syft/src/syft/store/mongo_client.py @@ -120,6 +120,10 @@ class MongoStoreClientConfig(StoreClientConfig): # Testing and connection reuse client: Any = None + # this allows us to have one connection per `Node` object + # in the MongoClientCache + node_obj_python_id: Optional[int] = None + class MongoClientCache: __client_cache__: Dict[str, Type["MongoClient"]] = {} @@ -139,7 +143,7 @@ class MongoClient: client: PyMongoClient = None def __init__(self, config: MongoStoreClientConfig, cache: bool = True) -> None: - self.config=config + self.config = config if config.client is not None: self.client = config.client elif cache: diff --git a/packages/syft/src/syft/types/blob_storage.py b/packages/syft/src/syft/types/blob_storage.py index 5a077bd6da1..7f0438acf2e 100644 --- a/packages/syft/src/syft/types/blob_storage.py +++ b/packages/syft/src/syft/types/blob_storage.py @@ -42,6 +42,7 @@ from .uid import UID READ_EXPIRATION_TIME = 1800 # seconds +DEFAULT_CHUNK_SIZE = 10000 * 1024 @serializable() @@ -65,7 +66,7 @@ class BlobFile(SyftObject): __repr_attrs__ = ["id", "file_name"] - def read(self, stream=False, chunk_size=512, force=False): + def read(self, stream=False, chunk_size=DEFAULT_CHUNK_SIZE, force=False): # get blob retrieval object from api + syft_blob_storage_entry_id read_method = from_api_or_context( "blob_storage.read", self.syft_node_location, self.syft_client_verify_key @@ -80,9 +81,29 @@ def upload_from_path(self, path, client): return sy.ActionObject.from_path(path=path).send(client).syft_action_data - def _iter_lines(self, chunk_size=512): - """Synchronous version of the async iter_lines""" - return self.read(stream=True, chunk_size=chunk_size) + def _iter_lines(self, chunk_size=DEFAULT_CHUNK_SIZE): + """Synchronous version of the async iter_lines. 
This implementation + is also optimized in terms of splitting chunks, making it faster for + larger lines""" + pending = None + for chunk in self.read(stream=True, chunk_size=chunk_size): + if b"\n" in chunk: + if pending is not None: + chunk = pending + chunk + lines = chunk.splitlines() + if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]: + pending = lines.pop() + else: + pending = None + yield from lines + else: + if pending is None: + pending = chunk + else: + pending = pending + chunk + + if pending is not None: + yield pending def read_queue(self, queue, chunk_size, progress=False, buffer_lines=10000): total_read = 0 @@ -103,7 +124,7 @@ def read_queue(self, queue, chunk_size, progress=False, buffer_lines=10000): # Put anything not a string at the end queue.put(0) - def iter_lines(self, chunk_size=512, progress=False): + def iter_lines(self, chunk_size=DEFAULT_CHUNK_SIZE, progress=False): item_queue: Queue = Queue() threading.Thread( target=self.read_queue, From a8eadf8fadbd2f25635c5525762788e4447befed Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Fri, 15 Dec 2023 11:07:02 +0000 Subject: [PATCH 26/27] update docker notebook --- notebooks/helm/docker-helm-syft.ipynb | 406 +++++++++++++------------- 1 file changed, 198 insertions(+), 208 deletions(-) diff --git a/notebooks/helm/docker-helm-syft.ipynb b/notebooks/helm/docker-helm-syft.ipynb index 3ce0d4b6365..376180837a6 100644 --- a/notebooks/helm/docker-helm-syft.ipynb +++ b/notebooks/helm/docker-helm-syft.ipynb @@ -41,7 +41,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "Logged into as \n" + "Logged into as \n" ] }, { @@ -320,7 +320,7 @@ " flex-grow: 0;\n", " }\n", "\n", - " .grid-table67d3c3655c244210ac6987f00cc4b290 {\n", + " .grid-table64f7a5ffbda44d209ed23226aa1cea3b {\n", " display:grid;\n", " grid-template-columns: 1fr repeat(8, 1fr);\n", " grid-template-rows: repeat(2, 1fr);\n", @@ -492,25 +492,25 @@ "
\n",
+       "  [rendered HTML table widget trimmed: styles, icons, checkboxes, and pagination controls]\n",
+       "
\n", @@ -813,7 +813,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 10, "id": "d84a897e", "metadata": {}, "outputs": [ @@ -826,7 +826,7 @@ "SyftSuccess: 3 workers added" ] }, - "execution_count": 6, + "execution_count": 10, "metadata": {}, "output_type": "execute_result" } @@ -837,7 +837,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 11, "id": "4cea5229", "metadata": {}, "outputs": [ @@ -1047,7 +1047,7 @@ " flex-grow: 0;\n", " }\n", "\n", - " .grid-table161debd8746d4ca98e6a94ee5b196795 {\n", + " .grid-table6af016552d884357a2c0b0b918577dab {\n", " display:grid;\n", " grid-template-columns: 1fr repeat(12, 1fr);\n", " grid-template-rows: repeat(2, 1fr);\n", @@ -1219,25 +1219,25 @@ "
\n",
+       "  [rendered HTML worker-list widget trimmed; the DockerWorker entries follow in plain text below]\n",
+       "
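\n",
+       "  [the trimmed widget rendered the container workers; the accessor below is an assumption, the creating call is not shown in this hunk:\n",
+       "     workers = client.api.services.worker.get_all()  # -> [DockerWorker, ...]\n",
+       "  ]\n",
+       "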
\n", @@ -1456,7 +1456,7 @@ " syft.service.worker.worker_service.DockerWorker]" ] }, - "execution_count": 7, + "execution_count": 11, "metadata": {}, "output_type": "execute_result" } @@ -1475,7 +1475,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 12, "id": "c7d90857", "metadata": {}, "outputs": [], @@ -1486,7 +1486,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 13, "id": "740b3cf1", "metadata": {}, "outputs": [], @@ -1497,7 +1497,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 14, "id": "f0da9c8a", "metadata": {}, "outputs": [], @@ -1521,7 +1521,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 15, "id": "4400f06f", "metadata": {}, "outputs": [ @@ -1549,7 +1549,7 @@ "output_type": "stream", "text": [ "\r", - " 0%| | 0/2 [00:00SyftSuccess: Dataset uploaded to 'determined_norvig'. To see the datasets uploaded by a client on this node, use command `[your_client].datasets`

" + "
SyftSuccess: Dataset uploaded to 'test'. To see the datasets uploaded by a client on this node, use command `[your_client].datasets`

" ], "text/plain": [ - "SyftSuccess: Dataset uploaded to 'determined_norvig'. To see the datasets uploaded by a client on this node, use command `[your_client].datasets`" + "SyftSuccess: Dataset uploaded to 'test'. To see the datasets uploaded by a client on this node, use command `[your_client].datasets`" ] }, - "execution_count": 11, + "execution_count": 15, "metadata": {}, "output_type": "execute_result" } @@ -1601,7 +1601,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 17, "id": "842988d1", "metadata": {}, "outputs": [ @@ -1634,7 +1634,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 18, "id": "aa3a5c31", "metadata": {}, "outputs": [ @@ -1758,7 +1758,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 19, "id": "2f23c7ae", "metadata": {}, "outputs": [ @@ -1771,7 +1771,7 @@ "SyftSuccess: User Code Submitted" ] }, - "execution_count": 14, + "execution_count": 19, "metadata": {}, "output_type": "execute_result" } @@ -1782,7 +1782,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 20, "id": "27be4dc4", "metadata": {}, "outputs": [ @@ -1819,7 +1819,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 21, "id": "82d92df1", "metadata": {}, "outputs": [ @@ -1832,12 +1832,12 @@ " \n", "
\n", "

Request

\n", - "

Id: e7f73ae2ab8e4f1eb0efb02d676e51c1

\n", - "

Request time: 2023-12-05 09:35:19

\n", + "

Id: 615fac3ed41c4c23be8ea9fff6016ace

\n", + "

Request time: 2023-12-15 11:03:04

\n", " \n", " \n", "

Status: RequestStatus.PENDING

\n", - "

Requested on: Determined_norvig of type Domain

\n", + "

Requested on: Test of type Domain

\n", "

Requested by: Jane Doe (info@openmined.org)

\n", "

Changes: Request to change main_function to permission RequestStatus.APPROVED. Nested Requests not resolved.

\n", "
\n", @@ -1847,12 +1847,12 @@ "text/markdown": [ "```python\n", "class Request:\n", - " id: str = e7f73ae2ab8e4f1eb0efb02d676e51c1\n", - " request_time: str = 2023-12-05 09:35:19\n", + " id: str = 615fac3ed41c4c23be8ea9fff6016ace\n", + " request_time: str = 2023-12-15 11:03:04\n", " updated_at: str = None\n", " status: str = RequestStatus.PENDING\n", " changes: str = ['Request to change main_function to permission RequestStatus.APPROVED. Nested Requests not resolved']\n", - " requesting_user_verify_key: str = 810847da4475205f7540b1823a72af4c85c327665bfdb463351fbbab2715a675\n", + " requesting_user_verify_key: str = ca43ec9f08916214cb9703333669fea5d473b1a2460f89fceaa161fc46600b17\n", "\n", "```" ], @@ -1860,7 +1860,7 @@ "syft.service.request.request.Request" ] }, - "execution_count": 16, + "execution_count": 21, "metadata": {}, "output_type": "execute_result" } @@ -1871,7 +1871,7 @@ }, { "cell_type": "code", - "execution_count": 18, + "execution_count": 22, "id": "29ee2790", "metadata": {}, "outputs": [ @@ -1892,19 +1892,19 @@ "output_type": "stream", "text": [ "Would you like to proceed? [y/n]: y\n", - "Request approved for domain determined_norvig\n" + "Request approved for domain test\n" ] }, { "data": { "text/html": [ - "
SyftSuccess: Request e7f73ae2ab8e4f1eb0efb02d676e51c1 changes applied

" + "
SyftSuccess: Request 615fac3ed41c4c23be8ea9fff6016ace changes applied

" ], "text/plain": [ - "SyftSuccess: Request e7f73ae2ab8e4f1eb0efb02d676e51c1 changes applied" + "SyftSuccess: Request 615fac3ed41c4c23be8ea9fff6016ace changes applied" ] }, - "execution_count": 18, + "execution_count": 22, "metadata": {}, "output_type": "execute_result" } @@ -1915,7 +1915,7 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": 23, "id": "78b084c0", "metadata": {}, "outputs": [], @@ -1935,7 +1935,7 @@ }, { "cell_type": "code", - "execution_count": 21, + "execution_count": 24, "id": "55c3bee6", "metadata": {}, "outputs": [ @@ -1944,7 +1944,7 @@ "text/markdown": [ "```python\n", "class Job:\n", - " id: UID = 4d68a345f6f34892bc7d4d42ee1f004b\n", + " id: UID = b56adc9fdd1e48c4a275a11b96519723\n", " status: created\n", " has_parent: False\n", " result: ActionDataEmpty \n", @@ -1958,7 +1958,7 @@ "syft.service.job.job_stash.Job" ] }, - "execution_count": 21, + "execution_count": 24, "metadata": {}, "output_type": "execute_result" } @@ -1969,17 +1969,7 @@ }, { "cell_type": "code", - "execution_count": 22, - "id": "cf89cf33", - "metadata": {}, - "outputs": [], - "source": [ - "# job.subjobs" - ] - }, - { - "cell_type": "code", - "execution_count": 26, + "execution_count": 46, "id": "4d567f04", "metadata": {}, "outputs": [ @@ -2189,7 +2179,7 @@ " flex-grow: 0;\n", " }\n", "\n", - " .grid-table2becc9f73ad4415eab2f9c5e69d7fe16 {\n", + " .grid-tablef4adcf287e94440ab66813405ab8b5d0 {\n", " display:grid;\n", " grid-template-columns: 1fr repeat(28, 1fr);\n", " grid-template-rows: repeat(2, 1fr);\n", @@ -2361,25 +2351,25 @@ "
\n",
+       "  [rendered HTML jobs-table widget trimmed; the Job entries follow in plain text below]\n",
+       "
\n"
+      ],
       "text/plain": [
        "[syft.service.job.job_stash.Job]"
       ]
      },
-     "execution_count": 26,
+     "execution_count": 46,
      "metadata": {},
      "output_type": "execute_result"
     }

From e96d6d232a6aa9fb1dbd33bd8b9c0816893bcc71 Mon Sep 17 00:00:00 2001
From: Koen van der Veen <koenlennartvanderveen@gmail.com>
Date: Fri, 15 Dec 2023 11:32:30 +0000
Subject: [PATCH 27/27] update transforms

---
 .../src/syft/store/blob_storage/__init__.py   | 19 ++++++++++++++++---
 packages/syft/src/syft/types/transforms.py    |  7 +++++++
 2 files changed, 23 insertions(+), 3 deletions(-)

diff --git a/packages/syft/src/syft/store/blob_storage/__init__.py b/packages/syft/src/syft/store/blob_storage/__init__.py
index 36dbb1c4bb9..7818830739c 100644
--- a/packages/syft/src/syft/store/blob_storage/__init__.py
+++ b/packages/syft/src/syft/store/blob_storage/__init__.py
@@ -71,6 +71,7 @@
 from ...types.transforms import drop
 from ...types.transforms import make_set_default
+from ...types.transforms import str_url_to_grid_url
 from ...types.uid import UID

@@ -109,7 +110,7 @@ def _read_data(self, **kwargs):
 
 
 @migrate(BlobRetrieval, BlobRetrievalV1)
-def downgrade_blobretrival_v2_to_v1():
+def downgrade_blobretrieval_v2_to_v1():
     return [
         drop(["syft_blob_storage_entry_id", "file_size"]),
     ]
@@ -258,14 +259,14 @@ def _read_data(self, stream=False, chunk_size=DEFAULT_CHUNK_SIZE):
         return SyftError(message=f"Failed to retrieve with Error: {e}")
 
 
-@migrate(BlobRetrievalByURL, BlobRetrievalByURLV1)
+@migrate(BlobRetrievalByURLV2, BlobRetrievalByURLV1)
 def downgrade_blobretrivalbyurl_v2_to_v1():
     return [
         drop(["syft_blob_storage_entry_id", "file_size"]),
     ]
 
 
-@migrate(BlobRetrievalByURLV1, BlobRetrievalByURL)
+@migrate(BlobRetrievalByURLV1, BlobRetrievalByURLV2)
 def upgrade_blobretrivalbyurl_v1_to_v2():
     return [
         make_set_default("syft_blob_storage_entry_id", None),
         make_set_default("file_size", 1),
     ]
 
 
+@migrate(BlobRetrievalByURL, BlobRetrievalByURLV2)
+def downgrade_blobretrivalbyurl_v3_to_v2():
+    return [
+        str_url_to_grid_url,
+    ]
+
+
+@migrate(BlobRetrievalByURLV2, BlobRetrievalByURL)
+def upgrade_blobretrivalbyurl_v2_to_v3():
+    return []
+
+
 @serializable()
 class BlobDeposit(SyftObject):
     __canonical_name__ = "BlobDeposit"
diff --git a/packages/syft/src/syft/types/transforms.py b/packages/syft/src/syft/types/transforms.py
index 01011f0a518..d4d7a0c38ca 100644
--- a/packages/syft/src/syft/types/transforms.py
+++ b/packages/syft/src/syft/types/transforms.py
@@ -145,6 +145,13 @@ def validate_email(context: TransformContext) -> TransformContext:
     return context
 
 
+def str_url_to_grid_url(context: TransformContext) -> TransformContext:
+    url = context.output.get("url", None)
+    if url is not None and isinstance(url, str):
+        context.output["url"] = GridURL.from_url(url)
+    return context
+
+
 def add_credentials_for_key(key: str) -> Callable:
     def add_credentials(context: TransformContext) -> TransformContext:
         context.output[key] = context.credentials
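For reference, str_url_to_grid_url composes with make_set_default and drop inside @migrate pipelines like the ones above. A self-contained sketch of its behavior, using stand-ins rather than Syft's real GridURL and TransformContext types:

    # Stand-ins for illustration only; not the real Syft types.
    class GridURLStub:
        def __init__(self, url: str) -> None:
            self.url = url

        @classmethod
        def from_url(cls, url: str) -> "GridURLStub":
            return cls(url)


    def str_url_to_grid_url(output: dict) -> dict:
        # Only convert when a plain string is present; GridURL values pass through.
        url = output.get("url", None)
        if url is not None and isinstance(url, str):
            output["url"] = GridURLStub.from_url(url)
        return output


    out = {"url": "http://localhost:8080"}
    assert isinstance(str_url_to_grid_url(out)["url"], GridURLStub)

    out2 = {"url": GridURLStub("http://localhost:8080")}
    assert str_url_to_grid_url(out2)["url"] is out2["url"]  # already converted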