diff --git a/.github/workflows/pr-tests-syft.yml b/.github/workflows/pr-tests-syft.yml index 1083357f141..f2bee6a78cf 100644 --- a/.github/workflows/pr-tests-syft.yml +++ b/.github/workflows/pr-tests-syft.yml @@ -202,7 +202,7 @@ jobs: matrix: os: [ubuntu-latest] python-version: ["3.10", "3.11", "3.12"] - deployment-type: ["k8s"] + deployment-type: ["remote"] notebook-paths: ["api/0.8"] fail-fast: false diff --git a/notebooks/admin/Custom API + Custom Worker.ipynb b/notebooks/admin/Custom API + Custom Worker.ipynb index bbf47476301..ef6bde6ab9c 100644 --- a/notebooks/admin/Custom API + Custom Worker.ipynb +++ b/notebooks/admin/Custom API + Custom Worker.ipynb @@ -36,8 +36,8 @@ "metadata": {}, "outputs": [], "source": [ - "## k8s mode\n", - "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"k8s\"\n", + "## remote mode\n", + "# os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", "# os.environ[\"DEV_MODE\"] = \"True\"\n", "domain_client = sy.login(\n", " email=\"info@openmined.org\",\n", diff --git a/notebooks/api/0.8/09-blob-storage.ipynb b/notebooks/api/0.8/09-blob-storage.ipynb index 713fd53f6f4..93491499896 100644 --- a/notebooks/api/0.8/09-blob-storage.ipynb +++ b/notebooks/api/0.8/09-blob-storage.ipynb @@ -36,7 +36,6 @@ "node = sy.orchestra.launch(\n", " name=\"test-domain-1\",\n", " dev_mode=True,\n", - " in_memory_workers=True,\n", " reset=True,\n", " create_producer=True,\n", ")" diff --git a/notebooks/api/0.8/10-container-images.ipynb b/notebooks/api/0.8/10-container-images.ipynb index 84fd1a340ab..cb4bd49cf86 100644 --- a/notebooks/api/0.8/10-container-images.ipynb +++ b/notebooks/api/0.8/10-container-images.ipynb @@ -91,8 +91,7 @@ "# Disable inmemory worker for container stack\n", "running_as_container = os.environ.get(\"ORCHESTRA_DEPLOYMENT_TYPE\") in (\n", " \"container_stack\",\n", - ")\n", - "in_memory_workers = not running_as_container" + ")" ] }, { @@ -106,7 +105,6 @@ " name=\"test-domain-1\",\n", " dev_mode=True,\n", " create_producer=True,\n", - " in_memory_workers=in_memory_workers,\n", " reset=True,\n", " port=8081,\n", ")" @@ -1485,7 +1483,7 @@ "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", - "version": "3.11.8" + "version": "3.11.7" } }, "nbformat": 4, diff --git a/notebooks/api/0.8/11-container-images-k8s.ipynb b/notebooks/api/0.8/11-container-images-k8s.ipynb index c9663acd3ad..77cd71c7912 100644 --- a/notebooks/api/0.8/11-container-images-k8s.ipynb +++ b/notebooks/api/0.8/11-container-images-k8s.ipynb @@ -45,7 +45,7 @@ "metadata": {}, "outputs": [], "source": [ - "os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"k8s\"\n", + "os.environ[\"ORCHESTRA_DEPLOYMENT_TYPE\"] = \"remote\"\n", "os.environ[\"DEV_MODE\"] = \"True\"\n", "\n", "# Uncomment this to add custom values\n", diff --git a/notebooks/api/0.8/12-custom-api-endpoint.ipynb b/notebooks/api/0.8/12-custom-api-endpoint.ipynb index dd867dc3757..f84ca9c5c3f 100644 --- a/notebooks/api/0.8/12-custom-api-endpoint.ipynb +++ b/notebooks/api/0.8/12-custom-api-endpoint.ipynb @@ -33,7 +33,6 @@ " dev_mode=True,\n", " create_producer=True,\n", " n_consumers=3,\n", - " in_memory_workers=True,\n", " reset=True,\n", " port=8081,\n", ")\n", diff --git a/notebooks/tutorials/data-engineer/02-deployment-types.ipynb b/notebooks/tutorials/data-engineer/02-deployment-types.ipynb index b4c43e5929d..1bd572a26fe 100644 --- a/notebooks/tutorials/data-engineer/02-deployment-types.ipynb +++ b/notebooks/tutorials/data-engineer/02-deployment-types.ipynb @@ -67,7 +67,7 @@ "metadata": {}, "outputs": [], "source": [ - "memory_node = sy.Orchestra.launch(\n", + "memory_node = sy.orchestra.launch(\n", " name=\"Arbitrary Dev Node\",\n", " dev_mode=True,\n", " reset=True,\n", @@ -99,7 +99,7 @@ "metadata": {}, "outputs": [], "source": [ - "webserver_node = sy.Orchestra.launch(\n", + "webserver_node = sy.orchestra.launch(\n", " name=\"Arbitrary Webserver Dev Node\", dev_mode=True, reset=True, port=8081\n", ")" ] diff --git a/notebooks/tutorials/enclaves/Enclave-single-notebook-high-low-network.ipynb b/notebooks/tutorials/enclaves/Enclave-single-notebook-high-low-network.ipynb index ecfdefff737..771f0ad4389 100644 --- a/notebooks/tutorials/enclaves/Enclave-single-notebook-high-low-network.ipynb +++ b/notebooks/tutorials/enclaves/Enclave-single-notebook-high-low-network.ipynb @@ -45,7 +45,7 @@ "metadata": {}, "outputs": [], "source": [ - "embassador_node_low = sy.Orchestra.launch(\n", + "embassador_node_low = sy.orchestra.launch(\n", " name=\"ambassador node\",\n", " node_side_type=\"low\",\n", " local_db=True,\n", @@ -69,14 +69,14 @@ "metadata": {}, "outputs": [], "source": [ - "ca_node_low = sy.Orchestra.launch(\n", + "ca_node_low = sy.orchestra.launch(\n", " name=\"canada-1\",\n", " node_side_type=\"low\",\n", " local_db=True,\n", " reset=True,\n", " # enable_warnings=True,\n", ")\n", - "it_node_low = sy.Orchestra.launch(\n", + "it_node_low = sy.orchestra.launch(\n", " name=\"italy-1\",\n", " node_side_type=\"low\",\n", " local_db=True,\n", @@ -125,13 +125,13 @@ " reset=True,\n", " # enable_warnings=True,\n", ")\n", - "ca_node_high = sy.Orchestra.launch(\n", + "ca_node_high = sy.orchestra.launch(\n", " name=\"canada-2\",\n", " local_db=True,\n", " reset=True,\n", " # enable_warnings=True,\n", ")\n", - "it_node_high = sy.Orchestra.launch(\n", + "it_node_high = sy.orchestra.launch(\n", " name=\"italy-2\",\n", " local_db=True,\n", " reset=True,\n", diff --git a/packages/grid/backend/backend.dockerfile b/packages/grid/backend/backend.dockerfile index f5c68b3fcb3..08ea2c9a72a 100644 --- a/packages/grid/backend/backend.dockerfile +++ b/packages/grid/backend/backend.dockerfile @@ -34,8 +34,7 @@ COPY syft/src/syft/VERSION ./syft/src/syft/ RUN --mount=type=cache,target=/root/.cache,sharing=locked \ # remove torch because we already have the cpu version pre-installed sed --in-place /torch==/d ./syft/setup.cfg && \ - uv pip install -e ./syft[data_science] && \ - uv pip freeze | grep ansible | xargs uv pip uninstall + uv pip install -e ./syft[data_science] # ==================== [Final] Setup Syft Server ==================== # diff --git a/packages/grid/syft-client/syft.Dockerfile b/packages/grid/syft-client/syft.Dockerfile index e3d1189a8e8..8f94e38b81b 100644 --- a/packages/grid/syft-client/syft.Dockerfile +++ b/packages/grid/syft-client/syft.Dockerfile @@ -14,8 +14,7 @@ RUN apk update && apk upgrade && \ COPY ./syft /tmp/syft RUN --mount=type=cache,target=/root/.cache,sharing=locked \ - pip install --user jupyterlab==4.1.6 pip-autoremove==0.10.0 /tmp/syft && \ - pip-autoremove ansible ansible-core -y + pip install --user jupyterlab==4.1.6 /tmp/syft # ==================== [Final] Setup Syft Client ==================== # diff --git a/packages/hagrid/hagrid/orchestra.py b/packages/hagrid/hagrid/orchestra.py deleted file mode 100644 index 8826c073841..00000000000 --- a/packages/hagrid/hagrid/orchestra.py +++ /dev/null @@ -1,633 +0,0 @@ -"""Python Level API to launch Docker Containers using Hagrid""" - -# future -from __future__ import annotations - -# stdlib -from collections.abc import Callable -from enum import Enum -import getpass -import inspect -import os -import subprocess # nosec -import sys -from threading import Thread -from typing import Any -from typing import TYPE_CHECKING - -# relative -from .cli import str_to_bool -from .grammar import find_available_port -from .names import random_name -from .util import ImportFromSyft -from .util import NodeSideType -from .util import shell - -DEFAULT_PORT = 8080 -DEFAULT_URL = "http://localhost" -# Gevent used instead of threading module ,as we monkey patch gevent in syft -# and this causes context switch error when we use normal threading in hagrid - -ClientAlias = Any # we don't want to import Client in case it changes - -if TYPE_CHECKING: - NodeType = ImportFromSyft.import_node_type() - - -# Define a function to read and print a stream -def read_stream(stream: subprocess.PIPE) -> None: - while True: - line = stream.readline() - if not line: - break - print(line, end="") - - -def to_snake_case(name: str) -> str: - return name.lower().replace(" ", "_") - - -def get_syft_client() -> Any | None: - try: - # syft absolute - import syft as sy - - return sy - except Exception: # nosec - # print("Please install syft with `pip install syft`") - pass - return None - - -def container_exists(name: str) -> bool: - output = shell(f"docker ps -q -f name='{name}'") - return len(output) > 0 - - -def port_from_container(name: str, deployment_type: DeploymentType) -> int | None: - container_suffix = "" - if deployment_type == DeploymentType.SINGLE_CONTAINER: - container_suffix = "-worker-1" - elif deployment_type == DeploymentType.CONTAINER_STACK: - container_suffix = "-proxy-1" - else: - raise NotImplementedError( - f"port_from_container not implemented for the deployment type:{deployment_type}" - ) - - container_name = name + container_suffix - output = shell(f"docker port {container_name}") - if len(output) > 0: - try: - # 80/tcp -> 0.0.0.0:8080 - lines = output.split("\n") - parts = lines[0].split(":") - port = int(parts[1].strip()) - return port - except Exception: # nosec - return None - return None - - -def container_exists_with(name: str, port: int) -> bool: - output = shell( - f"docker ps -q -f name={name} | xargs -n 1 docker port | grep 0.0.0.0:{port}" - ) - return len(output) > 0 - - -def get_node_type(node_type: str | NodeType | None) -> NodeType | None: - NodeType = ImportFromSyft.import_node_type() - if node_type is None: - node_type = os.environ.get("ORCHESTRA_NODE_TYPE", NodeType.DOMAIN) - try: - return NodeType(node_type) - except ValueError: - print(f"node_type: {node_type} is not a valid NodeType: {NodeType}") - return None - - -def get_deployment_type(deployment_type: str | None) -> DeploymentType | None: - if deployment_type is None: - deployment_type = os.environ.get( - "ORCHESTRA_DEPLOYMENT_TYPE", DeploymentType.PYTHON - ) - - # provide shorthands - if deployment_type == "container": - deployment_type = "container_stack" - - try: - return DeploymentType(deployment_type) - except ValueError: - print( - f"deployment_type: {deployment_type} is not a valid DeploymentType: {DeploymentType}" - ) - return None - - -# Can also be specified by the environment variable -# ORCHESTRA_DEPLOYMENT_TYPE -class DeploymentType(Enum): - PYTHON = "python" - SINGLE_CONTAINER = "single_container" - CONTAINER_STACK = "container_stack" - K8S = "k8s" - PODMAN = "podman" - - -class NodeHandle: - def __init__( - self, - node_type: NodeType, - deployment_type: DeploymentType, - node_side_type: NodeSideType, - name: str, - port: int | None = None, - url: str | None = None, - python_node: Any | None = None, - shutdown: Callable | None = None, - ) -> None: - self.node_type = node_type - self.name = name - self.port = port - self.url = url - self.python_node = python_node - self.shutdown = shutdown - self.deployment_type = deployment_type - self.node_side_type = node_side_type - - @property - def client(self) -> Any: - if self.port: - sy = get_syft_client() - return sy.login_as_guest(url=self.url, port=self.port) # type: ignore - elif self.deployment_type == DeploymentType.PYTHON: - return self.python_node.get_guest_client(verbose=False) # type: ignore - else: - raise NotImplementedError( - f"client not implemented for the deployment type:{self.deployment_type}" - ) - - def login_as_guest(self, **kwargs: Any) -> ClientAlias: - return self.client.login_as_guest(**kwargs) - - def login( - self, email: str | None = None, password: str | None = None, **kwargs: Any - ) -> ClientAlias: - if not email: - email = input("Email: ") - - if not password: - password = getpass.getpass("Password: ") - - return self.client.login(email=email, password=password, **kwargs) - - def register( - self, - name: str, - email: str | None = None, - password: str | None = None, - password_verify: str | None = None, - institution: str | None = None, - website: str | None = None, - ) -> Any: - SyftError = ImportFromSyft.import_syft_error() - if not email: - email = input("Email: ") - if not password: - password = getpass.getpass("Password: ") - if not password_verify: - password_verify = getpass.getpass("Confirm Password: ") - if password != password_verify: - return SyftError(message="Passwords do not match") - - client = self.client - return client.register( - name=name, - email=email, - password=password, - institution=institution, - password_verify=password_verify, - website=website, - ) - - def land(self) -> None: - if self.deployment_type == DeploymentType.PYTHON: - if self.shutdown: - self.shutdown() - else: - Orchestra.land(self.name, deployment_type=self.deployment_type) - - -def deploy_to_python( - node_type_enum: NodeType, - deployment_type_enum: DeploymentType, - port: int | str, - name: str, - host: str, - reset: bool, - tail: bool, - dev_mode: bool, - processes: int, - local_db: bool, - node_side_type: NodeSideType, - enable_warnings: bool, - n_consumers: int, - thread_workers: bool, - create_producer: bool = False, - queue_port: int | None = None, - association_request_auto_approval: bool = False, - background_tasks: bool = False, -) -> NodeHandle | None: - stage_protocol_changes = ImportFromSyft.import_stage_protocol_changes() - NodeType = ImportFromSyft.import_node_type() - sy = get_syft_client() - if sy is None: - return sy - worker_classes = {NodeType.DOMAIN: sy.Domain, NodeType.NETWORK: sy.Gateway} - - # syft >= 0.8.2 - if hasattr(sy, "Enclave"): - worker_classes[NodeType.ENCLAVE] = sy.Enclave - if hasattr(NodeType, "GATEWAY"): - worker_classes[NodeType.GATEWAY] = sy.Gateway - - if dev_mode: - print("Staging Protocol Changes...") - stage_protocol_changes() - - kwargs = { - "name": name, - "host": host, - "port": port, - "reset": reset, - "processes": processes, - "dev_mode": dev_mode, - "tail": tail, - "node_type": node_type_enum, - "node_side_type": node_side_type, - "enable_warnings": enable_warnings, - # new kwargs - "queue_port": queue_port, - "n_consumers": n_consumers, - "create_producer": create_producer, - "association_request_auto_approval": association_request_auto_approval, - "background_tasks": background_tasks, - } - - if port: - kwargs["in_memory_workers"] = True - if port == "auto": - # dont use default port to prevent port clashes in CI - port = find_available_port(host="localhost", port=None, search=True) - kwargs["port"] = port - - sig = inspect.signature(sy.serve_node) - supported_kwargs = {k: v for k, v in kwargs.items() if k in sig.parameters} - - start, stop = sy.serve_node(**supported_kwargs) - start() - return NodeHandle( - node_type=node_type_enum, - deployment_type=deployment_type_enum, - name=name, - port=port, - url="http://localhost", - shutdown=stop, - node_side_type=node_side_type, - ) - else: - kwargs["local_db"] = local_db - kwargs["thread_workers"] = thread_workers - if node_type_enum in worker_classes: - worker_class = worker_classes[node_type_enum] - sig = inspect.signature(worker_class.named) - supported_kwargs = {k: v for k, v in kwargs.items() if k in sig.parameters} - if "node_type" in sig.parameters.keys() and "migrate" in sig.parameters: - supported_kwargs["migrate"] = True - worker = worker_class.named(**supported_kwargs) - else: - raise NotImplementedError(f"node_type: {node_type_enum} is not supported") - - def stop() -> None: - worker.stop() - - return NodeHandle( - node_type=node_type_enum, - deployment_type=deployment_type_enum, - name=name, - python_node=worker, - node_side_type=node_side_type, - shutdown=stop, - ) - - -def deploy_to_k8s( - node_type_enum: NodeType, - deployment_type_enum: DeploymentType, - name: str, - node_side_type: NodeSideType, -) -> NodeHandle: - node_port = int(os.environ.get("NODE_PORT", f"{DEFAULT_PORT}")) - node_url = str(os.environ.get("NODE_URL", f"{DEFAULT_URL}")) - return NodeHandle( - node_type=node_type_enum, - deployment_type=deployment_type_enum, - name=name, - port=node_port, - url=node_url, - node_side_type=node_side_type, - ) - - -def deploy_to_podman( - node_type_enum: NodeType, - deployment_type_enum: DeploymentType, - name: str, - node_side_type: NodeSideType, -) -> NodeHandle: - node_port = int(os.environ.get("NODE_PORT", f"{DEFAULT_PORT}")) - return NodeHandle( - node_type=node_type_enum, - deployment_type=deployment_type_enum, - name=name, - port=node_port, - url="http://localhost", - node_side_type=node_side_type, - ) - - -def deploy_to_container( - node_type_enum: NodeType, - deployment_type_enum: DeploymentType, - node_side_type: NodeSideType, - reset: bool, - cmd: bool, - tail: bool, - verbose: bool, - tag: str, - render: bool, - dev_mode: bool, - port: int | str, - name: str, - enable_warnings: bool, - in_memory_workers: bool, - association_request_auto_approval: bool = False, -) -> NodeHandle | None: - if port == "auto" or port is None: - if container_exists(name=name): - port = port_from_container(name=name, deployment_type=deployment_type_enum) # type: ignore - else: - port = find_available_port(host="localhost", port=DEFAULT_PORT, search=True) - - # Currently by default we launch in dev mode - if reset: - Orchestra.reset(name, deployment_type_enum) - else: - if container_exists_with(name=name, port=port): - return NodeHandle( - node_type=node_type_enum, - deployment_type=deployment_type_enum, - name=name, - port=port, - url="http://localhost", - node_side_type=node_side_type, - ) - - # Start a subprocess and capture its output - commands = ["hagrid", "launch"] - - name = random_name() if not name else name - commands.extend([name, node_type_enum.value]) - - commands.append("to") - commands.append(f"docker:{port}") - - if dev_mode: - commands.append("--dev") - - if not enable_warnings: - commands.append("--no-warnings") - - if node_side_type.lower() == NodeSideType.LOW_SIDE.value.lower(): - commands.append("--low-side") - - if in_memory_workers: - commands.append("--in-mem-workers") - - # by default , we deploy as container stack - if deployment_type_enum == DeploymentType.SINGLE_CONTAINER: - commands.append("--deployment-type=single_container") - - if association_request_auto_approval: - commands.append("--enable-association-auto-approval") - - if cmd: - commands.append("--cmd") - - if tail: - commands.append("--tail") - - if verbose: - commands.append("--verbose") - - if tag: - commands.append(f"--tag={tag}") - - if render: - commands.append("--render") - - # needed for building containers - USER = os.environ.get("USER", getpass.getuser()) - env = os.environ.copy() - env["USER"] = USER - - process = subprocess.Popen( # nosec - commands, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True, env=env - ) - # Start threads to read and print the output and error streams - stdout_thread = Thread(target=read_stream, args=(process.stdout,)) - stderr_thread = Thread(target=read_stream, args=(process.stderr,)) - # todo, raise errors - stdout_thread.start() - stderr_thread.start() - stdout_thread.join() - stderr_thread.join() - - if not cmd: - return NodeHandle( - node_type=node_type_enum, - deployment_type=deployment_type_enum, - name=name, - port=port, - url="http://localhost", - node_side_type=node_side_type, - ) - return None - - -class Orchestra: - @staticmethod - def launch( - # node information and deployment - name: str | None = None, - node_type: str | NodeType | None = None, - deploy_to: str | None = None, - node_side_type: str | None = None, - # worker related inputs - port: int | str | None = None, - processes: int = 1, # temporary work around for jax in subprocess - local_db: bool = False, - dev_mode: bool = False, - cmd: bool = False, - reset: bool = False, - tail: bool = False, - host: str | None = "0.0.0.0", # nosec - tag: str | None = "latest", - verbose: bool = False, - render: bool = False, - enable_warnings: bool = False, - n_consumers: int = 0, - thread_workers: bool = False, - create_producer: bool = False, - queue_port: int | None = None, - in_memory_workers: bool = True, - association_request_auto_approval: bool = False, - background_tasks: bool = False, - ) -> NodeHandle | None: - NodeType = ImportFromSyft.import_node_type() - os.environ["DEV_MODE"] = str(dev_mode) - if dev_mode is True: - thread_workers = True - - # syft 0.8.1 - if node_type == "python": - node_type = NodeType.DOMAIN - if deploy_to is None: - deploy_to = "python" - - dev_mode = str_to_bool(os.environ.get("DEV_MODE", f"{dev_mode}")) - - node_type_enum: NodeType | None = get_node_type(node_type=node_type) - - node_side_type_enum = ( - NodeSideType.HIGH_SIDE - if node_side_type is None - else NodeSideType(node_side_type) - ) - - deployment_type_enum: DeploymentType | None = get_deployment_type( - deployment_type=deploy_to - ) - if not deployment_type_enum: - return None - - if deployment_type_enum == DeploymentType.PYTHON: - return deploy_to_python( - node_type_enum=node_type_enum, - deployment_type_enum=deployment_type_enum, - port=port, - name=name, - host=host, - reset=reset, - tail=tail, - dev_mode=dev_mode, - processes=processes, - local_db=local_db, - node_side_type=node_side_type_enum, - enable_warnings=enable_warnings, - n_consumers=n_consumers, - thread_workers=thread_workers, - create_producer=create_producer, - queue_port=queue_port, - association_request_auto_approval=association_request_auto_approval, - background_tasks=background_tasks, - ) - - elif deployment_type_enum == DeploymentType.K8S: - return deploy_to_k8s( - node_type_enum=node_type_enum, - deployment_type_enum=deployment_type_enum, - name=name, - node_side_type=node_side_type_enum, - ) - - elif ( - deployment_type_enum == DeploymentType.CONTAINER_STACK - or deployment_type_enum == DeploymentType.SINGLE_CONTAINER - ): - return deploy_to_container( - node_type_enum=node_type_enum, - deployment_type_enum=deployment_type_enum, - reset=reset, - cmd=cmd, - tail=tail, - verbose=verbose, - tag=tag, - render=render, - dev_mode=dev_mode, - port=port, - name=name, - node_side_type=node_side_type_enum, - enable_warnings=enable_warnings, - in_memory_workers=in_memory_workers, - association_request_auto_approval=association_request_auto_approval, - ) - elif deployment_type_enum == DeploymentType.PODMAN: - return deploy_to_podman( - node_type_enum=node_type_enum, - deployment_type_enum=deployment_type_enum, - name=name, - node_side_type=node_side_type_enum, - ) - # else: - # print(f"deployment_type: {deployment_type_enum} is not supported") - # return None - - @staticmethod - def land( - name: str, deployment_type: str | DeploymentType, reset: bool = False - ) -> None: - deployment_type_enum = DeploymentType(deployment_type) - Orchestra.shutdown(name=name, deployment_type_enum=deployment_type_enum) - if reset: - Orchestra.reset(name, deployment_type_enum=deployment_type_enum) - - @staticmethod - def shutdown( - name: str, deployment_type_enum: DeploymentType, reset: bool = False - ) -> None: - if deployment_type_enum != DeploymentType.PYTHON: - snake_name = to_snake_case(name) - - if reset: - land_output = shell(f"hagrid land {snake_name} --force --prune-vol") - else: - land_output = shell(f"hagrid land {snake_name} --force") - if "Removed" in land_output: - print(f" ✅ {snake_name} Container Removed") - elif "No resource found to remove for project" in land_output: - print(f" ✅ {snake_name} Container does not exist") - else: - print( - f"❌ Unable to remove container: {snake_name} :{land_output}", - file=sys.stderr, - ) - - @staticmethod - def reset(name: str, deployment_type_enum: DeploymentType) -> None: - if deployment_type_enum == DeploymentType.PYTHON: - sy = get_syft_client() - _ = sy.Worker.named(name=name, processes=1, reset=True) # type: ignore - elif ( - deployment_type_enum == DeploymentType.CONTAINER_STACK - or deployment_type_enum == DeploymentType.SINGLE_CONTAINER - ): - Orchestra.shutdown( - name=name, deployment_type_enum=deployment_type_enum, reset=True - ) - else: - raise NotImplementedError( - f"Reset not implemented for the deployment type:{deployment_type_enum}" - ) diff --git a/packages/syft/setup.cfg b/packages/syft/setup.cfg index fec606a5685..6f433aa0424 100644 --- a/packages/syft/setup.cfg +++ b/packages/syft/setup.cfg @@ -51,8 +51,7 @@ syft = sherlock[filelock]==0.4.1 uvicorn[standard]==0.29.0 fastapi==0.111.0 - psutil>=5.9.5,<5.9.9 - hagrid>=0.3 + psutil==5.9.8 itables==1.7.1 argon2-cffi==23.1.0 matplotlib>=3.7.1,<3.9.1 @@ -68,6 +67,7 @@ syft = PyYAML==6.0.1 azure-storage-blob==12.19.1 ipywidgets==8.1.2 + rich==13.7.1 jinja2==3.1.4 tenacity==8.3.0 diff --git a/packages/syft/src/syft/__init__.py b/packages/syft/src/syft/__init__.py index f0dd9e427d5..3aae76848dc 100644 --- a/packages/syft/src/syft/__init__.py +++ b/packages/syft/src/syft/__init__.py @@ -16,7 +16,6 @@ from .client.client import login # noqa: F401 from .client.client import login_as_guest # noqa: F401 from .client.client import register # noqa: F401 -from .client.deploy import Orchestra # noqa: F401 from .client.domain_client import DomainClient # noqa: F401 from .client.gateway_client import GatewayClient # noqa: F401 from .client.registry import DomainRegistry # noqa: F401 @@ -34,6 +33,7 @@ from .node.server import serve_node # noqa: F401 from .node.server import serve_node as bind_worker # noqa: F401 from .node.worker import Worker # noqa: F401 +from .orchestra import Orchestra as orchestra # noqa: F401 from .protocol.data_protocol import bump_protocol_version # noqa: F401 from .protocol.data_protocol import check_or_stage_protocol # noqa: F401 from .protocol.data_protocol import get_data_protocol # noqa: F401 @@ -232,11 +232,6 @@ def _settings() -> UserSettings: return settings -@module_property -def _orchestra() -> Orchestra: - return Orchestra - - @module_property def hello_baby() -> None: print("Hello baby!") diff --git a/packages/syft/src/syft/client/deploy.py b/packages/syft/src/syft/client/deploy.py deleted file mode 100644 index bd19895ced5..00000000000 --- a/packages/syft/src/syft/client/deploy.py +++ /dev/null @@ -1,33 +0,0 @@ -# stdlib -from typing import Any - -# relative -from ..service.response import SyftError - - -class InstallOrchestra: - def launch(self, *args: Any, **kwargs: Any) -> None: - return self.error() - - def error(self) -> Any: - message = "Please install hagrid with `pip install -U hagrid`" - return SyftError(message=message) - - def _repr_html_(self) -> str: - return self.error()._repr_html_() - - -def import_orchestra() -> Any: - try: - # third party - from hagrid import Orchestra - - return Orchestra - - except Exception as e: # nosec - print(e) - pass - return InstallOrchestra() - - -Orchestra = import_orchestra() diff --git a/packages/syft/src/syft/client/domain_client.py b/packages/syft/src/syft/client/domain_client.py index 4843cf9d41d..75ec142bfde 100644 --- a/packages/syft/src/syft/client/domain_client.py +++ b/packages/syft/src/syft/client/domain_client.py @@ -8,7 +8,6 @@ from typing import cast # third party -from hagrid.orchestra import NodeHandle from loguru import logger from tqdm import tqdm @@ -43,6 +42,7 @@ if TYPE_CHECKING: # relative + from ..orchestra import NodeHandle from ..service.project.project import Project diff --git a/packages/syft/src/syft/client/enclave_client.py b/packages/syft/src/syft/client/enclave_client.py index ddf61c30f86..32eebdf3189 100644 --- a/packages/syft/src/syft/client/enclave_client.py +++ b/packages/syft/src/syft/client/enclave_client.py @@ -5,9 +5,6 @@ from typing import Any from typing import TYPE_CHECKING -# third party -from hagrid.orchestra import NodeHandle - # relative from ..abstract_node import NodeSideType from ..client.api import APIRegistry @@ -29,6 +26,7 @@ if TYPE_CHECKING: # relative + from ..orchestra import NodeHandle from ..service.code.user_code import SubmitUserCode diff --git a/packages/syft/src/syft/node/run.py b/packages/syft/src/syft/node/run.py index d82d88c9a97..5d731d48fd5 100644 --- a/packages/syft/src/syft/node/run.py +++ b/packages/syft/src/syft/node/run.py @@ -1,11 +1,9 @@ # stdlib import argparse -# third party -from hagrid.orchestra import NodeHandle - # relative -from ..client.deploy import Orchestra +from ..orchestra import NodeHandle +from ..orchestra import Orchestra def str_to_bool(bool_str: str | None) -> bool: @@ -71,16 +69,8 @@ def run() -> NodeHandle | None: default="True", dest="tail", ) - parser.add_argument( - "--cmd", - help="cmd mode", - type=str, - default="False", - dest="cmd", - ) args = parser.parse_args() - if args.command != "launch": print("syft launch is the only command currently supported") @@ -100,7 +90,6 @@ def run() -> NodeHandle | None: local_db=args.local_db, processes=args.processes, tail=args.tail, - cmd=args.cmd, ) if not args.tail: return node diff --git a/packages/syft/src/syft/orchestra.py b/packages/syft/src/syft/orchestra.py new file mode 100644 index 00000000000..1a08f594aa2 --- /dev/null +++ b/packages/syft/src/syft/orchestra.py @@ -0,0 +1,331 @@ +"""Python Level API to launch Syft services.""" + +# future +from __future__ import annotations + +# stdlib +from collections.abc import Callable +from enum import Enum +import getpass +import inspect +import os +import sys +from typing import Any + +# relative +from .abstract_node import NodeSideType +from .abstract_node import NodeType +from .client.client import login_as_guest as sy_login_as_guest +from .node.domain import Domain +from .node.enclave import Enclave +from .node.gateway import Gateway +from .node.server import serve_node +from .protocol.data_protocol import stage_protocol_changes +from .service.response import SyftError +from .util.util import get_random_available_port + +DEFAULT_PORT = 8080 +DEFAULT_URL = "http://localhost" + +ClientAlias = Any # we don't want to import Client in case it changes + + +def get_node_type(node_type: str | NodeType | None) -> NodeType | None: + if node_type is None: + node_type = os.environ.get("ORCHESTRA_NODE_TYPE", NodeType.DOMAIN) + try: + return NodeType(node_type) + except ValueError: + print(f"node_type: {node_type} is not a valid NodeType: {NodeType}") + return None + + +def get_deployment_type(deployment_type: str | None) -> DeploymentType | None: + if deployment_type is None: + deployment_type = os.environ.get( + "ORCHESTRA_DEPLOYMENT_TYPE", DeploymentType.PYTHON + ) + + try: + return DeploymentType(deployment_type) + except ValueError: + print( + f"deployment_type: {deployment_type} is not a valid DeploymentType: {DeploymentType}" + ) + return None + + +# Can also be specified by the environment variable +# ORCHESTRA_DEPLOYMENT_TYPE +class DeploymentType(Enum): + PYTHON = "python" + REMOTE = "remote" + + +class NodeHandle: + def __init__( + self, + node_type: NodeType, + deployment_type: DeploymentType, + node_side_type: NodeSideType, + name: str, + port: int | None = None, + url: str | None = None, + python_node: Any | None = None, + shutdown: Callable | None = None, + ) -> None: + self.node_type = node_type + self.name = name + self.port = port + self.url = url + self.python_node = python_node + self.shutdown = shutdown + self.deployment_type = deployment_type + self.node_side_type = node_side_type + + @property + def client(self) -> Any: + if self.port: + return sy_login_as_guest(url=self.url, port=self.port) # type: ignore + elif self.deployment_type == DeploymentType.PYTHON: + return self.python_node.get_guest_client(verbose=False) # type: ignore + else: + raise NotImplementedError( + f"client not implemented for the deployment type:{self.deployment_type}" + ) + + def login_as_guest(self, **kwargs: Any) -> ClientAlias: + return self.client.login_as_guest(**kwargs) + + def login( + self, email: str | None = None, password: str | None = None, **kwargs: Any + ) -> ClientAlias: + if not email: + email = input("Email: ") + + if not password: + password = getpass.getpass("Password: ") + + return self.client.login(email=email, password=password, **kwargs) + + def register( + self, + name: str, + email: str | None = None, + password: str | None = None, + password_verify: str | None = None, + institution: str | None = None, + website: str | None = None, + ) -> Any: + if not email: + email = input("Email: ") + if not password: + password = getpass.getpass("Password: ") + if not password_verify: + password_verify = getpass.getpass("Confirm Password: ") + if password != password_verify: + return SyftError(message="Passwords do not match") + + client = self.client + return client.register( + name=name, + email=email, + password=password, + institution=institution, + password_verify=password_verify, + website=website, + ) + + def land(self) -> None: + if self.deployment_type == DeploymentType.PYTHON: + if self.shutdown: + self.shutdown() + else: + print( + f"Shutdown not implemented for the deployment type:{self.deployment_type}", + file=sys.stderr, + ) + + +def deploy_to_python( + node_type_enum: NodeType, + deployment_type_enum: DeploymentType, + port: int | str, + name: str, + host: str, + reset: bool, + tail: bool, + dev_mode: bool, + processes: int, + local_db: bool, + node_side_type: NodeSideType, + enable_warnings: bool, + n_consumers: int, + thread_workers: bool, + create_producer: bool = False, + queue_port: int | None = None, + association_request_auto_approval: bool = False, + background_tasks: bool = False, +) -> NodeHandle: + worker_classes = { + NodeType.DOMAIN: Domain, + NodeType.GATEWAY: Gateway, + NodeType.ENCLAVE: Enclave, + } + + if dev_mode: + print("Staging Protocol Changes...") + stage_protocol_changes() + + kwargs = { + "name": name, + "host": host, + "port": port, + "reset": reset, + "processes": processes, + "dev_mode": dev_mode, + "tail": tail, + "node_type": node_type_enum, + "node_side_type": node_side_type, + "enable_warnings": enable_warnings, + "queue_port": queue_port, + "n_consumers": n_consumers, + "create_producer": create_producer, + "association_request_auto_approval": association_request_auto_approval, + "background_tasks": background_tasks, + } + + if port: + kwargs["in_memory_workers"] = True + if port == "auto": + port = get_random_available_port() + kwargs["port"] = port + + sig = inspect.signature(serve_node) + supported_kwargs = {k: v for k, v in kwargs.items() if k in sig.parameters} + + start, stop = serve_node(**supported_kwargs) + start() + return NodeHandle( + node_type=node_type_enum, + deployment_type=deployment_type_enum, + name=name, + port=port, + url="http://localhost", + shutdown=stop, + node_side_type=node_side_type, + ) + else: + kwargs["local_db"] = local_db + kwargs["thread_workers"] = thread_workers + if node_type_enum in worker_classes: + worker_class = worker_classes[node_type_enum] + sig = inspect.signature(worker_class.named) + supported_kwargs = {k: v for k, v in kwargs.items() if k in sig.parameters} + if "node_type" in sig.parameters.keys() and "migrate" in sig.parameters: + supported_kwargs["migrate"] = True + worker = worker_class.named(**supported_kwargs) + else: + raise NotImplementedError(f"node_type: {node_type_enum} is not supported") + + def stop() -> None: + worker.stop() + + return NodeHandle( + node_type=node_type_enum, + deployment_type=deployment_type_enum, + name=name, + python_node=worker, + node_side_type=node_side_type, + shutdown=stop, + ) + + +def deploy_to_remote( + node_type_enum: NodeType, + deployment_type_enum: DeploymentType, + name: str, + node_side_type: NodeSideType, +) -> NodeHandle: + node_port = int(os.environ.get("NODE_PORT", f"{DEFAULT_PORT}")) + node_url = str(os.environ.get("NODE_URL", f"{DEFAULT_URL}")) + return NodeHandle( + node_type=node_type_enum, + deployment_type=deployment_type_enum, + name=name, + port=node_port, + url=node_url, + node_side_type=node_side_type, + ) + + +class Orchestra: + @staticmethod + def launch( + # node information and deployment + name: str | None = None, + node_type: str | NodeType | None = None, + deploy_to: str | None = None, + node_side_type: str | None = None, + # worker related inputs + port: int | str | None = None, + processes: int = 1, # temporary work around for jax in subprocess + local_db: bool = False, + dev_mode: bool = False, + reset: bool = False, + tail: bool = False, + host: str | None = "0.0.0.0", # nosec + enable_warnings: bool = False, + n_consumers: int = 0, + thread_workers: bool = False, + create_producer: bool = False, + queue_port: int | None = None, + association_request_auto_approval: bool = False, + background_tasks: bool = False, + ) -> NodeHandle: + if dev_mode is True: + thread_workers = True + os.environ["DEV_MODE"] = str(dev_mode) + + node_type_enum: NodeType | None = get_node_type(node_type=node_type) + node_side_type_enum = ( + NodeSideType.HIGH_SIDE + if node_side_type is None + else NodeSideType(node_side_type) + ) + + deployment_type_enum: DeploymentType | None = get_deployment_type( + deployment_type=deploy_to + ) + + if deployment_type_enum == DeploymentType.PYTHON: + return deploy_to_python( + node_type_enum=node_type_enum, + deployment_type_enum=deployment_type_enum, + port=port, + name=name, + host=host, + reset=reset, + tail=tail, + dev_mode=dev_mode, + processes=processes, + local_db=local_db, + node_side_type=node_side_type_enum, + enable_warnings=enable_warnings, + n_consumers=n_consumers, + thread_workers=thread_workers, + create_producer=create_producer, + queue_port=queue_port, + association_request_auto_approval=association_request_auto_approval, + background_tasks=background_tasks, + ) + elif deployment_type_enum == DeploymentType.REMOTE: + return deploy_to_remote( + node_type_enum=node_type_enum, + deployment_type_enum=deployment_type_enum, + name=name, + node_side_type=node_side_type_enum, + ) + raise NotImplementedError( + f"deployment_type: {deployment_type_enum} is not supported" + ) diff --git a/packages/syft/src/syft/service/code/user_code.py b/packages/syft/src/syft/service/code/user_code.py index b840f94f83a..e68436a47b0 100644 --- a/packages/syft/src/syft/service/code/user_code.py +++ b/packages/syft/src/syft/service/code/user_code.py @@ -825,7 +825,7 @@ def _ephemeral_node_call( **kwargs: Any, ) -> Any: # relative - from ... import _orchestra + from ...orchestra import Orchestra # Right now we only create a number of workers # In the future we might need to have the same pools/images as well @@ -839,7 +839,7 @@ def _ephemeral_node_call( time_alive = 300 # This could be changed given the work on containers - ep_node = _orchestra().launch( + ep_node = Orchestra.launch( name=f"ephemeral_node_{self.func_name}_{random.randint(a=0, b=10000)}", # nosec reset=True, create_producer=True, diff --git a/packages/syft/src/syft/util/util.py b/packages/syft/src/syft/util/util.py index fd2c4dc4074..b0affa2b1a0 100644 --- a/packages/syft/src/syft/util/util.py +++ b/packages/syft/src/syft/util/util.py @@ -19,6 +19,7 @@ import os from pathlib import Path import platform +import random import re from secrets import randbelow import socket @@ -309,7 +310,11 @@ def print_dynamic_log( return (finish, success) -def find_available_port(host: str, port: int, search: bool = False) -> int: +def find_available_port( + host: str, port: int | None = None, search: bool = False +) -> int: + if port is None: + port = random.randint(1500, 65000) # nosec port_available = False while not port_available: try: @@ -324,6 +329,7 @@ def find_available_port(host: str, port: int, search: bool = False) -> int: port += 1 else: break + sock.close() except Exception as e: print(f"Failed to check port {port}. {e}") @@ -338,6 +344,18 @@ def find_available_port(host: str, port: int, search: bool = False) -> int: return port +def get_random_available_port() -> int: + """Retrieve a random available port number from the host OS. + + Returns + ------- + int: Available port number. + """ + with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as soc: + soc.bind(("localhost", 0)) + return soc.getsockname()[1] + + def get_loaded_syft() -> ModuleType: return sys.modules[__name__.split(".")[0]] diff --git a/packages/syft/tests/syft/zmq_queue_test.py b/packages/syft/tests/syft/zmq_queue_test.py index 9b22ac7d260..8c5b8dedebe 100644 --- a/packages/syft/tests/syft/zmq_queue_test.py +++ b/packages/syft/tests/syft/zmq_queue_test.py @@ -21,9 +21,7 @@ from syft.service.response import SyftError from syft.service.response import SyftSuccess from syft.util.util import get_queue_address - -# relative -from ..utils.random_port import get_random_port +from syft.util.util import get_random_available_port @pytest.fixture @@ -118,7 +116,7 @@ def handle_message(message: bytes, *args, **kwargs): @pytest.fixture def producer(): - pub_port = get_random_port() + pub_port = get_random_available_port() QueueName = token_hex(8) # Create a producer diff --git a/packages/syft/tests/utils/random_port.py b/packages/syft/tests/utils/random_port.py deleted file mode 100644 index c3370694afb..00000000000 --- a/packages/syft/tests/utils/random_port.py +++ /dev/null @@ -1,8 +0,0 @@ -# stdlib -import socket - - -def get_random_port(): - soc = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - soc.bind(("", 0)) - return soc.getsockname()[1] diff --git a/tests/integration/conftest.py b/tests/integration/conftest.py index f6ccf94f32c..9152038b1f7 100644 --- a/tests/integration/conftest.py +++ b/tests/integration/conftest.py @@ -51,7 +51,6 @@ def full_low_worker(n_consumers: int = 3, create_producer: bool = True) -> Worke n_consumers=n_consumers, create_producer=create_producer, queue_port=None, - in_memory_workers=True, local_db=False, thread_workers=False, ) @@ -72,7 +71,6 @@ def full_high_worker(n_consumers: int = 3, create_producer: bool = True) -> Work n_consumers=n_consumers, create_producer=create_producer, queue_port=None, - in_memory_workers=True, local_db=False, thread_workers=False, ) diff --git a/tests/integration/local/request_multiple_nodes_test.py b/tests/integration/local/request_multiple_nodes_test.py index 601988673dc..e81f75b57d6 100644 --- a/tests/integration/local/request_multiple_nodes_test.py +++ b/tests/integration/local/request_multiple_nodes_test.py @@ -21,7 +21,6 @@ def node_1(): local_db=True, create_producer=True, n_consumers=1, - in_memory_workers=True, queue_port=None, ) yield node @@ -39,7 +38,6 @@ def node_2(): local_db=True, create_producer=True, n_consumers=1, - in_memory_workers=True, queue_port=None, ) yield node diff --git a/tests/integration/local/syft_function_test.py b/tests/integration/local/syft_function_test.py index 6ca60f3b90d..8cc85cce4e2 100644 --- a/tests/integration/local/syft_function_test.py +++ b/tests/integration/local/syft_function_test.py @@ -23,7 +23,6 @@ def node(): n_consumers=3, create_producer=True, queue_port=None, - in_memory_workers=True, local_db=False, ) # startup code here diff --git a/tox.ini b/tox.ini index ac5c47ea65c..f3763f23d0c 100644 --- a/tox.ini +++ b/tox.ini @@ -421,7 +421,6 @@ commands = description = Syft Notebook Tests deps = -e{toxinidir}/packages/syft[dev,data_science] - {[testenv:hagrid]deps} nbmake changedir = {toxinidir}/notebooks allowlist_externals = @@ -496,14 +495,13 @@ changedir = {toxinidir}/notebooks allowlist_externals = bash setenv = - ORCHESTRA_DEPLOYMENT_TYPE = {env:ORCHESTRA_DEPLOYMENT_TYPE:k8s} + ORCHESTRA_DEPLOYMENT_TYPE = {env:ORCHESTRA_DEPLOYMENT_TYPE:remote} DEV_MODE = {env:DEV_MODE:True} TEST_NOTEBOOK_PATHS = {env:TEST_NOTEBOOK_PATHS:api/0.8} ENABLE_SIGNUP=True commands = # Volume cleanup - bash -c 'hagrid land all --force --prune-vol || true' bash -c 'docker volume rm -f $(docker volume ls -q --filter "label=orgs.openmined.syft") || true' bash -c 'docker volume rm -f $(docker volume ls -q --filter "label=com.docker.volume.anonymous") || true' bash -c 'docker network rm -f $(docker network ls -q --filter "label=orgs.openmined.syft") || true' @@ -519,7 +517,6 @@ commands = ; pytest -x --nbmake --nbmake-timeout=1000 tutorials -p no:randomly -vvvv ; pytest -x --nbmake --nbmake-timeout=1000 tutorials/pandas-cookbook -p no:randomly -vvvv - bash -c 'hagrid land all --force --prune-vol' bash -c 'docker volume rm -f $(docker volume ls -q --filter "label=orgs.openmined.syft") || true' bash -c 'docker volume rm -f $(docker volume ls -q --filter "label=com.docker.volume.anonymous") || true' bash -c 'docker network rm -f $(docker network ls -q --filter "label=orgs.openmined.syft") || true' @@ -788,7 +785,7 @@ allowlist_externals = echo tox setenv = - ORCHESTRA_DEPLOYMENT_TYPE = {env:ORCHESTRA_DEPLOYMENT_TYPE:k8s} + ORCHESTRA_DEPLOYMENT_TYPE = {env:ORCHESTRA_DEPLOYMENT_TYPE:remote} GITHUB_CI = {env:GITHUB_CI:false} SYFT_BASE_IMAGE_REGISTRY = {env:SYFT_BASE_IMAGE_REGISTRY:k3d-registry.localhost:5800} DOMAIN_CLUSTER_NAME = {env:DOMAIN_CLUSTER_NAME:test-domain-1} @@ -895,7 +892,7 @@ allowlist_externals = bash tox setenv = - ORCHESTRA_DEPLOYMENT_TYPE = {env:ORCHESTRA_DEPLOYMENT_TYPE:k8s} + ORCHESTRA_DEPLOYMENT_TYPE = {env:ORCHESTRA_DEPLOYMENT_TYPE:remote} NODE_PORT = {env:NODE_PORT:8080} NODE_URL = {env:NODE_URL:http://localhost} EXCLUDE_NOTEBOOKS = {env:EXCLUDE_NOTEBOOKS:not 10-container-images.ipynb} @@ -1097,7 +1094,7 @@ commands = tox -e dev.k8s.{posargs:deploy} [testenv:dev.k8s.launch.enclave] -description = Launch a single Enclave on K8s +description = Launch a single Enclave on K8s passenv = HOME, USER setenv= CLUSTER_NAME = {env:CLUSTER_NAME:test-enclave-1} @@ -1199,7 +1196,7 @@ allowlist_externals = pytest passenv = EXTERNAL_REGISTRY,EXTERNAL_REGISTRY_USERNAME,EXTERNAL_REGISTRY_PASSWORD setenv = - ORCHESTRA_DEPLOYMENT_TYPE = {env:ORCHESTRA_DEPLOYMENT_TYPE:k8s} + ORCHESTRA_DEPLOYMENT_TYPE = {env:ORCHESTRA_DEPLOYMENT_TYPE:remote} NODE_PORT = {env:NODE_PORT:8080} NODE_URL = {env:NODE_URL:http://localhost} EXCLUDE_NOTEBOOKS = {env:EXCLUDE_NOTEBOOKS:}