From ebd6bffd0bf1b923915d6ec1089444882d0f046c Mon Sep 17 00:00:00 2001 From: Ubuntu Date: Tue, 9 Jan 2024 13:41:31 +0000 Subject: [PATCH 1/4] fix default worker not being spawned on hot-reload - force remove container if container already exists - fix passing user password correctly to worker image on image built - rename DocekrWorkerConfigPK to WorkerConfigPK --- packages/syft/src/syft/node/node.py | 46 ++++++++++++------- .../syft/src/syft/service/worker/utils.py | 2 +- .../syft/service/worker/worker_image_stash.py | 6 +-- .../service/worker/worker_pool_service.py | 2 +- 4 files changed, 34 insertions(+), 22 deletions(-) diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index 240dc31ba1a..4458a0a1144 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -1435,26 +1435,38 @@ def create_default_worker_pool(node: Node) -> Optional[SyftError]: print("Building Default Worker Image") - # Build the Image for given tag - result = image_build_method( - context, image_uid=default_image.id, tag=DEFAULT_WORKER_IMAGE_TAG - ) - - if isinstance(result, SyftError): - print("Failed to build default worker image: ", result.message) - return + if not default_image.built_at: + # Build the Image for given tag + result = image_build_method( + context, image_uid=default_image.id, tag=DEFAULT_WORKER_IMAGE_TAG + ) - create_pool_method = node.get_service_method(SyftWorkerPoolService.create_pool) + if isinstance(result, SyftError): + print("Failed to build default worker image: ", result.message) + return + default_worker_pool = node.get_default_worker_pool() worker_count = node.queue_config.client_config.n_consumers - print("Creating default Worker Pool") - result = create_pool_method( - context, - name=DEFAULT_WORKER_POOL_NAME, - image_uid=default_image.id, - number=worker_count, - ) + # Create worker pool if it doesn't exists + if default_worker_pool is None: + create_pool_method = node.get_service_method(SyftWorkerPoolService.create_pool) + print("Creating default Worker Pool") + result = create_pool_method( + context, + name=DEFAULT_WORKER_POOL_NAME, + image_uid=default_image.id, + number=worker_count, + ) + + else: + # Else add a worker to existing worker pool + add_worker_method = node.get_service_method(SyftWorkerPoolService.add_workers) + result = add_worker_method( + context=context, + number=worker_count, + pool_name=DEFAULT_WORKER_POOL_NAME + ) if isinstance(result, SyftError): print(f"Failed to create Worker for Default workers. Error: {result.message}") @@ -1465,7 +1477,7 @@ def create_default_worker_pool(node: Node) -> Optional[SyftError]: if container_status.error: print( f"Failed to create container: Worker: {container_status.worker}," - "Error: {container_status.error}" + f"Error: {container_status.error}" ) return diff --git a/packages/syft/src/syft/service/worker/utils.py b/packages/syft/src/syft/service/worker/utils.py index 5caf700bd0e..2fa70c3b2c3 100644 --- a/packages/syft/src/syft/service/worker/utils.py +++ b/packages/syft/src/syft/service/worker/utils.py @@ -139,7 +139,7 @@ def run_container_using_docker( container_name=container_name, ) if existing_container: - existing_container.stop() + existing_container.remove(force=True) # Extract Config from backend container backend_host_config = extract_config_from_backend( diff --git a/packages/syft/src/syft/service/worker/worker_image_stash.py b/packages/syft/src/syft/service/worker/worker_image_stash.py index d83edcfcdc6..819e218b886 100644 --- a/packages/syft/src/syft/service/worker/worker_image_stash.py +++ b/packages/syft/src/syft/service/worker/worker_image_stash.py @@ -7,7 +7,7 @@ from result import Result # relative -from ...custom_worker.config import DockerWorkerConfig +from ...custom_worker.config import DockerWorkerConfig, WorkerConfig from ...node.credentials import SyftVerifyKey from ...serde.serializable import serializable from ...store.document_store import BaseUIDStoreStash @@ -19,7 +19,7 @@ from ..action.action_permissions import ActionPermission from .worker_image import SyftWorkerImage -DockerWorkerConfigPK = PartitionKey(key="config", type_=DockerWorkerConfig) +WorkerConfigPK = PartitionKey(key="config", type_=WorkerConfig) @serializable() @@ -59,5 +59,5 @@ def set( def get_by_docker_config( self, credentials: SyftVerifyKey, config: DockerWorkerConfig ): - qks = QueryKeys(qks=[DockerWorkerConfigPK.with_obj(config)]) + qks = QueryKeys(qks=[WorkerConfigPK.with_obj(config)]) return self.query_one(credentials=credentials, qks=qks) diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index 6bd8143833d..3407143b07e 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -114,7 +114,7 @@ def create_pool( worker_image=worker_image, worker_stash=self.worker_stash, reg_username=reg_username, - reg_password=reg_username, + reg_password=reg_password, ) worker_pool = WorkerPool( From 8b4e313b0b24673a7382757797c8e048896c1262 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 10 Jan 2024 11:52:37 +0530 Subject: [PATCH 2/4] fix lint --- packages/syft/src/syft/node/node.py | 4 +--- packages/syft/src/syft/service/worker/worker_image_stash.py | 3 ++- 2 files changed, 3 insertions(+), 4 deletions(-) diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index 4458a0a1144..6bcaed79c10 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -1463,9 +1463,7 @@ def create_default_worker_pool(node: Node) -> Optional[SyftError]: # Else add a worker to existing worker pool add_worker_method = node.get_service_method(SyftWorkerPoolService.add_workers) result = add_worker_method( - context=context, - number=worker_count, - pool_name=DEFAULT_WORKER_POOL_NAME + context=context, number=worker_count, pool_name=DEFAULT_WORKER_POOL_NAME ) if isinstance(result, SyftError): diff --git a/packages/syft/src/syft/service/worker/worker_image_stash.py b/packages/syft/src/syft/service/worker/worker_image_stash.py index 819e218b886..c2e3a5a1d5d 100644 --- a/packages/syft/src/syft/service/worker/worker_image_stash.py +++ b/packages/syft/src/syft/service/worker/worker_image_stash.py @@ -7,7 +7,8 @@ from result import Result # relative -from ...custom_worker.config import DockerWorkerConfig, WorkerConfig +from ...custom_worker.config import DockerWorkerConfig +from ...custom_worker.config import WorkerConfig from ...node.credentials import SyftVerifyKey from ...serde.serializable import serializable from ...store.document_store import BaseUIDStoreStash From 58c157f7bd1cf29ef0e5db3bba86e388501da2bb Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 10 Jan 2024 12:34:14 +0530 Subject: [PATCH 3/4] add missing associate jobs during message consumption --- packages/syft/src/syft/service/queue/zmq_queue.py | 1 + 1 file changed, 1 insertion(+) diff --git a/packages/syft/src/syft/service/queue/zmq_queue.py b/packages/syft/src/syft/service/queue/zmq_queue.py index 23f9392adf5..a62a8a9e8ba 100644 --- a/packages/syft/src/syft/service/queue/zmq_queue.py +++ b/packages/syft/src/syft/service/queue/zmq_queue.py @@ -577,6 +577,7 @@ def _run(self): # Call Message Handler try: message = msg.pop() + self.associate_job(message) self.message_handler.handle_message( message=message, syft_worker_id=self.syft_worker_id, From 04d116caa158474f482206191f792ca499999520 Mon Sep 17 00:00:00 2001 From: Shubham Gupta Date: Wed, 10 Jan 2024 16:01:30 +0530 Subject: [PATCH 4/4] use is_built property to check if image is built --- packages/syft/src/syft/node/node.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/packages/syft/src/syft/node/node.py b/packages/syft/src/syft/node/node.py index 6bcaed79c10..e83a1c56d8b 100644 --- a/packages/syft/src/syft/node/node.py +++ b/packages/syft/src/syft/node/node.py @@ -1435,7 +1435,7 @@ def create_default_worker_pool(node: Node) -> Optional[SyftError]: print("Building Default Worker Image") - if not default_image.built_at: + if not default_image.is_built: # Build the Image for given tag result = image_build_method( context, image_uid=default_image.id, tag=DEFAULT_WORKER_IMAGE_TAG