From edd25d5e41e69a85fa9cbed23ad8ccb3479448d7 Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Fri, 15 Dec 2023 15:01:35 +0000 Subject: [PATCH] fix read --- notebooks/helm/helm-syft.ipynb | 933 +++--------------- .../src/syft/store/blob_storage/__init__.py | 9 +- 2 files changed, 155 insertions(+), 787 deletions(-) diff --git a/notebooks/helm/helm-syft.ipynb b/notebooks/helm/helm-syft.ipynb index 18c3ea9b326..5e20747f873 100644 --- a/notebooks/helm/helm-syft.ipynb +++ b/notebooks/helm/helm-syft.ipynb @@ -4,15 +4,7 @@ "cell_type": "code", "execution_count": 1, "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "kj/filesystem-disk-unix.c++:1703: warning: PWD environment variable doesn't match current directory; pwd = /Users/koen/workspace/pysyft\n" - ] - } - ], + "outputs": [], "source": [ "import syft as sy\n", "from syft.store.blob_storage import BlobStorageConfig, BlobStorageClientConfig\n", @@ -97,15 +89,15 @@ "metadata": {}, "outputs": [], "source": [ - "blob_config = BlobStorageConfig(client_type=SeaweedFSClient,\n", - " client_config=SeaweedFSClientConfig(host=\"http://0.0.0.0\",\n", - " port=\"8333\",\n", - " access_key=\"admin\",\n", - " secret_key=\"admin\",\n", - " bucket_name=\"test_bucket\",\n", - " region=\"us-east-1\"\n", - " )\n", - ")" + "# blob_config = BlobStorageConfig(client_type=SeaweedFSClient,\n", + "# client_config=SeaweedFSClientConfig(host=\"http://0.0.0.0\",\n", + "# port=\"8333\",\n", + "# access_key=\"admin\",\n", + "# secret_key=\"admin\",\n", + "# bucket_name=\"test_bucket\",\n", + "# region=\"us-east-1\"\n", + "# )\n", + "# )" ] }, { @@ -114,7 +106,7 @@ "metadata": {}, "outputs": [], "source": [ - "node.python_node.init_blob_storage(blob_config)" + "# node.python_node.init_blob_storage(blob_config)" ] }, { @@ -144,7 +136,9 @@ "metadata": {}, "outputs": [], "source": [ - "# scenario_objs[0]" + "input_files = ActionObject.from_obj([\n", + " BlobFile.upload_from_path(\"short_input.jsonl\", client)\n", + "])" ] }, { @@ -153,9 +147,7 @@ "metadata": {}, "outputs": [], "source": [ - "input_files = ActionObject.from_obj([\n", - " BlobFile.upload_from_path(\"short_input.jsonl\", client)\n", - "])" + "input_files_ptr = input_files.send(client)" ] }, { @@ -169,7 +161,12 @@ " asset_list=[\n", " sy.Asset(\n", " name=\"helm train data\",\n", - " data=input_files,\n", + " data=input_files_ptr,\n", + " mock=sy.ActionObject.empty()\n", + " ),\n", + " sy.Asset(\n", + " name=\"helm test data\",\n", + " data=scenario_files_ptr,\n", " mock=sy.ActionObject.empty()\n", " )\n", " ]\n", @@ -185,14 +182,22 @@ "name": "stderr", "output_type": "stream", "text": [ - "100%|█████████████████████████████████████████████████████████████████████████████████████████████████████████| 1/1 [00:00<00:00, 6.80it/s]\n" + " 50%|██████████████████████████▌ | 1/2 [00:00<00:00, 5.07it/s]" ] }, { "name": "stdout", "output_type": "stream", "text": [ - "Uploading: helm train data\n" + "Uploading: helm train data\n", + "Uploading: helm test data\n" + ] + }, + { + "name": "stderr", + "output_type": "stream", + "text": [ + "100%|█████████████████████████████████████████████████████| 2/2 [00:00<00:00, 5.12it/s]\n" ] }, { @@ -222,6 +227,15 @@ "input_files_asset = client.datasets[\"Helm dataset\"].assets[0]" ] }, + { + "cell_type": "code", + "execution_count": 12, + "metadata": {}, + "outputs": [], + "source": [ + "scenario_files_asset = client.datasets[\"Helm dataset\"].assets[1]" + ] + }, { "cell_type": "markdown", "metadata": {}, @@ -231,7 +245,7 @@ }, { "cell_type": "code", - "execution_count": 12, + "execution_count": 13, "metadata": {}, "outputs": [ { @@ -337,7 +351,7 @@ }, { "cell_type": "code", - "execution_count": 13, + "execution_count": 14, "metadata": {}, "outputs": [ { @@ -349,7 +363,7 @@ "SyftSuccess: User Code Submitted" ] }, - "execution_count": 13, + "execution_count": 14, "metadata": {}, "output_type": "execute_result" } @@ -360,7 +374,7 @@ }, { "cell_type": "code", - "execution_count": 14, + "execution_count": 15, "metadata": {}, "outputs": [ { @@ -377,7 +391,7 @@ } ], "source": [ - "@sy.syft_function_single_use(input_files=input_files_asset, scenario_files=scenario_files_ptr)\n", + "@sy.syft_function_single_use(input_files=input_files_asset, scenario_files=scenario_files_asset)\n", "def main_function(domain, input_files, scenario_files):\n", " N = [5, 9, 13]\n", " jobs = []\n", @@ -396,7 +410,7 @@ }, { "cell_type": "code", - "execution_count": 19, + "execution_count": 16, "metadata": {}, "outputs": [ { @@ -409,13 +423,13 @@ { "data": { "text/html": [ - "
SyftSuccess: Request 5569f1e0d1de42748b2962456d3a3609 changes applied

