Skip to content

Commit

Permalink
some early l0 activities in sim
Browse files Browse the repository at this point in the history
  • Loading branch information
yashgorana committed Sep 18, 2024
1 parent 7cec793 commit b650495
Show file tree
Hide file tree
Showing 9 changed files with 258 additions and 30 deletions.
8 changes: 4 additions & 4 deletions packages/syft/setup.cfg
Original file line number Diff line number Diff line change
Expand Up @@ -139,10 +139,10 @@ test_plugins =
faker
distro
dynaconf
; pytest-asyncio
; pytest-timeout
; anyio
; unsync
pytest-asyncio
pytest-timeout
anyio
unsync

[options.entry_points]
console_scripts =
Expand Down
Empty file added test_helpers/__init__.py
Empty file.
22 changes: 6 additions & 16 deletions tests/scenarios/bigquery/helpers/api.py
Original file line number Diff line number Diff line change
@@ -1,25 +1,15 @@
# stdlib
import os
import sys

# third party
from unsync import unsync

# syft absolute
import syft as sy
from syft.util.util import find_base_dir_with_tox_ini

# TODO remove hacky imports once https://github.com/OpenMined/PySyft/pull/9291/ is merged
base_dir = find_base_dir_with_tox_ini()
notebook_helpers_module_path = os.path.abspath(os.path.join(base_dir, "notebooks/"))
if notebook_helpers_module_path not in sys.path:
sys.path.append(notebook_helpers_module_path)
from syft import test_helpers # noqa: F401


# third party
# The below two imports work only after the above sys.path.append
from notebook_helpers.apis import make_schema # noqa: E402
from notebook_helpers.apis import make_test_query # noqa: E402
# the fuck? fix test_helpers
if True:
# third party
from apis import make_schema # type: ignore
from apis import make_test_query


# Define any helper methods for our rate limiter
Expand Down
4 changes: 2 additions & 2 deletions tests/scenarios/bigquery/helpers/code.py
Original file line number Diff line number Diff line change
Expand Up @@ -61,11 +61,11 @@ def get_pending(client):


@unsync
async def triage_requests(events, client, after, register):
async def triage_requests(events, client, after, register, sleep=2):
if after:
await events.await_for(event_name=after)
while True:
await asyncio.sleep(2)
await asyncio.sleep(sleep)
requests = get_pending(client)
for request in requests:
approve_and_deposit(client, request.id)
Expand Down
6 changes: 6 additions & 0 deletions tests/scenarios/bigquery/helpers/events.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,19 +19,25 @@ class Event:
PREBUILT_WORKER_IMAGE_BIGQUERY_CREATED = "prebuilt_worker_image_bigquery_created"
EXTERNAL_REGISTRY_BIGQUERY_CREATED = "external_registry_bigquery_created"
WORKER_POOL_CREATED = "worker_pool_created"
WORKER_POOL_SCALED = "worker_pool_scaled"
ALLOW_GUEST_SIGNUP_ENABLED = "allow_guest_signup_enabled"
ALLOW_GUEST_SIGNUP_DISABLED = "allow_guest_signup_disabled"
USERS_CREATED_CHECKED = "users_created_checked"
SCHEMA_ENDPOINT_CREATED = "schema_endpoint_created"
SUBMIT_QUERY_ENDPOINT_CREATED = "submit_query_endpoint_created"
SUBMIT_QUERY_ENDPOINT_CONFIGURED = "submit_query_endpoint_configured"
USERS_CAN_QUERY_MOCK = "users_can_query_mock"
USERS_CAN_SUBMIT_QUERY = "users_can_submit_query"
ADMIN_APPROVED_REQUEST = "admin_approved_request"
ADMIN_APPROVED_FIRST_REQUEST = "admin_approved_first_request"
USERS_CAN_GET_APPROVED_RESULT = "users_can_get_approved_result"
USERS_QUERY_NOT_READY = "users_query_not_ready"
QUERY_ENDPOINT_CREATED = "query_endpoint_created"
QUERY_ENDPOINT_CONFIGURED = "query_endpoint_configured"

