diff --git a/notebooks/scenarios/bigquery/042-data_review.ipynb b/notebooks/scenarios/bigquery/042-data_review.ipynb
new file mode 100644
index 00000000000..283e8a32e2c
--- /dev/null
+++ b/notebooks/scenarios/bigquery/042-data_review.ipynb
@@ -0,0 +1,273 @@
+{
+ "cells": [
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# stdlib\n",
+    "import os\n",
+    "\n",
+    "environment = os.environ.get(\"ORCHESTRA_DEPLOYMENT_TYPE\", \"python\")\n",
+    "environment"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# third party\n",
+    "\n",
+    "# syft absolute\n",
+    "import syft as sy"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "SERVER_PORT = \"8080\"\n",
+    "SERVER_URL = f\"http://localhost:{SERVER_PORT}\""
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "server = sy.orchestra.launch(\n",
+    "    name=\"bigquery-high\",\n",
+    "    dev_mode=True,\n",
+    "    server_side_type=\"high\",\n",
+    "    # reset=True,\n",
+    "    port=SERVER_PORT,\n",
+    "    n_consumers=4,  # How many workers to be spawned\n",
+    "    create_producer=True,  # Can produce more workers\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "ADMIN_EMAIL, ADMIN_PW = \"admin2@bigquery.org\", \"bqpw2\"\n",
+    "high_client = sy.login(\n",
+    "    url=\"http://localhost:8080\", email=ADMIN_EMAIL, password=ADMIN_PW\n",
+    ")"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "high_client.requests"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "high_client.requests.get_all_approved()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "high_client.requests[2].code"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "high_client.requests[2].code()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "job = high_client.requests[2].code(blocking=False)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "job.wait().get()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "high_client.requests.get_all_pending()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "for request in high_client.requests.get_all_pending():\n",
+    "    if request.code.service_func_name.startswith(\"wrong_syntax_query\"):\n",
+    "        bad_request = request\n",
+    "    if request.code.service_func_name.startswith(\"simple_query\"):\n",
+    "        good_request = request"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "good_job = good_request.code(blocking=False)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "good_job.wait()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "good_request.deposit_result(good_job.info, approve=True)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "high_client.requests.get_all_approved()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "# stdlib\n",
+    "import json\n",
+    "\n",
+    "# third party\n",
+    "from job_helpers import resolve_request\n",
+    "\n",
+    "request_dict = {}\n",
+    "\n",
+    "for request in high_client.requests:\n",
+    "    request_id, request_status = resolve_request(request)\n",
+    "    request_dict[str(request_id)] = str(request_status)\n",
+    "\n",
+    "with open(\".requests.json\", \"w\") as fp:\n",
+    "    json.dump(request_dict, fp)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "bad_job = bad_request.code(blocking=False)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "bad_job"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "retry_good_job = good_request.code(blocking=False)"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": [
+    "retry_good_job.wait()"
+   ]
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  },
+  {
+   "cell_type": "code",
+   "execution_count": null,
+   "metadata": {},
+   "outputs": [],
+   "source": []
+  }
+ ],
+ "metadata": {
+  "kernelspec": {
+   "display_name": "syft_3.12",
+   "language": "python",
+   "name": "python3"
+  },
+  "language_info": {
+   "codemirror_mode": {
+    "name": "ipython",
+    "version": 3
+   },
+   "file_extension": ".py",
+   "mimetype": "text/x-python",
+   "name": "python",
+   "nbconvert_exporter": "python",
+   "pygments_lexer": "ipython3",
+   "version": "3.12.4"
+  }
+ },
+ "nbformat": 4,
+ "nbformat_minor": 2
+}
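The notebook above is the admin-side review pass: list pending requests, execute the submitted code as non-blocking jobs, and deposit or deny results. As a minimal sketch of that flow (using only calls that appear in the cells above; the name prefixes and the deny reason are illustrative), the loop reduces to:

```python
# Condensed review flow, assuming an admin `high_client` session as in the notebook.
for request in high_client.requests.get_all_pending():
    name = request.code.service_func_name
    if name.startswith("simple_query"):
        job = request.code(blocking=False)  # run the submitted query as a job
        job.wait()
        request.deposit_result(job.info, approve=True)  # attach the result and approve
    elif name.startswith("wrong_syntax_query"):
        request.deny(reason="query does not parse")  # illustrative deny reason
```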
"metadata": {}, + "outputs": [], + "source": [ + "high_client.requests.get_all_approved()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "import json\n", + "\n", + "# third party\n", + "from job_helpers import resolve_request\n", + "\n", + "request_dict = {}\n", + "\n", + "for request in high_client.requests:\n", + " request_id, request_status = resolve_request(request)\n", + " request_dict[str(request_id)] = str(request_status)\n", + "\n", + "with open(\".requests.json\", \"w\") as fp:\n", + " json.dump(request_dict, fp)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "bad_job = bad_request.code(blocking=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "bad_job" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "retry_good_job = good_request.code(blocking=False)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "retry_good_job.wait()" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [] + } + ], + "metadata": { + "kernelspec": { + "display_name": "syft_3.12", + "language": "python", + "name": "python3" + }, + "language_info": { + "codemirror_mode": { + "name": "ipython", + "version": 3 + }, + "file_extension": ".py", + "mimetype": "text/x-python", + "name": "python", + "nbconvert_exporter": "python", + "pygments_lexer": "ipython3", + "version": "3.12.4" + } + }, + "nbformat": 4, + "nbformat_minor": 2 +} diff --git a/notebooks/scenarios/bigquery/job_helpers.py b/notebooks/scenarios/bigquery/job_helpers.py index 804a218f962..69695939ccd 100644 --- a/notebooks/scenarios/bigquery/job_helpers.py +++ b/notebooks/scenarios/bigquery/job_helpers.py @@ -8,6 +8,7 @@ import re import secrets import textwrap +from typing import Any # third party from helpers import TestUser @@ -282,6 +283,15 @@ def create_jobs(users: list[TestUser], total_jobs: int = 10) -> list[TestJob]: return jobs +def submit_job(job: TestJob) -> tuple[Any, str]: + client = job.client + response = client.api.services.bigquery.submit_query( + func_name=job.func_name, query=job.query + ) + job.code_path = extract_code_path(response) + return response + + def extract_code_path(response) -> str | None: pattern = r"client\.code\.(\w+)\(\)" match = re.search(pattern, str(response)) @@ -291,9 +301,36 @@ def extract_code_path(response) -> str | None: return None +def resolve_request(request): + service_func_name = request.code.service_func_name + if service_func_name.startswith("simple_query"): + request.approve() # approve because it is good + if service_func_name.startswith("wrong_asset_query"): + request.approve() # approve because it is bad + if service_func_name.startswith("wrong_syntax_query"): + request.approve() # approve because it is bad + if service_func_name.startswith("job_too_much_text"): + request.deny(reason="too long, boring!") # deny because it is bad + if service_func_name.startswith("job_long_name"): + request.deny(reason="too long, boring!") # deny because it is bad + if service_func_name.startswith("job_funcname_xss"): + request.deny(reason="too long, boring!") # never reach doesnt matter + if 
service_func_name.startswith("job_query_xss"): + request.approve() # approve because it is bad + if service_func_name.startswith("job_many_columns"): + request.approve() # approve because it is bad + + return (request.id, request.status) + + create_job_functions = [ + create_simple_query_job, # quick way to increase the odds + create_simple_query_job, + create_simple_query_job, + create_simple_query_job, + create_simple_query_job, create_simple_query_job, - # create_wrong_asset_query, + create_wrong_asset_query, create_wrong_syntax_query, create_long_query_job, create_query_long_name, diff --git a/packages/syft/src/syft/service/request/request_service.py b/packages/syft/src/syft/service/request/request_service.py index 234ba59f4b8..5c2859331b4 100644 --- a/packages/syft/src/syft/service/request/request_service.py +++ b/packages/syft/src/syft/service/request/request_service.py @@ -95,6 +95,60 @@ def get_all(self, context: AuthedServiceContext) -> list[Request]: return requests + # DIRTY METHOD: DELETE AFTER DATABASE UPGRADE + @service_method( + path="request.get_all_approved", + name="get_all_approved", + roles=DATA_SCIENTIST_ROLE_LEVEL, + ) + def get_all_approved(self, context: AuthedServiceContext) -> list[Request]: + requests = self.stash.get_all(context.credentials).unwrap() + # TODO remove once sorting is handled by the stash + requests = [ + request + for request in requests + if request.get_status(context) == RequestStatus.APPROVED + ] + requests.sort(key=lambda x: (x.request_time, x.id), reverse=True) + + return requests + + # DIRTY METHOD: DELETE AFTER DATABASE UPGRADE + @service_method( + path="request.get_all_rejected", + name="get_all_rejected", + roles=DATA_SCIENTIST_ROLE_LEVEL, + ) + def get_all_rejected(self, context: AuthedServiceContext) -> list[Request]: + requests = self.stash.get_all(context.credentials).unwrap() + # TODO remove once sorting is handled by the stash + requests = [ + request + for request in requests + if request.get_status(context) == RequestStatus.REJECTED + ] + requests.sort(key=lambda x: (x.request_time, x.id), reverse=True) + + return requests + + # DIRTY METHOD: DELETE AFTER DATABASE UPGRADE + @service_method( + path="request.get_all_pending", + name="get_all_pending", + roles=DATA_SCIENTIST_ROLE_LEVEL, + ) + def get_all_pending(self, context: AuthedServiceContext) -> list[Request]: + requests = self.stash.get_all(context.credentials).unwrap() + # TODO remove once sorting is handled by the stash + requests = [ + request + for request in requests + if request.get_status(context) == RequestStatus.PENDING + ] + requests.sort(key=lambda x: (x.request_time, x.id), reverse=True) + + return requests + @service_method(path="request.get_all_info", name="get_all_info") def get_all_info( self,