Skip to content

Commit

Permalink
Merge branch 'dev' into fix-wp-bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
yashgorana authored Jan 10, 2024
2 parents 58c157f + 5254b0d commit 021129a
Show file tree
Hide file tree
Showing 12 changed files with 312 additions and 132 deletions.
265 changes: 197 additions & 68 deletions notebooks/api/0.8/10-container-images.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion packages/grid/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ services:
- DEV_MODE=${DEV_MODE}
- NODE_SIDE_TYPE=${NODE_SIDE_TYPE}
- ENABLE_WARNINGS=${ENABLE_WARNINGS}
- INMEMORY_WORKERS=true
- INMEMORY_WORKERS=${INMEMORY_WORKERS}
ports:
- "${HTTP_PORT}:${HTTP_PORT}"
volumes:
Expand Down
10 changes: 0 additions & 10 deletions packages/syft/src/syft/custom_worker/builder.py
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,12 @@ def push_image(self, tag: str, **kwargs: Any) -> str:
return self._push_image(tag, **kwargs)

def _build_dockerfile(self, config: DockerWorkerConfig, tag: str, **kwargs):
print("Building with provided dockerfile")

# convert string to file-like object
file_obj = io.BytesIO(config.dockerfile.encode("utf-8"))
return self._build_image(fileobj=file_obj, tag=tag, **kwargs)

def _build_template(self, config: CustomWorkerConfig, **kwargs: Any):
# Builds a Docker pre-made CPU/GPU image template using a CustomWorkerConfig
print("Building with dockerfule template")

# remove once GPU is supported
if config.build.gpu:
raise Exception("GPU custom worker is not supported yet")
Expand All @@ -87,12 +83,6 @@ def _build_template(self, config: CustomWorkerConfig, **kwargs: Any):
"CUSTOM_CMD": config.build.merged_custom_cmds(),
}

print(
f"Building dockerfile={dockerfile} "
f"in context={contextdir} "
f"with args={build_args}"
)