ADMIN_LOW_SIDE_WORKFLOW_COMPLETED = "admin_low_side_workflow_completed"
ADMIN_HIGH_SIDE_WORKFLOW_COMPLETED = "admin_high_side_workflow_completed"


@dataclass
class Scenario:
Expand Down
8 changes: 8 additions & 0 deletions tests/scenarios/bigquery/helpers/fixtures_sync.py
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@ def cleanup():
return server


def make_client(url: str, email: str, password: str) -> Any:
return sy.login(url=url, email=email, password=password)


def sync_clients(from_client, to_client):
return sy.sync(from_client, to_client)


@unsync
async def create_users(root_client, events, users, event_name):
for test_user in users:
Expand Down
30 changes: 25 additions & 5 deletions tests/scenarios/bigquery/helpers/workers.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@


@unsync
async def get_prebuilt_worker_image(events, client, expected_tag, event_name):
await events.await_for(event_name=event_name, show=True)
async def get_prebuilt_worker_image(events, client, expected_tag, after):
await events.await_for(event_name=after, show=True)
worker_images = client.images.get_all()
for worker_image in worker_images:
if expected_tag in str(worker_image.image_identifier):
Expand All @@ -35,7 +35,11 @@ async def add_external_registry(events, client, event_name):

@unsync
async def create_worker_pool(
events, client, worker_pool_name, worker_pool_result, event_name
events,
client,
worker_pool_name,
worker_pool_result,
event_name,
):
# block until this is available
worker_image = worker_pool_result.result(timeout=5)
Expand All @@ -53,11 +57,27 @@ async def create_worker_pool(


@unsync
async def check_worker_pool_exists(events, client, worker_pool_name, event_name):
async def check_worker_pool_exists(events, client, worker_pool_name, after):
timeout = 30
await events.await_for(event_name=event_name, timeout=timeout)
await events.await_for(event_name=after, timeout=timeout)
pools = client.worker_pools.get_all()
for pool in pools:
if worker_pool_name == pool.name:
assert worker_pool_name == pool.name
return worker_pool_name == pool.name


@unsync
async def scale_worker_pool(
events, client, worker_pool_name, num_workers, event_name, after
):
if after:
await events.await_for(event_name=after)

result = client.api.services.worker_pool.scale(
pool_name=worker_pool_name,
num_workers=num_workers,
)

assert isinstance(result, sy.SyftSuccess)
events.register(event_name)
205 changes: 205 additions & 0 deletions tests/scenarios/bigquery/level_0_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,205 @@
# stdlib
import asyncio
import random

# third party
from helpers.events import Event
from helpers.events import EventManager
from helpers.events import Scenario
from helpers.fixtures_sync import make_client
from helpers.fixtures_sync import sync_clients
from helpers.users import set_settings_allow_guest_signup
from helpers.workers import add_external_registry
from helpers.workers import check_worker_pool_exists
from helpers.workers import create_prebuilt_worker_image
from helpers.workers import create_worker_pool
from helpers.workers import get_prebuilt_worker_image
import pytest
from unsync import unsync

random.seed(42069)


async def user_low_side_activity():
# loop: guest user creation is allowed
# create_user

# login_user

# submit_code
# request_approval

# loop: wait for approval

# execute code
# get result

# dump result in a file
pass


@unsync
async def admin_sync_activity(_, events, after):
if after:
await events.await_for(event_name=after)

# login to high side
admin_client_high = make_client(
url="http://localhost:8080",
email="[email protected]",
password="changethis",
)

admin_client_low = make_client(
url="http://localhost:8081",
email="[email protected]",
password="changethis",
)

while True:
await asyncio.sleep(3)
print("admin_sync_activity: syncing high & low")
sync_clients(admin_client_high, admin_client_low)


@unsync
async def admin_create_worker_pool(_, admin_client, events):
"""
Worker pool creation typically involves
- Register custom image
- Launch worker pool
- Scale worker pool
"""

worker_pool_name = "bigquery-pool"
worker_docker_tag = "openmined/worker-bigquery:0.9.1"

create_prebuilt_worker_image(
events,
admin_client,
worker_docker_tag,
Event.PREBUILT_WORKER_IMAGE_BIGQUERY_CREATED,
)

worker_image_result = get_prebuilt_worker_image(
events,
admin_client,
worker_docker_tag,
after=Event.PREBUILT_WORKER_IMAGE_BIGQUERY_CREATED,
)

# todo - configure this manually??
add_external_registry(
events,
admin_client,
Event.EXTERNAL_REGISTRY_BIGQUERY_CREATED,
)

create_worker_pool(
events,
admin_client,
worker_pool_name,
worker_image_result,
Event.WORKER_POOL_CREATED,
)

check_worker_pool_exists(
events,
admin_client,
worker_pool_name,
after=Event.WORKER_POOL_CREATED,
)

# TODO
# scale_worker_pool(
# events,
# admin_client,
# worker_pool_name,
# event_name=Event.WORKER_POOL_SCALED,
# after=Event.WORKER_POOL_CREATED,
# )


@unsync
async def admin_low_side_activity(_, events):
"""
Typical admin activity on low-side server
1. Login to low-side server
2. Enable guest sign up
3. Start checking requests every 'n' seconds
"""

# login to low side
admin_client = make_client(
url="http://localhost:8081",
email="[email protected]",
password="changethis",
)

# enable guest sign up
set_settings_allow_guest_signup(
events,
admin_client,
True,
Event.ALLOW_GUEST_SIGNUP_ENABLED,
)

# create worker pool on low side
admin_create_worker_pool(_, admin_client, events)

# start checking requests every 5s
# triage_requests(
# events,
# admin_client,
# register=Event.ADMIN_APPROVED_REQUEST,
# sleep=5,
# )

events.register(Event.ADMIN_LOW_SIDE_WORKFLOW_COMPLETED)


@unsync
async def admin_high_side_activity(_, events):
# login
admin_client = make_client(
url="http://localhost:8080",
email="[email protected]",
password="changethis",
)

admin_create_worker_pool(_, admin_client, events)

events.register(Event.ADMIN_HIGH_SIDE_WORKFLOW_COMPLETED)


@pytest.mark.asyncio
async def test_level_0_k8s(request):
scenario = Scenario(
name="test_level_0_k8s",
events=[
Event.ALLOW_GUEST_SIGNUP_ENABLED,
Event.ADMIN_LOW_SIDE_WORKFLOW_COMPLETED,
Event.ADMIN_HIGH_SIDE_WORKFLOW_COMPLETED,
],
)

events = EventManager()
events.add_scenario(scenario)
events.monitor()

# start admin activity on high side
admin_low_side_activity(request, events)

# todo
admin_high_side_activity(request, events)

# todo - only start syncing after the root user created other admin users
admin_sync_activity(request, events, after=Event.USER_ADMIN_CREATED)

# todo
# users = create_users()
# [user_low_side_activity(user) for user in users]

await events.await_scenario(scenario_name="test_level_0_k8s", timeout=30)
assert events.scenario_completed("test_level_0_k8s")
5 changes: 2 additions & 3 deletions tests/scenarios/bigquery/level_2_basic_test.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,3 @@
# stdlib

# third party
from helpers.api import create_endpoints_query
from helpers.api import create_endpoints_schema
Expand Down Expand Up @@ -235,6 +233,7 @@ async def test_level_2_basic_scenario(request):
assert res is True

await events.await_scenario(
scenario_name="test_create_apis_and_triage_requests", timeout=30
scenario_name="test_create_apis_and_triage_requests",
timeout=30,
)
assert events.scenario_completed("test_create_apis_and_triage_requests")

0 comments on commit b650495

Please sign in to comment.