Skip to content

Commit

Permalink
Merge pull request #8466 from khoaguin/fix-mypy-issues-service-worker
Browse files (browse the repository at this point in the history)
[Refactor] Fixing mypy issues of `syft/service/worker`
  • Loading branch information
kiendang authored Feb 14, 2024
2 parents 658ebcf + df7cfef commit d32f76d
Show file tree
Hide file tree
Showing 12 changed files with 159 additions and 102 deletions.
2 changes: 1 addition & 1 deletion .pre-commit-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ repos:
- id: mypy
name: "mypy: syft"
always_run: true
files: "^packages/syft/src/syft/serde|^packages/syft/src/syft/util/env.py|^packages/syft/src/syft/util/logger.py|^packages/syft/src/syft/util/markdown.py|^packages/syft/src/syft/util/notebook_ui/notebook_addons.py|^packages/syft/src/syft/service/warnings.py|^packages/syft/src/syft/service/dataset"
files: "^packages/syft/src/syft/serde|^packages/syft/src/syft/util/env.py|^packages/syft/src/syft/util/logger.py|^packages/syft/src/syft/util/markdown.py|^packages/syft/src/syft/util/notebook_ui/notebook_addons.py|^packages/syft/src/syft/service/warnings.py|^packages/syft/src/syft/service/dataset|^packages/syft/src/syft/service/worker"
#files: "^packages/syft/src/syft/serde"
args: [
"--follow-imports=skip",
Expand Down
2 changes: 1 addition & 1 deletion packages/syft/src/syft/service/worker/image_identifier.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def from_str(cls, tag: str) -> Self:
return cls(repo=repo, registry=registry, tag=tag)

@property
def repo_with_tag(self) -> str:
def repo_with_tag(self) -> Optional[str]:
if self.repo or self.tag:
return f"{self.repo}:{self.tag}"
return None
Expand Down
5 changes: 3 additions & 2 deletions packages/syft/src/syft/service/worker/image_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@

# third party
from pydantic import validator
from typing_extensions import Self

# relative
from ...serde.serializable import serializable
Expand All @@ -28,7 +29,7 @@ class SyftImageRegistry(SyftObject):
url: str

@validator("url")
def validate_url(cls, val: str):
def validate_url(cls, val: str) -> str:
if not val:
raise ValueError("Invalid Registry URL. Must not be empty")

Expand All @@ -38,7 +39,7 @@ def validate_url(cls, val: str):
return val

@classmethod
def from_url(cls, full_str: str):
def from_url(cls, full_str: str) -> Self:
# this is only for urlparse
if "://" not in full_str:
full_str = f"http://{full_str}"
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# stdlib
from typing import List
from typing import Optional
from typing import Union

# relative
Expand Down Expand Up @@ -62,7 +63,7 @@ def delete(
self,
context: AuthedServiceContext,
uid: UID = None,
url: str = None,
url: Optional[str] = None,
) -> Union[SyftSuccess, SyftError]:
# TODO - we need to make sure that there are no workers running an image bound to this registry

Expand Down
185 changes: 110 additions & 75 deletions packages/syft/src/syft/service/worker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,17 @@
import socket
import socketserver
import sys
from typing import Any
from typing import Dict
from typing import List
from typing import Optional
from typing import Tuple
from typing import Union

# third party
import docker
from docker.models.containers import Container
from kr8s.objects import Pod

# relative
from ...abstract_node import AbstractNode
Expand Down Expand Up @@ -47,7 +51,9 @@ def backend_container_name() -> str:
return f"{hostname}-{service_name}-1"


def get_container(docker_client: docker.DockerClient, container_name: str):
def get_container(
docker_client: docker.DockerClient, container_name: str
) -> Optional[Container]:
try:
existing_container = docker_client.containers.get(container_name)
except docker.errors.NotFound:
Expand All @@ -56,14 +62,20 @@ def get_container(docker_client: docker.DockerClient, container_name: str):
return existing_container


def extract_config_from_backend(worker_name: str, docker_client: docker.DockerClient):
def extract_config_from_backend(
worker_name: str, docker_client: docker.DockerClient
) -> Dict[str, Any]:
# Existing main backend container
backend_container = get_container(
docker_client, container_name=backend_container_name()
)

# Config with defaults
extracted_config = {"volume_binds": {}, "network_mode": None, "environment": {}}
extracted_config: Dict[str, Any] = {
"volume_binds": {},
"network_mode": None,
"environment": {},
}

if backend_container is None:
return extracted_config
Expand Down Expand Up @@ -94,7 +106,7 @@ def extract_config_from_backend(worker_name: str, docker_client: docker.DockerCl
return extracted_config


def get_free_tcp_port():
def get_free_tcp_port() -> int:
with socketserver.TCPServer(("localhost", 0), None) as s:
free_port = s.server_address[1]
return free_port
Expand All @@ -113,7 +125,7 @@ def run_container_using_docker(
registry_url: Optional[str] = None,
) -> ContainerSpawnStatus:
if not worker_image.is_built:
raise Exception("Image must be built before running it.")
raise ValueError("Image must be built before running it.")

# Get hostname
hostname = socket.gethostname()
Expand Down Expand Up @@ -165,8 +177,11 @@ def run_container_using_docker(
environment["QUEUE_PORT"] = queue_port
environment["CONTAINER_HOST"] = "docker"

if worker_image.image_identifier is None:
raise ValueError(f"Image {worker_image} does not have an identifier")

container = docker_client.containers.run(
worker_image.image_identifier.full_name_with_tag,
image=worker_image.image_identifier.full_name_with_tag,
name=f"{hostname}-{worker_name}",
detach=True,
auto_remove=True,
Expand Down Expand Up @@ -257,20 +272,22 @@ def run_workers_in_threads(
return results


def prepare_kubernetes_pool_env(runner: KubernetesRunner, env_vars: dict):
def prepare_kubernetes_pool_env(
runner: KubernetesRunner, env_vars: dict
) -> Tuple[List, Dict]:
# get current backend pod name
backend_pod_name = os.getenv("K8S_POD_NAME")
if not backend_pod_name:
raise ValueError(message="Pod name not provided in environment variable")
raise ValueError("Pod name not provided in environment variable")

# get current backend's credentials path
creds_path = os.getenv("CREDENTIALS_PATH")
creds_path: Optional[Union[str, Path]] = os.getenv("CREDENTIALS_PATH")
if not creds_path:
raise ValueError(message="Credentials path not provided")
raise ValueError("Credentials path not provided")

creds_path = Path(creds_path)
if not creds_path.exists():
raise ValueError(message="Credentials file does not exist")
if creds_path is not None and not creds_path.exists():
raise ValueError("Credentials file does not exist")

# create a secret for the node credentials owned by the backend, not the pool.
node_secret = KubeUtils.create_secret(
Expand All @@ -283,15 +300,15 @@ def prepare_kubernetes_pool_env(runner: KubernetesRunner, env_vars: dict):

# clone and patch backend environment variables
backend_env = runner.get_pod_env_vars(backend_pod_name) or []
env_vars = KubeUtils.patch_env_vars(backend_env, env_vars)
env_vars_: List = KubeUtils.patch_env_vars(backend_env, env_vars)
mount_secrets = {
node_secret.metadata.name: {
"mountPath": str(creds_path),
"subPath": creds_path.name,
},
}

return env_vars, mount_secrets
return env_vars_, mount_secrets


def create_kubernetes_pool(
Expand All @@ -304,8 +321,8 @@ def create_kubernetes_pool(
reg_username: Optional[str] = None,
reg_password: Optional[str] = None,
reg_url: Optional[str] = None,
**kwargs,
):
**kwargs: Any,
) -> Union[List[Pod], SyftError]:
pool = None
error = False

Expand Down Expand Up @@ -355,7 +372,7 @@ def scale_kubernetes_pool(
runner: KubernetesRunner,
pool_name: str,
replicas: int,
):
) -> Union[List[Pod], SyftError]:
pool = runner.get_pool(pool_name)
if not pool:
return SyftError(message=f"Pool does not exist. name={pool_name}")
Expand All @@ -374,28 +391,33 @@ def run_workers_in_kubernetes(
worker_count: int,
pool_name: str,
queue_port: int,
start_idx=0,
start_idx: int = 0,
debug: bool = False,
reg_username: Optional[str] = None,
reg_password: Optional[str] = None,
reg_url: Optional[str] = None,
**kwargs,
**kwargs: Any,
) -> Union[List[ContainerSpawnStatus], SyftError]:
spawn_status = []
runner = KubernetesRunner()

if not runner.exists(pool_name=pool_name):
pool_pods = create_kubernetes_pool(
runner=runner,
tag=worker_image.image_identifier.full_name_with_tag,
pool_name=pool_name,
replicas=worker_count,
queue_port=queue_port,
debug=debug,
reg_username=reg_username,
reg_password=reg_password,
reg_url=reg_url,
)
if worker_image.image_identifier is not None:
pool_pods = create_kubernetes_pool(
runner=runner,
tag=worker_image.image_identifier.full_name_with_tag,
pool_name=pool_name,
replicas=worker_count,
queue_port=queue_port,
debug=debug,
reg_username=reg_username,
reg_password=reg_password,
reg_url=reg_url,
)
else:
return SyftError(
message=f"image with uid {worker_image.id} does not have an image identifier"
)
else:
pool_pods = scale_kubernetes_pool(runner, pool_name, worker_count)

Expand Down Expand Up @@ -584,28 +606,35 @@ def _get_healthcheck_based_on_status(status: WorkerStatus) -> WorkerHealth:
return WorkerHealth.UNHEALTHY


def image_build(image: SyftWorkerImage, **kwargs) -> Union[ImageBuildResult, SyftError]:
full_tag = image.image_identifier.full_name_with_tag
try:
builder = CustomWorkerBuilder()
return builder.build_image(
config=image.config,
tag=full_tag,
rm=True,
forcerm=True,
**kwargs,
)
except docker.errors.APIError as e:
return SyftError(
message=f"Docker API error when building '{full_tag}'. Reason - {e}"
)
except docker.errors.DockerException as e:
return SyftError(
message=f"Docker exception when building '{full_tag}'. Reason - {e}"
)
except Exception as e:
def image_build(
image: SyftWorkerImage, **kwargs: Dict[str, Any]
) -> Union[ImageBuildResult, SyftError]:
if image.image_identifier is not None:
full_tag = image.image_identifier.full_name_with_tag
try:
builder = CustomWorkerBuilder()
return builder.build_image(
config=image.config,
tag=full_tag,
rm=True,
forcerm=True,
**kwargs,
)
except docker.errors.APIError as e:
return SyftError(
message=f"Docker API error when building '{full_tag}'. Reason - {e}"
)
except docker.errors.DockerException as e:
return SyftError(
message=f"Docker exception when building '{full_tag}'. Reason - {e}"
)
except Exception as e:
return SyftError(
message=f"Unknown exception when building '{full_tag}'. Reason - {e}"
)
else:
return SyftError(
message=f"Unknown exception when building '{full_tag}'. Reason - {e}"
message=f"image with uid {image.id} does not have an image identifier"
)


Expand All @@ -614,34 +643,40 @@ def image_push(
username: Optional[str] = None,
password: Optional[str] = None,
) -> Union[ImagePushResult, SyftError]:
full_tag = image.image_identifier.full_name_with_tag
try:
builder = CustomWorkerBuilder()
result = builder.push_image(
# this should be consistent with docker build command
tag=image.image_identifier.full_name_with_tag,
registry_url=image.image_identifier.registry_host,
username=username,
password=password,
)
if image.image_identifier is not None:
full_tag = image.image_identifier.full_name_with_tag
try:
builder = CustomWorkerBuilder()
result = builder.push_image(
# this should be consistent with docker build command
tag=image.image_identifier.full_name_with_tag,
registry_url=image.image_identifier.registry_host,
username=username,
password=password,
)

if "error" in result.logs.lower() or result.exit_code:
return SyftError(
message=f"Failed to push {full_tag}. "
f"Exit code: {result.exit_code}. "
f"Logs:\n{result.logs}"
)

if "error" in result.logs.lower() or result.exit_code:
return result
except docker.errors.APIError as e:
return SyftError(message=f"Docker API error when pushing {full_tag}. {e}")
except docker.errors.DockerException as e:
return SyftError(
message=f"Failed to push {full_tag}. "
f"Exit code: {result.exit_code}. "
f"Logs:\n{result.logs}"
message=f"Docker exception when pushing {full_tag}. Reason - {e}"
)

return result
except docker.errors.APIError as e:
return SyftError(message=f"Docker API error when pushing {full_tag}. {e}")
except docker.errors.DockerException as e:
return SyftError(
message=f"Docker exception when pushing {full_tag}. Reason - {e}"
)
except Exception as e:
except Exception as e:
return SyftError(
message=f"Unknown exception when pushing {image.image_identifier}. Reason - {e}"
)
else:
return SyftError(
message=f"Unknown exception when pushing {image.image_identifier}. Reason - {e}"
message=f"image with uid {image.id} does not have an "
"image identifier and tag, hence we can't push it."
)


Expand Down
Loading

0 comments on commit d32f76d

Please sign in to comment.