return self._build_image(
tag=f"{self.CUSTOM_IMAGE_PREFIX}-{type}:{imgtag}",
path=str(contextdir),
Expand Down
16 changes: 9 additions & 7 deletions packages/syft/src/syft/service/worker/image_identifier.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# stdlib
from typing import Optional
from typing import Tuple
from typing import Union

# third party
from typing_extensions import Self
Expand Down Expand Up @@ -28,7 +29,7 @@ class SyftWorkerImageIdentifier(SyftBaseModel):
https://docs.docker.com/engine/reference/commandline/tag/#tag-an-image-referenced-by-name-and-tag
"""

registry: Optional[SyftImageRegistry]
registry: Optional[Union[SyftImageRegistry, str]]
repo: str
tag: str

Expand Down Expand Up @@ -73,11 +74,12 @@ def repo_with_tag(self) -> str:

@property
def full_name_with_tag(self) -> str:
if self.registry:
return f"{self.registry.url}/{self.repo}:{self.tag}"
else:
# default registry is always docker.io
if self.registry is None:
return f"docker.io/{self.repo}:{self.tag}"
elif isinstance(self.registry, str):
return f"{self.registry}/{self.repo}:{self.tag}"
else:
return f"{self.registry.url}/{self.repo}:{self.tag}"

@property
def registry_host(self) -> str:
Expand All @@ -91,5 +93,5 @@ def registry_host(self) -> str:
def __hash__(self) -> int:
return hash(self.repo + self.tag + str(hash(self.registry)))

def __str__(self) -> str:
return f"registry: {str(self.registry)}, repo: {self.repo}, tag: {self.tag}"
def __repr__(self) -> str:
return f"SyftWorkerImageIdentifier(repo={self.repo}, tag={self.tag}, registry={self.registry})"
3 changes: 3 additions & 0 deletions packages/syft/src/syft/service/worker/image_registry.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,5 +29,8 @@ def tls_enabled(self) -> bool:
def __hash__(self) -> int:
return hash(self.url + str(self.tls_enabled))

def __repr__(self) -> str:
return f"SyftImageRegistry(url={self.url})"

def __str__(self) -> str:
return self.url
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def add(
return SyftError(message=res.err())

return SyftSuccess(
message=f"Image registry <id: {registry.id}> created successfully"
message=f"Image Registry ID: {registry.id} created successfully"
)

@service_method(
Expand All @@ -68,7 +68,7 @@ def delete(
if res.is_err():
return SyftError(message=res.err())
return SyftSuccess(
message=f"Image registry <url: {url}> successfully deleted."
message=f"Image Registry URL: {url} successfully deleted."
)

# if uid is provided, delete by uid
Expand All @@ -77,7 +77,7 @@ def delete(
if res.is_err():
return SyftError(message=res.err())
return SyftSuccess(
message=f"Image registry <id: {uid}> successfully deleted."
message=f"Image Registry ID: {uid} successfully deleted."
)
else:
return SyftError(message="Either UID or URL must be provided.")
Expand Down
14 changes: 10 additions & 4 deletions packages/syft/src/syft/service/worker/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,9 @@ def run_container_using_docker(
password: Optional[str] = None,
registry_url: Optional[str] = None,
) -> ContainerSpawnStatus:
if not worker_image.is_built:
raise Exception("Image must be built before running it.")

# Get hostname
hostname = socket.gethostname()

Expand Down Expand Up @@ -160,7 +163,7 @@ def run_container_using_docker(
environment["CONTAINER_HOST"] = "docker"

container = docker_client.containers.run(
worker_image.image_identifier.repo_with_tag,
worker_image.image_identifier.full_name_with_tag,
name=f"{hostname}-{worker_name}",
detach=True,
auto_remove=True,
Expand Down Expand Up @@ -267,6 +270,9 @@ def run_containers(
if orchestration not in [WorkerOrchestrationType.DOCKER]:
return SyftError(message="Only Orchestration via Docker is supported.")

if not worker_image.is_built:
return SyftError(message="Image must be built before running it.")

with contextlib.closing(docker.from_env()) as client:
for worker_count in range(start_idx + 1, number + 1):
worker_name = f"{pool_name}-{worker_count}"
Expand Down Expand Up @@ -359,15 +365,15 @@ def docker_build(
return ImageBuildResult(image_hash=built_image.id, logs=parse_output(logs))
except docker.errors.APIError as e:
return SyftError(
message=f"Docker API error when building {image.image_tag}. Reason - {e}"
message=f"Docker API error when building {image.image_identifier}. Reason - {e}"
)
except docker.errors.DockerException as e:
return SyftError(
message=f"Docker exception when building {image.image_tag}. Reason - {e}"
message=f"Docker exception when building {image.image_identifier}. Reason - {e}"
)
except Exception as e:
return SyftError(
message=f"Unknown exception when building {image.image_tag}. Reason - {e}"
message=f"Unknown exception when building {image.image_identifier}. Reason - {e}"
)


Expand Down
22 changes: 18 additions & 4 deletions packages/syft/src/syft/service/worker/worker_image.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,22 @@ class SyftWorkerImage(SyftObject):

id: UID
config: WorkerConfig
image_identifier: Optional[SyftWorkerImageIdentifier]
image_hash: Optional[str]
created_at: DateTime = DateTime.now()
created_by: SyftVerifyKey
built_at: Optional[DateTime]
created_at: DateTime = DateTime.now()
image_identifier: Optional[SyftWorkerImageIdentifier] = None
image_hash: Optional[str] = None
built_at: Optional[DateTime] = None

@property
def is_built(self) -> bool:
"""Returns True if the image has been built."""

return self.built_at is not None

@property
def built_image_tag(self) -> Optional[str]:
"""Returns the full name of the image if it has been built."""

if self.is_built and self.image_identifier:
return self.image_identifier.full_name_with_tag
return None
56 changes: 24 additions & 32 deletions packages/syft/src/syft/service/worker/worker_image_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,19 +48,17 @@ def __init__(self, store: DocumentStore) -> None:
def submit_dockerfile(
self, context: AuthedServiceContext, docker_config: DockerWorkerConfig
) -> Union[SyftSuccess, SyftError]:
image_identifier = SyftWorkerImageIdentifier(repo="", tag="")
worker_image = SyftWorkerImage(
config=docker_config,
created_by=context.credentials,
image_identifier=image_identifier,
)
res = self.stash.set(context.credentials, worker_image)

if res.is_err():
return SyftError(message=res.err())

return SyftSuccess(
message=f"Dockerfile <id: {worker_image.id}> successfully submitted."
message=f"Dockerfile ID: {worker_image.id} successfully submitted."
)

@service_method(
Expand Down Expand Up @@ -90,10 +88,10 @@ def build(
image_registry_service: SyftImageRegistryService = context.node.get_service(
SyftImageRegistryService
)
result = image_registry_service.get_by_id(context, registry_uid)
if result.is_err():
return result
registry: SyftImageRegistry = result.ok()
registry_result = image_registry_service.get_by_id(context, registry_uid)
if registry_result.is_err():
return registry_result
registry: SyftImageRegistry = registry_result.ok()

try:
if registry:
Expand All @@ -112,24 +110,25 @@ def build(
and worker_image.image_identifier.full_name_with_tag
== image_identifier.full_name_with_tag
):
return SyftError(message=f"Image<{image_uid}> is already built")
return SyftError(message=f"Image ID: {image_uid} is already built")

worker_image.image_identifier = image_identifier
result = None

if not context.node.in_memory_workers:
result = docker_build(worker_image)
if isinstance(result, SyftError):
return result
build_result = docker_build(worker_image)
if isinstance(build_result, SyftError):
return build_result

worker_image.image_hash = result.image_hash
worker_image.image_hash = build_result.image_hash
worker_image.built_at = DateTime.now()

result = SyftSuccess(
message=f"Build {worker_image} succeeded.\n{result.logs}"
message=f"Build for Worker ID: {worker_image.id} succeeded.\n{build_result.logs}"
)
else:
result = SyftSuccess(
message="Image building skipped, since using InMemory workers."
message="Image building skipped, since using in-memory workers."
)

update_result = self.stash.update(context.credentials, obj=worker_image)
Expand All @@ -153,28 +152,21 @@ def push(
username: Optional[str] = None,
password: Optional[str] = None,
) -> Union[SyftSuccess, SyftError]:
if context.node.in_memory_workers:
return SyftSuccess(
message="Skipped pushing image, since using InMemory workers."
)

result = self.stash.get_by_uid(credentials=context.credentials, uid=image)
if result.is_err():
return SyftError(
message=f"Failed to get image for uid: {image}. Error: {result.err()}"
message=f"Failed to get Image ID: {image}. Error: {result.err()}"
)
worker_image: SyftWorkerImage = result.ok()

if (
if not worker_image.is_built:
return SyftError(message=f"Image ID: {worker_image.id} is not built yet.")
elif (
worker_image.image_identifier is None
or worker_image.image_identifier.registry_host == ""
):
return SyftError(
message=f"Image {worker_image} does not have a valid registry host."
)
elif worker_image.built_at is None:
return SyftError(
message=f"Image {worker_image} is not built yet. Please build it first."
message=f"Image ID: {worker_image.id} does not have a valid registry host."
)

result = docker_push(
Expand All @@ -187,7 +179,7 @@ def push(
return result

return SyftSuccess(
message=f'The image was successfully pushed to "{worker_image.image_identifier.full_name_with_tag}"'
message=f'Pushed Image ID: {worker_image.id} to "{worker_image.image_identifier.full_name_with_tag}".'
)

@service_method(
Expand All @@ -209,7 +201,7 @@ def get_all(
res: List[Tuple] = []
for im in images:
if im.image_identifier is not None:
res.append((im.image_identifier.repo_with_tag, im))
res.append((im.image_identifier.full_name_with_tag, im))
else:
res.append(("default-worker-image", im))

Expand All @@ -231,14 +223,14 @@ def delete(

if not context.node.in_memory_workers and image and image.image_identifier:
try:
full_tag: str = image.image_identifier.repo_with_tag
full_tag: str = image.image_identifier.full_name_with_tag
with contextlib.closing(docker.from_env()) as client:
client.images.remove(image=full_tag)
except docker.errors.ImageNotFound:
return SyftError(message=f"Image {full_tag} not found.")
return SyftError(message=f"Image Tag: {full_tag} not found.")
except Exception as e:
return SyftError(
message=f"Failed to delete image {full_tag}. Error: {e}"
message=f"Failed to delete Image Tag: {full_tag}. Error: {e}"
)

result = self.stash.delete_by_uid(credentials=context.credentials, uid=uid)
Expand All @@ -247,7 +239,7 @@ def delete(
return SyftError(message=f"{result.err()}")

returned_message: str = (
result.ok().message + f". Image {uid} deleted successfully."
result.ok().message + f". Image ID: {uid} deleted successfully."
)

return SyftSuccess(message=returned_message)
6 changes: 3 additions & 3 deletions packages/syft/src/syft/service/worker/worker_pool_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ def create_pool(
message=f"Failed to retrieve Worker Image with id: {image_uid}. Error: {result.err()}"
)

worker_image = result.ok()
worker_image: SyftWorkerImage = result.ok()

worker_list, container_statuses = _create_workers_in_pool(
context=context,
Expand Down Expand Up @@ -148,7 +148,7 @@ def get_all(
res: List[Tuple] = []
for pool in worker_pools:
if pool.image.image_identifier is not None:
res.append((pool.image.image_identifier.repo_with_tag, pool))
res.append((pool.image.image_identifier.full_name_with_tag, pool))
else:
res.append(("in-memory-pool", pool))
return DictTuple(res)
Expand Down Expand Up @@ -193,7 +193,7 @@ def add_workers(
message=f"Failed to retrieve image for worker pool: {worker_pool.name}"
)

worker_image = result.ok()
worker_image: SyftWorkerImage = result.ok()

worker_list, container_statuses = _create_workers_in_pool(
context=context,
Expand Down
Loading

0 comments on commit 021129a

Please sign in to comment.