Skip to content

Commit

Permalink
Merge pull request #9224 from OpenMined/aziz/filter_high_side_requests
Browse files Browse the repository at this point in the history
sync: filter out high side requests and codes
  • Loading branch information
abyesilyurt authored Sep 2, 2024
2 parents e4e0ca3 + e9f4158 commit 10a3d46
Show file tree
Hide file tree
Showing 4 changed files with 97 additions and 40 deletions.
69 changes: 49 additions & 20 deletions notebooks/scenarios/bigquery/sync/02-configure-api-and-sync.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"# stdlib\n",
Expand Down Expand Up @@ -31,7 +33,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"# syft absolute\n",
Expand All @@ -42,7 +46,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"server_low = sy.orchestra.launch(\n",
Expand All @@ -67,7 +73,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"low_client = server_low.login(email=\"[email protected]\", password=\"changethis\")\n",
Expand All @@ -77,7 +85,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"assert len(high_client.worker_pools.get_all()) == 2\n",
Expand All @@ -87,7 +97,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"this_worker_pool_name = \"bigquery-pool\""
Expand All @@ -96,7 +108,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"# !pip list | grep bigquery"
Expand All @@ -105,7 +119,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"# !pip install db-dtypes google-cloud-bigquery"
Expand All @@ -121,7 +137,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"mock_func = make_test_query(\n",
Expand All @@ -135,7 +153,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"private_func = make_test_query(\n",
Expand All @@ -148,7 +168,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"new_endpoint = sy.TwinAPIEndpoint(\n",
Expand All @@ -165,7 +187,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"# Here, we update the endpoint to timeout after 100s (rather the default of 60s)\n",
Expand All @@ -177,7 +201,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"high_client.api.services.api.update(\n",
Expand All @@ -188,7 +214,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"schema_function = make_schema(\n",
Expand All @@ -202,7 +230,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"high_client.custom_api.add(endpoint=schema_function)\n",
Expand All @@ -212,7 +242,9 @@
{
"cell_type": "code",
"execution_count": null,
"metadata": {},
"metadata": {
"metadata": {}
},
"outputs": [],
"source": [
"dataset_1 = test_settings.get(\"dataset_1\", default=\"dataset_1\")\n",
Expand Down Expand Up @@ -491,9 +523,6 @@
"outputs": [],
"source": [
"assert [\n",
" \"Synced NEW UserCode\",\n",
" \"Synced NEW Request\",\n",
" \"Synced NEW Job\",\n",
" \"Synced NEW TwinAPIEndpoint\",\n",
" \"Synced NEW TwinAPIEndpoint\",\n",
" \"Synced NEW TwinAPIEndpoint\",\n",
Expand Down Expand Up @@ -576,7 +605,7 @@
"name": "python",
"nbconvert_exporter": "python",
"pygments_lexer": "ipython3",
"version": "3.12.2"
"version": "3.10.14"
}
},
"nbformat": 4,
Expand Down
23 changes: 14 additions & 9 deletions packages/syft/src/syft/service/code/user_code.py
Original file line number Diff line number Diff line change
Expand Up @@ -1960,17 +1960,22 @@ def load_approved_policy_code(
user_code_items: list[UserCode], context: AuthedServiceContext | None
) -> Any:
"""Reload the policy code in memory for user code that is approved."""
try:
for user_code in user_code_items:
for user_code in user_code_items:
try:
if context is None:
status = user_code.status
else:
status = user_code.get_status(context).unwrap()
except SyftException:
display(
SyftWarning(
message=f"Failed to load UserCode {user_code.id.no_dash} {user_code.service_func_name=}"
)
)
continue

if status.approved:
if isinstance(user_code.input_policy_type, UserPolicy):
load_policy_code(user_code.input_policy_type)
if isinstance(user_code.output_policy_type, UserPolicy):
load_policy_code(user_code.output_policy_type)
except Exception as e:
raise Exception(f"Failed to load code: {user_code}: {e}")
if status.approved:
if isinstance(user_code.input_policy_type, UserPolicy):
load_policy_code(user_code.input_policy_type)
if isinstance(user_code.output_policy_type, UserPolicy):
load_policy_code(user_code.output_policy_type)
30 changes: 19 additions & 11 deletions packages/syft/src/syft/service/sync/diff_state.py
Original file line number Diff line number Diff line change
Expand Up @@ -915,17 +915,7 @@ def root(self) -> ObjectDiff:
return self.root_diff

def __repr__(self) -> Any:
try:
return f"""{self.hierarchy_str('low')}
{self.hierarchy_str('high')}
"""
except Exception as _:
raise SyftException(
public_message=html.escape(
"Could not render batch, please use resolve(<batch>) instead."
)
)
return f"{self.__class__.__name__}[{self.root_type.__name__}](#{str(self.root_id)})"

def _repr_markdown_(self, wrap_as_python: bool = True, indent: int = 0) -> str:
return "" # Turns off the _repr_markdown_ of SyftObject
Expand Down Expand Up @@ -1248,6 +1238,7 @@ def from_sync_state(
# TODO: Check if high and low ignored batches are the same else error
previously_ignored_batches = low_state.ignored_batches
ServerDiff.apply_previous_ignore_state(all_batches, previously_ignored_batches)
ServerDiff.ignore_high_side_code(all_batches)

res = cls(
low_server_uid=low_state.server_uid,
Expand Down Expand Up @@ -1320,6 +1311,23 @@ def apply_previous_ignore_state(
if len(required_dependencies & other_batch_root_id):
other_batch.decision = None

@staticmethod
def ignore_high_side_code(batches: list[ObjectDiffBatch]) -> None:
# relative
from ...abstract_server import ServerSideType
from ...client.syncing import get_other_ignore_batches

for batch in batches:
if not issubclass(batch.root_type, UserCode):
continue

user_code: UserCode = batch.root.non_empty_object # type: ignore
if user_code.origin_server_side_type == ServerSideType.HIGH_SIDE:
batch.decision = SyncDecision.IGNORE
other_batches = get_other_ignore_batches(batch, batches)
for other_batch in other_batches:
other_batch.decision = SyncDecision.IGNORE

@staticmethod
def dependencies_from_states(
low_state: SyncState, high_state: SyncState
Expand Down
15 changes: 15 additions & 0 deletions packages/syft/tests/syft/service/sync/sync_resolve_single_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -328,6 +328,21 @@ def compute_thrice() -> int:
assert diff_after.is_same


def test_filter_out_l2_requests(low_worker, high_worker):
low_client = low_worker.root_client
high_client = high_worker.root_client

@sy.syft_function_single_use()
def compute() -> int:
return 42

high_client.code.request_code_execution(compute)
high_client.code.compute(blocking=False)

w = sy.sync(from_client=high_client, to_client=low_client)
assert isinstance(w, SyftSuccess), f"Expected empty diff, got {w}"


def test_approve_request_on_sync_blocking(low_worker, high_worker):
low_client = low_worker.root_client
client_low_ds = get_ds_client(low_client)
Expand Down

0 comments on commit 10a3d46

Please sign in to comment.