From 64b7778bc69a6fb83f8d1e397477ef5e4c83070b Mon Sep 17 00:00:00 2001 From: Kien Dang Date: Tue, 16 Jan 2024 14:39:26 +0800 Subject: [PATCH 1/8] Move worker_pool_service.get_worker to worker_service.get --- .../service/worker/worker_pool_service.py | 53 ------------------- .../src/syft/service/worker/worker_service.py | 29 ++++++++++ 2 files changed, 29 insertions(+), 53 deletions(-) diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index d21d41c578c..8ed8cf7ab7a 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -37,7 +37,6 @@ from .worker_pool import WorkerPool from .worker_pool import WorkerStatus from .worker_pool import _get_worker_container -from .worker_pool import _get_worker_container_status from .worker_pool_stash import SyftWorkerPoolStash from .worker_service import WorkerService from .worker_stash import WorkerStash @@ -288,58 +287,6 @@ def filter_by_image_id( return result.ok() - @service_method( - path="worker_pool.get_worker", - name="get_worker", - roles=DATA_SCIENTIST_ROLE_LEVEL, - ) - def get_worker( - self, context: AuthedServiceContext, worker_pool_id: UID, worker_id: UID - ) -> Union[SyftWorker, SyftError]: - worker_pool_worker = self._get_worker_pool_and_worker( - context, worker_pool_id, worker_id - ) - if isinstance(worker_pool_worker, SyftError): - return worker_pool_worker - - _, linked_worker = worker_pool_worker - - result = linked_worker.resolve_with_context(context=context) - - if result.is_err(): - return SyftError( - message=f"Failed to retrieve Linked SyftWorker {linked_worker.object_uid}" - ) - - worker = result.ok() - - if context.node.in_memory_workers: - return worker - - with contextlib.closing(docker.from_env()) as client: - worker_status = _get_worker_container_status(client, worker) - - if isinstance(worker_status, SyftError): - return worker_status - - if worker_status != WorkerStatus.PENDING: - worker.status = worker_status - - result = self.worker_stash.update( - credentials=context.credentials, - obj=worker, - ) - - return ( - SyftError( - message=f"Failed to update worker status. Error: {result.err()}" - ) - if result.is_err() - else worker - ) - - return worker - @service_method( path="worker_pool.get_worker_status", name="get_worker_status", diff --git a/packages/syft/src/syft/service/worker/worker_service.py b/packages/syft/src/syft/service/worker/worker_service.py index 37226d96952..209eb152200 100644 --- a/packages/syft/src/syft/service/worker/worker_service.py +++ b/packages/syft/src/syft/service/worker/worker_service.py @@ -205,6 +205,35 @@ def status( return worker.status, worker.healthcheck + @service_method( + path="worker.get", + name="get", + roles=DATA_SCIENTIST_ROLE_LEVEL, + ) + def get( + self, context: AuthedServiceContext, uid: UID + ) -> Union[SyftWorker, SyftError]: + result = self.stash.get_by_uid(credentials=context.credentials, uid=uid) + if result.is_err(): + return SyftError(message=f"Failed to retrieve worker with UID {uid}") + worker: SyftWorker = result.ok() + + if context.node.in_memory_workers: + return worker + + result = check_and_update_status_for_worker( + worker=worker, + worker_stash=self.stash, + credentials=context.credentials, + ) + + if result.is_err(): + return SyftError( + message=f"Failed to update status for worker: {worker.id}. Error: {result.err()}" + ) + + return result.ok() + def check_and_update_status_for_worker( worker: SyftWorker, From 906c81cf2cba9209ce3a758eab9dab09e5fee500 Mon Sep 17 00:00:00 2001 From: Kien Dang Date: Tue, 16 Jan 2024 14:44:40 +0800 Subject: [PATCH 2/8] Improve check_and_update_status_for_worker * Pass in DockerClient instead of creating on the fly to improve efficiency (use only 1 DockerClient in case of looping through multiple workers) * Rename to _check_and_update_status_for_worker --- .../src/syft/service/worker/worker_service.py | 52 +++++++++++-------- 1 file changed, 29 insertions(+), 23 deletions(-) diff --git a/packages/syft/src/syft/service/worker/worker_service.py b/packages/syft/src/syft/service/worker/worker_service.py index 209eb152200..745ef3cbefb 100644 --- a/packages/syft/src/syft/service/worker/worker_service.py +++ b/packages/syft/src/syft/service/worker/worker_service.py @@ -160,17 +160,19 @@ def list(self, context: AuthedServiceContext) -> Union[SyftSuccess, SyftError]: return workers # If container workers, check their statuses - for idx, worker in enumerate(workers): - result = check_and_update_status_for_worker( - worker=worker, - worker_stash=self.stash, - credentials=context.credentials, - ) - if result.is_err(): - return SyftError( - message=f"Failed to update status for worker: {worker.id}. Error: {result.err()}" + with contextlib.closing(docker.from_env()) as client: + for idx, worker in enumerate(workers): + result = _check_and_update_status_for_worker( + client=client, + worker=worker, + worker_stash=self.stash, + credentials=context.credentials, ) - workers[idx] = worker + if result.is_err(): + return SyftError( + message=f"Failed to update status for worker: {worker.id}. Error: {result.err()}" + ) + workers[idx] = worker return workers @@ -190,11 +192,13 @@ def status( if context.node.in_memory_workers: return worker.status, worker.healthcheck - result = check_and_update_status_for_worker( - worker=worker, - worker_stash=self.stash, - credentials=context.credentials, - ) + with contextlib.closing(docker.from_env()) as client: + result = _check_and_update_status_for_worker( + client=client, + worker=worker, + worker_stash=self.stash, + credentials=context.credentials, + ) if result.is_err(): return SyftError( @@ -221,11 +225,13 @@ def get( if context.node.in_memory_workers: return worker - result = check_and_update_status_for_worker( - worker=worker, - worker_stash=self.stash, - credentials=context.credentials, - ) + with contextlib.closing(docker.from_env()) as client: + result = _check_and_update_status_for_worker( + client=client, + worker=worker, + worker_stash=self.stash, + credentials=context.credentials, + ) if result.is_err(): return SyftError( @@ -235,13 +241,13 @@ def get( return result.ok() -def check_and_update_status_for_worker( +def _check_and_update_status_for_worker( + client: docker.DockerClient, worker: SyftWorker, worker_stash: WorkerStash, credentials: SyftVerifyKey, ) -> Result[SyftWorker, str]: - with contextlib.closing(docker.from_env()) as client: - worker_status = _get_worker_container_status(client, worker) + worker_status = _get_worker_container_status(client, worker) if isinstance(worker_status, SyftError): return worker_status From 81cb0d0b01f1b8202f03248c4ef361ab828d7857 Mon Sep 17 00:00:00 2001 From: Kien Dang Date: Tue, 16 Jan 2024 14:47:37 +0800 Subject: [PATCH 3/8] Use worker with updated status --- packages/syft/src/syft/service/worker/worker_service.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/syft/src/syft/service/worker/worker_service.py b/packages/syft/src/syft/service/worker/worker_service.py index 745ef3cbefb..029a4d1c7e4 100644 --- a/packages/syft/src/syft/service/worker/worker_service.py +++ b/packages/syft/src/syft/service/worker/worker_service.py @@ -172,7 +172,7 @@ def list(self, context: AuthedServiceContext) -> Union[SyftSuccess, SyftError]: return SyftError( message=f"Failed to update status for worker: {worker.id}. Error: {result.err()}" ) - workers[idx] = worker + workers[idx] = result.ok() return workers From b6ae27d47c51f9583418f8b4661f7969c7b5a7d1 Mon Sep 17 00:00:00 2001 From: Kien Dang Date: Tue, 16 Jan 2024 15:04:31 +0800 Subject: [PATCH 4/8] Refactor _check_and_update_status_for_worker to include error message --- .../src/syft/service/worker/worker_service.py | 71 ++++++++----------- 1 file changed, 31 insertions(+), 40 deletions(-) diff --git a/packages/syft/src/syft/service/worker/worker_service.py b/packages/syft/src/syft/service/worker/worker_service.py index 029a4d1c7e4..eaff2205c4e 100644 --- a/packages/syft/src/syft/service/worker/worker_service.py +++ b/packages/syft/src/syft/service/worker/worker_service.py @@ -4,10 +4,10 @@ from typing import List from typing import Tuple from typing import Union +from typing import cast # third party import docker -from result import Result # relative from ...node.credentials import SyftVerifyKey @@ -162,17 +162,17 @@ def list(self, context: AuthedServiceContext) -> Union[SyftSuccess, SyftError]: # If container workers, check their statuses with contextlib.closing(docker.from_env()) as client: for idx, worker in enumerate(workers): - result = _check_and_update_status_for_worker( + worker_ = _check_and_update_status_for_worker( client=client, worker=worker, worker_stash=self.stash, credentials=context.credentials, ) - if result.is_err(): - return SyftError( - message=f"Failed to update status for worker: {worker.id}. Error: {result.err()}" - ) - workers[idx] = result.ok() + + if not isinstance(worker_, SyftWorker): + return worker_ + + workers[idx] = worker_ return workers @@ -184,28 +184,10 @@ def status( context: AuthedServiceContext, uid: UID, ) -> Union[Tuple[WorkerStatus, WorkerHealth], SyftError]: - result = self.stash.get_by_uid(credentials=context.credentials, uid=uid) - if result.is_err(): - return SyftError(message=f"Failed to retrieve worker with UID {uid}") - worker: SyftWorker = result.ok() - - if context.node.in_memory_workers: - return worker.status, worker.healthcheck - - with contextlib.closing(docker.from_env()) as client: - result = _check_and_update_status_for_worker( - client=client, - worker=worker, - worker_stash=self.stash, - credentials=context.credentials, - ) - - if result.is_err(): - return SyftError( - message=f"Failed to update status for worker: {worker.id}. Error: {result.err()}" - ) + worker = self.get(context=context, uid=uid) - worker = result.ok() + if not isinstance(worker, SyftWorker): + return worker return worker.status, worker.healthcheck @@ -217,28 +199,31 @@ def status( def get( self, context: AuthedServiceContext, uid: UID ) -> Union[SyftWorker, SyftError]: - result = self.stash.get_by_uid(credentials=context.credentials, uid=uid) - if result.is_err(): - return SyftError(message=f"Failed to retrieve worker with UID {uid}") - worker: SyftWorker = result.ok() + worker = self._get_worker(context, uid) + if not isinstance(worker, SyftWorker): + return worker if context.node.in_memory_workers: return worker with contextlib.closing(docker.from_env()) as client: - result = _check_and_update_status_for_worker( + return _check_and_update_status_for_worker( client=client, worker=worker, worker_stash=self.stash, credentials=context.credentials, ) - if result.is_err(): - return SyftError( - message=f"Failed to update status for worker: {worker.id}. Error: {result.err()}" - ) + def _get_worker( + self, context: AuthedServiceContext, uid: UID + ) -> Union[SyftWorker, SyftError]: + result = self.stash.get_by_uid(credentials=context.credentials, uid=uid) - return result.ok() + return ( + SyftError(message=f"Failed to retrieve worker with UID {uid}") + if result.is_err() + else cast(SyftWorker, result.ok()) + ) def _check_and_update_status_for_worker( @@ -246,7 +231,7 @@ def _check_and_update_status_for_worker( worker: SyftWorker, worker_stash: WorkerStash, credentials: SyftVerifyKey, -) -> Result[SyftWorker, str]: +) -> Union[SyftWorker, SyftError]: worker_status = _get_worker_container_status(client, worker) if isinstance(worker_status, SyftError): @@ -261,4 +246,10 @@ def _check_and_update_status_for_worker( obj=worker, ) - return result + return ( + SyftError( + message=f"Failed to update status for worker: {worker.id}. Error: {result.err()}" + ) + if result.is_err() + else cast(SyftWorker, result.ok()) + ) From abab0aec4f410e860866a0808619b85f184eff14 Mon Sep 17 00:00:00 2001 From: Kien Dang Date: Tue, 16 Jan 2024 15:17:02 +0800 Subject: [PATCH 5/8] Remove redundant helper function --- .../src/syft/service/worker/worker_service.py | 19 +++++-------------- 1 file changed, 5 insertions(+), 14 deletions(-) diff --git a/packages/syft/src/syft/service/worker/worker_service.py b/packages/syft/src/syft/service/worker/worker_service.py index eaff2205c4e..2ac3af317c2 100644 --- a/packages/syft/src/syft/service/worker/worker_service.py +++ b/packages/syft/src/syft/service/worker/worker_service.py @@ -199,9 +199,11 @@ def status( def get( self, context: AuthedServiceContext, uid: UID ) -> Union[SyftWorker, SyftError]: - worker = self._get_worker(context, uid) - if not isinstance(worker, SyftWorker): - return worker + result = self.stash.get_by_uid(credentials=context.credentials, uid=uid) + if result.is_err(): + return SyftError(message=f"Failed to retrieve worker with UID {uid}") + + worker = cast(SyftWorker, result.ok()) if context.node.in_memory_workers: return worker @@ -214,17 +216,6 @@ def get( credentials=context.credentials, ) - def _get_worker( - self, context: AuthedServiceContext, uid: UID - ) -> Union[SyftWorker, SyftError]: - result = self.stash.get_by_uid(credentials=context.credentials, uid=uid) - - return ( - SyftError(message=f"Failed to retrieve worker with UID {uid}") - if result.is_err() - else cast(SyftWorker, result.ok()) - ) - def _check_and_update_status_for_worker( client: docker.DockerClient, From 01c89f3cfd92d55ea1495cf404ff14ae9a02fec4 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Tue, 16 Jan 2024 17:08:17 +0530 Subject: [PATCH 6/8] handle worker does not exists for given uid in worker.get API - remove unused code --- .../src/syft/service/worker/worker_service.py | 91 +------------------ 1 file changed, 3 insertions(+), 88 deletions(-) diff --git a/packages/syft/src/syft/service/worker/worker_service.py b/packages/syft/src/syft/service/worker/worker_service.py index 2ac3af317c2..e1f1f5880f7 100644 --- a/packages/syft/src/syft/service/worker/worker_service.py +++ b/packages/syft/src/syft/service/worker/worker_service.py @@ -1,6 +1,5 @@ # stdlib import contextlib -import socket from typing import List from typing import Tuple from typing import Union @@ -31,93 +30,6 @@ from .worker_pool import _get_worker_container_status from .worker_stash import WorkerStash -WORKER_NUM = 0 - - -def get_main_backend(): - hostname = socket.gethostname() - return f"{hostname}-backend-1" - - -def start_worker_container( - worker_num: int, context: AuthedServiceContext, syft_worker_uid -): - client = docker.from_env() - existing_container_name = get_main_backend() - hostname = socket.gethostname() - worker_name = f"{hostname}-worker-{worker_num}" - return create_new_container_from_existing( - worker_name=worker_name, - client=client, - existing_container_name=existing_container_name, - syft_worker_uid=syft_worker_uid, - ) - - -def create_new_container_from_existing( - worker_name: str, - client: docker.client.DockerClient, - existing_container_name: str, - syft_worker_uid, -) -> docker.models.containers.Container: - # Get the existing container - existing_container = client.containers.get(existing_container_name) - - # Inspect the existing container - details = existing_container.attrs - - # Extract relevant settings - image = details["Config"]["Image"] - command = details["Config"]["Cmd"] - environment = details["Config"]["Env"] - ports = details["NetworkSettings"]["Ports"] - host_config = details["HostConfig"] - - volumes = {} - for vol in host_config["Binds"]: - parts = vol.split(":") - key = parts[0] - bind = parts[1] - mode = parts[2] - if "/storage" in bind: - # we need this because otherwise we are using the same node private key - # which will make account creation fail - worker_postfix = worker_name.split("-", 1)[1] - key = f"{key}-{worker_postfix}" - volumes[key] = {"bind": bind, "mode": mode} - - # we need this because otherwise we are using the same node private key - # which will make account creation fail - - environment = dict([e.split("=", 1) for e in environment]) - environment["CREATE_PRODUCER"] = "false" - environment["N_CONSUMERS"] = 1 - environment["DOCKER_WORKER_NAME"] = worker_name - environment["DEFAULT_ROOT_USERNAME"] = worker_name - environment["DEFAULT_ROOT_EMAIL"] = f"{worker_name}@openmined.org" - environment["PORT"] = str(8003 + WORKER_NUM) - environment["HTTP_PORT"] = str(88 + WORKER_NUM) - environment["HTTPS_PORT"] = str(446 + WORKER_NUM) - environment["SYFT_WORKER_UID"] = str(syft_worker_uid) - - environment.pop("NODE_PRIVATE_KEY", None) - - new_container = client.containers.create( - name=worker_name, - image=image, - command=command, - environment=environment, - ports=ports, - detach=True, - volumes=volumes, - tty=True, - stdin_open=True, - network_mode=f"container:{existing_container.id}", - ) - - new_container.start() - return new_container - @instrument @serializable() @@ -203,6 +115,9 @@ def get( if result.is_err(): return SyftError(message=f"Failed to retrieve worker with UID {uid}") + if result.ok() is None: + return SyftError(message=f"Worker doesn't exists for uid: {uid}") + worker = cast(SyftWorker, result.ok()) if context.node.in_memory_workers: From d4bc2a465ba96f75db36381a3e6b53091cb65940 Mon Sep 17 00:00:00 2001 From: Kien Dang Date: Tue, 16 Jan 2024 19:46:09 +0800 Subject: [PATCH 7/8] Use type guard instead of cast --- packages/syft/src/syft/service/worker/worker_service.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/syft/src/syft/service/worker/worker_service.py b/packages/syft/src/syft/service/worker/worker_service.py index e1f1f5880f7..e6011ddd60e 100644 --- a/packages/syft/src/syft/service/worker/worker_service.py +++ b/packages/syft/src/syft/service/worker/worker_service.py @@ -115,10 +115,9 @@ def get( if result.is_err(): return SyftError(message=f"Failed to retrieve worker with UID {uid}") - if result.ok() is None: - return SyftError(message=f"Worker doesn't exists for uid: {uid}") - - worker = cast(SyftWorker, result.ok()) + worker = result.ok() + if worker is None: + return SyftError(message=f"Worker does not exist for UID {uid}") if context.node.in_memory_workers: return worker From 2b2823c6d906006ebc57eb31bb188881631af922 Mon Sep 17 00:00:00 2001 From: Kien Dang Date: Tue, 16 Jan 2024 19:47:09 +0800 Subject: [PATCH 8/8] Fix BaseStash.update typing --- packages/syft/src/syft/service/worker/worker_service.py | 3 +-- packages/syft/src/syft/store/document_store.py | 2 +- 2 files changed, 2 insertions(+), 3 deletions(-) diff --git a/packages/syft/src/syft/service/worker/worker_service.py b/packages/syft/src/syft/service/worker/worker_service.py index e6011ddd60e..b5bb331ba5b 100644 --- a/packages/syft/src/syft/service/worker/worker_service.py +++ b/packages/syft/src/syft/service/worker/worker_service.py @@ -3,7 +3,6 @@ from typing import List from typing import Tuple from typing import Union -from typing import cast # third party import docker @@ -156,5 +155,5 @@ def _check_and_update_status_for_worker( message=f"Failed to update status for worker: {worker.id}. Error: {result.err()}" ) if result.is_err() - else cast(SyftWorker, result.ok()) + else result.ok() ) diff --git a/packages/syft/src/syft/store/document_store.py b/packages/syft/src/syft/store/document_store.py index efad2a9a7d1..5151f43294c 100644 --- a/packages/syft/src/syft/store/document_store.py +++ b/packages/syft/src/syft/store/document_store.py @@ -699,7 +699,7 @@ def update( credentials: SyftVerifyKey, obj: BaseStash.object_type, has_permission=False, - ) -> Optional[Result[BaseStash.object_type, str]]: + ) -> Result[BaseStash.object_type, str]: qk = self.partition.store_query_key(obj) return self.partition.update( credentials=credentials, qk=qk, obj=obj, has_permission=has_permission