diff --git a/notebooks/helm/direct_azure.ipynb b/notebooks/helm/direct_azure.ipynb new file mode 100644 index 00000000000..0db87718008 --- /dev/null +++ b/notebooks/helm/direct_azure.ipynb @@ -0,0 +1,865 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 40, + "metadata": {}, + "outputs": [], + "source": [ + "import syft as sy\n", + "import os" + ] + }, + { + "cell_type": "code", + "execution_count": 41, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "Logged into as \n" + ] + }, + { + "data": { + "text/html": [ + "
SyftWarning: You are using a default password. Please change the password using `[your_client].me.set_password([new_password])`.

" + ], + "text/plain": [ + "SyftWarning: You are using a default password. Please change the password using `[your_client].me.set_password([new_password])`." + ] + }, + "metadata": {}, + "output_type": "display_data" + } + ], + "source": [ + "# node = sy.orchestra.launch(name=\"test-domain-helm2\", dev_mode=True,\n", + "# reset=True,\n", + "# n_consumers=4,\n", + "# create_producer=True)\n", + "# client = node.login(email=\"info@openmined.org\", password=\"changethis\")\n", + "client = sy.login(url=\"http://localhost:8080\", email=\"info@openmined.org\", password=\"changethis\")" + ] + }, + { + "cell_type": "code", + "execution_count": 34, + "metadata": {}, + "outputs": [], + "source": [ + "\n", + "# from syft.store.blob_storage import BlobStorageConfig, BlobStorageClientConfig\n", + "# from syft.store.blob_storage.seaweedfs import SeaweedFSClient, SeaweedFSClientConfig\n", + "# blob_config = BlobStorageConfig(client_type=SeaweedFSClient,\n", + "# client_config=SeaweedFSClientConfig(host=\"http://0.0.0.0\",\n", + "# port=\"8333\",\n", + "# access_key=\"admin\",\n", + "# secret_key=\"admin\",\n", + "# bucket_name=\"test_bucket\",\n", + "# region=\"us-east-1\", \n", + "# # mount_port=4001\n", + "# )\n", + "# )\n", + "# node.python_node.init_blob_storage(blob_config)" + ] + }, + { + "cell_type": "code", + "execution_count": 35, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "
SyftSuccess: Mounting Azure Successful!

" + ], + "text/plain": [ + "SyftSuccess: Mounting Azure Successful!" + ] + }, + "execution_count": 35, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "client.api.services.blob_storage.mount_azure(\n", + " account_name='helmprojectstorage',\n", + " container_name='helm',\n", + " account_key=os.environ[\"HELM_STORAGE_ACCOUNT_KEY\"],\n", + " bucket_name='helmazurebucket',\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": 42, + "metadata": {}, + "outputs": [], + "source": [ + "files = client.api.services.blob_storage.get_files_from_bucket(\"helmazurebucket\")" + ] + }, + { + "cell_type": "code", + "execution_count": 43, + "metadata": {}, + "outputs": [ + { + "data": { + "text/html": [ + "\n", + "\n", + "\n", + "\n", + "
\n", + "
\n", + "
\n", + "

BlobFile List

\n", + "
\n", + "\n", + "
\n", + "
\n", + "
\n", + "
\n", + "
\n", + " \n", + "
\n", + " \n", + "
\n", + " \n", + "
\n", + "\n", + "

0

\n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + " \n", + "
\n", + "
\n", + " \n", + " \n" + ], + "text/plain": [ + "[syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile,\n", + " syft.types.blob_storage.BlobFile]" + ] + }, + "execution_count": 43, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "files" + ] + }, + { + "cell_type": "code", + "execution_count": 44, + "metadata": {}, + "outputs": [], + "source": [ + "file = [f for f in files if f.file_name==\"test.json\"][0]" + ] + }, + { + "cell_type": "code", + "execution_count": 45, + "metadata": {}, + "outputs": [ + { + "data": { + "text/markdown": [ + "```python\n", + "class BlobFile:\n", + " id: str = cb2ef738082c49418ed70eb05a193770\n", + " file_name: str = \"test.json\"\n", + "\n", + "```" + ], + "text/plain": [ + "syft.types.blob_storage.BlobFile" + ] + }, + "execution_count": 45, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "file" + ] + }, + { + "cell_type": "code", + "execution_count": 46, + "metadata": {}, + "outputs": [ + { + "data": { + "text/plain": [ + "b'{\\n\"abc\": \"def\"\\n}'" + ] + }, + "execution_count": 46, + "metadata": {}, + "output_type": "execute_result" + } + ], + "source": [ + "file.read()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.9.16" + }, + "toc": { + "base_numbering": 1, + "nav_menu": {}, + "number_sections": true, + "sideBar": true, + "skip_h1_title": false, + "title_cell": "Table of Contents", + "title_sidebar": "Contents", + "toc_cell": false, + "toc_position": {}, + "toc_section_display": true, + "toc_window_display": true + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/notebooks/helm/docker-helm-syft.ipynb b/notebooks/helm/docker-helm-syft.ipynb index 3ce0d4b6365..376180837a6 100644 --- a/notebooks/helm/docker-helm-syft.ipynb +++ 
b/notebooks/helm/docker-helm-syft.ipynb @@ -41,7 +41,7 @@ "name": "stdout", "output_type": "stream", "text": [ - "Logged into as \n" + "Logged into as \n" ] }, { @@ -320,7 +320,7 @@ " flex-grow: 0;\n", " }\n", "\n", - " .grid-table67d3c3655c244210ac6987f00cc4b290 {\n", + " .grid-table64f7a5ffbda44d209ed23226aa1cea3b {\n", " display:grid;\n", " grid-template-columns: 1fr repeat(8, 1fr);\n", " grid-template-rows: repeat(2, 1fr);\n", @@ -492,25 +492,25 @@ "
\n", "
\n", "
\n", - "
\n", - "
\n", + "
\n", " \n", "
\n", - " \n", + " \n", "
\n", - " \n", "
\n", "\n", - "

0

\n", + "

0

\n", "
\n", - "
\n", + "
\n", " \n", "
\n", - "
\n", + "
\n", " \n", "
\n", "
\n", @@ -813,7 +813,7 @@ }, { "cell_type": "code", - "execution_count": 6, + "execution_count": 10, "id": "d84a897e", "metadata": {}, "outputs": [ @@ -826,7 +826,7 @@ "SyftSuccess: 3 workers added" ] }, - "execution_count": 6, + "execution_count": 10, "metadata": {}, "output_type": "execute_result" } @@ -837,7 +837,7 @@ }, { "cell_type": "code", - "execution_count": 7, + "execution_count": 11, "id": "4cea5229", "metadata": {}, "outputs": [ @@ -1047,7 +1047,7 @@ " flex-grow: 0;\n", " }\n", "\n", - " .grid-table161debd8746d4ca98e6a94ee5b196795 {\n", + " .grid-table6af016552d884357a2c0b0b918577dab {\n", " display:grid;\n", " grid-template-columns: 1fr repeat(12, 1fr);\n", " grid-template-rows: repeat(2, 1fr);\n", @@ -1219,25 +1219,25 @@ "
\n", "
\n", "
\n", - "
\n", - "
\n", + "
\n", " \n", "
\n", - " \n", + " \n", "
\n", - " \n", "
\n", "\n", - "

0

\n", + "

0

\n", "
\n", - "
\n", + "
\n", " \n", "
\n", - "
\n", + "
\n", " \n", "
\n", "
\n", @@ -1456,7 +1456,7 @@ " syft.service.worker.worker_service.DockerWorker]" ] }, - "execution_count": 7, + "execution_count": 11, "metadata": {}, "output_type": "execute_result" } @@ -1475,7 +1475,7 @@ }, { "cell_type": "code", - "execution_count": 8, + "execution_count": 12, "id": "c7d90857", "metadata": {}, "outputs": [], @@ -1486,7 +1486,7 @@ }, { "cell_type": "code", - "execution_count": 9, + "execution_count": 13, "id": "740b3cf1", "metadata": {}, "outputs": [], @@ -1497,7 +1497,7 @@ }, { "cell_type": "code", - "execution_count": 10, + "execution_count": 14, "id": "f0da9c8a", "metadata": {}, "outputs": [], @@ -1521,7 +1521,7 @@ }, { "cell_type": "code", - "execution_count": 11, + "execution_count": 15, "id": "4400f06f", "metadata": {}, "outputs": [ @@ -1549,7 +1549,7 @@ "output_type": "stream", "text": [ "\r", - " 0%| | 0/2 [00:00SyftSuccess: Dataset uploaded to 'determined_norvig'. To see the datasets uploaded by a client on this node, use command `[your_client].datasets`

" + "
SyftSuccess: Dataset uploaded to 'test'. To see the datasets uploaded by a client on this node, use command `[your_client].datasets`

" ], "text/plain": [ - "SyftSuccess: Dataset uploaded to 'determined_norvig'. To see the datasets uploaded by a client on this node, use command `[your_client].datasets`" + "SyftSuccess: Dataset uploaded to 'test'. To see the datasets uploaded by a client on this node, use command `[your_client].datasets`" ] }, - "execution_count": 11, + "execution_count": 15, "metadata": {}, "output_type": "execute_result" } @@ -1601,7 +1601,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 17, "id": "842988d1", "metadata": {}, "outputs": [ @@ -1634,7 +1634,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 18, "id": "aa3a5c31", "metadata": {}, "outputs": [ @@ -1758,7 +1758,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 19, "id": "2f23c7ae", "metadata": {}, "outputs": [ @@ -1771,7 +1771,7 @@ "SyftSuccess: User Code Submitted" ] }, - "execution_count": 14, + "execution_count": 19, "metadata": {}, "output_type": "execute_result" } @@ -1782,7 +1782,7 @@ }, { "cell_type": "code", - "execution_count": 15, + "execution_count": 20, "id": "27be4dc4", "metadata": {}, "outputs": [ @@ -1819,7 +1819,7 @@ }, { "cell_type": "code", - "execution_count": 16, + "execution_count": 21, "id": "82d92df1", "metadata": {}, "outputs": [ @@ -1832,12 +1832,12 @@ " \n", "
\n", "

Request

\n", - "

Id: e7f73ae2ab8e4f1eb0efb02d676e51c1

\n", - "

Request time: 2023-12-05 09:35:19

\n", + "

Id: 615fac3ed41c4c23be8ea9fff6016ace

\n", + "

Request time: 2023-12-15 11:03:04

\n", " \n", " \n", "

Status: RequestStatus.PENDING

\n", - "

Requested on: Determined_norvig of type Domain

\n", + "

Requested on: Test of type Domain

\n", "

Requested by: Jane Doe (info@openmined.org)

\n", "

Changes: Request to change main_function to permission RequestStatus.APPROVED. Nested Requests not resolved.

\n", "
\n", @@ -1847,12 +1847,12 @@ "text/markdown": [ "```python\n", "class Request:\n", - " id: str = e7f73ae2ab8e4f1eb0efb02d676e51c1\n", - " request_time: str = 2023-12-05 09:35:19\n", + " id: str = 615fac3ed41c4c23be8ea9fff6016ace\n", + " request_time: str = 2023-12-15 11:03:04\n", " updated_at: str = None\n", " status: str = RequestStatus.PENDING\n", " changes: str = ['Request to change main_function to permission RequestStatus.APPROVED. Nested Requests not resolved']\n", - " requesting_user_verify_key: str = 810847da4475205f7540b1823a72af4c85c327665bfdb463351fbbab2715a675\n", + " requesting_user_verify_key: str = ca43ec9f08916214cb9703333669fea5d473b1a2460f89fceaa161fc46600b17\n", "\n", "```" ], @@ -1860,7 +1860,7 @@ "syft.service.request.request.Request" ] }, - "execution_count": 16, + "execution_count": 21, "metadata": {}, "output_type": "execute_result" } @@ -1871,7 +1871,7 @@ }, { "cell_type": "code", - "execution_count": 18, + "execution_count": 22, "id": "29ee2790", "metadata": {}, "outputs": [ @@ -1892,19 +1892,19 @@ "output_type": "stream", "text": [ "Would you like to proceed? [y/n]: y\n", - "Request approved for domain determined_norvig\n" + "Request approved for domain test\n" ] }, { "data": { "text/html": [ - "
SyftSuccess: Request e7f73ae2ab8e4f1eb0efb02d676e51c1 changes applied

" + "
SyftSuccess: Request 615fac3ed41c4c23be8ea9fff6016ace changes applied

" ], "text/plain": [ - "SyftSuccess: Request e7f73ae2ab8e4f1eb0efb02d676e51c1 changes applied" + "SyftSuccess: Request 615fac3ed41c4c23be8ea9fff6016ace changes applied" ] }, - "execution_count": 18, + "execution_count": 22, "metadata": {}, "output_type": "execute_result" } @@ -1915,7 +1915,7 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": 23, "id": "78b084c0", "metadata": {}, "outputs": [], @@ -1935,7 +1935,7 @@ }, { "cell_type": "code", - "execution_count": 21, + "execution_count": 24, "id": "55c3bee6", "metadata": {}, "outputs": [ @@ -1944,7 +1944,7 @@ "text/markdown": [ "```python\n", "class Job:\n", - " id: UID = 4d68a345f6f34892bc7d4d42ee1f004b\n", + " id: UID = b56adc9fdd1e48c4a275a11b96519723\n", " status: created\n", " has_parent: False\n", " result: ActionDataEmpty \n", @@ -1958,7 +1958,7 @@ "syft.service.job.job_stash.Job" ] }, - "execution_count": 21, + "execution_count": 24, "metadata": {}, "output_type": "execute_result" } @@ -1969,17 +1969,7 @@ }, { "cell_type": "code", - "execution_count": 22, - "id": "cf89cf33", - "metadata": {}, - "outputs": [], - "source": [ - "# job.subjobs" - ] - }, - { - "cell_type": "code", - "execution_count": 26, + "execution_count": 46, "id": "4d567f04", "metadata": {}, "outputs": [ @@ -2189,7 +2179,7 @@ " flex-grow: 0;\n", " }\n", "\n", - " .grid-table2becc9f73ad4415eab2f9c5e69d7fe16 {\n", + " .grid-tablef4adcf287e94440ab66813405ab8b5d0 {\n", " display:grid;\n", " grid-template-columns: 1fr repeat(28, 1fr);\n", " grid-template-rows: repeat(2, 1fr);\n", @@ -2361,25 +2351,25 @@ "
\n", "
\n", "
\n", - "
\n", - "
\n", + "
\n", " \n", "
\n", - " \n", + " \n", "
\n", - " \n", "
\n", "\n", - "

0

\n", + "

0

\n", "
\n", - "
\n", + "
\n", " \n", "
\n", - "
\n", + "
\n", " \n", "
\n", "
\n", @@ -2596,7 +2586,7 @@ "[syft.service.job.job_stash.Job]" ] }, - "execution_count": 26, + "execution_count": 46, "metadata": {}, "output_type": "execute_result" } diff --git a/packages/grid/seaweedfs/mount_command.sh b/packages/grid/seaweedfs/mount_command.sh index f9c813ef8d5..1360cbddf62 100644 --- a/packages/grid/seaweedfs/mount_command.sh +++ b/packages/grid/seaweedfs/mount_command.sh @@ -1,3 +1,8 @@ +# 1 = remote config name +# 2 = azure account name +# 3 = seaweedfs bucket name +# 4 = azure container name +# 5 = azure key echo "remote.configure -name=$1 -type=azure -azure.account_name=$2 -azure.account_key=$5" | weed shell && \ echo "s3.bucket.create -name=$3" | weed shell && \ echo "remote.mount -dir=/buckets/$3 -remote=$1/$4" | weed shell && \ diff --git a/packages/syft/setup.cfg b/packages/syft/setup.cfg index 10ebcfea4d9..2f4143b62d8 100644 --- a/packages/syft/setup.cfg +++ b/packages/syft/setup.cfg @@ -63,7 +63,7 @@ syft = pandas==1.5.3 docker==6.1.3 PyYAML==6.0.1 - + azure-storage-blob==12.19 install_requires = %(syft)s diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index 095d2849ffd..7c9fb8760ef 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -380,6 +380,19 @@ def init_blob_storage(self, config: Optional[BlobStorageConfig] = None) -> None: self.blob_store_config = config_ self.blob_storage_client = config_.client_type(config=config_.client_config) + # relative + from ..store.blob_storage.seaweedfs import SeaweedFSConfig + + if isinstance(config, SeaweedFSConfig): + blob_storage_service = self.get_service(BlobStorageService) + remote_profiles = blob_storage_service.remote_profile_stash.get_all( + credentials=self.signing_key.verify_key, has_permission=True + ).ok() + for remote_profile in remote_profiles: + self.blob_store_config.client_config.remote_profiles[ + remote_profile.profile_name + ] = remote_profile + def stop(self): for consumer_list in self.queue_manager.consumers.values(): for c in consumer_list: @@ -723,6 +736,13 @@ def init_stores( document_store = document_store_config.store_type self.document_store_config = document_store_config + # We add the python id of the current node in order + # to create one connection per Node object in MongoClientCache + # so that we avoid closing the connection from a + # different thread through the garbage collection + if isinstance(self.document_store_config, MongoStoreConfig): + self.document_store_config.client_config.node_obj_python_id = id(self) + self.document_store = document_store( root_verify_key=self.verify_key, store_config=document_store_config, @@ -746,6 +766,12 @@ def init_stores( root_verify_key=self.verify_key, ) elif isinstance(action_store_config, MongoStoreConfig): + # We add the python id of the current node in order + # to create one connection per Node object in MongoClientCache + # so that we avoid closing the connection from a + # different thread through the garbage collection + action_store_config.client_config.node_obj_python_id = id(self) + self.action_store = MongoActionStore( root_verify_key=self.verify_key, store_config=action_store_config ) diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index c0e34a775d1..a84c764c70f 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -981,6 +981,46 @@ "hash": "fd1067b0bb9a6e630a224162ed92f4746d4d5869bc104923ec48c4b9d597594c", 
"action": "add" } + }, + "SeaweedSecureFilePathLocation": { + "2": { + "version": 2, + "hash": "3ca49db7536a33d5712485164e95406000df9af2aed78e9f9fa2bb2bbbb34fe6", + "action": "add" + } + }, + "AzureSecureFilePathLocation": { + "1": { + "version": 1, + "hash": "1bb15f3f9d7082779f1c9f58de94011487924cb8a8c9c2ec18fd7c161c27fd0e", + "action": "add" + } + }, + "RemoteConfig": { + "1": { + "version": 1, + "hash": "ad7bc4780a8ad52e14ce68601852c93d2fe07bda489809cad7cae786d2461754", + "action": "add" + } + }, + "AzureRemoteConfig": { + "1": { + "version": 1, + "hash": "c05c6caa27db4e385c642536d4b0ecabc0c71e91220d2e6ce21a2761ca68a673", + "action": "add" + } + }, + "BlobRetrievalByURL": { + "2": { + "version": 2, + "hash": "8059ee03016c4d74e408dad9529e877f91829672e0cc42d8cfff9c8e14058adc", + "action": "remove" + }, + "3": { + "version": 3, + "hash": "0b664100ea08413ca4ef04665ca910c2cf9535539617ea4ba33687d05cdfe747", + "action": "add" + } } } } diff --git a/packages/syft/src/syft/service/blob_storage/remote_profile.py b/packages/syft/src/syft/service/blob_storage/remote_profile.py new file mode 100644 index 00000000000..8bd92bc9f91 --- /dev/null +++ b/packages/syft/src/syft/service/blob_storage/remote_profile.py @@ -0,0 +1,35 @@ +# relative +from ...serde.serializable import serializable +from ...store.document_store import BaseUIDStoreStash +from ...store.document_store import DocumentStore +from ...store.document_store import PartitionSettings +from ...types.syft_object import SYFT_OBJECT_VERSION_1 +from ...types.syft_object import SyftObject + + +@serializable() +class RemoteProfile(SyftObject): + __canonical_name__ = "RemoteConfig" + __version__ = SYFT_OBJECT_VERSION_1 + + +@serializable() +class AzureRemoteProfile(RemoteProfile): + __canonical_name__ = "AzureRemoteConfig" + __version__ = SYFT_OBJECT_VERSION_1 + + profile_name: str # used by seaweedfs + account_name: str + account_key: str + container_name: str + + +@serializable() +class RemoteProfileStash(BaseUIDStoreStash): + object_type = RemoteProfile + settings: PartitionSettings = PartitionSettings( + name=RemoteProfile.__canonical_name__, object_type=RemoteProfile + ) + + def __init__(self, store: DocumentStore) -> None: + super().__init__(store=store) diff --git a/packages/syft/src/syft/service/blob_storage/service.py b/packages/syft/src/syft/service/blob_storage/service.py index 250998c314f..f485c2e67bf 100644 --- a/packages/syft/src/syft/service/blob_storage/service.py +++ b/packages/syft/src/syft/service/blob_storage/service.py @@ -15,11 +15,11 @@ from ...store.blob_storage.seaweedfs import SeaweedFSBlobDeposit from ...store.document_store import DocumentStore from ...store.document_store import UIDPartitionKey +from ...types.blob_storage import AzureSecureFilePathLocation from ...types.blob_storage import BlobFileType from ...types.blob_storage import BlobStorageEntry from ...types.blob_storage import BlobStorageMetadata from ...types.blob_storage import CreateBlobStorageEntry -from ...types.blob_storage import SecureFilePathLocation from ...types.uid import UID from ..context import AuthedServiceContext from ..response import SyftError @@ -28,6 +28,8 @@ from ..service import TYPE_TO_SERVICE from ..service import service_method from ..user.user_roles import GUEST_ROLE_LEVEL +from .remote_profile import AzureRemoteProfile +from .remote_profile import RemoteProfileStash from .stash import BlobStorageStash BlobDepositType = Union[OnDiskBlobDeposit, SeaweedFSBlobDeposit] @@ -37,10 +39,12 @@ class BlobStorageService(AbstractService): store: 
DocumentStore stash: BlobStorageStash + remote_profile_stash: RemoteProfileStash def __init__(self, store: DocumentStore) -> None: self.store = store self.stash = BlobStorageStash(store=store) + self.remote_profile_stash = RemoteProfileStash(store=store) @service_method(path="blob_storage.get_all", name="get_all") def get_all_blob_storage_entries( @@ -61,34 +65,53 @@ def mount_azure( bucket_name: str, ): # stdlib - import sys # TODO: fix arguments + remote_name = f"{account_name}{container_name}" + remote_name = "".join(ch for ch in remote_name if ch.isalnum()) args_dict = { "account_name": account_name, "account_key": account_key, "container_name": container_name, - "remote_name": f"{account_name}{container_name}", + "remote_name": remote_name, "bucket_name": bucket_name, } + + new_profile = AzureRemoteProfile( + profile_name=remote_name, + account_name=account_name, + account_key=account_key, + container_name=container_name, + ) + res = self.remote_profile_stash.set(context.credentials, new_profile) + if res.is_err(): + return SyftError(message=res.value) + remote_profile = res.ok() + seaweed_config = context.node.blob_storage_client.config + # we cache this here so that we can use it later, when reading a file from azure + # via the remote_name + seaweed_config.remote_profiles[remote_name] = remote_profile + # TODO: possibly wrap this in try/except cfg = context.node.blob_store_config.client_config init_request = requests.post(url=cfg.mount_url, json=args_dict) # nosec print(init_request.content) # TODO check return code - - print(bucket_name, file=sys.stderr) - res = context.node.blob_storage_client.connect().client.list_objects( Bucket=bucket_name ) - print(res) + # stdlib objects = res["Contents"] file_sizes = [object["Size"] for object in objects] file_paths = [object["Key"] for object in objects] secure_file_paths = [ - SecureFilePathLocation(path=file_path) for file_path in file_paths + AzureSecureFilePathLocation( + path=file_path, + azure_profile_name=remote_name, + bucket_name=bucket_name, + ) + for file_path in file_paths ] for sfp, file_size in zip(secure_file_paths, file_sizes): @@ -112,12 +135,17 @@ def get_files_from_bucket(self, context: AuthedServiceContext, bucket_name: str) return result bse_list = result.ok() # stdlib - import sys - print(bse_list, file=sys.stderr) blob_files = [] for bse in bse_list: self.stash.set(obj=bse, credentials=context.credentials) + # We create an empty ActionObject and set its blob_storage_entry_id to bse.id + # such that we can call reload_cache which creates + # the BlobRetrieval (user needs permission to do this) + # This could be a BlobRetrievalByURL that creates a BlobFile + # and then sets it in the cache (it does not contain the data, only the BlobFile). 
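+ # (note, as described above: the BlobFile returned here is only a handle to the + # entry; the caller needs read permission when reload_cache builds the BlobRetrieval)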
+ # In the client, when reading the file, we will create **another** BlobRetrieval + # object to read the actual data blob_file = ActionObject.empty() blob_file.syft_blob_storage_entry_id = bse.id blob_file.syft_client_verify_key = context.credentials @@ -146,6 +174,7 @@ return blob_storage_entry.to(BlobStorageMetadata) return SyftError(message=result.err()) + # TODO: replace name with `create_blob_retrieval` @service_method( path="blob_storage.read", name="read", diff --git a/packages/syft/src/syft/service/queue/queue.py b/packages/syft/src/syft/service/queue/queue.py index e02a3d185d3..e178493aad6 100644 --- a/packages/syft/src/syft/service/queue/queue.py +++ b/packages/syft/src/syft/service/queue/queue.py @@ -235,7 +235,10 @@ def handle_message(message: bytes): credentials = queue_item.syft_client_verify_key - job_item = worker.job_stash.get_by_uid(credentials, queue_item.job_id).ok() + res = worker.job_stash.get_by_uid(credentials, queue_item.job_id) + if res.is_err(): + raise Exception(res.value) + job_item = res.ok() queue_item.status = Status.PROCESSING queue_item.node_uid = worker.id diff --git a/packages/syft/src/syft/store/blob_storage/__init__.py b/packages/syft/src/syft/store/blob_storage/__init__.py index 48699df9db1..7818830739c 100644 --- a/packages/syft/src/syft/store/blob_storage/__init__.py +++ b/packages/syft/src/syft/store/blob_storage/__init__.py @@ -61,16 +61,22 @@ from ...types.blob_storage import BlobFileType from ...types.blob_storage import BlobStorageEntry from ...types.blob_storage import CreateBlobStorageEntry +from ...types.blob_storage import DEFAULT_CHUNK_SIZE from ...types.blob_storage import SecureFilePathLocation from ...types.grid_url import GridURL from ...types.syft_migration import migrate from ...types.syft_object import SYFT_OBJECT_VERSION_1 from ...types.syft_object import SYFT_OBJECT_VERSION_2 +from ...types.syft_object import SYFT_OBJECT_VERSION_3 from ...types.syft_object import SyftObject from ...types.transforms import drop from ...types.transforms import make_set_default +from ...types.transforms import str_url_to_grid_url from ...types.uid import UID +DEFAULT_TIMEOUT = 10 +MAX_RETRIES = 20 + @serializable() class BlobRetrievalV1(SyftObject): @@ -104,7 +110,7 @@ def _read_data(self, **kwargs): @migrate(BlobRetrieval, BlobRetrievalV1) -def downgrade_blobretrival_v2_to_v1(): +def downgrade_blobretrieval_v2_to_v1(): return [ drop(["syft_blob_storage_entry_id", "file_size"]), ] @@ -168,13 +174,49 @@ class BlobRetrievalByURLV1(BlobRetrievalV1): url: GridURL -@serializable() -class BlobRetrievalByURL(BlobRetrieval): +def syft_iter_content( + blob_url, chunk_size, max_retries=MAX_RETRIES, timeout=DEFAULT_TIMEOUT +): + """custom iter content with smart retries (start from last byte read)""" + current_byte = 0 + for attempt in range(max_retries): + try: + headers = {"Range": f"bytes={current_byte}-"} + with requests.get( + str(blob_url), stream=True, headers=headers, timeout=(timeout, timeout) + ) as response: + response.raise_for_status() + for chunk in response.iter_content( + chunk_size=chunk_size, decode_unicode=False + ): + current_byte += len(chunk) + yield chunk + return + + except requests.exceptions.RequestException as e: + if attempt < max_retries - 1: + print( + f"Attempt {attempt + 1}/{max_retries} failed: {e} at byte {current_byte}. Retrying..." + ) + else: + print(f"Max retries reached. 
Failed with error: {e}") + raise + + +class BlobRetrievalByURLV2(BlobRetrievalV1): __canonical_name__ = "BlobRetrievalByURL" __version__ = SYFT_OBJECT_VERSION_2 url: GridURL + +@serializable() +class BlobRetrievalByURL(BlobRetrieval): + __canonical_name__ = "BlobRetrievalByURL" + __version__ = SYFT_OBJECT_VERSION_3 + + url: Union[GridURL, str] + def read(self) -> Union[SyftObject, SyftError]: if self.type_ is BlobFileType: return BlobFile( @@ -187,7 +229,7 @@ def read(self) -> Union[SyftObject, SyftError]: else: return self._read_data() - def _read_data(self, stream=False, chunk_size=512): + def _read_data(self, stream=False, chunk_size=DEFAULT_CHUNK_SIZE): # relative from ...client.api import APIRegistry @@ -195,33 +237,36 @@ def _read_data(self, stream=False, chunk_size=512): node_uid=self.syft_node_location, user_verify_key=self.syft_client_verify_key, ) - if api is not None: + if api is not None and isinstance(self.url, GridURL): blob_url = api.connection.to_blob_route( self.url.url_path, host=self.url.host_or_ip ) else: blob_url = self.url try: - response = requests.get(str(blob_url), stream=stream) # nosec - response.raise_for_status() if self.type_ is BlobFileType: if stream: - return response.iter_lines(chunk_size=chunk_size) + return syft_iter_content(blob_url, chunk_size) else: + response = requests.get(str(blob_url), stream=False) # nosec + response.raise_for_status() return response.content - return deserialize(response.content, from_bytes=True) + else: + response = requests.get(str(blob_url), stream=stream) # nosec + response.raise_for_status() + return deserialize(response.content, from_bytes=True) except requests.RequestException as e: return SyftError(message=f"Failed to retrieve with Error: {e}") -@migrate(BlobRetrievalByURL, BlobRetrievalByURLV1) +@migrate(BlobRetrievalByURLV2, BlobRetrievalByURLV1) def downgrade_blobretrivalbyurl_v2_to_v1(): return [ drop(["syft_blob_storage_entry_id", "file_size"]), ] -@migrate(BlobRetrievalByURLV1, BlobRetrievalByURL) +@migrate(BlobRetrievalByURLV1, BlobRetrievalByURLV2) def upgrade_blobretrivalbyurl_v1_to_v2(): return [ make_set_default("syft_blob_storage_entry_id", None), @@ -229,6 +274,18 @@ def upgrade_blobretrivalbyurl_v1_to_v2(): ] +@migrate(BlobRetrievalByURL, BlobRetrievalByURLV2) +def downgrade_blobretrivalbyurl_v3_to_v2(): + return [ + str_url_to_grid_url, + ] + + +@migrate(BlobRetrievalByURLV2, BlobRetrievalByURL) +def upgrade_blobretrivalbyurl_v2_to_v3(): + return [] + + @serializable() class BlobDeposit(SyftObject): __canonical_name__ = "BlobDeposit" diff --git a/packages/syft/src/syft/store/blob_storage/seaweedfs.py b/packages/syft/src/syft/store/blob_storage/seaweedfs.py index c1314663d46..2a27fc2518a 100644 --- a/packages/syft/src/syft/store/blob_storage/seaweedfs.py +++ b/packages/syft/src/syft/store/blob_storage/seaweedfs.py @@ -1,7 +1,7 @@ # stdlib from io import BytesIO import math -from pathlib import Path +from typing import Dict from typing import Generator from typing import List from typing import Optional @@ -19,14 +19,13 @@ # relative from . import BlobDeposit from . import BlobRetrieval -from . import BlobRetrievalByURL from . import BlobStorageClient from . import BlobStorageClientConfig from . import BlobStorageConfig from . 
import BlobStorageConnection from ...serde.serializable import serializable +from ...service.blob_storage.remote_profile import AzureRemoteProfile from ...service.response import SyftError -from ...service.response import SyftException from ...service.response import SyftSuccess from ...service.service import from_api_or_context from ...types.blob_storage import BlobStorageEntry @@ -37,7 +36,6 @@ from ...types.syft_object import SYFT_OBJECT_VERSION_1 from ...util.constants import DEFAULT_TIMEOUT -READ_EXPIRATION_TIME = 1800 # seconds WRITE_EXPIRATION_TIME = 900 # seconds DEFAULT_CHUNK_SIZE = 1024**3 # 1 GB @@ -109,6 +107,7 @@ class SeaweedFSClientConfig(BlobStorageClientConfig): secret_key: str region: str default_bucket_name: str = "defaultbucket" + remote_profiles: Dict[str, AzureRemoteProfile] = {} @property def endpoint_url(self) -> str: @@ -137,6 +136,7 @@ def connect(self) -> BlobStorageConnection: region_name=self.config.region, ), default_bucket_name=self.config.default_bucket_name, + config=self.config, ) @@ -144,10 +144,17 @@ def connect(self) -> BlobStorageConnection: class SeaweedFSConnection(BlobStorageConnection): client: S3BaseClient default_bucket_name: str + config: SeaweedFSClientConfig - def __init__(self, client: S3BaseClient, default_bucket_name: str): + def __init__( + self, + client: S3BaseClient, + default_bucket_name: str, + config: SeaweedFSClientConfig, + ): self.client = client self.default_bucket_name = default_bucket_name + self.config = config def __enter__(self) -> Self: return self @@ -160,18 +167,9 @@ def read( ) -> BlobRetrieval: if bucket_name is None: bucket_name = self.default_bucket_name - try: - url = self.client.generate_presigned_url( - ClientMethod="get_object", - Params={"Bucket": bucket_name, "Key": fp.path}, - ExpiresIn=READ_EXPIRATION_TIME, - ) - - return BlobRetrievalByURL( - url=GridURL.from_url(url), file_name=Path(fp.path).name, type_=type_ - ) - except BotoClientError as e: - raise SyftException(e) + # this will generate the url, the SecureFilePathLocation also handles the logic + # that decides whether to use a direct connection to azure/aws/gcp or via seaweed + return fp.generate_url(self, type_, bucket_name) def allocate( self, obj: CreateBlobStorageEntry diff --git a/packages/syft/src/syft/store/mongo_client.py b/packages/syft/src/syft/store/mongo_client.py index b932a665bf3..cbbf1c5d4f0 100644 --- a/packages/syft/src/syft/store/mongo_client.py +++ b/packages/syft/src/syft/store/mongo_client.py @@ -120,6 +120,10 @@ class MongoStoreClientConfig(StoreClientConfig): # Testing and connection reuse client: Any = None + # this allows us to have one connection per `Node` object + # in the MongoClientCache + node_obj_python_id: Optional[int] = None + class MongoClientCache: __client_cache__: Dict[str, Type["MongoClient"]] = {} @@ -139,6 +143,7 @@ class MongoClient: client: PyMongoClient = None def __init__(self, config: MongoStoreClientConfig, cache: bool = True) -> None: + self.config = config if config.client is not None: self.client = config.client elif cache: @@ -236,3 +241,4 @@ def with_collection_permissions( def close(self): self.client.close() + MongoClientCache.__client_cache__.pop(hash(str(self.config)), None) diff --git a/packages/syft/src/syft/store/sqlite_document_store.py b/packages/syft/src/syft/store/sqlite_document_store.py index 2ec313a1a8a..e128a63bc91 100644 --- a/packages/syft/src/syft/store/sqlite_document_store.py +++ b/packages/syft/src/syft/store/sqlite_document_store.py @@ -207,7 +207,7 @@ def _set(self, key: UID, 
value: Any) -> None: else: insert_sql = ( f"insert into {self.table_name} (uid, repr, value) VALUES (?, ?, ?)" # nosec - ) + ) # nosec data = _serialize(value, to_bytes=True) res = self._execute(insert_sql, [str(key), _repr_debug_(value), data]) if res.is_err(): @@ -216,7 +216,7 @@ def _set(self, key: UID, value: Any) -> None: def _update(self, key: UID, value: Any) -> None: insert_sql = ( f"update {self.table_name} set uid = ?, repr = ?, value = ? where uid = ?" # nosec - ) + ) # nosec data = _serialize(value, to_bytes=True) res = self._execute(insert_sql, [str(key), _repr_debug_(value), data, str(key)]) if res.is_err(): diff --git a/packages/syft/src/syft/types/blob_storage.py b/packages/syft/src/syft/types/blob_storage.py index d695bfd86d6..7f0438acf2e 100644 --- a/packages/syft/src/syft/types/blob_storage.py +++ b/packages/syft/src/syft/types/blob_storage.py @@ -1,4 +1,6 @@ # stdlib +from datetime import datetime +from datetime import timedelta import mimetypes from pathlib import Path from queue import Queue @@ -13,6 +15,9 @@ from typing import Union # third party +from azure.storage.blob import BlobSasPermissions +from azure.storage.blob import generate_blob_sas +from botocore.client import ClientError as BotoClientError from typing_extensions import Self # relative @@ -24,6 +29,7 @@ from ..service.action.action_types import action_types from ..service.response import SyftException from ..service.service import from_api_or_context +from ..types.grid_url import GridURL from ..types.transforms import drop from ..types.transforms import keep from ..types.transforms import make_set_default @@ -35,6 +41,9 @@ from .syft_object import SyftObject from .uid import UID +READ_EXPIRATION_TIME = 1800 # seconds +DEFAULT_CHUNK_SIZE = 10000 * 1024 + @serializable() class BlobFileV1(SyftObject): @@ -57,7 +66,7 @@ class BlobFile(SyftObject): __repr_attrs__ = ["id", "file_name"] - def read(self, stream=False, chunk_size=512, force=False): + def read(self, stream=False, chunk_size=DEFAULT_CHUNK_SIZE, force=False): # get blob retrieval object from api + syft_blob_storage_entry_id read_method = from_api_or_context( "blob_storage.read", self.syft_node_location, self.syft_client_verify_key @@ -72,9 +81,29 @@ def upload_from_path(self, path, client): return sy.ActionObject.from_path(path=path).send(client).syft_action_data - def _iter_lines(self, chunk_size=512): - """Synchronous version of the async iter_lines""" - return self.read(stream=True, chunk_size=chunk_size) + def _iter_lines(self, chunk_size=DEFAULT_CHUNK_SIZE): + """Synchronous version of the async iter_lines. 
This implementation + is also optimized in how it splits chunks, which makes it faster for + large lines""" + pending = None + for chunk in self.read(stream=True, chunk_size=chunk_size): + if b"\n" in chunk: + if pending is not None: + chunk = pending + chunk + lines = chunk.splitlines() + if lines and lines[-1] and chunk and lines[-1][-1] == chunk[-1]: + pending = lines.pop() + else: + pending = None + yield from lines + else: + if pending is None: + pending = chunk + else: + pending = pending + chunk + + if pending is not None: + yield pending def read_queue(self, queue, chunk_size, progress=False, buffer_lines=10000): total_read = 0 @@ -95,7 +124,7 @@ # Put anything not a string at the end queue.put(0) - def iter_lines(self, chunk_size=512, progress=False): + def iter_lines(self, chunk_size=DEFAULT_CHUNK_SIZE, progress=False): item_queue: Queue = Queue() threading.Thread( target=self.read_queue, @@ -155,15 +184,86 @@ class SecureFilePathLocation(SyftObject): def __repr__(self) -> str: return f"{self.path}" + def generate_url(self, *args): + raise NotImplementedError + @serializable() -class SeaweedSecureFilePathLocation(SecureFilePathLocation): +class SeaweedSecureFilePathLocationV1(SecureFilePathLocation): __canonical_name__ = "SeaweedSecureFilePathLocation" __version__ = SYFT_OBJECT_VERSION_1 upload_id: str +@serializable() +class SeaweedSecureFilePathLocation(SecureFilePathLocation): + __canonical_name__ = "SeaweedSecureFilePathLocation" + __version__ = SYFT_OBJECT_VERSION_2 + + upload_id: str + + def generate_url(self, connection, type_, bucket_name): + try: + url = connection.client.generate_presigned_url( + ClientMethod="get_object", + Params={"Bucket": bucket_name, "Key": self.path}, + ExpiresIn=READ_EXPIRATION_TIME, + ) + + # relative + from ..store.blob_storage import BlobRetrievalByURL + + return BlobRetrievalByURL( + url=GridURL.from_url(url), file_name=Path(self.path).name, type_=type_ + ) + except BotoClientError as e: + raise SyftException(e) + + +@migrate(SeaweedSecureFilePathLocationV1, SeaweedSecureFilePathLocation) +def upgrade_seaweedsecurefilepathlocation_v1_to_v2(): + return [make_set_default("bucket_name", "")] + + +@migrate(SeaweedSecureFilePathLocation, SeaweedSecureFilePathLocationV1) +def downgrade_seaweedsecurefilepathlocation_v2_to_v1(): + return [ + drop(["bucket_name"]), + ] + + +@serializable() +class AzureSecureFilePathLocation(SecureFilePathLocation): + __canonical_name__ = "AzureSecureFilePathLocation" + __version__ = SYFT_OBJECT_VERSION_1 + + # upload_id: str + azure_profile_name: str # Used by Seaweedfs to refer to a remote config + bucket_name: str + + def generate_url(self, connection, type_, *args): + # an Azure SAS token plays the same role here as an S3 presigned url + config = connection.config.remote_profiles[self.azure_profile_name] + account_name = config.account_name + container_name = config.container_name + blob_name = self.path + sas_blob = generate_blob_sas( + account_name=account_name, + container_name=container_name, + blob_name=blob_name, + account_key=config.account_key, + permission=BlobSasPermissions(read=True), + expiry=datetime.utcnow() + timedelta(hours=48), + ) + url = f"https://{config.account_name}.blob.core.windows.net/{container_name}/{blob_name}?{sas_blob}" + + # relative + from ..store.blob_storage import BlobRetrievalByURL + + return BlobRetrievalByURL(url=url, file_name=Path(self.path).name, type_=type_) + + @serializable() class 
BlobStorageEntryV1(SyftObject): __canonical_name__ = "BlobStorageEntry" diff --git a/packages/syft/src/syft/types/transforms.py b/packages/syft/src/syft/types/transforms.py index 01011f0a518..d4d7a0c38ca 100644 --- a/packages/syft/src/syft/types/transforms.py +++ b/packages/syft/src/syft/types/transforms.py @@ -145,6 +145,13 @@ def validate_email(context: TransformContext) -> TransformContext: return context +def str_url_to_grid_url(context: TransformContext) -> TransformContext: + url = context.output.get("url", None) + if url is not None and isinstance(url, str): + context.output["url"] = GridURL.from_url(url) + return context + + def add_credentials_for_key(key: str) -> Callable: def add_credentials(context: TransformContext) -> TransformContext: context.output[key] = context.credentials
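
Reviewer note: a minimal sketch (not part of the diff) of the end-to-end flow this PR enables, reconstructed from notebooks/helm/direct_azure.ipynb above. It assumes a node running locally on port 8080 and reuses the notebook's own account/container names and the HELM_STORAGE_ACCOUNT_KEY environment variable.

import os
import syft as sy

# log in as the domain owner
client = sy.login(
    url="http://localhost:8080", email="info@openmined.org", password="changethis"
)

# register the Azure container as a SeaweedFS remote; this stores an
# AzureRemoteProfile and creates one BlobStorageEntry per object found
client.api.services.blob_storage.mount_azure(
    account_name="helmprojectstorage",
    container_name="helm",
    account_key=os.environ["HELM_STORAGE_ACCOUNT_KEY"],
    bucket_name="helmazurebucket",
)

# entries come back as BlobFile handles; read() resolves the data through
# AzureSecureFilePathLocation.generate_url, i.e. a short-lived SAS URL
files = client.api.services.blob_storage.get_files_from_bucket("helmazurebucket")
test_file = [f for f in files if f.file_name == "test.json"][0]
print(test_file.read())  # the notebook run returns b'{\n"abc": "def"\n}'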