diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index dd12d571ca5..175eb901290 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -20,14 +20,8 @@ "source": [ "import syft as sy\n", "sy.requires(SYFT_VERSION)\n", - "from syft.service.worker.worker_image import SyftWorkerImage, SyftWorkerImageTag\n", - "from syft.custom_worker.config import DockerWorkerConfig\n", - "from syft.service.worker.worker_image import build_using_docker\n", - "from syft.service.worker.utils import run_container_using_docker\n", - "\n", - "\n", - "#third party\n", - "import docker" + "from syft.service.worker.worker_image import SyftWorkerImage\n", + "from syft.custom_worker.config import DockerWorkerConfig" ] }, { @@ -338,6 +332,52 @@ "second_worker = worker_pool.workers[1]" ] }, + { + "cell_type": "code", + "execution_count": null, + "id": "fda29eca", + "metadata": {}, + "outputs": [], + "source": [ + "raw_worker_logs = domain_client.api.services.worker_pool.worker_logs(\n", + " worker_pool_id=worker_pool.id, worker_id=second_worker.id, raw=True\n", + ")\n", + "raw_worker_logs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "1386d881", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(raw_worker_logs, bytes)" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "187cb1ee", + "metadata": {}, + "outputs": [], + "source": [ + "worker_logs = domain_client.api.services.worker_pool.worker_logs(\n", + " worker_pool_id=worker_pool.id, worker_id=second_worker.id\n", + ")\n", + "worker_logs" + ] + }, + { + "cell_type": "code", + "execution_count": null, + "id": "f08fc155", + "metadata": {}, + "outputs": [], + "source": [ + "assert isinstance(worker_logs, str)" + ] + }, { "cell_type": "code", "execution_count": null, @@ -345,8 +385,9 @@ "metadata": {}, "outputs": [], "source": [ - "worker_delete_res = domain_client.api.services.worker_pool.delete_worker(worker_pool_id=worker_pool.id,\n", - " worker_id=second_worker.id)" + "worker_delete_res = domain_client.api.services.worker_pool.delete_worker(\n", + " worker_pool_id=worker_pool.id, worker_id=second_worker.id\n", + ")" ] }, { @@ -366,7 +407,7 @@ "metadata": {}, "outputs": [], "source": [ - "assert isinstance(worker_delete_res,sy.SyftSuccess)" + "assert isinstance(worker_delete_res, sy.SyftSuccess)" ] }, { @@ -379,7 +420,7 @@ "# Refetch the worker pool\n", "# Ensure that the deleted worker's id is not present\n", "worker_pool = domain_client.api.services.worker_pool.get_all()[0]\n", - "assert len(worker_pool.workers)==2\n", + "assert len(worker_pool.workers) == 2\n", "for worker in worker_pool.workers:\n", " assert second_worker.id != worker.id" ] @@ -415,8 +456,9 @@ "source": [ "# delete the remaining workers\n", "for worker in worker_pool.workers:\n", - " res =domain_client.api.services.worker_pool.delete_worker(worker_pool_id=worker_pool.id,\n", - " worker_id= worker.id)\n", + " res =domain_client.api.services.worker_pool.delete_worker(\n", + " worker_pool_id=worker_pool.id, worker_id= worker.id\n", + " )\n", " assert isinstance(res, sy.SyftSuccess)" ] }, @@ -441,14 +483,6 @@ "assert isinstance(delete_res, sy.SyftSuccess)\n", "delete_res" ] - }, - { - "cell_type": "code", - "execution_count": null, - "id": "6e2769f9-d0aa-49d2-b9fe-077262229de8", - "metadata": {}, - "outputs": [], - "source": [] } ], "metadata": { diff --git a/packages/syft/src/syft/service/worker/worker_pool_service.py b/packages/syft/src/syft/service/worker/worker_pool_service.py index 0bdd3cf610a..efa76eca551 100644 --- a/packages/syft/src/syft/service/worker/worker_pool_service.py +++ b/packages/syft/src/syft/service/worker/worker_pool_service.py @@ -1,9 +1,13 @@ # stdlib from typing import List +from typing import Optional +from typing import Tuple from typing import Union +from typing import cast # third party import docker +from docker.models.containers import Container # relative from ...serde.serializable import serializable @@ -18,6 +22,7 @@ from .utils import run_containers from .worker_image_stash import SyftWorkerImageStash from .worker_pool import ContainerSpawnStatus +from .worker_pool import SyftWorker from .worker_pool import WorkerOrchestrationType from .worker_pool import WorkerPool from .worker_pool_stash import SyftWorkerPoolStash @@ -131,26 +136,19 @@ def delete_worker( worker_id: UID, force: bool = False, ) -> Union[SyftSuccess, SyftError]: - worker_pool = self.stash.get_by_uid( - credentials=context.credentials, uid=worker_pool_id + worker_pool_worker = self._get_worker_pool_and_worker( + context, worker_pool_id, worker_id ) - if worker_pool.is_err(): - return SyftError(message=f"{worker_pool.err()}") - - worker_pool: WorkerPool = worker_pool.ok() - worker = None - for w in worker_pool.workers: - if w.id == worker_id: - worker = w - break - if worker is None: - return SyftError( - message=f"Worker with id: {worker_id} not found in pool: {worker_pool.name}" - ) + if isinstance(worker_pool_worker, SyftError): + return worker_pool_worker + + worker_pool, worker = worker_pool_worker # delete the worker using docker client sdk - docker_client = docker.from_env() - docker_container = docker_client.containers.get(worker.container_id) + docker_container = _get_worker_container(worker) + if isinstance(docker_container, SyftError): + return docker_container + try: # stop the container docker_container.stop() @@ -179,3 +177,99 @@ def delete_worker( return SyftSuccess( message=f"Worker with id: {worker_id} deleted successfully from pool: {worker_pool.name}" ) + + @service_method( + path="worker_pool.worker_logs", + name="worker_logs", + roles=DATA_OWNER_ROLE_LEVEL, + ) + def worker_logs( + self, + context: AuthedServiceContext, + worker_pool_id: UID, + worker_id: UID, + raw: bool = False, + ) -> Union[bytes, str, SyftError]: + worker_pool_worker = self._get_worker_pool_and_worker( + context, worker_pool_id, worker_id + ) + if isinstance(worker_pool_worker, SyftError): + return worker_pool_worker + + _, worker = worker_pool_worker + + docker_container = _get_worker_container(worker) + if isinstance(docker_container, SyftError): + return docker_container + + try: + logs = cast(bytes, docker_container.logs()) + except docker.errors.APIError as e: + return SyftError( + f"Failed to get worker {worker.id} container logs. Error {e}" + ) + + return logs if raw else logs.decode(errors="ignore") + + def _get_worker_pool( + self, + context: AuthedServiceContext, + worker_pool_id: UID, + ) -> Union[WorkerPool, SyftError]: + worker_pool = self.stash.get_by_uid( + credentials=context.credentials, uid=worker_pool_id + ) + + return ( + SyftError(message=f"{worker_pool.err()}") + if worker_pool.is_err() + else cast(WorkerPool, worker_pool.ok()) + ) + + def _get_worker_pool_and_worker( + self, context: AuthedServiceContext, worker_pool_id: UID, worker_id: UID + ) -> Union[Tuple[WorkerPool, SyftWorker], SyftError]: + worker_pool = self._get_worker_pool(context, worker_pool_id) + if isinstance(worker_pool, SyftError): + return worker_pool + + worker = _get_worker(worker_pool, worker_id) + if isinstance(worker, SyftError): + return worker + + return worker_pool, worker + + +def _get_worker_opt(worker_pool: WorkerPool, worker_id: UID) -> Optional[SyftWorker]: + try: + return next(worker for worker in worker_pool.workers if worker.id == worker_id) + except StopIteration: + return None + + +def _get_worker( + worker_pool: WorkerPool, worker_id: UID +) -> Union[SyftWorker, SyftError]: + worker = _get_worker_opt(worker_pool, worker_id) + return ( + worker + if worker is not None + else SyftError( + message=f"Worker with id: {worker_id} not found in pool: {worker_pool.name}" + ) + ) + + +def _get_worker_container( + worker: SyftWorker, docker_client: Optional[docker.DockerClient] = None +) -> Union[Container, SyftError]: + docker_client = docker_client if docker_client is not None else docker.from_env() + try: + return cast(Container, docker_client.containers.get(worker.container_id)) + except docker.errors.NotFound as e: + return SyftError(f"Worker {worker.id} container not found. Error {e}") + except docker.errors.APIError as e: + return SyftError( + f"Unable to access worker {worker.id} container. " + + f"Container server error {e}" + )