Skip to content

Commit

Permalink
Merge pull request #8346 from kiendang/worker-pool-log
Browse files Browse the repository at this point in the history
Add endpoint for getting worker container logs
  • Loading branch information
rasswanth-s authored Dec 15, 2023
2 parents 4b661b9 + 457e956 commit 7448214
Show file tree
Hide file tree
Showing 2 changed files with 167 additions and 39 deletions.
78 changes: 56 additions & 22 deletions notebooks/api/0.8/10-container-images.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,8 @@
"source": [
"import syft as sy\n",
"sy.requires(SYFT_VERSION)\n",
"from syft.service.worker.worker_image import SyftWorkerImage, SyftWorkerImageTag\n",
"from syft.custom_worker.config import DockerWorkerConfig\n",
"from syft.service.worker.worker_image import build_using_docker\n",
"from syft.service.worker.utils import run_container_using_docker\n",
"\n",
"\n",
"#third party\n",
"import docker"
"from syft.service.worker.worker_image import SyftWorkerImage\n",
"from syft.custom_worker.config import DockerWorkerConfig"
]
},
{
Expand Down Expand Up @@ -338,15 +332,62 @@
"second_worker = worker_pool.workers[1]"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "fda29eca",
"metadata": {},
"outputs": [],
"source": [
"raw_worker_logs = domain_client.api.services.worker_pool.worker_logs(\n",
" worker_pool_id=worker_pool.id, worker_id=second_worker.id, raw=True\n",
")\n",
"raw_worker_logs"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "1386d881",
"metadata": {},
"outputs": [],
"source": [
"assert isinstance(raw_worker_logs, bytes)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "187cb1ee",
"metadata": {},
"outputs": [],
"source": [
"worker_logs = domain_client.api.services.worker_pool.worker_logs(\n",
" worker_pool_id=worker_pool.id, worker_id=second_worker.id\n",
")\n",
"worker_logs"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "f08fc155",
"metadata": {},
"outputs": [],
"source": [
"assert isinstance(worker_logs, str)"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "c23a5008-0fa6-4d38-9102-71696b3eea41",
"metadata": {},
"outputs": [],
"source": [
"worker_delete_res = domain_client.api.services.worker_pool.delete_worker(worker_pool_id=worker_pool.id,\n",
" worker_id=second_worker.id)"
"worker_delete_res = domain_client.api.services.worker_pool.delete_worker(\n",
" worker_pool_id=worker_pool.id, worker_id=second_worker.id\n",
")"
]
},
{
Expand All @@ -366,7 +407,7 @@
"metadata": {},
"outputs": [],
"source": [
"assert isinstance(worker_delete_res,sy.SyftSuccess)"
"assert isinstance(worker_delete_res, sy.SyftSuccess)"
]
},
{
Expand All @@ -379,7 +420,7 @@
"# Refetch the worker pool\n",
"# Ensure that the deleted worker's id is not present\n",
"worker_pool = domain_client.api.services.worker_pool.get_all()[0]\n",
"assert len(worker_pool.workers)==2\n",
"assert len(worker_pool.workers) == 2\n",
"for worker in worker_pool.workers:\n",
" assert second_worker.id != worker.id"
]
Expand Down Expand Up @@ -415,8 +456,9 @@
"source": [
"# delete the remaining workers\n",
"for worker in worker_pool.workers:\n",
" res =domain_client.api.services.worker_pool.delete_worker(worker_pool_id=worker_pool.id,\n",
" worker_id= worker.id)\n",
" res =domain_client.api.services.worker_pool.delete_worker(\n",
" worker_pool_id=worker_pool.id, worker_id= worker.id\n",
" )\n",
" assert isinstance(res, sy.SyftSuccess)"
]
},
Expand All @@ -441,14 +483,6 @@
"assert isinstance(delete_res, sy.SyftSuccess)\n",
"delete_res"
]
},
{
"cell_type": "code",
"execution_count": null,
"id": "6e2769f9-d0aa-49d2-b9fe-077262229de8",
"metadata": {},
"outputs": [],
"source": []
}
],
"metadata": {
Expand Down
128 changes: 111 additions & 17 deletions packages/syft/src/syft/service/worker/worker_pool_service.py
Original file line number Diff line number Diff line change
@@ -1,9 +1,13 @@
# stdlib
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union
from typing import cast

# third party
import docker
from docker.models.containers import Container

# relative
from ...serde.serializable import serializable
Expand All @@ -18,6 +22,7 @@
from .utils import run_containers
from .worker_image_stash import SyftWorkerImageStash
from .worker_pool import ContainerSpawnStatus
from .worker_pool import SyftWorker
from .worker_pool import WorkerOrchestrationType
from .worker_pool import WorkerPool
from .worker_pool_stash import SyftWorkerPoolStash
Expand Down Expand Up @@ -131,26 +136,19 @@ def delete_worker(
worker_id: UID,
force: bool = False,
) -> Union[SyftSuccess, SyftError]:
worker_pool = self.stash.get_by_uid(
credentials=context.credentials, uid=worker_pool_id
worker_pool_worker = self._get_worker_pool_and_worker(
context, worker_pool_id, worker_id
)
if worker_pool.is_err():
return SyftError(message=f"{worker_pool.err()}")

worker_pool: WorkerPool = worker_pool.ok()
worker = None
for w in worker_pool.workers:
if w.id == worker_id:
worker = w
break
if worker is None:
return SyftError(
message=f"Worker with id: {worker_id} not found in pool: {worker_pool.name}"
)
if isinstance(worker_pool_worker, SyftError):
return worker_pool_worker

worker_pool, worker = worker_pool_worker

# delete the worker using docker client sdk
docker_client = docker.from_env()
docker_container = docker_client.containers.get(worker.container_id)
docker_container = _get_worker_container(worker)
if isinstance(docker_container, SyftError):
return docker_container

try:
# stop the container
docker_container.stop()
Expand Down Expand Up @@ -179,3 +177,99 @@ def delete_worker(
return SyftSuccess(
message=f"Worker with id: {worker_id} deleted successfully from pool: {worker_pool.name}"
)

@service_method(
path="worker_pool.worker_logs",
name="worker_logs",
roles=DATA_OWNER_ROLE_LEVEL,
)
def worker_logs(
self,
context: AuthedServiceContext,
worker_pool_id: UID,
worker_id: UID,
raw: bool = False,
) -> Union[bytes, str, SyftError]:
worker_pool_worker = self._get_worker_pool_and_worker(
context, worker_pool_id, worker_id
)
if isinstance(worker_pool_worker, SyftError):
return worker_pool_worker

_, worker = worker_pool_worker

docker_container = _get_worker_container(worker)
if isinstance(docker_container, SyftError):
return docker_container

try:
logs = cast(bytes, docker_container.logs())
except docker.errors.APIError as e:
return SyftError(
f"Failed to get worker {worker.id} container logs. Error {e}"
)

return logs if raw else logs.decode(errors="ignore")

def _get_worker_pool(
self,
context: AuthedServiceContext,
worker_pool_id: UID,
) -> Union[WorkerPool, SyftError]:
worker_pool = self.stash.get_by_uid(
credentials=context.credentials, uid=worker_pool_id
)

return (
SyftError(message=f"{worker_pool.err()}")
if worker_pool.is_err()
else cast(WorkerPool, worker_pool.ok())
)

def _get_worker_pool_and_worker(
self, context: AuthedServiceContext, worker_pool_id: UID, worker_id: UID
) -> Union[Tuple[WorkerPool, SyftWorker], SyftError]:
worker_pool = self._get_worker_pool(context, worker_pool_id)
if isinstance(worker_pool, SyftError):
return worker_pool

worker = _get_worker(worker_pool, worker_id)
if isinstance(worker, SyftError):
return worker

return worker_pool, worker


def _get_worker_opt(worker_pool: WorkerPool, worker_id: UID) -> Optional[SyftWorker]:
try:
return next(worker for worker in worker_pool.workers if worker.id == worker_id)
except StopIteration:
return None


def _get_worker(
worker_pool: WorkerPool, worker_id: UID
) -> Union[SyftWorker, SyftError]:
worker = _get_worker_opt(worker_pool, worker_id)
return (
worker
if worker is not None
else SyftError(
message=f"Worker with id: {worker_id} not found in pool: {worker_pool.name}"
)
)


def _get_worker_container(
worker: SyftWorker, docker_client: Optional[docker.DockerClient] = None
) -> Union[Container, SyftError]:
docker_client = docker_client if docker_client is not None else docker.from_env()
try:
return cast(Container, docker_client.containers.get(worker.container_id))
except docker.errors.NotFound as e:
return SyftError(f"Worker {worker.id} container not found. Error {e}")
except docker.errors.APIError as e:
return SyftError(
f"Unable to access worker {worker.id} container. "
+ f"Container server error {e}"
)

0 comments on commit 7448214

Please sign in to comment.