diff --git a/.pre-commit-config.yaml b/.pre-commit-config.yaml index 521e3f9b60c..646698814a0 100644 --- a/.pre-commit-config.yaml +++ b/.pre-commit-config.yaml @@ -14,7 +14,7 @@ repos: exclude: ^(packages/grid/frontend/|.vscode) - id: check-added-large-files always_run: true - exclude: ^(packages/grid/backend/wheels/.*|docs/img/header.png|docs/img/terminalizer.gif) + exclude: ^(packages/grid/backend/wheels/.*|docs/img/header.png|docs/img/terminalizer.gif|notebooks/scenarios/bigquery/upgradability/sync/migration_.*\.blob) - id: check-yaml always_run: true exclude: ^(packages/grid/k8s/rendered/|packages/grid/helm/) diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/01-setup-high-low-datasites.ipynb b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/01-setup-high-low-datasites.ipynb new file mode 100644 index 00000000000..65a947371db --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/01-setup-high-low-datasites.ipynb @@ -0,0 +1,241 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# syft absolute\n", + "import syft as sy\n", + "from syft import test_settings\n", + "\n", + "print(f\"syft version: {sy.__version__}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "server_low = sy.orchestra.launch(\n", + " name=\"bigquery-low\",\n", + " server_side_type=\"low\",\n", + " dev_mode=True,\n", + " reset=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=\"auto\",\n", + ")\n", + "\n", + "server_high = sy.orchestra.launch(\n", + " name=\"bigquery-high\",\n", + " server_side_type=\"high\",\n", + " dev_mode=True,\n", + " reset=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=\"auto\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "low_client = server_low.login(email=\"info@openmined.org\", password=\"changethis\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client = server_high.login(email=\"info@openmined.org\", password=\"changethis\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "low_client.worker_pools.get_all()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(high_client.worker_pools.get_all()) == 1\n", + "assert len(low_client.worker_pools.get_all()) == 1" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def launch_worker_pool(client, pool_name):\n", + " if pool_name not in [x.name for x in client.worker_pools]:\n", + " external_registry = test_settings.get(\"external_registry\", default=\"docker.io\")\n", + " worker_docker_tag = f\"openmined/bigquery:{sy.__version__}\"\n", + " result = client.api.services.worker_image.submit(\n", + " worker_config=sy.PrebuiltWorkerConfig(\n", + " tag=f\"{external_registry}/{worker_docker_tag}\"\n", + " )\n", + " )\n", + " worker_image = client.images.get_all()[1]\n", + " result = client.api.services.image_registry.add(external_registry)\n", + " result = client.api.services.worker_pool.launch(\n", + " pool_name=pool_name,\n", + " image_uid=worker_image.id,\n", + " num_workers=1,\n", + " )\n", + " return result\n", + " else:\n", + " print(\"Pool 
already exists\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "pool_name = \"bigquery-pool\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "launch_worker_pool(high_client, pool_name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "launch_worker_pool(low_client, pool_name)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# result = high_client.worker_pools.scale(number=5, pool_name=pool_name)\n", + "# result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(high_client.worker_pools.get_all()) == 2\n", + "assert len(low_client.worker_pools.get_all()) == 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "base_worker_image = high_client.images.get_all()[0]\n", + "base_worker_image" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "low_client.register(\n", + " email=\"data_scientist@openmined.org\",\n", + " password=\"verysecurepassword\",\n", + " password_verify=\"verysecurepassword\",\n", + " name=\"John Doe\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client.settings.allow_guest_signup(enable=False)\n", + "low_client.settings.allow_guest_signup(enable=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert (\n", + " len(low_client.api.services.user.get_all()) == 2\n", + "), \"Only DS and Admin should be at low side\"\n", + "assert (\n", + " len(high_client.api.services.user.get_all()) == 1\n", + "), \"Only Admin should be at high side\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "server_high.land()\n", + "server_low.land()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/02-configure-api-and-sync.ipynb b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/02-configure-api-and-sync.ipynb new file mode 100644 index 00000000000..ad7bf7a823a --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/02-configure-api-and-sync.ipynb @@ -0,0 +1,611 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# !pip install db-dtypes google-cloud-bigquery" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "# stdlib\n", + "\n", + "# third party\n", + "# set to use the live APIs\n", + "# import os\n", + "# os.environ[\"TEST_BIGQUERY_APIS_LIVE\"] = 
\"True\"\n", + "from apis import make_schema\n", + "from apis import make_submit_query\n", + "from apis import make_test_query" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "# syft absolute\n", + "import syft as sy\n", + "from syft import test_settings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "server_low = sy.orchestra.launch(\n", + " name=\"bigquery-low\",\n", + " server_side_type=\"low\",\n", + " dev_mode=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=\"auto\",\n", + ")\n", + "\n", + "server_high = sy.orchestra.launch(\n", + " name=\"bigquery-high\",\n", + " server_side_type=\"high\",\n", + " dev_mode=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=\"auto\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "low_client = server_low.login(email=\"info@openmined.org\", password=\"changethis\")\n", + "high_client = server_high.login(email=\"info@openmined.org\", password=\"changethis\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "assert len(high_client.worker_pools.get_all()) == 2\n", + "assert len(low_client.worker_pools.get_all()) == 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "this_worker_pool_name = \"bigquery-pool\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "# !pip list | grep bigquery" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "# !pip install db-dtypes google-cloud-bigquery" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Twin endpoints" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "mock_func = make_test_query(\n", + " settings={\n", + " \"rate_limiter_enabled\": True,\n", + " \"calls_per_min\": 10,\n", + " }\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "private_func = make_test_query(\n", + " settings={\n", + " \"rate_limiter_enabled\": False,\n", + " }\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "new_endpoint = sy.TwinAPIEndpoint(\n", + " path=\"bigquery.test_query\",\n", + " description=\"This endpoint allows to query Bigquery storage via SQL queries.\",\n", + " private_function=private_func,\n", + " mock_function=mock_func,\n", + " worker_pool=this_worker_pool_name,\n", + ")\n", + "\n", + "high_client.custom_api.add(endpoint=new_endpoint)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "# Here, we update the endpoint to timeout after 100s (rather the default of 60s)\n", + "high_client.api.services.api.update(\n", + " endpoint_path=\"bigquery.test_query\", endpoint_timeout=120\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], 
+ "source": [ + "high_client.api.services.api.update(\n", + " endpoint_path=\"bigquery.test_query\", hide_mock_definition=True\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "schema_function = make_schema(\n", + " settings={\n", + " \"calls_per_min\": 5,\n", + " },\n", + " worker_pool=this_worker_pool_name,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "high_client.custom_api.add(endpoint=schema_function)\n", + "high_client.refresh()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "metadata": {} + }, + "outputs": [], + "source": [ + "dataset_1 = test_settings.get(\"dataset_1\", default=\"dataset_1\")\n", + "dataset_2 = test_settings.get(\"dataset_2\", default=\"dataset_2\")\n", + "table_1 = test_settings.get(\"table_1\", default=\"table_1\")\n", + "table_2 = test_settings.get(\"table_2\", default=\"table_2\")\n", + "table_2_col_id = test_settings.get(\"table_2_col_id\", default=\"table_id\")\n", + "table_2_col_score = test_settings.get(\"table_2_col_score\", default=\"colname\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Test mock version\n", + "result = high_client.api.services.bigquery.test_query.mock(\n", + " sql_query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 10\"\n", + ")\n", + "result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client.api.services.bigquery.schema()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "submit_query_function = make_submit_query(\n", + " settings={}, worker_pool=this_worker_pool_name\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client.custom_api.add(endpoint=submit_query_function)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client.api.services.api.update(\n", + " endpoint_path=\"bigquery.submit_query\", hide_mock_definition=True\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client.custom_api.api_endpoints()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(high_client.custom_api.api_endpoints()) == 3" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert (\n", + " high_client.api.services.bigquery.test_query\n", + " and high_client.api.services.bigquery.submit_query\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Test mock version\n", + "result = high_client.api.services.bigquery.test_query.mock(\n", + " sql_query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 10\"\n", + ")\n", + "result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Bug with the new Error PR: message printed multiple times. 
TODO clean up the duplicate exception messages.\n", + "\n", + "# Test mock version for wrong queries\n", + "with sy.raises(\n", + " sy.SyftException(public_message=\"*must be qualified with a dataset*\"), show=True\n", + "):\n", + " high_client.api.services.bigquery.test_query.mock(\n", + " sql_query=\"SELECT * FROM invalid_table LIMIT 1\"\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Test private version\n", + "result = high_client.api.services.bigquery.test_query.private(\n", + " sql_query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 10\"\n", + ")\n", + "result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Testing submit query\n", + "result = high_client.api.services.bigquery.submit_query(\n", + " func_name=\"my_func\",\n", + " query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 1\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert \"Query submitted\" in result\n", + "result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job = high_client.code.my_func(blocking=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job.result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job.wait()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# syft absolute\n", + "from syft.client.syncing import compare_clients\n", + "from syft.service.job.job_stash import Job\n", + "from syft.service.job.job_stash import JobStatus" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def is_job_to_sync(batch):\n", + " if batch.status != \"NEW\":\n", + " return False\n", + " if not isinstance(batch.root.high_obj, Job):\n", + " return False\n", + " job = batch.root.high_obj\n", + " return job.status in (JobStatus.ERRORED, JobStatus.COMPLETED)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def sync_new_objects(\n", + " from_client, to_client, dry_run: bool = True, private_data: bool = False\n", + "):\n", + " sim = \"Simulating \" if dry_run else \"\"\n", + " priv = \"WITH PRIVATE DATA\" if private_data else \"\"\n", + " print(f\"{sim}Syncing from {from_client.name} to {to_client.name} {priv}\")\n", + " changes = []\n", + " diff = compare_clients(\n", + " from_client=from_client, to_client=to_client, hide_usercode=False\n", + " )\n", + " if isinstance(diff, sy.SyftError):\n", + " return diff\n", + "\n", + " for batch in diff.batches:\n", + " try:\n", + " if is_job_to_sync(batch) or batch.status == \"NEW\":\n", + " w = batch.resolve(build_state=False)\n", + " if private_data:\n", + " w.click_share_all_private_data()\n", + " if not dry_run:\n", + " w.click_sync()\n", + " change_text = f\"Synced {batch.status} {batch.root_type.__name__}\"\n", + " if not dry_run:\n", + " changes.append(change_text)\n", + " else:\n", + " print(f\"Would have run: {change_text}\")\n", + " except Exception as e:\n", + " print(\"sync_new_objects\", e)\n", + " raise e\n", + " return changes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "result = 
sync_new_objects(high_client, low_client)\n", + "result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "result = sync_new_objects(high_client, low_client, dry_run=False)\n", + "result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert [\n", + " \"Synced NEW TwinAPIEndpoint\",\n", + " \"Synced NEW TwinAPIEndpoint\",\n", + " \"Synced NEW TwinAPIEndpoint\",\n", + "] == result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# widget = sy.sync(from_client=high_client, to_client=low_client, hide_usercode=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# # TODO: ignore private function from high side in diff\n", + "# widget" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# widget.click_sync(0)\n", + "# widget.click_sync(1)\n", + "# widget.click_sync(2)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Some internal helper methods\n", + "\n", + "# widget._share_all()\n", + "# widget._sync_all()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "server_high.land()\n", + "server_low.land()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/03-ds-submit-request.ipynb b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/03-ds-submit-request.ipynb new file mode 100644 index 00000000000..8d6c0665dc7 --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/03-ds-submit-request.ipynb @@ -0,0 +1,269 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# syft absolute\n", + "import syft as sy\n", + "from syft import test_settings" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Only low side server and login as DS" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "server_low = sy.orchestra.launch(\n", + " name=\"bigquery-low\",\n", + " server_side_type=\"low\",\n", + " dev_mode=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=\"auto\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ds_client = server_low.login(\n", + " email=\"data_scientist@openmined.org\", password=\"verysecurepassword\"\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Low side research" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(ds_client.custom_api.api_endpoints()) == 3" + ] + 
}, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataset_1 = test_settings.get(\"dataset_1\", default=\"dataset_1\")\n", + "dataset_2 = test_settings.get(\"dataset_2\", default=\"dataset_2\")\n", + "table_1 = test_settings.get(\"table_1\", default=\"table_1\")\n", + "table_2 = test_settings.get(\"table_2\", default=\"table_2\")\n", + "table_2_col_id = test_settings.get(\"table_2_col_id\", default=\"table_id\")\n", + "table_2_col_score = test_settings.get(\"table_2_col_score\", default=\"colname\")\n", + "query_limit_size = test_settings.get(\"query_limit_size\", default=10000)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "result = ds_client.api.services.bigquery.test_query.mock(\n", + " sql_query=f\"SELECT * from {dataset_2}.{table_2} limit 10\"\n", + ")\n", + "assert len(result) == 10" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "with sy.raises(sy.SyftException, show=True):\n", + " ds_client.api.services.bigquery.test_query.private(\n", + " sql_query=f\"SELECT * from {dataset_2}.{table_2} limit 10\"\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res = ds_client.api.services.bigquery.schema()\n", + "# third party\n", + "import pandas as pd\n", + "\n", + "assert isinstance(res.get(), pd.DataFrame)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "FUNC_NAME = \"large_sample\"\n", + "LARGE_SAMPLE_QUERY = f\"SELECT * FROM {dataset_2}.{table_2} LIMIT {query_limit_size}\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "mock_res = ds_client.api.services.bigquery.test_query(sql_query=LARGE_SAMPLE_QUERY)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "submission = ds_client.api.services.bigquery.submit_query(\n", + " func_name=FUNC_NAME, query=LARGE_SAMPLE_QUERY\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def extract_code_path(response):\n", + " # stdlib\n", + " import re\n", + "\n", + " pattern = r\"client\\.code\\.(\\w+)\\(\\)\"\n", + " match = re.search(pattern, str(response))\n", + " if match:\n", + " extracted_code = match.group(1)\n", + " return extracted_code\n", + " return None" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# why are we randomizing things here?\n", + "func_name = extract_code_path(submission)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "api_method = getattr(ds_client.code, func_name, None)\n", + "api_method" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# todo: this is very noisy, but it actually passes\n", + "with sy.raises(\n", + " sy.SyftException(\n", + " public_message=\"*Please wait for the admin to allow the execution of this code*\"\n", + " ),\n", + " show=True,\n", + "):\n", + " result = api_method(blocking=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert \"large_sample\" in func_name" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "api_method_2 = getattr(ds_client.code, func_name, None)\n", + "api_method_2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "with sy.raises(\n", + " sy.SyftException(public_message=\"*Your code is waiting for approval*\"), show=True\n", + "):\n", + " result = api_method_2()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "server_low.land()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/04-do-review-requests.ipynb b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/04-do-review-requests.ipynb new file mode 100644 index 00000000000..a4a632e2f13 --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/04-do-review-requests.ipynb @@ -0,0 +1,422 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# syft absolute\n", + "import syft as sy\n", + "from syft.service.code.user_code import UserCode\n", + "from syft.service.request.request import Request" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "server_low = sy.orchestra.launch(\n", + " name=\"bigquery-low\",\n", + " server_side_type=\"low\",\n", + " dev_mode=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=\"auto\",\n", + ")\n", + "\n", + "server_high = sy.orchestra.launch(\n", + " name=\"bigquery-high\",\n", + " server_side_type=\"high\",\n", + " dev_mode=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=\"auto\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "low_client = server_low.login(email=\"info@openmined.org\", password=\"changethis\")\n", + "high_client = server_high.login(email=\"info@openmined.org\", password=\"changethis\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# # todo: this is way too noisy\n", + "# widget = sy.sync(from_client=low_client, to_client=high_client)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# widget" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# sync the users new request" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# syft absolute\n", + "from syft.client.syncing import compare_clients" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# syft absolute\n", + "from 
syft.service.job.job_stash import Job\n", + "from syft.service.job.job_stash import JobStatus" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def is_job_to_sync(batch):\n", + " if batch.status != \"NEW\":\n", + " return False\n", + " if not isinstance(batch.root.high_obj, Job):\n", + " return False\n", + " job = batch.root.high_obj\n", + " return job.status in (JobStatus.ERRORED, JobStatus.COMPLETED)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def sync_new_objects(\n", + " from_client, to_client, dry_run: bool = True, private_data: bool = False\n", + "):\n", + " sim = \"Simulating \" if dry_run else \"\"\n", + " priv = \"WITH PRIVATE DATA\" if private_data else \"\"\n", + " print(f\"{sim}Syncing from {from_client.name} to {to_client.name} {priv}\")\n", + " changes = []\n", + " diff = compare_clients(\n", + " from_client=from_client, to_client=to_client, hide_usercode=False\n", + " )\n", + " if isinstance(diff, sy.SyftError):\n", + " return diff\n", + "\n", + " for batch in diff.batches:\n", + " try:\n", + " if is_job_to_sync(batch) or batch.status == \"NEW\":\n", + " w = batch.resolve(build_state=False)\n", + " if private_data:\n", + " w.click_share_all_private_data()\n", + " if not dry_run:\n", + " w.click_sync()\n", + " change_text = f\"Synced {batch.status} {batch.root_type.__name__}\"\n", + " if not dry_run:\n", + " changes.append(change_text)\n", + " else:\n", + " print(f\"Would have run: {change_text}\")\n", + " except Exception as e:\n", + " print(\"sync_new_objects\", e)\n", + " raise e\n", + " return changes" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sync_new_objects(low_client, high_client)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "result = sync_new_objects(low_client, high_client, dry_run=False, private_data=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert \"Synced NEW UserCode\" in result\n", + "assert \"Synced NEW Request\" in result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(high_client.code.get_all()) == 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "requests = high_client.requests\n", + "requests" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "user_request = None" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for request in requests:\n", + " if \"large_sample\" in getattr(\n", + " getattr(request, \"code\", None), \"service_func_name\", None\n", + " ):\n", + " user_request = request" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert user_request" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "def execute_request(client, request) -> dict:\n", + " if not isinstance(request, Request):\n", + " return \"This is not a request\"\n", + "\n", + " code = request.code\n", + " if not isinstance(code, UserCode):\n", + " return \"No usercode found\"\n", + "\n", + " func_name = 
request.code.service_func_name\n", + " api_func = getattr(client.code, func_name, None)\n", + " if api_func is None:\n", + " return \"Code name was not found on the client.\"\n", + "\n", + " job = api_func(blocking=False)\n", + " return job" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job = execute_request(high_client, user_request)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job.wait()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# # todo: this is way too noisy\n", + "# widget = sy.sync(from_client=high_client, to_client=low_client)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# widget" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sync_new_objects(high_client, low_client)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "result = sync_new_objects(high_client, low_client, dry_run=False, private_data=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert \"Synced NEW Job\" in result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "requests = low_client.requests\n", + "requests" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "user_request = None" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "for request in requests:\n", + " if \"large_sample\" in getattr(\n", + " getattr(request, \"code\", None), \"service_func_name\", None\n", + " ):\n", + " user_request = request" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "user_request.status" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert \"approved\" in str(user_request.status).lower()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "server_high.land()\n", + "server_low.land()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/05-ds-get-results.ipynb b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/05-ds-get-results.ipynb new file mode 100644 index 00000000000..6981f1e7105 --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/05-ds-get-results.ipynb @@ -0,0 +1,131 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": 
null, + "metadata": {}, + "outputs": [], + "source": [ + "# third party\n", + "import pandas as pd\n", + "\n", + "# syft absolute\n", + "import syft as sy\n", + "from syft import test_settings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "server_low = sy.orchestra.launch(\n", + " name=\"bigquery-low\",\n", + " server_side_type=\"low\",\n", + " dev_mode=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=\"auto\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "ds_client = server_low.login(\n", + " email=\"data_scientist@openmined.org\", password=\"verysecurepassword\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "api_method = None\n", + "for code in ds_client.code:\n", + " if \"large_sample\" in code.service_func_name:\n", + " api_method = getattr(ds_client.code, code.service_func_name, None)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job = api_method(blocking=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res = job.wait().get()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(res, pd.DataFrame)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "query_limit_size = test_settings.get(\"query_limit_size\", default=10000)\n", + "assert len(res) == query_limit_size" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "server_low.land()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 4 +} diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/__init__.py b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/__init__.py b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/__init__.py new file mode 100644 index 00000000000..7231b580696 --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/__init__.py @@ -0,0 +1,23 @@ +# stdlib +import os + +# syft absolute +from syft.util.util import str_to_bool + +# relative +from .submit_query import make_submit_query + +env_var = "TEST_BIGQUERY_APIS_LIVE" +use_live = str_to_bool(str(os.environ.get(env_var, "False"))) +env_name = "Live" if use_live else "Mock" +print(f"Using {env_name} API Code, this will query BigQuery. 
{env_var}=={use_live}") + + +if use_live: + # relative + from .live.schema import make_schema + from .live.test_query import make_test_query +else: + # relative + from .mock.schema import make_schema + from .mock.test_query import make_test_query diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/live/__init__.py b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/live/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/live/schema.py b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/live/schema.py new file mode 100644 index 00000000000..5b39d9d9066 --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/live/schema.py @@ -0,0 +1,108 @@ +# stdlib +from collections.abc import Callable + +# syft absolute +import syft as sy +from syft import test_settings + +# relative +from ..rate_limiter import is_within_rate_limit + + +def make_schema(settings: dict, worker_pool: str) -> Callable: + updated_settings = { + "calls_per_min": 5, + "rate_limiter_enabled": True, + "credentials": test_settings.gce_service_account.to_dict(), + "region": test_settings.gce_region, + "project_id": test_settings.gce_project_id, + "dataset_1": test_settings.dataset_1, + "table_1": test_settings.table_1, + "table_2": test_settings.table_2, + } | settings + + @sy.api_endpoint( + path="bigquery.schema", + description="This endpoint allows for visualising the metadata of tables available in BigQuery.", + settings=updated_settings, + helper_functions=[ + is_within_rate_limit + ], # Adds ratelimit as this is also a method available to data scientists + worker_pool=worker_pool, + ) + def live_schema( + context, + ) -> str: + # stdlib + import datetime + + # third party + from google.cloud import bigquery # noqa: F811 + from google.oauth2 import service_account + import pandas as pd + + # syft absolute + from syft import SyftException + + # Auth for BigQuery based on the workload identity + credentials = service_account.Credentials.from_service_account_info( + context.settings["credentials"] + ) + scoped_credentials = credentials.with_scopes( + ["https://www.googleapis.com/auth/cloud-platform"] + ) + + client = bigquery.Client( + credentials=scoped_credentials, + location=context.settings["region"], + ) + + # Store a dict with the calltimes for each user, via the email. + if context.settings["rate_limiter_enabled"]: + if context.user.email not in context.state.keys(): + context.state[context.user.email] = [] + + if not context.code.is_within_rate_limit(context): + raise SyftException( + public_message="Rate limit of calls per minute has been reached." 
+ ) + context.state[context.user.email].append(datetime.datetime.now()) + + try: + # Formats the data schema in a data frame format + # Warning: the only supported format types are primitives, np.ndarrays and pd.DataFrames + + data_schema = [] + for table_id in [ + f"{context.settings['dataset_1']}.{context.settings['table_1']}", + f"{context.settings['dataset_1']}.{context.settings['table_2']}", + ]: + table = client.get_table(table_id) + for schema in table.schema: + data_schema.append( + { + "project": str(table.project), + "dataset_id": str(table.dataset_id), + "table_id": str(table.table_id), + "schema_name": str(schema.name), + "schema_field": str(schema.field_type), + "description": str(table.description), + "num_rows": str(table.num_rows), + } + ) + return pd.DataFrame(data_schema) + + except Exception as e: + # not a bigquery exception + if not hasattr(e, "_errors"): + output = f"got exception e: {type(e)} {str(e)}" + raise SyftException( + public_message=f"An error occurred executing the API call {output}" + ) + + # Should add appropriate error handling for what should be exposed to the data scientists. + raise SyftException( + public_message="An error occurred executing the API call, please contact the domain owner." + ) + + return live_schema diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/live/test_query.py b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/live/test_query.py new file mode 100644 index 00000000000..344879dcb62 --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/live/test_query.py @@ -0,0 +1,113 @@ +# stdlib +from collections.abc import Callable + +# syft absolute +import syft as sy +from syft import test_settings + +# relative +from ..rate_limiter import is_within_rate_limit + + +def make_test_query(settings) -> Callable: + updated_settings = { + "calls_per_min": 10, + "rate_limiter_enabled": True, + "credentials": test_settings.gce_service_account.to_dict(), + "region": test_settings.gce_region, + "project_id": test_settings.gce_project_id, + } | settings + + # these are the same if you allow the rate limiter to be turned on and off + @sy.api_endpoint_method( + settings=updated_settings, + helper_functions=[is_within_rate_limit], + ) + def live_test_query( + context, + sql_query: str, + ) -> str: + # stdlib + import datetime + + # third party + from google.cloud import bigquery # noqa: F811 + from google.oauth2 import service_account + + # syft absolute + from syft import SyftException + + # Auth for BigQuery based on the workload identity + credentials = service_account.Credentials.from_service_account_info( + context.settings["credentials"] + ) + scoped_credentials = credentials.with_scopes( + ["https://www.googleapis.com/auth/cloud-platform"] + ) + + client = bigquery.Client( + credentials=scoped_credentials, + location=context.settings["region"], + ) + + # Store a dict with the calltimes for each user, via the email. + if context.settings["rate_limiter_enabled"]: + if context.user.email not in context.state.keys(): + context.state[context.user.email] = [] + + if not context.code.is_within_rate_limit(context): + raise SyftException( + public_message="Rate limit of calls per minute has been reached." 
+ ) + context.state[context.user.email].append(datetime.datetime.now()) + + try: + rows = client.query_and_wait( + sql_query, + project=context.settings["project_id"], + ) + + if rows.total_rows > 1_000_000: + raise SyftException( + public_message="Please only write queries that gather aggregate statistics" + ) + + return rows.to_dataframe() + + except Exception as e: + # not a bigquery exception + if not hasattr(e, "_errors"): + output = f"got exception e: {type(e)} {str(e)}" + raise SyftException( + public_message=f"An error occurred executing the API call {output}" + ) + + # Forward these error types to the data scientists. + # By default, any exception is only visible to the data owner. + + if e._errors[0]["reason"] in [ + "badRequest", + "blocked", + "duplicate", + "invalidQuery", + "invalid", + "jobBackendError", + "jobInternalError", + "notFound", + "notImplemented", + "rateLimitExceeded", + "resourceInUse", + "resourcesExceeded", + "tableUnavailable", + "timeout", + ]: + raise SyftException( + public_message="Error occurred during the call: " + + e._errors[0]["message"] + ) + else: + raise SyftException( + public_message="An error occurred executing the API call, please contact the domain owner." + ) + + return live_test_query diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/mock/__init__.py b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/mock/__init__.py new file mode 100644 index 00000000000..e69de29bb2d diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/mock/data.py b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/mock/data.py new file mode 100644 index 00000000000..82262bf7a01 --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/mock/data.py @@ -0,0 +1,268 @@ +# stdlib +from math import nan + +schema_dict = { + "project": { + 0: "example-project", + 1: "example-project", + 2: "example-project", + 3: "example-project", + 4: "example-project", + 5: "example-project", + 6: "example-project", + 7: "example-project", + 8: "example-project", + 9: "example-project", + 10: "example-project", + 11: "example-project", + 12: "example-project", + 13: "example-project", + 14: "example-project", + 15: "example-project", + 16: "example-project", + 17: "example-project", + 18: "example-project", + 19: "example-project", + 20: "example-project", + 21: "example-project", + 22: "example-project", + }, + "dataset_id": { + 0: "test_1gb", + 1: "test_1gb", + 2: "test_1gb", + 3: "test_1gb", + 4: "test_1gb", + 5: "test_1gb", + 6: "test_1gb", + 7: "test_1gb", + 8: "test_1gb", + 9: "test_1gb", + 10: "test_1gb", + 11: "test_1gb", + 12: "test_1gb", + 13: "test_1gb", + 14: "test_1gb", + 15: "test_1gb", + 16: "test_1gb", + 17: "test_1gb", + 18: "test_1gb", + 19: "test_1gb", + 20: "test_1gb", + 21: "test_1gb", + 22: "test_1gb", + }, + "table_id": { + 0: "posts", + 1: "posts", + 2: "posts", + 3: "posts", + 4: "posts", + 5: "posts", + 6: "posts", + 7: "comments", + 8: "comments", + 9: "comments", + 10: "comments", + 11: "comments", + 12: "comments", + 13: "comments", + 14: "comments", + 15: "comments", + 16: "comments", + 17: "comments", + 18: "comments", + 19: "comments", + 20: "comments", + 21: "comments", + 22: "comments", + }, + "schema_name": { + 0: "int64_field_0", + 1: "id", + 2: "name", + 3: "subscribers_count", + 4: "permalink", + 5: "nsfw", + 6: "spam", + 7: "int64_field_0", + 8: "id", + 9: "body", + 10: "parent_id", + 11: "created_at", 
+ 12: "last_modified_at", + 13: "gilded", + 14: "permalink", + 15: "score", + 16: "comment_id", + 17: "post_id", + 18: "author_id", + 19: "spam", + 20: "deleted", + 21: "upvote_raio", + 22: "collapsed_in_crowd_control", + }, + "schema_field": { + 0: "INTEGER", + 1: "STRING", + 2: "STRING", + 3: "INTEGER", + 4: "STRING", + 5: "FLOAT", + 6: "BOOLEAN", + 7: "INTEGER", + 8: "STRING", + 9: "STRING", + 10: "STRING", + 11: "INTEGER", + 12: "INTEGER", + 13: "BOOLEAN", + 14: "STRING", + 15: "INTEGER", + 16: "STRING", + 17: "STRING", + 18: "STRING", + 19: "BOOLEAN", + 20: "BOOLEAN", + 21: "FLOAT", + 22: "BOOLEAN", + }, + "description": { + 0: "None", + 1: "None", + 2: "None", + 3: "None", + 4: "None", + 5: "None", + 6: "None", + 7: "None", + 8: "None", + 9: "None", + 10: "None", + 11: "None", + 12: "None", + 13: "None", + 14: "None", + 15: "None", + 16: "None", + 17: "None", + 18: "None", + 19: "None", + 20: "None", + 21: "None", + 22: "None", + }, + "num_rows": { + 0: "2000000", + 1: "2000000", + 2: "2000000", + 3: "2000000", + 4: "2000000", + 5: "2000000", + 6: "2000000", + 7: "2000000", + 8: "2000000", + 9: "2000000", + 10: "2000000", + 11: "2000000", + 12: "2000000", + 13: "2000000", + 14: "2000000", + 15: "2000000", + 16: "2000000", + 17: "2000000", + 18: "2000000", + 19: "2000000", + 20: "2000000", + 21: "2000000", + 22: "2000000", + }, +} + + +query_dict = { + "int64_field_0": { + 0: 4, + 1: 5, + 2: 10, + 3: 16, + 4: 17, + 5: 23, + 6: 24, + 7: 25, + 8: 27, + 9: 40, + }, + "id": { + 0: "t5_via1x", + 1: "t5_cv9gn", + 2: "t5_8p2tq", + 3: "t5_8fcro", + 4: "t5_td5of", + 5: "t5_z01fv", + 6: "t5_hmqjk", + 7: "t5_1flyj", + 8: "t5_5rwej", + 9: "t5_uurcv", + }, + "name": { + 0: "/channel/mylittlepony", + 1: "/channel/polyamory", + 2: "/channel/Catholicism", + 3: "/channel/cordcutters", + 4: "/channel/stevenuniverse", + 5: "/channel/entitledbitch", + 6: "/channel/engineering", + 7: "/channel/nottheonion", + 8: "/channel/FoodPorn", + 9: "/channel/puppysmiles", + }, + "subscribers_count": { + 0: 4323081, + 1: 2425929, + 2: 4062607, + 3: 7543226, + 4: 2692168, + 5: 2709080, + 6: 8766144, + 7: 2580984, + 8: 7784809, + 9: 3715991, + }, + "permalink": { + 0: "/channel//channel/mylittlepony", + 1: "/channel//channel/polyamory", + 2: "/channel//channel/Catholicism", + 3: "/channel//channel/cordcutters", + 4: "/channel//channel/stevenuniverse", + 5: "/channel//channel/entitledbitch", + 6: "/channel//channel/engineering", + 7: "/channel//channel/nottheonion", + 8: "/channel//channel/FoodPorn", + 9: "/channel//channel/puppysmiles", + }, + "nsfw": { + 0: nan, + 1: nan, + 2: nan, + 3: nan, + 4: nan, + 5: nan, + 6: nan, + 7: nan, + 8: nan, + 9: nan, + }, + "spam": { + 0: False, + 1: False, + 2: False, + 3: False, + 4: False, + 5: False, + 6: False, + 7: False, + 8: False, + 9: False, + }, +} diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/mock/schema.py b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/mock/schema.py new file mode 100644 index 00000000000..a95e04f2f1d --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/mock/schema.py @@ -0,0 +1,52 @@ +# stdlib +from collections.abc import Callable + +# syft absolute +import syft as sy + +# relative +from ..rate_limiter import is_within_rate_limit +from .data import schema_dict + + +def make_schema(settings, worker_pool) -> Callable: + updated_settings = { + "calls_per_min": 5, + "rate_limiter_enabled": True, + "schema_dict": schema_dict, + } | settings + + @sy.api_endpoint( + 
path="bigquery.schema", + description="This endpoint allows for visualising the metadata of tables available in BigQuery.", + settings=updated_settings, + helper_functions=[is_within_rate_limit], + worker_pool=worker_pool, + ) + def mock_schema( + context, + ) -> str: + # syft absolute + from syft import SyftException + + # Store a dict with the calltimes for each user, via the email. + if context.settings["rate_limiter_enabled"]: + # stdlib + import datetime + + if context.user.email not in context.state.keys(): + context.state[context.user.email] = [] + + if not context.code.is_within_rate_limit(context): + raise SyftException( + public_message="Rate limit of calls per minute has been reached." + ) + context.state[context.user.email].append(datetime.datetime.now()) + + # third party + import pandas as pd + + df = pd.DataFrame(context.settings["schema_dict"]) + return df + + return mock_schema diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/mock/test_query.py b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/mock/test_query.py new file mode 100644 index 00000000000..ae028a8cf36 --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/mock/test_query.py @@ -0,0 +1,138 @@ +# stdlib +from collections.abc import Callable + +# syft absolute +import syft as sy + +# relative +from ..rate_limiter import is_within_rate_limit +from .data import query_dict + + +def extract_limit_value(sql_query: str) -> int: + # stdlib + import re + + limit_pattern = re.compile(r"\bLIMIT\s+(\d+)\b", re.IGNORECASE) + match = limit_pattern.search(sql_query) + if match: + return int(match.group(1)) + return None + + +def is_valid_sql(query: str) -> bool: + # stdlib + import sqlite3 + + # Prepare an in-memory SQLite database + conn = sqlite3.connect(":memory:") + cursor = conn.cursor() + + try: + # Use the EXPLAIN QUERY PLAN command to get the query plan + cursor.execute(f"EXPLAIN QUERY PLAN {query}") + except sqlite3.Error as e: + if "no such table" in str(e).lower(): + return True + return False + finally: + conn.close() + + +def adjust_dataframe_rows(df, target_rows: int): + # third party + import pandas as pd + + current_rows = len(df) + + if target_rows > current_rows: + # Repeat rows to match target_rows + repeat_times = (target_rows + current_rows - 1) // current_rows + df_expanded = pd.concat([df] * repeat_times, ignore_index=True).head( + target_rows + ) + else: + # Truncate rows to match target_rows + df_expanded = df.head(target_rows) + + return df_expanded + + +def make_test_query(settings: dict) -> Callable: + updated_settings = { + "calls_per_min": 10, + "rate_limiter_enabled": True, + "query_dict": query_dict, + } | settings + + # these are the same if you allow the rate limiter to be turned on and off + @sy.api_endpoint_method( + settings=updated_settings, + helper_functions=[ + is_within_rate_limit, + extract_limit_value, + is_valid_sql, + adjust_dataframe_rows, + ], + ) + def mock_test_query( + context, + sql_query: str, + ) -> str: + # stdlib + import datetime + + # third party + from google.api_core.exceptions import BadRequest + + # syft absolute + from syft import SyftException + + # Store a dict with the calltimes for each user, via the email. 
+ if context.settings["rate_limiter_enabled"]: + if context.user.email not in context.state.keys(): + context.state[context.user.email] = [] + + if not context.code.is_within_rate_limit(context): + raise SyftException( + public_message="Rate limit of calls per minute has been reached." + ) + context.state[context.user.email].append(datetime.datetime.now()) + + bad_table = "invalid_table" + bad_post = ( + "BadRequest: 400 POST " + "https://bigquery.googleapis.com/bigquery/v2/projects/project-id/" + "queries?prettyPrint=false: " + ) + if bad_table in sql_query: + try: + raise BadRequest( + f'{bad_post} Table "{bad_table}" must be qualified ' + "with a dataset (e.g. dataset.table)." + ) + except Exception as e: + raise SyftException( + public_message=f"*must be qualified with a dataset*. {e}" + ) + + if not context.code.is_valid_sql(sql_query): + raise BadRequest( + f'{bad_post} Syntax error: Unexpected identifier "{sql_query}" at [1:1]' + ) + + # third party + import pandas as pd + + limit = context.code.extract_limit_value(sql_query) + if limit > 1_000_000: + raise SyftException( + public_message="Please only write queries that gather aggregate statistics" + ) + + base_df = pd.DataFrame(context.settings["query_dict"]) + + df = context.code.adjust_dataframe_rows(base_df, limit) + return df + + return mock_test_query diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/rate_limiter.py b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/rate_limiter.py new file mode 100644 index 00000000000..8ce319b61f4 --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/rate_limiter.py @@ -0,0 +1,16 @@ +def is_within_rate_limit(context) -> bool: + """Rate limiter for custom API calls made by users.""" + # stdlib + import datetime + + state = context.state + settings = context.settings + email = context.user.email + + current_time = datetime.datetime.now() + calls_last_min = [ + 1 if (current_time - call_time).seconds < 60 else 0 + for call_time in state[email] + ] + + return sum(calls_last_min) < settings.get("calls_per_min", 5) diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/submit_query.py b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/submit_query.py new file mode 100644 index 00000000000..a0125ee009b --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/apis/submit_query.py @@ -0,0 +1,42 @@ +# syft absolute +import syft as sy + + +def make_submit_query(settings, worker_pool): + updated_settings = {"user_code_worker": worker_pool} | settings + + @sy.api_endpoint( + path="bigquery.submit_query", + description="API endpoint that allows you to submit SQL queries to run on the private data.", + worker_pool=worker_pool, + settings=updated_settings, + ) + def submit_query( + context, + func_name: str, + query: str, + ) -> str: + # syft absolute + import syft as sy + + @sy.syft_function( + name=func_name, + input_policy=sy.MixedInputPolicy( + endpoint=sy.Constant( + val=context.admin_client.api.services.bigquery.test_query + ), + query=sy.Constant(val=query), + client=context.admin_client, + ), + worker_pool_name=context.settings["user_code_worker"], + ) + def execute_query(query: str, endpoint): + res = endpoint(sql_query=query) + return res + + request = context.user_client.code.request_code_execution(execute_query) + context.admin_client.requests.set_tags(request, ["autosync"]) + + return f"Query submitted {request}. 
diff --git a/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/sync_helpers.py b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/sync_helpers.py
new file mode 100644
index 00000000000..e1d558016ba
--- /dev/null
+++ b/notebooks/scenarios/bigquery/upgradability/sync/0.9.1_notebooks/sync_helpers.py
@@ -0,0 +1,190 @@
+# third party
+from tqdm import tqdm
+
+# syft absolute
+import syft as sy
+from syft.client.datasite_client import DatasiteClient
+from syft.client.syncing import compare_clients
+from syft.service.code.user_code import UserCode
+from syft.service.job.job_stash import Job
+from syft.service.job.job_stash import JobStatus
+from syft.service.request.request import Request
+from syft.service.request.request import RequestStatus
+from syft.service.sync.diff_state import ObjectDiffBatch
+from syft.types.result import Err
+
+
+def deny_requests_without_autosync_tag(client_low: DatasiteClient):
+    # Deny all requests that are not autosync
+    requests = client_low.requests.get_all()
+    if isinstance(requests, sy.SyftError):
+        print(requests)
+        return
+
+    denied_requests = []
+    for request in tqdm(requests):
+        if request.status != RequestStatus.PENDING:
+            continue
+        if "autosync" not in request.tags:
+            request.deny(
+                reason="This request has been denied automatically. "
+                "Please use the designated API to submit your request."
+            )
+            denied_requests.append(request.id)
+    print(f"Denied {len(denied_requests)} requests without autosync tag")
+
+
+def is_request_to_sync(batch: ObjectDiffBatch) -> bool:
+    # True if this is a new low-side request
+    # TODO add condition for sql requests/usercodes
+    low_request = batch.root.low_obj
+    return (
+        isinstance(low_request, Request)
+        and batch.status == "NEW"
+        and "autosync" in low_request.tags
+    )
+
+
+def is_job_to_sync(batch: ObjectDiffBatch):
+    # True if this is a new high-side job that is either COMPLETED or ERRORED
+    if batch.status != "NEW":
+        return False
+    if not isinstance(batch.root.high_obj, Job):
+        return False
+    job = batch.root.high_obj
+    return job.status in (JobStatus.ERRORED, JobStatus.COMPLETED)
+
+
+def execute_requests(
+    client_high: DatasiteClient, request_ids: list[sy.UID]
+) -> dict[sy.UID, Job]:
+    jobs_by_request_id = {}
+    for request_id in request_ids:
+        request = client_high.requests.get_by_uid(request_id)
+        if not isinstance(request, Request):
+            continue
+
+        code = request.code
+        if not isinstance(code, UserCode):
+            continue
+
+        func_name = request.code.service_func_name
+        api_func = getattr(client_high.code, func_name, None)
+        if api_func is None:
+            continue
+
+        job = api_func(blocking=False)
+        jobs_by_request_id[request_id] = job
+
+    return jobs_by_request_id
+
+
+def deny_failed_jobs(
+    client_low: DatasiteClient,
+    jobs: list[Job],
+) -> None:
+    # NOTE no syncing is needed, requests are denied on the low side
+    denied_requests = []
+
+    for job in jobs:
+        if job.status != JobStatus.ERRORED:
+            continue
+
+        error_result = job.result
+        if isinstance(error_result, Err):
+            error_msg = error_result.err_value
+        else:
+            error_msg = "An unknown error occurred, please check the Job logs for more information."
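+        # Walk back from the failed high-side job to the originating low-side
+        # request via the shared user_code_id, and deny that request with the
+        # error message so the submitter can see why execution failed.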
+
+        code_id = job.user_code_id
+        if code_id is None:
+            continue
+        requests = client_low.requests.get_by_usercode_id(code_id)
+        if isinstance(requests, list) and len(requests) > 0:
+            request = requests[0]
+            request.deny(reason=f"Execution failed: {error_msg}")
+            denied_requests.append(request.id)
+        else:
+            print(f"Failed to deny request for job {job.id}")
+
+    print(f"Denied {len(denied_requests)} failed requests")
+
+
+def sync_finished_jobs(
+    client_low: DatasiteClient,
+    client_high: DatasiteClient,
+) -> dict[sy.UID, sy.SyftError | sy.SyftSuccess] | sy.SyftError:
+    sync_job_results = {}
+    synced_jobs = []
+    diff = compare_clients(
+        from_client=client_high, to_client=client_low, include_types=["job"]
+    )
+    if isinstance(diff, sy.SyftError):
+        print(diff)
+        return diff
+
+    for batch in diff.batches:
+        if is_job_to_sync(batch):
+            job = batch.root.high_obj
+
+            w = batch.resolve(build_state=False)
+            share_result = w.click_share_all_private_data()
+            if isinstance(share_result, sy.SyftError):
+                sync_job_results[job.id] = share_result
+                continue
+            sync_result = w.click_sync()
+
+            synced_jobs.append(job)
+            sync_job_results[job.id] = sync_result
+
+    print(f"Sharing {len(sync_job_results)} new results")
+    deny_failed_jobs(client_low, synced_jobs)
+    return sync_job_results
+
+
+def sync_new_requests(
+    client_low: DatasiteClient,
+    client_high: DatasiteClient,
+) -> dict[sy.UID, sy.SyftSuccess | sy.SyftError] | sy.SyftError:
+    sync_request_results = {}
+    diff = compare_clients(
+        from_client=client_low, to_client=client_high, include_types=["request"]
+    )
+    if isinstance(diff, sy.SyftError):
+        print(diff)
+        # Propagate the error so callers can distinguish "comparison failed"
+        # from "nothing to sync", as the return annotation advertises.
+        return diff
+    print(f"{len(diff.batches)} request batches found")
+    for batch in tqdm(diff.batches):
+        if is_request_to_sync(batch):
+            request_id = batch.root.low_obj.id
+            w = batch.resolve(build_state=False)
+            result = w.click_sync()
+            sync_request_results[request_id] = result
+    return sync_request_results
+
+
+def sync_and_execute_new_requests(
+    client_low: DatasiteClient, client_high: DatasiteClient
+) -> None:
+    sync_results = sync_new_requests(client_low, client_high)
+    if isinstance(sync_results, sy.SyftError):
+        print(sync_results)
+        return
+
+    request_ids = [
+        uid for uid, res in sync_results.items() if isinstance(res, sy.SyftSuccess)
+    ]
+    print(f"Synced {len(request_ids)} new requests")
+
+    jobs_by_request = execute_requests(client_high, request_ids)
+    print(f"Started {len(jobs_by_request)} new jobs")
+
+
+def auto_sync(client_low: DatasiteClient, client_high: DatasiteClient) -> None:
+    print("Starting auto sync")
+    print("Denying requests without the autosync tag")
+    deny_requests_without_autosync_tag(client_low)
+    print("Syncing and executing")
+    sync_and_execute_new_requests(client_low, client_high)
+    sync_finished_jobs(client_low, client_high)
+    print("Finished auto sync")
diff --git a/notebooks/scenarios/bigquery/upgradability/sync/1-dump-database-to-file.ipynb b/notebooks/scenarios/bigquery/upgradability/sync/1-dump-database-to-file.ipynb
new file mode 100644
index 00000000000..09b02ee5ddd
--- /dev/null
+++ b/notebooks/scenarios/bigquery/upgradability/sync/1-dump-database-to-file.ipynb
@@ -0,0 +1,201 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "0",
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# stdlib\n",
+    "import os\n",
+    "from pathlib import Path\n",
+    "\n",
+    "# syft absolute\n",
+    "import syft as sy"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "id": "1",
+   "metadata": {},
+   "outputs": [],
+   "source": [
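+    "# reset is deliberately not passed here: the servers must come up with\n",
+    "# the state created by the earlier notebooks so it can be dumped below.\n",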
"server_low = sy.orchestra.launch(\n", + " name=\"bigquery-low\",\n", + " server_side_type=\"low\",\n", + " dev_mode=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=\"auto\",\n", + ")\n", + "\n", + "server_high = sy.orchestra.launch(\n", + " name=\"bigquery-high\",\n", + " server_side_type=\"high\",\n", + " dev_mode=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=\"auto\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2", + "metadata": {}, + "outputs": [], + "source": [ + "low_client = server_low.login(email=\"info@openmined.org\", password=\"changethis\")\n", + "high_client = server_high.login(email=\"info@openmined.org\", password=\"changethis\")" + ] + }, + { + "cell_type": "markdown", + "id": "3", + "metadata": {}, + "source": [ + "# Dump low side" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "4", + "metadata": {}, + "outputs": [], + "source": [ + "migration_data_dir = Path(os.getenv(\"MIGRATION_DATA_DIR\", \".\"))\n", + "migration_data_dir.mkdir(exist_ok=True)\n", + "\n", + "low_blob_path = migration_data_dir / \"migration_low.blob\"\n", + "low_yaml_path = migration_data_dir / \"migration_low.yaml\"\n", + "\n", + "low_blob_path.unlink(missing_ok=True)\n", + "low_yaml_path.unlink(missing_ok=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [ + "# Dump low state\n", + "\n", + "low_migration_data = low_client.get_migration_data(include_blobs=True)\n", + "low_migration_data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "low_migration_data.save(path=low_blob_path, yaml_path=low_yaml_path)" + ] + }, + { + "cell_type": "markdown", + "id": "7", + "metadata": {}, + "source": [ + "# Dump high side" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "high_blob_path = migration_data_dir / \"migration_high.blob\"\n", + "high_yaml_path = migration_data_dir / \"migration_high.yaml\"\n", + "\n", + "high_blob_path.unlink(missing_ok=True)\n", + "high_yaml_path.unlink(missing_ok=True)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "9", + "metadata": {}, + "outputs": [], + "source": [ + "high_migration_data = high_client.get_migration_data(include_blobs=True)\n", + "high_migration_data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [], + "source": [ + "high_migration_data.save(path=high_blob_path, yaml_path=high_yaml_path)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "11", + "metadata": {}, + "outputs": [], + "source": [ + "for path in (low_blob_path, low_yaml_path, high_blob_path, high_yaml_path):\n", + " assert path.exists(), f\"Migration file {path} does not exist\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "12", + "metadata": {}, + "outputs": [], + "source": [ + "if server_high.server_type.value == \"python\":\n", + " server_high.land()\n", + "\n", + "if server_low.server_type.value == \"python\":\n", + " server_low.land()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "13", + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + 
"codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.10.13" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/scenarios/bigquery/upgradability/sync/2-migrate-for-scenarios.ipynb b/notebooks/scenarios/bigquery/upgradability/sync/2-migrate-for-scenarios.ipynb new file mode 100644 index 00000000000..326eb8c62cd --- /dev/null +++ b/notebooks/scenarios/bigquery/upgradability/sync/2-migrate-for-scenarios.ipynb @@ -0,0 +1,403 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "import os\n", + "from os import environ as env\n", + "from pathlib import Path\n", + "\n", + "# syft absolute\n", + "import syft as sy" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "import sys\n", + "\n", + "# syft absolute\n", + "from syft.util.util import find_base_dir_with_tox_ini\n", + "from syft.util.util import get_caller_file_path\n", + "from syft.util.util import is_interpreter_jupyter\n", + "\n", + "\n", + "def add_helper_path_to_python_path() -> None:\n", + " current_path = \".\"\n", + "\n", + " # jupyter uses \".\" which resolves to the notebook\n", + " if not is_interpreter_jupyter():\n", + " # python uses the file which has from syft import test_settings in it\n", + " import_path = get_caller_file_path()\n", + " if import_path:\n", + " current_path = import_path\n", + "\n", + " base_dir = find_base_dir_with_tox_ini(current_path)\n", + " notebook_helper_path = os.path.join(base_dir, \"notebooks/notebook_helpers\")\n", + " sys.path.append(notebook_helper_path)\n", + "\n", + "\n", + "add_helper_path_to_python_path()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# third party\n", + "from email_helpers import load_users\n", + "from job_helpers import load_jobs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# when in k8s these are the default values\n", + "ROOT_EMAIL = \"admin@bigquery.org\"\n", + "ROOT_PASSWORD = \"bqpw\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# in case we are not in k8s we set them here for orchestra to use\n", + "env[\"DEFAULT_ROOT_EMAIL\"] = ROOT_EMAIL\n", + "env[\"DEFAULT_ROOT_PASSWORD\"] = ROOT_PASSWORD" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Login" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "server = sy.orchestra.launch(\n", + " name=\"bigquery-high\",\n", + " dev_mode=True,\n", + " server_side_type=\"high\",\n", + " reset=True,\n", + " port=\"8080\",\n", + " n_consumers=1, # How many workers to be spawned\n", + " create_producer=True, # Can produce more workers\n", + ")\n", + "\n", + "client = sy.login(url=\"http://localhost:8080\", email=ROOT_EMAIL, password=ROOT_PASSWORD)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Check if this is a new server\n", + "migration_data = client.get_migration_data()\n", + "\n", + "# assert len(migration_data.store_objects[User]) == 1\n", + "# assert UserCode not in migration_data.store_objects" + ] + }, + { + 
"cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "migration_data" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Load migration data" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "!pwd" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# migration_data_dir = Path(os.getenv(\"MIGRATION_DATA_DIR\", \".\"))\n", + "migration_data_dir = Path(\"/home/teo/OpenMined/PySyft/.tox/.tmp/migration\")\n", + "blob_path = migration_data_dir / \"migration.blob\"\n", + "yaml_path = migration_data_dir / \"migration.yaml\"\n", + "\n", + "print(f\"Loading migration data from {str(blob_path.resolve())}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res = client.load_migration_data(blob_path)\n", + "assert isinstance(res, sy.SyftSuccess), res.message" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "sy.upgrade_custom_workerpools(client, blob_path, mode=\"auto\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Post migration tests" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "\"\"\"\n", + "TODO:\n", + " * verify users\n", + " * login\n", + " * check every role\n", + " * mostly check on lengths\n", + " * can a DS see the results of their old jobs/logs\n", + " * still use the api schema both mock \n", + " * still submit a new query via submit_query\n", + " * can admin still approve and approve_by_running, deny\n", + " * check on old broken queries\n", + " * create a new broken query\n", + " * can ds get the results of the new queries\n", + " * emails should work now\n", + " * test in k8s (both L2 and L0)\n", + " * test in L0 (migrate both nodes?)\n", + "\"\"\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "users = load_users(client, path=\"0.9.1_notebooks/users.json\")\n", + "jobs = load_jobs(users, client, filepath=\"0.9.1_notebooks/jobs.json\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Check users" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# TODO fix users??" 
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "client.users"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "users"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Old jobs"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# submitted_jobs = [job for job in jobs if job.is_submitted]\n",
+    "reviewed_jobs = [job for job in jobs if job.admin_reviewed]\n",
+    "reviewed_jobs_should_succeed = [j for j in reviewed_jobs if j.should_succeed]\n",
+    "reviewed_jobs_should_fail = [j for j in reviewed_jobs if not j.should_succeed]\n",
+    "\n",
+    "print(\n",
+    "    f\"{len(reviewed_jobs)=}, {len(reviewed_jobs_should_succeed)=}, {len(reviewed_jobs_should_fail)=}\"\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "for job in reviewed_jobs_should_succeed:\n",
+    "    print(f\"> Checking job: {job.job_type} {job.func_name} for user {job.user_email}\")\n",
+    "    api_method = job.code_method\n",
+    "    j = api_method(blocking=False)\n",
+    "    res = j.wait()\n",
+    "\n",
+    "    if isinstance(res, sy.SyftError):\n",
+    "        raise sy.SyftException(public_message=\"Expected success, got error\")\n",
+    "\n",
+    "    result = res.get()\n",
+    "    job.result_as_expected = True"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "for job in reviewed_jobs_should_fail:\n",
+    "    print(f\"> Checking job: {job.job_type} {job.func_name} for user {job.user_email}\")\n",
+    "    api_method = job.code_method\n",
+    "\n",
+    "    j = api_method(blocking=False)\n",
+    "    res = j.wait()\n",
+    "    if isinstance(res, sy.SyftError):\n",
+    "        job.result_as_expected = True\n",
+    "    else:\n",
+    "        raise sy.SyftException(\n",
+    "            public_message=f\"Expected failure, but the job returned {type(res)}\"\n",
+    "        )"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "expected_jobs = [job for job in jobs if job.result_as_expected]\n",
+    "print(f\"got expected_jobs: {len(expected_jobs)} == reviewed_jobs: {len(reviewed_jobs)}\")\n",
+    "assert len(reviewed_jobs) == len(expected_jobs)"
+   ]
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Use old DS to go through the flow again"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ds_client = users[0].client"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  },
+  {
+   "cell_type": "markdown",
+   "metadata": {},
+   "source": [
+    "### Check on emails now?"
+   ]
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "syft_3.12",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.12.4"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
diff --git a/notebooks/scenarios/bigquery/upgradability/sync/migration_high.blob b/notebooks/scenarios/bigquery/upgradability/sync/migration_high.blob
new file mode 100644
index 00000000000..4143e59c7af
Binary files /dev/null and b/notebooks/scenarios/bigquery/upgradability/sync/migration_high.blob differ
diff --git a/notebooks/scenarios/bigquery/upgradability/sync/migration_high.yaml b/notebooks/scenarios/bigquery/upgradability/sync/migration_high.yaml
new file mode 100644
index 00000000000..c3819a0c46a
--- /dev/null
+++ b/notebooks/scenarios/bigquery/upgradability/sync/migration_high.yaml
@@ -0,0 +1,6 @@
+server:
+  env:
+    - name: SERVER_UID
+      value: fbdf5a287e58454cbbd3fac4ad744d37
+    - name: SERVER_PRIVATE_KEY
+      value: fcfd09deed32e3574558b6719fed46e0b8fd957d59608e9d8b42ef07c6080d3e
diff --git a/notebooks/scenarios/bigquery/upgradability/sync/migration_low.blob b/notebooks/scenarios/bigquery/upgradability/sync/migration_low.blob
new file mode 100644
index 00000000000..6abeef2b057
Binary files /dev/null and b/notebooks/scenarios/bigquery/upgradability/sync/migration_low.blob differ
diff --git a/notebooks/scenarios/bigquery/upgradability/sync/migration_low.yaml b/notebooks/scenarios/bigquery/upgradability/sync/migration_low.yaml
new file mode 100644
index 00000000000..c950671118c
--- /dev/null
+++ b/notebooks/scenarios/bigquery/upgradability/sync/migration_low.yaml
@@ -0,0 +1,6 @@
+server:
+  env:
+    - name: SERVER_UID
+      value: 4a471a09f56b4a1d809c0a7614074283
+    - name: SERVER_PRIVATE_KEY
+      value: 3c095c07c94d7f7aec863d61641c71c467cee08cf9a44120a9cb7a493def22cc
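Reviewer note: the migration_low / migration_high blob and yaml pairs committed above are the fixtures this scenario replays against a newer Syft version. A minimal sketch of how such a fixture is consumed, using only calls that already appear in the notebooks in this diff (the fixture directory and credentials are taken from those notebooks; treat the exact paths as an assumption):

    # stdlib
    from pathlib import Path

    # syft absolute
    import syft as sy

    # Committed fixture pair for the low-side server (see the diff above)
    fixture_dir = Path("notebooks/scenarios/bigquery/upgradability/sync")
    blob_path = fixture_dir / "migration_low.blob"

    # Launch a fresh low-side server on the new version and restore the dump
    server_low = sy.orchestra.launch(
        name="bigquery-low",
        server_side_type="low",
        dev_mode=True,
        reset=True,
        port="auto",
    )
    client_low = server_low.login(email="info@openmined.org", password="changethis")

    res = client_low.load_migration_data(blob_path)
    assert isinstance(res, sy.SyftSuccess), res.message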