Merge pull request #9236 from OpenMined/teo/data_review_test
BigQuery data review test
madhavajay authored Sep 4, 2024
2 parents 3f7960a + 7f7c977 commit d60722a
Showing 3 changed files with 365 additions and 1 deletion.
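The new notebook exercises the admin-side data review flow end to end: launch a high-side server, log in as the admin, list pending code requests, execute an approved request as a non-blocking job, and deposit the result back onto the request. A condensed sketch of that flow, using only calls that appear in the notebook cells below (server name, port, credentials, and the approval step are taken from the notebook itself):

    import syft as sy

    server = sy.orchestra.launch(
        name="bigquery-high",
        dev_mode=True,
        server_side_type="high",
        port="8080",
        n_consumers=4,         # how many workers to spawn
        create_producer=True,  # can produce more workers
    )
    high_client = sy.login(
        url="http://localhost:8080", email="[email protected]", password="bqpw2"
    )

    # Review loop: run each pending simple query as a job, then attach the
    # result to the request and approve it.
    for request in high_client.requests.get_all_pending():
        if request.code.service_func_name.startswith("simple_query"):
            job = request.code(blocking=False)
            job.wait()
            request.deposit_result(job.info, approve=True)

    high_client.requests.get_all_approved()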
273 changes: 273 additions & 0 deletions notebooks/scenarios/bigquery/042-data_review.ipynb
@@ -0,0 +1,273 @@
{
"cells": [
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# stdlib\n",
"import os\n",
"\n",
"environment = os.environ.get(\"ORCHESTRA_DEPLOYMENT_TYPE\", \"python\")\n",
"environment"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# third party\n",
"\n",
"# syft absolute\n",
"import syft as sy"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"SERVER_PORT = \"8080\"\n",
"SERVER_URL = f\"http://localhost:{SERVER_PORT}\""
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"server = sy.orchestra.launch(\n",
" name=\"bigquery-high\",\n",
" dev_mode=True,\n",
" server_side_type=\"high\",\n",
" # reset=True,\n",
" port=SERVER_PORT,\n",
" n_consumers=4, # How many workers to be spawned\n",
" create_producer=True, # Can produce more workers\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"ADMIN_EMAIL, ADMIN_PW = \"[email protected]\", \"bqpw2\"\n",
"high_client = sy.login(\n",
" url=\"http://localhost:8080\", email=ADMIN_EMAIL, password=ADMIN_PW\n",
")"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"high_client.requests"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"high_client.requests.get_all_approved()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"high_client.requests[2].code"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"high_client.requests[2].code()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"job = high_client.requests[2].code(blocking=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"job.wait().get()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"high_client.requests.get_all_pending()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"for request in high_client.requests.get_all_pending():\n",
" if request.code.service_func_name.startswith(\"wrong_syntax_query\"):\n",
" bad_request = request\n",
" if request.code.service_func_name.startswith(\"simple_query\"):\n",
" good_request = request"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"good_job = good_request.code(blocking=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"good_job.wait()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"good_request.deposit_result(good_job.info, approve=True)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"high_client.requests.get_all_approved()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"# stdlib\n",
"import json\n",
"\n",
"# third party\n",
"from job_helpers import resolve_request\n",
"\n",
"request_dict = {}\n",
"\n",
"for request in high_client.requests:\n",
" request_id, request_status = resolve_request(request)\n",
" request_dict[str(request_id)] = str(request_status)\n",
"\n",
"with open(\".requests.json\", \"w\") as fp:\n",
" json.dump(request_dict, fp)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"bad_job = bad_request.code(blocking=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"bad_job"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"retry_good_job = good_request.code(blocking=False)"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": [
"retry_good_job.wait()"
]
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
},
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
"kernelspec": {
"display_name": "syft_3.12",
"language": "python",
"name": "python3"
},
"language_info": {
"codemirror_mode": {
"name": "ipython",
"version": 3
},
"file_extension": ".py",
"mimetype": "text/x-python",
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.4"
}
},
"nbformat": 4,
"nbformat_minor": 2
}
39 changes: 38 additions & 1 deletion notebooks/scenarios/bigquery/job_helpers.py
@@ -8,6 +8,7 @@
import re
import secrets
import textwrap
from typing import Any

# third party
from helpers import TestUser
@@ -282,6 +283,15 @@ def create_jobs(users: list[TestUser], total_jobs: int = 10) -> list[TestJob]:
return jobs


def submit_job(job: TestJob) -> Any:
client = job.client
response = client.api.services.bigquery.submit_query(
func_name=job.func_name, query=job.query
)
job.code_path = extract_code_path(response)
return response


def extract_code_path(response) -> str | None:
pattern = r"client\.code\.(\w+)\(\)"
match = re.search(pattern, str(response))
@@ -291,9 +301,36 @@ def extract_code_path(response) -> str | None:
return None


def resolve_request(request):
service_func_name = request.code.service_func_name
if service_func_name.startswith("simple_query"):
request.approve() # approve because it is good
if service_func_name.startswith("wrong_asset_query"):
request.approve() # approve because it is bad
if service_func_name.startswith("wrong_syntax_query"):
request.approve() # approve because it is bad
if service_func_name.startswith("job_too_much_text"):
request.deny(reason="too long, boring!") # deny because it is bad
if service_func_name.startswith("job_long_name"):
request.deny(reason="too long, boring!") # deny because it is bad
if service_func_name.startswith("job_funcname_xss"):
request.deny(reason="too long, boring!") # never reach doesnt matter
if service_func_name.startswith("job_query_xss"):
request.approve() # approve because it is bad
if service_func_name.startswith("job_many_columns"):
request.approve() # approve because it is bad

return (request.id, request.status)


create_job_functions = [
create_simple_query_job, # quick way to increase the odds
create_simple_query_job,
create_simple_query_job,
create_simple_query_job,
create_simple_query_job,
create_simple_query_job,
# create_wrong_asset_query,
create_wrong_asset_query,
create_wrong_syntax_query,
create_long_query_job,
create_query_long_name,
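The new submit_job and resolve_request helpers are meant to be driven from the scenario notebooks: submit_job files a query through the BigQuery service and records the generated code path on the job, while resolve_request approves or denies a request based on its service_func_name prefix and returns the request's id and status. A rough usage sketch, assuming users (a list of TestUser) and an admin high_client are already set up as in the surrounding BigQuery notebooks:

    from job_helpers import create_jobs, resolve_request, submit_job

    jobs = create_jobs(users, total_jobs=10)
    for job in jobs:
        submit_job(job)  # submits the query and fills in job.code_path

    # Admin side: approve or deny each request and record its final status
    statuses = {}
    for request in high_client.requests:
        request_id, request_status = resolve_request(request)
        statuses[str(request_id)] = str(request_status)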
54 changes: 54 additions & 0 deletions packages/syft/src/syft/service/request/request_service.py
@@ -95,6 +95,60 @@ def get_all(self, context: AuthedServiceContext) -> list[Request]:

return requests

# DIRTY METHOD: DELETE AFTER DATABASE UPGRADE
@service_method(
path="request.get_all_approved",
name="get_all_approved",
roles=DATA_SCIENTIST_ROLE_LEVEL,
)
def get_all_approved(self, context: AuthedServiceContext) -> list[Request]:
requests = self.stash.get_all(context.credentials).unwrap()
# TODO remove once sorting is handled by the stash
requests = [
request
for request in requests
if request.get_status(context) == RequestStatus.APPROVED
]
requests.sort(key=lambda x: (x.request_time, x.id), reverse=True)

return requests

# DIRTY METHOD: DELETE AFTER DATABASE UPGRADE
@service_method(
path="request.get_all_rejected",
name="get_all_rejected",
roles=DATA_SCIENTIST_ROLE_LEVEL,
)
def get_all_rejected(self, context: AuthedServiceContext) -> list[Request]:
requests = self.stash.get_all(context.credentials).unwrap()
# TODO remove once sorting is handled by the stash
requests = [
request
for request in requests
if request.get_status(context) == RequestStatus.REJECTED
]
requests.sort(key=lambda x: (x.request_time, x.id), reverse=True)

return requests

# DIRTY METHOD: DELETE AFTER DATABASE UPGRADE
@service_method(
path="request.get_all_pending",
name="get_all_pending",
roles=DATA_SCIENTIST_ROLE_LEVEL,
)
def get_all_pending(self, context: AuthedServiceContext) -> list[Request]:
requests = self.stash.get_all(context.credentials).unwrap()
# TODO remove once sorting is handled by the stash
requests = [
request
for request in requests
if request.get_status(context) == RequestStatus.PENDING
]
requests.sort(key=lambda x: (x.request_time, x.id), reverse=True)

return requests

@service_method(path="request.get_all_info", name="get_all_info")
def get_all_info(
self,
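On a client these filtered accessors surface under client.requests, which is how the notebook above uses them. A minimal sketch of the client-side view (get_all_rejected is assumed to be exposed the same way as the other two):

    pending = high_client.requests.get_all_pending()
    approved = high_client.requests.get_all_approved()
    rejected = high_client.requests.get_all_rejected()  # assumed to mirror the other accessors

    # Each accessor returns Request objects already filtered by status and
    # sorted by (request_time, id) with the newest first.
    print(len(pending), len(approved), len(rejected))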
