From 614ebb567f8e2a7ecfa9f7ecff32ebad6b9d7783 Mon Sep 17 00:00:00 2001 From: Aziz Berkay Yesilyurt Date: Thu, 25 Apr 2024 16:05:56 +0200 Subject: [PATCH 1/3] fix job error reporting --- .../syft/src/syft/service/code/user_code.py | 6 ++- .../syft/service/code/user_code_service.py | 2 +- .../syft/src/syft/service/job/job_stash.py | 2 +- packages/syft/src/syft/service/queue/queue.py | 6 ++- .../syft/src/syft/service/request/request.py | 5 +++ .../syft/tests/syft/users/user_code_test.py | 3 +- tests/integration/local/twin_api_sync_test.py | 37 +++++++++++++++++++ 7 files changed, 56 insertions(+), 5 deletions(-) diff --git a/packages/syft/src/syft/service/code/user_code.py b/packages/syft/src/syft/service/code/user_code.py index 3bfea48a166..0c936dd20ae 100644 --- a/packages/syft/src/syft/service/code/user_code.py +++ b/packages/syft/src/syft/service/code/user_code.py @@ -1489,7 +1489,11 @@ def to_str(arg: Any) -> str: original_print( f"{time} EXCEPTION LOG ({job_id}):\n{error_msg}", file=sys.stderr ) - if context.node is not None: + if ( + context.node is not None + and context.job is not None + and context.job.log_id is not None + ): log_id = context.job.log_id log_service = context.node.get_service("LogService") log_service.append(context=context, uid=log_id, new_err=error_msg) diff --git a/packages/syft/src/syft/service/code/user_code_service.py b/packages/syft/src/syft/service/code/user_code_service.py index c7d3ff1b758..c2b0686b2fa 100644 --- a/packages/syft/src/syft/service/code/user_code_service.py +++ b/packages/syft/src/syft/service/code/user_code_service.py @@ -550,7 +550,7 @@ def _call( return Ok(result) elif result.syft_action_data_type is Err: # result contains the error but the request was handled correctly - return result.syft_action_data + return Ok(result) elif has_result_read_permission: return Ok(result) else: diff --git a/packages/syft/src/syft/service/job/job_stash.py b/packages/syft/src/syft/service/job/job_stash.py index bdf8d3df89d..9daf864a78f 100644 --- a/packages/syft/src/syft/service/job/job_stash.py +++ b/packages/syft/src/syft/service/job/job_stash.py @@ -677,7 +677,7 @@ def wait( return self.resolve if not job_only and self.result is not None: - self.result.wait() + self.result.wait(timeout) if api is None: raise ValueError( diff --git a/packages/syft/src/syft/service/queue/queue.py b/packages/syft/src/syft/service/queue/queue.py index 35212843651..cc2d502d122 100644 --- a/packages/syft/src/syft/service/queue/queue.py +++ b/packages/syft/src/syft/service/queue/queue.py @@ -200,11 +200,15 @@ def handle_message_multiprocessing( status = Status.COMPLETED job_status = JobStatus.COMPLETED - if isinstance(result, Ok): + if isinstance(result.ok().syft_action_data, Err): + status = Status.ERRORED + job_status = JobStatus.ERRORED result = result.ok() elif isinstance(result, SyftError) or isinstance(result, Err): status = Status.ERRORED job_status = JobStatus.ERRORED + elif isinstance(result, Ok): + result = result.ok() except Exception as e: # nosec status = Status.ERRORED job_status = JobStatus.ERRORED diff --git a/packages/syft/src/syft/service/request/request.py b/packages/syft/src/syft/service/request/request.py index 5b4792c9fc6..78c21bd8c1c 100644 --- a/packages/syft/src/syft/service/request/request.py +++ b/packages/syft/src/syft/service/request/request.py @@ -877,6 +877,11 @@ def accept_by_depositing_result( return res job_info.result = action_object + job_info.status = ( + JobStatus.ERRORED + if isinstance(action_object.syft_action_data, Err) + else JobStatus.COMPLETED + ) existing_result = job.result.id if job.result is not None else None print( diff --git a/packages/syft/tests/syft/users/user_code_test.py b/packages/syft/tests/syft/users/user_code_test.py index da4602ebde4..cb98ee29112 100644 --- a/packages/syft/tests/syft/users/user_code_test.py +++ b/packages/syft/tests/syft/users/user_code_test.py @@ -87,7 +87,8 @@ def test_duplicated_user_code(worker, guest_client: User) -> None: assert len(guest_client.code.get_all()) == 1 # request the a different function name but same content will also succeed - mock_syft_func_2() + # flaky if not blocking + mock_syft_func_2(blocking=True) result = guest_client.api.services.code.request_code_execution(mock_syft_func_2) assert isinstance(result, Request) assert len(guest_client.code.get_all()) == 2 diff --git a/tests/integration/local/twin_api_sync_test.py b/tests/integration/local/twin_api_sync_test.py index daccc398c04..62547ebda52 100644 --- a/tests/integration/local/twin_api_sync_test.py +++ b/tests/integration/local/twin_api_sync_test.py @@ -4,6 +4,7 @@ # third party import pytest +from result import Err # syft absolute import syft @@ -13,6 +14,8 @@ from syft.client.syncing import compare_clients from syft.client.syncing import resolve_single from syft.node.worker import Worker +from syft.service.job.job_stash import JobStash +from syft.service.job.job_stash import JobStatus from syft.service.response import SyftError from syft.service.response import SyftSuccess @@ -162,3 +165,37 @@ def compute(query): assert isinstance( private_res, SyftError ), "Should not be able to access private function on low side." + + +def test_function_error(full_low_worker) -> None: + root_domain_client = full_low_worker.login( + email="info@openmined.org", password="changethis" + ) + root_domain_client.register( + name="data-scientist", + email="test_user@openmined.org", + password="0000", + password_verify="0000", + ) + ds_client = root_domain_client.login( + email="test_user@openmined.org", + password="0000", + ) + + users = root_domain_client.users.get_all() + + @sy.syft_function_single_use() + def compute_sum(): + assert False + + compute_sum.code = dedent(compute_sum.code) + ds_client.api.services.code.request_code_execution(compute_sum) + + users[-1].allow_mock_execution() + result = ds_client.api.services.code.compute_sum(blocking=True) + assert isinstance(result.get(), Err) + + job_info = ds_client.api.services.code.compute_sum(blocking=False) + result = job_info.wait(timeout=10) + assert isinstance(result.get(), Err) + assert job_info.status == JobStatus.ERRORED From a150a01ccded2d75aba63f9ebbcfd7af1bfacc98 Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Thu, 25 Apr 2024 18:08:54 +0200 Subject: [PATCH 2/3] add some fixes for logs --- packages/syft/src/syft/service/code/user_code.py | 4 ++++ packages/syft/src/syft/service/job/html_template.py | 4 ++-- packages/syft/src/syft/service/job/job_stash.py | 8 ++++---- 3 files changed, 10 insertions(+), 6 deletions(-) diff --git a/packages/syft/src/syft/service/code/user_code.py b/packages/syft/src/syft/service/code/user_code.py index 0c936dd20ae..7582d18d512 100644 --- a/packages/syft/src/syft/service/code/user_code.py +++ b/packages/syft/src/syft/service/code/user_code.py @@ -1489,6 +1489,10 @@ def to_str(arg: Any) -> str: original_print( f"{time} EXCEPTION LOG ({job_id}):\n{error_msg}", file=sys.stderr ) + else: + # for local execution + time = datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S") + original_print(f"{time} EXCEPTION LOG:\n{error_msg}\n", file=sys.stderr) if ( context.node is not None and context.job is not None diff --git a/packages/syft/src/syft/service/job/html_template.py b/packages/syft/src/syft/service/job/html_template.py index cc0b0f7f74b..221ac12dbe3 100644 --- a/packages/syft/src/syft/service/job/html_template.py +++ b/packages/syft/src/syft/service/job/html_template.py @@ -65,8 +65,8 @@ display: none;align-items:left">
- - + - + -
+
# diff --git a/packages/syft/src/syft/service/job/job_stash.py b/packages/syft/src/syft/service/job/job_stash.py index 9daf864a78f..a823f501773 100644 --- a/packages/syft/src/syft/service/job/job_stash.py +++ b/packages/syft/src/syft/service/job/job_stash.py @@ -620,18 +620,18 @@ def _repr_html_(self) -> str: """ - logs = self.logs(_print=False, stderr=False) + logs = self.logs(_print=False) logs_lines = logs.split("\n") if logs else [] logs_lines_html = "" for i, line in enumerate(logs_lines): logs_lines_html += f""" -
+
{i}
+
{line}
From a55c265b08a0cd0be7fea3b8c495bbcb3ed4c7e4 Mon Sep 17 00:00:00 2001 From: Koen van der Veen Date: Thu, 25 Apr 2024 18:15:32 +0200 Subject: [PATCH 3/3] disable twin api integration test --- .../tests/syft/service/sync/sync_flow_test.py | 112 +++++++++--------- 1 file changed, 56 insertions(+), 56 deletions(-) diff --git a/packages/syft/tests/syft/service/sync/sync_flow_test.py b/packages/syft/tests/syft/service/sync/sync_flow_test.py index 5698048c384..e397ed922ee 100644 --- a/packages/syft/tests/syft/service/sync/sync_flow_test.py +++ b/packages/syft/tests/syft/service/sync/sync_flow_test.py @@ -307,62 +307,62 @@ def private_function(context) -> str: return 42 -def test_twin_api_integration(full_high_worker, full_low_worker): - low_client = full_low_worker.login( - email="info@openmined.org", password="changethis" - ) - high_client = full_high_worker.login( - email="info@openmined.org", password="changethis" - ) - - low_client.register( - email="newuser@openmined.org", - name="John Doe", - password="pw", - password_verify="pw", - ) - - client_low_ds = low_client.login( - email="newuser@openmined.org", - password="pw", - ) - - new_endpoint = sy.TwinAPIEndpoint( - path="testapi.query", - private_function=private_function, - mock_function=mock_function, - description="", - ) - high_client.api.services.api.add(endpoint=new_endpoint) - high_client.refresh() - high_private_res = high_client.api.services.testapi.query.private() - assert high_private_res == 42 - - low_state = low_client.get_sync_state() - high_state = high_client.get_sync_state() - diff_state = compare_states(high_state, low_state) - - obj_diff_batch = diff_state[0] - widget = resolve_single(obj_diff_batch) - widget.click_sync() - - obj_diff_batch = diff_state[1] - widget = resolve_single(obj_diff_batch) - widget.click_sync() - - high_mock_res = high_client.api.services.testapi.query.mock() - assert high_mock_res == -42 - - client_low_ds.refresh() - high_client.refresh() - low_private_res = client_low_ds.api.services.testapi.query.private() - assert isinstance( - low_private_res, SyftError - ), "Should not have access to private on low side" - low_mock_res = client_low_ds.api.services.testapi.query.mock() - high_mock_res = high_client.api.services.testapi.query.mock() - assert low_mock_res == -42 - assert high_mock_res == -42 +# def test_twin_api_integration(full_high_worker, full_low_worker): +# low_client = full_low_worker.login( +# email="info@openmined.org", password="changethis" +# ) +# high_client = full_high_worker.login( +# email="info@openmined.org", password="changethis" +# ) + +# low_client.register( +# email="newuser@openmined.org", +# name="John Doe", +# password="pw", +# password_verify="pw", +# ) + +# client_low_ds = low_client.login( +# email="newuser@openmined.org", +# password="pw", +# ) + +# new_endpoint = sy.TwinAPIEndpoint( +# path="testapi.query", +# private_function=private_function, +# mock_function=mock_function, +# description="", +# ) +# high_client.api.services.api.add(endpoint=new_endpoint) +# high_client.refresh() +# high_private_res = high_client.api.services.testapi.query.private() +# assert high_private_res == 42 + +# low_state = low_client.get_sync_state() +# high_state = high_client.get_sync_state() +# diff_state = compare_states(high_state, low_state) + +# obj_diff_batch = diff_state[0] +# widget = resolve_single(obj_diff_batch) +# widget.click_sync() + +# obj_diff_batch = diff_state[1] +# widget = resolve_single(obj_diff_batch) +# widget.click_sync() + +# high_mock_res = high_client.api.services.testapi.query.mock() +# assert high_mock_res == -42 + +# client_low_ds.refresh() +# high_client.refresh() +# low_private_res = client_low_ds.api.services.testapi.query.private() +# assert isinstance( +# low_private_res, SyftError +# ), "Should not have access to private on low side" +# low_mock_res = client_low_ds.api.services.testapi.query.mock() +# high_mock_res = high_client.api.services.testapi.query.mock() +# assert low_mock_res == -42 +# assert high_mock_res == -42 def test_skip_user_code(low_worker, high_worker):