diff --git a/notebooks/scenarios/bigquery/sync/000-setup-high-low-datasites.ipynb b/notebooks/scenarios/bigquery/sync/000-setup-high-low-datasites.ipynb index 3fe3966ba5d..1b07c9476c0 100644 --- a/notebooks/scenarios/bigquery/sync/000-setup-high-low-datasites.ipynb +++ b/notebooks/scenarios/bigquery/sync/000-setup-high-low-datasites.ipynb @@ -1,331 +1,345 @@ { - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# stdlib\n", - "import os\n", - "\n", - "# Testing works over 4 possibilities\n", - "# 1. (python/in-memory workers and using tox commands)\n", - "# 2. (python/in-memory workers and manually running notebooks)\n", - "# 3. (using k8s and using tox commands)\n", - "# 4. (using k8s and manually running notebooks)\n", - "# Uncomment the lines below if in the 4th possibility\n", - "\n", - "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", - "# os.environ[\"DEV_MODE\"] = \"True\"\n", - "# os.environ[\"TEST_EXTERNAL_REGISTRY\"] = \"k3d-registry.localhost:5800\"\n", - "# os.environ[\"CLUSTER_HTTP_PORT_HIGH\"] = \"9081\"\n", - "# os.environ[\"CLUSTER_HTTP_PORT_LOW\"] = \"9083\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# stdlib\n", - "environment = os.environ.get(\"ORCHESTRA_DEPLOYMENT_TYPE\", \"python\")\n", - "high_port = os.environ.get(\"CLUSTER_HTTP_PORT_HIGH\", \"9081\")\n", - "low_port = os.environ.get(\"CLUSTER_HTTP_PORT_LOW\", \"9083\")\n", - "print(environment, high_port, low_port)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# syft absolute\n", - "import syft as sy\n", - "from syft import test_settings\n", - "from syft.util.test_helpers.worker_helpers import (\n", - " build_and_launch_worker_pool_from_docker_str,\n", - ")\n", - "from syft.util.test_helpers.worker_helpers import (\n", - " 
launch_worker_pool_from_docker_tag_and_registry,\n", - ")\n", - "from syft.util.test_helpers.email_helpers import get_email_server\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Launch server & login" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "server_low = sy.orchestra.launch(\n", - " name=\"bigquery-low\",\n", - " server_side_type=\"low\",\n", - " dev_mode=True,\n", - " reset=True,\n", - " n_consumers=1,\n", - " create_producer=True,\n", - " port=low_port,\n", - ")\n", - "\n", - "server_high = sy.orchestra.launch(\n", - " name=\"bigquery-high\",\n", - " server_side_type=\"high\",\n", - " dev_mode=True,\n", - " reset=True,\n", - " n_consumers=1,\n", - " create_producer=True,\n", - " port=high_port,\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Email Server" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "email_server_low, smtp_server_low = get_email_server(reset=True, server_side_type=\"low\")\n", - "email_server_high, smtp_server_high = get_email_server(\n", - " reset=True, server_side_type=\"high\"\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "low_client = server_low.login(email=\"info@openmined.org\", password=\"changethis\")\n", - "high_client = server_high.login(email=\"info@openmined.org\", password=\"changethis\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert len(high_client.worker_pools.get_all()) == 1\n", - "assert len(low_client.worker_pools.get_all()) == 1" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Setup High First\n", - "\n", - "- If using an external registery, we want to get this from the test_settings.\n", - "- We build the docker image over the base 
docker image in Syft\n", - "- We give a tag called worker-bigquery to our custom pool image" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "external_registry = test_settings.get(\"external_registry\", default=\"docker.io\")\n", - "\n", - "base_worker_image = high_client.images.get_all()[0]\n", - "\n", - "worker_dockerfile = f\"\"\"\n", - "FROM {str(base_worker_image.image_identifier)}\n", - "\n", - "RUN uv pip install db-dtypes google-cloud-bigquery \n", - "\n", - "\"\"\".strip()\n", - "\n", - "docker_tag = str(base_worker_image.image_identifier).replace(\n", - " \"backend\", \"worker-bigquery\"\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "worker_pool_name = \"bigquery-pool\"\n", - "custom_pool_pod_annotations = {\"bigquery-custom-pool\": \"Pod annotation for bigquery\"}\n", - "custom_pool_pod_labels = {\"bigquery-custom-pool\": \"Pod_label_for_bigquery\"}" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "build_and_launch_worker_pool_from_docker_str(\n", - " environment=environment,\n", - " client=high_client,\n", - " worker_pool_name=worker_pool_name,\n", - " custom_pool_pod_annotations=custom_pool_pod_annotations,\n", - " custom_pool_pod_labels=custom_pool_pod_labels,\n", - " worker_dockerfile=worker_dockerfile,\n", - " external_registry=external_registry,\n", - " docker_tag=docker_tag,\n", - " scale_to=2,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert len(high_client.worker_pools.get_all()) == 2" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "high_client.settings.allow_guest_signup(enable=False)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Setup Low" - ] - }, - { - 
"cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "launch_result = launch_worker_pool_from_docker_tag_and_registry(\n", - " environment=environment,\n", - " client=low_client,\n", - " worker_pool_name=worker_pool_name,\n", - " custom_pool_pod_annotations=custom_pool_pod_annotations,\n", - " custom_pool_pod_labels=custom_pool_pod_labels,\n", - " docker_tag=docker_tag,\n", - " external_registry=external_registry,\n", - " scale_to=1,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert len(low_client.worker_pools.get_all()) == 2" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Register a DS only on the low side" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "low_client.register(\n", - " email=\"data_scientist@openmined.org\",\n", - " password=\"verysecurepassword\",\n", - " password_verify=\"verysecurepassword\",\n", - " name=\"John Doe\",\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "low_client.settings.allow_guest_signup(enable=False)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert (\n", - " len(low_client.api.services.user.get_all()) == 2\n", - "), \"Only DS and Admin should be at low side\"\n", - "assert (\n", - " len(high_client.api.services.user.get_all()) == 1\n", - "), \"Only Admin should be at high side\"" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Close" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "if environment != \"remote\":\n", - " server_high.land()\n", - " server_low.land()\n", - "smtp_server_low.stop()\n", - "smtp_server_high.stop()" - ] - } - ], - "metadata": { - "kernelspec": 
{ - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - "name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.12.4" - } - }, - "nbformat": 4, - "nbformat_minor": 4 + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "import os\n", + "\n", + "# Testing works over 4 possibilities\n", + "# 1. (python/in-memory workers and using tox commands)\n", + "# 2. (python/in-memory workers and manually running notebooks)\n", + "# 3. (using k8s and using tox commands)\n", + "# 4. (using k8s and manually running notebooks)\n", + "# Uncomment the lines below if in the 4th possibility\n", + "\n", + "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", + "# os.environ[\"DEV_MODE\"] = \"True\"\n", + "# os.environ[\"TEST_EXTERNAL_REGISTRY\"] = \"k3d-registry.localhost:5800\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_HIGH\"] = \"9081\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_LOW\"] = \"9083\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "environment = os.environ.get(\"ORCHESTRA_DEPLOYMENT_TYPE\", \"python\")\n", + "high_port = os.environ.get(\"CLUSTER_HTTP_PORT_HIGH\", \"9081\")\n", + "low_port = os.environ.get(\"CLUSTER_HTTP_PORT_LOW\", \"9083\")\n", + "print(environment, high_port, low_port)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# syft absolute\n", + "import syft as sy\n", + "from syft import test_settings\n", + "from syft.util.test_helpers.email_helpers import get_email_server\n", + "from syft.util.test_helpers.worker_helpers import (\n", + " build_and_launch_worker_pool_from_docker_str,\n", + ")\n", + "from 
syft.util.test_helpers.worker_helpers import (\n", + " launch_worker_pool_from_docker_tag_and_registry,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Launch server & login" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "server_low = sy.orchestra.launch(\n", + " name=\"bigquery-low\",\n", + " server_side_type=\"low\",\n", + " dev_mode=True,\n", + " reset=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=low_port,\n", + ")\n", + "\n", + "server_high = sy.orchestra.launch(\n", + " name=\"bigquery-high\",\n", + " server_side_type=\"high\",\n", + " dev_mode=True,\n", + " reset=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=high_port,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Email Server" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "email_server_low, smtp_server_low = get_email_server(reset=True, server_side_type=\"low\")\n", + "email_server_high, smtp_server_high = get_email_server(\n", + " reset=True, server_side_type=\"high\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "low_client = server_low.login(email=\"info@openmined.org\", password=\"changethis\")\n", + "high_client = server_high.login(email=\"info@openmined.org\", password=\"changethis\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(high_client.worker_pools.get_all()) == 1\n", + "assert len(low_client.worker_pools.get_all()) == 1" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Setup High First\n", + "\n", + "- If using an external registery, we want to get this from the test_settings.\n", + "- We build the docker image over the base docker image in 
Syft\n", + "- We give a tag called worker-bigquery to our custom pool image" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "external_registry = test_settings.get(\"external_registry\", default=\"docker.io\")\n", + "\n", + "base_worker_image = high_client.images.get_all()[0]\n", + "\n", + "worker_dockerfile = f\"\"\"\n", + "FROM {str(base_worker_image.image_identifier)}\n", + "\n", + "RUN uv pip install db-dtypes google-cloud-bigquery \n", + "\n", + "\"\"\".strip()\n", + "\n", + "docker_tag = str(base_worker_image.image_identifier).replace(\n", + " \"backend\", \"worker-bigquery\"\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "worker_pool_name = \"bigquery-pool\"\n", + "custom_pool_pod_annotations = {\"bigquery-custom-pool\": \"Pod annotation for bigquery\"}\n", + "custom_pool_pod_labels = {\"bigquery-custom-pool\": \"Pod_label_for_bigquery\"}" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "build_and_launch_worker_pool_from_docker_str(\n", + " environment=environment,\n", + " client=high_client,\n", + " worker_pool_name=worker_pool_name,\n", + " custom_pool_pod_annotations=custom_pool_pod_annotations,\n", + " custom_pool_pod_labels=custom_pool_pod_labels,\n", + " worker_dockerfile=worker_dockerfile,\n", + " external_registry=external_registry,\n", + " docker_tag=docker_tag,\n", + " scale_to=2,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(high_client.worker_pools.get_all()) == 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client.settings.allow_guest_signup(enable=False)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Setup Low" + ] + }, + { + "cell_type": "code", 
+ "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "launch_result = launch_worker_pool_from_docker_tag_and_registry(\n", + " environment=environment,\n", + " client=low_client,\n", + " worker_pool_name=worker_pool_name,\n", + " custom_pool_pod_annotations=custom_pool_pod_annotations,\n", + " custom_pool_pod_labels=custom_pool_pod_labels,\n", + " docker_tag=docker_tag,\n", + " external_registry=external_registry,\n", + " scale_to=1,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(low_client.worker_pools.get_all()) == 2" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Register a DS only on the low side" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "low_client.register(\n", + " email=\"data_scientist@openmined.org\",\n", + " password=\"verysecurepassword\",\n", + " password_verify=\"verysecurepassword\",\n", + " name=\"John Doe\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "low_client.settings.allow_guest_signup(enable=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert (\n", + " len(low_client.api.services.user.get_all()) == 2\n", + "), \"Only DS and Admin should be at low side\"\n", + "assert (\n", + " len(high_client.api.services.user.get_all()) == 1\n", + "), \"Only Admin should be at high side\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Close" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if environment != \"remote\":\n", + " server_high.land()\n", + " server_low.land()\n", + "smtp_server_low.stop()\n", + "smtp_server_high.stop()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + 
"metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 4 } diff --git a/notebooks/scenarios/bigquery/sync/001-scale-delete-worker-pools.ipynb b/notebooks/scenarios/bigquery/sync/001-scale-delete-worker-pools.ipynb index 6ea1af97242..ac6f4a1a395 100644 --- a/notebooks/scenarios/bigquery/sync/001-scale-delete-worker-pools.ipynb +++ b/notebooks/scenarios/bigquery/sync/001-scale-delete-worker-pools.ipynb @@ -79,20 +79,20 @@ { "cell_type": "code", "execution_count": null, - "id": "6", + "id": "5", "metadata": {}, "outputs": [], "source": [ - "email_server_low, smtp_server_low = get_email_server(server_side_type=\"low\")\n", + "email_server_low, smtp_server_low = get_email_server(server_side_type=\"low\", reset=True)\n", "email_server_high, smtp_server_high = get_email_server(\n", - " server_side_type=\"high\"\n", + " server_side_type=\"high\", reset=True\n", ")" ] }, { "cell_type": "code", "execution_count": null, - "id": "5", + "id": "6", "metadata": {}, "outputs": [], "source": [ @@ -334,7 +334,7 @@ { "cell_type": "code", "execution_count": null, - "id": "27", + "id": "26", "metadata": {}, "outputs": [], "source": [ @@ -343,6 +343,14 @@ "smtp_server_low.stop()\n", "smtp_server_high.stop()" ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "27", + "metadata": {}, + "outputs": [], + "source": [] } ], "metadata": { diff --git a/notebooks/scenarios/bigquery/sync/011-users-emails-passwords.ipynb 
b/notebooks/scenarios/bigquery/sync/011-users-emails-passwords.ipynb index fc52c3ba391..1cb7887fe4b 100644 --- a/notebooks/scenarios/bigquery/sync/011-users-emails-passwords.ipynb +++ b/notebooks/scenarios/bigquery/sync/011-users-emails-passwords.ipynb @@ -23,21 +23,16 @@ "metadata": {}, "outputs": [], "source": [ - "# isort: off\n", "# stdlib\n", "import os\n", "\n", "# syft absolute\n", "import syft as sy\n", - "from syft import test_helpers # noqa: F401\n", - "\n", - "# third party\n", - "from email_helpers import SENDER\n", - "from email_helpers import create_user\n", - "from email_helpers import get_email_server\n", - "from email_helpers import make_user\n", - "from email_helpers import save_users\n", - "# isort: on" + "from syft.util.test_helpers.email_helpers import SENDER\n", + "from syft.util.test_helpers.email_helpers import create_user\n", + "from syft.util.test_helpers.email_helpers import get_email_server\n", + "from syft.util.test_helpers.email_helpers import make_user\n", + "from syft.util.test_helpers.email_helpers import save_users" ] }, { @@ -661,7 +656,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.3" + "version": "3.12.4" } }, "nbformat": 4, diff --git a/notebooks/scenarios/bigquery/sync/020-configure-api-and-sync.ipynb b/notebooks/scenarios/bigquery/sync/020-configure-api-and-sync.ipynb index bdf917e83ea..ed6bfcb0399 100644 --- a/notebooks/scenarios/bigquery/sync/020-configure-api-and-sync.ipynb +++ b/notebooks/scenarios/bigquery/sync/020-configure-api-and-sync.ipynb @@ -1,694 +1,701 @@ { - "cells": [ - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# stdlib\n", - "import os\n", - "\n", - "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", - "# os.environ[\"DEV_MODE\"] = \"True\"\n", - "# os.environ[\"TEST_EXTERNAL_REGISTRY\"] = \"k3d-registry.localhost:5800\"\n", - "# os.environ[\"CLUSTER_HTTP_PORT_HIGH\"] = \"9081\"\n", 
- "# os.environ[\"CLUSTER_HTTP_PORT_LOW\"] = \"9083\"" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# stdlib\n", - "\n", - "environment = os.environ.get(\"ORCHESTRA_DEPLOYMENT_TYPE\", \"python\")\n", - "high_port = os.environ.get(\"CLUSTER_HTTP_PORT_HIGH\", \"9081\")\n", - "low_port = os.environ.get(\"CLUSTER_HTTP_PORT_LOW\", \"9083\")\n", - "print(environment, high_port, low_port)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# stdlib\n", - "\n", - "# set to use the live APIs\n", - "# import os\n", - "# os.environ[\"TEST_BIGQUERY_APIS_LIVE\"] = \"True\"\n", - "# third party\n", - "import pandas as pd\n", - "\n", - "# syft absolute\n", - "import syft as sy\n", - "from syft import test_settings\n", - "from syft.client.syncing import compare_clients\n", - "from syft.util.test_helpers.apis import make_schema\n", - "from syft.util.test_helpers.apis import make_submit_query\n", - "from syft.util.test_helpers.apis import make_test_query\n", - "from syft.util.test_helpers.email_helpers import get_email_server\n" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Launch server and login" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "server_low = sy.orchestra.launch(\n", - " name=\"bigquery-low\",\n", - " server_side_type=\"low\",\n", - " dev_mode=True,\n", - " n_consumers=1,\n", - " create_producer=True,\n", - " port=low_port,\n", - ")\n", - "\n", - "server_high = sy.orchestra.launch(\n", - " name=\"bigquery-high\",\n", - " server_side_type=\"high\",\n", - " dev_mode=True,\n", - " n_consumers=1,\n", - " create_producer=True,\n", - " port=high_port,\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Email Server" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": 
[], - "source": [ - "email_server_low, smtp_server_low = get_email_server(server_side_type=\"low\")\n", - "email_server_high, smtp_server_high = get_email_server(server_side_type=\"high\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "low_client = server_low.login(email=\"info@openmined.org\", password=\"changethis\")\n", - "high_client = server_high.login(email=\"info@openmined.org\", password=\"changethis\")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert len(high_client.worker_pools.get_all()) == 2\n", - "assert len(low_client.worker_pools.get_all()) == 2" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "this_worker_pool_name = \"bigquery-pool\"" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Load database information from test_settings" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "dataset_1 = test_settings.get(\"dataset_1\", default=\"dataset_1\")\n", - "dataset_2 = test_settings.get(\"dataset_2\", default=\"dataset_2\")\n", - "table_1 = test_settings.get(\"table_1\", default=\"table_1\")\n", - "table_2 = test_settings.get(\"table_2\", default=\"table_2\")\n", - "table_2_col_id = test_settings.get(\"table_2_col_id\", default=\"table_id\")\n", - "table_2_col_score = test_settings.get(\"table_2_col_score\", default=\"colname\")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Create and test different endpoints" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "----" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create `biquery.schema` endpoint" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - 
"schema_function = make_schema(\n", - " settings={\n", - " \"calls_per_min\": 5,\n", - " },\n", - " worker_pool=this_worker_pool_name,\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "high_client.custom_api.add(endpoint=schema_function)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "result = high_client.api.services.bigquery.schema()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert len(result) == 23" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "TODO: Note that when we do not create a job, the type of result is `syft.service.action.pandas.PandasDataFrameObject` and not pandas but the `.get()` method will get you the expected answer" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# syft absolute\n", - "from syft.service.action.pandas import PandasDataFrameObject\n", - "\n", - "# assert isinstance(result, pd.DataFrame)\n", - "assert isinstance(result, PandasDataFrameObject)\n", - "assert isinstance(result.get(), pd.DataFrame)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "____" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create `biquery.test_query` endpoint" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "mock_func = make_test_query(\n", - " settings={\n", - " \"rate_limiter_enabled\": True,\n", - " \"calls_per_min\": 10,\n", - " }\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "private_func = make_test_query(\n", - " settings={\n", - " \"rate_limiter_enabled\": False,\n", - " }\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - 
"metadata": {}, - "outputs": [], - "source": [ - "new_endpoint = sy.TwinAPIEndpoint(\n", - " path=\"bigquery.test_query\",\n", - " description=\"This endpoint allows to query Bigquery storage via SQL queries.\",\n", - " private_function=private_func,\n", - " mock_function=mock_func,\n", - " worker_pool=this_worker_pool_name,\n", - ")\n", - "\n", - "high_client.custom_api.add(endpoint=new_endpoint)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Some features for updating endpoint" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Here, we update the endpoint to timeout after 100s (rather the default of 60s)\n", - "high_client.api.services.api.update(\n", - " endpoint_path=\"bigquery.test_query\", endpoint_timeout=120\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "high_client.api.services.api.update(\n", - " endpoint_path=\"bigquery.test_query\", hide_mock_definition=True\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "#### Test the `bigquery.test_query` endpoint" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Test mock version\n", - "result = high_client.api.services.bigquery.test_query.mock(\n", - " sql_query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 10\"\n", - ")\n", - "result" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert len(result) == 10" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Test mock version for wrong queries\n", - "with sy.raises(\n", - " sy.SyftException(public_message=\"*must be qualified with a dataset*\"), show=True\n", - "):\n", - " high_client.api.services.bigquery.test_query.mock(\n", - " sql_query=\"SELECT * FROM 
invalid_table LIMIT 1\"\n", - " )" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Test private version\n", - "result = high_client.api.services.bigquery.test_query.private(\n", - " sql_query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 12\"\n", - ")\n", - "result" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert len(result) == 12" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "____" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "### Create `submit_query` endpoint" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "submit_query_function = make_submit_query(\n", - " settings={}, worker_pool=this_worker_pool_name\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "high_client.custom_api.add(endpoint=submit_query_function)" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "high_client.api.services.api.update(\n", - " endpoint_path=\"bigquery.submit_query\", hide_mock_definition=True\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# Testing submit query\n", - "result = high_client.api.services.bigquery.submit_query(\n", - " func_name=\"my_func\",\n", - " query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 2\",\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert \"Query submitted\" in result\n", - "result" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "job = high_client.code.my_func(blocking=False)" - ] - }, - { - "cell_type": "code", - "execution_count": 
null, - "metadata": {}, - "outputs": [], - "source": [ - "res = job.wait().get()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert isinstance(res, pd.DataFrame)" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Test endpoints" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "high_client.custom_api.api_endpoints()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert len(high_client.custom_api.api_endpoints()) == 3" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert (\n", - " high_client.api.services.bigquery.test_query\n", - " and high_client.api.services.bigquery.submit_query\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Syncing" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "diff = compare_clients(\n", - " from_client=high_client, to_client=low_client, hide_usercode=False\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# TODO verify that jobs are actually filtered out\n", - "# TODO we need to think about whether its possible for the admin to create more data here that would break sync" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "widget = diff.resolve()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# TODO maybe see if non-internal method we can use or make it public\n", - "widget._share_all()\n", - "widget._sync_all()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert 
len(low_client.jobs.get_all()) == 0" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert len(low_client.custom_api.api_endpoints()) == 3" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert len(high_client.custom_api.api_endpoints()) == 3" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Test emails" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "# email_server_low.get_emails_for_user(user_email=\"info@openmined.org\")\n", - "assert len(email_server_high.get_emails_for_user(user_email=\"info@openmined.org\")) == 2" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert (\n", - " \"Job Failed\"\n", - " in email_server_high.get_emails_for_user(user_email=\"info@openmined.org\")[\n", - " 0\n", - " ].email_content\n", - ")" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "assert (\n", - " \"A new request has been submitted and requires your attention\"\n", - " in email_server_high.get_emails_for_user(user_email=\"info@openmined.org\")[\n", - " 1\n", - " ].email_content\n", - ")" - ] - }, - { - "cell_type": "markdown", - "metadata": {}, - "source": [ - "# Clean up" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [ - "if environment != \"remote\":\n", - " server_high.land()\n", - " server_low.land()\n", - "smtp_server_low.stop()\n", - "smtp_server_high.stop()" - ] - }, - { - "cell_type": "code", - "execution_count": null, - "metadata": {}, - "outputs": [], - "source": [] - } - ], - "metadata": { - "kernelspec": { - "display_name": "Python 3 (ipykernel)", - "language": "python", - "name": "python3" - }, - "language_info": { - "codemirror_mode": { - 
"name": "ipython", - "version": 3 - }, - "file_extension": ".py", - "mimetype": "text/x-python", - "name": "python", - "nbconvert_exporter": "python", - "pygments_lexer": "ipython3", - "version": "3.12.4" - } - }, - "nbformat": 4, - "nbformat_minor": 4 + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "import os\n", + "\n", + "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", + "# os.environ[\"DEV_MODE\"] = \"True\"\n", + "# os.environ[\"TEST_EXTERNAL_REGISTRY\"] = \"k3d-registry.localhost:5800\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_HIGH\"] = \"9081\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_LOW\"] = \"9083\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "\n", + "environment = os.environ.get(\"ORCHESTRA_DEPLOYMENT_TYPE\", \"python\")\n", + "high_port = os.environ.get(\"CLUSTER_HTTP_PORT_HIGH\", \"9081\")\n", + "low_port = os.environ.get(\"CLUSTER_HTTP_PORT_LOW\", \"9083\")\n", + "print(environment, high_port, low_port)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "\n", + "# set to use the live APIs\n", + "# import os\n", + "# os.environ[\"TEST_BIGQUERY_APIS_LIVE\"] = \"True\"\n", + "# third party\n", + "import pandas as pd\n", + "\n", + "# syft absolute\n", + "import syft as sy\n", + "from syft import test_settings\n", + "from syft.client.syncing import compare_clients\n", + "from syft.util.test_helpers.apis import make_schema\n", + "from syft.util.test_helpers.apis import make_submit_query\n", + "from syft.util.test_helpers.apis import make_test_query\n", + "from syft.util.test_helpers.email_helpers import get_email_server" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Launch server and login" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": 
{}, + "outputs": [], + "source": [ + "server_low = sy.orchestra.launch(\n", + " name=\"bigquery-low\",\n", + " server_side_type=\"low\",\n", + " dev_mode=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=low_port,\n", + ")\n", + "\n", + "server_high = sy.orchestra.launch(\n", + " name=\"bigquery-high\",\n", + " server_side_type=\"high\",\n", + " dev_mode=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=high_port,\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Email Server" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "email_server_low, smtp_server_low = get_email_server(server_side_type=\"low\")\n", + "email_server_high, smtp_server_high = get_email_server(server_side_type=\"high\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "low_client = server_low.login(email=\"info@openmined.org\", password=\"changethis\")\n", + "high_client = server_high.login(email=\"info@openmined.org\", password=\"changethis\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(high_client.worker_pools.get_all()) == 2\n", + "assert len(low_client.worker_pools.get_all()) == 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "this_worker_pool_name = \"bigquery-pool\"" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Load database information from test_settings" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "dataset_1 = test_settings.get(\"dataset_1\", default=\"dataset_1\")\n", + "dataset_2 = test_settings.get(\"dataset_2\", default=\"dataset_2\")\n", + "table_1 = test_settings.get(\"table_1\", default=\"table_1\")\n", + "table_2 = 
test_settings.get(\"table_2\", default=\"table_2\")\n", + "table_2_col_id = test_settings.get(\"table_2_col_id\", default=\"table_id\")\n", + "table_2_col_score = test_settings.get(\"table_2_col_score\", default=\"colname\")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Create and test different endpoints" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "----" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create `bigquery.schema` endpoint" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "schema_function = make_schema(\n", + " settings={\n", + " \"calls_per_min\": 5,\n", + " },\n", + " worker_pool=this_worker_pool_name,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client.custom_api.add(endpoint=schema_function)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "result = high_client.api.services.bigquery.schema()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(result) == 23" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "TODO: Note that when we do not create a job, the type of result is `syft.service.action.pandas.PandasDataFrameObject` and not pandas but the `.get()` method will get you the expected answer" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# syft absolute\n", + "from syft.service.action.pandas import PandasDataFrameObject\n", + "\n", + "# assert isinstance(result, pd.DataFrame)\n", + "assert isinstance(result, PandasDataFrameObject)\n", + "assert isinstance(result.get(), pd.DataFrame)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "____" + ] + }, + { + 
"cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create `bigquery.test_query` endpoint" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "mock_func = make_test_query(\n", + " settings={\n", + " \"rate_limiter_enabled\": True,\n", + " \"calls_per_min\": 10,\n", + " }\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "private_func = make_test_query(\n", + " settings={\n", + " \"rate_limiter_enabled\": False,\n", + " }\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "new_endpoint = sy.TwinAPIEndpoint(\n", + " path=\"bigquery.test_query\",\n", + " description=\"This endpoint allows to query Bigquery storage via SQL queries.\",\n", + " private_function=private_func,\n", + " mock_function=mock_func,\n", + " worker_pool=this_worker_pool_name,\n", + ")\n", + "\n", + "high_client.custom_api.add(endpoint=new_endpoint)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Some features for updating endpoint" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Here, we update the endpoint to time out after 120s (rather than the default of 60s)\n", + "high_client.api.services.api.update(\n", + " endpoint_path=\"bigquery.test_query\", endpoint_timeout=120\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client.api.services.api.update(\n", + " endpoint_path=\"bigquery.test_query\", hide_mock_definition=True\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "#### Test the `bigquery.test_query` endpoint" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Test mock version\n", + "result = 
high_client.api.services.bigquery.test_query.mock(\n", + " sql_query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 10\"\n", + ")\n", + "result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(result) == 10" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Test mock version for wrong queries\n", + "with sy.raises(\n", + " sy.SyftException(public_message=\"*must be qualified with a dataset*\"), show=True\n", + "):\n", + " high_client.api.services.bigquery.test_query.mock(\n", + " sql_query=\"SELECT * FROM invalid_table LIMIT 1\"\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Test private version\n", + "result = high_client.api.services.bigquery.test_query.private(\n", + " sql_query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 12\"\n", + ")\n", + "result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(result) == 12" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "____" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "### Create `submit_query` endpoint" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "submit_query_function = make_submit_query(\n", + " settings={}, worker_pool=this_worker_pool_name\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client.custom_api.add(endpoint=submit_query_function)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client.api.services.api.update(\n", + " endpoint_path=\"bigquery.submit_query\", hide_mock_definition=True\n", + ")" + ] + }, + { + "cell_type": "code", + 
"execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# Testing submit query\n", + "result = high_client.api.services.bigquery.submit_query(\n", + " func_name=\"my_func\",\n", + " query=f\"SELECT * FROM {dataset_1}.{table_1} LIMIT 2\",\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert \"Query submitted\" in result\n", + "result" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "job = high_client.code.my_func(blocking=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "res = job.wait().get()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(res, pd.DataFrame)" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Test endpoints" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "high_client.custom_api.api_endpoints()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(high_client.custom_api.api_endpoints()) == 3" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert (\n", + " high_client.api.services.bigquery.test_query\n", + " and high_client.api.services.bigquery.submit_query\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Syncing" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "diff = compare_clients(\n", + " from_client=high_client, to_client=low_client, hide_usercode=False\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# TODO verify 
that jobs are actually filtered out\n", + "# TODO we need to think about whether its possible for the admin to create more data here that would break sync" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "widget = diff.resolve()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# TODO maybe see if non-internal method we can use or make it public\n", + "widget._share_all()\n", + "widget._sync_all()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(low_client.jobs.get_all()) == 0" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(low_client.custom_api.api_endpoints()) == 3" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert len(high_client.custom_api.api_endpoints()) == 3" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Test emails" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# email_server_low.get_emails_for_user(user_email=\"info@openmined.org\")\n", + "assert len(email_server_high.get_emails_for_user(user_email=\"info@openmined.org\")) == 2" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert (\n", + " \"Job Failed\"\n", + " in email_server_high.get_emails_for_user(user_email=\"info@openmined.org\")[\n", + " 0\n", + " ].email_content\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "assert (\n", + " \"A new request has been submitted and requires your attention\"\n", + " in email_server_high.get_emails_for_user(user_email=\"info@openmined.org\")[\n", + " 1\n", + " 
].email_content\n", + ")" + ] + }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Clean up" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "if environment != \"remote\":\n", + " server_high.land()\n", + " server_low.land()\n", + "smtp_server_low.stop()\n", + "smtp_server_high.stop()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 4 } diff --git a/notebooks/scenarios/bigquery/sync/021-create-jobs.ipynb b/notebooks/scenarios/bigquery/sync/021-create-jobs.ipynb new file mode 100644 index 00000000000..f63763148f2 --- /dev/null +++ b/notebooks/scenarios/bigquery/sync/021-create-jobs.ipynb @@ -0,0 +1,441 @@ +{ + "cells": [ + { + "cell_type": "code", + "execution_count": null, + "id": "0", + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "import os\n", + "\n", + "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", + "# os.environ[\"DEV_MODE\"] = \"True\"\n", + "# os.environ[\"TEST_EXTERNAL_REGISTRY\"] = \"k3d-registry.localhost:5800\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_HIGH\"] = \"9081\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_LOW\"] = \"9083\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1", + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "\n", + "environment = os.environ.get(\"ORCHESTRA_DEPLOYMENT_TYPE\", 
\"python\")\n", + "low_port = os.environ.get(\"CLUSTER_HTTP_PORT_LOW\", \"9083\")\n", + "print(environment, low_port)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "2", + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "from collections import Counter\n", + "\n", + "# syft absolute\n", + "import syft as sy\n", + "from syft.util.test_helpers.email_helpers import get_email_server\n", + "from syft.util.test_helpers.email_helpers import load_users" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "3", + "metadata": {}, + "outputs": [], + "source": [ + "ADMIN_EMAIL, ADMIN_PW = \"info@openmined.org\", \"changethis\"" + ] + }, + { + "cell_type": "markdown", + "id": "4", + "metadata": {}, + "source": [ + "# Launch server & login" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "5", + "metadata": {}, + "outputs": [], + "source": [ + "server_low = sy.orchestra.launch(\n", + " name=\"bigquery-low\",\n", + " server_side_type=\"low\",\n", + " dev_mode=True,\n", + " n_consumers=1,\n", + " create_producer=True,\n", + " port=low_port,\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "6", + "metadata": {}, + "outputs": [], + "source": [ + "email_server_low, smtp_server_low = get_email_server(server_side_type=\"low\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "7", + "metadata": {}, + "outputs": [], + "source": [ + "low_client = server_low.login(email=ADMIN_EMAIL, password=ADMIN_PW)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "8", + "metadata": {}, + "outputs": [], + "source": [ + "users = load_users(low_client)" + ] + }, + { + "cell_type": "markdown", + "id": "9", + "metadata": {}, + "source": [ + "# Create jobs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "10", + "metadata": {}, + "outputs": [], + "source": [ + "# syft absolute\n", + "from syft.util.test_helpers.job_helpers import 
TestJob\n", + "from syft.util.test_helpers.job_helpers import create_jobs\n", + "from syft.util.test_helpers.job_helpers import extract_code_path" + ] + }, + { + "cell_type": "markdown", + "id": "11", + "metadata": {}, + "source": [ + "# Inspect job data (requests for these jobs to be created)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "12", + "metadata": {}, + "outputs": [], + "source": [ + "num_jobs = int(os.environ.get(\"NUM_TEST_JOBS\", 10))\n", + "\n", + "jobs_data = create_jobs(users, total_jobs=num_jobs)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "13", + "metadata": {}, + "outputs": [], + "source": [ + "counts = Counter([j.job_type for j in jobs_data])\n", + "for k, v in counts.most_common():\n", + " print(f\"{k}: #{v}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "14", + "metadata": {}, + "outputs": [], + "source": [ + "# syft absolute\n", + "from syft.util.test_helpers.job_helpers import save_jobs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "15", + "metadata": {}, + "outputs": [], + "source": [ + "print(f\"{len(jobs_data)=}\")\n", + "\n", + "for job in jobs_data:\n", + " print(f\"{job.job_type=}, {job.should_succeed=}, {job.should_submit=}\")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "16", + "metadata": {}, + "outputs": [], + "source": [ + "assert len(jobs_data) == num_jobs\n", + "assert all(isinstance(j, TestJob) for j in jobs_data)\n", + "assert all(job.client is not None for job in jobs_data)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "17", + "metadata": {}, + "outputs": [], + "source": [ + "save_jobs(jobs_data)" + ] + }, + { + "cell_type": "markdown", + "id": "18", + "metadata": {}, + "source": [ + "# Submit jobs\n" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "19", + "metadata": {}, + "outputs": [], + "source": [ + "admin_emails_before = 
len(email_server_low.get_emails_for_user(\"admin@bigquery.org\"))\n", + "print(f\"{admin_emails_before=}\")" + ] + }, + { + "cell_type": "markdown", + "id": "20", + "metadata": {}, + "source": [ + "## Test Successful jobs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "21", + "metadata": {}, + "outputs": [], + "source": [ + "jobs_submit_should_succeed = [j for j in jobs_data if j.should_submit]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "22", + "metadata": {}, + "outputs": [], + "source": [ + "for job in jobs_submit_should_succeed:\n", + " client = job.client\n", + " response = client.api.services.bigquery.submit_query(\n", + " func_name=job.func_name, query=job.query\n", + " )\n", + " job.code_path = extract_code_path(response)" + ] + }, + { + "cell_type": "markdown", + "id": "23", + "metadata": {}, + "source": [ + "## Test failures" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "24", + "metadata": {}, + "outputs": [], + "source": [ + "jobs_submit_should_fail = [j for j in jobs_data if not j.should_submit]" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "25", + "metadata": {}, + "outputs": [], + "source": [ + "for job in jobs_submit_should_fail:\n", + " client = job.client\n", + "\n", + " with sy.raises(sy.SyftException):\n", + " client.api.services.bigquery.submit_query(\n", + " func_name=job.func_name, query=job.query\n", + " )" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "26", + "metadata": {}, + "outputs": [], + "source": [ + "for job in jobs_data:\n", + " print(f\"Job {job.func_name:.20} {job.should_submit=}, {job.is_submitted=}\")\n", + "\n", + "assert all(job.is_submitted == job.should_submit for job in jobs_data)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "27", + "metadata": {}, + "outputs": [], + "source": [ + "save_jobs(jobs_data)" + ] + }, + { + "cell_type": "markdown", + "id": "28", + "metadata": 
{}, + "source": [ + "## Test: cannot execute submitted jobs yet" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "29", + "metadata": {}, + "outputs": [], + "source": [ + "submitted_jobs = [job for job in jobs_data if job.should_submit]\n", + "job_execution_fns = [getattr(job.client.code, job.code_path) for job in submitted_jobs]\n", + "assert len(submitted_jobs) # failsafe for next tests" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "30", + "metadata": {}, + "outputs": [], + "source": [ + "for fn in job_execution_fns:\n", + " # blocking\n", + " with sy.raises(\n", + " sy.SyftException(public_message=\"*Your code is waiting for approval*\")\n", + " ):\n", + " result = fn()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "31", + "metadata": {}, + "outputs": [], + "source": [ + "# currently errors out with\n", + "# syft.types.errors.SyftException: Please wait for the admin to allow the execution of this code\n", + "\n", + "\"\"\" for fn in job_execution_fns:\n", + " # nonblocking\n", + " result_job = fn(blocking=False)\n", + " result_job.wait()\n", + " assert isinstance(result_job.result, sy.SyftError)\n", + " assert result_job.status == JobStatus.ERRORED \"\"\"" + ] + }, + { + "cell_type": "markdown", + "id": "32", + "metadata": {}, + "source": [ + "# Verify that admin has emails for submitted requests" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "33", + "metadata": {}, + "outputs": [], + "source": [ + "num_should_submit = sum(j.should_submit for j in jobs_data)\n", + "admin_emails_after = len(email_server_low.get_emails_for_user(\"admin@bigquery.org\"))\n", + "print(\"admin emails after\", admin_emails_after)\n", + "assert admin_emails_after >= admin_emails_before + num_should_submit\n", + "# assert len(users_emails) > after_number_of_emails\n", + "# assert len(users_emails) == after_number_of_emails + 1" + ] + }, + { + "cell_type": "markdown", + "id": "34", + 
"metadata": {}, + "source": [ + "# Cleanup" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "35", + "metadata": {}, + "outputs": [], + "source": [ + "if environment != \"remote\":\n", + " server_low.land()\n", + "smtp_server_low.stop()" + ] + } + ], + "metadata": { + "kernelspec": { + "display_name": "syft", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 5 +} diff --git a/notebooks/scenarios/bigquery/sync/030-ds-submit-request.ipynb b/notebooks/scenarios/bigquery/sync/030-ds-submit-request.ipynb index 8d64892206a..2d8ed894d40 100644 --- a/notebooks/scenarios/bigquery/sync/030-ds-submit-request.ipynb +++ b/notebooks/scenarios/bigquery/sync/030-ds-submit-request.ipynb @@ -12,6 +12,7 @@ "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", "# os.environ[\"DEV_MODE\"] = \"True\"\n", "# os.environ[\"TEST_EXTERNAL_REGISTRY\"] = \"k3d-registry.localhost:5800\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_HIGH\"] = \"9081\"\n", "# os.environ[\"CLUSTER_HTTP_PORT_LOW\"] = \"9083\"" ] }, @@ -34,16 +35,13 @@ "metadata": {}, "outputs": [], "source": [ - "# isort: off\n", + "# third party\n", + "import pandas as pd\n", + "\n", "# syft absolute\n", "import syft as sy\n", "from syft import test_settings\n", - "from syft import test_helpers # noqa: F401\n", - "\n", - "# third party\n", - "import pandas as pd\n", - "from email_helpers import get_email_server\n", - "# isort: on" + "from syft.util.test_helpers.email_helpers import get_email_server" ] }, { diff --git a/notebooks/scenarios/bigquery/sync/040-do-review-requests.ipynb b/notebooks/scenarios/bigquery/sync/040-do-review-requests.ipynb index ca7dc0b0597..9a0555d84dc 100644 --- 
a/notebooks/scenarios/bigquery/sync/040-do-review-requests.ipynb +++ b/notebooks/scenarios/bigquery/sync/040-do-review-requests.ipynb @@ -6,23 +6,32 @@ "metadata": {}, "outputs": [], "source": [ - "# isort: off\n", "# stdlib\n", "import os\n", "\n", - "# syft absolute\n", - "import syft as sy\n", - "from syft.client.syncing import compare_clients\n", - "from syft import test_helpers # noqa: F401\n", + "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", + "# os.environ[\"DEV_MODE\"] = \"True\"\n", + "# os.environ[\"TEST_EXTERNAL_REGISTRY\"] = \"k3d-registry.localhost:5800\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_HIGH\"] = \"9081\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_LOW\"] = \"9083\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", "\n", "# third party\n", "import pandas as pd\n", - "from email_helpers import get_email_server\n", - "# isort: on\n", "\n", - "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", - "# os.environ[\"CLUSTER_HTTP_PORT_HIGH\"] = \"9081\"\n", - "# os.environ[\"CLUSTER_HTTP_PORT_LOW\"] = \"9083\"" + "# syft absolute\n", + "import syft as sy\n", + "from syft import test_helpers # noqa: F401\n", + "from syft.client.syncing import compare_clients\n", + "from syft.util.test_helpers.email_helpers import get_email_server" ] }, { diff --git a/notebooks/scenarios/bigquery/sync/050-ds-get-results.ipynb b/notebooks/scenarios/bigquery/sync/050-ds-get-results.ipynb index e8432254ebe..6169fafb81b 100644 --- a/notebooks/scenarios/bigquery/sync/050-ds-get-results.ipynb +++ b/notebooks/scenarios/bigquery/sync/050-ds-get-results.ipynb @@ -6,21 +6,30 @@ "metadata": {}, "outputs": [], "source": [ - "# isort: off\n", "# stdlib\n", "import os\n", "\n", - "# syft absolute\n", - "import syft as sy\n", - "from syft import test_helpers # noqa: F401\n", + "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", + "# os.environ[\"DEV_MODE\"] = \"True\"\n", + "# 
os.environ[\"TEST_EXTERNAL_REGISTRY\"] = \"k3d-registry.localhost:5800\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_HIGH\"] = \"9081\"\n", + "# os.environ[\"CLUSTER_HTTP_PORT_LOW\"] = \"9083\"" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", "\n", "# third party\n", "import pandas as pd\n", - "from email_helpers import get_email_server\n", - "# isort: on\n", "\n", - "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", - "# os.environ[\"CLUSTER_HTTP_PORT_LOW\"] = \"9083\"" + "# syft absolute\n", + "import syft as sy\n", + "from syft.util.test_helpers.email_helpers import get_email_server" ] }, { diff --git a/packages/syft/src/syft/util/test_helpers/email_helpers.py b/packages/syft/src/syft/util/test_helpers/email_helpers.py index 6579ab7e5d9..cc6ab7a2839 100644 --- a/packages/syft/src/syft/util/test_helpers/email_helpers.py +++ b/packages/syft/src/syft/util/test_helpers/email_helpers.py @@ -218,7 +218,7 @@ def user_exists(root_client, email: str) -> bool: class SMTPTestServer: - def __init__(self, email_server, port=9025): + def __init__(self, email_server, port=9025, ready_timeout=20): self.port = port self.hostname = "0.0.0.0" self._stop_event = asyncio.Event() @@ -245,7 +245,10 @@ async def handle_DATA(self, server, session, envelope): try: self.handler = SimpleHandler() self.controller = Controller( - self.handler, hostname=self.hostname, port=self.port + self.handler, + hostname=self.hostname, + port=self.port, + ready_timeout=ready_timeout, ) except Exception as e: print(f"> Error initializing SMTPTestServer Controller: {e}")