" + "
SyftSuccess: Request 5bcfb440f8ba41699e7f2275fb2e48aa changes applied

" ], "text/plain": [ - "SyftSuccess: Request 5569f1e0d1de42748b2962456d3a3609 changes applied" + "SyftSuccess: Request 5bcfb440f8ba41699e7f2275fb2e48aa changes applied" ] }, - "execution_count": 19, + "execution_count": 16, "metadata": {}, "output_type": "execute_result" } @@ -427,20 +441,12 @@ }, { "cell_type": "code", - "execution_count": 20, + "execution_count": 17, "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "27/11/23 13:16:36 FUNCTION LOG (6758c7aab545457dbb249f9f4c0f7391): starting overlap computation\n" - ] - } - ], + "outputs": [], "source": [ "job = client.code.main_function(input_files=input_files_asset,\n", - " scenario_files=scenario_files_ptr,\n", + " scenario_files=scenario_files_asset,\n", " blocking=False)" ] }, @@ -453,7 +459,7 @@ }, { "cell_type": "code", - "execution_count": 21, + "execution_count": 18, "metadata": {}, "outputs": [ { @@ -461,14 +467,13 @@ "text/markdown": [ "```python\n", "class Job:\n", - " id: UID = a087f43d9b23419aa8c4045337ba1ab9\n", - " status: completed\n", + " id: UID = 5d6d06e16f894361b07eaa4321a1f205\n", + " status: created\n", " has_parent: False\n", " result: ActionDataEmpty \n", " logs:\n", "\n", "0 \n", - "JOB COMPLETED\n", " \n", "```" ], @@ -476,7 +481,7 @@ "syft.service.job.job_stash.Job" ] }, - "execution_count": 21, + "execution_count": 18, "metadata": {}, "output_type": "execute_result" } @@ -487,7 +492,7 @@ }, { "cell_type": "code", - "execution_count": 22, + "execution_count": 27, "metadata": {}, "outputs": [ { @@ -696,7 +701,7 @@ " flex-grow: 0;\n", " }\n", "\n", - " .grid-table52fd8ebdcfe645b4bda3cdb0f792f451 {\n", + " .grid-tablefd8b10a5b6ae4e37ab40e44e0e672e2a {\n", " display:grid;\n", " grid-template-columns: 1fr repeat(28, 1fr);\n", " grid-template-rows: repeat(2, 1fr);\n", @@ -868,25 +873,25 @@ "
\n", "
\n", "
\n", - "
\n", - "
\n", + "
\n", " \n", "
\n", - " \n", + " \n", "
\n", - " \n", "
\n", "\n", - "

0

\n", + "

0

\n", "
\n", - "
\n", + "
\n", " \n", "
\n", - "
\n", + "
\n", " \n", "
\n", "
\n", @@ -1103,7 +1108,7 @@ "[syft.service.job.job_stash.Job]" ] }, - "execution_count": 22, + "execution_count": 27, "metadata": {}, "output_type": "execute_result" } @@ -1114,680 +1119,38 @@ }, { "cell_type": "code", - "execution_count": 23, - "metadata": {}, - "outputs": [ - { - "data": { - "text/html": [ - "\n", - "\n", - "\n", - "\n", - "
\n", - "
\n", - "
\n", - "

Job List

\n", - "
\n", - "\n", - "
\n", - "
\n", - "
\n", - "
\n", - "
\n", - " \n", - "
\n", - " \n", - "
\n", - " \n", - "
\n", - "\n", - "

0

\n", - "
\n", - "
\n", - " \n", - "
\n", - "
\n", - " \n", - "
\n", - "
\n", - "
\n", - " \n" - ], - "text/plain": [ - "" - ] - }, - "execution_count": 23, - "metadata": {}, - "output_type": "execute_result" - }, - { - "name": "stderr", - "output_type": "stream", - "text": [ - "27/11/23 13:16:41 FUNCTION LOG (6758c7aab545457dbb249f9f4c0f7391): preparing scenarios and creating indexes\n", - "27/11/23 13:16:41 FUNCTION LOG (6758c7aab545457dbb249f9f4c0f7391): computing overlap\n" - ] - } - ], - "source": [ - "client.jobs" - ] - }, - { - "cell_type": "code", - "execution_count": 24, - "metadata": {}, - "outputs": [ - { - "name": "stdout", - "output_type": "stream", - "text": [ - "starting overlap computation\n", - "preparing scenarios and creating indexes\n", - "computing overlap\n", - "\n", - "\n" - ] - } - ], - "source": [ - "job.subjobs[0].logs()" - ] - }, - { - "cell_type": "code", - "execution_count": 25, - "metadata": {}, - "outputs": [ - { - "name": "stderr", - "output_type": "stream", - "text": [ - "27/11/23 13:16:52 FUNCTION LOG (6758c7aab545457dbb249f9f4c0f7391): done\n" - ] - } - ], - "source": [ - "results = [j.wait().get() for j in job.subjobs]" - ] - }, - { - "cell_type": "code", - "execution_count": 26, + "execution_count": 29, + "metadata": {}, + "outputs": [ + { + "name": "stdout", + "output_type": "stream", + "text": [ + "starting overlap computation\n", + "preparing scenarios and creating indexes\n", + "computing overlap\n", + "done\n", + "\n", + "\n" + ] + } + ], + "source": [ + "job.subjobs[0].logs()" + ] + }, + { + "cell_type": "code", + "execution_count": 30, + "metadata": {}, + "outputs": [], + "source": [ + "results = [j.wait().get() for j in job.subjobs]" + ] + }, + { + "cell_type": "code", + "execution_count": 31, "metadata": {}, "outputs": [ { @@ -1809,7 +1172,7 @@ " 'anatomy_test_5': 135}))]" ] }, - "execution_count": 26, + "execution_count": 31, "metadata": {}, "output_type": "execute_result" } @@ -1821,7 +1184,7 @@ }, { "cell_type": "code", - "execution_count": 27, + "execution_count": 32, "metadata": {}, "outputs": [ { @@ -1842,7 +1205,7 @@ " 'anatomy_test_5': 135}))" ] }, - "execution_count": 27, + "execution_count": 32, "metadata": {}, "output_type": "execute_result" } @@ -1860,7 +1223,7 @@ }, { "cell_type": "code", - "execution_count": 28, + "execution_count": 33, "metadata": {}, "outputs": [], "source": [ @@ -1911,7 +1274,7 @@ }, { "cell_type": "code", - "execution_count": 29, + "execution_count": 34, "metadata": {}, "outputs": [ { diff --git a/packages/syft/src/syft/store/blob_storage/__init__.py b/packages/syft/src/syft/store/blob_storage/__init__.py index 7818830739c..557d77553e8 100644 --- a/packages/syft/src/syft/store/blob_storage/__init__.py +++ b/packages/syft/src/syft/store/blob_storage/__init__.py @@ -105,8 +105,13 @@ def _read(self): with open(self.file_name, "rb") as f: return f.read() - def _read_data(self, **kwargs): - return self._read() + def _read_data(self, stream=False, **kwargs): + res = self._read() + # TODO: this is maybe not the right solution + if stream: + return [res] + else: + return res @migrate(BlobRetrieval, BlobRetrievalV1)