From e4b1842be8a274866512c01213baa60df3389d90 Mon Sep 17 00:00:00 2001 From: IonesioJunior Date: Tue, 27 Aug 2024 10:25:37 -0300 Subject: [PATCH] Add Twin API endpoint logs --- .../api/0.8/12-custom-api-endpoint.ipynb | 143 +++++++++++++++++- .../src/syft/protocol/protocol_version.json | 5 + packages/syft/src/syft/server/server.py | 7 +- .../syft/service/action/action_endpoint.py | 38 ++++- packages/syft/src/syft/service/api/api.py | 55 +++++-- .../syft/src/syft/service/api/api_service.py | 25 ++- packages/syft/src/syft/service/api/utils.py | 45 ++++++ .../syft/src/syft/service/code/user_code.py | 3 +- 8 files changed, 296 insertions(+), 25 deletions(-) create mode 100644 packages/syft/src/syft/service/api/utils.py diff --git a/notebooks/api/0.8/12-custom-api-endpoint.ipynb b/notebooks/api/0.8/12-custom-api-endpoint.ipynb index 1bd6cde46c1..3db886b66b9 100644 --- a/notebooks/api/0.8/12-custom-api-endpoint.ipynb +++ b/notebooks/api/0.8/12-custom-api-endpoint.ipynb @@ -533,15 +533,154 @@ " )" ] }, + { + "cell_type": "markdown", + "metadata": {}, + "source": [ + "# Syft Function/API Logs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "@sy.api_endpoint_method()\n", + "def public_log_function(\n", + " context,\n", + ") -> str:\n", + " print(\"Logging Public Function Call\")\n", + " return \"Public Function Execution\"\n", + "\n", + "\n", + "@sy.api_endpoint_method()\n", + "def private_log_function(\n", + " context,\n", + ") -> str:\n", + " print(\"Logging Private Function Call\")\n", + " return \"Private Function Execution\"\n", + "\n", + "\n", + "new_endpoint = sy.TwinAPIEndpoint(\n", + " path=\"test.log\",\n", + " mock_function=public_log_function,\n", + " private_function=private_log_function,\n", + " description=\"Lore ipsulum ...\",\n", + ")\n", + "\n", + "# # Add it to the server.\n", + "response = datasite_client.api.services.api.add(endpoint=new_endpoint)" + ] + }, { "cell_type": "code", "execution_count": null, "metadata": {}, "outputs": [], - "source": [] + "source": [ + "@sy.syft_function_single_use(endpoint=datasite_client.api.services.test.log)\n", + "def test_log_call(endpoint): # noqa: F811\n", + " print(\"In Syft Function Context\")\n", + " endpoint()\n", + " print(\"After API endpoint call\")\n", + " return True\n", + "\n", + "\n", + "@sy.syft_function_single_use(endpoint=datasite_client.api.services.test.log)\n", + "def test_log_call_mock(endpoint): # noqa: F811\n", + " print(\"In Syft Function Context\")\n", + " endpoint.mock()\n", + " print(\"After API endpoint call\")\n", + " return True\n", + "\n", + "\n", + "@sy.syft_function_single_use(endpoint=datasite_client.api.services.test.log)\n", + "def test_log_call_private(endpoint): # noqa: F811\n", + " print(\"In Syft Function Context\")\n", + " endpoint.private()\n", + " print(\"After API endpoint call\")\n", + " return True\n", + "\n", + "\n", + "# Create a project\n", + "project = sy.Project(\n", + " name=\"My Cool Project\",\n", + " description=\"\"\"Hi, I want to calculate the mean of your private data,\\\n", + " pretty please!\"\"\",\n", + " members=[datasite_client],\n", + ")\n", + "project.create_code_request(test_log_call, datasite_client)\n", + "project.create_code_request(test_log_call_mock, datasite_client)\n", + "project.create_code_request(test_log_call_private, datasite_client)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "log_call_job = datasite_client.code.test_log_call(\n", + " endpoint=datasite_client.api.services.test.log, blocking=False\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "log_call_mock_job = datasite_client.code.test_log_call_mock(\n", + " endpoint=datasite_client.api.services.test.log, blocking=False\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "log_call_private_job = datasite_client.code.test_log_call_private(\n", + " endpoint=datasite_client.api.services.test.log, blocking=False\n", + ")" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": {}, + "outputs": [], + "source": [ + "# stdlib\n", + "import time\n", + "\n", + "# Iterate over the Jobs waiting them to finish their pipelines.\n", + "job_pool = [\n", + " (log_call_job, \"Logging Private Function Call\"),\n", + " (log_call_mock_job, \"Logging Public Function Call\"),\n", + " (log_call_private_job, \"Logging Private Function Call\"),\n", + "]\n", + "for job, expected_log in job_pool:\n", + " updated_job = datasite_client.api.services.job.get(job.id)\n", + " while updated_job.status.value != \"completed\":\n", + " updated_job = datasite_client.api.services.job.get(job.id)\n", + " time.sleep(1)\n", + " # If they're completed. Then, check if the TwinAPI print appears in the job logs.\n", + " assert expected_log in datasite_client.api.services.job.get(job.id).logs(\n", + " _print=False\n", + " )" + ] } ], "metadata": { + "kernelspec": { + "display_name": "Python 3 (ipykernel)", + "language": "python", + "name": "python3" + }, "language_info": { "codemirror_mode": { "name": "ipython", @@ -552,7 +691,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.12.5" + "version": "3.12.4" } }, "nbformat": 4, diff --git a/packages/syft/src/syft/protocol/protocol_version.json b/packages/syft/src/syft/protocol/protocol_version.json index 047d8571c04..5f2c6df848d 100644 --- a/packages/syft/src/syft/protocol/protocol_version.json +++ b/packages/syft/src/syft/protocol/protocol_version.json @@ -404,6 +404,11 @@ "version": 1, "hash": "c7addbaf2777707f3e91e5c1e092343476cd22efc4ec8617f39ccf76e61a5a14", "action": "add" + }, + "2": { + "version": 2, + "hash": "846ba36e8737a1bec16853c9de54c4948450009278e0b76fe7e3355ef9e70089", + "action": "add" } }, "DataSubject": { diff --git a/packages/syft/src/syft/server/server.py b/packages/syft/src/syft/server/server.py index a183f5135d5..0dd1eefa41c 100644 --- a/packages/syft/src/syft/server/server.py +++ b/packages/syft/src/syft/server/server.py @@ -1266,6 +1266,7 @@ def add_api_endpoint_execution_to_queue( credentials: SyftVerifyKey, method: str, path: str, + log_id: UID, *args: Any, worker_pool: str | None = None, **kwargs: Any, @@ -1294,7 +1295,7 @@ def add_api_endpoint_execution_to_queue( job_id=job_id, worker_settings=worker_settings, args=args, - kwargs={"path": path, **kwargs}, + kwargs={"path": path, "log_id": log_id, **kwargs}, has_execute_permissions=True, worker_pool=worker_pool_ref, # set worker pool reference as part of queue item ) @@ -1385,9 +1386,11 @@ def add_queueitem_to_queue( action: Action | None = None, parent_job_id: UID | None = None, user_id: UID | None = None, + log_id: UID | None = None, job_type: JobType = JobType.JOB, ) -> Job: - log_id = UID() + if log_id is None: + log_id = UID() role = self.get_role_for_credentials(credentials=credentials) context = AuthedServiceContext(server=self, credentials=credentials, role=role) diff --git a/packages/syft/src/syft/service/action/action_endpoint.py b/packages/syft/src/syft/service/action/action_endpoint.py index 90248c5f60d..32cc2128541 100644 --- a/packages/syft/src/syft/service/action/action_endpoint.py +++ b/packages/syft/src/syft/service/action/action_endpoint.py @@ -2,14 +2,19 @@ from __future__ import annotations # stdlib +from collections.abc import Callable from enum import Enum from enum import auto from typing import Any # relative from ...serde.serializable import serializable +from ...types.syft_migration import migrate from ...types.syft_object import SYFT_OBJECT_VERSION_1 +from ...types.syft_object import SYFT_OBJECT_VERSION_2 from ...types.syft_object import SyftObject +from ...types.transforms import drop +from ...types.transforms import make_set_default from ...types.uid import UID from ..context import AuthedServiceContext @@ -21,15 +26,28 @@ class EXECUTION_MODE(Enum): @serializable() -class CustomEndpointActionObject(SyftObject): +class CustomEndpointActionObjectV1(SyftObject): __canonical_name__ = "CustomEndpointActionObject" __version__ = SYFT_OBJECT_VERSION_1 endpoint_id: UID context: AuthedServiceContext | None = None - def add_context(self, context: AuthedServiceContext) -> CustomEndpointActionObject: + +@serializable() +class CustomEndpointActionObject(SyftObject): + __canonical_name__ = "CustomEndpointActionObject" + __version__ = SYFT_OBJECT_VERSION_2 + + endpoint_id: UID + context: AuthedServiceContext | None = None + log_id: UID | None = None + + def add_context( + self, context: AuthedServiceContext, log_id: UID | None = None + ) -> CustomEndpointActionObject: self.context = context + self.log_id = log_id return self def __call__(self, *args: Any, **kwargs: Any) -> Any: @@ -69,10 +87,24 @@ def __call_function( __endpoint_mode = endpoint_service.execute_server_side_endpoint_by_id return __endpoint_mode( - *args, context=self.context, endpoint_uid=self.endpoint_id, **kwargs + *args, + context=self.context, + endpoint_uid=self.endpoint_id, + log_id=self.log_id, + **kwargs, ).unwrap() def __check_context(self) -> AuthedServiceContext: if self.context is None: raise Exception("No context provided to CustomEndpointActionObject") return self.context + + +@migrate(CustomEndpointActionObjectV1, CustomEndpointActionObject) +def migrate_custom_endpoint_v1_to_v2() -> list[Callable]: + return [make_set_default("log_id", None)] + + +@migrate(CustomEndpointActionObject, CustomEndpointActionObjectV1) +def migrate_custom_endpoint_v2_to_v1() -> list[Callable]: + return [drop(["log_id"])] diff --git a/packages/syft/src/syft/service/api/api.py b/packages/syft/src/syft/service/api/api.py index 3798e8ca2ce..ed9d5837f15 100644 --- a/packages/syft/src/syft/service/api/api.py +++ b/packages/syft/src/syft/service/api/api.py @@ -37,6 +37,7 @@ from ..response import SyftError from ..user.user import UserView from ..user.user_service import UserService +from .utils import print as log_print NOT_ACCESSIBLE_STRING = "N / A" @@ -439,7 +440,13 @@ def select_code( return self.private_function return self.mock_function - def exec(self, context: AuthedServiceContext, *args: Any, **kwargs: Any) -> Any: + def exec( + self, + context: AuthedServiceContext, + *args: Any, + log_id: UID | None = None, + **kwargs: Any, + ) -> Any: """Execute the code based on the user's permissions and public code availability. Args: @@ -450,19 +457,29 @@ def exec(self, context: AuthedServiceContext, *args: Any, **kwargs: Any) -> Any: Any: The result of the executed code. """ selected_code = self.select_code(context) - return self.exec_code(selected_code, context, *args, **kwargs) + return self.exec_code(selected_code, context, *args, log_id=log_id, **kwargs) def exec_mock_function( - self, context: AuthedServiceContext, *args: Any, **kwargs: Any + self, + context: AuthedServiceContext, + *args: Any, + log_id: UID | None = None, + **kwargs: Any, ) -> Any: """Execute the public code if it exists.""" if self.mock_function: - return self.exec_code(self.mock_function, context, *args, **kwargs) + return self.exec_code( + self.mock_function, context, *args, log_id=log_id, **kwargs + ) raise SyftException(public_message="No public code available") def exec_private_function( - self, context: AuthedServiceContext, *args: Any, **kwargs: Any + self, + context: AuthedServiceContext, + *args: Any, + log_id: UID | None = None, + **kwargs: Any, ) -> Any: """Execute the private code if user is has the proper permissions. @@ -477,7 +494,9 @@ def exec_private_function( raise SyftException(public_message="No private code available") if self.has_permission(context): - return self.exec_code(self.private_function, context, *args, **kwargs) + return self.exec_code( + self.private_function, context, *args, log_id=log_id, **kwargs + ) raise SyftException(public_message="You're not allowed to run this code.") @@ -504,9 +523,21 @@ def exec_code( code: PrivateAPIEndpoint | PublicAPIEndpoint, context: AuthedServiceContext, *args: Any, + log_id: UID | None = None, **kwargs: Any, ) -> Any: + # stdlib + import builtins as __builtin__ + import functools + + original_print = __builtin__.print + try: + if log_id is not None: + print = functools.partial(log_print, context, log_id) + else: + print = original_print # type: ignore + inner_function = ast.parse(code.api_code).body[0] inner_function.decorator_list = [] # compile the function @@ -516,19 +547,21 @@ def exec_code( user_client = self.get_user_client_from_server(context) admin_client = self.get_admin_client_from_server(context) - # load it - exec(raw_byte_code) # nosec - internal_context = code.build_internal_context( context=context, admin_client=admin_client, user_client=user_client ) + evil_string = f"{code.func_name}(*args, **kwargs,context=internal_context)" + + _globals = {"print": print} + # load it + exec(raw_byte_code, _globals, locals()) # nosec # execute it evil_string = f"{code.func_name}(*args, **kwargs,context=internal_context)" result = None try: # users can raise SyftException in their code - result = eval(evil_string, None, locals()) # nosec + result = eval(evil_string, _globals, locals()) # nosec except SyftException as e: # capture it as the result variable result = e @@ -546,11 +579,13 @@ def exec_code( context.server.get_service("userservice").admin_verify_key(), self ).unwrap() + print = original_print # type: ignore # if we caught a SyftException above we will raise and auto wrap to Result if isinstance(result, SyftException): raise result # here we got a non Exception result which will also be wrapped in Result + # return the results return result except Exception as e: # If it's admin, return the error message. diff --git a/packages/syft/src/syft/service/api/api_service.py b/packages/syft/src/syft/service/api/api_service.py index fd935454443..cc9b34d940b 100644 --- a/packages/syft/src/syft/service/api/api_service.py +++ b/packages/syft/src/syft/service/api/api_service.py @@ -337,13 +337,14 @@ def _call_in_jobs( context=context, endpoint_path=path, ).unwrap() - + log_id = UID() job = context.server.add_api_endpoint_execution_to_queue( context.credentials, method, path, *args, worker_pool=custom_endpoint.worker_pool, + log_id=log_id, **kwargs, ) @@ -435,6 +436,7 @@ def call( context: AuthedServiceContext, path: str, *args: Any, + log_id: UID | None = None, **kwargs: Any, ) -> SyftSuccess: """Call a Custom API Method""" @@ -443,7 +445,9 @@ def call( endpoint_path=path, ).unwrap() - exec_result = custom_endpoint.exec(context, *args, **kwargs).unwrap() + exec_result = custom_endpoint.exec( + context, *args, log_id=log_id, **kwargs + ).unwrap() action_obj = ActionObject.from_obj(exec_result) action_service = cast(ActionService, context.server.get_service(ActionService)) try: @@ -466,6 +470,7 @@ def call_public( context: AuthedServiceContext, path: str, *args: Any, + log_id: UID | None = None, **kwargs: Any, ) -> ActionObject: """Call a Custom API Method in public mode""" @@ -474,7 +479,7 @@ def call_public( endpoint_path=path, ).unwrap() exec_result = custom_endpoint.exec_mock_function( - context, *args, **kwargs + context, *args, log_id=log_id, **kwargs ).unwrap() action_obj = ActionObject.from_obj(exec_result) @@ -501,6 +506,7 @@ def call_private( context: AuthedServiceContext, path: str, *args: Any, + log_id: UID | None = None, **kwargs: Any, ) -> ActionObject: """Call a Custom API Method in private mode""" @@ -510,7 +516,7 @@ def call_private( ).unwrap() exec_result = custom_endpoint.exec_private_function( - context, *args, **kwargs + context, *args, log_id=log_id, **kwargs ).unwrap() action_obj = ActionObject.from_obj(exec_result) @@ -546,6 +552,7 @@ def execute_server_side_endpoint_by_id( context: AuthedServiceContext, endpoint_uid: UID, *args: Any, + log_id: UID | None = None, **kwargs: Any, ) -> Any: endpoint = self.get_endpoint_by_uid(context, endpoint_uid).unwrap() @@ -553,7 +560,9 @@ def execute_server_side_endpoint_by_id( if not selected_code: selected_code = endpoint.mock_function - return endpoint.exec_code(selected_code, context, *args, **kwargs).unwrap() + return endpoint.exec_code( + selected_code, context, *args, log_id=log_id, **kwargs + ).unwrap() @as_result(StashException, NotFoundException, SyftException) def execute_service_side_endpoint_private_by_id( @@ -561,11 +570,12 @@ def execute_service_side_endpoint_private_by_id( context: AuthedServiceContext, endpoint_uid: UID, *args: Any, + log_id: UID | None = None, **kwargs: Any, ) -> Any: endpoint = self.get_endpoint_by_uid(context, endpoint_uid).unwrap() return endpoint.exec_code( - endpoint.private_function, context, *args, **kwargs + endpoint.private_function, context, *args, log_id=log_id, **kwargs ).unwrap() @as_result(StashException, NotFoundException, SyftException) @@ -574,11 +584,12 @@ def execute_server_side_endpoint_mock_by_id( context: AuthedServiceContext, endpoint_uid: UID, *args: Any, + log_id: UID | None = None, **kwargs: Any, ) -> Any: endpoint = self.get_endpoint_by_uid(context, endpoint_uid).unwrap() return endpoint.exec_code( - endpoint.mock_function, context, *args, **kwargs + endpoint.mock_function, context, *args, log_id=log_id, **kwargs ).unwrap() @as_result(StashException, NotFoundException) diff --git a/packages/syft/src/syft/service/api/utils.py b/packages/syft/src/syft/service/api/utils.py new file mode 100644 index 00000000000..5194d7de9cf --- /dev/null +++ b/packages/syft/src/syft/service/api/utils.py @@ -0,0 +1,45 @@ +# stdlib +import builtins as __builtin__ +import datetime +import sys +from typing import Any + +# relative +from ...types.uid import UID +from ..action.action_object import ActionObject +from ..context import AuthedServiceContext +from ..job.job_stash import Job +from ..response import SyftError + + +def print( + context: AuthedServiceContext, + log_id: UID, + *args: Any, + sep: str = " ", + end: str = "\n", +) -> str | None: + def to_str(arg: Any) -> str: + if isinstance(arg, bytes): + return arg.decode("utf-8") + if isinstance(arg, Job): + return f"JOB: {arg.id}" + if isinstance(arg, SyftError): + return f"JOB: {arg.message}" + if isinstance(arg, ActionObject): + return str(arg.syft_action_data) + return str(arg) + + new_args = [to_str(arg) for arg in args] + new_str = sep.join(new_args) + end + if context.server is not None: + log_service = context.server.get_service("LogService") + log_service.append(context=context, uid=log_id, new_str=new_str) + time = datetime.datetime.now().strftime("%d/%m/%y %H:%M:%S") + return __builtin__.print( + f"{time} FUNCTION LOG :", + *new_args, + end=end, + sep=sep, + file=sys.stderr, + ) diff --git a/packages/syft/src/syft/service/code/user_code.py b/packages/syft/src/syft/service/code/user_code.py index 8d20572c0cc..87f3c8b8da6 100644 --- a/packages/syft/src/syft/service/code/user_code.py +++ b/packages/syft/src/syft/service/code/user_code.py @@ -1834,9 +1834,10 @@ def to_str(arg: Any) -> str: if code_item.uses_datasite: kwargs["datasite"] = LocalDatasiteClient() + job_log_id = context.job.log_id if context.job else None for k, v in kwargs.items(): if isinstance(v, CustomEndpointActionObject): - kwargs[k] = v.add_context(context=context) + kwargs[k] = v.add_context(context=context, log_id=job_log_id) stdout = StringIO() stderr = StringIO()